#!/usr/bin/python -tt # # Dominik Neise, Werner Lustermann # TU Dortmund, ETH Zurich # import numpy as np from scipy import signal # For FastSlidingAverage from scipy import weave from scipy.weave import converters class FirFilter(object): """ finite impulse response filter """ def __init__(self, b, a, name = 'general FIR filter'): """ See `scipy.signal.lfilter() `_ *b* The numerator coefficient vector (1D) *a* The denominator coefficient vector (1D). If a[0] is not 1, then both a and b are normalized by a[0]. """ self.a = a self.b = b self.name = name def __call__(self, data): """ Apply generic FIR filter to *data* using scipy.signal.lfilter() *data* 1D or 2D numpy array remark: I did not understand how to use the initial filter conditions of lfilter() to produce the output, I expected. So I apply the filters as follows. the filter *delay* is equal to its length-1 Then I extend the input data by this delay-length, adding copies of the first value. Then the filter runs ovter this extended data. The output will have a filter artifact in the first samples, which will be cut off anyway, because they were artificially added before. """ delay = max(len(self.a),len(self.b))-1 if ( data.ndim == 1): initial = np.ones(delay) initial *= data[0] elif ( data.ndim == 2): initial = np.ones( (data.shape[0], delay) ) for i in range(data.shape[0]): initial[i,:] *= data[i,0] else: print 'HELP.' pass data = np.hstack( (initial,data) ) filtered= signal.lfilter(self.b, self.a, data) if ( data.ndim == 1): filtered = filtered[delay:] elif ( data.ndim == 2): filtered = filtered[:,delay:] return filtered def __str__(self): s = self.name + '\n' s += 'initial condition for filter: signal@rest = 1st sample\n' s += 'filter, coefficients:\n' s += 'nominator ' + str(self.b) + '\n' s += 'denominator ' + str(self.a) return s class SlidingAverage(FirFilter): """ data smoothing in the time domain with a sliding average """ def __init__(self, length=8): """ initialize the object length: lenght of the averaging window """ b = np.ones(length) a = np.zeros(length) if length > 0: a[0] = len(b) FirFilter.__init__(self, b, a, 'sliding average') class FastSlidingAverage( object ): def __init__(self, shape, length=8): """ initialize the object length: lenght of the averaging window """ self.length = length # allocate memory for the filtered data once self.filtered = np.zeros( shape, np.float64 ) self.shape = shape def __call__(self, data): if self.shape != data.shape: raise TypeException('data has wrong shape') length = self.length numpix = data.shape[0] numslices = data.shape[1] filtered = self.filtered cppcode = """ double sum = 0; for (int pix=0; pix 0: b[0] = -1. * ratio b[length-1] = 1. a[0] = 1. FirFilter.__init__(self, b, a, 'constant fraction discriminator') class RemoveSignal(FirFilter): """ estimator to identify DRS4 spikes """ def __init__(self): """ initialize the object """ b = np.array((-0.5, 1., -0.5)) a = np.zeros(len(b)) a[0] = 1.0 FirFilter.__init__(self, b, a, 'remove signal') def _test_SlidingAverage(): """ test the sliding average function use a step function as input """ from plotters import Plotter from generator import SignalGenerator generate = SignalGenerator() plot = Plotter('_test_SlidingAverage') safilter = SlidingAverage(8) print safilter signal = generate('len 100 noise 1.5 step 20 10 50') filtered = safilter(signal) plot( [signal, filtered] , ['original', 'filtered'] ) raw_input('press any key to go on') plt.close(plot.figure) def _test_SlidingAverage2(): """ test the sliding average function use a step function as input """ from plotters import Plotter from generator import SignalGenerator generate = SignalGenerator() plot = Plotter('_test_SlidingAverage') safilter = SlidingAverage(8) print safilter signal = np.ones( (6,20) ) * 3.0 filtered = safilter(signal) #plot( [signal[0], filtered[0]] , ['original', 'filtered'] ) raw_input('press any key to go on') plt.close(plot.figure) def _test_CFD(): """ test the remove signal function """ from plotters import Plotter from generator import SignalGenerator generate = SignalGenerator() plot = Plotter('_test_CFD') sa = SlidingAverage(3) print 'I apply a weak smooting with a filter length of 3' cfd = CFD(8, 0.6) print cfd signal = generate('len 100 noise 1.5 bsl -20 triangle 30 30 8 50') filtered = cfd(sa(signal)) plot( [signal, filtered] , ['original', 'filtered'] ) raw_input('press any key to go on') plt.close(plot.figure) def _test_RemoveSignal(): """ test the remove signal function """ from plotters import Plotter from generator import SignalGenerator generate = SignalGenerator() plot = Plotter('_test_RemoveSignal') remove_signal = RemoveSignal() print remove_signal signal = generate('len 100 noise 2 bsl -20 triangle 20 30 8 40 50 30 spike 50 50 15 50 80 50') filtered = remove_signal(signal) plot( [signal, filtered] , ['original', 'filtered'] ) raw_input('press any key to go on') plt.close(plot.figure) def _test(filter_type, sig, noise_sigma = 1.): from plotters import Plotter filt = filter_type samples = len(sig) # add noise to the signal sig += np.random.randn(samples) * noise_sigma plot = Plotter('_test with ' + str(filt.name)) plot( [sig, filt(sig)], ['original', 'filtered'] ) raw_input('press any key to go on') plt.close(plot.figure) if __name__ == '__main__': import matplotlib.pyplot as plt """ test the class """ _test_SlidingAverage() _test_CFD() _test_RemoveSignal() tsig = np.ones(100) _test(filter_type=SlidingAverage(8), sig=tsig, noise_sigma=3.)