Passed
Push — master ( 3b78b5...c00388 )
by Ian
04:10
created

build.rsudp.c_alert.Alert._deconvolve()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 3
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
import sys, os
2
from datetime import datetime, timedelta
3
import rsudp.raspberryshake as rs
4
from obspy.signal.trigger import recursive_sta_lta, trigger_onset
5
from rsudp import printM, printW, printE
6
COLOR = {}
7
from rsudp import COLOR
8
import numpy as np
9
10
# set the terminal text color to green
11
COLOR['current'] = COLOR['green']
12
13
14
class Alert(rs.ConsumerThread):
15
	"""
16
	A data consumer class that listens to a specific incoming data channel
17
	and calculates a recursive STA/LTA (short term average over long term 
18
	average). If a threshold of STA/LTA ratio is exceeded, the class
19
	sets the :py:data:`alarm` flag to the alarm time as a
20
	:py:class:`obspy.core.utcdatetime.UTCDateTime` object.
21
	The :py:class:`rsudp.p_producer.Producer` will see this flag
22
	and send an :code:`ALARM` message to the queues with the time set here.
23
	Likewise, when the :py:data:`alarm_reset` flag is set with a
24
	:py:class:`obspy.core.utcdatetime.UTCDateTime`,
25
	the Producer will send a :code:`RESET` message to the queues.
26
27
	:param float sta: short term average (STA) duration in seconds.
28
	:param float lta: long term average (LTA) duration in seconds.
29
	:param float thresh: threshold for STA/LTA trigger.
30
	:type bp: :py:class:`bool` or :py:class:`list`
31
	:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
32
	:param bool debug: whether or not to display max STA/LTA calculation live to the console.
33
	:param str cha: listening channel (defaults to [S,E]HZ)
34
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
35
36
	"""
37
38
	def _set_filt(self, bp):
39
		'''
40
		This function sets the filter parameters (if specified).
41
		Set to a boolean if not filtering, or ``[highpass, lowpass]``
42
		if filtering.
43
44
		:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
45
		:type bp: :py:class:`bool` or :py:class:`list`
46
		'''
47
		if bp:
48
			self.freqmin = bp[0]
49
			self.freqmax = bp[1]
50
			self.freq = 0
51
			if (bp[0] <= 0) and (bp[1] >= (self.sps/2)):
52
				self.filt = False
53
			elif (bp[0] > 0) and (bp[1] >= (self.sps/2)):
54
				self.filt = 'highpass'
55
				self.freq = bp[0]
56
				desc = 'low corner %s' % (bp[0])
57
			elif (bp[0] <= 0) and (bp[1] <= (self.sps/2)):
58
				self.filt = 'lowpass'
59
				self.freq = bp[1]
60
			else:
61
				self.filt = 'bandpass'
62
		else:
63
			self.filt = False
64
65
66
	def _set_deconv(self, deconv):
67
		'''
68
		This function sets the deconvolution units. Allowed values are as follows:
69
70
		.. |ms2| replace:: m/s\ :sup:`2`\
71
72
		- ``'VEL'`` - velocity (m/s)
73
		- ``'ACC'`` - acceleration (|ms2|)
74
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
75
		- ``'DISP'`` - displacement (m)
76
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels
77
78
		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
79
		'''
80
		deconv = deconv.upper() if deconv else False
81
		self.deconv = self.deconv if (deconv in rs.UNITS) else False
82
		if self.deconv and rs.inv:
83
			self.units = '%s (%s)' % (rs.UNITS[0], rs.UNITS[1]) if (self.deconv in rs.UNITS) else self.units
84
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
85
		else:
86
			self.units = rs.UNITS['CHAN'][1]
87
			self.deconv = False
88
		printM('Alert stream units are %s' % (self.units.strip(' ').lower()), self.sender)
89
90
91
	def _find_chn(self):
92
		'''
93
		Finds channel match in list of channels.
94
		'''
95
		for chn in rs.chns:
96
			if self.cha in chn:
97
				self.cha = chn
98
99
100
	def _set_channel(self, cha):
101
		'''
102
		This function sets the channel to listen to. Allowed values are as follows:
103
104
		- "SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
105
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
106
		- ``"HDF"`` - pressure transducer channel
107
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available
108
109
		:param cha: the channel to listen to
110
		:type cha: str
111
		'''
112
		cha = self.default_ch if (cha == 'all') else cha
113
		self.cha = cha if isinstance(cha, str) else cha[0]
114
115
		if self.cha in str(rs.chns):
116
			self._find_chn()
117
		else:
118
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
119
			sys.exit(2)
120
121
122
	def _print_filt(self):
123
		'''
124
		Prints stream filtering information.
125
		'''
126
		if self.filt == 'bandpass':
127
			printM('Alert stream will be %s filtered from %s to %s Hz'
128
					% (self.filt, self.freqmin, self.freqmax), self.sender)
129
		elif self.filt in ('lowpass', 'highpass'):
130
			modifier = 'below' if self.filt in 'lowpass' else 'above'
131
			printM('Alert stream will be %s filtered %s %s Hz'
132
					% (self.filt, modifier, self.freq), self.sender)
133
134
135
	def __init__(self, sta=5, lta=30, thresh=1.6, reset=1.55, bp=False,
136
				 debug=True, cha='HZ', q=False, sound=False, deconv=False,
137
				 *args, **kwargs):
138
		"""
139
		Initializing the alert thread with parameters to set up the recursive
140
		STA-LTA trigger, filtering, and the channel used for listening.
141
		"""
142
		super().__init__()
143
		self.sender = 'Alert'
144
		self.alive = True
145
146
		self.queue = q
147
148
		self.default_ch = 'HZ'
149
		self.sta = sta
150
		self.lta = lta
151
		self.thresh = thresh
152
		self.reset = reset
153
		self.debug = debug
154
		self.args = args
155
		self.kwargs = kwargs
156
		self.raw = rs.Stream()
157
		self.stream = rs.Stream()
158
159
		self._set_channel(cha)
160
161
		self.sps = rs.sps
162
		self.inv = rs.inv
163
		self.stalta = np.ndarray(1)
164
		self.maxstalta = 0
165
		self.units = 'counts'
166
		
167
		self._set_deconv(deconv)
168
169
		self.exceed = False
170
		self.sound = sound
171
		
172
		self._set_filt(bp)
173
		self._print_filt()
174
175
176
	def _getq(self):
177
		'''
178
		Reads data from the queue and updates the stream.
179
180
		:rtype: bool
181
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
182
		'''
183
		d = self.queue.get(True, timeout=None)
184
		self.queue.task_done()
185
		if self.cha in str(d):
186
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
187
			return True
188
		elif 'TERM' in str(d):
189
			self.alive = False
190
			printM('Exiting.', self.sender)
191
			sys.exit()
192
		else:
193
			return False
194
195
196
	def _deconvolve(self):
197
		'''
198
		Deconvolves the stream associated with this class.
199
		'''
200
		if self.deconv:
201
			rs.deconvolve(self)
202
203
204
	def _subloop(self):
205
		'''
206
		Gets the queue and figures out whether or not the specified channel is in the packet.
207
		'''
208
		while True:
209
			if self.queue.qsize() > 0:
210
				self._getq()			# get recent packets
211
			else:
212
				if self._getq():		# is this the specified channel? if so break
213
					break
214
215
216
	def _filter(self):
217
		'''
218
		Filters the stream associated with this class.
219
		'''
220
		if self.filt:
221
			if self.filt in 'bandpass':
222
				self.stalta = recursive_sta_lta(
223
							self.stream[0].copy().filter(type=self.filt,
224
							freqmin=self.freqmin, freqmax=self.freqmax),
225
							int(self.sta * self.sps), int(self.lta * self.sps))
226
			else:
227
				self.stalta = recursive_sta_lta(
228
							self.stream[0].copy().filter(type=self.filt,
229
							freq=self.freq),
230
							int(self.sta * self.sps), int(self.lta * self.sps))
231
		else:
232
			self.stalta = recursive_sta_lta(self.stream[0],
233
					int(self.sta * self.sps), int(self.lta * self.sps))
234
235
236
	def _is_trigger(self):
237
		'''
238
		Figures out it there's a trigger active.
239
		'''
240
		if self.stalta.max() > self.thresh:
241
			if not self.exceed:
242
				# raise a flag that the Producer can read and modify 
243
				self.alarm = rs.fsec(self.stream[0].stats.starttime + timedelta(seconds=
244
										trigger_onset(self.stalta, self.thresh,
245
										self.reset)[-1][0] * self.stream[0].stats.delta))
246
				self.exceed = True	# the state machine; this one should not be touched from the outside, otherwise bad things will happen
247
				print()
248
				printM('Trigger threshold of %s exceeded at %s'
249
						% (self.thresh, self.alarm.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]), self.sender)
250
				printM('Trigger will reset when STA/LTA goes below %s...' % self.reset, sender=self.sender)
251
				COLOR['current'] = COLOR['purple']
252
			else:
253
				pass
254
255
			if self.stalta.max() > self.maxstalta:
256
				self.maxstalta = self.stalta.max()
257
		else:
258
			if self.exceed:
259
				if self.stalta[-1] < self.reset:
260
					self.alarm_reset = rs.fsec(self.stream[0].stats.endtime)	# lazy; effective
261
					self.exceed = False
262
					print()
263
					printM('Max STA/LTA ratio reached in alarm state: %s' % (round(self.maxstalta, 3)),
264
							self.sender)
265
					printM('Earthquake trigger reset and active again at %s' % (
266
							self.alarm_reset.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]),
267
							self.sender)
268
					self.maxstalta = 0
269
					COLOR['current'] = COLOR['green']
270
			else:
271
				pass
272
273
274
	def _print_stalta(self):
275
		'''
276
		Print the current max STA/LTA of the stream.
277
		'''
278
		if self.debug:
279
			msg = '\r%s [%s] Threshold: %s; Current max STA/LTA: %.4f' % (
280
					(self.stream[0].stats.starttime + timedelta(seconds=
281
					 len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
282
					self.sender,
283
					self.thresh,
284
					round(np.max(self.stalta[-50:]), 4)
285
					)
286
			print(COLOR['current'] + COLOR['bold'] + msg + COLOR['white'], end='', flush=True)
287
288
289
	def run(self):
290
		"""
291
		Reads data from the queue into a :class:`obspy.core.stream.Stream` object,
292
		then runs a :func:`obspy.signal.trigger.recursive_sta_lta` function to
293
		determine whether to raise an alert flag (:py:data:`rsudp.c_alert.Alert.alarm`).
294
		The producer reads this flag and uses it to notify other consumers.
295
		"""
296
		n = 0
297
298
		wait_pkts = (self.lta) / (rs.tf / 1000)
299
300
		while n > 3:
301
			self.getq()
302
			n += 1
303
304
		n = 0
305
		while True:
306
			self._subloop()
307
308
			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
309
			self.stream = self.raw.copy()
310
			self._deconvolve()
311
312
			if n > wait_pkts:
313
				# if the trigger is activated
314
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.lta)	# obspy time
315
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
316
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)
317
318
				# filter
319
				self._filter()
320
				# figure out if the trigger has gone off
321
				self._is_trigger()
322
323
				# copy the stream (necessary to avoid memory leak)
324
				self.stream = rs.copy(self.stream)
325
326
				# print the current STA/LTA calculation
327
				self._print_stalta()
328
329
			elif n == 0:
330
				printM('Starting Alert trigger with sta=%ss, lta=%ss, and threshold=%s on channel=%s'
331
					   % (self.sta, self.lta, self.thresh, self.cha), self.sender)
332
				printM('Earthquake trigger warmup time of %s seconds...'
333
					   % (self.lta), self.sender)
334
			elif n == wait_pkts:
335
				printM('Earthquake trigger up and running normally.',
336
					   self.sender)
337
			else:
338
				pass
339
340
			n += 1
341
			sys.stdout.flush()
342