| Metric      | Value  |
|-------------|--------|
| Conditions  | 20     |
| Total Lines | 170    |
| Lines       | 5      |
| Ratio       | 2.94 % |
| Changes     | 2      |
| Bugs        | 0      |
| Features    | 0      |
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:

- Extract Method

If many parameters/temporary variables are present:

- Replace Temp with Query
- Introduce Parameter Object
- Preserve Whole Object

Complex methods like BasicPulseExtractor.ungated_conv_deriv() often do a lot of different things. To break such a method down, we need to identify cohesive steps within it. A common approach is to look for blocks of statements that share the same variables, or that are introduced by an explanatory comment.

Once you have determined the statements that belong together, you can apply the Extract Method refactoring, using the comment as a starting point for the new method's name. If the extracted parts form a cohesive component of their own, Extract Class is also a candidate.
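As a concrete example of Extract Method applied to this code: the edge-refinement logic below is duplicated almost verbatim for the rising and falling flanks, and could be pulled into a single helper. This is only a sketch; `_refine_edge` is a hypothetical name, not part of the original class.

```python
import numpy as np


def _refine_edge(deriv_ref, coarse_ind, window, find_max):
    """Refine a coarse edge index by searching the reference derivative
    within +/- window bins around it (clamped to the array bounds)."""
    start = max(int(coarse_ind - window), 0)
    stop = min(int(coarse_ind + window), len(deriv_ref))
    if start == stop:
        stop = start + 1
    segment = deriv_ref[start:stop]
    offset = np.argmax(segment) if find_max else np.argmin(segment)
    return start + offset


# usage on a toy derivative with one rising and one falling extremum
deriv = np.array([0.0, 1.0, 3.0, 1.0, 0.0, -1.0, -3.0, -1.0, 0.0])
rising = _refine_edge(deriv, 2, 2, find_max=True)    # index of the maximum
falling = _refine_edge(deriv, 6, 2, find_max=False)  # index of the minimum
```

The rising and falling branches of the loop then reduce to one call each, differing only in the `find_max` flag.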
```python
# -*- coding: utf-8 -*-
import numpy as np
from scipy import ndimage


def ungated_conv_deriv(self, count_data, conv_std_dev=20.0):
    """ Detects the laser pulses in the ungated timetrace data and extracts
    them.

    @param numpy.ndarray count_data: 1D array, the raw timetrace data from an ungated fast counter
    @param float conv_std_dev: the standard deviation of the gaussian used for smoothing

    @return dict: contains the extracted laser pulses of the timetrace
                  ('laser_counts_arr', dimensions: 0: laser number, 1: time bin)
                  together with the indices of the rising and falling edges.

    Procedure:
    Edge Detection:
    ---------------

    The count_data array with the laser pulses is smoothed with a
    gaussian filter (convolution) using the standard deviation
    conv_std_dev (in bins). Then the derivative of the convolved
    time trace is taken to obtain the maxima and minima, which
    correspond to the rising and falling edges of the pulses.

    The convolution with a gaussian removes spurious peaks due to count
    fluctuations within a laser pulse and at the same time ensures a
    clear distinction of the maxima and minima in the derivative of the
    convolved trace.

    The maxima and minima are not found sequentially, pulse by pulse,
    but are rather obtained globally. I.e. the convolved and
    differentiated array is searched iteratively for a maximum and a
    minimum, and after finding those, the array entries within
    4 * conv_std_dev around them (2 * conv_std_dev to the left and
    2 * conv_std_dev to the right) are set to zero.

    The crucial parts are the knowledge of the number of laser pulses
    and the choice of an appropriate std_dev for the gaussian filter.

    To ensure good performance of the edge detection, the laser pulses
    must have steep rising and falling edges. Also be careful not to
    combine a large conv_std_dev value with a short laser pulse
    (rule of thumb: conv_std_dev < laser_length / 10).
    """
    # Create return dictionary
    return_dict = {'laser_counts_arr': np.empty(0, dtype='int64'),
                   'laser_indices_rising': np.empty(0, dtype='int64'),
                   'laser_indices_falling': np.empty(0, dtype='int64')}

    number_of_lasers = self.measurement_settings.get('number_of_lasers')
    if not isinstance(number_of_lasers, int):
        return return_dict

    # apply gaussian filter to remove noise and compute the gradient of the timetrace
    try:
        conv = ndimage.filters.gaussian_filter1d(count_data.astype(float), conv_std_dev)
    except Exception:
        conv = np.zeros(count_data.size)
    try:
        conv_deriv = np.gradient(conv)
    except Exception:
        conv_deriv = np.zeros(conv.size)

    # If gaussian smoothing or the derivative failed, the resulting array contains
    # only zeros. Check for that and return only zeros as well to indicate a failed
    # pulse extraction.
    if len(conv_deriv.nonzero()[0]) == 0:
        return_dict['laser_counts_arr'] = np.zeros((number_of_lasers, 10), dtype='int64')
        return return_dict

    # Compute a reference derivative with a small, fixed standard deviation, because
    # the exact positions of the peaks and dips (i.e. the maxima and minima, which are
    # the inflection points of the pulse edges) are distorted by a large conv_std_dev.
    try:
        conv = ndimage.filters.gaussian_filter1d(count_data.astype(float), 10)
    except Exception:
        conv = np.zeros(count_data.size)
    try:
        conv_deriv_ref = np.gradient(conv)
    except Exception:
        conv_deriv_ref = np.zeros(conv.size)

    # initialize arrays to contain the indices of all rising and falling
    # flanks, respectively
    rising_ind = np.empty(number_of_lasers, dtype='int64')
    falling_ind = np.empty(number_of_lasers, dtype='int64')

    # Find as many rising and falling flanks as there are laser pulses in
    # the trace:
    for i in range(number_of_lasers):
        # save the index of the absolute maximum of the derived time trace
        # as rising edge position
        rising_ind[i] = np.argmax(conv_deriv)

        # refine the rising edge detection by using the reference derivative
        # (small, fixed standard deviation) to find the inflection point
        # more precisely
        start_ind = int(rising_ind[i] - conv_std_dev)
        if start_ind < 0:
            start_ind = 0

        stop_ind = int(rising_ind[i] + conv_std_dev)
        if stop_ind > len(conv_deriv):
            stop_ind = len(conv_deriv)

        if start_ind == stop_ind:
            stop_ind = start_ind + 1

        rising_ind[i] = start_ind + np.argmax(conv_deriv_ref[start_ind:stop_ind])

        # set this position and the surroundings of the saved edge to 0 to
        # avoid a second detection
        if rising_ind[i] < 2 * conv_std_dev:
            del_ind_start = 0
        else:
            del_ind_start = rising_ind[i] - int(2 * conv_std_dev)
        if (conv_deriv.size - rising_ind[i]) < 2 * conv_std_dev:
            del_ind_stop = conv_deriv.size - 1
        else:
            del_ind_stop = rising_ind[i] + int(2 * conv_std_dev)
        conv_deriv[del_ind_start:del_ind_stop] = 0

        # save the index of the absolute minimum of the derived time trace
        # as falling edge position
        falling_ind[i] = np.argmin(conv_deriv)

        # refine the falling edge detection by using the reference derivative
        # (small, fixed standard deviation) to find the inflection point
        # more precisely
        start_ind = int(falling_ind[i] - conv_std_dev)
        if start_ind < 0:
            start_ind = 0

        stop_ind = int(falling_ind[i] + conv_std_dev)
        if stop_ind > len(conv_deriv):
            stop_ind = len(conv_deriv)

        if start_ind == stop_ind:
            stop_ind = start_ind + 1

        falling_ind[i] = start_ind + np.argmin(conv_deriv_ref[start_ind:stop_ind])

        # set this position and the surroundings of the saved flank to 0 to
        # avoid a second detection
        if falling_ind[i] < 2 * conv_std_dev:
            del_ind_start = 0
        else:
            del_ind_start = falling_ind[i] - int(2 * conv_std_dev)
        if (conv_deriv.size - falling_ind[i]) < 2 * conv_std_dev:
            del_ind_stop = conv_deriv.size - 1
        else:
            del_ind_stop = falling_ind[i] + int(2 * conv_std_dev)
        conv_deriv[del_ind_start:del_ind_stop] = 0

    # sort all indices of rising and falling flanks
    rising_ind.sort()
    falling_ind.sort()

    # find the maximum laser length to use as size for the laser array
    laser_length = np.max(falling_ind - rising_ind)

    # initialize the empty output array
    laser_arr = np.zeros((number_of_lasers, laser_length), dtype='int64')
    # slice the detected laser pulses out of the timetrace and save them in the
    # output array according to the found rising edges
    for i in range(number_of_lasers):
        if rising_ind[i] + laser_length > count_data.size:
            lenarr = count_data[rising_ind[i]:].size
            laser_arr[i, 0:lenarr] = count_data[rising_ind[i]:]
        else:
            laser_arr[i] = count_data[rising_ind[i]:rising_ind[i] + laser_length]

    return_dict['laser_counts_arr'] = laser_arr.astype('int64')
    return_dict['laser_indices_rising'] = rising_ind
    return_dict['laser_indices_falling'] = falling_ind
    return return_dict
```
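The smooth-differentiate-suppress scheme described in the docstring can be exercised in isolation. The sketch below is a standalone approximation of the procedure, not the class method itself; the trace parameters (pulse height, length, spacing) are made up for illustration.

```python
import numpy as np
from scipy import ndimage

# Synthetic timetrace: 3 laser pulses, 200 bins long, starting 400 bins apart.
trace = np.zeros(1200)
for start in (100, 500, 900):
    trace[start:start + 200] = 50.0

conv = ndimage.gaussian_filter1d(trace, 10.0)  # smooth the trace (sigma = 10 bins)
deriv = np.gradient(conv)                      # edges become extrema of the derivative

rising, falling = [], []
for _ in range(3):
    idx = np.argmax(deriv)                     # global maximum -> rising edge
    rising.append(idx)
    deriv[max(idx - 20, 0):idx + 20] = 0       # zero +/- 2*sigma to avoid re-detection
    idx = np.argmin(deriv)                     # global minimum -> falling edge
    falling.append(idx)
    deriv[max(idx - 20, 0):idx + 20] = 0

rising.sort()
falling.sort()
# rising ends up near [100, 500, 900], falling near [300, 700, 1100]
```

Note that the rule of thumb from the docstring holds here: the smoothing sigma (10) is well below laser_length / 10 (20), so the extrema stay close to the true edge positions.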