| Total Complexity | 52 | 
| Total Lines | 341 | 
| Duplicated Lines | 17.89 % | 
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like build.rsudp.c_alert often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | import sys, os  | 
            ||
| 2 | from datetime import datetime, timedelta  | 
            ||
| 3 | import rsudp.raspberryshake as rs  | 
            ||
| 4 | from obspy.signal.trigger import recursive_sta_lta, trigger_onset  | 
            ||
| 5 | from rsudp import printM, printW, printE  | 
            ||
| 6 | COLOR = {} | 
            ||
| 7 | from rsudp import COLOR, helpers  | 
            ||
| 8 | import numpy as np  | 
            ||
| 9 | |||
| 10 | # set the terminal text color to green  | 
            ||
| 11 | COLOR['current'] = COLOR['green']  | 
            ||
| 12 | |||
| 13 | |||
| 14 | class Alert(rs.ConsumerThread):  | 
            ||
| 15 | """  | 
            ||
| 16 | A data consumer class that listens to a specific incoming data channel  | 
            ||
| 17 | and calculates a recursive STA/LTA (short term average over long term  | 
            ||
| 18 | average). If a threshold of STA/LTA ratio is exceeded, the class  | 
            ||
| 19 | sets the :py:data:`alarm` flag to the alarm time as a  | 
            ||
| 20 | :py:class:`obspy.core.utcdatetime.UTCDateTime` object.  | 
            ||
| 21 | The :py:class:`rsudp.p_producer.Producer` will see this flag  | 
            ||
| 22 | and send an :code:`ALARM` message to the queues with the time set here.  | 
            ||
| 23 | Likewise, when the :py:data:`alarm_reset` flag is set with a  | 
            ||
| 24 | :py:class:`obspy.core.utcdatetime.UTCDateTime`,  | 
            ||
| 25 | the Producer will send a :code:`RESET` message to the queues.  | 
            ||
| 26 | |||
| 27 | :param float sta: short term average (STA) duration in seconds.  | 
            ||
| 28 | :param float lta: long term average (LTA) duration in seconds.  | 
            ||
| 29 | :param float thresh: threshold for STA/LTA trigger.  | 
            ||
| 30 | :type bp: :py:class:`bool` or :py:class:`list`  | 
            ||
| 31 | :param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``  | 
            ||
| 32 | :param bool debug: whether or not to display max STA/LTA calculation live to the console.  | 
            ||
| 33 | :param str cha: listening channel (defaults to [S,E]HZ)  | 
            ||
| 34 | :param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`  | 
            ||
| 35 | |||
| 36 | """  | 
            ||
| 37 | |||
| 38 | def _set_filt(self, bp):  | 
            ||
| 39 | '''  | 
            ||
| 40 | This function sets the filter parameters (if specified).  | 
            ||
| 41 | Set to a boolean if not filtering, or ``[highpass, lowpass]``  | 
            ||
| 42 | if filtering.  | 
            ||
| 43 | |||
| 44 | :param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``  | 
            ||
| 45 | :type bp: :py:class:`bool` or :py:class:`list`  | 
            ||
| 46 | '''  | 
            ||
| 47 | self.filt = False  | 
            ||
| 48 | if bp:  | 
            ||
| 49 | self.freqmin = bp[0]  | 
            ||
| 50 | self.freqmax = bp[1]  | 
            ||
| 51 | self.freq = 0  | 
            ||
| 52 | if (bp[0] <= 0) and (bp[1] >= (self.sps/2)):  | 
            ||
| 53 | self.filt = False  | 
            ||
| 54 | elif (bp[0] > 0) and (bp[1] >= (self.sps/2)):  | 
            ||
| 55 | self.filt = 'highpass'  | 
            ||
| 56 | self.freq = bp[0]  | 
            ||
| 57 | desc = 'low corner %s' % (bp[0])  | 
            ||
| 58 | elif (bp[0] <= 0) and (bp[1] <= (self.sps/2)):  | 
            ||
| 59 | self.filt = 'lowpass'  | 
            ||
| 60 | self.freq = bp[1]  | 
            ||
| 61 | else:  | 
            ||
| 62 | self.filt = 'bandpass'  | 
            ||
| 63 | |||
| 64 | |||
| 65 | View Code Duplication | def _set_deconv(self, deconv):  | 
            |
| 
                                                                                                    
                        
                         | 
                |||
| 66 | '''  | 
            ||
| 67 | This function sets the deconvolution units. Allowed values are as follows:  | 
            ||
| 68 | |||
| 69 | .. |ms2| replace:: m/s\ :sup:`2`\  | 
            ||
| 70 | |||
| 71 | - ``'VEL'`` - velocity (m/s)  | 
            ||
| 72 | - ``'ACC'`` - acceleration (|ms2|)  | 
            ||
| 73 | - ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)  | 
            ||
| 74 | - ``'DISP'`` - displacement (m)  | 
            ||
| 75 | - ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels  | 
            ||
| 76 | |||
| 77 | :param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``  | 
            ||
| 78 | '''  | 
            ||
| 79 | deconv = deconv.upper() if deconv else False  | 
            ||
| 80 | self.deconv = deconv if (deconv in rs.UNITS) else False  | 
            ||
| 81 | if self.deconv and rs.inv:  | 
            ||
| 82 | self.units = '%s (%s)' % (rs.UNITS[self.deconv][0], rs.UNITS[self.deconv][1]) if (self.deconv in rs.UNITS) else self.units  | 
            ||
| 83 | 			printM('Signal deconvolution set to %s' % (self.deconv), self.sender) | 
            ||
| 84 | else:  | 
            ||
| 85 | self.units = rs.UNITS['CHAN'][1]  | 
            ||
| 86 | self.deconv = False  | 
            ||
| 87 | 		printM('Alert stream units are %s' % (self.units.strip(' ').lower()), self.sender) | 
            ||
| 88 | |||
| 89 | |||
| 90 | def _find_chn(self):  | 
            ||
| 91 | '''  | 
            ||
| 92 | Finds channel match in list of channels.  | 
            ||
| 93 | '''  | 
            ||
| 94 | for chn in rs.chns:  | 
            ||
| 95 | if self.cha in chn:  | 
            ||
| 96 | self.cha = chn  | 
            ||
| 97 | |||
| 98 | |||
| 99 | View Code Duplication | def _set_channel(self, cha):  | 
            |
| 100 | '''  | 
            ||
| 101 | This function sets the channel to listen to. Allowed values are as follows:  | 
            ||
| 102 | |||
| 103 | - "SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels  | 
            ||
| 104 | - ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels  | 
            ||
| 105 | - ``"HDF"`` - pressure transducer channel  | 
            ||
| 106 | - ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available  | 
            ||
| 107 | |||
| 108 | :param cha: the channel to listen to  | 
            ||
| 109 | :type cha: str  | 
            ||
| 110 | '''  | 
            ||
| 111 | cha = self.default_ch if (cha == 'all') else cha  | 
            ||
| 112 | self.cha = cha if isinstance(cha, str) else cha[0]  | 
            ||
| 113 | |||
| 114 | if self.cha in str(rs.chns):  | 
            ||
| 115 | self._find_chn()  | 
            ||
| 116 | else:  | 
            ||
| 117 | 			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender) | 
            ||
| 118 | sys.exit(2)  | 
            ||
| 119 | |||
| 120 | |||
| 121 | def _print_filt(self):  | 
            ||
| 122 | '''  | 
            ||
| 123 | Prints stream filtering information.  | 
            ||
| 124 | '''  | 
            ||
| 125 | if self.filt == 'bandpass':  | 
            ||
| 126 | 			printM('Alert stream will be %s filtered from %s to %s Hz' | 
            ||
| 127 | % (self.filt, self.freqmin, self.freqmax), self.sender)  | 
            ||
| 128 | 		elif self.filt in ('lowpass', 'highpass'): | 
            ||
| 129 | modifier = 'below' if self.filt in 'lowpass' else 'above'  | 
            ||
| 130 | 			printM('Alert stream will be %s filtered %s %s Hz' | 
            ||
| 131 | % (self.filt, modifier, self.freq), self.sender)  | 
            ||
| 132 | |||
| 133 | |||
| 134 | def __init__(self, q, sta=5, lta=30, thresh=1.6, reset=1.55, bp=False,  | 
            ||
| 135 | debug=True, cha='HZ', sound=False, deconv=False,  | 
            ||
| 136 | *args, **kwargs):  | 
            ||
| 137 | """  | 
            ||
| 138 | Initializing the alert thread with parameters to set up the recursive  | 
            ||
| 139 | STA-LTA trigger, filtering, and the channel used for listening.  | 
            ||
| 140 | """  | 
            ||
| 141 | super().__init__()  | 
            ||
| 142 | self.sender = 'Alert'  | 
            ||
| 143 | self.alive = True  | 
            ||
| 144 | |||
| 145 | self.queue = q  | 
            ||
| 146 | |||
| 147 | self.default_ch = 'HZ'  | 
            ||
| 148 | self.sta = sta  | 
            ||
| 149 | self.lta = lta  | 
            ||
| 150 | self.thresh = thresh  | 
            ||
| 151 | self.reset = reset  | 
            ||
| 152 | self.debug = debug  | 
            ||
| 153 | self.args = args  | 
            ||
| 154 | self.kwargs = kwargs  | 
            ||
| 155 | self.raw = rs.Stream()  | 
            ||
| 156 | self.stream = rs.Stream()  | 
            ||
| 157 | |||
| 158 | self._set_channel(cha)  | 
            ||
| 159 | |||
| 160 | self.sps = rs.sps  | 
            ||
| 161 | self.inv = rs.inv  | 
            ||
| 162 | self.stalta = np.ndarray(1)  | 
            ||
| 163 | self.maxstalta = 0  | 
            ||
| 164 | self.units = 'counts'  | 
            ||
| 165 | |||
| 166 | self._set_deconv(deconv)  | 
            ||
| 167 | |||
| 168 | self.exceed = False  | 
            ||
| 169 | self.sound = sound  | 
            ||
| 170 | |||
| 171 | self._set_filt(bp)  | 
            ||
| 172 | self._print_filt()  | 
            ||
| 173 | |||
| 174 | |||
| 175 | View Code Duplication | def _getq(self):  | 
            |
| 176 | '''  | 
            ||
| 177 | Reads data from the queue and updates the stream.  | 
            ||
| 178 | |||
| 179 | :rtype: bool  | 
            ||
| 180 | :return: Returns ``True`` if stream is updated, otherwise ``False``.  | 
            ||
| 181 | '''  | 
            ||
| 182 | d = self.queue.get(True, timeout=None)  | 
            ||
| 183 | self.queue.task_done()  | 
            ||
| 184 | if self.cha in str(d):  | 
            ||
| 185 | self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')  | 
            ||
| 186 | return True  | 
            ||
| 187 | elif 'TERM' in str(d):  | 
            ||
| 188 | self.alive = False  | 
            ||
| 189 | 			printM('Exiting.', self.sender) | 
            ||
| 190 | sys.exit()  | 
            ||
| 191 | else:  | 
            ||
| 192 | return False  | 
            ||
| 193 | |||
| 194 | |||
| 195 | def _deconvolve(self):  | 
            ||
| 196 | '''  | 
            ||
| 197 | Deconvolves the stream associated with this class.  | 
            ||
| 198 | '''  | 
            ||
| 199 | if self.deconv:  | 
            ||
| 200 | helpers.deconvolve(self)  | 
            ||
| 201 | |||
| 202 | |||
| 203 | def _subloop(self):  | 
            ||
| 204 | '''  | 
            ||
| 205 | Gets the queue and figures out whether or not the specified channel is in the packet.  | 
            ||
| 206 | '''  | 
            ||
| 207 | while True:  | 
            ||
| 208 | if self.queue.qsize() > 0:  | 
            ||
| 209 | self._getq() # get recent packets  | 
            ||
| 210 | else:  | 
            ||
| 211 | if self._getq(): # is this the specified channel? if so break  | 
            ||
| 212 | break  | 
            ||
| 213 | |||
| 214 | |||
| 215 | def _filter(self):  | 
            ||
| 216 | '''  | 
            ||
| 217 | Filters the stream associated with this class.  | 
            ||
| 218 | '''  | 
            ||
| 219 | if self.filt:  | 
            ||
| 220 | if self.filt in 'bandpass':  | 
            ||
| 221 | self.stalta = recursive_sta_lta(  | 
            ||
| 222 | self.stream[0].copy().filter(type=self.filt,  | 
            ||
| 223 | freqmin=self.freqmin, freqmax=self.freqmax),  | 
            ||
| 224 | int(self.sta * self.sps), int(self.lta * self.sps))  | 
            ||
| 225 | else:  | 
            ||
| 226 | self.stalta = recursive_sta_lta(  | 
            ||
| 227 | self.stream[0].copy().filter(type=self.filt,  | 
            ||
| 228 | freq=self.freq),  | 
            ||
| 229 | int(self.sta * self.sps), int(self.lta * self.sps))  | 
            ||
| 230 | else:  | 
            ||
| 231 | self.stalta = recursive_sta_lta(self.stream[0],  | 
            ||
| 232 | int(self.sta * self.sps), int(self.lta * self.sps))  | 
            ||
| 233 | |||
| 234 | |||
| 235 | def _is_trigger(self):  | 
            ||
| 236 | '''  | 
            ||
| 237 | Figures out it there's a trigger active.  | 
            ||
| 238 | '''  | 
            ||
| 239 | if self.stalta.max() > self.thresh:  | 
            ||
| 240 | if not self.exceed:  | 
            ||
| 241 | # raise a flag that the Producer can read and modify  | 
            ||
| 242 | self.alarm = helpers.fsec(self.stream[0].stats.starttime + timedelta(seconds=  | 
            ||
| 243 | trigger_onset(self.stalta, self.thresh,  | 
            ||
| 244 | self.reset)[-1][0] * self.stream[0].stats.delta))  | 
            ||
| 245 | self.exceed = True # the state machine; this one should not be touched from the outside, otherwise bad things will happen  | 
            ||
| 246 | print()  | 
            ||
| 247 | 				printM('Trigger threshold of %s exceeded at %s' | 
            ||
| 248 | 						% (self.thresh, self.alarm.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]), self.sender) | 
            ||
| 249 | 				printM('Trigger will reset when STA/LTA goes below %s...' % self.reset, sender=self.sender) | 
            ||
| 250 | COLOR['current'] = COLOR['purple']  | 
            ||
| 251 | else:  | 
            ||
| 252 | pass  | 
            ||
| 253 | |||
| 254 | if self.stalta.max() > self.maxstalta:  | 
            ||
| 255 | self.maxstalta = self.stalta.max()  | 
            ||
| 256 | else:  | 
            ||
| 257 | if self.exceed:  | 
            ||
| 258 | if self.stalta[-1] < self.reset:  | 
            ||
| 259 | self.alarm_reset = helpers.fsec(self.stream[0].stats.endtime) # lazy; effective  | 
            ||
| 260 | self.exceed = False  | 
            ||
| 261 | print()  | 
            ||
| 262 | 					printM('Max STA/LTA ratio reached in alarm state: %s' % (round(self.maxstalta, 3)), | 
            ||
| 263 | self.sender)  | 
            ||
| 264 | 					printM('Earthquake trigger reset and active again at %s' % ( | 
            ||
| 265 | 							self.alarm_reset.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]), | 
            ||
| 266 | self.sender)  | 
            ||
| 267 | self.maxstalta = 0  | 
            ||
| 268 | COLOR['current'] = COLOR['green']  | 
            ||
| 269 | else:  | 
            ||
| 270 | pass  | 
            ||
| 271 | |||
| 272 | |||
| 273 | def _print_stalta(self):  | 
            ||
| 274 | '''  | 
            ||
| 275 | Print the current max STA/LTA of the stream.  | 
            ||
| 276 | '''  | 
            ||
| 277 | if self.debug:  | 
            ||
| 278 | msg = '\r%s [%s] Threshold: %s; Current max STA/LTA: %.4f' % (  | 
            ||
| 279 | (self.stream[0].stats.starttime + timedelta(seconds=  | 
            ||
| 280 | 					 len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'), | 
            ||
| 281 | self.sender,  | 
            ||
| 282 | self.thresh,  | 
            ||
| 283 | round(np.max(self.stalta[-50:]), 4)  | 
            ||
| 284 | )  | 
            ||
| 285 | print(COLOR['current'] + COLOR['bold'] + msg + COLOR['white'], end='', flush=True)  | 
            ||
| 286 | |||
| 287 | |||
| 288 | def run(self):  | 
            ||
| 289 | """  | 
            ||
| 290 | Reads data from the queue into a :class:`obspy.core.stream.Stream` object,  | 
            ||
| 291 | then runs a :func:`obspy.signal.trigger.recursive_sta_lta` function to  | 
            ||
| 292 | determine whether to raise an alert flag (:py:data:`rsudp.c_alert.Alert.alarm`).  | 
            ||
| 293 | The producer reads this flag and uses it to notify other consumers.  | 
            ||
| 294 | """  | 
            ||
| 295 | n = 0  | 
            ||
| 296 | |||
| 297 | wait_pkts = (self.lta) / (rs.tf / 1000)  | 
            ||
| 298 | |||
| 299 | while n > 3:  | 
            ||
| 300 | self.getq()  | 
            ||
| 301 | n += 1  | 
            ||
| 302 | |||
| 303 | n = 0  | 
            ||
| 304 | while True:  | 
            ||
| 305 | self._subloop()  | 
            ||
| 306 | |||
| 307 | self.raw = rs.copy(self.raw) # necessary to avoid memory leak  | 
            ||
| 308 | self.stream = self.raw.copy()  | 
            ||
| 309 | self._deconvolve()  | 
            ||
| 310 | |||
| 311 | if n > wait_pkts:  | 
            ||
| 312 | # if the trigger is activated  | 
            ||
| 313 | obstart = self.stream[0].stats.endtime - timedelta(seconds=self.lta) # obspy time  | 
            ||
| 314 | self.raw = self.raw.slice(starttime=obstart) # slice the stream to the specified length (seconds variable)  | 
            ||
| 315 | self.stream = self.stream.slice(starttime=obstart) # slice the stream to the specified length (seconds variable)  | 
            ||
| 316 | |||
| 317 | # filter  | 
            ||
| 318 | self._filter()  | 
            ||
| 319 | # figure out if the trigger has gone off  | 
            ||
| 320 | self._is_trigger()  | 
            ||
| 321 | |||
| 322 | # copy the stream (necessary to avoid memory leak)  | 
            ||
| 323 | self.stream = rs.copy(self.stream)  | 
            ||
| 324 | |||
| 325 | # print the current STA/LTA calculation  | 
            ||
| 326 | self._print_stalta()  | 
            ||
| 327 | |||
| 328 | elif n == 0:  | 
            ||
| 329 | 				printM('Starting Alert trigger with sta=%ss, lta=%ss, and threshold=%s on channel=%s' | 
            ||
| 330 | % (self.sta, self.lta, self.thresh, self.cha), self.sender)  | 
            ||
| 331 | 				printM('Earthquake trigger warmup time of %s seconds...' | 
            ||
| 332 | % (self.lta), self.sender)  | 
            ||
| 333 | elif n == wait_pkts:  | 
            ||
| 334 | 				printM('Earthquake trigger up and running normally.', | 
            ||
| 335 | self.sender)  | 
            ||
| 336 | else:  | 
            ||
| 337 | pass  | 
            ||
| 338 | |||
| 339 | n += 1  | 
            ||
| 340 | sys.stdout.flush()  | 
            ||
| 341 |