Passed
Push — master ( 0acd6f...3b78b5 )
by Ian
04:08
created

build.rsudp.c_alert.Alert._set_filt()   C

Complexity

Conditions 11

Size

Total Lines 34
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 11
eloc 23
nop 2
dl 0
loc 34
rs 5.4
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like build.rsudp.c_alert.Alert._set_filt() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import sys, os
2
from datetime import datetime, timedelta
3
import rsudp.raspberryshake as rs
4
from obspy.signal.trigger import recursive_sta_lta, trigger_onset
5
from rsudp import printM, printW, printE
6
COLOR = {}
7
from rsudp import COLOR
8
import numpy as np
9
10
# set the terminal text color to green
11
COLOR['current'] = COLOR['green']
12
13
14
class Alert(rs.ConsumerThread):
15
	"""
16
	A data consumer class that listens to a specific incoming data channel
17
	and calculates a recursive STA/LTA (short term average over long term 
18
	average). If a threshold of STA/LTA ratio is exceeded, the class
19
	sets the :py:data:`alarm` flag to the alarm time as a
20
	:py:class:`obspy.core.utcdatetime.UTCDateTime` object.
21
	The :py:class:`rsudp.p_producer.Producer` will see this flag
22
	and send an :code:`ALARM` message to the queues with the time set here.
23
	Likewise, when the :py:data:`alarm_reset` flag is set with a
24
	:py:class:`obspy.core.utcdatetime.UTCDateTime`,
25
	the Producer will send a :code:`RESET` message to the queues.
26
27
	:param float sta: short term average (STA) duration in seconds.
28
	:param float lta: long term average (LTA) duration in seconds.
29
	:param float thresh: threshold for STA/LTA trigger.
30
	:type bp: :py:class:`bool` or :py:class:`list`
31
	:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
32
	:param bool debug: whether or not to display max STA/LTA calculation live to the console.
33
	:param str cha: listening channel (defaults to [S,E]HZ)
34
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
35
36
	"""
37
38
	def _set_filt(self, bp):
39
		'''
40
		This function sets the filter parameters (if specified).
41
		Set to a boolean if not filtering, or ``[highpass, lowpass]``
42
		if filtering.
43
44
		:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
45
		:type bp: :py:class:`bool` or :py:class:`list`
46
		'''
47
		if bp:
48
			self.freqmin = bp[0]
49
			self.freqmax = bp[1]
50
			self.freq = 0
51
			if (bp[0] <= 0) and (bp[1] >= (self.sps/2)):
52
				self.filt = False
53
			elif (bp[0] > 0) and (bp[1] >= (self.sps/2)):
54
				self.filt = 'highpass'
55
				self.freq = bp[0]
56
				desc = 'low corner %s' % (bp[0])
57
			elif (bp[0] <= 0) and (bp[1] <= (self.sps/2)):
58
				self.filt = 'lowpass'
59
				self.freq = bp[1]
60
			else:
61
				self.filt = 'bandpass'
62
		else:
63
			self.filt = False
64
65
		if self.filt == 'bandpass':
66
			printM('Alert stream will be %s filtered from %s to %s Hz'
67
					% (self.filt, self.freqmin, self.freqmax), self.sender)
68
		elif self.filt in ('lowpass', 'highpass'):
69
			modifier = 'below' if self.filt in 'lowpass' else 'above'
70
			printM('Alert stream will be %s filtered %s %s Hz'
71
					% (self.filt, modifier, self.freq), self.sender)
72
73
74
	def _set_deconv(self, deconv):
75
		'''
76
		This function sets the deconvolution units. Allowed values are as follows:
77
78
		.. |ms2| replace:: m/s\ :sup:`2`\
79
80
		- ``'VEL'`` - velocity (m/s)
81
		- ``'ACC'`` - acceleration (|ms2|)
82
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
83
		- ``'DISP'`` - displacement (m)
84
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels
85
86
		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
87
		'''
88
		deconv = deconv.upper() if deconv else False
89
		self.deconv = self.deconv if (deconv in rs.UNITS) else False
90
		if self.deconv and rs.inv:
91
			self.units = '%s (%s)' % (rs.UNITS[0], rs.UNITS[1]) if (self.deconv in rs.UNITS) else self.units
92
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
93
		else:
94
			self.units = rs.UNITS['CHAN'][1]
95
			self.deconv = False
96
		printM('Alert stream units are %s' % (self.units.strip(' ').lower()), self.sender)
97
98
99
	def _find_chn(self):
100
		'''
101
		Finds channel match in list of channels.
102
		'''
103
		for chn in rs.chns:
104
			if self.cha in chn:
105
				self.cha = chn
106
107
108
	def _set_channel(self, cha):
109
		'''
110
		This function sets the channel to listen to. Allowed values are as follows:
111
112
		- "SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
113
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
114
		- ``"HDF"`` - pressure transducer channel
115
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available
116
117
		:param cha: the channel to listen to
118
		:type cha: str
119
		'''
120
		cha = self.default_ch if (cha == 'all') else cha
121
		self.cha = cha if isinstance(cha, str) else cha[0]
122
123
		if self.cha in str(rs.chns):
124
			self._find_chn()
125
		else:
126
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
127
			sys.exit(2)
128
129
130
	def __init__(self, sta=5, lta=30, thresh=1.6, reset=1.55, bp=False,
131
				 debug=True, cha='HZ', q=False, sound=False, deconv=False,
132
				 *args, **kwargs):
133
		"""
134
		Initializing the alert thread with parameters to set up the recursive
135
		STA-LTA trigger, filtering, and the channel used for listening.
136
		"""
137
		super().__init__()
138
		self.sender = 'Alert'
139
		self.alive = True
140
141
		self.queue = q
142
143
		self.default_ch = 'HZ'
144
		self.sta = sta
145
		self.lta = lta
146
		self.thresh = thresh
147
		self.reset = reset
148
		self.debug = debug
149
		self.args = args
150
		self.kwargs = kwargs
151
		self.raw = rs.Stream()
152
		self.stream = rs.Stream()
153
154
		self._set_channel(cha)
155
156
		self.sps = rs.sps
157
		self.inv = rs.inv
158
		self.stalta = np.ndarray(1)
159
		self.maxstalta = 0
160
		self.units = 'counts'
161
		
162
		self._set_deconv(deconv)
163
164
		self.exceed = False
165
		self.sound = sound
166
		
167
		self._set_filt(bp)
168
169
170
	def _getq(self):
171
		'''
172
		Reads data from the queue and updates the stream.
173
174
		:rtype: bool
175
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
176
		'''
177
		d = self.queue.get(True, timeout=None)
178
		self.queue.task_done()
179
		if self.cha in str(d):
180
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
181
			return True
182
		elif 'TERM' in str(d):
183
			self.alive = False
184
			printM('Exiting.', self.sender)
185
			sys.exit()
186
		else:
187
			return False
188
189
190
	def _deconvolve(self):
191
		'''
192
		Deconvolves the stream associated with this class.
193
		'''
194
		if self.deconv:
195
			rs.deconvolve(self)
196
197
198
	def _subloop(self):
199
		'''
200
		Gets the queue and figures out whether or not the specified channel is in the packet.
201
		'''
202
		while True:
203
			if self.queue.qsize() > 0:
204
				self._getq()			# get recent packets
205
			else:
206
				if self._getq():		# is this the specified channel? if so break
207
					break
208
209
210
	def _filter(self):
211
		'''
212
		Filters the stream associated with this class.
213
		'''
214
		if self.filt:
215
			if self.filt in 'bandpass':
216
				self.stalta = recursive_sta_lta(
217
							self.stream[0].copy().filter(type=self.filt,
218
							freqmin=self.freqmin, freqmax=self.freqmax),
219
							int(self.sta * self.sps), int(self.lta * self.sps))
220
			else:
221
				self.stalta = recursive_sta_lta(
222
							self.stream[0].copy().filter(type=self.filt,
223
							freq=self.freq),
224
							int(self.sta * self.sps), int(self.lta * self.sps))
225
		else:
226
			self.stalta = recursive_sta_lta(self.stream[0],
227
					int(self.sta * self.sps), int(self.lta * self.sps))
228
229
230
	def _is_trigger(self):
231
		'''
232
		Figures out it there's a trigger active.
233
		'''
234
		if self.stalta.max() > self.thresh:
235
			if not self.exceed:
236
				# raise a flag that the Producer can read and modify 
237
				self.alarm = rs.fsec(self.stream[0].stats.starttime + timedelta(seconds=
238
										trigger_onset(self.stalta, self.thresh,
239
										self.reset)[-1][0] * self.stream[0].stats.delta))
240
				self.exceed = True	# the state machine; this one should not be touched from the outside, otherwise bad things will happen
241
				print()
242
				printM('Trigger threshold of %s exceeded at %s'
243
						% (self.thresh, self.alarm.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]), self.sender)
244
				printM('Trigger will reset when STA/LTA goes below %s...' % self.reset, sender=self.sender)
245
				COLOR['current'] = COLOR['purple']
246
			else:
247
				pass
248
249
			if self.stalta.max() > self.maxstalta:
250
				self.maxstalta = self.stalta.max()
251
		else:
252
			if self.exceed:
253
				if self.stalta[-1] < self.reset:
254
					self.alarm_reset = rs.fsec(self.stream[0].stats.endtime)	# lazy; effective
255
					self.exceed = False
256
					print()
257
					printM('Max STA/LTA ratio reached in alarm state: %s' % (round(self.maxstalta, 3)),
258
							self.sender)
259
					printM('Earthquake trigger reset and active again at %s' % (
260
							self.alarm_reset.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]),
261
							self.sender)
262
					self.maxstalta = 0
263
					COLOR['current'] = COLOR['green']
264
			else:
265
				pass
266
267
268
	def _print_stalta(self):
269
		'''
270
		Print the current max STA/LTA of the stream.
271
		'''
272
		if self.debug:
273
			msg = '\r%s [%s] Threshold: %s; Current max STA/LTA: %.4f' % (
274
					(self.stream[0].stats.starttime + timedelta(seconds=
275
					 len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
276
					self.sender,
277
					self.thresh,
278
					round(np.max(self.stalta[-50:]), 4)
279
					)
280
			print(COLOR['current'] + COLOR['bold'] + msg + COLOR['white'], end='', flush=True)
281
282
283
	def run(self):
284
		"""
285
		Reads data from the queue into a :class:`obspy.core.stream.Stream` object,
286
		then runs a :func:`obspy.signal.trigger.recursive_sta_lta` function to
287
		determine whether to raise an alert flag (:py:data:`rsudp.c_alert.Alert.alarm`).
288
		The producer reads this flag and uses it to notify other consumers.
289
		"""
290
		n = 0
291
292
		wait_pkts = (self.lta) / (rs.tf / 1000)
293
294
		while n > 3:
295
			self.getq()
296
			n += 1
297
298
		n = 0
299
		while True:
300
			self._subloop()
301
302
			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
303
			self.stream = self.raw.copy()
304
			self._deconvolve()
305
306
			if n > wait_pkts:
307
				# if the trigger is activated
308
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.lta)	# obspy time
309
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
310
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)
311
312
				# filter
313
				self._filter()
314
				# figure out if the trigger has gone off
315
				self._is_trigger()
316
317
				# copy the stream (necessary to avoid memory leak)
318
				self.stream = rs.copy(self.stream)
319
320
				# print the current STA/LTA calculation
321
				self._print_stalta()
322
323
			elif n == 0:
324
				printM('Starting Alert trigger with sta=%ss, lta=%ss, and threshold=%s on channel=%s'
325
					   % (self.sta, self.lta, self.thresh, self.cha), self.sender)
326
				printM('Earthquake trigger warmup time of %s seconds...'
327
					   % (self.lta), self.sender)
328
			elif n == wait_pkts:
329
				printM('Earthquake trigger up and running normally.',
330
					   self.sender)
331
			else:
332
				pass
333
334
			n += 1
335
			sys.stdout.flush()
336