Passed
Push — master ( 0acd6f...3b78b5 )
by Ian
04:08
created

build.rsudp.c_alert.Alert.__init__()   A

Complexity

Conditions 1

Size

Total Lines 38
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 27
nop 13
dl 0
loc 38
rs 9.232
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
import sys, os
2
from datetime import datetime, timedelta
3
import rsudp.raspberryshake as rs
4
from obspy.signal.trigger import recursive_sta_lta, trigger_onset
5
from rsudp import printM, printW, printE
6
COLOR = {}
7
from rsudp import COLOR
8
import numpy as np
9
10
# set the terminal text color to green
11
COLOR['current'] = COLOR['green']
12
13
14
class Alert(rs.ConsumerThread):
15
	"""
16
	A data consumer class that listens to a specific incoming data channel
17
	and calculates a recursive STA/LTA (short term average over long term 
18
	average). If a threshold of STA/LTA ratio is exceeded, the class
19
	sets the :py:data:`alarm` flag to the alarm time as a
20
	:py:class:`obspy.core.utcdatetime.UTCDateTime` object.
21
	The :py:class:`rsudp.p_producer.Producer` will see this flag
22
	and send an :code:`ALARM` message to the queues with the time set here.
23
	Likewise, when the :py:data:`alarm_reset` flag is set with a
24
	:py:class:`obspy.core.utcdatetime.UTCDateTime`,
25
	the Producer will send a :code:`RESET` message to the queues.
26
27
	:param float sta: short term average (STA) duration in seconds.
28
	:param float lta: long term average (LTA) duration in seconds.
29
	:param float thresh: threshold for STA/LTA trigger.
30
	:type bp: :py:class:`bool` or :py:class:`list`
31
	:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
32
	:param bool debug: whether or not to display max STA/LTA calculation live to the console.
33
	:param str cha: listening channel (defaults to [S,E]HZ)
34
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
35
36
	"""
37
38
	def _set_filt(self, bp):
39
		'''
40
		This function sets the filter parameters (if specified).
41
		Set to a boolean if not filtering, or ``[highpass, lowpass]``
42
		if filtering.
43
44
		:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
45
		:type bp: :py:class:`bool` or :py:class:`list`
46
		'''
47
		if bp:
48
			self.freqmin = bp[0]
49
			self.freqmax = bp[1]
50
			self.freq = 0
51
			if (bp[0] <= 0) and (bp[1] >= (self.sps/2)):
52
				self.filt = False
53
			elif (bp[0] > 0) and (bp[1] >= (self.sps/2)):
54
				self.filt = 'highpass'
55
				self.freq = bp[0]
56
				desc = 'low corner %s' % (bp[0])
57
			elif (bp[0] <= 0) and (bp[1] <= (self.sps/2)):
58
				self.filt = 'lowpass'
59
				self.freq = bp[1]
60
			else:
61
				self.filt = 'bandpass'
62
		else:
63
			self.filt = False
64
65
		if self.filt == 'bandpass':
66
			printM('Alert stream will be %s filtered from %s to %s Hz'
67
					% (self.filt, self.freqmin, self.freqmax), self.sender)
68
		elif self.filt in ('lowpass', 'highpass'):
69
			modifier = 'below' if self.filt in 'lowpass' else 'above'
70
			printM('Alert stream will be %s filtered %s %s Hz'
71
					% (self.filt, modifier, self.freq), self.sender)
72
73
74
	def _set_deconv(self, deconv):
75
		'''
76
		This function sets the deconvolution units. Allowed values are as follows:
77
78
		.. |ms2| replace:: m/s\ :sup:`2`\
79
80
		- ``'VEL'`` - velocity (m/s)
81
		- ``'ACC'`` - acceleration (|ms2|)
82
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
83
		- ``'DISP'`` - displacement (m)
84
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels
85
86
		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
87
		'''
88
		deconv = deconv.upper() if deconv else False
89
		self.deconv = self.deconv if (deconv in rs.UNITS) else False
90
		if self.deconv and rs.inv:
91
			self.units = '%s (%s)' % (rs.UNITS[0], rs.UNITS[1]) if (self.deconv in rs.UNITS) else self.units
92
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
93
		else:
94
			self.units = rs.UNITS['CHAN'][1]
95
			self.deconv = False
96
		printM('Alert stream units are %s' % (self.units.strip(' ').lower()), self.sender)
97
98
99
	def _find_chn(self):
100
		'''
101
		Finds channel match in list of channels.
102
		'''
103
		for chn in rs.chns:
104
			if self.cha in chn:
105
				self.cha = chn
106
107
108
	def _set_channel(self, cha):
109
		'''
110
		This function sets the channel to listen to. Allowed values are as follows:
111
112
		- "SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
113
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
114
		- ``"HDF"`` - pressure transducer channel
115
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available
116
117
		:param cha: the channel to listen to
118
		:type cha: str
119
		'''
120
		cha = self.default_ch if (cha == 'all') else cha
121
		self.cha = cha if isinstance(cha, str) else cha[0]
122
123
		if self.cha in str(rs.chns):
124
			self._find_chn()
125
		else:
126
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
127
			sys.exit(2)
128
129
130
	def __init__(self, sta=5, lta=30, thresh=1.6, reset=1.55, bp=False,
131
				 debug=True, cha='HZ', q=False, sound=False, deconv=False,
132
				 *args, **kwargs):
133
		"""
134
		Initializing the alert thread with parameters to set up the recursive
135
		STA-LTA trigger, filtering, and the channel used for listening.
136
		"""
137
		super().__init__()
138
		self.sender = 'Alert'
139
		self.alive = True
140
141
		self.queue = q
142
143
		self.default_ch = 'HZ'
144
		self.sta = sta
145
		self.lta = lta
146
		self.thresh = thresh
147
		self.reset = reset
148
		self.debug = debug
149
		self.args = args
150
		self.kwargs = kwargs
151
		self.raw = rs.Stream()
152
		self.stream = rs.Stream()
153
154
		self._set_channel(cha)
155
156
		self.sps = rs.sps
157
		self.inv = rs.inv
158
		self.stalta = np.ndarray(1)
159
		self.maxstalta = 0
160
		self.units = 'counts'
161
		
162
		self._set_deconv(deconv)
163
164
		self.exceed = False
165
		self.sound = sound
166
		
167
		self._set_filt(bp)
168
169
170
	def _getq(self):
171
		'''
172
		Reads data from the queue and updates the stream.
173
174
		:rtype: bool
175
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
176
		'''
177
		d = self.queue.get(True, timeout=None)
178
		self.queue.task_done()
179
		if self.cha in str(d):
180
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
181
			return True
182
		elif 'TERM' in str(d):
183
			self.alive = False
184
			printM('Exiting.', self.sender)
185
			sys.exit()
186
		else:
187
			return False
188
189
190
	def _deconvolve(self):
191
		'''
192
		Deconvolves the stream associated with this class.
193
		'''
194
		if self.deconv:
195
			rs.deconvolve(self)
196
197
198
	def _subloop(self):
199
		'''
200
		Gets the queue and figures out whether or not the specified channel is in the packet.
201
		'''
202
		while True:
203
			if self.queue.qsize() > 0:
204
				self._getq()			# get recent packets
205
			else:
206
				if self._getq():		# is this the specified channel? if so break
207
					break
208
209
210
	def _filter(self):
211
		'''
212
		Filters the stream associated with this class.
213
		'''
214
		if self.filt:
215
			if self.filt in 'bandpass':
216
				self.stalta = recursive_sta_lta(
217
							self.stream[0].copy().filter(type=self.filt,
218
							freqmin=self.freqmin, freqmax=self.freqmax),
219
							int(self.sta * self.sps), int(self.lta * self.sps))
220
			else:
221
				self.stalta = recursive_sta_lta(
222
							self.stream[0].copy().filter(type=self.filt,
223
							freq=self.freq),
224
							int(self.sta * self.sps), int(self.lta * self.sps))
225
		else:
226
			self.stalta = recursive_sta_lta(self.stream[0],
227
					int(self.sta * self.sps), int(self.lta * self.sps))
228
229
230
	def _is_trigger(self):
231
		'''
232
		Figures out it there's a trigger active.
233
		'''
234
		if self.stalta.max() > self.thresh:
235
			if not self.exceed:
236
				# raise a flag that the Producer can read and modify 
237
				self.alarm = rs.fsec(self.stream[0].stats.starttime + timedelta(seconds=
238
										trigger_onset(self.stalta, self.thresh,
239
										self.reset)[-1][0] * self.stream[0].stats.delta))
240
				self.exceed = True	# the state machine; this one should not be touched from the outside, otherwise bad things will happen
241
				print()
242
				printM('Trigger threshold of %s exceeded at %s'
243
						% (self.thresh, self.alarm.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]), self.sender)
244
				printM('Trigger will reset when STA/LTA goes below %s...' % self.reset, sender=self.sender)
245
				COLOR['current'] = COLOR['purple']
246
			else:
247
				pass
248
249
			if self.stalta.max() > self.maxstalta:
250
				self.maxstalta = self.stalta.max()
251
		else:
252
			if self.exceed:
253
				if self.stalta[-1] < self.reset:
254
					self.alarm_reset = rs.fsec(self.stream[0].stats.endtime)	# lazy; effective
255
					self.exceed = False
256
					print()
257
					printM('Max STA/LTA ratio reached in alarm state: %s' % (round(self.maxstalta, 3)),
258
							self.sender)
259
					printM('Earthquake trigger reset and active again at %s' % (
260
							self.alarm_reset.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]),
261
							self.sender)
262
					self.maxstalta = 0
263
					COLOR['current'] = COLOR['green']
264
			else:
265
				pass
266
267
268
	def _print_stalta(self):
269
		'''
270
		Print the current max STA/LTA of the stream.
271
		'''
272
		if self.debug:
273
			msg = '\r%s [%s] Threshold: %s; Current max STA/LTA: %.4f' % (
274
					(self.stream[0].stats.starttime + timedelta(seconds=
275
					 len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
276
					self.sender,
277
					self.thresh,
278
					round(np.max(self.stalta[-50:]), 4)
279
					)
280
			print(COLOR['current'] + COLOR['bold'] + msg + COLOR['white'], end='', flush=True)
281
282
283
	def run(self):
284
		"""
285
		Reads data from the queue into a :class:`obspy.core.stream.Stream` object,
286
		then runs a :func:`obspy.signal.trigger.recursive_sta_lta` function to
287
		determine whether to raise an alert flag (:py:data:`rsudp.c_alert.Alert.alarm`).
288
		The producer reads this flag and uses it to notify other consumers.
289
		"""
290
		n = 0
291
292
		wait_pkts = (self.lta) / (rs.tf / 1000)
293
294
		while n > 3:
295
			self.getq()
296
			n += 1
297
298
		n = 0
299
		while True:
300
			self._subloop()
301
302
			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
303
			self.stream = self.raw.copy()
304
			self._deconvolve()
305
306
			if n > wait_pkts:
307
				# if the trigger is activated
308
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.lta)	# obspy time
309
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
310
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)
311
312
				# filter
313
				self._filter()
314
				# figure out if the trigger has gone off
315
				self._is_trigger()
316
317
				# copy the stream (necessary to avoid memory leak)
318
				self.stream = rs.copy(self.stream)
319
320
				# print the current STA/LTA calculation
321
				self._print_stalta()
322
323
			elif n == 0:
324
				printM('Starting Alert trigger with sta=%ss, lta=%ss, and threshold=%s on channel=%s'
325
					   % (self.sta, self.lta, self.thresh, self.cha), self.sender)
326
				printM('Earthquake trigger warmup time of %s seconds...'
327
					   % (self.lta), self.sender)
328
			elif n == wait_pkts:
329
				printM('Earthquake trigger up and running normally.',
330
					   self.sender)
331
			else:
332
				pass
333
334
			n += 1
335
			sys.stdout.flush()
336