1 | #!/usr/bin/python -tt
|
---|
2 | #
|
---|
3 | # Dominik Neise, Werner Lustermann
|
---|
4 | # TU Dortmund, ETH Zurich
|
---|
5 | #
|
---|
6 | import numpy as np
|
---|
7 | import sys
|
---|
8 | from ROOT import *
|
---|
9 | from scipy import interpolate
|
---|
10 |
|
---|
11 | from plotters import Plotter
|
---|
12 | import matplotlib.pyplot as plt
|
---|
13 |
|
---|
14 | class SignalGenerator(object):
|
---|
15 | """ Signal Generator
|
---|
16 | generates signals for testing several helper classes like:
|
---|
17 | * fir filters
|
---|
18 | * signal extractors
|
---|
19 | """
|
---|
20 |
|
---|
21 | def __init__(self, option_str = 'len 100 noise 3', name = 'SignalGenerator'):
|
---|
22 | """ initialize the generator
|
---|
23 | sets default signal to generate
|
---|
24 | """
|
---|
25 | self.__module__ = 'generator'
|
---|
26 | self.option_str = option_str.lower()
|
---|
27 | self.options = make_options_from_str(option_str)
|
---|
28 | self.parse_options()
|
---|
29 | self.name = name
|
---|
30 |
|
---|
31 | def parse_options(self):
|
---|
32 | o = self.options #shortcut
|
---|
33 | if 'len' in o:
|
---|
34 | self.npoints = int(o['len'][0])
|
---|
35 | else:
|
---|
36 | self.npoints = 100
|
---|
37 | if 'noise' in o:
|
---|
38 | self.sigma = float(o['noise'][0])
|
---|
39 | else:
|
---|
40 | self.sigma = 1
|
---|
41 | if 'bsl' in o:
|
---|
42 | self.bsl = float(o['bsl'][0])
|
---|
43 | else:
|
---|
44 | self.bsl = -0.5
|
---|
45 |
|
---|
46 | if 'step' in o:
|
---|
47 | self.step_height = float(o['step'][0])
|
---|
48 | self.step_start = int(o['step'][1])
|
---|
49 | self.step_stop = int(o['step'][2])
|
---|
50 |
|
---|
51 | if 'triangle' in o:
|
---|
52 | self.pulses = []
|
---|
53 | # append 1st pulse to list of pulses
|
---|
54 | self.pulses.append( ( float(o['triangle'][0]) , float(o['triangle'][1]), int(o['triangle'][2]), int(o['triangle'][3]) ) )
|
---|
55 | number_of_pulses_after_1st = (len(o['triangle'])-4)/2
|
---|
56 | for i in range(number_of_pulses_after_1st):
|
---|
57 | self.pulses.append( ( float(o['triangle'][2*i+4]) , float(o['triangle'][2*i+5]), int(o['triangle'][2]), int(o['triangle'][3]) ) )
|
---|
58 |
|
---|
59 | if 'spike' in o:
|
---|
60 | self.spikes = []
|
---|
61 | for i in range(len(o['spike'])/2):
|
---|
62 | self.spikes.append( ( int(o['spike'][2*i]), float(o['spike'][2*i+1]) ) )
|
---|
63 |
|
---|
64 | def __call__(self, option_str = ''):
|
---|
65 | if option_str:
|
---|
66 | self.option_str = option_str.lower()
|
---|
67 | self.options = make_options_from_str(self.option_str)
|
---|
68 | self.parse_options()
|
---|
69 |
|
---|
70 | signal = np.zeros(self.npoints)
|
---|
71 | signal += self.bsl
|
---|
72 | signal += np.random.randn(self.npoints) * self.sigma
|
---|
73 | if 'step' in self.options:
|
---|
74 | signal[self.step_start:self.step_stop] += self.step_height
|
---|
75 | if 'triangle' in self.options:
|
---|
76 | for pulse in self.pulses:
|
---|
77 | pos = pulse[0]
|
---|
78 | height = pulse[1]
|
---|
79 | rise = pulse[2]
|
---|
80 | fall = pulse[3]
|
---|
81 | start = pos - rise
|
---|
82 | stop = pos + fall
|
---|
83 | signal[start:pos] += np.linspace(0., height, rise)
|
---|
84 | signal[pos:stop] += np.linspace(height, 0. , fall)
|
---|
85 | if 'spike' in self.options:
|
---|
86 | for spike in self.spikes:
|
---|
87 | signal[spike[0]] += spike[1]
|
---|
88 | return signal
|
---|
89 |
|
---|
90 | def __str__(self):
|
---|
91 | s = self.name + '\n'
|
---|
92 | s += 'possible options and parameters\n'
|
---|
93 | s += ' * len: number of samples (100)\n'
|
---|
94 | s += ' * noise: sigma (1)\n'
|
---|
95 | s += ' * bsl: level (-0.5)\n'
|
---|
96 | s += ' * step: height, start, end\n'
|
---|
97 | s += ' * triangle: pos height risingedge, fallingedge [pos height ...]\n'
|
---|
98 | s += ' * spike: pos height [pos height ...]\n'
|
---|
99 |
|
---|
100 | s += 'current options are:\n'
|
---|
101 | for key in self.options.keys():
|
---|
102 | s += key + ':' + str(self.options[key]) + '\n'
|
---|
103 | return s
|
---|
104 |
|
---|
105 |
|
---|
106 | class SignalGeneratorCSV(object):
|
---|
107 |
|
---|
108 | def __init__(self, file_name, option_str = 'len 100 noise 3', name = 'SignalGenerator'):
|
---|
109 | time, maxprob, mean, median = np.loadtxt( file_name, delimiter=',', unpack=True)
|
---|
110 | csv_data = median
|
---|
111 |
|
---|
112 | self.csv_maxprob = maxprob
|
---|
113 | self.csv_mean = mean
|
---|
114 | self.csv_median = median
|
---|
115 |
|
---|
116 | # shift the input data such, that the left baseline is equal to zero
|
---|
117 | csv_data = csv_data - csv_data[:50].mean()
|
---|
118 |
|
---|
119 | # I want the data to be longer than they are, so I add 1000 slices left and right
|
---|
120 | self.csv_data = np.zeros(2000 + len(csv_data))
|
---|
121 |
|
---|
122 | for i in range(len(csv_data)):
|
---|
123 | self.csv_data[1000+i] = csv_data[i]
|
---|
124 | for i in range(1000):
|
---|
125 | self.csv_data[i] = csv_data[:50].mean()
|
---|
126 | self.csv_data[-1*i] = csv_data[-50:].mean()
|
---|
127 |
|
---|
128 |
|
---|
129 | x = np.arange(0,len(self.csv_data),1)
|
---|
130 | x2 = np.arange(0,len(self.csv_data),0.1)
|
---|
131 |
|
---|
132 | # fermi dirac distribution for weighting the input data.
|
---|
133 | # we do not believe the right tail is really higher than zero, so we make it go to zero.
|
---|
134 | weight_function_right = lambda x : 1. / (1 + np.exp((x - 1180.)/100.))
|
---|
135 | weight_function_left = lambda x : 1. / (1 + np.exp((-1*(x - 1045.))/5.))
|
---|
136 |
|
---|
137 | weights = np.array(map(weight_function_right, x))
|
---|
138 | weights *= np.array(map(weight_function_left, x))
|
---|
139 | weights *= 8
|
---|
140 | weights [1200:] += 3
|
---|
141 | weights += 2
|
---|
142 | #y_weighted = weights * y
|
---|
143 |
|
---|
144 | #print len(weights) == len(x)
|
---|
145 | #print len(weights) == len(self.csv_data)
|
---|
146 | #print weights > 0
|
---|
147 |
|
---|
148 | tck= interpolate.splrep(x, self.csv_data, w=weights)
|
---|
149 | self.spline_int = tck
|
---|
150 |
|
---|
151 |
|
---|
152 | x2 = np.arange(1000,1500,1)
|
---|
153 | t,c,k = tck
|
---|
154 | y2 = interpolate.splev(x2,(t,c,k),der=0)
|
---|
155 |
|
---|
156 |
|
---|
157 | #tck = interpolate.splrep(x, self.csv_data ,w=weights, s=50)
|
---|
158 | #xnew = np.arange(0,len(self.csv_data), 0.1)
|
---|
159 | #y = interpolate.splev(x,tck,der=0)
|
---|
160 | #y2 = interpolate.splev(x2,tck,der=0)
|
---|
161 |
|
---|
162 |
|
---|
163 | #plt.figure()
|
---|
164 | #plt.plot(x,self.csv_data,'.:b',x,weights,'m',x2,y2,'.:r')
|
---|
165 | #plt.show()
|
---|
166 | #raw_input('stop')
|
---|
167 |
|
---|
168 | # csv data was downshifted, I shift it up here
|
---|
169 | self.csv_data = csv_data - csv_data.min()
|
---|
170 |
|
---|
171 |
|
---|
172 | self.__module__ = 'CSV generator'
|
---|
173 | self.option_str = option_str.lower()
|
---|
174 | self.options = make_options_from_str(option_str)
|
---|
175 | self.parse_options()
|
---|
176 | self.name = name
|
---|
177 |
|
---|
178 | def parse_options(self):
|
---|
179 | o = self.options #shortcut
|
---|
180 | if 'len' in o:
|
---|
181 | self.npoints = int(o['len'][0])
|
---|
182 | else:
|
---|
183 | self.npoints = 1024
|
---|
184 | if 'noise' in o:
|
---|
185 | self.sigma = float(o['noise'][0])
|
---|
186 | else:
|
---|
187 | self.sigma = 1
|
---|
188 | if 'bsl' in o:
|
---|
189 | self.bsl = float(o['bsl'][0])
|
---|
190 | else:
|
---|
191 | self.bsl = -0.5
|
---|
192 |
|
---|
193 | if 'step' in o:
|
---|
194 | self.step_height = float(o['step'][0])
|
---|
195 | self.step_start = int(o['step'][1])
|
---|
196 | self.step_stop = int(o['step'][2])
|
---|
197 |
|
---|
198 | if 'triangle' in o:
|
---|
199 | self.pulses = []
|
---|
200 | # append 1st pulse to list of pulses
|
---|
201 | self.pulses.append( ( float(o['triangle'][0]) , float(o['triangle'][1]), int(o['triangle'][2]), int(o['triangle'][3]) ) )
|
---|
202 | number_of_pulses_after_1st = (len(o['triangle'])-4)/2
|
---|
203 | for i in range(number_of_pulses_after_1st):
|
---|
204 | self.pulses.append( ( float(o['triangle'][2*i+4]) , float(o['triangle'][2*i+5]), int(o['triangle'][2]), int(o['triangle'][3]) ) )
|
---|
205 |
|
---|
206 | if 'spike' in o:
|
---|
207 | self.spikes = []
|
---|
208 | for i in range(len(o['spike'])/2):
|
---|
209 | self.spikes.append( ( int(o['spike'][2*i]), float(o['spike'][2*i+1]) ) )
|
---|
210 |
|
---|
211 | if 'csv' in o:
|
---|
212 | self.csvs = []
|
---|
213 | for i in range(len(o['csv'])/2):
|
---|
214 | time = int( o['csv'][2*i] )
|
---|
215 | amplitude = float( o['csv'][2*i+1] )
|
---|
216 | self.csvs.append( (time, amplitude) )
|
---|
217 | if 'rate' in o:
|
---|
218 | self.rate = float(o['rate'][0])
|
---|
219 |
|
---|
220 | if 'signal' in o:
|
---|
221 | self.amplitude = float(o['signal'][0])
|
---|
222 | self.position = int(o['signal'][1])
|
---|
223 |
|
---|
224 | def __call__(self, option_str = ''):
|
---|
225 | if option_str:
|
---|
226 | self.option_str = option_str.lower()
|
---|
227 | self.options = make_options_from_str(self.option_str)
|
---|
228 | self.parse_options()
|
---|
229 |
|
---|
230 | signal = np.zeros(self.npoints)
|
---|
231 | signal += self.bsl
|
---|
232 |
|
---|
233 | # shortcut
|
---|
234 | o = self.options
|
---|
235 |
|
---|
236 | if 'step' in o:
|
---|
237 | signal[self.step_start:self.step_stop] += self.step_height
|
---|
238 | if 'triangle' in o:
|
---|
239 | for pulse in o:
|
---|
240 | pos = pulse[0]
|
---|
241 | height = pulse[1]
|
---|
242 | rise = pulse[2]
|
---|
243 | fall = pulse[3]
|
---|
244 | start = pos - rise
|
---|
245 | stop = pos + fall
|
---|
246 | signal[start:pos] += np.linspace(0., height, rise)
|
---|
247 | signal[pos:stop] += np.linspace(height, 0. , fall)
|
---|
248 | if 'spike' in o:
|
---|
249 | for spike in self.spikes:
|
---|
250 | signal[spike[0]] += spike[1]
|
---|
251 |
|
---|
252 | if 'csv' in o:
|
---|
253 | for csv in self.csvs:
|
---|
254 | amplitude = csv[1]
|
---|
255 | time = csv[0]
|
---|
256 |
|
---|
257 | #csv_data = self.csv_data.copy()
|
---|
258 | #scale
|
---|
259 | #csv_data *= amplitude
|
---|
260 |
|
---|
261 | x = np.arange(0,300,1)
|
---|
262 | t,c,k = self.spline_int
|
---|
263 | d = c.copy()
|
---|
264 | d *= amplitude
|
---|
265 | csv_data = interpolate.splev(x,(t,d,k),der=0)
|
---|
266 |
|
---|
267 | # add shifted
|
---|
268 | signal[time:time+len(csv_data)] += csv_data
|
---|
269 |
|
---|
270 | if 'rate' in o:
|
---|
271 | self._add_template_pulses(signal, self.rate)
|
---|
272 |
|
---|
273 | if 'signal' in o:
|
---|
274 | self._add_signal(signal)
|
---|
275 |
|
---|
276 | # add noise
|
---|
277 | if 'noise' in o:
|
---|
278 | signal += + np.random.normal(0.0,self.sigma, signal.shape)
|
---|
279 |
|
---|
280 | return signal
|
---|
281 |
|
---|
282 | def _add_signal(self, signal):
|
---|
283 | a = self.amplitude
|
---|
284 | p = self.position
|
---|
285 |
|
---|
286 | #csv_data = self.csv_data
|
---|
287 | #csv = csv_data.copy() * a
|
---|
288 |
|
---|
289 | x = np.arange(1000,1500,1)
|
---|
290 | t,c,k = self.spline_int
|
---|
291 | d = c.copy()
|
---|
292 | d *= a
|
---|
293 | csv = interpolate.splev(x,(t,d,k),der=0)
|
---|
294 |
|
---|
295 | npoints = self.npoints + 2 * len(csv)
|
---|
296 | internal_signal = np.zeros(npoints)
|
---|
297 |
|
---|
298 | internal_signal[p-70+len(csv):p+2*len(csv)-70] += csv
|
---|
299 |
|
---|
300 | signal += internal_signal[len(csv):-len(csv)]
|
---|
301 |
|
---|
302 | def _add_template_pulses(self, signal, rate):
|
---|
303 |
|
---|
304 | #csv_data = self.csv_data
|
---|
305 | x = np.arange(1000,1500,1)
|
---|
306 | t,c,k = self.spline_int
|
---|
307 | d = c.copy()
|
---|
308 | csv_data = interpolate.splev(x,(t,d,k),der=0)
|
---|
309 |
|
---|
310 |
|
---|
311 | period = 2.*1e9/(float(rate)*1e6) # in slices
|
---|
312 | # in order to simulate pulses which are just at the beginning and
|
---|
313 | # at the end of the signal, we pre- and append some points
|
---|
314 | npoints = self.npoints + 2 * len(csv_data)
|
---|
315 |
|
---|
316 | internal_signal = np.zeros(npoints)
|
---|
317 |
|
---|
318 | d=np.random.exponential(period)
|
---|
319 | pulse_positions = []
|
---|
320 | while d<npoints-len(csv_data):
|
---|
321 | pulse_positions.append(d)
|
---|
322 | d+=np.random.exponential(period)
|
---|
323 |
|
---|
324 | for pos in pulse_positions:
|
---|
325 | pos = int(pos + 0.5)
|
---|
326 | internal_signal[pos:pos+len(csv_data)] += csv_data
|
---|
327 |
|
---|
328 | signal += internal_signal[len(csv_data):-len(csv_data)]
|
---|
329 |
|
---|
330 | def __str__(self):
|
---|
331 | s = self.name + '\n'
|
---|
332 | s += 'possible options and parameters\n'
|
---|
333 | s += ' * len: number of samples (100)\n'
|
---|
334 | s += ' * noise: sigma (1)\n'
|
---|
335 | s += ' * bsl: level (-0.5)\n'
|
---|
336 | s += ' * step: height, start, end\n'
|
---|
337 | s += ' * triangle: pos height risingedge, fallingedge [pos height ...]\n'
|
---|
338 | s += ' * spike: pos height [pos height ...]\n'
|
---|
339 | s += ' * csv: pos height [pos height ...]\n'
|
---|
340 |
|
---|
341 | s += 'current options are:\n'
|
---|
342 | for key in self.options.keys():
|
---|
343 | s += key + ':' + str(self.options[key]) + '\n'
|
---|
344 | return s
|
---|
345 |
|
---|
346 |
|
---|
347 | # Helper function to parse signalname and create a dictionary
|
---|
348 | # dictionary layout :
|
---|
349 | # key : string
|
---|
350 | # value : [list of parameters]
|
---|
351 | def make_options_from_str(signalname):
|
---|
352 | options = {}
|
---|
353 | for word in (signalname.lower()).split():
|
---|
354 | if word.isalpha():
|
---|
355 | current_key = word
|
---|
356 | options[current_key] = []
|
---|
357 | # if word.isdigit():
|
---|
358 | else:
|
---|
359 | options[current_key].append(word)
|
---|
360 | # else:
|
---|
361 | # print '-nothing'
|
---|
362 | return options
|
---|
363 |
|
---|
364 | if __name__ == '__main__':
|
---|
365 |
|
---|
366 |
|
---|
367 | num_events = 10
|
---|
368 |
|
---|
369 | # interesting
|
---|
370 | phe = 1 # i.e. the signal amplitude
|
---|
371 | rate = 10 # MHz
|
---|
372 | noise = 1. # mV sigma
|
---|
373 |
|
---|
374 | if len(sys.argv) > 1:
|
---|
375 | num_events = int(sys.argv[1])
|
---|
376 | if len(sys.argv) > 2:
|
---|
377 | phe = float(sys.argv[2])
|
---|
378 | if len(sys.argv) > 3:
|
---|
379 | rate = float(sys.argv[3])
|
---|
380 | if len(sys.argv) > 4:
|
---|
381 | noise = float(sys.argv[4])
|
---|
382 |
|
---|
383 |
|
---|
384 | #less interesting
|
---|
385 | length = 1000
|
---|
386 | baseline = 0.0
|
---|
387 | signal_pos = 65
|
---|
388 |
|
---|
389 | generator_string = 'len ' + str(length) + ' '
|
---|
390 | generator_string += 'bsl ' + str(baseline) + ' '
|
---|
391 | generator_string += 'signal ' + str(phe) + ' ' + str(signal_pos) + ' '
|
---|
392 | generator_string += 'noise ' + str(noise) + ' '
|
---|
393 | generator_string += 'rate ' + str(rate) + ' '
|
---|
394 | gen = SignalGeneratorCSV('PulseTemplate_PointSet_0.csv', generator_string)
|
---|
395 | print gen
|
---|
396 |
|
---|
397 | rootfilename = str(phe)+'phe'
|
---|
398 | rootfilename += '_'+str(rate)+'MHz'
|
---|
399 | rootfilename += '_'+str(noise)+'mV'
|
---|
400 | rootfilename += '_'+str(num_events)
|
---|
401 | rootfilename += '.root'
|
---|
402 |
|
---|
403 |
|
---|
404 |
|
---|
405 | file = TFile(rootfilename, 'recreate')
|
---|
406 |
|
---|
407 | hist = TH1D('hist','to be done', length, -0.5,length-0.5)
|
---|
408 | hist.SetStats(kFALSE)
|
---|
409 | hist.SetXTitle('time in slices')
|
---|
410 | hist.SetYTitle('amplitude in mV - no baseline adjustment done yet :-( ')
|
---|
411 |
|
---|
412 | for event in range(10):
|
---|
413 | signal = gen()
|
---|
414 |
|
---|
415 | histogram_title = str(phe)+'phe @' + str(signal_pos)
|
---|
416 | histogram_title += ' and ' + str(rate) +'MHz dark count rate'
|
---|
417 | histogram_title += ', electronics noise with sigma=' + str(noise) +'mV'
|
---|
418 | histogram_title += ' #' + str(event)
|
---|
419 | hist.SetTitle(histogram_title )
|
---|
420 |
|
---|
421 | for i,s in enumerate(signal):
|
---|
422 | hist.SetBinContent(i+1,s)
|
---|
423 | hist.Write()
|
---|
424 | hist.Reset()
|
---|
425 | file.Close() |
---|