source: fact/tools/pyscripts/sandbox/dneise/spike_studies/zerosearch.py@ 15446

Last change on this file since 15446 was 13632, checked in by neise, 13 years ago
initial commit of my tests and so on
File size: 6.9 KB
Line 
1#!/usr/bin/python -tt
2#
3# Dominik Neise, Werner Lustermann
4# TU Dortmund, ETH Zurich
5#
6import numpy as np
7
8class ZeroSearch( object ):
9 """ class closely related to the methods in rootmacros/zerosearch.C and .h
10 """
11 def __init__(self, slope=1, name = 'ZeroSearch'):
12
13 def __call__(self, data):
14 """ returns list of lists """
15
16 for pid, pixel in enumerate(data):
17
18 # candidates for zero crossings are, where two neighboring
19 # cells have opposite signs, i.e. their product is < 0
20 product = pixel[:-1] * pixel[1:]
21 candidates = np.where(product < 0)[0]
22
23 if slope>0:
24 candidates = candidates[np.where(pixel[candidates] > 0)[0]]
25 else:
26 candidates = candidates[np.where(pixel[candidates] < 0)[0]]
27
28
29class GlobalMaxFinder(object):
30 """ Pulse Extractor
31 Finds the global maximum in the given window.
32 (Best used with filtered data)
33 """
34
35 def __init__(self, min=30, max=250 , name = 'GlobalMaxFinder'):
36 """ initialize search Window
37
38 """
39 self.__module__="extractor"
40 self.min = min
41 self.max = max
42 self.name = name
43
44 def __call__(self, data):
45 if data.ndim > 1:
46 time = np.argmax( data[ : , self.min:self.max ], 1)
47 amplitude = np.max( data[ : , self.min:self.max], 1)
48 else:
49 time = np.argmax( data[self.min:self.max])
50 amplitude = np.max( data[self.min:self.max])
51 return amplitude, time+self.min
52
53 def __str__(self):
54 s = self.name + '\n'
55 s += 'window:\n'
56 s += '(min,max) = (' + str(self.min) + ',' + str(self.max) + ')'
57 return s
58
59 def test(self):
60 pass
61
62
63class WindowIntegrator(object):
64 """ Integrates in a given intergration window around the given position
65 """
66
67 def __init__(self, min=13, max=23 , name = 'WindowIntegrator'):
68 """ initialize integration Window
69 """
70 self.__module__="extractor"
71 self.min = min
72 self.max = max
73 self.name = name
74
75 def __call__(self, data, pos):
76 integral = np.empty( data.shape[0] )
77 for pixel in range( data.shape[0] ):
78 integral[pixel] = data[pixel, (pos[pixel]-self.min):(pos[pixel]+self.max)].sum()
79 return integral
80
81 def __str__(self):
82 s = self.name + '\n'
83 s += 'window:\n'
84 s += '(min,max) = (' + str(self.min) + ',' + str(self.max) + ')'
85 return s
86
87class FixedWindowIntegrator(object):
88 """ Integrates in a given intergration window
89 """
90
91 def __init__(self, min=55, max=105 , name = 'FixedWindowIntegrator'):
92 """ initialize integration Window
93 """
94 self.__module__="extractor"
95 self.min = min
96 self.max = max
97 self.name = name
98
99 def __call__(self, data):
100 integral = np.empty( data.shape[0] )
101 for pixel in range( data.shape[0] ):
102 integral[pixel] = data[pixel, self.min:self.max].sum()
103 return integral
104
105 def __str__(self):
106 s = self.name + '\n'
107 s += 'window:\n'
108 s += '(min,max) = (' + str(self.min) + ',' + str(self.max) + ')'
109 return s
110
111class ZeroXing(object):
112 """ Finds zero crossings in given data
113 (should be used on CFD output for peak finding)
114 returns list of lists of time_of_zero_crossing
115 """
116 def __init__(self, slope=1, name = 'ZeroXing'):
117 self.__module__="extractor"
118 if (slope >= 0):
119 self.slope = 1 # search for rising edge crossing
120 elif (slope < 0):
121 self.slope = -1 # search for falling edge crossing
122 self.name = name
123
124
125 def __call__(self, data, zero_level = 0):
126 all_hits = []
127 for pix_data in data:
128 hits = []
129 for i in range( data.shape[1]-1 ):
130 dat = pix_data[i] - zero_level
131 next_dat = pix_data[i+1] - zero_level
132 if ( self.slope > 0 ):
133 if ( dat > 0 ):
134 continue
135 else:
136 if ( dat < 0):
137 continue
138 if ( dat * next_dat <= 0 ):
139 # interpolate time of zero crossing with
140 # linear polynomial: y = ax + b
141 a = (next_dat - dat) / ((i+1) - i)
142 time = -1.0/a * dat + i
143 hits.append(time)
144 all_hits.append(hits)
145 return all_hits
146
147 def __str__(self):
148 s = self.name + '\n'
149 if (self.slope == 1):
150 s += 'search for rising edge crossing.\n'
151 else:
152 s += 'search for falling edge crossing.\n'
153 return s
154
155
156
157def _test_GlobalMaxFinder():
158 gmf = GlobalMaxFinder(30,250)
159 print gmf
160 amplitude, time = gmf(event)
161 if abs(amplitude.mean() - 10) < 0.5:
162 print "Test 1: OK GlobalMaxFinder found amplitude correctly", amplitude.mean()
163 if abs(time.mean() - 65) < 2:
164 print "Test 1: OK GlobalMaxFinder found time correctly", time.mean()
165 else:
166 print "BAD: time mean:", time.mean()
167
168def _test_FixedWindowIntegrator():
169 fwi = FixedWindowIntegrator(50,200)
170 print fwi
171 integral = fwi(event)
172 #value of integral should be: 150*bsl + 8*10/2 + 100*10/2 = 465
173 if abs( integral.mean() - 465) < 2:
174 print "Test 2: OK FixedWindowIntegrator found integral correctly", integral.mean()
175 else:
176 print "Test 2: X FixedWindowIntegrator integral.mean failed:", integral.mean()
177
178def _test_ZeroXing():
179 cfd = CFD()
180 sa = SlidingAverage(8)
181 print sa
182 cfd_out = sa(event)
183 cfd_out = cfd(cfd_out )
184 cfd_out = sa(cfd_out)
185 zx = ZeroXing()
186 print zx
187 list_of_list_of_times = zx(cfd_out)
188 times = []
189 for list_of_times in list_of_list_of_times:
190 times.extend(list_of_times)
191 times = np.array(times)
192
193 hist,bins = np.histogram(times,3000,(0,300))
194 most_probable_time = np.argmax(hist)
195 print 'most probable time of zero-crossing', most_probable_time/10.
196 print 'this includes filter delays ... for average filter setting 8 this turns out to be 78.8 most of the time'
197
198if __name__ == '__main__':
199 import matplotlib.pyplot as plt
200 """ test the extractors """
201
202 # Generate a fake event, with a triangular pulse at slice 65
203 sg = SignalGenerator()
204 pulse_str = 'len 300 bsl -0.5 noise 0.5 triangle 65 10 8 100'
205 pulse = sg(pulse_str)
206 event = []
207 for i in range(1440):
208 event.append(sg(pulse_str))
209 event = np.array(event)
210 print 'test event with 1000 pixel generated, like this:'
211 print pulse_str
212 print
213
214 print '_test_GlobalMaxFinder()'
215 _test_GlobalMaxFinder()
216 print
217 print
218 print '_test_FixedWindowIntegrator()'
219 _test_FixedWindowIntegrator()
220 print
221 print
222 print '_test_ZeroXing()'
223 _test_ZeroXing()
224 print
Note: See TracBrowser for help on using the repository browser.