#!/usr/bin/python -tti # # Dominik Neise # TU Dortmund # March 2012 import numpy as np import random from coor import Coordinator # class which prepares next neighbor dictionary # just a dummy callback function def _dummy( data, core_c, core, surv ): pass class AmplitudeCleaner( object ): """ Image Cleaning based on signal strength signal strength is a very general term here it could be: * max amplitude * integral * max of sliding sum * ... The Cleaning procedure or algorith is based on the 3 step precedute on the diss of M.Gauk called 'absolute cleaning' """ def __init__(self, coreTHR, edgeTHR=None): """ initialize object set the two needed thresholds in case only one is given: edgeTHR is assumen to be coreTHR/2. """ self.coreTHR = coreTHR if edgeTHR==None: self.edgeTHR = coreTHR/2. else: self.edgeTHR = edgeTHR self.return_bool_mask = True # default value! # init coordinator self.coordinator = Coordinator() # retrieve next neighbor dict self.nn = self.coordinator.nn def __call__( self, data, return_bool_mask=None , callback=_dummy ): """ compute cleaned image the return value might be: np.array of same shape as data (dtype=bool) or an np.array (dtype=int), which lengths is the number of pixel which survived the cleaning, and which contains the CHIDs of these survivors the default is to return the bool array but if you set it once, differently, eg like this: myAmplitudeCleaner.return_bool_mask = False or like myAmplitudeCleaner( mydata, False) it will be stored, until you change it again... """ #shortcuts coreTHR = self.coreTHR edgeTHR = self.edgeTHR nn = self.nn # once set, never forget :-) if return_bool_mask != None: self.return_bool_mask = return_bool_mask return_bool_mask = self.return_bool_mask # these will hold the outcome of.. core_c = np.zeros( len(data), dtype=bool ) # ... step 1 core = np.zeros( len(data), dtype=bool ) # ... step 2 surv = np.zeros( len(data), dtype=bool ) # ... step 3 # It could be done in one variable, but for debugging and simplicity, # I use more ... # this is Gauks step 1 core_c = data > coreTHR # loop over all candidates and check if it has a next neighbor core pixel for c in np.where(core_c)[0]: # loop over all n'eighbors of c'andidate for n in nn[c]: # if c has a neighbor, beeing also a candidate # then c is definitely a core. # Note: DN 13.03.12 # actually the neighbor is also now found to be core pixel, # and still this knowledge is thrown away and later this # neighbor itself is treated again as a c'andidate. # this should be improved. if core_c[n]: core[c]=True break # at this moment step 2 is done # start of step 3. # every core pixel is automaticaly a survivor, --> copy it surv = core.copy() for c in np.where(core)[0]: for n in nn[c]: # if neighbor is a core pixel, then do nothing if core[n]: pass # if neighbor is over edgeTHR, it is lucky and survived. elif data[n] > edgeTHR: surv[n] = True # calculate number of islands surv_id = np.where(surv)[0] # make a copy of survivors: self.surv_copy = list(surv_id.copy()) surv_copy = self.surv_copy # the first survivor belongs to the first island, by definition self.islands = [] islands = self.islands if len(surv_copy) > 0: islands.append([surv_copy[0]]) del surv_copy[0] nn_found = True while len(surv_copy) > 0: if nn_found: nn_found = False for i in islands[-1]: #print 'i:',i #print 'nn[i]:', nn[i] #print 'type(nn[i]):', type(nn[i]) for n in nn[i]: if n in surv_copy: del surv_copy[surv_copy.index(n)] islands[-1].append(n) nn_found = True #print 'islands' #print islands, len(islands) #print 'surv_copy' #print surv_copy, len(surv_copy) #print '*'*80 #print 'END of for i in islands[-1]:' #print '*'*80 else: islands.append( [ surv_copy[0] ]) del surv_copy[0] nn_found = True #print 'cleaner found' ,len(islands), 'islands' callback( data, core_c, core, surv) if return_bool_mask: return surv else: return np.where(surv)[0] def info(self): """ print Cleaner Informatio """ print 'coreTHR: ', self.coreTHR print 'edgeTHR: ', self.edgeTHR print 'return_bool_mask:', self.return_bool_mask def _test_callback( data, core_c, core, surv ): """ test callback functionality of AmplitudeCleaner""" print 'core_c', np.where(core_c)[0], '<--', core_c.sum() print 'core', np.where(core)[0], '<--', core.sum() print 'surv', np.where(surv)[0], '<--', surv.sum() print 'data', '*'*60 print data def _test_cleaner(): """ test for class AmplitudeCleaner""" from plotters import CamPlotter NPIX = 1440 SIGMA = 1 CORE_THR = 45 EDGE_THR = 18 harvey_keitel = AmplitudeCleaner( CORE_THR, EDGE_THR) harvey_keitel.info() # if you wonder why the cleaner object is called is it is: # http://www.youtube.com/watch?v=pf-Amvro2SY nn = Coordinator().nn testdata = np.zeros( NPIX ) #add some noise testdata += 3 # 'make' 3 doubleflowers cores = [] for i in range(3): cores.append( random.randint(0, NPIX-1) ) nene = nn[ cores[-1] ] # shortcut luckynn = random.sample( nene, 1)[0] # shortcut #print nene #print luckynn cores.append( luckynn ) edges = [] for c in cores: for n in nn[c]: if n not in cores: edges.append(n) # add those doubleflowers to the testdata for c in cores: testdata[c] += 1.2*CORE_THR for e in edges: testdata[e] += 1.2*EDGE_THR #cleaning_mask = harvey_keitel(testdata, callback=_test_callback) cleaning_mask = harvey_keitel(testdata) plotall = CamPlotter('all') plotclean = CamPlotter('cleaned') plotall(testdata) plotclean(testdata, cleaning_mask) if __name__ == '__main__': """ tests """ _test_cleaner()