Passed
Push — master ( 71a4eb...3a5387 )
by Ian
03:45 queued 12s
created

build.rsudp.c_rsam.RSAM._deconvolve()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
import sys, os, time
2
import socket as s
3
from datetime import datetime, timedelta
4
import statistics
5
from rsudp import printM, printW, printE
6
from rsudp import helpers
7
import rsudp.raspberryshake as rs
8
COLOR = {}
9
from rsudp import COLOR
10
11
# set the terminal text color to green
12
COLOR['current'] = COLOR['green']
13
14
class RSAM(rs.ConsumerThread):
15
	"""
16
	.. versionadded:: 1.0.0
17
18
	A consumer class that runs an Real-time Seismic Amplitude Measurement (RSAM).
19
20
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`.
21
	:param bool debug: whether or not to display RSAM analysis live to the console.
22
	:param float interval: window of time in seconds to apply RSAM analysis.
23
	:param str cha: listening channel (defaults to [S,E]HZ)
24
	:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
25
	:param str fwaddr: Specify a forwarding address to send RSAM in a UDP packet
26
	:param str fwport: Specify a forwarding port to send RSAM in a UDP packet
27
	:param str fwformat: ``'LITE'``, ``'JSON'``, or ``'CSV'``
28
	"""
29
30
	def __init__(self, q=False, debug=False, interval=5, cha='HZ', deconv=False,
31
				 fwaddr=False, fwport=False, fwformat='LITE', *args, **kwargs):
32
		"""
33
		Initializes the RSAM analysis thread.
34
		"""
35
		super().__init__()
36
		self.sender = 'RSAM'
37
		self.alive = True
38
		self.debug = debug
39
		self.stn = rs.stn
40
		self.fwaddr = fwaddr
41
		self.fwport = fwport
42
		self.fwformat = fwformat.upper()
43
		self.sock = False
44
		self.interval = interval
45
		self.default_ch = 'HZ'
46
		self.args = args
47
		self.kwargs = kwargs
48
		self.raw = rs.Stream()
49
		self.stream = rs.Stream()
50
		self.units = 'counts'
51
52
		self._set_deconv(deconv)
53
54
		self._set_channel(cha)
55
56
		self.rsam = [1, 1, 1]
57
58
		if q:
59
			self.queue = q
60
		else:
61
			printE('no queue passed to the consumer thread! We will exit now!',
62
				   self.sender)
63
			sys.stdout.flush()
64
			self.alive = False
65
			sys.exit()
66
67
		printM('Starting.', self.sender)
68
69
70 View Code Duplication
	def _set_deconv(self, deconv):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
71
		"""
72
		This function sets the deconvolution units. Allowed values are as follows:
73
74
		.. |ms2| replace:: m/s\ :sup:`2`\
75
76
		- ``'VEL'`` - velocity (m/s)
77
		- ``'ACC'`` - acceleration (|ms2|)
78
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
79
		- ``'DISP'`` - displacement (m)
80
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels
81
82
		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
83
		"""
84
		deconv = deconv.upper() if deconv else False
85
		self.deconv = deconv if (deconv in rs.UNITS) else False
86
		if self.deconv and rs.inv:
87
			self.units = '%s (%s)' % (rs.UNITS[self.deconv][0], rs.UNITS[self.deconv][1]) if (self.deconv in rs.UNITS) else self.units
88
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
89
		else:
90
			self.units = rs.UNITS['CHAN'][1]
91
			self.deconv = False
92
		printM('RSAM stream units are %s' % (self.units.strip(' ').lower()), self.sender)
93
94
95
	def _find_chn(self):
96
		"""
97
		Finds channel match in list of channels.
98
		"""
99
		for chn in rs.chns:
100
			if self.cha in chn:
101
				self.cha = chn
102
103
104 View Code Duplication
	def _set_channel(self, cha):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
105
		"""
106
		This function sets the channel to listen to. Allowed values are as follows:
107
108
		- "SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
109
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
110
		- ``"HDF"`` - pressure transducer channel
111
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available
112
113
		:param cha: the channel to listen to
114
		:type cha: str
115
		"""
116
		cha = self.default_ch if (cha == 'all') else cha
117
		self.cha = cha if isinstance(cha, str) else cha[0]
118
119
		if self.cha in str(rs.chns):
120
			self._find_chn()
121
		else:
122
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
123
			sys.exit(2)
124
125
126 View Code Duplication
	def _getq(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
127
		"""
128
		Reads data from the queue and updates the stream.
129
130
		:rtype: bool
131
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
132
		"""
133
		d = self.queue.get(True, timeout=None)
134
		self.queue.task_done()
135
		if self.cha in str(d):
136
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
137
			return True
138
		elif 'TERM' in str(d):
139
			self.alive = False
140
			printM('Exiting.', self.sender)
141
			sys.exit()
142
		else:
143
			return False
144
145
146
	def _deconvolve(self):
147
		"""
148
		Deconvolves the stream associated with this class.
149
		"""
150
		if self.deconv:
151
			helpers.deconvolve(self)
152
153
154
	def _subloop(self):
155
		"""
156
		Gets the queue and figures out whether or not the specified channel is in the packet.
157
		"""
158
		while True:
159
			if self.queue.qsize() > 0:
160
				self._getq()			# get recent packets
161
			else:
162
				if self._getq():		# is this the specified channel? if so break
163
					break
164
165
166
	def _rsam(self):
167
		"""
168
		Run the RSAM analysis
169
		"""
170
		arr = [abs(el) for el in self.stream[0].data]
171
		meanv = statistics.mean(arr)
172
		medianv = statistics.median(arr)
173
		minv = min(arr)
174
		maxv = max(arr)
175
		self.rsam = [meanv, medianv, minv, maxv]
176
177
178
	def _print_rsam(self):
179
		"""
180
		Print the current RSAM analysis
181
		"""
182
		if self.debug:
183
			msg = '%s [%s] Current RSAM: mean %s median %s min %s max %s' % (
184
				(self.stream[0].stats.starttime + timedelta(seconds=
185
															len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
186
				self.sender,
187
				self.rsam[0],
188
				self.rsam[1],
189
				self.rsam[2],
190
				self.rsam[3]
191
			)
192
			printM(msg, self.sender)
193
194
	def _forward_rsam(self):
195
		"""
196
		Send the RSAM analysis via UDP to another destination in a lightweight format
197
		"""
198
		if self.sock:
199
			msg = 'stn:%s|ch:%s|mean:%s|med:%s|min:%s|max:%s' % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
200
			if self.fwformat is 'JSON':
201
				msg = '{"station":"%s","channel":"%s","mean":%s,"median":%s,"min":%s,"max":%s}' \
202
					  % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
203
			elif self.fwformat is 'CSV':
204
				msg = '%s,%s,%s,%s,%s,%s' \
205
					  % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
206
			packet = bytes(msg, 'utf-8')
207
			self.sock.sendto(packet, (self.fwaddr, self.fwport))
208
209
210
	def run(self):
211
		"""
212
		Reads data from the queue and executes self.codefile if it sees an ``ALARM`` message.
213
		Quits if it sees a ``TERM`` message.
214
		"""
215
		if self.fwaddr and self.fwport:
216
			printM('Opening socket...', sender=self.sender)
217
			socket_type = s.SOCK_DGRAM if os.name in 'nt' else s.SOCK_DGRAM | s.SO_REUSEADDR
218
			self.sock = s.socket(s.AF_INET, socket_type)
219
220
		n = 0
221
		next_int = time.time() + self.interval
222
223
		wait_pkts = self.interval / (rs.tf / 1000)
224
225
		while n > 3:
226
			self.getq()
227
			n += 1
228
229
		n = 0
230
		while True:
231
			self._subloop()
232
233
			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
234
			self.stream = self.raw.copy()
235
			self._deconvolve()
236
237
			if n > wait_pkts:
238
				# if the trigger is activated
239
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.interval)	# obspy time
240
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
241
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)
242
243
				# run rsam analysis
244
				if time.time() > next_int:
245
					self._rsam()
246
					self.stream = rs.copy(self.stream)  # prevent mem leak
247
					self._forward_rsam()
248
					self._print_rsam()
249
					next_int = time.time() + self.interval
250
251
			elif n == 0:
252
				printM('Starting RSAM analysis with interval=%s on station=%s channel=%s forward=%s' %
253
					   (self.interval, self.stn, self.cha, self.fwaddr),
254
					   self.sender)
255
			elif n == wait_pkts:
256
				printM('RSAM analysis up and running normally.', self.sender)
257
			else:
258
				pass
259
260
			n += 1
261
			sys.stdout.flush()
262