Test Failed
Push: coverage (0196fb...b39e1a), by Ian, created 05:17

build.rsudp.c_rsam   A

Complexity

Total Complexity 42

Size/Duplication

Total Lines 270
Duplicated Lines 22.59 %

Importance

Changes 0

Metric  Value
wmc     42
eloc    150
dl      61
loc     270
rs      9.0399
c       0
b       0
f       0

11 Methods

Rating  Name                  Duplication  Size  Complexity
A       RSAM._forward_rsam()  0            14    4
A       RSAM._find_chn()      0            7     3
A       RSAM._rsam()          0            10    1
A       RSAM._deconvolve()    0            6     2
A       RSAM._set_channel()   20           20    4
A       RSAM._subloop()       0            10    4
B       RSAM._set_deconv()    23           23    6
A       RSAM._getq()          18           18    3
C       RSAM.run()            0            54    11
A       RSAM.__init__()       0            41    2
A       RSAM._print_rsam()    0            14    2


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
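As an illustration of that kind of restructuring, the sketch below pulls a channel-resolution routine, similar in spirit to `RSAM._set_channel()`, into a shared mixin so that several consumer classes can reuse one copy. All names here (`ChannelMixin`, `RsamSketch`, `AlertSketch`, `AVAILABLE_CHANNELS`) are hypothetical and not part of rsudp. Unlike the original, which exits the process on an unknown channel and keeps the last match, this sketch raises `ValueError` and returns the first match.

```python
# Hypothetical sketch: none of these names exist in rsudp.
AVAILABLE_CHANNELS = ['EHZ', 'ENE', 'ENN', 'ENZ']  # assumed stand-in for rs.chns


class ChannelMixin:
    """Shared channel-resolution logic that would otherwise be copied into each consumer."""
    default_ch = 'HZ'

    def set_channel(self, cha, channels=AVAILABLE_CHANNELS):
        # 'all' resolves to the default vertical channel suffix
        cha = self.default_ch if cha == 'all' else cha
        # accept either a string or a list whose first item is the channel
        cha = cha if isinstance(cha, str) else cha[0]
        for chn in channels:
            if cha in chn:
                self.cha = chn  # store the full channel name
                return chn
        raise ValueError('Could not find channel %s in list of channels' % cha)


class RsamSketch(ChannelMixin):
    """Stand-in for a consumer that previously carried its own copy."""


class AlertSketch(ChannelMixin):
    """Second stand-in consumer reusing the same method."""


print(RsamSketch().set_channel('HZ'))
print(AlertSketch().set_channel('all'))
```

With the logic in one place, a fix to channel matching only has to be made once.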

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like build.rsudp.c_rsam often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
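In this class, `fwaddr`, `fwport` and `fwformat` share the `fw` prefix, which makes packet forwarding a plausible candidate for Extract Class. The sketch below is one possible shape for such a component, not the project's implementation: `RsamForwarder` and its methods are hypothetical names, and the JSON branch uses `json.dumps`, so its whitespace differs slightly from the hand-built string in the original.

```python
import json
import socket


class RsamForwarder:
    """Hypothetical extracted class that owns the fw* fields and packet formatting."""

    def __init__(self, addr, port, fmt='LITE'):
        self.addr = addr
        self.port = port
        self.fmt = fmt.upper()
        self.sock = None  # the UDP socket is created lazily on first send

    def format(self, stn, cha, rsam):
        """Build the message text for one [mean, median, min, max] result."""
        meanv, medianv, minv, maxv = rsam
        if self.fmt == 'JSON':
            return json.dumps({'station': stn, 'channel': cha, 'mean': meanv,
                               'median': medianv, 'min': minv, 'max': maxv})
        if self.fmt == 'CSV':
            return '%s,%s,%s,%s,%s,%s' % (stn, cha, meanv, medianv, minv, maxv)
        # default "lite" layout, matching the pipe-delimited string in the original
        return 'stn:%s|ch:%s|mean:%s|med:%s|min:%s|max:%s' % (
            stn, cha, meanv, medianv, minv, maxv)

    def send(self, stn, cha, rsam):
        """Encode the message and send it as a single UDP datagram."""
        if self.sock is None:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        packet = self.format(stn, cha, rsam).encode('utf-8')
        self.sock.sendto(packet, (self.addr, self.port))


fw = RsamForwarder('127.0.0.1', 8888, fmt='csv')  # address and port are placeholders
print(fw.format('R0000', 'EHZ', [1.5, 1, 0, 3]))
```

The consumer thread would then hold a single forwarder object instead of four forwarding-related attributes, and `run()` would no longer need to open the socket itself.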

import sys, os, time
import socket as s
from datetime import datetime, timedelta
import statistics
from rsudp import printM, printW, printE
from rsudp import helpers
import rsudp.raspberryshake as rs
from rsudp import COLOR
from rsudp.test import TEST

# set the terminal text color to green
COLOR['current'] = COLOR['green']

class RSAM(rs.ConsumerThread):
	"""
	.. versionadded:: 1.0.1

	A consumer class that runs a Real-time Seismic Amplitude Measurement (RSAM).
	If debugging is enabled and ``"quiet"`` is set to ``false``,
	RSAM is printed to the console every ``"interval"`` seconds,
	and optionally forwarded to an IP address and port specified by ``"fwaddr"`` and
	``"fwport"`` with packets formatted as either JSON, "lite", or CSV.

	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`.
	:param bool quiet: ``True`` to suppress printing of RSAM analysis live to the console, ``False`` otherwise.
	:param float interval: window of time in seconds to apply RSAM analysis.
	:param str cha: listening channel (defaults to [S,E]HZ)
	:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
	:param str fwaddr: Specify a forwarding address to send RSAM in a UDP packet
	:param str fwport: Specify a forwarding port to send RSAM in a UDP packet
	:param str fwformat: Specify a format for the forwarded packet: ``'LITE'``, ``'JSON'``, or ``'CSV'``
	"""

	def __init__(self, q=False, interval=5, cha='HZ', deconv=False,
				 fwaddr=False, fwport=False, fwformat='LITE', quiet=False,
				 testing=False,
				 *args, **kwargs):
		"""
		Initializes the RSAM analysis thread.
		"""
		super().__init__()
		self.sender = 'RSAM'
		self.alive = True
		self.testing = testing
		self.quiet = quiet	# suppresses printing of RSAM analysis to the console
		self.stn = rs.stn
		self.fwaddr = fwaddr
		self.fwport = fwport
		self.fwformat = fwformat.upper()
		self.sock = False
		self.interval = interval
		self.default_ch = 'HZ'
		self.args = args
		self.kwargs = kwargs
		self.raw = rs.Stream()
		self.stream = rs.Stream()
		self.units = 'counts'

		self._set_deconv(deconv)

		self._set_channel(cha)

		self.rsam = [1, 1, 1, 1]	# placeholder [mean, median, min, max] until the first analysis

		if q:
			self.queue = q
		else:
			printE('no queue passed to the consumer thread! We will exit now!',
				   self.sender)
			sys.stdout.flush()
			self.alive = False
			sys.exit()

		printM('Starting.', self.sender)


	# Review note (duplication): this code seems to be duplicated in your project.
	def _set_deconv(self, deconv):
		r"""
		This function sets the deconvolution units. Allowed values are as follows:

		.. |ms2| replace:: m/s\ :sup:`2`\

		- ``'VEL'`` - velocity (m/s)
		- ``'ACC'`` - acceleration (|ms2|)
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
		- ``'DISP'`` - displacement (m)
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels

		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
		"""
		deconv = deconv.upper() if deconv else False
		self.deconv = deconv if (deconv in rs.UNITS) else False
		if self.deconv and rs.inv:
			# self.deconv is guaranteed to be a key of rs.UNITS at this point
			self.units = '%s (%s)' % (rs.UNITS[self.deconv][0], rs.UNITS[self.deconv][1])
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
		else:
			self.units = rs.UNITS['CHAN'][1]
			self.deconv = False
		printM('RSAM stream units are %s' % (self.units.strip(' ').lower()), self.sender)


	def _find_chn(self):
		"""
		Finds channel match in list of channels.
		"""
		for chn in rs.chns:
			if self.cha in chn:
				self.cha = chn


	# Review note (duplication): this code seems to be duplicated in your project.
	def _set_channel(self, cha):
		"""
		This function sets the channel to listen to. Allowed values are as follows:

		- ``"SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
		- ``"HDF"`` - pressure transducer channel
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available

		:param cha: the channel to listen to
		:type cha: str
		"""
		cha = self.default_ch if (cha == 'all') else cha
		self.cha = cha if isinstance(cha, str) else cha[0]

		if self.cha in str(rs.chns):
			self._find_chn()
		else:
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
			sys.exit(2)


	# Review note (duplication): this code seems to be duplicated in your project.
	def _getq(self):
		"""
		Reads data from the queue and updates the stream.

		:rtype: bool
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
		"""
		d = self.queue.get(True, timeout=None)
		self.queue.task_done()
		if self.cha in str(d):
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
			return True
		elif 'TERM' in str(d):
			self.alive = False
			printM('Exiting.', self.sender)
			sys.exit()
		else:
			return False


	def _deconvolve(self):
		"""
		Deconvolves the stream associated with this class.
		"""
		if self.deconv:
			helpers.deconvolve(self)


	def _subloop(self):
		"""
		Gets the queue and figures out whether or not the specified channel is in the packet.
		"""
		while True:
			if self.queue.qsize() > 0:
				self._getq()			# get recent packets
			else:
				if self._getq():		# is this the specified channel? if so break
					break


	def _rsam(self):
		"""
		Run the RSAM analysis
		"""
		arr = [abs(el) for el in self.stream[0].data]
		meanv = statistics.mean(arr)
		medianv = statistics.median(arr)
		minv = min(arr)
		maxv = max(arr)
		self.rsam = [meanv, medianv, minv, maxv]


	def _print_rsam(self):
		"""
		Print the current RSAM analysis
		"""
		if not self.quiet:
			msg = '%s Current RSAM: mean %s median %s min %s max %s' % (
				(self.stream[0].stats.starttime + timedelta(seconds=
															len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
				self.rsam[0],
				self.rsam[1],
				self.rsam[2],
				self.rsam[3]
			)
			printM(msg, self.sender)

	def _forward_rsam(self):
		"""
		Send the RSAM analysis via UDP to another destination in the configured format
		("lite" by default, JSON, or CSV)
		"""
		if self.sock:
			msg = 'stn:%s|ch:%s|mean:%s|med:%s|min:%s|max:%s' % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
			# compare strings by value; ``is`` tests identity and is unreliable here
			if self.fwformat == 'JSON':
				msg = '{"station":"%s","channel":"%s","mean":%s,"median":%s,"min":%s,"max":%s}' \
					  % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
			elif self.fwformat == 'CSV':
				msg = '%s,%s,%s,%s,%s,%s' \
					  % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
			packet = bytes(msg, 'utf-8')
			self.sock.sendto(packet, (self.fwaddr, self.fwport))


	def run(self):
		"""
		Reads data from the queue and runs the RSAM analysis every ``interval`` seconds.
		Quits if it sees a ``TERM`` message.
		"""
		if self.fwaddr and self.fwport:
			printM('Opening socket...', sender=self.sender)
			self.sock = s.socket(s.AF_INET, s.SOCK_DGRAM)
			if os.name != 'nt':
				# SO_REUSEADDR is a socket option, not a socket type flag
				self.sock.setsockopt(s.SOL_SOCKET, s.SO_REUSEADDR, 1)

		n = 0
		next_int = time.time() + self.interval

		wait_pkts = self.interval / (rs.tf / 1000)

		# read the first few packets before entering the main loop
		while n < 3:
			self._getq()
			n += 1

		n = 0
		while True:
			self._subloop()

			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
			self.stream = self.raw.copy()
			self._deconvolve()

			if n > wait_pkts:
				# enough packets have arrived to cover one interval
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.interval)	# obspy time
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)

				# run rsam analysis
				if time.time() > next_int:
					self._rsam()
					self.stream = rs.copy(self.stream)  # prevent mem leak
					self._forward_rsam()
					self._print_rsam()
					next_int = time.time() + self.interval

			elif n == 0:
				printM('Starting RSAM analysis with interval=%s on station=%s channel=%s forward=%s' %
					   (self.interval, self.stn, self.cha, self.fwaddr),
					   self.sender)
			elif n == wait_pkts:
				printM('RSAM analysis up and running normally.', self.sender)
				if self.testing:
					TEST['c_rsam'][1] = True
			else:
				pass

			n += 1
			sys.stdout.flush()

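The statistics computed in `_rsam()` and the default "lite" packet layout built in `_forward_rsam()` can be tried in isolation, without rsudp or ObsPy. The sketch below is a minimal stand-alone version under stated assumptions: `rsam_stats` and `parse_lite` are hypothetical helper names, the sample values are made up, and `'R0000'` is a placeholder station code.

```python
import statistics


def rsam_stats(samples):
    """Return [mean, median, min, max] of the absolute sample values."""
    arr = [abs(x) for x in samples]
    return [statistics.mean(arr), statistics.median(arr), min(arr), max(arr)]


def parse_lite(packet):
    """Split a pipe-delimited 'lite' packet (bytes) into a dict of strings."""
    fields = packet.decode('utf-8').split('|')
    # each field looks like 'key:value'; split only on the first colon
    return dict(field.split(':', 1) for field in fields)


# made-up samples standing in for one interval of trace data
stats = rsam_stats([-4, 1, -2, 3])

# same layout as the default message in _forward_rsam()
msg = 'stn:%s|ch:%s|mean:%s|med:%s|min:%s|max:%s' % ('R0000', 'EHZ', *stats)
print(parse_lite(msg.encode('utf-8')))
```

A receiver on the forwarding address could use something like `parse_lite` to recover the fields. The values arrive as strings and would need converting to `float` before any numeric use.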