Passed
Push — master ( 0f43bb...103de2 )
by Ian
04:18
created

build.rsudp.c_alert.Alert._subloop()   A

Complexity

Conditions 4

Size

Total Lines 10
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 6
nop 1
dl 0
loc 10
rs 10
c 0
b 0
f 0
1
import sys, os
2
from datetime import datetime, timedelta
3
import rsudp.raspberryshake as rs
4
from obspy.signal.trigger import recursive_sta_lta, trigger_onset
5
from rsudp import printM, printW, printE
6
COLOR = {}
7
from rsudp import COLOR
8
import numpy as np
9
10
# set the terminal text color to green
11
COLOR['current'] = COLOR['green']
12
13
14
class Alert(rs.ConsumerThread):
15
	"""
16
	A data consumer class that listens to a specific incoming data channel
17
	and calculates a recursive STA/LTA (short term average over long term 
18
	average). If a threshold of STA/LTA ratio is exceeded, the class
19
	sets the :py:data:`alarm` flag to the alarm time as a
20
	:py:class:`obspy.core.utcdatetime.UTCDateTime` object.
21
	The :py:class:`rsudp.p_producer.Producer` will see this flag
22
	and send an :code:`ALARM` message to the queues with the time set here.
23
	Likewise, when the :py:data:`alarm_reset` flag is set with a
24
	:py:class:`obspy.core.utcdatetime.UTCDateTime`,
25
	the Producer will send a :code:`RESET` message to the queues.
26
27
	:param float sta: short term average (STA) duration in seconds.
28
	:param float lta: long term average (LTA) duration in seconds.
29
	:param float thresh: threshold for STA/LTA trigger.
30
	:type bp: :py:class:`bool` or :py:class:`list`
31
	:param bp: bandpass filter parameters.
32
	:param bool debug: whether or not to display max STA/LTA calculation live to the console.
33
	:param str cha: listening channel (defaults to [S,E]HZ)
34
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
35
36
	"""
37
	def __init__(self, sta=5, lta=30, thresh=1.6, reset=1.55, bp=False,
38
				 debug=True, cha='HZ', q=False, sound=False, deconv=False,
39
				 *args, **kwargs):
40
		
41
		"""
42
		Initializing the alert thread with parameters to set up the recursive
43
		STA-LTA trigger, filtering, and the channel used for listening.
44
45
		"""
46
		super().__init__()
47
		self.sender = 'Alert'
48
		self.alive = True
49
50
		if q:
51
			self.queue = q
52
		else:
53
			printE('no queue passed to consumer! Thread will exit now!', self.sender)
54
			sys.stdout.flush()
55
			sys.exit()
56
57
		self.default_ch = 'HZ'
58
		self.sta = sta
59
		self.lta = lta
60
		self.thresh = thresh
61
		self.reset = reset
62
		self.debug = debug
63
		self.args = args
64
		self.kwargs = kwargs
65
		self.raw = rs.Stream()
66
		self.stream = rs.Stream()
67
		cha = self.default_ch if (cha == 'all') else cha
68
		self.cha = cha if isinstance(cha, str) else cha[0]
69
		self.sps = rs.sps
70
		self.inv = rs.inv
71
		self.stalta = np.ndarray(1)
72
		self.maxstalta = 0
73
		self.units = 'counts'
74
		
75
		deconv = deconv.upper() if deconv else False
76
		self.deconv = self.deconv if (deconv in rs.UNITS) else False
77
		if self.deconv and rs.inv:
78
			self.units = '%s (%s)' % (rs.UNITS[0], rs.UNITS[1]) if (self.deconv in rs.UNITS) else self.units
79
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
80
		else:
81
			self.units = rs.UNITS['CHAN'][1]
82
			self.deconv = False
83
		printM('Alert stream units are %s' % (self.units.strip(' ').lower()), self.sender)
84
85
		self.exceed = False
86
		self.sound = sound
87
		if bp:
88
			self.freqmin = bp[0]
89
			self.freqmax = bp[1]
90
			self.freq = 0
91
			if (bp[0] <= 0) and (bp[1] >= (self.sps/2)):
92
				self.filt = False
93
			elif (bp[0] > 0) and (bp[1] >= (self.sps/2)):
94
				self.filt = 'highpass'
95
				self.freq = bp[0]
96
				desc = 'low corner %s' % (bp[0])
97
			elif (bp[0] <= 0) and (bp[1] <= (self.sps/2)):
98
				self.filt = 'lowpass'
99
				self.freq = bp[1]
100
			else:
101
				self.filt = 'bandpass'
102
		else:
103
			self.filt = False
104
105
		if self.cha not in str(rs.chns):
106
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
107
			sys.exit(2)
108
109
		listen_ch = '?%s' % self.cha
110
		printM('Starting Alert trigger with sta=%ss, lta=%ss, and threshold=%s on channel=%s'
111
				% (self.sta, self.lta, self.thresh, listen_ch), self.sender)
112
		if self.filt == 'bandpass':
113
			printM('Alert stream will be %s filtered from %s to %s Hz'
114
					% (self.filt, self.freqmin, self.freqmax), self.sender)
115
		elif self.filt in ('lowpass', 'highpass'):
116
			modifier = 'below' if self.filt in 'lowpass' else 'above'
117
			printM('Alert stream will be %s filtered %s %s Hz'
118
					% (self.filt, modifier, self.freq), self.sender)
119
120
121
	def _getq(self):
122
		'''
123
		Reads data from the queue and updates the stream.
124
125
		:rtype: bool
126
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
127
		'''
128
		d = self.queue.get(True, timeout=None)
129
		self.queue.task_done()
130
		if self.cha in str(d):
131
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
132
			return True
133
		elif 'TERM' in str(d):
134
			self.alive = False
135
			printM('Exiting.', self.sender)
136
			sys.exit()
137
		else:
138
			return False
139
140
141
	def _deconvolve(self):
142
		'''
143
		Deconvolves the stream associated with this class.
144
		'''
145
		if self.deconv:
146
			rs.deconvolve(self)
147
148
149
	def _subloop(self):
150
		'''
151
		Gets the queue and figures out whether or not the specified channel is in the packet.
152
		'''
153
		while True:
154
			if self.queue.qsize() > 0:
155
				self._getq()			# get recent packets
156
			else:
157
				if self._getq():		# is this the specified channel? if so break
158
					break
159
160
161
	def _filter(self):
162
		'''
163
		Filters the stream associated with this class.
164
		'''
165
		if self.filt:
166
			if self.filt in 'bandpass':
167
				self.stalta = recursive_sta_lta(
168
							self.stream[0].copy().filter(type=self.filt,
169
							freqmin=self.freqmin, freqmax=self.freqmax),
170
							int(self.sta * self.sps), int(self.lta * self.sps))
171
			else:
172
				self.stalta = recursive_sta_lta(
173
							self.stream[0].copy().filter(type=self.filt,
174
							freq=self.freq),
175
							int(self.sta * self.sps), int(self.lta * self.sps))
176
		else:
177
			self.stalta = recursive_sta_lta(self.stream[0],
178
					int(self.sta * self.sps), int(self.lta * self.sps))
179
180
181
	def _is_trigger(self):
182
		'''
183
		Figures out it there's a trigger active.
184
		'''
185
		if self.stalta.max() > self.thresh:
186
			if not self.exceed:
187
				# raise a flag that the Producer can read and modify 
188
				self.alarm = rs.fsec(self.stream[0].stats.starttime + timedelta(seconds=
189
										trigger_onset(self.stalta, self.thresh,
190
										self.reset)[-1][0] * self.stream[0].stats.delta))
191
				self.exceed = True	# the state machine; this one should not be touched from the outside, otherwise bad things will happen
192
				print()
193
				printM('Trigger threshold of %s exceeded at %s'
194
						% (self.thresh, self.alarm.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]), self.sender)
195
				printM('Trigger will reset when STA/LTA goes below %s...' % self.reset, sender=self.sender)
196
				COLOR['current'] = COLOR['purple']
197
			else:
198
				pass
199
200
			if self.stalta.max() > self.maxstalta:
201
				self.maxstalta = self.stalta.max()
202
		else:
203
			if self.exceed:
204
				if self.stalta[-1] < self.reset:
205
					self.alarm_reset = rs.fsec(self.stream[0].stats.endtime)	# lazy; effective
206
					self.exceed = False
207
					print()
208
					printM('Max STA/LTA ratio reached in alarm state: %s' % (round(self.maxstalta, 3)),
209
							self.sender)
210
					printM('Earthquake trigger reset and active again at %s' % (
211
							self.alarm_reset.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]),
212
							self.sender)
213
					self.maxstalta = 0
214
					COLOR['current'] = COLOR['green']
215
			else:
216
				pass
217
218
219
	def _print_stalta(self):
220
		'''
221
		Print the current max STA/LTA of the stream.
222
		'''
223
		if self.debug:
224
			msg = '\r%s [%s] Threshold: %s; Current max STA/LTA: %.4f' % (
225
					(self.stream[0].stats.starttime + timedelta(seconds=
226
					len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
227
					self.sender,
228
					self.thresh,
229
					round(np.max(self.stalta[-50:]), 4)
230
					)
231
			print(COLOR['current'] + COLOR['bold'] + msg + COLOR['white'], end='', flush=True)
232
233
234
	def run(self):
235
		"""
236
		Reads data from the queue into a :class:`obspy.core.stream.Stream` object,
237
		then runs a :func:`obspy.signal.trigger.recursive_sta_lta` function to
238
		determine whether to raise an alert flag (:py:data:`rsudp.c_alert.Alert.alarm`).
239
		The producer reads this flag and uses it to notify other consumers.
240
		"""
241
		n = 0
242
243
		wait_pkts = (self.lta) / (rs.tf / 1000)
244
245
		while n > 3:
246
			self.getq()
247
			n += 1
248
249
		n = 0
250
		while True:
251
			self._subloop()
252
253
			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
254
			self.stream = self.raw.copy()
255
			self._deconvolve()
256
257
			if n > wait_pkts:
258
				# if the trigger is activated
259
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.lta)	# obspy time
260
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
261
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)
262
263
				# filter
264
				self._filter()
265
				# figure out if the trigger has gone off
266
				self._is_trigger()
267
268
				# copy the stream (necessary to avoid memory leak)
269
				self.stream = rs.copy(self.stream)
270
271
				# print the current STA/LTA calculation
272
				self._print_stalta()
273
274
			elif n == 0:
275
				printM('Listening to channel %s'
276
						% (self.stream[0].stats.channel), self.sender)
277
				printM('Earthquake trigger warmup time of %s seconds...'
278
						% (self.lta), self.sender)
279
			elif n == wait_pkts:
280
				printM('Earthquake trigger up and running normally.',
281
						self.sender)
282
			else:
283
				pass
284
285
			n += 1
286
			sys.stdout.flush()
287