1 | #!/usr/bin/python -i
2 | #
3 | # Dominik Neise
4 | # TU Dortmund
5 | # March 2012
6 | import numpy as np
7 | import random
8 | from coor import Coordinator # class which prepares next neighbor dictionary
9 |
10 | # just a dummy callback function
def _dummy(data, core_c, core, surv):
    """No-op callback: accept the four cleaning arrays and ignore them."""
    return None
13 |
class AmplitudeCleaner(object):
    """Image cleaning based on signal strength.

    'Signal strength' is a very general term here; it could be:
        * max amplitude
        * integral
        * max of sliding sum
        * ...
    The cleaning algorithm is the 3-step procedure called
    'absolute cleaning' in the dissertation of M.Gauk:
        1. every pixel with a signal above coreTHR is a core candidate
        2. a candidate with at least one neighboring candidate is a core pixel
        3. every core pixel survives, and so does every neighbor of a
           core pixel whose signal is above edgeTHR
    """

    def __init__(self, coreTHR, edgeTHR=None):
        """Initialize the cleaner with its two thresholds.

        coreTHR -- threshold a pixel has to exceed to become a core candidate
        edgeTHR -- threshold a neighbor of a core pixel has to exceed in
                   order to survive; if omitted, coreTHR/2. is used
        """
        self.coreTHR = coreTHR
        # identity test: '== None' would be evaluated elementwise if an
        # array-like threshold were ever passed in
        self.edgeTHR = coreTHR / 2. if edgeTHR is None else edgeTHR

        self.return_bool_mask = True  # default value!

        # the coordinator provides the next neighbor lookup
        self.coordinator = Coordinator()
        self.nn = self.coordinator.nn

    def __call__(self, data, return_bool_mask=None, callback=None):
        """Compute the cleaned image.

        data             -- signal strength per pixel; it is indexed with
                            the same pixel ids that are used by self.nn.
                            Anything np.asanyarray() accepts will do.
        return_bool_mask -- True/False selects the kind of return value
                            (see below); None keeps the stored setting
        callback         -- optional callable, invoked once as
                            callback(data, core_c, core, surv) with the
                            outcome of the three steps, right before
                            returning; None means no callback

        The return value is either
            an np.array of the same length as data (dtype=bool), which is
            True for every pixel that survived the cleaning
        or
            a 1-dim np.array of integer indices, whose length is the
            number of pixels which survived the cleaning and which
            contains the ids (CHIDs) of these survivors

        The default is to return the bool array, but if you set it once
        differently, e.g. like this:
            myAmplitudeCleaner.return_bool_mask = False
        or like this:
            myAmplitudeCleaner(mydata, False)
        the choice is stored until you change it again.
        """
        # once set, never forget :-)
        if return_bool_mask is not None:
            self.return_bool_mask = return_bool_mask

        # plain sequences do not support the elementwise comparisons below;
        # arrays (and array subclasses) are passed through untouched
        data = np.asanyarray(data)
        nn = self.nn  # shortcut

        # step 1: every pixel above coreTHR is a core candidate
        core_c = data > self.coreTHR

        # step 2: a candidate is a core pixel as soon as one of its
        # neighbors is a candidate as well.
        # NOTE (DN 13.03.12): the neighbor found here is a core pixel, too,
        # but this knowledge is thrown away and the neighbor is examined
        # again when its own turn comes. This could be improved.
        core = np.zeros(len(data), dtype=bool)
        for c in np.where(core_c)[0]:
            if any(core_c[n] for n in nn[c]):
                core[c] = True

        # step 3: every core pixel is automatically a survivor; a neighbor
        # of a core pixel survives if its signal is above edgeTHR
        surv = core.copy()
        for c in np.where(core)[0]:
            for n in nn[c]:
                if not core[n] and data[n] > self.edgeTHR:
                    surv[n] = True

        if callback is not None:
            callback(data, core_c, core, surv)

        if self.return_bool_mask:
            return surv
        # np.where() returns a tuple with one index array per dimension;
        # hand out the index array itself, as documented above
        return np.where(surv)[0]

    def info(self):
        """Print the current settings of the cleaner."""
        # single-argument print(...) produces the same output when run
        # under Python 2 and under Python 3
        print('coreTHR:  %s' % (self.coreTHR,))
        print('edgeTHR:  %s' % (self.edgeTHR,))
        print('return_bool_mask: %s' % (self.return_bool_mask,))
129 |
def _test_callback(data, core_c, core, surv):
    """Test the callback functionality of AmplitudeCleaner.

    Prints, for each of the three boolean masks, the indices of the set
    entries followed by their count, and finally the data itself.

    data   -- the signal strengths that were cleaned
    core_c -- boolean mask of the core candidates (step 1)
    core   -- boolean mask of the core pixels (step 2)
    surv   -- boolean mask of the surviving pixels (step 3)
    """
    # single-argument print(...) produces the same output when run under
    # Python 2 and under Python 3
    print('core_c %s <-- %s' % (np.where(core_c)[0], core_c.sum()))
    print('core %s <-- %s' % (np.where(core)[0], core.sum()))
    print('surv %s <-- %s' % (np.where(surv)[0], surv.sum()))
    print('data ' + '*' * 60)
    print(data)
137 |
138 |
def _test_cleaner():
    """Visual test for class AmplitudeCleaner.

    Builds a flat test image containing three 'doubleflowers' (a randomly
    chosen pixel plus one of its neighbors as cores, surrounded by their
    remaining neighbors as edges), cleans it and plots both the full and
    the cleaned image with CamPlotter.
    """
    from plotters import CamPlotter
    NPIX = 1440

    CORE_THR = 45
    EDGE_THR = 18

    harvey_keitel = AmplitudeCleaner(CORE_THR, EDGE_THR)
    harvey_keitel.info()
    # if you wonder why the cleaner object is called like this, here it is:
    # http://www.youtube.com/watch?v=pf-Amvro2SY

    # use the very same next neighbor lookup as the cleaner itself
    nn = harvey_keitel.nn

    testdata = np.zeros(NPIX)
    # add a constant offset well below both thresholds
    testdata += 3

    # 'make' 3 doubleflowers
    # Sets are used on purpose: a pixel that is drawn twice, or an edge
    # pixel that is shared by two cores, must get its amplitude only once.
    # Otherwise a shared edge pixel would end up above CORE_THR.
    cores = set()
    for _ in range(3):
        center = random.randint(0, NPIX - 1)
        cores.add(center)
        # list(...) so that any kind of neighbor container can be sampled
        cores.add(random.choice(list(nn[center])))

    edges = set()
    for c in cores:
        edges.update(n for n in nn[c] if n not in cores)

    # add those doubleflowers to the testdata
    for c in cores:
        testdata[c] += 1.2 * CORE_THR
    for e in edges:
        testdata[e] += 1.2 * EDGE_THR

    #cleaning_mask = harvey_keitel(testdata, callback=_test_callback)
    cleaning_mask = harvey_keitel(testdata)

    plotall = CamPlotter('all')
    plotclean = CamPlotter('cleaned')

    plotall(testdata)
    plotclean(testdata, cleaning_mask)
189 |
190 |
191 |
if __name__ == '__main__':
    # executed as a script: run the tests
    _test_cleaner()