#!/usr/bin/python -tt # # Dominik Neise, Werner Lustermann # TU Dortmund, ETH Zurich # import numpy as np from ROOT import * class SignalGenerator(object): """ Signal Generator generates signals for testing several helper classes like: * fir filters * signal extractors """ def __init__(self, option_str = 'len 100 noise 3', name = 'SignalGenerator'): """ initialize the generator sets default signal to generate """ self.__module__ = 'generator' self.option_str = option_str.lower() self.options = make_options_from_str(option_str) self.parse_options() self.name = name def parse_options(self): o = self.options #shortcut if 'len' in o: self.npoints = int(o['len'][0]) else: self.npoints = 100 if 'noise' in o: self.sigma = float(o['noise'][0]) else: self.sigma = 1 if 'bsl' in o: self.bsl = float(o['bsl'][0]) else: self.bsl = -0.5 if 'step' in o: self.step_height = float(o['step'][0]) self.step_start = int(o['step'][1]) self.step_stop = int(o['step'][2]) if 'triangle' in o: self.pulses = [] # append 1st pulse to list of pulses self.pulses.append( ( float(o['triangle'][0]) , float(o['triangle'][1]), int(o['triangle'][2]), int(o['triangle'][3]) ) ) number_of_pulses_after_1st = (len(o['triangle'])-4)/2 for i in range(number_of_pulses_after_1st): self.pulses.append( ( float(o['triangle'][2*i+4]) , float(o['triangle'][2*i+5]), int(o['triangle'][2]), int(o['triangle'][3]) ) ) if 'spike' in o: self.spikes = [] for i in range(len(o['spike'])/2): self.spikes.append( ( int(o['spike'][2*i]), float(o['spike'][2*i+1]) ) ) def __call__(self, option_str = ''): if option_str: self.option_str = option_str.lower() self.options = make_options_from_str(self.option_str) self.parse_options() signal = np.zeros(self.npoints) signal += self.bsl signal += np.random.randn(self.npoints) * self.sigma if 'step' in self.options: signal[self.step_start:self.step_stop] += self.step_height if 'triangle' in self.options: for pulse in self.pulses: pos = pulse[0] height = pulse[1] rise = pulse[2] fall = pulse[3] start = pos - rise stop = pos + fall signal[start:pos] += np.linspace(0., height, rise) signal[pos:stop] += np.linspace(height, 0. , fall) if 'spike' in self.options: for spike in self.spikes: signal[spike[0]] += spike[1] return signal def __str__(self): s = self.name + '\n' s += 'possible options and parameters\n' s += ' * len: number of samples (100)\n' s += ' * noise: sigma (1)\n' s += ' * bsl: level (-0.5)\n' s += ' * step: height, start, end\n' s += ' * triangle: pos height risingedge, fallingedge [pos height ...]\n' s += ' * spike: pos height [pos height ...]\n' s += 'current options are:\n' for key in self.options.keys(): s += key + ':' + str(self.options[key]) + '\n' return s class SignalGeneratorCSV(object): def __init__(self, file_name, option_str = 'len 100 noise 3', name = 'SignalGenerator'): time, maxprob, mean, median = np.loadtxt( file_name, delimiter=',', unpack=True) csv_data = maxprob # csv data was downshifted, I shift it up here self.csv_data = csv_data - csv_data.min() self.__module__ = 'CSV generator' self.option_str = option_str.lower() self.options = make_options_from_str(option_str) self.parse_options() self.name = name def parse_options(self): o = self.options #shortcut if 'len' in o: self.npoints = int(o['len'][0]) else: self.npoints = 1024 if 'noise' in o: self.sigma = float(o['noise'][0]) else: self.sigma = 1 if 'bsl' in o: self.bsl = float(o['bsl'][0]) else: self.bsl = -0.5 if 'step' in o: self.step_height = float(o['step'][0]) self.step_start = int(o['step'][1]) self.step_stop = int(o['step'][2]) if 'triangle' in o: self.pulses = [] # append 1st pulse to list of pulses self.pulses.append( ( float(o['triangle'][0]) , float(o['triangle'][1]), int(o['triangle'][2]), int(o['triangle'][3]) ) ) number_of_pulses_after_1st = (len(o['triangle'])-4)/2 for i in range(number_of_pulses_after_1st): self.pulses.append( ( float(o['triangle'][2*i+4]) , float(o['triangle'][2*i+5]), int(o['triangle'][2]), int(o['triangle'][3]) ) ) if 'spike' in o: self.spikes = [] for i in range(len(o['spike'])/2): self.spikes.append( ( int(o['spike'][2*i]), float(o['spike'][2*i+1]) ) ) if 'csv' in o: self.csvs = [] for i in range(len(o['csv'])/2): time = int( o['csv'][2*i] ) amplitude = float( o['csv'][2*i+1] ) self.csvs.append( (time, amplitude) ) if 'rate' in o: self.rate = float(o['rate'][0]) if 'signal' in o: self.amplitude = float(o['signal'][0]) self.position = int(o['signal'][1]) def __call__(self, option_str = ''): if option_str: self.option_str = option_str.lower() self.options = make_options_from_str(self.option_str) self.parse_options() signal = np.zeros(self.npoints) signal += self.bsl # shortcut o = self.options if 'step' in o: signal[self.step_start:self.step_stop] += self.step_height if 'triangle' in o: for pulse in o: pos = pulse[0] height = pulse[1] rise = pulse[2] fall = pulse[3] start = pos - rise stop = pos + fall signal[start:pos] += np.linspace(0., height, rise) signal[pos:stop] += np.linspace(height, 0. , fall) if 'spike' in o: for spike in self.spikes: signal[spike[0]] += spike[1] if 'csv' in o: for csv in self.csvs: amplitude = csv[1] time = csv[0] csv_data = self.csv_data.copy() #scale csv_data *= amplitude # add shifted signal[time:time+len(csv_data)] += csv_data if 'rate' in o: self._add_template_pulses(signal, self.rate) if 'signal' in o: self._add_signal(signal) # add noise if 'noise' in o: signal += + np.random.normal(0.0,self.sigma, signal.shape) return signal def _add_signal(self, signal): a = self.amplitude p = self.position csv_data = self.csv_data csv = csv_data.copy() * a npoints = self.npoints + 2 * len(csv_data) internal_signal = np.zeros(npoints) internal_signal[p-69+len(csv_data):p+2*len(csv_data)-69] += csv signal += internal_signal[len(csv_data):-len(csv_data)] def _add_template_pulses(self, signal, rate): csv_data = self.csv_data period = 2.*1e9/(float(rate)*1e6) # in slices # in order to simulate pulses which are just at the beginning and # at the end of the signal, we pre- and append some points npoints = self.npoints + 2 * len(csv_data) internal_signal = np.zeros(npoints) d=np.random.exponential(period) pulse_positions = [] while d