build.rsudp.c_rsam   A
last analyzed

Complexity

Total Complexity 42

Size/Duplication

Total Lines 270
Duplicated Lines 22.59 %

Test Coverage

Coverage 90.3%

Importance

Changes 0
Metric Value
wmc 42
eloc 150
dl 61
loc 270
ccs 121
cts 134
cp 0.903
rs 9.0399
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
C RSAM.run() 0 54 11
A RSAM._forward_rsam() 0 14 4
A RSAM._find_chn() 0 7 3
A RSAM._rsam() 0 10 1
A RSAM._deconvolve() 0 6 2
A RSAM._set_channel() 20 20 4
A RSAM.__init__() 0 41 2
A RSAM._print_rsam() 0 14 2
A RSAM._subloop() 0 10 4
B RSAM._set_deconv() 23 23 6
A RSAM._getq() 18 18 3

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like build.rsudp.c_rsam often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import sys, os, time
2 1
import socket as s
3 1
from datetime import timedelta
4 1
import statistics
5 1
from rsudp import printM, printW, printE
6 1
from rsudp import helpers
7 1
import rsudp.raspberryshake as rs
8 1
from rsudp import COLOR
9 1
from rsudp.test import TEST
10
11
# set the terminal text color to green
12 1
COLOR['current'] = COLOR['green']
13
14 1
class RSAM(rs.ConsumerThread):
15
	"""
16
	.. versionadded:: 1.0.1
17
18
	A consumer class that runs an Real-time Seismic Amplitude Measurement (RSAM).
19
	If debugging is enabled and ``"quiet"`` is set to ``true``,
20
	RSAM is printed to the console every ``"interval"`` seconds,
21
	and optionally forwarded to an IP address and port specified by ``"fwaddr"`` and
22
	``"fwport"`` with packets formatted as either JSON, "lite", or CSV.
23
24
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`.
25
	:param bool quiet: ``True`` to suppress printing of RSAM analysis live to the console, ``False`` otherwise.
26
	:param float interval: window of time in seconds to apply RSAM analysis.
27
	:param str cha: listening channel (defaults to [S,E]HZ)
28
	:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
29
	:param str fwaddr: Specify a forwarding address to send RSAM in a UDP packet
30
	:param str fwport: Specify a forwarding port to send RSAM in a UDP packet
31
	:param str fwformat: Specify a format for the forwarded packet: ``'LITE'``, ``'JSON'``, or ``'CSV'``
32
	"""
33
34 1
	def __init__(self, q=False, interval=5, cha='HZ', deconv=False,
35
				 fwaddr=False, fwport=False, fwformat='LITE', quiet=False,
36
				 testing=False,
37
				 *args, **kwargs):
38
		"""
39
		Initializes the RSAM analysis thread.
40
		"""
41 1
		super().__init__()
42 1
		self.sender = 'RSAM'
43 1
		self.alive = True
44 1
		self.testing = testing
45 1
		self.quiet = quiet	# suppresses printing of transmission stats
46 1
		self.stn = rs.stn
47 1
		self.fwaddr = fwaddr
48 1
		self.fwport = fwport
49 1
		self.fwformat = fwformat.upper()
50 1
		self.sock = False
51 1
		self.interval = interval
52 1
		self.default_ch = 'HZ'
53 1
		self.args = args
54 1
		self.kwargs = kwargs
55 1
		self.raw = rs.Stream()
56 1
		self.stream = rs.Stream()
57 1
		self.units = 'counts'
58
59 1
		self._set_deconv(deconv)
60
61 1
		self._set_channel(cha)
62
63 1
		self.rsam = [1, 1, 1]
64
65 1
		if q:
66 1
			self.queue = q
67
		else:
68
			printE('no queue passed to the consumer thread! We will exit now!',
69
				   self.sender)
70
			sys.stdout.flush()
71
			self.alive = False
72
			sys.exit()
73
74 1
		printM('Starting.', self.sender)
75
76
77 1 View Code Duplication
	def _set_deconv(self, deconv):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
78
		"""
79
		This function sets the deconvolution units. Allowed values are as follows:
80
81
		.. |ms2| replace:: m/s\ :sup:`2`\
82
83
		- ``'VEL'`` - velocity (m/s)
84
		- ``'ACC'`` - acceleration (|ms2|)
85
		- ``'GRAV'`` - fraction of acceleration due to gravity (g, or 9.81 |ms2|)
86
		- ``'DISP'`` - displacement (m)
87
		- ``'CHAN'`` - channel-specific unit calculation, i.e. ``'VEL'`` for geophone channels and ``'ACC'`` for accelerometer channels
88
89
		:param str deconv: ``'VEL'``, ``'ACC'``, ``'GRAV'``, ``'DISP'``, or ``'CHAN'``
90
		"""
91 1
		deconv = deconv.upper() if deconv else False
92 1
		self.deconv = deconv if (deconv in rs.UNITS) else False
93 1
		if self.deconv and rs.inv:
94
			self.units = '%s (%s)' % (rs.UNITS[self.deconv][0], rs.UNITS[self.deconv][1]) if (self.deconv in rs.UNITS) else self.units
95
			printM('Signal deconvolution set to %s' % (self.deconv), self.sender)
96
		else:
97 1
			self.units = rs.UNITS['CHAN'][1]
98 1
			self.deconv = False
99 1
		printM('RSAM stream units are %s' % (self.units.strip(' ').lower()), self.sender)
100
101
102 1
	def _find_chn(self):
103
		"""
104
		Finds channel match in list of channels.
105
		"""
106 1
		for chn in rs.chns:
107 1
			if self.cha in chn:
108 1
				self.cha = chn
109
110
111 1 View Code Duplication
	def _set_channel(self, cha):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
112
		"""
113
		This function sets the channel to listen to. Allowed values are as follows:
114
115
		- "SHZ"``, ``"EHZ"``, ``"EHN"`` or ``"EHE"`` - velocity channels
116
		- ``"ENZ"``, ``"ENN"``, ``"ENE"`` - acceleration channels
117
		- ``"HDF"`` - pressure transducer channel
118
		- ``"all"`` - resolves to either ``"EHZ"`` or ``"SHZ"`` if available
119
120
		:param cha: the channel to listen to
121
		:type cha: str
122
		"""
123 1
		cha = self.default_ch if (cha == 'all') else cha
124 1
		self.cha = cha if isinstance(cha, str) else cha[0]
125
126 1
		if self.cha in str(rs.chns):
127 1
			self._find_chn()
128
		else:
129
			printE('Could not find channel %s in list of channels! Please correct and restart.' % self.cha, self.sender)
130
			sys.exit(2)
131
132
133 1 View Code Duplication
	def _getq(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
134
		"""
135
		Reads data from the queue and updates the stream.
136
137
		:rtype: bool
138
		:return: Returns ``True`` if stream is updated, otherwise ``False``.
139
		"""
140 1
		d = self.queue.get(True, timeout=None)
141 1
		self.queue.task_done()
142 1
		if self.cha in str(d):
143 1
			self.raw = rs.update_stream(stream=self.raw, d=d, fill_value='latest')
144 1
			return True
145 1
		elif 'TERM' in str(d):
146 1
			self.alive = False
147 1
			printM('Exiting.', self.sender)
148 1
			sys.exit()
149
		else:
150 1
			return False
151
152
153 1
	def _deconvolve(self):
154
		"""
155
		Deconvolves the stream associated with this class.
156
		"""
157 1
		if self.deconv:
158
			helpers.deconvolve(self)
159
160
161 1
	def _subloop(self):
162
		"""
163
		Gets the queue and figures out whether or not the specified channel is in the packet.
164
		"""
165 1
		while True:
166 1
			if self.queue.qsize() > 0:
167 1
				self._getq()			# get recent packets
168
			else:
169 1
				if self._getq():		# is this the specified channel? if so break
170 1
					break
171
172
173 1
	def _rsam(self):
174
		"""
175
		Run the RSAM analysis
176
		"""
177 1
		arr = [abs(el) for el in self.stream[0].data]
178 1
		meanv = statistics.mean(arr)
179 1
		medianv = statistics.median(arr)
180 1
		minv = min(arr)
181 1
		maxv = max(arr)
182 1
		self.rsam = [meanv, medianv, minv, maxv]
183
184
185 1
	def _print_rsam(self):
186
		"""
187
		Print the current RSAM analysis
188
		"""
189 1
		if not self.quiet:
190 1
			msg = '%s Current RSAM: mean %s median %s min %s max %s' % (
191
				(self.stream[0].stats.starttime + timedelta(seconds=
192
															len(self.stream[0].data) * self.stream[0].stats.delta)).strftime('%Y-%m-%d %H:%M:%S'),
193
				self.rsam[0],
194
				self.rsam[1],
195
				self.rsam[2],
196
				self.rsam[3]
197
			)
198 1
			printM(msg, self.sender)
199
200 1
	def _forward_rsam(self):
201
		"""
202
		Send the RSAM analysis via UDP to another destination in a lightweight format
203
		"""
204 1
		if self.sock:
205 1
			msg = 'stn:%s|ch:%s|mean:%s|med:%s|min:%s|max:%s' % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
206 1
			if self.fwformat is 'JSON':
207
				msg = '{"station":"%s","channel":"%s","mean":%s,"median":%s,"min":%s,"max":%s}' \
208
					  % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
209 1
			elif self.fwformat is 'CSV':
210
				msg = '%s,%s,%s,%s,%s,%s' \
211
					  % (self.stn, self.cha, self.rsam[0], self.rsam[1], self.rsam[2], self.rsam[3])
212 1
			packet = bytes(msg, 'utf-8')
213 1
			self.sock.sendto(packet, (self.fwaddr, self.fwport))
214
215
216 1
	def run(self):
217
		"""
218
		Reads data from the queue and executes self.codefile if it sees an ``ALARM`` message.
219
		Quits if it sees a ``TERM`` message.
220
		"""
221 1
		if self.fwaddr and self.fwport:
222 1
			printM('Opening socket...', sender=self.sender)
223 1
			socket_type = s.SOCK_DGRAM if os.name in 'nt' else s.SOCK_DGRAM | s.SO_REUSEADDR
224 1
			self.sock = s.socket(s.AF_INET, socket_type)
225
226 1
		n = 0
227 1
		next_int = time.time() + self.interval
228
229 1
		wait_pkts = self.interval / (rs.tf / 1000)
230
231 1
		while n > 3:
232
			self.getq()
233
			n += 1
234
235 1
		n = 0
236 1
		while True:
237 1
			self._subloop()
238
239 1
			self.raw = rs.copy(self.raw)	# necessary to avoid memory leak
240 1
			self.stream = self.raw.copy()
241 1
			self._deconvolve()
242
243 1
			if n > wait_pkts:
244
				# if the trigger is activated
245 1
				obstart = self.stream[0].stats.endtime - timedelta(seconds=self.interval)	# obspy time
246 1
				self.raw = self.raw.slice(starttime=obstart)		# slice the stream to the specified length (seconds variable)
247 1
				self.stream = self.stream.slice(starttime=obstart)	# slice the stream to the specified length (seconds variable)
248
249
				# run rsam analysis
250 1
				if time.time() > next_int:
251 1
					self._rsam()
252 1
					self.stream = rs.copy(self.stream)  # prevent mem leak
253 1
					self._forward_rsam()
254 1
					self._print_rsam()
255 1
					next_int = time.time() + self.interval
256
257 1
			elif n == 0:
258 1
				printM('Starting RSAM analysis with interval=%s on station=%s channel=%s forward=%s' %
259
					   (self.interval, self.stn, self.cha, self.fwaddr),
260
					   self.sender)
261 1
			elif n == wait_pkts:
262 1
				printM('RSAM analysis up and running normally.', self.sender)
263 1
				if self.testing:
264 1
					TEST['c_rsam'][1] = True
265
			else:
266
				pass
267
268 1
			n += 1
269
			sys.stdout.flush()
270