Passed
Pull Request — master (#6)
by
unknown
04:21
created

build.rsudp.c_rsam.RSAM._print_rsam()   A

Complexity

Conditions 2

Size

Total Lines 15
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 15
rs 9.85
c 0
b 0
f 0
cc 2
nop 1
1
import sys, os, time
2
import socket as s
3
from datetime import datetime, timedelta
4
import statistics
5
from rsudp import printM, printW, printE
6
from rsudp import helpers
7
import rsudp.raspberryshake as rs
8
COLOR = {}
9
from rsudp import COLOR
10
11
# set the terminal text color to green
12
COLOR['current'] = COLOR['green']
13
14
class RSAM(rs.ConsumerThread):
15
	"""
16
	.. versionadded:: 1.0.0
17
18
	A consumer class that runs an Real-time Seismic Amplitude Measurement (RSAM).
19
20
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`.
21
	:param bool debug: whether or not to display RSAM analysis live to the console.
22
	:param float interval: window of time in seconds to apply RSAM analysis.
23
	:param str cha: listening channel (defaults to [S,E]HZ)
24
	:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
25
	:param str fwaddr: Specify a forwarding address to send RSAM in a UDP packet
26
	:param str fwport: Specify a forwarding port to send RSAM in a UDP packet
27
	"""
28
29
	def __init__(self, q=False, debug=False, interval=5, cha='HZ', deconv=False, fwaddr=False, fwport=False, *args, **kwargs):
30
		"""
31
		Initializes the RSAM analysis thread.
32
		"""
33
		super().__init__()
34
		self.sender = 'RSAM'
35
		self.alive = True
36
		self.debug = debug
37
		self.fwaddr = fwaddr
38
		self.fwport = fwport
39
		self.sock = False
40
		self.interval = interval
41
		self.default_ch = 'HZ'
42
		self.args = args
43
		self.kwargs = kwargs
44
		self.raw = rs.Stream()
45
		self.stream = rs.Stream()
46
		self.units = 'counts'
47
48
		self._set_deconv(deconv)
49
50
		self._set_channel(cha)
51
52
		self.rsam = [1, 1, 1]
53
54
		if q:
55
			self.queue = q
56
		else:
57
			printE('no queue passed to the consumer thread! We will exit now!',
58
				   self.sender)
59
			sys.stdout.flush()
60
			self.alive = False
61
			sys.exit()
62
63
		printM('Starting.', self.sender)
64
65
66 View Code Duplication
	def _set_deconv(self, deconv):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
67
		"""
68
		This function sets the deconvolution units. Allowed values are as follows:
69
70
		.. |ms2| replace:: m/s\ :sup:`2`\
71
72
		- ``'VEL'`` - velocity (m/s)
73
		- ``'ACC'`` - acceleration (|ms2|)
74
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
75
		- ``'DISP'`` - displacement (m)
76
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels
77
78
		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
79
		"""
80
		deconv = deconv.upper() if deconv else False
81
		self.deconv = deconv if (deconv in rs.UNITS) else False
82
		if self.deconv and rs.inv:
83
			self.units = '%s (%s)' % (rs.UNITS[self.deconv][0], rs.UNITS[self.deconv][1]) if (self.deconv in rs.UNITS) else self.units
84
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
85
		else:
86
			self.units = rs.UNITS['CHAN'][1]
87
			self.deconv = False
88
		printM('RSAM stream units are %s' % (self.units.strip(' ').lower()), self.sender)
89
90
91
	def _find_chn(self):
92
		"""
93
		Finds channel match in list of channels.
94
		"""
95
		for chn in rs.chns:
96
			if self.cha in chn:
97
				self.cha = chn
98
99
100 View Code Duplication
	def _set_channel(self, cha):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
101
		"""
102
		This function sets the channel to listen to. Allowed values are as follows:
103
104
		- "SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
105
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
106
		- ``"HDF"`` - pressure transducer channel
107
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available
108
109
		:param cha: the channel to listen to
110
		:type cha: str
111
		"""
112
		cha = self.default_ch if (cha == 'all') else cha
113
		self.cha = cha if isinstance(cha, str) else cha[0]
114
115
		if self.cha in str(rs.chns):
116
			self._find_chn()
117
		else:
118
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
119
			sys.exit(2)
120
121
122 View Code Duplication
	def _getq(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
123
		"""
124
		Reads data from the queue and updates the stream.
125
126
		:rtype: bool
127
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
128
		"""
129
		d = self.queue.get(True, timeout=None)
130
		self.queue.task_done()
131
		if self.cha in str(d):
132
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
133
			return True
134
		elif 'TERM' in str(d):
135
			self.alive = False
136
			printM('Exiting.', self.sender)
137
			sys.exit()
138
		else:
139
			return False
140
141
142
	def _deconvolve(self):
143
		"""
144
		Deconvolves the stream associated with this class.
145
		"""
146
		if self.deconv:
147
			helpers.deconvolve(self)
148
149
150
	def _subloop(self):
151
		"""
152
		Gets the queue and figures out whether or not the specified channel is in the packet.
153
		"""
154
		while True:
155
			if self.queue.qsize() > 0:
156
				self._getq()			# get recent packets
157
			else:
158
				if self._getq():		# is this the specified channel? if so break
159
					break
160
161
162
	def _rsam(self):
163
		"""
164
		Run the RSAM analysis
165
		"""
166
		arr = [abs(el) for el in self.stream[0].data]
167
		meanv = statistics.mean(arr)
168
		medianv = statistics.median(arr)
169
		minv = min(arr)
170
		maxv = max(arr)
171
		self.rsam = [meanv, medianv, minv, maxv]
172
173
174
	def _print_rsam(self):
175
		"""
176
		Print the current RSAM analysis
177
		"""
178
		if self.debug:
179
			msg = '%s [%s] Current RSAM: mean %s median %s min %s max %s' % (
180
				(self.stream[0].stats.starttime + timedelta(seconds=
181
															len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
182
				self.sender,
183
				self.rsam[0],
184
				self.rsam[1],
185
				self.rsam[2],
186
				self.rsam[3]
187
			)
188
			printM(msg, self.sender)
189
190
	def _forward_rsam(self):
191
		"""
192
		Send the RSAM analysis via UDP to another destination
193
		"""
194
		if self.sock:
195
			msg = '%s:%s,%s,%s,%s' % (self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
196
			packet = bytes(msg, 'utf-8')
197
			self.sock.sendto(packet, (self.fwaddr, self.fwport))
198
199
200
	def run(self):
201
		"""
202
		Reads data from the queue and executes self.codefile if it sees an ``ALARM`` message.
203
		Quits if it sees a ``TERM`` message.
204
		"""
205
		if self.fwaddr and self.fwport:
206
			printM('Opening socket...', sender=self.sender)
207
			socket_type = s.SOCK_DGRAM if os.name in 'nt' else s.SOCK_DGRAM | s.SO_REUSEADDR
208
			self.sock = s.socket(s.AF_INET, socket_type)
209
210
		n = 0
211
		next_int = time.time() + self.interval
212
213
		wait_pkts = self.interval / (rs.tf / 1000)
214
215
		while n > 3:
216
			self.getq()
217
			n += 1
218
219
		n = 0
220
		while True:
221
			self._subloop()
222
223
			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
224
			self.stream = self.raw.copy()
225
			self._deconvolve()
226
227
			if n > wait_pkts:
228
				# if the trigger is activated
229
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.interval)	# obspy time
230
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
231
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)
232
233
				# run rsam analysis
234
				if time.time() > next_int:
235
					self._rsam()
236
					self.stream = rs.copy(self.stream)  # prevent mem leak
237
					self._forward_rsam()
238
					self._print_rsam()
239
					next_int = time.time() + self.interval
240
241
			elif n == 0:
242
				printM('Starting RSAM analysis with interval=%s on channel=%s' % (self.interval, self.cha), self.sender)
243
			elif n == wait_pkts:
244
				printM('RSAM analysis up and running normally.', self.sender)
245
			else:
246
				pass
247
248
			n += 1
249
			sys.stdout.flush()
250