Test Failed
Push — coverage ( 390bfe...0e0f02 )
by Ian
06:34
created

build.rsudp.c_rsam   B

Complexity

Total Complexity 43

Size/Duplication

Total Lines 272
Duplicated Lines 22.43 %

Importance

Changes 0
Metric Value
wmc 43
eloc 151
dl 61
loc 272
rs 8.96
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
C RSAM.run() 0 54 11
A RSAM._forward_rsam() 0 14 4
A RSAM._find_chn() 0 7 3
A RSAM._rsam() 0 10 1
A RSAM._deconvolve() 0 6 2
A RSAM._set_channel() 20 20 4
A RSAM.__init__() 0 42 2
A RSAM._print_rsam() 0 14 3
A RSAM._subloop() 0 10 4
B RSAM._set_deconv() 23 23 6
A RSAM._getq() 18 18 3

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like build.rsudp.c_rsam often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import sys, os, time
2
import socket as s
3
from datetime import datetime, timedelta
4
import statistics
5
from rsudp import printM, printW, printE
6
from rsudp import helpers
7
import rsudp.raspberryshake as rs
8
from rsudp import COLOR
9
from rsudp.test import TEST
10
11
# set the terminal text color to green
12
COLOR['current'] = COLOR['green']
13
14
class RSAM(rs.ConsumerThread):
15
	"""
16
	.. versionadded:: 1.0.1
17
18
	A consumer class that runs an Real-time Seismic Amplitude Measurement (RSAM).
19
	If debugging is enabled and ``"quiet"`` is set to ``true``,
20
	RSAM is printed to the console every ``"interval"`` seconds,
21
	and optionally forwarded to an IP address and port specified by ``"fwaddr"`` and
22
	``"fwport"`` with packets formatted as either JSON, "lite", or CSV.
23
24
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`.
25
	:param bool debug: whether or not to print RSAM analysis live to the console.
26
	:param bool quiet: whether or not to suppress ``debug`` printing (``True`` suppresses output).
27
	:param float interval: window of time in seconds to apply RSAM analysis.
28
	:param str cha: listening channel (defaults to [S,E]HZ)
29
	:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
30
	:param str fwaddr: Specify a forwarding address to send RSAM in a UDP packet
31
	:param str fwport: Specify a forwarding port to send RSAM in a UDP packet
32
	:param str fwformat: Specify a format for the forwarded packet: ``'LITE'``, ``'JSON'``, or ``'CSV'``
33
	"""
34
35
	def __init__(self, q=False, debug=False, interval=5, cha='HZ', deconv=False,
36
				 fwaddr=False, fwport=False, fwformat='LITE', quiet=False,
37
				 testing=False,
38
				 *args, **kwargs):
39
		"""
40
		Initializes the RSAM analysis thread.
41
		"""
42
		super().__init__()
43
		self.sender = 'RSAM'
44
		self.alive = True
45
		self.testing = testing
46
		self.debug = debug
47
		self.quiet = quiet	# overrides debug and suppresses printing
48
		self.stn = rs.stn
49
		self.fwaddr = fwaddr
50
		self.fwport = fwport
51
		self.fwformat = fwformat.upper()
52
		self.sock = False
53
		self.interval = interval
54
		self.default_ch = 'HZ'
55
		self.args = args
56
		self.kwargs = kwargs
57
		self.raw = rs.Stream()
58
		self.stream = rs.Stream()
59
		self.units = 'counts'
60
61
		self._set_deconv(deconv)
62
63
		self._set_channel(cha)
64
65
		self.rsam = [1, 1, 1]
66
67
		if q:
68
			self.queue = q
69
		else:
70
			printE('no queue passed to the consumer thread! We will exit now!',
71
				   self.sender)
72
			sys.stdout.flush()
73
			self.alive = False
74
			sys.exit()
75
76
		printM('Starting.', self.sender)
77
78
79 View Code Duplication
	def _set_deconv(self, deconv):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
80
		"""
81
		This function sets the deconvolution units. Allowed values are as follows:
82
83
		.. |ms2| replace:: m/s\ :sup:`2`\
84
85
		- ``'VEL'`` - velocity (m/s)
86
		- ``'ACC'`` - acceleration (|ms2|)
87
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
88
		- ``'DISP'`` - displacement (m)
89
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels
90
91
		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
92
		"""
93
		deconv = deconv.upper() if deconv else False
94
		self.deconv = deconv if (deconv in rs.UNITS) else False
95
		if self.deconv and rs.inv:
96
			self.units = '%s (%s)' % (rs.UNITS[self.deconv][0], rs.UNITS[self.deconv][1]) if (self.deconv in rs.UNITS) else self.units
97
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
98
		else:
99
			self.units = rs.UNITS['CHAN'][1]
100
			self.deconv = False
101
		printM('RSAM stream units are %s' % (self.units.strip(' ').lower()), self.sender)
102
103
104
	def _find_chn(self):
105
		"""
106
		Finds channel match in list of channels.
107
		"""
108
		for chn in rs.chns:
109
			if self.cha in chn:
110
				self.cha = chn
111
112
113 View Code Duplication
	def _set_channel(self, cha):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
114
		"""
115
		This function sets the channel to listen to. Allowed values are as follows:
116
117
		- "SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
118
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
119
		- ``"HDF"`` - pressure transducer channel
120
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available
121
122
		:param cha: the channel to listen to
123
		:type cha: str
124
		"""
125
		cha = self.default_ch if (cha == 'all') else cha
126
		self.cha = cha if isinstance(cha, str) else cha[0]
127
128
		if self.cha in str(rs.chns):
129
			self._find_chn()
130
		else:
131
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
132
			sys.exit(2)
133
134
135 View Code Duplication
	def _getq(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
136
		"""
137
		Reads data from the queue and updates the stream.
138
139
		:rtype: bool
140
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
141
		"""
142
		d = self.queue.get(True, timeout=None)
143
		self.queue.task_done()
144
		if self.cha in str(d):
145
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
146
			return True
147
		elif 'TERM' in str(d):
148
			self.alive = False
149
			printM('Exiting.', self.sender)
150
			sys.exit()
151
		else:
152
			return False
153
154
155
	def _deconvolve(self):
156
		"""
157
		Deconvolves the stream associated with this class.
158
		"""
159
		if self.deconv:
160
			helpers.deconvolve(self)
161
162
163
	def _subloop(self):
164
		"""
165
		Gets the queue and figures out whether or not the specified channel is in the packet.
166
		"""
167
		while True:
168
			if self.queue.qsize() > 0:
169
				self._getq()			# get recent packets
170
			else:
171
				if self._getq():		# is this the specified channel? if so break
172
					break
173
174
175
	def _rsam(self):
176
		"""
177
		Run the RSAM analysis
178
		"""
179
		arr = [abs(el) for el in self.stream[0].data]
180
		meanv = statistics.mean(arr)
181
		medianv = statistics.median(arr)
182
		minv = min(arr)
183
		maxv = max(arr)
184
		self.rsam = [meanv, medianv, minv, maxv]
185
186
187
	def _print_rsam(self):
188
		"""
189
		Print the current RSAM analysis
190
		"""
191
		if (self.debug) and (not self.quiet):
192
			msg = '%s Current RSAM: mean %s median %s min %s max %s' % (
193
				(self.stream[0].stats.starttime + timedelta(seconds=
194
															len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
195
				self.rsam[0],
196
				self.rsam[1],
197
				self.rsam[2],
198
				self.rsam[3]
199
			)
200
			printM(msg, self.sender)
201
202
	def _forward_rsam(self):
203
		"""
204
		Send the RSAM analysis via UDP to another destination in a lightweight format
205
		"""
206
		if self.sock:
207
			msg = 'stn:%s|ch:%s|mean:%s|med:%s|min:%s|max:%s' % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
208
			if self.fwformat is 'JSON':
209
				msg = '{"station":"%s","channel":"%s","mean":%s,"median":%s,"min":%s,"max":%s}' \
210
					  % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
211
			elif self.fwformat is 'CSV':
212
				msg = '%s,%s,%s,%s,%s,%s' \
213
					  % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
214
			packet = bytes(msg, 'utf-8')
215
			self.sock.sendto(packet, (self.fwaddr, self.fwport))
216
217
218
	def run(self):
219
		"""
220
		Reads data from the queue and executes self.codefile if it sees an ``ALARM`` message.
221
		Quits if it sees a ``TERM`` message.
222
		"""
223
		if self.fwaddr and self.fwport:
224
			printM('Opening socket...', sender=self.sender)
225
			socket_type = s.SOCK_DGRAM if os.name in 'nt' else s.SOCK_DGRAM | s.SO_REUSEADDR
226
			self.sock = s.socket(s.AF_INET, socket_type)
227
228
		n = 0
229
		next_int = time.time() + self.interval
230
231
		wait_pkts = self.interval / (rs.tf / 1000)
232
233
		while n > 3:
234
			self.getq()
235
			n += 1
236
237
		n = 0
238
		while True:
239
			self._subloop()
240
241
			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
242
			self.stream = self.raw.copy()
243
			self._deconvolve()
244
245
			if n > wait_pkts:
246
				# if the trigger is activated
247
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.interval)	# obspy time
248
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
249
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)
250
251
				# run rsam analysis
252
				if time.time() > next_int:
253
					self._rsam()
254
					self.stream = rs.copy(self.stream)  # prevent mem leak
255
					self._forward_rsam()
256
					self._print_rsam()
257
					next_int = time.time() + self.interval
258
259
			elif n == 0:
260
				printM('Starting RSAM analysis with interval=%s on station=%s channel=%s forward=%s' %
261
					   (self.interval, self.stn, self.cha, self.fwaddr),
262
					   self.sender)
263
			elif n == wait_pkts:
264
				printM('RSAM analysis up and running normally.', self.sender)
265
				if self.testing:
266
					TEST['c_rsam'][1] = True
267
			else:
268
				pass
269
270
			n += 1
271
			sys.stdout.flush()
272