1 | #!/usr/bin/python -tti
|
---|
2 | #
|
---|
3 | # Dominik Neise
|
---|
4 | # TU Dortmund
|
---|
5 | # March 2012
|
---|
6 | import numpy as np
|
---|
7 | import random
|
---|
8 | from coor import Coordinator # class which prepares next neighbor dictionary
|
---|
9 |
|
---|
10 | # just a dummy callback function
|
---|
11 | def _dummy( data, core_c, core, surv ):
|
---|
12 | pass
|
---|
13 |
|
---|
14 | class AmplitudeCleaner( object ):
|
---|
15 | """ Image Cleaning based on signal strength
|
---|
16 |
|
---|
17 | signal strength is a very general term here
|
---|
18 | it could be:
|
---|
19 | * max amplitude
|
---|
20 | * integral
|
---|
21 | * max of sliding sum
|
---|
22 | * ...
|
---|
23 | The Cleaning procedure or algorith is based on the
|
---|
24 | 3 step precedute on the diss of M.Gauk called 'absolute cleaning'
|
---|
25 | """
|
---|
26 |
|
---|
27 | def __init__(self, coreTHR, edgeTHR=None):
|
---|
28 | """ initialize object
|
---|
29 |
|
---|
30 | set the two needed thresholds
|
---|
31 | in case only one is given:
|
---|
32 | edgeTHR is assumen to be coreTHR/2.
|
---|
33 | """
|
---|
34 | self.coreTHR = coreTHR
|
---|
35 | if edgeTHR==None:
|
---|
36 | self.edgeTHR = coreTHR/2.
|
---|
37 | else:
|
---|
38 | self.edgeTHR = edgeTHR
|
---|
39 |
|
---|
40 | self.return_bool_mask = True # default value!
|
---|
41 |
|
---|
42 | # init coordinator
|
---|
43 | self.coordinator = Coordinator()
|
---|
44 | # retrieve next neighbor dict
|
---|
45 | self.nn = self.coordinator.nn
|
---|
46 |
|
---|
47 | def __call__( self, data, return_bool_mask=None , callback=_dummy ):
|
---|
48 | """ compute cleaned image
|
---|
49 |
|
---|
50 | the return value might be:
|
---|
51 | np.array of same shape as data (dtype=bool)
|
---|
52 | or
|
---|
53 | an np.array (dtype=int), which lengths is the number of
|
---|
54 | pixel which survived the cleaning, and which contains the CHIDs
|
---|
55 | of these survivors
|
---|
56 |
|
---|
57 | the default is to return the bool array
|
---|
58 | but if you set it once, differently, eg like this:
|
---|
59 | myAmplitudeCleaner.return_bool_mask = False
|
---|
60 | or like
|
---|
61 | myAmplitudeCleaner( mydata, False)
|
---|
62 |
|
---|
63 | it will be stored, until you change it again...
|
---|
64 | """
|
---|
65 | #shortcuts
|
---|
66 | coreTHR = self.coreTHR
|
---|
67 | edgeTHR = self.edgeTHR
|
---|
68 | nn = self.nn
|
---|
69 |
|
---|
70 | # once set, never forget :-)
|
---|
71 | if return_bool_mask != None:
|
---|
72 | self.return_bool_mask = return_bool_mask
|
---|
73 | return_bool_mask = self.return_bool_mask
|
---|
74 |
|
---|
75 | # these will hold the outcome of..
|
---|
76 | core_c = np.zeros( len(data), dtype=bool ) # ... step 1
|
---|
77 | core = np.zeros( len(data), dtype=bool ) # ... step 2
|
---|
78 | surv = np.zeros( len(data), dtype=bool ) # ... step 3
|
---|
79 | # It could be done in one variable, but for debugging and simplicity,
|
---|
80 | # I use more ...
|
---|
81 |
|
---|
82 | # this is Gauks step 1
|
---|
83 | core_c = data > coreTHR
|
---|
84 | # loop over all candidates and check if it has a next neighbor core pixel
|
---|
85 |
|
---|
86 | for c in np.where(core_c)[0]:
|
---|
87 | # loop over all n'eighbors of c'andidate
|
---|
88 | for n in nn[c]:
|
---|
89 | # if c has a neighbor, beeing also a candidate
|
---|
90 | # then c is definitely a core.
|
---|
91 | # Note: DN 13.03.12
|
---|
92 | # actually the neighbor is also now found to be core pixel,
|
---|
93 | # and still this knowledge is thrown away and later this
|
---|
94 | # neighbor itself is treated again as a c'andidate.
|
---|
95 | # this should be improved.
|
---|
96 | if core_c[n]:
|
---|
97 | core[c]=True
|
---|
98 | break
|
---|
99 | # at this moment step 2 is done
|
---|
100 |
|
---|
101 | # start of step 3.
|
---|
102 | # every core pixel is automaticaly a survivor, --> copy it
|
---|
103 | surv = core.copy()
|
---|
104 | for c in np.where(core)[0]:
|
---|
105 | for n in nn[c]:
|
---|
106 | # if neighbor is a core pixel, then do nothing
|
---|
107 | if core[n]:
|
---|
108 | pass
|
---|
109 | # if neighbor is over edgeTHR, it is lucky and survived.
|
---|
110 | elif data[n] > edgeTHR:
|
---|
111 | surv[n] = True
|
---|
112 |
|
---|
113 |
|
---|
114 | callback( data, core_c, core, surv)
|
---|
115 |
|
---|
116 | if return_bool_mask:
|
---|
117 | return surv
|
---|
118 | else:
|
---|
119 | return np.where(surv)[0]
|
---|
120 |
|
---|
121 |
|
---|
122 | def info(self):
|
---|
123 | """ print Cleaner Informatio
|
---|
124 |
|
---|
125 | """
|
---|
126 | print 'coreTHR: ', self.coreTHR
|
---|
127 | print 'edgeTHR: ', self.edgeTHR
|
---|
128 | print 'return_bool_mask:', self.return_bool_mask
|
---|
129 |
|
---|
130 | def _test_callback( data, core_c, core, surv ):
|
---|
131 | """ test callback functionality of AmplitudeCleaner"""
|
---|
132 | print 'core_c', np.where(core_c)[0], '<--', core_c.sum()
|
---|
133 | print 'core', np.where(core)[0], '<--', core.sum()
|
---|
134 | print 'surv', np.where(surv)[0], '<--', surv.sum()
|
---|
135 | print 'data', '*'*60
|
---|
136 | print data
|
---|
137 |
|
---|
138 |
|
---|
139 | def _test_cleaner():
|
---|
140 | """ test for class AmplitudeCleaner"""
|
---|
141 | from plotters import CamPlotter
|
---|
142 | NPIX = 1440
|
---|
143 | SIGMA = 1
|
---|
144 |
|
---|
145 | CORE_THR = 45
|
---|
146 | EDGE_THR = 18
|
---|
147 |
|
---|
148 | harvey_keitel = AmplitudeCleaner( CORE_THR, EDGE_THR)
|
---|
149 | harvey_keitel.info()
|
---|
150 | # if you wonder why the cleaner object is called is it is:
|
---|
151 | # http://www.youtube.com/watch?v=pf-Amvro2SY
|
---|
152 |
|
---|
153 | nn = Coordinator().nn
|
---|
154 |
|
---|
155 | testdata = np.zeros( NPIX )
|
---|
156 | #add some noise
|
---|
157 | testdata += 3
|
---|
158 |
|
---|
159 | # 'make' 3 doubleflowers
|
---|
160 | cores = []
|
---|
161 | for i in range(3):
|
---|
162 | cores.append( random.randint(0, NPIX-1) )
|
---|
163 | nene = nn[ cores[-1] ] # shortcut
|
---|
164 | luckynn = random.sample( nene, 1)[0] # shortcut
|
---|
165 | #print nene
|
---|
166 | #print luckynn
|
---|
167 | cores.append( luckynn )
|
---|
168 | edges = []
|
---|
169 | for c in cores:
|
---|
170 | for n in nn[c]:
|
---|
171 | if n not in cores:
|
---|
172 | edges.append(n)
|
---|
173 |
|
---|
174 | # add those doubleflowers to the testdata
|
---|
175 | for c in cores:
|
---|
176 | testdata[c] += 1.2*CORE_THR
|
---|
177 | for e in edges:
|
---|
178 | testdata[e] += 1.2*EDGE_THR
|
---|
179 |
|
---|
180 |
|
---|
181 | #cleaning_mask = harvey_keitel(testdata, callback=_test_callback)
|
---|
182 | cleaning_mask = harvey_keitel(testdata)
|
---|
183 |
|
---|
184 | plotall = CamPlotter('all')
|
---|
185 | plotclean = CamPlotter('cleaned')
|
---|
186 |
|
---|
187 | plotall(testdata)
|
---|
188 | plotclean(testdata, cleaning_mask)
|
---|
189 |
|
---|
190 |
|
---|
191 |
|
---|
192 | if __name__ == '__main__':
|
---|
193 | """ tests """
|
---|
194 |
|
---|
195 | _test_cleaner() |
---|