source: fact/tools/pyscripts/pyfact/extractor.py@ 13420

Last change on this file since 13420 was 13367, checked in by neise, 13 years ago
ZeroXing supports also Xings at different levels, so one can set the zero-level to another level
File size: 6.2 KB
Line 
1#!/usr/bin/python -tt
2#
3# Dominik Neise, Werner Lustermann
4# TU Dortmund, ETH Zurich
5#
6import numpy as np
7from generator import *
8from fir_filter import *
9
10class GlobalMaxFinder(object):
11 """ Pulse Extractor
12 Finds the global maximum in the given window.
13 (Best used with filtered data)
14 """
15
16 def __init__(self, min=30, max=250 , name = 'GlobalMaxFinder'):
17 """ initialize search Window
18
19 """
20 self.__module__="extractor"
21 self.min = min
22 self.max = max
23 self.name = name
24
25 def __call__(self, data):
26 if data.ndim > 1:
27 time = np.argmax( data[ : , self.min:self.max ], 1)
28 amplitude = np.max( data[ : , self.min:self.max], 1)
29 else:
30 time = np.argmax( data[self.min:self.max])
31 amplitude = np.max( data[self.min:self.max])
32 return amplitude, time+self.min
33
34 def __str__(self):
35 s = self.name + '\n'
36 s += 'window:\n'
37 s += '(min,max) = (' + str(self.min) + ',' + str(self.max) + ')'
38 return s
39
40 def test(self):
41 pass
42
43
44class WindowIntegrator(object):
45 """ Integrates in a given intergration window around the given position
46 """
47
48 def __init__(self, min=13, max=23 , name = 'WindowIntegrator'):
49 """ initialize integration Window
50 """
51 self.__module__="extractor"
52 self.min = min
53 self.max = max
54 self.name = name
55
56 def __call__(self, data, pos):
57 integral = np.empty( data.shape[0] )
58 for pixel in range( data.shape[0] ):
59 integral[pixel] = data[pixel, (pos[pixel]-self.min):(pos[pixel]+self.max)].sum()
60 return integral
61
62 def __str__(self):
63 s = self.name + '\n'
64 s += 'window:\n'
65 s += '(min,max) = (' + str(self.min) + ',' + str(self.max) + ')'
66 return s
67
68class FixedWindowIntegrator(object):
69 """ Integrates in a given intergration window
70 """
71
72 def __init__(self, min=55, max=105 , name = 'FixedWindowIntegrator'):
73 """ initialize integration Window
74 """
75 self.__module__="extractor"
76 self.min = min
77 self.max = max
78 self.name = name
79
80 def __call__(self, data):
81 integral = np.empty( data.shape[0] )
82 for pixel in range( data.shape[0] ):
83 integral[pixel] = data[pixel, self.min:self.max].sum()
84 return integral
85
86 def __str__(self):
87 s = self.name + '\n'
88 s += 'window:\n'
89 s += '(min,max) = (' + str(self.min) + ',' + str(self.max) + ')'
90 return s
91
92class ZeroXing(object):
93 """ Finds zero crossings in given data
94 (should be used on CFD output for peak finding)
95 returns list of lists of time_of_zero_crossing
96 """
97 def __init__(self, slope=1, name = 'ZeroXing'):
98 self.__module__="extractor"
99 if (slope >= 0):
100 self.slope = 1 # search for rising edge crossing
101 elif (slope < 0):
102 self.slope = -1 # search for falling edge crossing
103 self.name = name
104
105
106 def __call__(self, data, zero_level = 0):
107 all_hits = []
108 for pix_data in data:
109 hits = []
110 for i in range( data.shape[1]-1 ):
111 dat = pix_data[i] - zero_level
112 next_dat = pix_data[i+1] - zero_level
113 if ( self.slope > 0 ):
114 if ( dat > 0 ):
115 continue
116 else:
117 if ( dat < 0):
118 continue
119 if ( dat * next_dat <= 0 ):
120 # interpolate time of zero crossing with
121 # linear polynomial: y = ax + b
122 a = (next_dat - dat) / ((i+1) - i)
123 time = -1.0/a * dat + i
124 hits.append(time)
125 all_hits.append(hits)
126 return all_hits
127
128 def __str__(self):
129 s = self.name + '\n'
130 if (self.slope == 1):
131 s += 'search for rising edge crossing.\n'
132 else:
133 s += 'search for falling edge crossing.\n'
134 return s
135
136
137
138def _test_GlobalMaxFinder():
139 gmf = GlobalMaxFinder(30,250)
140 print gmf
141 amplitude, time = gmf(event)
142 if abs(amplitude.mean() - 10) < 0.5:
143 print "Test 1: OK GlobalMaxFinder found amplitude correctly", amplitude.mean()
144 if abs(time.mean() - 65) < 2:
145 print "Test 1: OK GlobalMaxFinder found time correctly", time.mean()
146 else:
147 print "BAD: time mean:", time.mean()
148
149def _test_FixedWindowIntegrator():
150 fwi = FixedWindowIntegrator(50,200)
151 print fwi
152 integral = fwi(event)
153 #value of integral should be: 150*bsl + 8*10/2 + 100*10/2 = 465
154 if abs( integral.mean() - 465) < 2:
155 print "Test 2: OK FixedWindowIntegrator found integral correctly", integral.mean()
156 else:
157 print "Test 2: X FixedWindowIntegrator integral.mean failed:", integral.mean()
158
159def _test_ZeroXing():
160 cfd = CFD()
161 sa = SlidingAverage(8)
162 print sa
163 cfd_out = sa(event)
164 cfd_out = cfd(cfd_out )
165 cfd_out = sa(cfd_out)
166 zx = ZeroXing()
167 print zx
168 list_of_list_of_times = zx(cfd_out)
169 times = []
170 for list_of_times in list_of_list_of_times:
171 times.extend(list_of_times)
172 times = np.array(times)
173
174 hist,bins = np.histogram(times,3000,(0,300))
175 most_probable_time = np.argmax(hist)
176 print 'most probable time of zero-crossing', most_probable_time/10.
177 print 'this includes filter delays ... for average filter setting 8 this turns out to be 78.8 most of the time'
178
179if __name__ == '__main__':
180 import matplotlib.pyplot as plt
181 """ test the extractors """
182
183 # Generate a fake event, with a triangular pulse at slice 65
184 sg = SignalGenerator()
185 pulse_str = 'len 300 bsl -0.5 noise 0.5 triangle 65 10 8 100'
186 pulse = sg(pulse_str)
187 event = []
188 for i in range(1440):
189 event.append(sg(pulse_str))
190 event = np.array(event)
191 print 'test event with 1000 pixel generated, like this:'
192 print pulse_str
193 print
194
195 print '_test_GlobalMaxFinder()'
196 _test_GlobalMaxFinder()
197 print
198 print
199 print '_test_FixedWindowIntegrator()'
200 _test_FixedWindowIntegrator()
201 print
202 print
203 print '_test_ZeroXing()'
204 _test_ZeroXing()
205 print
Note: See TracBrowser for help on using the repository browser.