Passed
Push — master ( dc58f3...a85733 )
by Ian
04:44 queued 11s
created

build.rsudp.helpers   B

Complexity

Total Complexity 52

Size/Duplication

Total Lines 561
Duplicated Lines 0 %

Test Coverage

Coverage 65.38%

Importance

Changes 0
Metric Value
wmc 52
eloc 137
dl 0
loc 561
ccs 85
cts 130
cp 0.6538
rs 7.44
c 0
b 0
f 0

19 Functions

Rating   Name   Duplication   Size   Complexity  
A dump_default() 0 11 2
A default_settings() 0 85 2
B set_channels() 0 41 8
A read_settings() 0 21 3
A fsec() 0 35 1
A conn_stats() 0 34 3
A msg_imgpath() 0 22 1
A deconv_vel_inst() 0 28 5
A get_scap_dir() 0 13 1
A msg_term() 0 14 1
A msg_reset() 0 20 1
A resolve_extra_text() 0 32 5
B deconvolve() 0 36 7
A msg_alarm() 0 20 1
A get_msg_path() 0 22 1
A get_msg_time() 0 23 1
A deconv_rbm_inst() 0 19 1
B deconv_acc_inst() 0 30 7
A lesser_multiple() 0 11 1

How to fix   Complexity   

Complexity

Complex classes like build.rsudp.helpers often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import rsudp.raspberryshake as rs
2 1
from rsudp import COLOR, printM, printW
3 1
import rsudp
4 1
import os
5 1
import json
6
7
8 1
def dump_default(settings_loc, default_settings):
9
	'''
10
	Dumps a default settings file to a specified location.
11
12
	:param str settings_loc: The location to create the new settings JSON.
13
	:param str default_settings: The default settings to dump to file.
14
	'''
15
	print('Creating a default settings file at %s' % settings_loc)
16
	with open(settings_loc, 'w+') as f:
17
		f.write(default_settings)
18
		f.write('\n')
19
20
21 1
def default_settings(output_dir='%s/rsudp' % os.path.expanduser('~').replace('\\', '/'), verbose=True):
22
	'''
23
	Returns a formatted json string of default settings.
24
25
	:param str output_dir: the user's specified output location. defaults to ``~/rsudp``.
26
	:param bool verbose: if ``True``, displays some information as the string is created.
27
	:return: default settings string in formatted json
28
	:rtype: str
29
	'''
30 1
	def_settings = r"""{
31
"settings": {
32
    "port": 8888,
33
    "station": "Z0000",
34
    "output_dir": "%s",
35
    "debug": true},
36
"printdata": {
37
    "enabled": false},
38
"write": {
39
    "enabled": false,
40
    "channels": ["all"]},
41
"plot": {
42
    "enabled": true,
43
    "duration": 30,
44
    "spectrogram": true,
45
    "fullscreen": false,
46
    "kiosk": false,
47
    "eq_screenshots": false,
48
    "channels": ["HZ", "HDF"],
49
    "deconvolve": true,
50
    "units": "CHAN"},
51
"forward": {
52
    "enabled": false,
53
    "address": ["192.168.1.254"],
54
    "port": [8888],
55
    "channels": ["all"],
56
    "fwd_data": true,
57
    "fwd_alarms": false},
58
"alert": {
59
    "enabled": true,
60
    "channel": "HZ",
61
    "sta": 6,
62
    "lta": 30,
63
    "threshold": 1.7,
64
    "reset": 1.6,
65
    "highpass": 0,
66
    "lowpass": 50,
67
    "deconvolve": false,
68
    "units": "VEL"},
69
"alertsound": {
70
    "enabled": false,
71
    "mp3file": "doorbell"},
72
"custom": {
73
    "enabled": false,
74
    "codefile": "n/a",
75
    "win_override": false},
76
"tweets": {
77
    "enabled": false,
78
    "tweet_images": true,
79
    "api_key": "n/a",
80
    "api_secret": "n/a",
81
    "access_token": "n/a",
82
    "access_secret": "n/a",
83
    "extra_text": ""},
84
"telegram": {
85
    "enabled": false,
86
    "send_images": true,
87
    "token": "n/a",
88
    "chat_id": "n/a",
89
    "extra_text": ""},
90
"rsam": {
91
    "enabled": false,
92
    "quiet": true,
93
    "fwaddr": "192.168.1.254",
94
    "fwport": 8887,
95
    "fwformat": "LITE",
96
    "channel": "HZ",
97
    "interval": 10,
98
    "deconvolve": false,
99
    "units": "VEL"}
100
}
101
102
""" % (output_dir)
103 1
	if verbose:
104
		print('By default output_dir is set to %s' % output_dir)
105 1
	return def_settings
106
107
108 1
def read_settings(loc):
109
	'''
110
	Reads settings from a specific location.
111
112
	:param str loc: location on disk to read json settings file from
113
	:return: settings dictionary read from JSON, or ``None``
114
	:rtype: dict or NoneType
115
	'''
116
	settings_loc = os.path.abspath(os.path.expanduser(loc)).replace('\\', '/')
117
	settings = None
118
	with open(settings_loc, 'r') as f:
119
		try:
120
			data = f.read().replace('\\', '/')
121
			settings = json.loads(data)
122
		except Exception as e:
123
			print(COLOR['red'] + 'ERROR: Could not load settings file. Perhaps the JSON is malformed?' + COLOR['white'])
124
			print(COLOR['red'] + '       detail: %s' % e + COLOR['white'])
125
			print(COLOR['red'] + '       If you would like to overwrite and rebuild the file, you can enter the command below:' + COLOR['white'])
126
			print(COLOR['bold'] + '       shake_client -d %s' % loc + COLOR['white'])
127
			exit(2)
128
	return settings
129
130
131 1
def set_channels(self, cha):
132
	'''
133
	This function sets the channels available for plotting. Allowed units are as follows:
134
135
	- ``["SHZ", "EHZ", "EHN", "EHE"]`` - velocity channels
136
	- ``["ENZ", "ENN", "ENE"]`` - acceleration channels
137
	- ``["HDF"]`` - pressure transducer channel
138
	- ``["all"]`` - all available channels
139
140
	So for example, if you wanted to display the two vertical channels of a Shake 4D,
141
	(geophone and vertical accelerometer) you could specify:
142
143
	``["EHZ", "ENZ"]``
144
145
	You can also specify partial channel names.
146
	So for example, the following will display at least one channel from any
147
	Raspberry Shake instrument:
148
149
	``["HZ", "HDF"]``
150
151
	Or if you wanted to display only vertical channels from a RS4D,
152
	you could specify
153
154
	``["Z"]``
155
156
	which would match both ``"EHZ"`` and ``"ENZ"``.
157
158
	:param self self: self object of the class calling this function
159
	:param cha: the channel or list of channels to plot
160
	:type cha: list or str
161
	'''
162 1
	cha = rs.chns if ('all' in cha) else cha
163 1
	cha = list(cha) if isinstance(cha, str) else cha
164 1
	for c in rs.chns:
165 1
		n = 0
166 1
		for uch in cha:
167 1
			if (uch.upper() in c) and (c not in str(self.chans)):
168 1
				self.chans.append(c)
169 1
			n += 1
170 1
	if len(self.chans) < 1:
171
			self.chans = rs.chns
172
173
174 1
def fsec(ti):
175
	'''
176
	.. versionadded:: 0.4.3
177
178
	The Raspberry Shake records at hundredths-of-a-second precision.
179
	In order to report time at this precision, we need to do some time-fu.
180
181
	This function rounds the microsecond fraction of a
182
	:py:class:`obspy.core.utcdatetime.UTCDateTime`
183
	depending on its precision, so that it accurately reflects the Raspberry Shake's
184
	event measurement precision.
185
186
	This is necessary because datetime objects in Python are strange and confusing, and
187
	strftime doesn't support fractional returns, only the full integer microsecond field
188
	which is an integer right-padded with zeroes. This function uses the ``precision``
189
	of a datetime object.
190
191
	For example:
192
193
	.. code-block:: python
194
195
		>>> from obspy import UTCDateTime
196
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
197
		>>> fsec(ti)
198
		UTCDateTime(2020, 1, 1, 0, 0, 0, 600000)
199
200
	:param ti: time object to convert microseconds for
201
	:type ti: obspy.core.utcdatetime.UTCDateTime
202
	:return: the hundredth-of-a-second rounded version of the time object passed (precision is 0.01 second)
203
	:rtype: obspy.core.utcdatetime.UTCDateTime
204
	'''
205
	# time in python is weird and confusing, but luckily obspy is better than Python
206
	# at dealing with datetimes. all we need to do is tell it what precision we want
207
	# and it handles the rounding for us.
208 1
	return rs.UTCDateTime(ti, precision=2)
209
210
211 1
def lesser_multiple(x, base=10):
212
	'''
213
	.. versionadded:: 1.0.3
214
215
	This function calculates the nearest multiple of the base number ``base``
216
	for the number ``x`` passed to it, as long as the result is less than ``x``.
217
218
	This is useful for :func:`rsudp.packetize` when figuring out where to cut
219
	off samples when trying to fit them into packets.
220
	'''
221 1
	return int(base * int(float(x)/base))
222
223
224 1
def conn_stats(TESTING=False):
225
	'''
226
	Print some stats about the connection.
227
228
	Example:
229
230
	.. code-block:: python
231
232
		>>> conn_stats()
233
		2020-03-25 01:35:04 [conn_stats] Initialization stats:
234
		2020-03-25 01:35:04 [conn_stats]                 Port: 18069
235
		2020-03-25 01:35:04 [conn_stats]   Sending IP address: 192.168.0.4
236
		2020-03-25 01:35:04 [conn_stats]     Set station name: R24FA
237
		2020-03-25 01:35:04 [conn_stats]   Number of channels: 4
238
		2020-03-25 01:35:04 [conn_stats]   Transmission freq.: 250 ms/packet
239
		2020-03-25 01:35:04 [conn_stats]    Transmission rate: 4 packets/sec
240
		2020-03-25 01:35:04 [conn_stats]   Samples per second: 100 sps
241
		2020-03-25 01:35:04 [conn_stats]            Inventory: AM.R24FA (Raspberry Shake Citizen Science Station)
242
243
	:param bool TESTING: if ``True``, text is printed to the console in yellow. if not, in white.
244
	'''
245 1
	s = 'conn_stats'
246 1
	pf = printW if TESTING else printM
247 1
	pf('Initialization stats:', sender=s, announce=False)
248 1
	pf('                Port: %s' % rs.port, sender=s, announce=False)
249 1
	pf('  Sending IP address: %s' % rs.firstaddr, sender=s, announce=False)
250 1
	pf('    Set station name: %s' % rs.stn, sender=s, announce=False)
251 1
	pf('  Number of channels: %s' % rs.numchns, sender=s, announce=False)
252 1
	pf('  Transmission freq.: %s ms/packet' % rs.tf, sender=s, announce=False)
253 1
	pf('   Transmission rate: %s packets/sec' % rs.tr, sender=s, announce=False)
254 1
	pf('  Samples per second: %s sps' % rs.sps, sender=s, announce=False)
255 1
	if rs.inv:
256 1
		pf('           Inventory: %s' % rs.inv.get_contents()['stations'][0],
257
			   sender=s, announce=False)
258
259
260 1
def msg_alarm(event_time):
261
	'''
262
	This function constructs the ``ALARM`` message as a bytes object.
263
	Currently this is only used by :py:class:`rsudp.p_producer.Producer`
264
	to construct alarm queue messages.
265
266
	For example:
267
268
	.. code-block:: python
269
270
		>>> from obspy import UTCDateTime
271
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
272
		>>> msg_alarm(ti)
273
		b'ALARM 2020-01-01T00:00:00.599Z'
274
275
	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
276
	:rtype: bytes
277
	:return: the ``ALARM`` message, ready to be put on the queue
278
	'''
279 1
	return b'ALARM %s' % bytes(str(event_time), 'utf-8')
280
281
282 1
def msg_reset(reset_time):
283
	'''
284
	This function constructs the ``RESET`` message as a bytes object.
285
	Currently this is only used by :py:class:`rsudp.p_producer.Producer`
286
	to construct reset queue messages.
287
288
	For example:
289
290
	.. code-block:: python
291
292
		>>> from obspy import UTCDateTime
293
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
294
		>>> msg_reset(ti)
295
		b'RESET 2020-01-01T00:00:00.599Z'
296
297
	:param obspy.core.utcdatetime.UTCDateTime reset_time: the datetime object to serialize and convert to bytes
298
	:rtype: bytes
299
	:return: the ``RESET`` message, ready to be put on the queue
300
	'''
301 1
	return b'RESET %s' % bytes(str(reset_time), 'utf-8')
302
303
304 1
def msg_imgpath(event_time, figname):
305
	'''
306
	This function constructs the ``IMGPATH`` message as a bytes object.
307
	Currently this is only used by :py:class:`rsudp.c_plot.Plot`
308
	to construct queue messages containing timestamp and saved image path.
309
310
	For example:
311
312
	.. code-block:: python
313
314
		>>> from obspy import UTCDateTime
315
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
316
		>>> path = '/home/pi/rsudp/screenshots/test.png'
317
		>>> msg_imgpath(ti, path)
318
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
319
320
	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
321
	:param str figname: the figure path as a string
322
	:rtype: bytes
323
	:return: the ``IMGPATH`` message, ready to be put on the queue
324
	'''
325 1
	return b'IMGPATH %s %s' % (bytes(str(event_time), 'utf-8'), bytes(str(figname), 'utf-8'))
326
327
328 1
def msg_term():
329
	'''
330
	This function constructs the simple ``TERM`` message as a bytes object.
331
332
	.. code-block:: python
333
334
		>>> msg_term()
335
		b'TERM'
336
337
338
	:rtype: bytes
339
	:return: the ``TERM`` message
340
	'''
341 1
	return b'TERM'
342
343
344 1
def get_msg_time(msg):
345
	'''
346
	This function gets the time from ``ALARM``, ``RESET``,
347
	and ``IMGPATH`` messages as a UTCDateTime object.
348
349
	For example:
350
351
	.. code-block:: python
352
353
		>>> from obspy import UTCDateTime
354
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
355
		>>> path = '/home/pi/rsudp/screenshots/test.png'
356
		>>> msg = msg_imgpath(ti, path)
357
		>>> msg
358
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
359
		>>> get_msg_time(msg)
360
		UTCDateTime(2020, 1, 1, 0, 0, 0, 599000)
361
362
	:param bytes msg: the bytes-formatted queue message to decode
363
	:rtype: obspy.core.utcdatetime.UTCDateTime
364
	:return: the time embedded in the message
365
	'''
366 1
	return rs.UTCDateTime.strptime(msg.decode('utf-8').split(' ')[1], '%Y-%m-%dT%H:%M:%S.%fZ')
367
368
369 1
def get_msg_path(msg):
370
	'''
371
	This function gets the path from ``IMGPATH`` messages as a string.
372
373
	For example:
374
375
	.. code-block:: python
376
377
		>>> from obspy import UTCDateTime
378
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
379
		>>> path = '/home/pi/rsudp/screenshots/test.png'
380
		>>> msg = msg_imgpath(ti, path)
381
		>>> msg
382
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
383
		>>> get_msg_path(msg)
384
		'/home/pi/rsudp/screenshots/test.png'
385
386
	:param bytes msg: the bytes-formatted queue message to decode
387
	:rtype: str
388
	:return: the path embedded in the message
389
	'''
390 1
	return msg.decode('utf-8').split(' ')[2]
391
392
393 1
def get_scap_dir():
394
	'''
395
	This function returns the screen capture directory from the init function.
396
	This allows the variable to be more threadsafe.
397
398
	.. code-block:: python
399
400
		>>> get_scap_dir()
401
		'/home/pi/rsudp/screenshots/'
402
403
	:return: the path of the screenshot directory
404
	'''
405 1
	return rsudp.scap_dir
406
407
408 1
def deconv_vel_inst(self, trace, output):
409
	'''
410
	.. role:: pycode(code)
411
		:language: python
412
	
413
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
414
	for velocity channels.
415
416
	:param self self: The self object of the sub-consumer class calling this function.
417
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
418
	'''
419 1
	if self.deconv not in 'CHAN':
420
		trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
421
								output=output, water_level=4.5, taper=False)
422
	else:
423 1
		trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
424
								output='VEL', water_level=4.5, taper=False)
425 1
	if 'ACC' in self.deconv:
426
		trace.data = rs.np.gradient(trace.data, 1)
427 1
	elif 'GRAV' in self.deconv:
428
		trace.data = rs.np.gradient(trace.data, 1) / rs.g
429
		trace.stats.units = 'Earth gravity'
430 1
	elif 'DISP' in self.deconv:
431
		trace.data = rs.np.cumsum(trace.data)
432
		trace.taper(max_percentage=0.1, side='left', max_length=1)
433
		trace.detrend(type='demean')
434
	else:
435 1
		trace.stats.units = 'Velocity'
436
437
438 1
def deconv_acc_inst(self, trace, output):
439
	'''
440
	.. role:: pycode(code)
441
		:language: python
442
	
443
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
444
	for acceleration channels.
445
446
	:param self self: The self object of the sub-consumer class calling this function.
447
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
448
	'''
449 1
	if self.deconv not in 'CHAN':
450
		trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
451
								output=output, water_level=4.5, taper=False)
452
	else:
453 1
		trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
454
								output='ACC', water_level=4.5, taper=False)
455 1
	if 'VEL' in self.deconv:
456
		trace.data = rs.np.cumsum(trace.data)
457
		trace.detrend(type='demean')
458 1
	elif 'DISP' in self.deconv:
459
		trace.data = rs.np.cumsum(rs.np.cumsum(trace.data))
460
		trace.detrend(type='linear')
461 1
	elif 'GRAV' in self.deconv:
462
		trace.data = trace.data / rs.g
463
		trace.stats.units = 'Earth gravity'
464
	else:
465 1
		trace.stats.units = 'Acceleration'
466 1
	if ('ACC' not in self.deconv) and ('CHAN' not in self.deconv):
467
		trace.taper(max_percentage=0.1, side='left', max_length=1)
468
469
470 1
def deconv_rbm_inst(self, trace, output):
471
	'''
472
	.. role:: pycode(code)
473
		:language: python
474
	
475
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
476
	for Raspberry Boom pressure transducer channels.
477
478
	.. note::
479
480
		The Raspberry Boom pressure transducer does not currently have a
481
		deconvolution function. The Raspberry Shake team is working on a
482
		calibration for the Boom, but until then Boom units are given in
483
		counts.
484
485
	:param self self: The self object of the sub-consumer class calling this function.
486
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
487
	'''
488
	trace.stats.units = ' counts'
489
490
491 1
def deconvolve(self):
492
	'''
493
	.. role:: pycode(code)
494
		:language: python
495
	
496
	A central helper function for sub-consumers (i.e. :py:class:`rsudp.c_plot.Plot` or :py:class:`rsudp.c_alert.Alert`)
497
	that need to deconvolve their raw data to metric units.
498
	Consumers with :py:class:`obspy.core.stream.Stream` objects in :pycode:`self.stream` can use this to deconvolve data
499
	if this library's :pycode:`rsudp.raspberryshake.inv` variable
500
	contains a valid :py:class:`obspy.core.inventory.inventory.Inventory` object.
501
502
	:param self self: The self object of the sub-consumer class calling this function. Must contain :pycode:`self.stream` as a :py:class:`obspy.core.stream.Stream` object.
503
	'''
504 1
	acc_channels = ['ENE', 'ENN', 'ENZ']
505 1
	vel_channels = ['EHE', 'EHN', 'EHZ', 'SHZ']
506 1
	rbm_channels = ['HDF']
507
508 1
	self.stream = self.raw.copy()
509 1
	for trace in self.stream:
510 1
		trace.stats.units = self.units
511 1
		output = 'ACC' if self.deconv == 'GRAV' else self.deconv	# if conversion is to gravity
512 1
		if self.deconv:
513 1
			if trace.stats.channel in vel_channels:
514 1
				deconv_vel_inst(self, trace, output)	# geophone channels
515
516 1
			elif trace.stats.channel in acc_channels:
517 1
				deconv_acc_inst(self, trace, output)	# accelerometer channels
518
519
			elif trace.stats.channel in rbm_channels:
520
				deconv_rbm_inst(self, trace, output)	# this is the Boom channel
521
522
			else:
523
				trace.stats.units = ' counts'	# this is a new one
524
525
		else:
526
			trace.stats.units = ' counts'		# this is not being deconvolved
527
528
529 1
def resolve_extra_text(extra_text, max_len, sender='helpers'):
530
	'''
531
	.. role:: pycode(code)
532
		:language: python
533
534
	.. versionadded:: 1.0.3
535
536
	A central helper function for the :class:`rsudp.c_telegram.Tweeter`
537
	and :class:`rsudp.c_telegram.Telegrammer` classes that checks whether
538
	the :pycode:`"extra_text"` parameter (in the settings file) is of appropriate
539
	length. This is done to avoid errors when posting alerts.
540
	The function will truncate longer messages.
541
542
	:param str extra_text: String of additional characters to post as part of the alert message (longer messages will be truncated).
543
	:param str max_len: Upper limit of characters accepted in message (280 for Twitter, 4096 for Telegram).
544
	:param str sender: String identifying the origin of the use of this function (:pycode:`self.sender` in the source function).
545
	:rtype: str
546
	:return: the message string to be incorporated
547
548
	'''
549 1
	allowable_len = max_len - 177	# length of string allowable given maximum message text & region
550 1
	if ((extra_text == '') or (extra_text == None) or (extra_text == False)):
551 1
		return ''
552
	else:
553
		extra_text = str(extra_text)
554
		len_ex_txt = len(extra_text)
555
556
		if len_ex_txt > allowable_len:
557
			printW('extra_text parameter is longer than allowable (%s chars) and will be truncated. Please keep extra_text at or below %s characters.' % (len_ex_txt, allowable_len), sender=sender)
558
			extra_text = extra_text[:allowable_len]
559
560
		return ' %s' % (extra_text)
561
562