build.rsudp.c_alert.Alert._set_channel()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 20
Code Lines 7

Duplication

Lines 20
Ratio 100 %

Code Coverage

Tests 5
CRAP Score 4.3731

Importance

Changes 0
Metric Value
eloc 7
dl 20
loc 20
ccs 5
cts 7
cp 0.7143
rs 10
c 0
b 0
f 0
cc 4
nop 2
crap 4.3731
1 1
import sys
2 1
from datetime import timedelta
3 1
import rsudp.raspberryshake as rs
4 1
from obspy.signal.trigger import recursive_sta_lta, trigger_onset
5 1
from rsudp import printM, printW, printE
6 1
from rsudp import COLOR, helpers
7 1
from rsudp.test import TEST
8 1
import numpy as np
9
10
# set the terminal text color to green
11 1
COLOR['current'] = COLOR['green']
12
13
14 1
class Alert(rs.ConsumerThread):
15
	"""
16
	A data consumer class that listens to a specific incoming data channel
17
	and calculates a recursive STA/LTA (short term average over long term 
18
	average). If a threshold of STA/LTA ratio is exceeded, the class
19
	sets the :py:data:`alarm` flag to the alarm time as a
20
	:py:class:`obspy.core.utcdatetime.UTCDateTime` object.
21
	The :py:class:`rsudp.p_producer.Producer` will see this flag
22
	and send an :code:`ALARM` message to the queues with the time set here.
23
	Likewise, when the :py:data:`alarm_reset` flag is set with a
24
	:py:class:`obspy.core.utcdatetime.UTCDateTime`,
25
	the Producer will send a :code:`RESET` message to the queues.
26
27
	:param float sta: short term average (STA) duration in seconds.
28
	:param float lta: long term average (LTA) duration in seconds.
29
	:param float thresh: threshold for STA/LTA trigger.
30
	:type bp: :py:class:`bool` or :py:class:`list`
31
	:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
32
	:param bool debug: whether or not to display max STA/LTA calculation live to the console.
33
	:param str cha: listening channel (defaults to [S,E]HZ)
34
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
35
36
	"""
37
38 1
	def _set_filt(self, bp):
39
		'''
40
		This function sets the filter parameters (if specified).
41
		Set to a boolean if not filtering, or ``[highpass, lowpass]``
42
		if filtering.
43
44
		:param bp: bandpass filter parameters. if set, should be in the format ``[highpass, lowpass]``
45
		:type bp: :py:class:`bool` or :py:class:`list`
46
		'''
47 1
		self.filt = False
48 1
		if bp:
49 1
			self.freqmin = bp[0]
50 1
			self.freqmax = bp[1]
51 1
			self.freq = 0
52 1
			if (bp[0] <= 0) and (bp[1] >= (self.sps/2)):
53
				self.filt = False
54 1
			elif (bp[0] > 0) and (bp[1] >= (self.sps/2)):
55
				self.filt = 'highpass'
56
				self.freq = bp[0]
57
				desc = 'low corner %s' % (bp[0])
58 1
			elif (bp[0] <= 0) and (bp[1] <= (self.sps/2)):
59
				self.filt = 'lowpass'
60
				self.freq = bp[1]
61
			else:
62 1
				self.filt = 'bandpass'
63
64
65 1 View Code Duplication
	def _set_deconv(self, deconv):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
66
		'''
67
		This function sets the deconvolution units. Allowed values are as follows:
68
69
		.. |ms2| replace:: m/s\ :sup:`2`\
70
71
		- ``'VEL'`` - velocity (m/s)
72
		- ``'ACC'`` - acceleration (|ms2|)
73
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
74
		- ``'DISP'`` - displacement (m)
75
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels
76
77
		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
78
		'''
79 1
		deconv = deconv.upper() if deconv else False
80 1
		self.deconv = deconv if (deconv in rs.UNITS) else False
81 1
		if self.deconv and rs.inv:
82
			self.units = '%s (%s)' % (rs.UNITS[self.deconv][0], rs.UNITS[self.deconv][1]) if (self.deconv in rs.UNITS) else self.units
83
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
84
		else:
85 1
			self.units = rs.UNITS['CHAN'][1]
86 1
			self.deconv = False
87 1
		printM('Alert stream units are %s' % (self.units.strip(' ').lower()), self.sender)
88
89
90 1
	def _find_chn(self):
91
		'''
92
		Finds channel match in list of channels.
93
		'''
94 1
		for chn in rs.chns:
95 1
			if self.cha in chn:
96 1
				self.cha = chn
97
98
99 1 View Code Duplication
	def _set_channel(self, cha):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
100
		'''
101
		This function sets the channel to listen to. Allowed values are as follows:
102
103
		- "SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
104
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
105
		- ``"HDF"`` - pressure transducer channel
106
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available
107
108
		:param cha: the channel to listen to
109
		:type cha: str
110
		'''
111 1
		cha = self.default_ch if (cha == 'all') else cha
112 1
		self.cha = cha if isinstance(cha, str) else cha[0]
113
114 1
		if self.cha in str(rs.chns):
115 1
			self._find_chn()
116
		else:
117
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
118
			sys.exit(2)
119
120
121 1
	def _print_filt(self):
122
		'''
123
		Prints stream filtering information.
124
		'''
125 1
		if self.filt == 'bandpass':
126 1
			printM('Alert stream will be %s filtered from %s to %s Hz'
127
					% (self.filt, self.freqmin, self.freqmax), self.sender)
128
		elif self.filt in ('lowpass', 'highpass'):
129
			modifier = 'below' if self.filt in 'lowpass' else 'above'
130
			printM('Alert stream will be %s filtered %s %s Hz'
131
					% (self.filt, modifier, self.freq), self.sender)
132
133
134 1
	def __init__(self, q, sta=5, lta=30, thresh=1.6, reset=1.55, bp=False,
135
				 debug=True, cha='HZ', sound=False, deconv=False, testing=False,
136
				 *args, **kwargs):
137
		"""
138
		Initializing the alert thread with parameters to set up the recursive
139
		STA-LTA trigger, filtering, and the channel used for listening.
140
		"""
141 1
		super().__init__()
142 1
		self.sender = 'Alert'
143 1
		self.alive = True
144 1
		self.testing = testing
145
146 1
		self.queue = q
147
148 1
		self.default_ch = 'HZ'
149 1
		self.sta = sta
150 1
		self.lta = lta
151 1
		self.thresh = thresh
152 1
		self.reset = reset
153 1
		self.debug = debug
154 1
		self.args = args
155 1
		self.kwargs = kwargs
156 1
		self.raw = rs.Stream()
157 1
		self.stream = rs.Stream()
158
159 1
		self._set_channel(cha)
160
161 1
		self.sps = rs.sps
162 1
		self.inv = rs.inv
163 1
		self.stalta = np.ndarray(1)
164 1
		self.maxstalta = 0
165 1
		self.units = 'counts'
166
		
167 1
		self._set_deconv(deconv)
168
169 1
		self.exceed = False
170 1
		self.sound = sound
171
		
172 1
		self._set_filt(bp)
173 1
		self._print_filt()
174
175
176 1 View Code Duplication
	def _getq(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
177
		'''
178
		Reads data from the queue and updates the stream.
179
180
		:rtype: bool
181
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
182
		'''
183 1
		d = self.queue.get(True, timeout=None)
184 1
		self.queue.task_done()
185 1
		if self.cha in str(d):
186 1
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
187 1
			return True
188 1
		elif 'TERM' in str(d):
189 1
			self.alive = False
190 1
			printM('Exiting.', self.sender)
191 1
			sys.exit()
192
		else:
193 1
			return False
194
195
196 1
	def _deconvolve(self):
197
		'''
198
		Deconvolves the stream associated with this class.
199
		'''
200 1
		if self.deconv:
201
			helpers.deconvolve(self)
202
203
204 1
	def _subloop(self):
205
		'''
206
		Gets the queue and figures out whether or not the specified channel is in the packet.
207
		'''
208 1
		while True:
209 1
			if self.queue.qsize() > 0:
210 1
				self._getq()			# get recent packets
211
			else:
212 1
				if self._getq():		# is this the specified channel? if so break
213 1
					break
214
215
216 1
	def _filter(self):
217
		'''
218
		Filters the stream associated with this class.
219
		'''
220 1
		if self.filt:
221 1
			if self.filt in 'bandpass':
222 1
				self.stalta = recursive_sta_lta(
223
							self.stream[0].copy().filter(type=self.filt,
224
							freqmin=self.freqmin, freqmax=self.freqmax),
225
							int(self.sta * self.sps), int(self.lta * self.sps))
226
			else:
227
				self.stalta = recursive_sta_lta(
228
							self.stream[0].copy().filter(type=self.filt,
229
							freq=self.freq),
230
							int(self.sta * self.sps), int(self.lta * self.sps))
231
		else:
232
			self.stalta = recursive_sta_lta(self.stream[0],
233
					int(self.sta * self.sps), int(self.lta * self.sps))
234
235
236 1
	def _is_trigger(self):
237
		'''
238
		Figures out it there's a trigger active.
239
		'''
240 1
		if self.stalta.max() > self.thresh:
241 1
			if not self.exceed:
242
				# raise a flag that the Producer can read and modify 
243 1
				self.alarm = helpers.fsec(self.stream[0].stats.starttime + timedelta(seconds=
244
										trigger_onset(self.stalta, self.thresh,
245
										self.reset)[-1][0] * self.stream[0].stats.delta))
246 1
				self.exceed = True	# the state machine; this one should not be touched from the outside, otherwise bad things will happen
247 1
				print()
248 1
				printM('Trigger threshold of %s exceeded at %s'
249
						% (self.thresh, self.alarm.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]), self.sender)
250 1
				printM('Trigger will reset when STA/LTA goes below %s...' % self.reset, sender=self.sender)
251 1
				COLOR['current'] = COLOR['purple']
252 1
				if self.testing:
253 1
					TEST['c_alerton'][1] = True
254
			else:
255
				pass
256
257 1
			if self.stalta.max() > self.maxstalta:
258 1
				self.maxstalta = self.stalta.max()
259
		else:
260 1
			if self.exceed:
261 1
				if self.stalta[-1] < self.reset:
262 1
					self.alarm_reset = helpers.fsec(self.stream[0].stats.endtime)	# lazy; effective
263 1
					self.exceed = False
264 1
					print()
265 1
					printM('Max STA/LTA ratio reached in alarm state: %s' % (round(self.maxstalta, 3)),
266
							self.sender)
267 1
					printM('Earthquake trigger reset and active again at %s' % (
268
							self.alarm_reset.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]),
269
							self.sender)
270 1
					self.maxstalta = 0
271 1
					COLOR['current'] = COLOR['green']
272 1
				if self.testing:
273 1
					TEST['c_alertoff'][1] = True
274
275
			else:
276 1
				pass
277
278
279 1
	def _print_stalta(self):
280
		'''
281
		Print the current max STA/LTA of the stream.
282
		'''
283
		if self.debug:
284
			msg = '\r%s [%s] Threshold: %s; Current max STA/LTA: %.4f' % (
285
					(self.stream[0].stats.starttime + timedelta(seconds=
286
					 len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
287
					self.sender,
288
					self.thresh,
289
					round(np.max(self.stalta[-50:]), 4)
290
					)
291
			print(COLOR['current'] + COLOR['bold'] + msg + COLOR['white'], end='', flush=True)
292
293
294 1
	def run(self):
295
		"""
296
		Reads data from the queue into a :class:`obspy.core.stream.Stream` object,
297
		then runs a :func:`obspy.signal.trigger.recursive_sta_lta` function to
298
		determine whether to raise an alert flag (:py:data:`rsudp.c_alert.Alert.alarm`).
299
		The producer reads this flag and uses it to notify other consumers.
300
		"""
301 1
		n = 0
302
303 1
		wait_pkts = (self.lta) / (rs.tf / 1000)
304
305 1
		while n > 3:
306
			self.getq()
307
			n += 1
308
309 1
		n = 0
310 1
		while True:
311 1
			self._subloop()
312
313 1
			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
314 1
			self.stream = self.raw.copy()
315 1
			self._deconvolve()
316
317 1
			if n > wait_pkts:
318
				# if the trigger is activated
319 1
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.lta)	# obspy time
320 1
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
321 1
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)
322
323
				# filter
324 1
				self._filter()
325
				# figure out if the trigger has gone off
326 1
				self._is_trigger()
327
328
				# copy the stream (necessary to avoid memory leak)
329 1
				self.stream = rs.copy(self.stream)
330
331
				# print the current STA/LTA calculation
332 1
				self._print_stalta()
333
334 1
			elif n == 0:
335 1
				printM('Starting Alert trigger with sta=%ss, lta=%ss, and threshold=%s on channel=%s'
336
					   % (self.sta, self.lta, self.thresh, self.cha), self.sender)
337 1
				printM('Earthquake trigger warmup time of %s seconds...'
338
					   % (self.lta), self.sender)
339 1
			elif n == wait_pkts:
340 1
				printM('Earthquake trigger up and running normally.',
341
					   self.sender)
342
			else:
343
				pass
344
345 1
			n += 1
346
			sys.stdout.flush()
347