#!/usr/bin/python -tt # # Dominik Neise, Werner Lustermann # TU Dortmund, ETH Zurich # import numpy as np class ZeroSearch( object ): """ class closely related to the methods in rootmacros/zerosearch.C and .h """ def __init__(self, slope=1, name = 'ZeroSearch'): def __call__(self, data): """ returns list of lists """ for pid, pixel in enumerate(data): # candidates for zero crossings are, where two neighboring # cells have opposite signs, i.e. their product is < 0 product = pixel[:-1] * pixel[1:] candidates = np.where(product < 0)[0] if slope>0: candidates = candidates[np.where(pixel[candidates] > 0)[0]] else: candidates = candidates[np.where(pixel[candidates] < 0)[0]] class GlobalMaxFinder(object): """ Pulse Extractor Finds the global maximum in the given window. (Best used with filtered data) """ def __init__(self, min=30, max=250 , name = 'GlobalMaxFinder'): """ initialize search Window """ self.__module__="extractor" self.min = min self.max = max self.name = name def __call__(self, data): if data.ndim > 1: time = np.argmax( data[ : , self.min:self.max ], 1) amplitude = np.max( data[ : , self.min:self.max], 1) else: time = np.argmax( data[self.min:self.max]) amplitude = np.max( data[self.min:self.max]) return amplitude, time+self.min def __str__(self): s = self.name + '\n' s += 'window:\n' s += '(min,max) = (' + str(self.min) + ',' + str(self.max) + ')' return s def test(self): pass class WindowIntegrator(object): """ Integrates in a given intergration window around the given position """ def __init__(self, min=13, max=23 , name = 'WindowIntegrator'): """ initialize integration Window """ self.__module__="extractor" self.min = min self.max = max self.name = name def __call__(self, data, pos): integral = np.empty( data.shape[0] ) for pixel in range( data.shape[0] ): integral[pixel] = data[pixel, (pos[pixel]-self.min):(pos[pixel]+self.max)].sum() return integral def __str__(self): s = self.name + '\n' s += 'window:\n' s += '(min,max) = (' + str(self.min) + ',' + str(self.max) + ')' return s class FixedWindowIntegrator(object): """ Integrates in a given intergration window """ def __init__(self, min=55, max=105 , name = 'FixedWindowIntegrator'): """ initialize integration Window """ self.__module__="extractor" self.min = min self.max = max self.name = name def __call__(self, data): integral = np.empty( data.shape[0] ) for pixel in range( data.shape[0] ): integral[pixel] = data[pixel, self.min:self.max].sum() return integral def __str__(self): s = self.name + '\n' s += 'window:\n' s += '(min,max) = (' + str(self.min) + ',' + str(self.max) + ')' return s class ZeroXing(object): """ Finds zero crossings in given data (should be used on CFD output for peak finding) returns list of lists of time_of_zero_crossing """ def __init__(self, slope=1, name = 'ZeroXing'): self.__module__="extractor" if (slope >= 0): self.slope = 1 # search for rising edge crossing elif (slope < 0): self.slope = -1 # search for falling edge crossing self.name = name def __call__(self, data, zero_level = 0): all_hits = [] for pix_data in data: hits = [] for i in range( data.shape[1]-1 ): dat = pix_data[i] - zero_level next_dat = pix_data[i+1] - zero_level if ( self.slope > 0 ): if ( dat > 0 ): continue else: if ( dat < 0): continue if ( dat * next_dat <= 0 ): # interpolate time of zero crossing with # linear polynomial: y = ax + b a = (next_dat - dat) / ((i+1) - i) time = -1.0/a * dat + i hits.append(time) all_hits.append(hits) return all_hits def __str__(self): s = self.name + '\n' if (self.slope == 1): s += 'search for rising edge crossing.\n' else: s += 'search for falling edge crossing.\n' return s def _test_GlobalMaxFinder(): gmf = GlobalMaxFinder(30,250) print gmf amplitude, time = gmf(event) if abs(amplitude.mean() - 10) < 0.5: print "Test 1: OK GlobalMaxFinder found amplitude correctly", amplitude.mean() if abs(time.mean() - 65) < 2: print "Test 1: OK GlobalMaxFinder found time correctly", time.mean() else: print "BAD: time mean:", time.mean() def _test_FixedWindowIntegrator(): fwi = FixedWindowIntegrator(50,200) print fwi integral = fwi(event) #value of integral should be: 150*bsl + 8*10/2 + 100*10/2 = 465 if abs( integral.mean() - 465) < 2: print "Test 2: OK FixedWindowIntegrator found integral correctly", integral.mean() else: print "Test 2: X FixedWindowIntegrator integral.mean failed:", integral.mean() def _test_ZeroXing(): cfd = CFD() sa = SlidingAverage(8) print sa cfd_out = sa(event) cfd_out = cfd(cfd_out ) cfd_out = sa(cfd_out) zx = ZeroXing() print zx list_of_list_of_times = zx(cfd_out) times = [] for list_of_times in list_of_list_of_times: times.extend(list_of_times) times = np.array(times) hist,bins = np.histogram(times,3000,(0,300)) most_probable_time = np.argmax(hist) print 'most probable time of zero-crossing', most_probable_time/10. print 'this includes filter delays ... for average filter setting 8 this turns out to be 78.8 most of the time' if __name__ == '__main__': import matplotlib.pyplot as plt """ test the extractors """ # Generate a fake event, with a triangular pulse at slice 65 sg = SignalGenerator() pulse_str = 'len 300 bsl -0.5 noise 0.5 triangle 65 10 8 100' pulse = sg(pulse_str) event = [] for i in range(1440): event.append(sg(pulse_str)) event = np.array(event) print 'test event with 1000 pixel generated, like this:' print pulse_str print print '_test_GlobalMaxFinder()' _test_GlobalMaxFinder() print print print '_test_FixedWindowIntegrator()' _test_FixedWindowIntegrator() print print print '_test_ZeroXing()' _test_ZeroXing() print