Completed
Pull Request — master (#384)
created by unknown at 01:25

BasicPulseExtractor   A

Complexity

Total Complexity 34

Size/Duplication

Total Lines 309
Duplicated Lines 1.62 %

Importance

Changes 2
Bugs 0 Features 0
Metric                    Value
dl (duplicated lines)     5
loc (lines of code)       309
rs                        9.68
c (changes)               2
b (bugs)                  0
f (features)              0
wmc (total complexity)    34

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 2 1
C ungated_threshold() 0 81 9
F ungated_conv_deriv() 5 170 20
B gated_conv_deriv() 0 49 4

Duplicated Code

Duplicate code is one of the most pungent code smells. A commonly used rule of thumb is to restructure code once it is duplicated in three or more places.

The most common fix is to extract the repeated logic into a shared helper that every call site uses, as sketched below.
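As an illustration only (the helper name and its placement are hypothetical and not part of this pull request), a pulse-slicing loop like the one flagged further down could be factored into one small shared function:

    import numpy as np

    def _slice_laser_pulses(count_data, rising_ind, laser_length):
        """Cut one window of laser_length bins per rising edge out of a 1D count trace."""
        laser_arr = np.zeros((len(rising_ind), laser_length), dtype='int64')
        for i, start in enumerate(rising_ind):
            chunk = count_data[start:start + laser_length]
            # pulses that run past the end of the trace are zero-padded on the right
            laser_arr[i, :chunk.size] = chunk
        return laser_arr

Each extraction method would then only have to compute its rising-edge indices and the pulse length before delegating the slicing to the helper.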

# -*- coding: utf-8 -*-
"""
This file contains basic pulse extraction methods for Qudi.

Qudi is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

Qudi is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with Qudi. If not, see <http://www.gnu.org/licenses/>.

Copyright (c) the Qudi Developers. See the COPYRIGHT.txt file at the
top-level directory of this distribution and at <https://github.com/Ulm-IQO/qudi/>
"""

import numpy as np
from scipy import ndimage

from logic.pulsed.pulse_extractor import PulseExtractorBase

class BasicPulseExtractor(PulseExtractorBase):
    """
    Basic pulse extraction methods for gated and ungated fast counter time traces, based on
    convolution/derivative edge detection and on simple count thresholding.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def gated_conv_deriv(self, count_data, conv_std_dev=20.0):
        """
        Detects the rising and falling flanks in the gated timetrace data and extracts just the
        laser pulses. The flank detection is based on an image edge detection technique performed
        in 1D. No prior information about the data is needed; only the width of the gaussian
        filter used to reduce shot noise can be given as a parameter.

        @param 2D numpy.ndarray count_data: the raw timetrace data from a gated fast counter
                                            dim 0: gate number; dim 1: time bin
        @param float conv_std_dev: The standard deviation of the gaussian filter used for smoothing

        @return dict: The extracted laser pulses of the timetrace as well as the indices for rising
                      and falling flanks.
        """
        # Create return dictionary
        return_dict = {'laser_counts_arr': np.zeros(0, dtype='int64'),
                       'laser_indices_rising': -1,
                       'laser_indices_falling': -1}

        # sum up all gated timetraces to ease flank detection
        timetrace_sum = np.sum(count_data, 0)

        # apply gaussian filter to remove noise and compute the gradient of the timetrace sum
        try:
            conv = ndimage.filters.gaussian_filter1d(timetrace_sum.astype(float), conv_std_dev)
        except:
            conv = np.zeros(timetrace_sum.size)
        try:
            conv_deriv = np.gradient(conv)
        except:
            conv_deriv = np.zeros(conv.size)

        # get indices of rising and falling flank
        rising_ind = conv_deriv.argmax()
        falling_ind = conv_deriv.argmin()

        # If gaussian smoothing or derivative failed, the returned array only contains zeros.
        # Check for that and return also only zeros to indicate a failed pulse extraction.
        if len(conv_deriv.nonzero()[0]) == 0:
            laser_arr = np.zeros(count_data.shape, dtype='int64')
        else:
            # slice the data array to cut off anything but laser pulses
            laser_arr = count_data[:, rising_ind:falling_ind]

        return_dict['laser_counts_arr'] = laser_arr.astype('int64')
        return_dict['laser_indices_rising'] = rising_ind
        return_dict['laser_indices_falling'] = falling_ind

        return return_dict

    def ungated_conv_deriv(self, count_data, conv_std_dev=20.0):
        """ Detects the laser pulses in the ungated timetrace data and extracts them.

        The number of laser pulses to look for is taken from self.measurement_settings
        ('number_of_lasers').

        @param numpy.ndarray count_data: 1D array, the raw timetrace data from an ungated fast counter
        @param float conv_std_dev: The standard deviation of the gaussian used for smoothing

        @return dict: The extracted laser pulses of the timetrace as well as the indices for rising
                      and falling flanks.

        Procedure:
            Edge Detection:
            ---------------

            The count_data array with the laser pulses is smoothed with a gaussian filter
            (convolution) of standard deviation conv_std_dev bins. Then the derivative of the
            convolved time trace is taken to obtain the maxima and minima, which correspond to the
            rising and falling edges of the pulses.

            The convolution with a gaussian removes spurious peaks due to count fluctuations within
            a laser pulse and at the same time ensures a clear distinction of the maxima and minima
            in the derived convolved trace.

            The maxima and minima are not found sequentially, pulse by pulse, but are obtained
            globally. I.e. the convolved and derived array is searched iteratively for a maximum
            and a minimum, and after finding those the array entries within a window of
            4 * conv_std_dev around them (2 * conv_std_dev to the left and 2 * conv_std_dev to the
            right) are set to zero.

            The crucial parts are the knowledge of the number of laser pulses and the choice of an
            appropriate std_dev for the gaussian filter.

            To ensure a good performance of the edge detection, you have to ensure a steep rising
            and falling edge of the laser pulse! Also be careful about choosing a large
            conv_std_dev value for a short laser pulse (rule of thumb:
            conv_std_dev < laser_length / 10).
        """
        # Create return dictionary
        return_dict = {'laser_counts_arr': np.empty(0, dtype='int64'),
                       'laser_indices_rising': np.empty(0, dtype='int64'),
                       'laser_indices_falling': np.empty(0, dtype='int64')}

        number_of_lasers = self.measurement_settings.get('number_of_lasers')
        if not isinstance(number_of_lasers, int):
            return return_dict

        # apply gaussian filter to remove noise and compute the gradient of the timetrace
        try:
            conv = ndimage.filters.gaussian_filter1d(count_data.astype(float), conv_std_dev)
        except:
            conv = np.zeros(count_data.size)
        try:
            conv_deriv = np.gradient(conv)
        except:
            conv_deriv = np.zeros(conv.size)

        # If gaussian smoothing or derivative failed, the returned array only contains zeros.
        # Check for that and return also only zeros to indicate a failed pulse extraction.
        if len(conv_deriv.nonzero()[0]) == 0:
            return_dict['laser_counts_arr'] = np.zeros((number_of_lasers, 10), dtype='int64')
            return return_dict

        # Compute a reference derivative with a small, fixed filter width, because the exact
        # positions of the peaks and dips (i.e. maxima and minima, which are the inflection points
        # of the pulse flanks) are distorted by a large conv_std_dev value.
        try:
            conv = ndimage.filters.gaussian_filter1d(count_data.astype(float), 10)
        except:
            conv = np.zeros(count_data.size)
        try:
            conv_deriv_ref = np.gradient(conv)
        except:
            conv_deriv_ref = np.zeros(conv.size)

        # initialize arrays to contain the indices for all rising and falling flanks, respectively
        rising_ind = np.empty(number_of_lasers, dtype='int64')
        falling_ind = np.empty(number_of_lasers, dtype='int64')

        # Find as many rising and falling flanks as there are laser pulses in the trace:
        for i in range(number_of_lasers):
            # save the index of the absolute maximum of the derived time trace as rising edge position
            rising_ind[i] = np.argmax(conv_deriv)

            # refine the rising edge detection by using the small, fixed filter width to locate
            # the inflection point more precisely
            start_ind = int(rising_ind[i] - conv_std_dev)
            if start_ind < 0:
                start_ind = 0

            stop_ind = int(rising_ind[i] + conv_std_dev)
            if stop_ind > len(conv_deriv):
                stop_ind = len(conv_deriv)

            if start_ind == stop_ind:
                stop_ind = start_ind + 1

            rising_ind[i] = start_ind + np.argmax(conv_deriv_ref[start_ind:stop_ind])

            # set this position and its surroundings to 0 to avoid a second detection
            if rising_ind[i] < 2 * conv_std_dev:
                del_ind_start = 0
            else:
                del_ind_start = rising_ind[i] - int(2 * conv_std_dev)
            if (conv_deriv.size - rising_ind[i]) < 2 * conv_std_dev:
                del_ind_stop = conv_deriv.size - 1
            else:
                del_ind_stop = rising_ind[i] + int(2 * conv_std_dev)
            conv_deriv[del_ind_start:del_ind_stop] = 0

            # save the index of the absolute minimum of the derived time trace as falling edge position
            falling_ind[i] = np.argmin(conv_deriv)

            # refine the falling edge detection by using the small, fixed filter width to locate
            # the inflection point more precisely
            start_ind = int(falling_ind[i] - conv_std_dev)
            if start_ind < 0:
                start_ind = 0

            stop_ind = int(falling_ind[i] + conv_std_dev)
            if stop_ind > len(conv_deriv):
                stop_ind = len(conv_deriv)

            if start_ind == stop_ind:
                stop_ind = start_ind + 1

            falling_ind[i] = start_ind + np.argmin(conv_deriv_ref[start_ind:stop_ind])

            # set this position and its surroundings to 0 to avoid a second detection
            if falling_ind[i] < 2 * conv_std_dev:
                del_ind_start = 0
            else:
                del_ind_start = falling_ind[i] - int(2 * conv_std_dev)
            if (conv_deriv.size - falling_ind[i]) < 2 * conv_std_dev:
                del_ind_stop = conv_deriv.size - 1
            else:
                del_ind_stop = falling_ind[i] + int(2 * conv_std_dev)
            conv_deriv[del_ind_start:del_ind_stop] = 0

        # sort all indices of rising and falling flanks
        rising_ind.sort()
        falling_ind.sort()

        # find the maximum laser length to use as size for the laser array
        laser_length = np.max(falling_ind - rising_ind)

        # initialize the empty output array
        laser_arr = np.zeros((number_of_lasers, laser_length), dtype='int64')
        # slice the detected laser pulses out of the timetrace and save them in the output array
        # according to the found rising edges
        for i in range(number_of_lasers):
            if rising_ind[i] + laser_length > count_data.size:
                lenarr = count_data[rising_ind[i]:].size
                laser_arr[i, 0:lenarr] = count_data[rising_ind[i]:]
            else:
                laser_arr[i] = count_data[rising_ind[i]:rising_ind[i] + laser_length]

        return_dict['laser_counts_arr'] = laser_arr.astype('int64')
        return_dict['laser_indices_rising'] = rising_ind
        return_dict['laser_indices_falling'] = falling_ind
        return return_dict

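The global search described in the docstring of ungated_conv_deriv() above — smooth, differentiate, repeatedly take the extremum and blank a window of 2 * conv_std_dev on either side of it — can be illustrated outside Qudi with a few lines of NumPy/SciPy. This is a standalone sketch on synthetic Poisson data (pulse positions, widths, and count rates are made up for the example, and the fine refinement pass is omitted); it is not part of the pull request.

    import numpy as np
    from scipy import ndimage

    rng = np.random.default_rng(1)
    trace = rng.poisson(2, size=3000)                      # background counts
    for start in (300, 1300, 2300):                        # three synthetic 400-bin laser pulses
        trace[start:start + 400] += rng.poisson(30, size=400)

    conv_std_dev = 20.0
    deriv = np.gradient(ndimage.gaussian_filter1d(trace.astype(float), conv_std_dev))
    rising = []
    for _ in range(3):                                     # one iteration per expected pulse
        ind = int(np.argmax(deriv))
        rising.append(ind)
        lo = max(ind - int(2 * conv_std_dev), 0)
        hi = min(ind + int(2 * conv_std_dev), deriv.size)
        deriv[lo:hi] = 0                                   # suppress this edge before the next search
    print(sorted(rising))                                  # approximately [300, 1300, 2300]

The same smoothing and gradient step is what gated_conv_deriv() applies to the summed gates; there the single argmax/argmin pair is enough because each gate contains exactly one pulse.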
    def ungated_threshold(self, count_data, count_threshold=10, min_laser_length=200e-9,
                          threshold_tolerance=20e-9):
        """
        Detects the laser pulses in the ungated timetrace data and extracts them.

        The number of laser pulses to look for is taken from self.measurement_settings
        ('number_of_lasers') and the counter bin width from self.fast_counter_settings
        ('bin_width').

        @param numpy.ndarray count_data: 1D array, the raw timetrace data from an ungated fast counter
        @param int count_threshold: counts per bin at or above which a bin is considered part of a
                                    laser pulse
        @param float min_laser_length: minimum length of a laser pulse in seconds
        @param float threshold_tolerance: maximum length in seconds of a sub-threshold gap that is
                                          still counted as part of the same laser pulse

        @return dict: The extracted laser pulses of the timetrace as well as the indices for rising
                      and falling flanks.

        Procedure:
            Threshold detection:
            --------------------

            All count data from the time trace is compared to a threshold value. Values above the
            threshold are considered to belong to a laser pulse. If the length of a pulse would be
            below the minimum length, the pulse is discarded. If the number of consecutive bins
            below the threshold is smaller than the number of bins corresponding to
            threshold_tolerance, they are still considered to belong to the same laser pulse.
        """
        return_dict = dict()

        number_of_lasers = self.measurement_settings.get('number_of_lasers')
        counter_bin_width = self.fast_counter_settings.get('bin_width')

        if not isinstance(number_of_lasers, int):
            return_dict['laser_indices_rising'] = np.zeros(1, dtype='int64')
            return_dict['laser_indices_falling'] = np.zeros(1, dtype='int64')
            return_dict['laser_counts_arr'] = np.zeros((1, 3000), dtype='int64')
            return return_dict
        else:
            return_dict['laser_indices_rising'] = np.zeros(number_of_lasers, dtype='int64')
            return_dict['laser_indices_falling'] = np.zeros(number_of_lasers, dtype='int64')
            return_dict['laser_counts_arr'] = np.zeros((number_of_lasers, 3000), dtype='int64')

        # Convert lengths in seconds into lengths in time bins
        threshold_tolerance = round(threshold_tolerance / counter_bin_width)
        min_laser_length = round(min_laser_length / counter_bin_width)

        # get all bin indices with counts >= threshold value
        bigger_indices = np.where(count_data >= count_threshold)[0]

        # group the indices into consecutive runs (bin chains not interrupted by more than
        # threshold_tolerance sub-threshold bins) and bridge the gaps within each run
        index_list = np.split(bigger_indices,
                              np.where(np.diff(bigger_indices) >= threshold_tolerance)[0] + 1)
        for i, index_group in enumerate(index_list):
            if index_group.size > 0:
                start, end = index_list[i][0], index_list[i][-1]
                index_list[i] = np.arange(start, end + 1)
        consecutive_indices_unfiltered = index_list

        # sort out all groups shorter than the minimum laser length
        consecutive_indices = [item for item in consecutive_indices_unfiltered if
                               len(item) > min_laser_length]

        # Check if the number of lasers matches the number of remaining index groups
        if number_of_lasers != len(consecutive_indices):
            return return_dict

        # determine the maximum laser pulse length and initialize the laser array
        max_laser_length = max([index_array.size for index_array in consecutive_indices])
        return_dict['laser_counts_arr'] = np.zeros((number_of_lasers, max_laser_length), dtype='int64')

        # fill the laser array with slices of the raw data array and populate the rising/falling
        # index arrays
        for i, index_group in enumerate(consecutive_indices):
            return_dict['laser_indices_rising'][i] = index_group[0]
            return_dict['laser_indices_falling'][i] = index_group[-1]
            return_dict['laser_counts_arr'][i, :index_group.size] = count_data[index_group]

        return return_dict
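The grouping logic in ungated_threshold() — take all bins at or above the threshold, split them into runs wherever the gap reaches the tolerance, bridge the remaining gaps, and drop runs shorter than the minimum pulse length — can be checked on a toy trace. This is a standalone sketch with made-up numbers (tolerances given directly in bins rather than seconds); it is not part of the pull request.

    import numpy as np

    count_data = np.array([0, 1, 12, 15, 14, 2, 13, 12, 0, 0, 0, 0, 11, 14, 13, 12, 1, 0])
    count_threshold, threshold_tolerance, min_laser_length = 10, 3, 3   # all expressed in bins here

    above = np.where(count_data >= count_threshold)[0]
    # split wherever the gap between above-threshold bins is at least the tolerance
    groups = np.split(above, np.where(np.diff(above) >= threshold_tolerance)[0] + 1)
    # bridge sub-threshold gaps inside each group, then drop groups that are too short
    groups = [np.arange(g[0], g[-1] + 1) for g in groups if g.size > 0]
    pulses = [g for g in groups if len(g) > min_laser_length]
    print([(int(g[0]), int(g[-1])) for g in pulses])                    # [(2, 7), (12, 15)]

Note how the single low bin at index 5 is bridged into the first pulse, while the four-bin gap before index 12 starts a new pulse.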