Test Failed
Pull Request — master (#25)
by Ian
03:06
created

build.rsudp.c_alert.Alert._set_channel()   A

Complexity

Conditions 4

Size

Total Lines 20
Code Lines 7

Duplication

Lines 20
Ratio 100 %

Importance

Changes 0
Metric Value
eloc 7
dl 20
loc 20
rs 10
c 0
b 0
f 0
cc 4
nop 2
1
import sys, os
2
from datetime import datetime, timedelta
3
import rsudp.raspberryshake as rs
4
from obspy.signal.trigger import recursive_sta_lta, trigger_onset
5
from rsudp import printM, printW, printE
6
from rsudp import COLOR, helpers
7
from rsudp.test import TEST
8
import numpy as np
9
10
# set the terminal text color to green
11
COLOR['current'] = COLOR['green']
12
13
14
class Alert(rs.ConsumerThread):
15
	"""
16
	A data consumer class that listens to a specific incoming data channel
17
	and calculates a recursive STA/LTA (short term average over long term 
18
	average). If a threshold of STA/LTA ratio is exceeded, the class
19
	sets the :py:data:`alarm` flag to the alarm time as a
20
	:py:class:`obspy.core.utcdatetime.UTCDateTime` object.
21
	The :py:class:`rsudp.p_producer.Producer` will see this flag
22
	and send an :code:`ALARM` message to the queues with the time set here.
23
	Likewise, when the :py:data:`alarm_reset` flag is set with a
24
	:py:class:`obspy.core.utcdatetime.UTCDateTime`,
25
	the Producer will send a :code:`RESET` message to the queues.
26
27
	:param float sta: short term average (STA) duration in seconds.
28
	:param float lta: long term average (LTA) duration in seconds.
29
	:param float thresh: threshold for STA/LTA trigger.
30
	:type bp: :py:class:`bool` or :py:class:`list`
31
	:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
32
	:param bool debug: whether or not to display max STA/LTA calculation live to the console.
33
	:param str cha: listening channel (defaults to [S,E]HZ)
34
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
35
36
	"""
37
38
	def _set_filt(self, bp):
39
		'''
40
		This function sets the filter parameters (if specified).
41
		Set to a boolean if not filtering, or ``[highpass, lowpass]``
42
		if filtering.
43
44
		:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
45
		:type bp: :py:class:`bool` or :py:class:`list`
46
		'''
47
		self.filt = False
48
		if bp:
49
			self.freqmin = bp[0]
50
			self.freqmax = bp[1]
51
			self.freq = 0
52
			if (bp[0] <= 0) and (bp[1] >= (self.sps/2)):
53
				self.filt = False
54
			elif (bp[0] > 0) and (bp[1] >= (self.sps/2)):
55
				self.filt = 'highpass'
56
				self.freq = bp[0]
57
				desc = 'low corner %s' % (bp[0])
58
			elif (bp[0] <= 0) and (bp[1] <= (self.sps/2)):
59
				self.filt = 'lowpass'
60
				self.freq = bp[1]
61
			else:
62
				self.filt = 'bandpass'
63
64
65 View Code Duplication
	def _set_deconv(self, deconv):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
66
		'''
67
		This function sets the deconvolution units. Allowed values are as follows:
68
69
		.. |ms2| replace:: m/s\ :sup:`2`\
70
71
		- ``'VEL'`` - velocity (m/s)
72
		- ``'ACC'`` - acceleration (|ms2|)
73
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
74
		- ``'DISP'`` - displacement (m)
75
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels
76
77
		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
78
		'''
79
		deconv = deconv.upper() if deconv else False
80
		self.deconv = deconv if (deconv in rs.UNITS) else False
81
		if self.deconv and rs.inv:
82
			self.units = '%s (%s)' % (rs.UNITS[self.deconv][0], rs.UNITS[self.deconv][1]) if (self.deconv in rs.UNITS) else self.units
83
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
84
		else:
85
			self.units = rs.UNITS['CHAN'][1]
86
			self.deconv = False
87
		printM('Alert stream units are %s' % (self.units.strip(' ').lower()), self.sender)
88
89
90
	def _find_chn(self):
91
		'''
92
		Finds channel match in list of channels.
93
		'''
94
		for chn in rs.chns:
95
			if self.cha in chn:
96
				self.cha = chn
97
98
99 View Code Duplication
	def _set_channel(self, cha):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
100
		'''
101
		This function sets the channel to listen to. Allowed values are as follows:
102
103
		- "SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
104
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
105
		- ``"HDF"`` - pressure transducer channel
106
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available
107
108
		:param cha: the channel to listen to
109
		:type cha: str
110
		'''
111
		cha = self.default_ch if (cha == 'all') else cha
112
		self.cha = cha if isinstance(cha, str) else cha[0]
113
114
		if self.cha in str(rs.chns):
115
			self._find_chn()
116
		else:
117
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
118
			sys.exit(2)
119
120
121
	def _print_filt(self):
122
		'''
123
		Prints stream filtering information.
124
		'''
125
		if self.filt == 'bandpass':
126
			printM('Alert stream will be %s filtered from %s to %s Hz'
127
					% (self.filt, self.freqmin, self.freqmax), self.sender)
128
		elif self.filt in ('lowpass', 'highpass'):
129
			modifier = 'below' if self.filt in 'lowpass' else 'above'
130
			printM('Alert stream will be %s filtered %s %s Hz'
131
					% (self.filt, modifier, self.freq), self.sender)
132
133
134
	def __init__(self, q, sta=5, lta=30, thresh=1.6, reset=1.55, bp=False,
135
				 debug=True, cha='HZ', sound=False, deconv=False, testing=False,
136
				 *args, **kwargs):
137
		"""
138
		Initializing the alert thread with parameters to set up the recursive
139
		STA-LTA trigger, filtering, and the channel used for listening.
140
		"""
141
		super().__init__()
142
		self.sender = 'Alert'
143
		self.alive = True
144
		self.testing = testing
145
146
		self.queue = q
147
148
		self.default_ch = 'HZ'
149
		self.sta = sta
150
		self.lta = lta
151
		self.thresh = thresh
152
		self.reset = reset
153
		self.debug = debug
154
		self.args = args
155
		self.kwargs = kwargs
156
		self.raw = rs.Stream()
157
		self.stream = rs.Stream()
158
159
		self._set_channel(cha)
160
161
		self.sps = rs.sps
162
		self.inv = rs.inv
163
		self.stalta = np.ndarray(1)
164
		self.maxstalta = 0
165
		self.units = 'counts'
166
		
167
		self._set_deconv(deconv)
168
169
		self.exceed = False
170
		self.sound = sound
171
		
172
		self._set_filt(bp)
173
		self._print_filt()
174
175
176 View Code Duplication
	def _getq(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
177
		'''
178
		Reads data from the queue and updates the stream.
179
180
		:rtype: bool
181
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
182
		'''
183
		d = self.queue.get(True, timeout=None)
184
		self.queue.task_done()
185
		if self.cha in str(d):
186
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
187
			return True
188
		elif 'TERM' in str(d):
189
			self.alive = False
190
			printM('Exiting.', self.sender)
191
			sys.exit()
192
		else:
193
			return False
194
195
196
	def _deconvolve(self):
197
		'''
198
		Deconvolves the stream associated with this class.
199
		'''
200
		if self.deconv:
201
			helpers.deconvolve(self)
202
203
204
	def _subloop(self):
205
		'''
206
		Gets the queue and figures out whether or not the specified channel is in the packet.
207
		'''
208
		while True:
209
			if self.queue.qsize() > 0:
210
				self._getq()			# get recent packets
211
			else:
212
				if self._getq():		# is this the specified channel? if so break
213
					break
214
215
216
	def _filter(self):
217
		'''
218
		Filters the stream associated with this class.
219
		'''
220
		if self.filt:
221
			if self.filt in 'bandpass':
222
				self.stalta = recursive_sta_lta(
223
							self.stream[0].copy().filter(type=self.filt,
224
							freqmin=self.freqmin, freqmax=self.freqmax),
225
							int(self.sta * self.sps), int(self.lta * self.sps))
226
			else:
227
				self.stalta = recursive_sta_lta(
228
							self.stream[0].copy().filter(type=self.filt,
229
							freq=self.freq),
230
							int(self.sta * self.sps), int(self.lta * self.sps))
231
		else:
232
			self.stalta = recursive_sta_lta(self.stream[0],
233
					int(self.sta * self.sps), int(self.lta * self.sps))
234
235
236
	def _is_trigger(self):
237
		'''
238
		Figures out it there's a trigger active.
239
		'''
240
		if self.stalta.max() > self.thresh:
241
			if not self.exceed:
242
				# raise a flag that the Producer can read and modify 
243
				self.alarm = helpers.fsec(self.stream[0].stats.starttime + timedelta(seconds=
244
										trigger_onset(self.stalta, self.thresh,
245
										self.reset)[-1][0] * self.stream[0].stats.delta))
246
				self.exceed = True	# the state machine; this one should not be touched from the outside, otherwise bad things will happen
247
				print()
248
				printM('Trigger threshold of %s exceeded at %s'
249
						% (self.thresh, self.alarm.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]), self.sender)
250
				printM('Trigger will reset when STA/LTA goes below %s...' % self.reset, sender=self.sender)
251
				COLOR['current'] = COLOR['purple']
252
				if self.testing:
253
					TEST['c_alerton'][1] = True
254
			else:
255
				pass
256
257
			if self.stalta.max() > self.maxstalta:
258
				self.maxstalta = self.stalta.max()
259
		else:
260
			if self.exceed:
261
				if self.stalta[-1] < self.reset:
262
					self.alarm_reset = helpers.fsec(self.stream[0].stats.endtime)	# lazy; effective
263
					self.exceed = False
264
					print()
265
					printM('Max STA/LTA ratio reached in alarm state: %s' % (round(self.maxstalta, 3)),
266
							self.sender)
267
					printM('Earthquake trigger reset and active again at %s' % (
268
							self.alarm_reset.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]),
269
							self.sender)
270
					self.maxstalta = 0
271
					COLOR['current'] = COLOR['green']
272
				if self.testing:
273
					TEST['c_alertoff'][1] = True
274
275
			else:
276
				pass
277
278
279
	def _print_stalta(self):
280
		'''
281
		Print the current max STA/LTA of the stream.
282
		'''
283
		if self.debug:
284
			msg = '\r%s [%s] Threshold: %s; Current max STA/LTA: %.4f' % (
285
					(self.stream[0].stats.starttime + timedelta(seconds=
286
					 len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
287
					self.sender,
288
					self.thresh,
289
					round(np.max(self.stalta[-50:]), 4)
290
					)
291
			print(COLOR['current'] + COLOR['bold'] + msg + COLOR['white'], end='', flush=True)
292
293
294
	def run(self):
295
		"""
296
		Reads data from the queue into a :class:`obspy.core.stream.Stream` object,
297
		then runs a :func:`obspy.signal.trigger.recursive_sta_lta` function to
298
		determine whether to raise an alert flag (:py:data:`rsudp.c_alert.Alert.alarm`).
299
		The producer reads this flag and uses it to notify other consumers.
300
		"""
301
		n = 0
302
303
		wait_pkts = (self.lta) / (rs.tf / 1000)
304
305
		while n > 3:
306
			self.getq()
307
			n += 1
308
309
		n = 0
310
		while True:
311
			self._subloop()
312
313
			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
314
			self.stream = self.raw.copy()
315
			self._deconvolve()
316
317
			if n > wait_pkts:
318
				# if the trigger is activated
319
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.lta)	# obspy time
320
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
321
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)
322
323
				# filter
324
				self._filter()
325
				# figure out if the trigger has gone off
326
				self._is_trigger()
327
328
				# copy the stream (necessary to avoid memory leak)
329
				self.stream = rs.copy(self.stream)
330
331
				# print the current STA/LTA calculation
332
				self._print_stalta()
333
334
			elif n == 0:
335
				printM('Starting Alert trigger with sta=%ss, lta=%ss, and threshold=%s on channel=%s'
336
					   % (self.sta, self.lta, self.thresh, self.cha), self.sender)
337
				printM('Earthquake trigger warmup time of %s seconds...'
338
					   % (self.lta), self.sender)
339
			elif n == wait_pkts:
340
				printM('Earthquake trigger up and running normally.',
341
					   self.sender)
342
			else:
343
				pass
344
345
			n += 1
346
			sys.stdout.flush()
347