Passed
Pull Request #55 (master), created 06:09 by unknown

build.rsudp.c_alert   C

Complexity

Total Complexity 54

Size/Duplication

Total Lines 348
Duplicated Lines 17.53 %

Test Coverage

Coverage 88.68%

Importance

Changes 0
Metric Value
wmc 54
eloc 190
dl 61
loc 348
ccs 141
cts 159
cp 0.8868
rs 6.4799
c 0
b 0
f 0

13 Methods

Rating  Name                   Duplication  Size  Complexity
B       Alert._set_deconv()    23           23    6
B       Alert._set_filt()      0            25    8
A       Alert._find_chn()      0            7     3
A       Alert._set_channel()   20           20    4
A       Alert._print_filt()    0            11    4
B       Alert._is_trigger()    0            41    8
A       Alert._filter()        0            18    3
A       Alert._subloop()       0            10    4
B       Alert.run()            0            53    6
A       Alert._print_stalta()  0            13    2
A       Alert.__init__()       0            41    1
A       Alert._getq()          18           18    3
A       Alert._deconvolve()    0            6     2

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
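As one illustration, the report flags `Alert._getq()` as duplicated elsewhere in the project. A minimal sketch of one possible fix follows: the queue-reading logic moves into a shared mixin and each consumer supplies only the part that differs. `QueueReaderMixin`, `_on_data()` and `DemoConsumer` are hypothetical names, not part of rsudp, and the sketch returns `False` on `TERM` instead of calling `sys.exit()` so that it can run standalone.

```python
import queue


class QueueReaderMixin:
    """Hypothetical mixin that holds the queue-reading logic in one place."""

    def _getq(self):
        # Block until a packet arrives, then mark it as handled.
        d = self.queue.get(True, timeout=None)
        self.queue.task_done()
        if self.cha in str(d):
            self._on_data(d)        # hook: each consumer stores data its own way
            return True
        if 'TERM' in str(d):
            self.alive = False      # simplification: the original also exits here
        return False


class DemoConsumer(QueueReaderMixin):
    """Stand-in for a consumer such as Alert (illustrative only)."""

    def __init__(self, q, cha):
        self.queue = q
        self.cha = cha
        self.alive = True
        self.packets = []

    def _on_data(self, d):
        self.packets.append(d)


q = queue.Queue()
q.put(b"{'EHZ', 1.0, 10, 12}")   # packet on the channel we listen to
q.put(b"{'ENN', 1.0, 3, 4}")     # packet on another channel
q.put(b'TERM')                   # termination message

consumer = DemoConsumer(q, 'EHZ')
results = [consumer._getq() for _ in range(3)]
print(results, consumer.alive, len(consumer.packets))
```

With this shape, a second consumer would inherit `_getq()` and implement only `_on_data()`, so the duplicated block exists once.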

Complexity

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like build.rsudp.c_alert often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
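In this class, `_set_filt()`, `_print_filt()` and `_filter()` share the `filt` stem, which makes the filter settings one candidate for Extract Class. The sketch below is an assumption about how that could look, not the project's code: `FilterSpec`, `from_bp()` and `describe()` are hypothetical names, and the branch order mirrors `Alert._set_filt()`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FilterSpec:
    """Hypothetical value object for the settings Alert keeps in
    self.filt, self.freq, self.freqmin and self.freqmax."""
    kind: Optional[str] = None   # None, 'highpass', 'lowpass' or 'bandpass'
    freq: float = 0.0
    freqmin: float = 0.0
    freqmax: float = 0.0

    @classmethod
    def from_bp(cls, bp, sps):
        """Build a spec from a [highpass, lowpass] pair and a sample rate."""
        if not bp:
            return cls()
        low, high = bp[0], bp[1]
        nyquist = sps / 2
        if low <= 0 and high >= nyquist:
            return cls(freqmin=low, freqmax=high)          # nothing to filter
        if low > 0 and high >= nyquist:
            return cls('highpass', freq=low, freqmin=low, freqmax=high)
        if low <= 0 and high <= nyquist:
            return cls('lowpass', freq=high, freqmin=low, freqmax=high)
        return cls('bandpass', freqmin=low, freqmax=high)

    def describe(self):
        """Return the wording that _print_filt() would log."""
        if self.kind == 'bandpass':
            return 'bandpass filtered from %s to %s Hz' % (self.freqmin, self.freqmax)
        if self.kind in ('lowpass', 'highpass'):
            modifier = 'below' if self.kind == 'lowpass' else 'above'
            return '%s filtered %s %s Hz' % (self.kind, modifier, self.freq)
        return 'unfiltered'


spec = FilterSpec.from_bp([0.5, 60], sps=100)
print(spec.kind, '|', spec.describe())
```

Because the object is immutable and has no dependency on the thread, the branch logic can be unit-tested without starting an `Alert` instance.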

import sys
from datetime import timedelta
import rsudp.raspberryshake as rs
from obspy.signal.trigger import recursive_sta_lta, trigger_onset
from rsudp import printM, printW, printE
from rsudp import COLOR, helpers
from rsudp.test import TEST
import numpy as np

# set the terminal text color to green
COLOR['current'] = COLOR['green']


class Alert(rs.ConsumerThread):
	"""
	A data consumer class that listens to a specific incoming data channel
	and calculates a recursive STA/LTA (short term average over long term
	average). If a threshold of STA/LTA ratio is exceeded, the class
	sets the :py:data:`alarm` flag to the alarm time as a
	:py:class:`obspy.core.utcdatetime.UTCDateTime` object.
	The :py:class:`rsudp.p_producer.Producer` will see this flag
	and send an :code:`ALARM` message to the queues with the time set here.
	Likewise, when the :py:data:`alarm_reset` flag is set with a
	:py:class:`obspy.core.utcdatetime.UTCDateTime`,
	the Producer will send a :code:`RESET` message to the queues.

	:param float sta: short term average (STA) duration in seconds.
	:param float lta: long term average (LTA) duration in seconds.
	:param float thresh: threshold for STA/LTA trigger.
	:type bp: :py:class:`bool` or :py:class:`list`
	:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
	:param bool debug: whether or not to display max STA/LTA calculation live to the console.
	:param str cha: listening channel (defaults to [S,E]HZ)
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`

	"""

	def _set_filt(self, bp):
		'''
		This function sets the filter parameters (if specified).
		Set to a boolean if not filtering, or ``[highpass, lowpass]``
		if filtering.

		:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
		:type bp: :py:class:`bool` or :py:class:`list`
		'''
		self.filt = False
		if bp:
			self.freqmin = bp[0]
			self.freqmax = bp[1]
			self.freq = 0
			# self.sps/2 is the Nyquist frequency of the stream
			if (bp[0] <= 0) and (bp[1] >= (self.sps/2)):
				# both corners fall outside the usable band: no filtering
				self.filt = False
			elif (bp[0] > 0) and (bp[1] >= (self.sps/2)):
				# only the low corner is usable
				self.filt = 'highpass'
				self.freq = bp[0]
				desc = 'low corner %s' % (bp[0])
			elif (bp[0] <= 0) and (bp[1] <= (self.sps/2)):
				# only the high corner is usable
				self.filt = 'lowpass'
				self.freq = bp[1]
			else:
				self.filt = 'bandpass'


	# Report note (View Code Duplication): this code seems to be duplicated in your project.
	def _set_deconv(self, deconv):
		'''
		This function sets the deconvolution units. Allowed values are as follows:

		.. |ms2| replace:: m/s\ :sup:`2`\

		- ``'VEL'`` - velocity (m/s)
		- ``'ACC'`` - acceleration (|ms2|)
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
		- ``'DISP'`` - displacement (m)
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels

		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
		'''
		deconv = deconv.upper() if deconv else False
		self.deconv = deconv if (deconv in rs.UNITS) else False
		if self.deconv and rs.inv:
			self.units = '%s (%s)' % (rs.UNITS[self.deconv][0], rs.UNITS[self.deconv][1]) if (self.deconv in rs.UNITS) else self.units
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
		else:
			self.units = rs.UNITS['CHAN'][1]
			self.deconv = False
		printM('Alert stream units are %s' % (self.units.strip(' ').lower()), self.sender)


	def _find_chn(self):
		'''
		Finds channel match in list of channels.
		'''
		for chn in rs.chns:
			if self.cha in chn:
				self.cha = chn


	# Report note (View Code Duplication): this code seems to be duplicated in your project.
	def _set_channel(self, cha):
		'''
		This function sets the channel to listen to. Allowed values are as follows:

		- ``"SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
		- ``"HDF"`` - pressure transducer channel
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available

		:param cha: the channel to listen to
		:type cha: str
		'''
		cha = self.default_ch if (cha == 'all') else cha
		self.cha = cha if isinstance(cha, str) else cha[0]

		if self.cha in str(rs.chns):
			self._find_chn()
		else:
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
			sys.exit(2)


	def _print_filt(self):
		'''
		Prints stream filtering information.
		'''
		if self.filt == 'bandpass':
			printM('Alert stream will be %s filtered from %s to %s Hz'
					% (self.filt, self.freqmin, self.freqmax), self.sender)
		elif self.filt in ('lowpass', 'highpass'):
			# compare with ==; ``in 'lowpass'`` would be a substring test
			modifier = 'below' if self.filt == 'lowpass' else 'above'
			printM('Alert stream will be %s filtered %s %s Hz'
					% (self.filt, modifier, self.freq), self.sender)


	def __init__(self, q, sta=5, lta=30, thresh=1.6, reset=1.55, bp=False,
				 debug=True, cha='HZ', sound=False, deconv=False, testing=False,
				 *args, **kwargs):
		"""
		Initializing the alert thread with parameters to set up the recursive
		STA-LTA trigger, filtering, and the channel used for listening.
		"""
		super().__init__()
		self.sender = 'Alert'
		self.alive = True
		self.testing = testing

		self.queue = q

		self.default_ch = 'HZ'
		self.sta = sta
		self.lta = lta
		self.thresh = thresh
		self.reset = reset
		self.debug = debug
		self.args = args
		self.kwargs = kwargs
		self.raw = rs.Stream()
		self.stream = rs.Stream()

		self._set_channel(cha)

		self.sps = rs.sps
		self.inv = rs.inv
		self.stalta = np.ndarray(1)
		self.maxstalta = 0
		self.units = 'counts'

		print('deconv>>>>', deconv)
		self._set_deconv(deconv)

		self.exceed = False
		self.sound = sound

		self._set_filt(bp)
		self._print_filt()


	# Report note (View Code Duplication): this code seems to be duplicated in your project.
	def _getq(self):
		'''
		Reads data from the queue and updates the stream.

		:rtype: bool
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
		'''
		d = self.queue.get(True, timeout=None)
		self.queue.task_done()
		if self.cha in str(d):
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
			return True
		elif 'TERM' in str(d):
			self.alive = False
			printM('Exiting.', self.sender)
			sys.exit()
		else:
			return False


	def _deconvolve(self):
		'''
		Deconvolves the stream associated with this class.
		'''
		if self.deconv:
			helpers.deconvolve(self)


	def _subloop(self):
		'''
		Gets the queue and figures out whether or not the specified channel is in the packet.
		'''
		while True:
			if self.queue.qsize() > 0:
				self._getq()			# get recent packets
			else:
				if self._getq():		# is this the specified channel? if so break
					break


	def _filter(self):
		'''
		Filters the stream associated with this class.
		'''
		if self.filt:
			# compare with ==; ``in 'bandpass'`` would be a substring test
			if self.filt == 'bandpass':
				self.stalta = recursive_sta_lta(
							self.stream[0].copy().filter(type=self.filt,
							freqmin=self.freqmin, freqmax=self.freqmax),
							int(self.sta * self.sps), int(self.lta * self.sps))
			else:
				self.stalta = recursive_sta_lta(
							self.stream[0].copy().filter(type=self.filt,
							freq=self.freq),
							int(self.sta * self.sps), int(self.lta * self.sps))
		else:
			self.stalta = recursive_sta_lta(self.stream[0],
					int(self.sta * self.sps), int(self.lta * self.sps))


	def _is_trigger(self):
		'''
		Figures out if there's a trigger active.
		'''
		if self.stalta.max() > self.thresh:
			if not self.exceed:
				# raise a flag that the Producer can read and modify
				self.alarm = helpers.fsec(self.stream[0].stats.starttime + timedelta(seconds=
										trigger_onset(self.stalta, self.thresh,
										self.reset)[-1][0] * self.stream[0].stats.delta))
				self.exceed = True	# the state machine; this one should not be touched from the outside, otherwise bad things will happen
				print()
				printM('Trigger threshold of %s exceeded at %s'
						% (self.thresh, self.alarm.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]), self.sender)
				printM('Trigger will reset when STA/LTA goes below %s...' % self.reset, sender=self.sender)
				COLOR['current'] = COLOR['purple']
				if self.testing:
					TEST['c_alerton'][1] = True
			else:
				pass

			if self.stalta.max() > self.maxstalta:
				self.maxstalta = self.stalta.max()
		else:
			if self.exceed:
				if self.stalta[-1] < self.reset:
					self.alarm_reset = helpers.fsec(self.stream[0].stats.endtime)	# lazy; effective
					self.exceed = False
					print()
					printM('Max STA/LTA ratio reached in alarm state: %s' % (round(self.maxstalta, 3)),
							self.sender)
					printM('Earthquake trigger reset and active again at %s' % (
							self.alarm_reset.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]),
							self.sender)
					self.maxstalta = 0
					COLOR['current'] = COLOR['green']
				if self.testing:
					TEST['c_alertoff'][1] = True

			else:
				pass


	def _print_stalta(self):
		'''
		Print the current max STA/LTA of the stream.
		'''
		if self.debug:
			msg = '\r%s [%s] Threshold: %s; Current max STA/LTA: %.4f' % (
					(self.stream[0].stats.starttime + timedelta(seconds=
					 len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
					self.sender,
					self.thresh,
					round(np.max(self.stalta[-50:]), 4)
					)
			print(COLOR['current'] + COLOR['bold'] + msg + COLOR['white'], end='', flush=True)


	def run(self):
		"""
		Reads data from the queue into a :class:`obspy.core.stream.Stream` object,
		then runs a :func:`obspy.signal.trigger.recursive_sta_lta` function to
		determine whether to raise an alert flag (:py:data:`rsudp.c_alert.Alert.alarm`).
		The producer reads this flag and uses it to notify other consumers.
		"""
		n = 0

		wait_pkts = (self.lta) / (rs.tf / 1000)

		# NOTE: n starts at 0, so this loop body never runs as written
		# (``n < 3`` may have been intended). The call is corrected to
		# self._getq(); this class defines no getq() method.
		while n > 3:
			self._getq()
			n += 1

		n = 0
		while True:
			self._subloop()

			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
			self.stream = self.raw.copy()
			self._deconvolve()

			if n > wait_pkts:
				# warm-up is over: trim to the LTA window and evaluate the trigger
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.lta)	# obspy time
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)

				# filter
				self._filter()
				# figure out if the trigger has gone off
				self._is_trigger()

				# copy the stream (necessary to avoid memory leak)
				self.stream = rs.copy(self.stream)

				# print the current STA/LTA calculation
				self._print_stalta()

			elif n == 0:
				printM('Starting Alert trigger with sta=%ss, lta=%ss, and threshold=%s on channel=%s'
					   % (self.sta, self.lta, self.thresh, self.cha), self.sender)
				printM('Earthquake trigger warmup time of %s seconds...'
					   % (self.lta), self.sender)
			elif n == wait_pkts:
				printM('Earthquake trigger up and running normally.',
					   self.sender)
			else:
				pass

			n += 1
			sys.stdout.flush()
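
The trigger logic in `Alert._filter()` and `Alert._is_trigger()` can be tried outside rsudp with a small stand-in. The sketch below is a simplified pure-NumPy recursive STA/LTA plus a two-threshold state machine. It is not ObsPy's implementation and not the class's code: `recursive_sta_lta_sketch` and `HysteresisTrigger` are hypothetical names, the trigger is evaluated per sample rather than per window, and the threshold values are chosen for the synthetic signal, not taken from the defaults.

```python
import numpy as np


def recursive_sta_lta_sketch(data, nsta, nlta):
    """Simplified recursive STA/LTA ratio on squared samples (illustrative only)."""
    data = np.asarray(data, dtype=float)
    ratio = np.zeros(len(data))
    sta, lta = 0.0, np.finfo(float).tiny   # tiny LTA avoids division by zero
    for i in range(1, len(data)):
        energy = data[i] ** 2
        sta += (energy - sta) / nsta       # short-term running average
        lta += (energy - lta) / nlta       # long-term running average
        if i >= nlta:                      # keep output at zero during LTA warm-up
            ratio[i] = sta / lta
    return ratio


class HysteresisTrigger:
    """Alarm above `thresh`, reset only once the ratio falls below `reset`."""

    def __init__(self, thresh, reset):
        self.thresh = thresh
        self.reset = reset
        self.exceed = False                # same role as Alert.exceed

    def update(self, value):
        if value > self.thresh and not self.exceed:
            self.exceed = True
            return 'ALARM'
        if self.exceed and value < self.reset:
            self.exceed = False
            return 'RESET'
        return None


# Synthetic trace: constant background with a short high-amplitude burst.
signal = np.ones(600)
signal[300:320] = 10.0

ratio = recursive_sta_lta_sketch(signal, nsta=5, nlta=50)
trigger = HysteresisTrigger(thresh=3.0, reset=1.5)

events = []
for i, value in enumerate(ratio):
    name = trigger.update(value)
    if name:
        events.append((name, i))
print(events)
```

Keeping `reset` below `thresh` is what stops the trigger from flapping when the ratio hovers near the alarm level, which is the purpose of the `exceed` flag in the class.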