Passed
Pull Request — master (#6)
by
unknown
04:17
created

build.rsudp.c_rsam.RSAM._forward_rsam()   A

Complexity

Conditions 4

Size

Total Lines 14
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 14
rs 9.85
c 0
b 0
f 0
cc 4
nop 1
1
import sys, os, time
2
import socket as s
3
from datetime import datetime, timedelta
4
import statistics
5
from rsudp import printM, printW, printE
6
from rsudp import helpers
7
import rsudp.raspberryshake as rs
8
COLOR = {}
9
from rsudp import COLOR
10
11
# set the terminal text color to green
12
COLOR['current'] = COLOR['green']
13
14
class RSAM(rs.ConsumerThread):
15
	"""
16
	.. versionadded:: 1.0.0
17
18
	A consumer class that runs a Real-time Seismic Amplitude Measurement (RSAM).
19
20
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`.
21
	:param bool debug: whether or not to display RSAM analysis live to the console.
22
	:param float interval: window of time in seconds to apply RSAM analysis.
23
	:param str cha: listening channel (defaults to [S,E]HZ)
24
	:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
25
	:param str fwaddr: Specify a forwarding address to send RSAM in a UDP packet
26
	:param str fwport: Specify a forwarding port to send RSAM in a UDP packet
27
	:param str fwformat: ``'LITE'``, ``'JSON'``, or ``'CSV'``
28
	"""
29
30
	def __init__(self, q=False, debug=False, interval=5, cha='HZ', deconv=False,
		fwaddr=False, fwport=False, fwformat='LITE', *args, **kwargs):
		"""
		Initializes the RSAM analysis thread.

		:param queue.Queue q: queue of data and messages to consume. If none is passed, an error is printed and the process exits.
		:param bool debug: whether or not to display RSAM analysis live to the console.
		:param float interval: window of time in seconds to apply RSAM analysis.
		:param str cha: listening channel (defaults to [S,E]HZ)
		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
		:param str fwaddr: forwarding address to send RSAM to in a UDP packet
		:param fwport: forwarding port to send RSAM to in a UDP packet
		:param str fwformat: ``'LITE'``, ``'JSON'``, or ``'CSV'`` (case-insensitive)
		"""
		super().__init__()
		self.sender = 'RSAM'
		self.alive = True
		self.debug = debug
		self.fwaddr = fwaddr
		self.fwport = fwport
		# stored upper-cased so later format checks are case-insensitive
		self.fwformat = fwformat.upper()
		# no socket yet; run() opens one when both fwaddr and fwport are set
		self.sock = False
		self.interval = interval
		self.default_ch = 'HZ'
		self.args = args
		self.kwargs = kwargs
		self.raw = rs.Stream()
		self.stream = rs.Stream()
		# default units; _set_deconv() below overwrites this
		self.units = 'counts'

		self._set_deconv(deconv)

		# must run after default_ch is set, since 'all' resolves to it
		self._set_channel(cha)

		# placeholder [mean, median, min, max]; four entries because
		# _print_rsam() and _forward_rsam() index all four positions
		self.rsam = [1, 1, 1, 1]

		if q:
			self.queue = q
		else:
			printE('no queue passed to the consumer thread! We will exit now!',
				   self.sender)
			sys.stdout.flush()
			self.alive = False
			sys.exit()

		printM('Starting.', self.sender)
67
68
69 View Code Duplication
	def _set_deconv(self, deconv):
		r"""
		This function sets the deconvolution units. Allowed values are as follows:

		.. |ms2| replace:: m/s\ :sup:`2`

		- ``'VEL'`` - velocity (m/s)
		- ``'ACC'`` - acceleration (|ms2|)
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
		- ``'DISP'`` - displacement (m)
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels

		Deconvolution is only enabled when the requested value is a key of
		``rs.UNITS`` and ``rs.inv`` is truthy; otherwise ``self.deconv`` is
		set to ``False`` and the units fall back to ``rs.UNITS['CHAN'][1]``.

		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
		"""
		# NOTE: the docstring is a raw string because the reST escape "\ " is
		# not a valid Python escape sequence in a normal string literal.
		requested = deconv.upper() if deconv else False
		self.deconv = requested if (requested in rs.UNITS) else False
		if self.deconv and rs.inv:
			# self.deconv was validated against rs.UNITS just above
			unit = rs.UNITS[self.deconv]
			self.units = '%s (%s)' % (unit[0], unit[1])
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
		else:
			self.units = rs.UNITS['CHAN'][1]
			self.deconv = False
		printM('RSAM stream units are %s' % (self.units.strip(' ').lower()), self.sender)
92
93
94
	def _find_chn(self):
		"""
		Resolves ``self.cha`` to an entry of ``rs.chns``.

		Each entry of ``rs.chns`` that contains the current value of
		``self.cha`` replaces it, so later entries are compared against the
		already-updated value.
		"""
		for candidate in rs.chns:
			if self.cha not in candidate:
				continue
			self.cha = candidate
101
102
103 View Code Duplication
	def _set_channel(self, cha):
		"""
		This function sets the channel to listen to. Allowed values are as follows:

		- ``"SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
		- ``"HDF"`` - pressure transducer channel
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available

		If the value matches none of the entries in ``rs.chns``, an error is
		printed and the process exits with status 2.

		:param cha: the channel to listen to; if not a string, its first element is used
		:type cha: str
		"""
		cha = self.default_ch if (cha == 'all') else cha
		self.cha = cha if isinstance(cha, str) else cha[0]

		# Test each channel name on its own. Searching str(rs.chns) would also
		# accept fragments of the list repr itself (quotes, commas, brackets),
		# which _find_chn() could never resolve to a real channel.
		if any(self.cha in chn for chn in rs.chns):
			self._find_chn()
		else:
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
			sys.exit(2)
122
			sys.exit(2)
123
124
125 View Code Duplication
	def _getq(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
126
		"""
127
		Reads data from the queue and updates the stream.
128
129
		:rtype: bool
130
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
131
		"""
132
		d = self.queue.get(True, timeout=None)
133
		self.queue.task_done()
134
		if self.cha in str(d):
135
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
136
			return True
137
		elif 'TERM' in str(d):
138
			self.alive = False
139
			printM('Exiting.', self.sender)
140
			sys.exit()
141
		else:
142
			return False
143
144
145
	def _deconvolve(self):
146
		"""
147
		Deconvolves the stream associated with this class.
148
		"""
149
		if self.deconv:
150
			helpers.deconvolve(self)
151
152
153
	def _subloop(self):
154
		"""
155
		Gets the queue and figures out whether or not the specified channel is in the packet.
156
		"""
157
		while True:
158
			if self.queue.qsize() > 0:
159
				self._getq()			# get recent packets
160
			else:
161
				if self._getq():		# is this the specified channel? if so break
162
					break
163
164
165
	def _rsam(self):
166
		"""
167
		Run the RSAM analysis
168
		"""
169
		arr = [abs(el) for el in self.stream[0].data]
170
		meanv = statistics.mean(arr)
171
		medianv = statistics.median(arr)
172
		minv = min(arr)
173
		maxv = max(arr)
174
		self.rsam = [meanv, medianv, minv, maxv]
175
176
177
	def _print_rsam(self):
178
		"""
179
		Print the current RSAM analysis
180
		"""
181
		if self.debug:
182
			msg = '%s [%s] Current RSAM: mean %s median %s min %s max %s' % (
183
				(self.stream[0].stats.starttime + timedelta(seconds=
184
															len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
185
				self.sender,
186
				self.rsam[0],
187
				self.rsam[1],
188
				self.rsam[2],
189
				self.rsam[3]
190
			)
191
			printM(msg, self.sender)
192
193
	def _forward_rsam(self):
194
		"""
195
		Send the RSAM analysis via UDP to another destination in a lightweight format
196
		"""
197
		if self.sock:
198
			msg = 'ch:%s|mean:%s|med:%s|min:%s|max:%s' % (self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
199
			if self.fwformat is 'JSON':
200
				msg = '{"channel":"%s","mean":%s,"median":%s,"min":%s,"max":%s}' \
201
					  % (self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
202
			elif self.fwformat is 'CSV':
203
				msg = '%s,%s,%s,%s,%s' \
204
					  % (self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
205
			packet = bytes(msg, 'utf-8')
206
			self.sock.sendto(packet, (self.fwaddr, self.fwport))
207
208
209
	def run(self):
		"""
		Reads data from the queue and, once enough packets have been buffered,
		runs the RSAM analysis on the most recent ``self.interval`` seconds of
		data every ``self.interval`` seconds, forwarding and/or printing the
		result.

		A UDP socket for forwarding is opened first if both ``self.fwaddr``
		and ``self.fwport`` are set, and closed again when the loop ends.
		The loop ends when :py:func:`_getq` sees a ``TERM`` message and exits.
		"""
		if self.fwaddr and self.fwport:
			printM('Opening socket...', sender=self.sender)
			# SO_REUSEADDR is a socket *option*, not part of the socket type,
			# so it is applied with setsockopt() instead of being OR-ed into
			# SOCK_DGRAM. As before, it is skipped on Windows.
			self.sock = s.socket(s.AF_INET, s.SOCK_DGRAM)
			if os.name != 'nt':
				self.sock.setsockopt(s.SOL_SOCKET, s.SO_REUSEADDR, 1)

		next_int = time.time() + self.interval

		# number of loop iterations to wait before analysis starts
		# (presumably rs.tf is the packet duration in milliseconds -- TODO confirm)
		wait_pkts = self.interval / (rs.tf / 1000)

		# NOTE: a former ``while n > 3: self.getq()`` warm-up loop was removed
		# here. It could never execute (n started at 0) and referred to a
		# method that does not exist, so removing it changes no behavior.

		n = 0
		try:
			while True:
				self._subloop()

				self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
				self.stream = self.raw.copy()
				self._deconvolve()

				if n > wait_pkts:
					# enough packets buffered: keep only the last `interval` seconds
					obstart = self.stream[0].stats.endtime - timedelta(seconds=self.interval)	# obspy time
					self.raw = self.raw.slice(starttime=obstart)
					self.stream = self.stream.slice(starttime=obstart)

					# run rsam analysis at most once per interval
					if time.time() > next_int:
						self._rsam()
						self.stream = rs.copy(self.stream)  # prevent mem leak
						self._forward_rsam()
						self._print_rsam()
						next_int = time.time() + self.interval

				elif n == 0:
					printM('Starting RSAM analysis with interval=%s on channel=%s' % (self.interval, self.cha), self.sender)
				elif n == wait_pkts:
					# NOTE(review): wait_pkts is a float, so this message is only
					# printed when interval / (rs.tf / 1000) is a whole number
					printM('RSAM analysis up and running normally.', self.sender)

				n += 1
				sys.stdout.flush()
		finally:
			# release the forwarding socket however the loop ends
			# (including the SystemExit raised by _getq on TERM)
			if self.sock:
				self.sock.close()
				self.sock = False
259