#!/usr/bin/python -tt # # Dominik Neise, Werner Lustermann # TU Dortmund, ETH Zurich # import numpy as np import sys from ROOT import * from scipy import interpolate from plotters import Plotter import matplotlib.pyplot as plt class SignalGenerator(object): """ Signal Generator generates signals for testing several helper classes like: * fir filters * signal extractors """ def __init__(self, option_str = 'len 100 noise 3', name = 'SignalGenerator'): """ initialize the generator sets default signal to generate """ self.__module__ = 'generator' self.option_str = option_str.lower() self.options = make_options_from_str(option_str) self.parse_options() self.name = name def parse_options(self): o = self.options #shortcut if 'len' in o: self.npoints = int(o['len'][0]) else: self.npoints = 100 if 'noise' in o: self.sigma = float(o['noise'][0]) else: self.sigma = 1 if 'bsl' in o: self.bsl = float(o['bsl'][0]) else: self.bsl = -0.5 if 'step' in o: self.step_height = float(o['step'][0]) self.step_start = int(o['step'][1]) self.step_stop = int(o['step'][2]) if 'triangle' in o: self.pulses = [] # append 1st pulse to list of pulses self.pulses.append( ( float(o['triangle'][0]) , float(o['triangle'][1]), int(o['triangle'][2]), int(o['triangle'][3]) ) ) number_of_pulses_after_1st = (len(o['triangle'])-4)/2 for i in range(number_of_pulses_after_1st): self.pulses.append( ( float(o['triangle'][2*i+4]) , float(o['triangle'][2*i+5]), int(o['triangle'][2]), int(o['triangle'][3]) ) ) if 'spike' in o: self.spikes = [] for i in range(len(o['spike'])/2): self.spikes.append( ( int(o['spike'][2*i]), float(o['spike'][2*i+1]) ) ) def __call__(self, option_str = ''): if option_str: self.option_str = option_str.lower() self.options = make_options_from_str(self.option_str) self.parse_options() signal = np.zeros(self.npoints) signal += self.bsl signal += np.random.randn(self.npoints) * self.sigma if 'step' in self.options: signal[self.step_start:self.step_stop] += self.step_height if 'triangle' in self.options: for pulse in self.pulses: pos = pulse[0] height = pulse[1] rise = pulse[2] fall = pulse[3] start = pos - rise stop = pos + fall signal[start:pos] += np.linspace(0., height, rise) signal[pos:stop] += np.linspace(height, 0. , fall) if 'spike' in self.options: for spike in self.spikes: signal[spike[0]] += spike[1] return signal def __str__(self): s = self.name + '\n' s += 'possible options and parameters\n' s += ' * len: number of samples (100)\n' s += ' * noise: sigma (1)\n' s += ' * bsl: level (-0.5)\n' s += ' * step: height, start, end\n' s += ' * triangle: pos height risingedge, fallingedge [pos height ...]\n' s += ' * spike: pos height [pos height ...]\n' s += 'current options are:\n' for key in self.options.keys(): s += key + ':' + str(self.options[key]) + '\n' return s class SignalGeneratorCSV(object): def __init__(self, file_name, option_str = 'len 100 noise 3', name = 'SignalGenerator'): time, maxprob, mean, median = np.loadtxt( file_name, delimiter=',', unpack=True) csv_data = median self.csv_maxprob = maxprob self.csv_mean = mean self.csv_median = median # shift the input data such, that the left baseline is equal to zero csv_data = csv_data - csv_data[:50].mean() # I want the data to be longer than they are, so I add 1000 slices left and right self.csv_data = np.zeros(2000 + len(csv_data)) for i in range(len(csv_data)): self.csv_data[1000+i] = csv_data[i] for i in range(1000): self.csv_data[i] = csv_data[:50].mean() self.csv_data[-1*i] = csv_data[-50:].mean() x = np.arange(0,len(self.csv_data),1) x2 = np.arange(0,len(self.csv_data),0.1) # fermi dirac distribution for weighting the input data. # we do not believe the right tail is really higher than zero, so we make it go to zero. weight_function_right = lambda x : 1. / (1 + np.exp((x - 1180.)/100.)) weight_function_left = lambda x : 1. / (1 + np.exp((-1*(x - 1045.))/5.)) weights = np.array(map(weight_function_right, x)) weights *= np.array(map(weight_function_left, x)) weights *= 8 weights [1200:] += 3 weights += 2 #y_weighted = weights * y #print len(weights) == len(x) #print len(weights) == len(self.csv_data) #print weights > 0 tck= interpolate.splrep(x, self.csv_data, w=weights) self.spline_int = tck x2 = np.arange(1000,1500,1) t,c,k = tck y2 = interpolate.splev(x2,(t,c,k),der=0) #tck = interpolate.splrep(x, self.csv_data ,w=weights, s=50) #xnew = np.arange(0,len(self.csv_data), 0.1) #y = interpolate.splev(x,tck,der=0) #y2 = interpolate.splev(x2,tck,der=0) #plt.figure() #plt.plot(x,self.csv_data,'.:b',x,weights,'m',x2,y2,'.:r') #plt.show() #raw_input('stop') # csv data was downshifted, I shift it up here self.csv_data = csv_data - csv_data.min() self.__module__ = 'CSV generator' self.option_str = option_str.lower() self.options = make_options_from_str(option_str) self.parse_options() self.name = name def parse_options(self): o = self.options #shortcut if 'len' in o: self.npoints = int(o['len'][0]) else: self.npoints = 1024 if 'noise' in o: self.sigma = float(o['noise'][0]) else: self.sigma = 1 if 'bsl' in o: self.bsl = float(o['bsl'][0]) else: self.bsl = -0.5 if 'step' in o: self.step_height = float(o['step'][0]) self.step_start = int(o['step'][1]) self.step_stop = int(o['step'][2]) if 'triangle' in o: self.pulses = [] # append 1st pulse to list of pulses self.pulses.append( ( float(o['triangle'][0]) , float(o['triangle'][1]), int(o['triangle'][2]), int(o['triangle'][3]) ) ) number_of_pulses_after_1st = (len(o['triangle'])-4)/2 for i in range(number_of_pulses_after_1st): self.pulses.append( ( float(o['triangle'][2*i+4]) , float(o['triangle'][2*i+5]), int(o['triangle'][2]), int(o['triangle'][3]) ) ) if 'spike' in o: self.spikes = [] for i in range(len(o['spike'])/2): self.spikes.append( ( int(o['spike'][2*i]), float(o['spike'][2*i+1]) ) ) if 'csv' in o: self.csvs = [] for i in range(len(o['csv'])/2): time = int( o['csv'][2*i] ) amplitude = float( o['csv'][2*i+1] ) self.csvs.append( (time, amplitude) ) if 'rate' in o: self.rate = float(o['rate'][0]) if 'signal' in o: self.amplitude = float(o['signal'][0]) self.position = int(o['signal'][1]) def __call__(self, option_str = ''): if option_str: self.option_str = option_str.lower() self.options = make_options_from_str(self.option_str) self.parse_options() signal = np.zeros(self.npoints) signal += self.bsl # shortcut o = self.options if 'step' in o: signal[self.step_start:self.step_stop] += self.step_height if 'triangle' in o: for pulse in o: pos = pulse[0] height = pulse[1] rise = pulse[2] fall = pulse[3] start = pos - rise stop = pos + fall signal[start:pos] += np.linspace(0., height, rise) signal[pos:stop] += np.linspace(height, 0. , fall) if 'spike' in o: for spike in self.spikes: signal[spike[0]] += spike[1] if 'csv' in o: for csv in self.csvs: amplitude = csv[1] time = csv[0] #csv_data = self.csv_data.copy() #scale #csv_data *= amplitude x = np.arange(0,300,1) t,c,k = self.spline_int d = c.copy() d *= amplitude csv_data = interpolate.splev(x,(t,d,k),der=0) # add shifted signal[time:time+len(csv_data)] += csv_data if 'rate' in o: self._add_template_pulses(signal, self.rate) if 'signal' in o: self._add_signal(signal) # add noise if 'noise' in o: signal += + np.random.normal(0.0,self.sigma, signal.shape) return signal def _add_signal(self, signal): a = self.amplitude p = self.position #csv_data = self.csv_data #csv = csv_data.copy() * a x = np.arange(1000,1500,1) t,c,k = self.spline_int d = c.copy() d *= a csv = interpolate.splev(x,(t,d,k),der=0) npoints = self.npoints + 2 * len(csv) internal_signal = np.zeros(npoints) internal_signal[p-70+len(csv):p+2*len(csv)-70] += csv signal += internal_signal[len(csv):-len(csv)] def _add_template_pulses(self, signal, rate): #csv_data = self.csv_data x = np.arange(1000,1500,1) t,c,k = self.spline_int d = c.copy() csv_data = interpolate.splev(x,(t,d,k),der=0) period = 2.*1e9/(float(rate)*1e6) # in slices # in order to simulate pulses which are just at the beginning and # at the end of the signal, we pre- and append some points npoints = self.npoints + 2 * len(csv_data) internal_signal = np.zeros(npoints) d=np.random.exponential(period) pulse_positions = [] while d 1: num_events = int(sys.argv[1]) if len(sys.argv) > 2: phe = float(sys.argv[2]) if len(sys.argv) > 3: rate = float(sys.argv[3]) if len(sys.argv) > 4: noise = float(sys.argv[4]) #less interesting length = 1000 baseline = 0.0 signal_pos = 65 generator_string = 'len ' + str(length) + ' ' generator_string += 'bsl ' + str(baseline) + ' ' generator_string += 'signal ' + str(phe) + ' ' + str(signal_pos) + ' ' generator_string += 'noise ' + str(noise) + ' ' generator_string += 'rate ' + str(rate) + ' ' gen = SignalGeneratorCSV('PulseTemplate_PointSet_0.csv', generator_string) print gen rootfilename = str(phe)+'phe' rootfilename += '_'+str(rate)+'MHz' rootfilename += '_'+str(noise)+'mV' rootfilename += '_'+str(num_events) rootfilename += '.root' file = TFile(rootfilename, 'recreate') hist = TH1D('hist','to be done', length, -0.5,length-0.5) hist.SetStats(kFALSE) hist.SetXTitle('time in slices') hist.SetYTitle('amplitude in mV - no baseline adjustment done yet :-( ') for event in range(10): signal = gen() histogram_title = str(phe)+'phe @' + str(signal_pos) histogram_title += ' and ' + str(rate) +'MHz dark count rate' histogram_title += ', electronics noise with sigma=' + str(noise) +'mV' histogram_title += ' #' + str(event) hist.SetTitle(histogram_title ) for i,s in enumerate(signal): hist.SetBinContent(i+1,s) hist.Write() hist.Reset() file.Close()