Passed
Push — master ( 3b78b5...c00388 )
by Ian
04:10
created

build.rsudp.c_alert.Alert.__init__()   A

Complexity

Conditions 1

Size

Total Lines 39
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 28
nop 13
dl 0
loc 39
rs 9.208
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
import sys, os
2
from datetime import datetime, timedelta
3
import rsudp.raspberryshake as rs
4
from obspy.signal.trigger import recursive_sta_lta, trigger_onset
5
from rsudp import printM, printW, printE
6
COLOR = {}
7
from rsudp import COLOR
8
import numpy as np
9
10
# set the terminal text color to green
11
COLOR['current'] = COLOR['green']
12
13
14
class Alert(rs.ConsumerThread):
15
	"""
16
	A data consumer class that listens to a specific incoming data channel
17
	and calculates a recursive STA/LTA (short term average over long term 
18
	average). If a threshold of STA/LTA ratio is exceeded, the class
19
	sets the :py:data:`alarm` flag to the alarm time as a
20
	:py:class:`obspy.core.utcdatetime.UTCDateTime` object.
21
	The :py:class:`rsudp.p_producer.Producer` will see this flag
22
	and send an :code:`ALARM` message to the queues with the time set here.
23
	Likewise, when the :py:data:`alarm_reset` flag is set with a
24
	:py:class:`obspy.core.utcdatetime.UTCDateTime`,
25
	the Producer will send a :code:`RESET` message to the queues.
26
27
	:param float sta: short term average (STA) duration in seconds.
28
	:param float lta: long term average (LTA) duration in seconds.
29
	:param float thresh: threshold for STA/LTA trigger.
30
	:type bp: :py:class:`bool` or :py:class:`list`
31
	:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
32
	:param bool debug: whether or not to display max STA/LTA calculation live to the console.
33
	:param str cha: listening channel (defaults to [S,E]HZ)
34
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
35
36
	"""
37
38
	def _set_filt(self, bp):
39
		'''
40
		This function sets the filter parameters (if specified).
41
		Set to a boolean if not filtering, or ``[highpass, lowpass]``
42
		if filtering.
43
44
		:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
45
		:type bp: :py:class:`bool` or :py:class:`list`
46
		'''
47
		if bp:
48
			self.freqmin = bp[0]
49
			self.freqmax = bp[1]
50
			self.freq = 0
51
			if (bp[0] <= 0) and (bp[1] >= (self.sps/2)):
52
				self.filt = False
53
			elif (bp[0] > 0) and (bp[1] >= (self.sps/2)):
54
				self.filt = 'highpass'
55
				self.freq = bp[0]
56
				desc = 'low corner %s' % (bp[0])
57
			elif (bp[0] <= 0) and (bp[1] <= (self.sps/2)):
58
				self.filt = 'lowpass'
59
				self.freq = bp[1]
60
			else:
61
				self.filt = 'bandpass'
62
		else:
63
			self.filt = False
64
65
66
	def _set_deconv(self, deconv):
67
		'''
68
		This function sets the deconvolution units. Allowed values are as follows:
69
70
		.. |ms2| replace:: m/s\ :sup:`2`\
71
72
		- ``'VEL'`` - velocity (m/s)
73
		- ``'ACC'`` - acceleration (|ms2|)
74
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
75
		- ``'DISP'`` - displacement (m)
76
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels
77
78
		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
79
		'''
80
		deconv = deconv.upper() if deconv else False
81
		self.deconv = self.deconv if (deconv in rs.UNITS) else False
82
		if self.deconv and rs.inv:
83
			self.units = '%s (%s)' % (rs.UNITS[0], rs.UNITS[1]) if (self.deconv in rs.UNITS) else self.units
84
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
85
		else:
86
			self.units = rs.UNITS['CHAN'][1]
87
			self.deconv = False
88
		printM('Alert stream units are %s' % (self.units.strip(' ').lower()), self.sender)
89
90
91
	def _find_chn(self):
92
		'''
93
		Finds channel match in list of channels.
94
		'''
95
		for chn in rs.chns:
96
			if self.cha in chn:
97
				self.cha = chn
98
99
100
	def _set_channel(self, cha):
101
		'''
102
		This function sets the channel to listen to. Allowed values are as follows:
103
104
		- "SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
105
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
106
		- ``"HDF"`` - pressure transducer channel
107
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available
108
109
		:param cha: the channel to listen to
110
		:type cha: str
111
		'''
112
		cha = self.default_ch if (cha == 'all') else cha
113
		self.cha = cha if isinstance(cha, str) else cha[0]
114
115
		if self.cha in str(rs.chns):
116
			self._find_chn()
117
		else:
118
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
119
			sys.exit(2)
120
121
122
	def _print_filt(self):
123
		'''
124
		Prints stream filtering information.
125
		'''
126
		if self.filt == 'bandpass':
127
			printM('Alert stream will be %s filtered from %s to %s Hz'
128
					% (self.filt, self.freqmin, self.freqmax), self.sender)
129
		elif self.filt in ('lowpass', 'highpass'):
130
			modifier = 'below' if self.filt in 'lowpass' else 'above'
131
			printM('Alert stream will be %s filtered %s %s Hz'
132
					% (self.filt, modifier, self.freq), self.sender)
133
134
135
	def __init__(self, sta=5, lta=30, thresh=1.6, reset=1.55, bp=False,
136
				 debug=True, cha='HZ', q=False, sound=False, deconv=False,
137
				 *args, **kwargs):
138
		"""
139
		Initializing the alert thread with parameters to set up the recursive
140
		STA-LTA trigger, filtering, and the channel used for listening.
141
		"""
142
		super().__init__()
143
		self.sender = 'Alert'
144
		self.alive = True
145
146
		self.queue = q
147
148
		self.default_ch = 'HZ'
149
		self.sta = sta
150
		self.lta = lta
151
		self.thresh = thresh
152
		self.reset = reset
153
		self.debug = debug
154
		self.args = args
155
		self.kwargs = kwargs
156
		self.raw = rs.Stream()
157
		self.stream = rs.Stream()
158
159
		self._set_channel(cha)
160
161
		self.sps = rs.sps
162
		self.inv = rs.inv
163
		self.stalta = np.ndarray(1)
164
		self.maxstalta = 0
165
		self.units = 'counts'
166
		
167
		self._set_deconv(deconv)
168
169
		self.exceed = False
170
		self.sound = sound
171
		
172
		self._set_filt(bp)
173
		self._print_filt()
174
175
176
	def _getq(self):
177
		'''
178
		Reads data from the queue and updates the stream.
179
180
		:rtype: bool
181
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
182
		'''
183
		d = self.queue.get(True, timeout=None)
184
		self.queue.task_done()
185
		if self.cha in str(d):
186
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
187
			return True
188
		elif 'TERM' in str(d):
189
			self.alive = False
190
			printM('Exiting.', self.sender)
191
			sys.exit()
192
		else:
193
			return False
194
195
196
	def _deconvolve(self):
197
		'''
198
		Deconvolves the stream associated with this class.
199
		'''
200
		if self.deconv:
201
			rs.deconvolve(self)
202
203
204
	def _subloop(self):
205
		'''
206
		Gets the queue and figures out whether or not the specified channel is in the packet.
207
		'''
208
		while True:
209
			if self.queue.qsize() > 0:
210
				self._getq()			# get recent packets
211
			else:
212
				if self._getq():		# is this the specified channel? if so break
213
					break
214
215
216
	def _filter(self):
217
		'''
218
		Filters the stream associated with this class.
219
		'''
220
		if self.filt:
221
			if self.filt in 'bandpass':
222
				self.stalta = recursive_sta_lta(
223
							self.stream[0].copy().filter(type=self.filt,
224
							freqmin=self.freqmin, freqmax=self.freqmax),
225
							int(self.sta * self.sps), int(self.lta * self.sps))
226
			else:
227
				self.stalta = recursive_sta_lta(
228
							self.stream[0].copy().filter(type=self.filt,
229
							freq=self.freq),
230
							int(self.sta * self.sps), int(self.lta * self.sps))
231
		else:
232
			self.stalta = recursive_sta_lta(self.stream[0],
233
					int(self.sta * self.sps), int(self.lta * self.sps))
234
235
236
	def _is_trigger(self):
237
		'''
238
		Figures out it there's a trigger active.
239
		'''
240
		if self.stalta.max() > self.thresh:
241
			if not self.exceed:
242
				# raise a flag that the Producer can read and modify 
243
				self.alarm = rs.fsec(self.stream[0].stats.starttime + timedelta(seconds=
244
										trigger_onset(self.stalta, self.thresh,
245
										self.reset)[-1][0] * self.stream[0].stats.delta))
246
				self.exceed = True	# the state machine; this one should not be touched from the outside, otherwise bad things will happen
247
				print()
248
				printM('Trigger threshold of %s exceeded at %s'
249
						% (self.thresh, self.alarm.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]), self.sender)
250
				printM('Trigger will reset when STA/LTA goes below %s...' % self.reset, sender=self.sender)
251
				COLOR['current'] = COLOR['purple']
252
			else:
253
				pass
254
255
			if self.stalta.max() > self.maxstalta:
256
				self.maxstalta = self.stalta.max()
257
		else:
258
			if self.exceed:
259
				if self.stalta[-1] < self.reset:
260
					self.alarm_reset = rs.fsec(self.stream[0].stats.endtime)	# lazy; effective
261
					self.exceed = False
262
					print()
263
					printM('Max STA/LTA ratio reached in alarm state: %s' % (round(self.maxstalta, 3)),
264
							self.sender)
265
					printM('Earthquake trigger reset and active again at %s' % (
266
							self.alarm_reset.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]),
267
							self.sender)
268
					self.maxstalta = 0
269
					COLOR['current'] = COLOR['green']
270
			else:
271
				pass
272
273
274
	def _print_stalta(self):
275
		'''
276
		Print the current max STA/LTA of the stream.
277
		'''
278
		if self.debug:
279
			msg = '\r%s [%s] Threshold: %s; Current max STA/LTA: %.4f' % (
280
					(self.stream[0].stats.starttime + timedelta(seconds=
281
					 len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
282
					self.sender,
283
					self.thresh,
284
					round(np.max(self.stalta[-50:]), 4)
285
					)
286
			print(COLOR['current'] + COLOR['bold'] + msg + COLOR['white'], end='', flush=True)
287
288
289
	def run(self):
290
		"""
291
		Reads data from the queue into a :class:`obspy.core.stream.Stream` object,
292
		then runs a :func:`obspy.signal.trigger.recursive_sta_lta` function to
293
		determine whether to raise an alert flag (:py:data:`rsudp.c_alert.Alert.alarm`).
294
		The producer reads this flag and uses it to notify other consumers.
295
		"""
296
		n = 0
297
298
		wait_pkts = (self.lta) / (rs.tf / 1000)
299
300
		while n > 3:
301
			self.getq()
302
			n += 1
303
304
		n = 0
305
		while True:
306
			self._subloop()
307
308
			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
309
			self.stream = self.raw.copy()
310
			self._deconvolve()
311
312
			if n > wait_pkts:
313
				# if the trigger is activated
314
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.lta)	# obspy time
315
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
316
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)
317
318
				# filter
319
				self._filter()
320
				# figure out if the trigger has gone off
321
				self._is_trigger()
322
323
				# copy the stream (necessary to avoid memory leak)
324
				self.stream = rs.copy(self.stream)
325
326
				# print the current STA/LTA calculation
327
				self._print_stalta()
328
329
			elif n == 0:
330
				printM('Starting Alert trigger with sta=%ss, lta=%ss, and threshold=%s on channel=%s'
331
					   % (self.sta, self.lta, self.thresh, self.cha), self.sender)
332
				printM('Earthquake trigger warmup time of %s seconds...'
333
					   % (self.lta), self.sender)
334
			elif n == wait_pkts:
335
				printM('Earthquake trigger up and running normally.',
336
					   self.sender)
337
			else:
338
				pass
339
340
			n += 1
341
			sys.stdout.flush()
342