Passed
Push — master ( e96c3d...44474a )
by Ian
06:20
created

build.rsudp.c_alert   B

Complexity

Total Complexity 52

Size/Duplication

Total Lines 341
Duplicated Lines 17.89 %

Importance

Changes 0
Metric Value
wmc 52
eloc 184
dl 61
loc 341
rs 7.44
c 0
b 0
f 0

13 Methods

Rating   Name   Duplication   Size   Complexity  
B Alert._is_trigger() 0 36 6
A Alert._filter() 0 18 3
A Alert._subloop() 0 10 4
B Alert._set_filt() 0 25 8
A Alert._find_chn() 0 7 3
B Alert.run() 0 53 6
A Alert._print_stalta() 0 13 2
A Alert.__init__() 0 39 1
A Alert._deconvolve() 0 6 2
A Alert._print_filt() 0 11 4
B Alert._set_deconv() 23 23 6
A Alert._getq() 18 18 3
A Alert._set_channel() 20 20 4


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
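
In this class the report flags `_set_deconv()`, `_getq()` and `_set_channel()` as duplicated elsewhere in the project. One possible remedy, sketched here under assumptions, is to move such a method into a shared mixin that each consumer inherits. `ChannelMixin`, `AlertLike`, `PlotLike` and the `available` argument are hypothetical names used for illustration and are not part of rsudp; the method in the listing also logs an error and exits, where this sketch raises instead.

```python
class ChannelMixin:
    """Hypothetical mixin that keeps channel-selection logic in one place."""

    default_ch = 'HZ'

    def set_channel(self, cha, available):
        """Resolve a requested channel name against the available channels."""
        cha = self.default_ch if cha == 'all' else cha
        cha = cha if isinstance(cha, str) else cha[0]
        matches = [chn for chn in available if cha in chn]
        if not matches:
            # Raising (instead of exiting) keeps the sketch easy to test.
            raise ValueError('Could not find channel %s in list of channels!' % cha)
        # Take the first channel whose name contains the requested string.
        self.cha = matches[0]
        return self.cha


class AlertLike(ChannelMixin):
    """Stand-in for one consumer that needs channel selection."""


class PlotLike(ChannelMixin):
    """Stand-in for a second consumer sharing the same logic."""


channels = ['EHZ', 'ENE', 'ENN', 'ENZ']
alert_cha = AlertLike().set_channel('all', channels)
plot_cha = PlotLike().set_channel(['ENZ'], channels)
```

With the logic in one place, a fix to channel matching only has to be made once rather than in every consumer.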

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like build.rsudp.c_alert often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
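
As an illustration, `_set_filt()`, `_print_filt()` and `_filter()` share the `filt` stem, which suggests a filter component. The sketch below pulls the corner-frequency branching of `_set_filt()` and the message of `_print_filt()` into a small class. `FilterSettings`, `from_bandpass` and `describe` are hypothetical names, and the sketch uses `None` where the listing uses `False` for "no filter".

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FilterSettings:
    """Hypothetical extracted class holding the state set by _set_filt()."""
    kind: Optional[str] = None   # None, 'highpass', 'lowpass' or 'bandpass'
    freqmin: float = 0.0
    freqmax: float = 0.0
    freq: float = 0.0

    @classmethod
    def from_bandpass(cls, bp, sps):
        """Classify [highpass, lowpass] corners against the Nyquist frequency."""
        if not bp:
            return cls()
        low, high = bp[0], bp[1]
        nyquist = sps / 2
        settings = cls(freqmin=low, freqmax=high)
        if low <= 0 and high >= nyquist:
            settings.kind = None                       # both corners out of range
        elif low > 0 and high >= nyquist:
            settings.kind, settings.freq = 'highpass', low
        elif low <= 0:
            settings.kind, settings.freq = 'lowpass', high
        else:
            settings.kind = 'bandpass'
        return settings

    def describe(self):
        """Return the filter message, or None when the stream is unfiltered."""
        if self.kind == 'bandpass':
            return ('Alert stream will be %s filtered from %s to %s Hz'
                    % (self.kind, self.freqmin, self.freqmax))
        if self.kind in ('lowpass', 'highpass'):
            modifier = 'below' if self.kind == 'lowpass' else 'above'
            return ('Alert stream will be %s filtered %s %s Hz'
                    % (self.kind, modifier, self.freq))
        return None


band = FilterSettings.from_bandpass([0.1, 9.0], sps=100)
high = FilterSettings.from_bandpass([0.5, 50], sps=100)
```

The consumer class would then hold a single `FilterSettings` attribute instead of four loosely related fields, which can reduce both its size and its method count.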

import sys, os
from datetime import datetime, timedelta
import rsudp.raspberryshake as rs
from obspy.signal.trigger import recursive_sta_lta, trigger_onset
from rsudp import printM, printW, printE
COLOR = {}
from rsudp import COLOR, helpers
import numpy as np

# set the terminal text color to green
COLOR['current'] = COLOR['green']


class Alert(rs.ConsumerThread):
	"""
	A data consumer class that listens to a specific incoming data channel
	and calculates a recursive STA/LTA (short term average over long term
	average). If a threshold of STA/LTA ratio is exceeded, the class
	sets the :py:data:`alarm` flag to the alarm time as a
	:py:class:`obspy.core.utcdatetime.UTCDateTime` object.
	The :py:class:`rsudp.p_producer.Producer` will see this flag
	and send an :code:`ALARM` message to the queues with the time set here.
	Likewise, when the :py:data:`alarm_reset` flag is set with a
	:py:class:`obspy.core.utcdatetime.UTCDateTime`,
	the Producer will send a :code:`RESET` message to the queues.

	:param float sta: short term average (STA) duration in seconds.
	:param float lta: long term average (LTA) duration in seconds.
	:param float thresh: threshold for STA/LTA trigger.
	:type bp: :py:class:`bool` or :py:class:`list`
	:param bp: bandpass filter parameters. If set, should be in the format ``[highpass, lowpass]``
	:param bool debug: whether or not to display max STA/LTA calculation live to the console.
	:param str cha: listening channel (defaults to [S,E]HZ)
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`

	"""

	def _set_filt(self, bp):
		'''
		This function sets the filter parameters (if specified).
		Set to a boolean if not filtering, or ``[highpass, lowpass]``
		if filtering.

		:param bp: bandpass filter parameters. If set, should be in the format ``[highpass, lowpass]``
		:type bp: :py:class:`bool` or :py:class:`list`
		'''
		self.filt = False
		if bp:
			self.freqmin = bp[0]
			self.freqmax = bp[1]
			self.freq = 0
			if (bp[0] <= 0) and (bp[1] >= (self.sps/2)):
				self.filt = False
			elif (bp[0] > 0) and (bp[1] >= (self.sps/2)):
				self.filt = 'highpass'
				self.freq = bp[0]
				desc = 'low corner %s' % (bp[0])
			elif (bp[0] <= 0) and (bp[1] <= (self.sps/2)):
				self.filt = 'lowpass'
				self.freq = bp[1]
			else:
				self.filt = 'bandpass'


	# Duplication: This code seems to be duplicated in your project.
	def _set_deconv(self, deconv):
		'''
		This function sets the deconvolution units. Allowed values are as follows:

		.. |ms2| replace:: m/s\\ :sup:`2`\

		- ``'VEL'`` - velocity (m/s)
		- ``'ACC'`` - acceleration (|ms2|)
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
		- ``'DISP'`` - displacement (m)
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels

		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
		'''
		deconv = deconv.upper() if deconv else False
		self.deconv = deconv if (deconv in rs.UNITS) else False
		if self.deconv and rs.inv:
			self.units = '%s (%s)' % (rs.UNITS[self.deconv][0], rs.UNITS[self.deconv][1]) if (self.deconv in rs.UNITS) else self.units
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
		else:
			self.units = rs.UNITS['CHAN'][1]
			self.deconv = False
		printM('Alert stream units are %s' % (self.units.strip(' ').lower()), self.sender)


	def _find_chn(self):
		'''
		Finds channel match in list of channels.
		'''
		for chn in rs.chns:
			if self.cha in chn:
				self.cha = chn


	# Duplication: This code seems to be duplicated in your project.
	def _set_channel(self, cha):
		'''
		This function sets the channel to listen to. Allowed values are as follows:

		- ``"SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
		- ``"HDF"`` - pressure transducer channel
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available

		:param cha: the channel to listen to
		:type cha: str
		'''
		cha = self.default_ch if (cha == 'all') else cha
		self.cha = cha if isinstance(cha, str) else cha[0]

		if self.cha in str(rs.chns):
			self._find_chn()
		else:
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
			sys.exit(2)


	def _print_filt(self):
		'''
		Prints stream filtering information.
		'''
		if self.filt == 'bandpass':
			printM('Alert stream will be %s filtered from %s to %s Hz'
					% (self.filt, self.freqmin, self.freqmax), self.sender)
		elif self.filt in ('lowpass', 'highpass'):
			# compare with ==; `in 'lowpass'` would be a substring test
			modifier = 'below' if self.filt == 'lowpass' else 'above'
			printM('Alert stream will be %s filtered %s %s Hz'
					% (self.filt, modifier, self.freq), self.sender)


	def __init__(self, q, sta=5, lta=30, thresh=1.6, reset=1.55, bp=False,
				 debug=True, cha='HZ', sound=False, deconv=False,
				 *args, **kwargs):
		"""
		Initializing the alert thread with parameters to set up the recursive
		STA-LTA trigger, filtering, and the channel used for listening.
		"""
		super().__init__()
		self.sender = 'Alert'
		self.alive = True

		self.queue = q

		self.default_ch = 'HZ'
		self.sta = sta
		self.lta = lta
		self.thresh = thresh
		self.reset = reset
		self.debug = debug
		self.args = args
		self.kwargs = kwargs
		self.raw = rs.Stream()
		self.stream = rs.Stream()

		self._set_channel(cha)

		self.sps = rs.sps
		self.inv = rs.inv
		self.stalta = np.ndarray(1)
		self.maxstalta = 0
		self.units = 'counts'

		self._set_deconv(deconv)

		self.exceed = False
		self.sound = sound

		self._set_filt(bp)
		self._print_filt()


	# Duplication: This code seems to be duplicated in your project.
	def _getq(self):
		'''
		Reads data from the queue and updates the stream.

		:rtype: bool
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
		'''
		d = self.queue.get(True, timeout=None)
		self.queue.task_done()
		if self.cha in str(d):
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
			return True
		elif 'TERM' in str(d):
			self.alive = False
			printM('Exiting.', self.sender)
			sys.exit()
		else:
			return False


	def _deconvolve(self):
		'''
		Deconvolves the stream associated with this class.
		'''
		if self.deconv:
			helpers.deconvolve(self)


	def _subloop(self):
		'''
		Gets the queue and figures out whether or not the specified channel is in the packet.
		'''
		while True:
			if self.queue.qsize() > 0:
				self._getq()			# get recent packets
			else:
				if self._getq():		# is this the specified channel? if so break
					break


	def _filter(self):
		'''
		Filters the stream associated with this class.
		'''
		if self.filt:
			# compare with ==; `in 'bandpass'` would be a substring test
			if self.filt == 'bandpass':
				self.stalta = recursive_sta_lta(
							self.stream[0].copy().filter(type=self.filt,
							freqmin=self.freqmin, freqmax=self.freqmax),
							int(self.sta * self.sps), int(self.lta * self.sps))
			else:
				self.stalta = recursive_sta_lta(
							self.stream[0].copy().filter(type=self.filt,
							freq=self.freq),
							int(self.sta * self.sps), int(self.lta * self.sps))
		else:
			self.stalta = recursive_sta_lta(self.stream[0],
					int(self.sta * self.sps), int(self.lta * self.sps))


	def _is_trigger(self):
		'''
		Figures out if there's a trigger active.
		'''
		if self.stalta.max() > self.thresh:
			if not self.exceed:
				# raise a flag that the Producer can read and modify
				self.alarm = helpers.fsec(self.stream[0].stats.starttime + timedelta(seconds=
										trigger_onset(self.stalta, self.thresh,
										self.reset)[-1][0] * self.stream[0].stats.delta))
				self.exceed = True	# the state machine; this one should not be touched from the outside, otherwise bad things will happen
				print()
				printM('Trigger threshold of %s exceeded at %s'
						% (self.thresh, self.alarm.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]), self.sender)
				printM('Trigger will reset when STA/LTA goes below %s...' % self.reset, sender=self.sender)
				COLOR['current'] = COLOR['purple']
			else:
				pass

			if self.stalta.max() > self.maxstalta:
				self.maxstalta = self.stalta.max()
		else:
			if self.exceed:
				if self.stalta[-1] < self.reset:
					self.alarm_reset = helpers.fsec(self.stream[0].stats.endtime)	# lazy; effective
					self.exceed = False
					print()
					printM('Max STA/LTA ratio reached in alarm state: %s' % (round(self.maxstalta, 3)),
							self.sender)
					printM('Earthquake trigger reset and active again at %s' % (
							self.alarm_reset.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]),
							self.sender)
					self.maxstalta = 0
					COLOR['current'] = COLOR['green']
			else:
				pass


	def _print_stalta(self):
		'''
		Print the current max STA/LTA of the stream.
		'''
		if self.debug:
			msg = '\r%s [%s] Threshold: %s; Current max STA/LTA: %.4f' % (
					(self.stream[0].stats.starttime + timedelta(seconds=
					 len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
					self.sender,
					self.thresh,
					round(np.max(self.stalta[-50:]), 4)
					)
			print(COLOR['current'] + COLOR['bold'] + msg + COLOR['white'], end='', flush=True)


	def run(self):
		"""
		Reads data from the queue into a :class:`obspy.core.stream.Stream` object,
		then runs a :func:`obspy.signal.trigger.recursive_sta_lta` function to
		determine whether to raise an alert flag (:py:data:`rsudp.c_alert.Alert.alarm`).
		The producer reads this flag and uses it to notify other consumers.
		"""
		n = 0

		wait_pkts = (self.lta) / (rs.tf / 1000)

		while n > 3:	# NOTE: n starts at 0, so this loop body never runs
			self._getq()	# was self.getq(), which is not defined on this class
			n += 1

		n = 0
		while True:
			self._subloop()

			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
			self.stream = self.raw.copy()
			self._deconvolve()

			if n > wait_pkts:
				# the warmup period is over, so evaluate the trigger
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.lta)	# obspy time
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)

				# filter
				self._filter()
				# figure out if the trigger has gone off
				self._is_trigger()

				# copy the stream (necessary to avoid memory leak)
				self.stream = rs.copy(self.stream)

				# print the current STA/LTA calculation
				self._print_stalta()

			elif n == 0:
				printM('Starting Alert trigger with sta=%ss, lta=%ss, and threshold=%s on channel=%s'
					   % (self.sta, self.lta, self.thresh, self.cha), self.sender)
				printM('Earthquake trigger warmup time of %s seconds...'
					   % (self.lta), self.sender)
			elif n == wait_pkts:
				printM('Earthquake trigger up and running normally.',
					   self.sender)
			else:
				pass

			n += 1
			sys.stdout.flush()
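
The trigger logic in `_filter()` and `_is_trigger()` combines two ideas: a recursive STA/LTA ratio, and a two-threshold state machine in which the alarm is raised above `thresh` and only cleared once the ratio drops below the lower `reset` value. The standalone sketch below shows both on a plain list of samples. It is a simplification under stated assumptions: `recursive_ratio` is a hand-written recursive average and not ObsPy's `recursive_sta_lta`, `hysteresis_events` is a hypothetical helper that checks one sample at a time instead of the windowed `max()` test in the listing, and the thresholds and sample counts are arbitrary demo values.

```python
def recursive_ratio(samples, nsta, nlta):
    """Simplified recursive STA/LTA over squared samples (illustrative only)."""
    csta, clta = 1.0 / nsta, 1.0 / nlta
    sta, lta = 0.0, 1e-99       # tiny LTA start value avoids division by zero
    ratios = [0.0] * len(samples)
    for i in range(1, len(samples)):
        energy = samples[i] ** 2
        sta = csta * energy + (1 - csta) * sta
        lta = clta * energy + (1 - clta) * lta
        # Suppress output until the long window has seen nlta samples.
        ratios[i] = sta / lta if i >= nlta else 0.0
    return ratios


def hysteresis_events(ratios, thresh, reset):
    """Return (index, 'ALARM' | 'RESET') events for a sequence of ratios."""
    events = []
    exceed = False              # plays the role of Alert.exceed in the listing
    for i, ratio in enumerate(ratios):
        if not exceed and ratio > thresh:
            exceed = True
            events.append((i, 'ALARM'))
        elif exceed and ratio < reset:
            exceed = False
            events.append((i, 'RESET'))
    return events


# 200 quiet samples, a 20-sample burst, then quiet again (demo values)
samples = [1.0] * 200 + [10.0] * 20 + [1.0] * 300
ratios = recursive_ratio(samples, nsta=5, nlta=50)
events = hysteresis_events(ratios, thresh=3.0, reset=1.5)
```

Keeping `reset` below `thresh` prevents the alarm from toggling repeatedly while the ratio hovers around a single threshold, which is the reason the listing tracks the `exceed` state rather than comparing against `thresh` alone.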