Passed
Pull Request — master (#64)
by
unknown
06:49
created

build.rsudp.c_rsam.RSAM.run()   C

Complexity

Conditions 11

Size

Total Lines 60
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 35
CRAP Score 11.0191

Importance

Changes 0
Metric Value
eloc 40
dl 0
loc 60
ccs 35
cts 37
cp 0.9459
rs 5.4
c 0
b 0
f 0
cc 11
nop 1
crap 11.0191

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.rsudp.c_rsam.RSAM.run() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import sys, os, time
2 1
import socket as s
3 1
from datetime import timedelta
4 1
import statistics
5 1
from rsudp import printM, printW, printE
6 1
from rsudp import helpers
7 1
import rsudp.raspberryshake as rs
8 1
from rsudp import COLOR
9 1
from rsudp.test import TEST
10
11
# set the terminal text color to green
12 1
COLOR['current'] = COLOR['green']
13
14 1
class RSAM(rs.ConsumerThread):
15
	"""
16
	.. versionadded:: 1.0.1
17
18
	A consumer class that runs an Real-time Seismic Amplitude Measurement (RSAM).
19
	If debugging is enabled and ``"quiet"`` is set to ``true``,
20
	RSAM is printed to the console every ``"interval"`` seconds,
21
	and optionally forwarded to an IP address and port specified by ``"fwaddr"`` and
22
	``"fwport"`` with packets formatted as either JSON, "lite", or CSV.
23
24
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`.
25
	:param bool quiet: ``True`` to suppress printing of RSAM analysis live to the console, ``False`` otherwise.
26
	:param float interval: window of time in seconds to apply RSAM analysis.
27
	:param str cha: listening channel (defaults to [S,E]HZ)
28
	:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
29
	:param str fwaddr: Specify a forwarding address to send RSAM in a UDP packet
30
	:param str fwport: Specify a forwarding port to send RSAM in a UDP packet
31
	:param str fwformat: Specify a format for the forwarded packet: ``'LITE'``, ``'JSON'``, or ``'CSV'``
32
	"""
33
34 1
	def __init__(self, q=False, interval=5, cha='HZ', deconv=False,
35
				 fwaddr=False, fwport=False, fwformat='LITE', quiet=False,
36
				 testing=False,
37
				 *args, **kwargs):
38
		"""
39
		Initializes the RSAM analysis thread.
40
		"""
41 1
		super().__init__()
42 1
		self.sender = 'RSAM'
43 1
		self.alive = True
44 1
		self.testing = testing
45 1
		self.quiet = quiet	# suppresses printing of transmission stats
46 1
		self.stn = rs.stn
47 1
		self.fwaddr = fwaddr
48 1
		self.fwport = fwport
49 1
		self.fwformat = fwformat.upper()
50 1
		self.sock = False
51 1
		self.interval = interval
52 1
		self.default_ch = 'HZ'
53 1
		self.args = args
54 1
		self.kwargs = kwargs
55 1
		self.raw = rs.Stream()
56 1
		self.stream = rs.Stream()
57 1
		self.units = 'counts'
58
59 1
		self._set_deconv(deconv)
60
61 1
		self._set_channel(cha)
62
63 1
		self.rsam = [1, 1, 1]
64
65 1
		if q:
66 1
			self.queue = q
67
		else:
68
			printE('no queue passed to the consumer thread! We will exit now!',
69
				   self.sender)
70
			sys.stdout.flush()
71
			self.alive = False
72
			sys.exit()
73
74 1
		printM('Starting.', self.sender)
75
76
77 1 View Code Duplication
	def _set_deconv(self, deconv):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
78
		"""
79
		This function sets the deconvolution units. Allowed values are as follows:
80
81
		.. |ms2| replace:: m/s\ :sup:`2`\
82
83
		- ``'VEL'`` - velocity (m/s)
84
		- ``'ACC'`` - acceleration (|ms2|)
85
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
86
		- ``'DISP'`` - displacement (m)
87
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels
88
89
		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
90
		"""
91 1
		deconv = deconv.upper() if deconv else False
92 1
		self.deconv = deconv if (deconv in rs.UNITS) else False
93 1
		if self.deconv and rs.inv:
94
			self.units = '%s (%s)' % (rs.UNITS[self.deconv][0], rs.UNITS[self.deconv][1]) if (self.deconv in rs.UNITS) else self.units
95
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
96
		else:
97 1
			self.units = rs.UNITS['CHAN'][1]
98 1
			self.deconv = False
99 1
		printM('RSAM stream units are %s' % (self.units.strip(' ').lower()), self.sender)
100
101
102 1
	def _find_chn(self):
103
		"""
104
		Finds channel match in list of channels.
105
		"""
106 1
		for chn in rs.chns:
107 1
			if self.cha in chn:
108 1
				self.cha = chn
109
110
111 1 View Code Duplication
	def _set_channel(self, cha):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
112
		"""
113
		This function sets the channel to listen to. Allowed values are as follows:
114
115
		- "SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
116
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
117
		- ``"HDF"`` - pressure transducer channel
118
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available
119
120
		:param cha: the channel to listen to
121
		:type cha: str
122
		"""
123 1
		cha = self.default_ch if (cha == 'all') else cha
124 1
		self.cha = cha if isinstance(cha, str) else cha[0]
125
126 1
		if self.cha in str(rs.chns):
127 1
			self._find_chn()
128
		else:
129
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
130
			sys.exit(2)
131
132
133 1 View Code Duplication
	def _getq(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
134
		"""
135
		Reads data from the queue and updates the stream.
136
137
		:rtype: bool
138
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
139
		"""
140 1
		d = self.queue.get(True, timeout=None)
141 1
		self.queue.task_done()
142 1
		if self.cha in str(d):
143 1
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
144 1
			return True
145 1
		elif 'TERM' in str(d):
146 1
			self.alive = False
147 1
			printM('Exiting.', self.sender)
148 1
			sys.exit()
149
		else:
150 1
			return False
151
152
153 1
	def _deconvolve(self):
154
		"""
155
		Deconvolves the stream associated with this class.
156
		"""
157 1
		if self.deconv:
158
			helpers.deconvolve(self)
159
160
161 1
	def _subloop(self):
162
		"""
163
		Gets the queue and figures out whether or not the specified channel is in the packet.
164
		"""
165 1
		while True:
166 1
			if self.queue.qsize() > 0:
167 1
				self._getq()			# get recent packets
168
			else:
169 1
				if self._getq():		# is this the specified channel? if so break
170 1
					break
171
172
173 1
	def _rsam(self):
174
		"""
175
		Run the RSAM analysis
176
		"""
177 1
		arr = [abs(el) for el in self.stream[0].data]
178 1
		meanv = statistics.mean(arr)
179 1
		medianv = statistics.median(arr)
180 1
		minv = min(arr)
181 1
		maxv = max(arr)
182 1
		self.rsam = [meanv, medianv, minv, maxv]
183
184
185 1
	def _print_rsam(self):
186
		"""
187
		Print the current RSAM analysis
188
		"""
189 1
		if not self.quiet:
190 1
			msg = '%s Current RSAM: mean %s median %s min %s max %s' % (
191
				(self.stream[0].stats.starttime + timedelta(seconds=
192
															len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
193
				self.rsam[0],
194
				self.rsam[1],
195
				self.rsam[2],
196
				self.rsam[3]
197
			)
198 1
			printM(msg, self.sender)
199
200 1
	def _forward_rsam(self):
201
		"""
202
		Send the RSAM analysis via UDP to another destination in a lightweight format
203
		"""
204 1
		if self.sock:
205 1
			msg = 'stn:%s|ch:%s|mean:%s|med:%s|min:%s|max:%s' % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
206 1
			if self.fwformat is 'JSON':
207
				msg = '{"station":"%s","channel":"%s","mean":%s,"median":%s,"min":%s,"max":%s}' \
208
					  % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
209 1
			elif self.fwformat is 'CSV':
210
				msg = '%s,%s,%s,%s,%s,%s' \
211
					  % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
212 1
			packet = bytes(msg, 'utf-8')
213 1
			self.sock.sendto(packet, (self.fwaddr, self.fwport))
214
215
216 1
	def run(self):
217
		"""
218
		Reads data from the queue and executes self.codefile if it sees an ``ALARM`` message.
219
		Quits if it sees a ``TERM`` message.
220
		"""
221 1
		if self.fwaddr and self.fwport:
222 1
			printM('Opening socket...', sender=self.sender)
223
			
224
			# Set the socket type correctly for macOS compatibility
225 1
			socket_type = s.SOCK_DGRAM
226 1
			self.sock = s.socket(s.AF_INET, socket_type)
227
			
228
			# Set SO_REUSEADDR option separately if not on Windows
229 1
			if os.name != 'nt':
230 1
				self.sock.setsockopt(s.SOL_SOCKET, s.SO_REUSEADDR, 1)
231
232 1
		n = 0
233 1
		next_int = time.time() + self.interval
234
235 1
		wait_pkts = self.interval / (rs.tf / 1000)
236
237 1
		while n > 3:
238
			self.getq()
239
			n += 1
240
241 1
		n = 0
242 1
		while True:
243 1
			self._subloop()
244
245 1
			self.raw = rs.copy(self.raw)  # necessary to avoid memory leak
246 1
			self.stream = self.raw.copy()
247 1
			self._deconvolve()
248
249 1
			if n > wait_pkts:
250
				# if the trigger is activated
251 1
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.interval)  # obspy time
252 1
				self.raw = self.raw.slice(starttime=obstart)  # slice the stream to the specified length (seconds variable)
253 1
				self.stream = self.stream.slice(starttime=obstart)  # slice the stream to the specified length (seconds variable)
254
255
				# run rsam analysis
256 1
				if time.time() > next_int:
257 1
					self._rsam()
258 1
					self.stream = rs.copy(self.stream)  # prevent mem leak
259 1
					self._forward_rsam()
260 1
					self._print_rsam()
261 1
					next_int = time.time() + self.interval
262
263 1
			elif n == 0:
264 1
				printM('Starting RSAM analysis with interval=%s on station=%s channel=%s forward=%s' %
265
					(self.interval, self.stn, self.cha, self.fwaddr),
266
					self.sender)
267 1
			elif n == wait_pkts:
268 1
				printM('RSAM analysis up and running normally.', self.sender)
269 1
				if self.testing:
270 1
					TEST['c_rsam'][1] = True
271
			else:
272
				pass
273
274
			n += 1
275
			sys.stdout.flush()