source: fact/tools/pyscripts/pyfact/extractor.py@ 13028

Last change on this file since 13028 was 13028, checked in by neise, 13 years ago
added class WindowIntegrator, which integrate around a give position in a given window
File size: 6.2 KB
Line 
1#!/usr/bin/python
2#
3# Dominik Neise, Werner Lustermann
4# TU Dortmund, ETH Zurich
5#
6import numpy as np
7import matplotlib.pyplot as plt
8from generator import *
9from fir_filter import *
10
11class GlobalMaxFinder(object):
12 """ Pulse Extractor
13 Finds the global maximum in the given window.
14 (Best used with filtered data)
15 """
16
17 def __init__(self, min=30, max=250 , name = 'GlobalMaxFinder'):
18 """ initialize search Window
19
20 """
21 self.min = min
22 self.max = max
23 self.name = name
24
25 def __call__(self, data):
26 if data.ndim > 1:
27 time = np.argmax( data[ : , self.min:self.max ], 1)
28 amplitude = np.max( data[ : , self.min:self.max], 1)
29 else:
30 time = np.argmax( data[self.min:self.max])
31 amplitude = np.max( data[self.min:self.max])
32 return amplitude, time+self.min
33
34 def __str__(self):
35 s = self.name + '\n'
36 s += 'window:\n'
37 s += '(min,max) = (' + str(self.min) + ',' + str(self.max) + ')'
38 return s
39
40 def test(self):
41 pass
42
43
44class WindowIntegrator(object):
45 """ Integrates in a given intergration window around the given position
46 """
47
48 def __init__(self, min=13, max=23 , name = 'WindowIntegrator'):
49 """ initialize integration Window
50 """
51 self.min = min
52 self.max = max
53 self.name = name
54
55 def __call__(self, data, pos):
56 integral = np.empty( data.shape[0] )
57 for pixel in range( data.shape[0] ):
58 integral[pixel] = data[pixel, (pos[pixel]-self.min):(pos[pixel]+self.max)].sum()
59 return integral
60
61 def __str__(self):
62 s = self.name + '\n'
63 s += 'window:\n'
64 s += '(min,max) = (' + str(self.min) + ',' + str(self.max) + ')'
65 return s
66
67class FixedWindowIntegrator(object):
68 """ Integrates in a given intergration window
69 """
70
71 def __init__(self, min=55, max=105 , name = 'FixedWindowIntegrator'):
72 """ initialize integration Window
73 """
74 self.min = min
75 self.max = max
76 self.name = name
77
78 def __call__(self, data):
79 integral = np.empty( data.shape[0] )
80 for pixel in range( data.shape[0] ):
81 integral[pixel] = data[pixel, self.min:self.max].sum()
82 return integral
83
84 def __str__(self):
85 s = self.name + '\n'
86 s += 'window:\n'
87 s += '(min,max) = (' + str(self.min) + ',' + str(self.max) + ')'
88 return s
89
90class ZeroXing(object):
91 """ Finds zero crossings in given data
92 (should be used on CFD output for peak finding)
93 returns list of lists of time_of_zero_crossing
94 """
95 def __init__(self, slope=1, name = 'ZeroXing'):
96 if (slope >= 0):
97 self.slope = 1 # search for rising edge crossing
98 elif (slope < 0):
99 self.slope = -1 # search for falling edge crossing
100 self.name = name
101
102
103 def __call__(self, data):
104 all_hits = []
105 for pix_data in data:
106 hits = []
107 for i in range( data.shape[1]-1 ):
108 if ( self.slope > 0 ):
109 if ( pix_data[i] > 0 ):
110 continue
111 else:
112 if ( pix_data[i] < 0):
113 continue
114 if ( pix_data[i] * pix_data[i+1] <= 0 ):
115 # interpolate time of zero crossing with
116 # linear polynomial: y = ax + b
117 a = (pix_data[i+1] - pix_data[i]) / ((i+1) - i)
118 time = -1.0/a * pix_data[i] + i
119 hits.append(time)
120 all_hits.append(hits)
121 return all_hits
122
123 def __str__(self):
124 s = self.name + '\n'
125 if (self.slope == 1):
126 s += 'search for rising edge crossing.\n'
127 else:
128 s += 'search for falling edge crossing.\n'
129 return s
130
131
132def plotter(signal, text):
133 x=range(len(signal))
134 ax = plt.plot(x, signal, 'b.', label='signal')
135 plt.title(text)
136 plt.xlabel('sample')
137 plt.legend()
138 plt.grid(True)
139 plt.show()
140
141def histplotter(signal, text):
142 plt.xlabel('time of zero crossing')
143 plt.title(text)
144 plt.axis([0, 300,0,1440])
145 plt.grid(True)
146 n, bins, patches = plt.hist(signal, 3000, facecolor='r', alpha=0.75)
147 plt.show()
148
149
150if __name__ == '__main__':
151 """ test the extractors """
152 sg = SignalGenerator()
153 pulse_str = 'len 300 bsl -0.5 noise 0.5 triangle 65 10 8 100'
154 pulse = sg(pulse_str)
155 event = []
156 for i in range(1440):
157 event.append(sg(pulse_str))
158 event = np.array(event)
159 print 'test event with 1000 pixel generated, like this:'
160 print pulse_str
161 print
162
163 # Test GlobalMaxFinder
164 gmf = GlobalMaxFinder(30,250)
165 print gmf
166 amplitude, time = gmf(event)
167 if abs(amplitude.mean() - 10) < 0.5:
168 print "Test 1: OK GlobalMaxFinder found amplitude correctly", amplitude.mean()
169 if abs(time.mean() - 65) < 2:
170 print "Test 1: OK GlobalMaxFinder found time correctly", time.mean()
171 else:
172 print "BAD: time mean:", time.mean()
173
174 print
175
176 # Test FixedWindowIntegrator
177 fwi = FixedWindowIntegrator(50,200)
178 print fwi
179 integral = fwi(event)
180 #value of integral should be: 150*bsl + 8*10/2 + 100*10/2 = 465
181 if abs( integral.mean() - 465) < 2:
182 print "Test 2: OK FixedWindowIntegrator found integral correctly", integral.mean()
183 else:
184 print "Test 2: X FixedWindowIntegrator integral.mean failed:", integral.mean()
185
186 print
187 cfd = CFD()
188 sa = SlidingAverage(8)
189 print sa
190 cfd_out = sa(event)
191 cfd_out = cfd(cfd_out )
192 cfd_out = sa(cfd_out)
193 zx = ZeroXing()
194 print zx
195 list_of_list_of_times = zx(cfd_out)
196 times = []
197 for list_of_times in list_of_list_of_times:
198 times.extend(list_of_times)
199 times = np.array(times)
200
201 hist,bins = np.histogram(times,3000,(0,300))
202 most_probable_time = np.argmax(hist)
203 print 'most probable time of zero-crossing', most_probable_time/10.
204 print 'this includes filter delays ... for average filter setting 8 this turns out to be 78.8 most of the time'
205
206 #histplotter(times.flatten(), 'times.flatten()')
207 #plotter(cfd_out[0], 'cfd_out')
208 #plotter (pulse, pulse_str)
209
210
211
Note: See TracBrowser for help on using the repository browser.