Passed
Push — master ( 0f43bb...103de2 )
by Ian
04:18
created

build.rsudp.c_alert.Alert.__init__()   F

Complexity

Conditions 20

Size

Total Lines 82
Code Lines 66

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 20
eloc 66
nop 13
dl 0
loc 82
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.rsudp.c_alert.Alert.__init__() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
import sys, os
2
from datetime import datetime, timedelta
3
import rsudp.raspberryshake as rs
4
from obspy.signal.trigger import recursive_sta_lta, trigger_onset
5
from rsudp import printM, printW, printE
6
COLOR = {}
7
from rsudp import COLOR
8
import numpy as np
9
10
# set the terminal text color to green
11
COLOR['current'] = COLOR['green']
12
13
14
class Alert(rs.ConsumerThread):
15
	"""
16
	A data consumer class that listens to a specific incoming data channel
17
	and calculates a recursive STA/LTA (short term average over long term 
18
	average). If a threshold of STA/LTA ratio is exceeded, the class
19
	sets the :py:data:`alarm` flag to the alarm time as a
20
	:py:class:`obspy.core.utcdatetime.UTCDateTime` object.
21
	The :py:class:`rsudp.p_producer.Producer` will see this flag
22
	and send an :code:`ALARM` message to the queues with the time set here.
23
	Likewise, when the :py:data:`alarm_reset` flag is set with a
24
	:py:class:`obspy.core.utcdatetime.UTCDateTime`,
25
	the Producer will send a :code:`RESET` message to the queues.
26
27
	:param float sta: short term average (STA) duration in seconds.
28
	:param float lta: long term average (LTA) duration in seconds.
29
	:param float thresh: threshold for STA/LTA trigger.
30
	:type bp: :py:class:`bool` or :py:class:`list`
31
	:param bp: bandpass filter parameters.
32
	:param bool debug: whether or not to display max STA/LTA calculation live to the console.
33
	:param str cha: listening channel (defaults to [S,E]HZ)
34
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
35
36
	"""
37
	def __init__(self, sta=5, lta=30, thresh=1.6, reset=1.55, bp=False,
38
				 debug=True, cha='HZ', q=False, sound=False, deconv=False,
39
				 *args, **kwargs):
40
		
41
		"""
42
		Initializing the alert thread with parameters to set up the recursive
43
		STA-LTA trigger, filtering, and the channel used for listening.
44
45
		"""
46
		super().__init__()
47
		self.sender = 'Alert'
48
		self.alive = True
49
50
		if q:
51
			self.queue = q
52
		else:
53
			printE('no queue passed to consumer! Thread will exit now!', self.sender)
54
			sys.stdout.flush()
55
			sys.exit()
56
57
		self.default_ch = 'HZ'
58
		self.sta = sta
59
		self.lta = lta
60
		self.thresh = thresh
61
		self.reset = reset
62
		self.debug = debug
63
		self.args = args
64
		self.kwargs = kwargs
65
		self.raw = rs.Stream()
66
		self.stream = rs.Stream()
67
		cha = self.default_ch if (cha == 'all') else cha
68
		self.cha = cha if isinstance(cha, str) else cha[0]
69
		self.sps = rs.sps
70
		self.inv = rs.inv
71
		self.stalta = np.ndarray(1)
72
		self.maxstalta = 0
73
		self.units = 'counts'
74
		
75
		deconv = deconv.upper() if deconv else False
76
		self.deconv = self.deconv if (deconv in rs.UNITS) else False
77
		if self.deconv and rs.inv:
78
			self.units = '%s (%s)' % (rs.UNITS[0], rs.UNITS[1]) if (self.deconv in rs.UNITS) else self.units
79
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
80
		else:
81
			self.units = rs.UNITS['CHAN'][1]
82
			self.deconv = False
83
		printM('Alert stream units are %s' % (self.units.strip(' ').lower()), self.sender)
84
85
		self.exceed = False
86
		self.sound = sound
87
		if bp:
88
			self.freqmin = bp[0]
89
			self.freqmax = bp[1]
90
			self.freq = 0
91
			if (bp[0] <= 0) and (bp[1] >= (self.sps/2)):
92
				self.filt = False
93
			elif (bp[0] > 0) and (bp[1] >= (self.sps/2)):
94
				self.filt = 'highpass'
95
				self.freq = bp[0]
96
				desc = 'low corner %s' % (bp[0])
97
			elif (bp[0] <= 0) and (bp[1] <= (self.sps/2)):
98
				self.filt = 'lowpass'
99
				self.freq = bp[1]
100
			else:
101
				self.filt = 'bandpass'
102
		else:
103
			self.filt = False
104
105
		if self.cha not in str(rs.chns):
106
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
107
			sys.exit(2)
108
109
		listen_ch = '?%s' % self.cha
110
		printM('Starting Alert trigger with sta=%ss, lta=%ss, and threshold=%s on channel=%s'
111
				% (self.sta, self.lta, self.thresh, listen_ch), self.sender)
112
		if self.filt == 'bandpass':
113
			printM('Alert stream will be %s filtered from %s to %s Hz'
114
					% (self.filt, self.freqmin, self.freqmax), self.sender)
115
		elif self.filt in ('lowpass', 'highpass'):
116
			modifier = 'below' if self.filt in 'lowpass' else 'above'
117
			printM('Alert stream will be %s filtered %s %s Hz'
118
					% (self.filt, modifier, self.freq), self.sender)
119
120
121
	def _getq(self):
122
		'''
123
		Reads data from the queue and updates the stream.
124
125
		:rtype: bool
126
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
127
		'''
128
		d = self.queue.get(True, timeout=None)
129
		self.queue.task_done()
130
		if self.cha in str(d):
131
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
132
			return True
133
		elif 'TERM' in str(d):
134
			self.alive = False
135
			printM('Exiting.', self.sender)
136
			sys.exit()
137
		else:
138
			return False
139
140
141
	def _deconvolve(self):
142
		'''
143
		Deconvolves the stream associated with this class.
144
		'''
145
		if self.deconv:
146
			rs.deconvolve(self)
147
148
149
	def _subloop(self):
150
		'''
151
		Gets the queue and figures out whether or not the specified channel is in the packet.
152
		'''
153
		while True:
154
			if self.queue.qsize() > 0:
155
				self._getq()			# get recent packets
156
			else:
157
				if self._getq():		# is this the specified channel? if so break
158
					break
159
160
161
	def _filter(self):
162
		'''
163
		Filters the stream associated with this class.
164
		'''
165
		if self.filt:
166
			if self.filt in 'bandpass':
167
				self.stalta = recursive_sta_lta(
168
							self.stream[0].copy().filter(type=self.filt,
169
							freqmin=self.freqmin, freqmax=self.freqmax),
170
							int(self.sta * self.sps), int(self.lta * self.sps))
171
			else:
172
				self.stalta = recursive_sta_lta(
173
							self.stream[0].copy().filter(type=self.filt,
174
							freq=self.freq),
175
							int(self.sta * self.sps), int(self.lta * self.sps))
176
		else:
177
			self.stalta = recursive_sta_lta(self.stream[0],
178
					int(self.sta * self.sps), int(self.lta * self.sps))
179
180
181
	def _is_trigger(self):
182
		'''
183
		Figures out it there's a trigger active.
184
		'''
185
		if self.stalta.max() > self.thresh:
186
			if not self.exceed:
187
				# raise a flag that the Producer can read and modify 
188
				self.alarm = rs.fsec(self.stream[0].stats.starttime + timedelta(seconds=
189
										trigger_onset(self.stalta, self.thresh,
190
										self.reset)[-1][0] * self.stream[0].stats.delta))
191
				self.exceed = True	# the state machine; this one should not be touched from the outside, otherwise bad things will happen
192
				print()
193
				printM('Trigger threshold of %s exceeded at %s'
194
						% (self.thresh, self.alarm.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]), self.sender)
195
				printM('Trigger will reset when STA/LTA goes below %s...' % self.reset, sender=self.sender)
196
				COLOR['current'] = COLOR['purple']
197
			else:
198
				pass
199
200
			if self.stalta.max() > self.maxstalta:
201
				self.maxstalta = self.stalta.max()
202
		else:
203
			if self.exceed:
204
				if self.stalta[-1] < self.reset:
205
					self.alarm_reset = rs.fsec(self.stream[0].stats.endtime)	# lazy; effective
206
					self.exceed = False
207
					print()
208
					printM('Max STA/LTA ratio reached in alarm state: %s' % (round(self.maxstalta, 3)),
209
							self.sender)
210
					printM('Earthquake trigger reset and active again at %s' % (
211
							self.alarm_reset.strftime('%Y-%m-%d %H:%M:%S.%f')[:22]),
212
							self.sender)
213
					self.maxstalta = 0
214
					COLOR['current'] = COLOR['green']
215
			else:
216
				pass
217
218
219
	def _print_stalta(self):
220
		'''
221
		Print the current max STA/LTA of the stream.
222
		'''
223
		if self.debug:
224
			msg = '\r%s [%s] Threshold: %s; Current max STA/LTA: %.4f' % (
225
					(self.stream[0].stats.starttime + timedelta(seconds=
226
					len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
227
					self.sender,
228
					self.thresh,
229
					round(np.max(self.stalta[-50:]), 4)
230
					)
231
			print(COLOR['current'] + COLOR['bold'] + msg + COLOR['white'], end='', flush=True)
232
233
234
	def run(self):
235
		"""
236
		Reads data from the queue into a :class:`obspy.core.stream.Stream` object,
237
		then runs a :func:`obspy.signal.trigger.recursive_sta_lta` function to
238
		determine whether to raise an alert flag (:py:data:`rsudp.c_alert.Alert.alarm`).
239
		The producer reads this flag and uses it to notify other consumers.
240
		"""
241
		n = 0
242
243
		wait_pkts = (self.lta) / (rs.tf / 1000)
244
245
		while n > 3:
246
			self.getq()
247
			n += 1
248
249
		n = 0
250
		while True:
251
			self._subloop()
252
253
			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
254
			self.stream = self.raw.copy()
255
			self._deconvolve()
256
257
			if n > wait_pkts:
258
				# if the trigger is activated
259
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.lta)	# obspy time
260
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
261
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)
262
263
				# filter
264
				self._filter()
265
				# figure out if the trigger has gone off
266
				self._is_trigger()
267
268
				# copy the stream (necessary to avoid memory leak)
269
				self.stream = rs.copy(self.stream)
270
271
				# print the current STA/LTA calculation
272
				self._print_stalta()
273
274
			elif n == 0:
275
				printM('Listening to channel %s'
276
						% (self.stream[0].stats.channel), self.sender)
277
				printM('Earthquake trigger warmup time of %s seconds...'
278
						% (self.lta), self.sender)
279
			elif n == wait_pkts:
280
				printM('Earthquake trigger up and running normally.',
281
						self.sender)
282
			else:
283
				pass
284
285
			n += 1
286
			sys.stdout.flush()
287