Passed
Push — master ( 66e57b...72413d )
by Ian
04:36 queued 12s
created

build.rsudp.helpers.resolve_extra_text()   A

Complexity

Conditions 5

Size

Total Lines 32
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 10.3999

Importance

Changes 0
Metric Value
eloc 10
dl 0
loc 32
ccs 4
cts 10
cp 0.4
rs 9.3333
c 0
b 0
f 0
cc 5
nop 3
crap 10.3999
1 1
import rsudp.raspberryshake as rs
2 1
from rsudp import COLOR, printM, printW
3 1
import rsudp
4 1
import os
5 1
import json
6
7
8 1
def dump_default(settings_loc, default_settings):
9
	'''
10
	Dumps a default settings file to a specified location.
11
12
	:param str settings_loc: The location to create the new settings JSON.
13
	:param str default_settings: The default settings to dump to file.
14
	'''
15
	print('Creating a default settings file at %s' % settings_loc)
16
	with open(settings_loc, 'w+') as f:
17
		f.write(default_settings)
18
		f.write('\n')
19
20
21 1
def default_settings(output_dir='%s/rsudp' % os.path.expanduser('~').replace('\\', '/'), verbose=True):
22
	'''
23
	Returns a formatted json string of default settings.
24
25
	:param str output_dir: the user's specified output location. defaults to ``~/rsudp``.
26
	:param bool verbose: if ``True``, displays some information as the string is created.
27
	:return: default settings string in formatted json
28
	:rtype: str
29
	'''
30 1
	def_settings = r"""{
31
"settings": {
32
    "port": 8888,
33
    "station": "Z0000",
34
    "output_dir": "%s",
35
    "debug": true},
36
"printdata": {
37
    "enabled": false},
38
"write": {
39
    "enabled": false,
40
    "channels": ["all"]},
41
"plot": {
42
    "enabled": true,
43
    "duration": 30,
44
    "spectrogram": true,
45
    "fullscreen": false,
46
    "kiosk": false,
47
    "eq_screenshots": false,
48
    "channels": ["HZ", "HDF"],
49
    "deconvolve": true,
50
    "units": "CHAN"},
51
"forward": {
52
    "enabled": false,
53
    "address": ["192.168.1.254"],
54
    "port": [8888],
55
    "channels": ["all"],
56
    "fwd_data": true,
57
    "fwd_alarms": false},
58
"alert": {
59
    "enabled": true,
60
    "channel": "HZ",
61
    "sta": 6,
62
    "lta": 30,
63
    "threshold": 1.7,
64
    "reset": 1.6,
65
    "highpass": 0,
66
    "lowpass": 50,
67
    "deconvolve": false,
68
    "units": "VEL"},
69
"alertsound": {
70
    "enabled": false,
71
    "mp3file": "doorbell"},
72
"custom": {
73
    "enabled": false,
74
    "codefile": "n/a",
75
    "win_override": false},
76
"tweets": {
77
    "enabled": false,
78
    "tweet_images": true,
79
    "api_key": "n/a",
80
    "api_secret": "n/a",
81
    "access_token": "n/a",
82
    "access_secret": "n/a",
83
    "extra_text": ""},
84
"telegram": {
85
    "enabled": false,
86
    "send_images": true,
87
    "token": "n/a",
88
    "chat_id": "n/a",
89
    "extra_text": ""},
90
"rsam": {
91
    "enabled": false,
92
    "quiet": true,
93
    "fwaddr": "192.168.1.254",
94
    "fwport": 8887,
95
    "fwformat": "LITE",
96
    "channel": "HZ",
97
    "interval": 10,
98
    "deconvolve": false,
99
    "units": "VEL"}
100
}
101
102
""" % (output_dir)
103 1
	if verbose:
104
		print('By default output_dir is set to %s' % output_dir)
105 1
	return def_settings
106
107
108 1
def read_settings(loc):
109
	'''
110
	Reads settings from a specific location.
111
112
	:param str loc: location on disk to read json settings file from
113
	:return: settings dictionary read from JSON, or ``None``
114
	:rtype: dict or NoneType
115
	'''
116
	settings_loc = os.path.abspath(os.path.expanduser(loc)).replace('\\', '/')
117
	settings = None
118
	with open(settings_loc, 'r') as f:
119
		try:
120
			data = f.read().replace('\\', '/')
121
			settings = json.loads(data)
122
		except Exception as e:
123
			print(COLOR['red'] + 'ERROR: Could not load settings file. Perhaps the JSON is malformed?' + COLOR['white'])
124
			print(COLOR['red'] + '       detail: %s' % e + COLOR['white'])
125
			print(COLOR['red'] + '       If you would like to overwrite and rebuild the file, you can enter the command below:' + COLOR['white'])
126
			print(COLOR['bold'] + '       shake_client -d %s' % loc + COLOR['white'])
127
			exit(2)
128
	return settings
129
130
131 1
def set_channels(self, cha):
132
	'''
133
	This function sets the channels available for plotting. Allowed units are as follows:
134
135
	- ``["SHZ", "EHZ", "EHN", "EHE"]`` - velocity channels
136
	- ``["ENZ", "ENN", "ENE"]`` - acceleration channels
137
	- ``["HDF"]`` - pressure transducer channel
138
	- ``["all"]`` - all available channels
139
140
	So for example, if you wanted to display the two vertical channels of a Shake 4D,
141
	(geophone and vertical accelerometer) you could specify:
142
143
	``["EHZ", "ENZ"]``
144
145
	You can also specify partial channel names.
146
	So for example, the following will display at least one channel from any
147
	Raspberry Shake instrument:
148
149
	``["HZ", "HDF"]``
150
151
	Or if you wanted to display only vertical channels from a RS4D,
152
	you could specify
153
154
	``["Z"]``
155
156
	which would match both ``"EHZ"`` and ``"ENZ"``.
157
158
	:param self self: self object of the class calling this function
159
	:param cha: the channel or list of channels to plot
160
	:type cha: list or str
161
	'''
162 1
	cha = rs.chns if ('all' in cha) else cha
163 1
	cha = list(cha) if isinstance(cha, str) else cha
164 1
	for c in rs.chns:
165 1
		n = 0
166 1
		for uch in cha:
167 1
			if (uch.upper() in c) and (c not in str(self.chans)):
168 1
				self.chans.append(c)
169 1
			n += 1
170 1
	if len(self.chans) < 1:
171
			self.chans = rs.chns
172
173
174 1
def fsec(ti):
175
	'''
176
	.. versionadded:: 0.4.3
177
178
	The Raspberry Shake records at hundredths-of-a-second precision.
179
	In order to report time at this precision, we need to do some time-fu.
180
181
	This function rounds the microsecond fraction of a
182
	:py:class:`obspy.core.utcdatetime.UTCDateTime`
183
	depending on its precision, so that it accurately reflects the Raspberry Shake's
184
	event measurement precision.
185
186
	This is necessary because datetime objects in Python are strange and confusing, and
187
	strftime doesn't support fractional returns, only the full integer microsecond field
188
	which is an integer right-padded with zeroes. This function uses the ``precision``
189
	of a datetime object.
190
191
	For example:
192
193
	.. code-block:: python
194
195
		>>> from obspy import UTCDateTime
196
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
197
		>>> fsec(ti)
198
		UTCDateTime(2020, 1, 1, 0, 0, 0, 600000)
199
200
	:param ti: time object to convert microseconds for
201
	:type ti: obspy.core.utcdatetime.UTCDateTime
202
	:return: the hundredth-of-a-second rounded version of the time object passed (precision is 0.01 second)
203
	:rtype: obspy.core.utcdatetime.UTCDateTime
204
	'''
205
	# time in python is weird and confusing, but luckily obspy is better than Python
206
	# at dealing with datetimes. all we need to do is tell it what precision we want
207
	# and it handles the rounding for us.
208 1
	return rs.UTCDateTime(ti, precision=2)
209
210
211 1
def conn_stats(TESTING=False):
212
	'''
213
	Print some stats about the connection.
214
215
	Example:
216
217
	.. code-block:: python
218
219
		>>> conn_stats()
220
		2020-03-25 01:35:04 [conn_stats] Initialization stats:
221
		2020-03-25 01:35:04 [conn_stats]                 Port: 18069
222
		2020-03-25 01:35:04 [conn_stats]   Sending IP address: 192.168.0.4
223
		2020-03-25 01:35:04 [conn_stats]     Set station name: R24FA
224
		2020-03-25 01:35:04 [conn_stats]   Number of channels: 4
225
		2020-03-25 01:35:04 [conn_stats]   Transmission freq.: 250 ms/packet
226
		2020-03-25 01:35:04 [conn_stats]    Transmission rate: 4 packets/sec
227
		2020-03-25 01:35:04 [conn_stats]   Samples per second: 100 sps
228
		2020-03-25 01:35:04 [conn_stats]            Inventory: AM.R24FA (Raspberry Shake Citizen Science Station)
229
230
	:param bool TESTING: if ``True``, text is printed to the console in yellow. if not, in white.
231
	'''
232 1
	s = 'conn_stats'
233 1
	pf = printW if TESTING else printM
234 1
	pf('Initialization stats:', sender=s, announce=False)
235 1
	pf('                Port: %s' % rs.port, sender=s, announce=False)
236 1
	pf('  Sending IP address: %s' % rs.firstaddr, sender=s, announce=False)
237 1
	pf('    Set station name: %s' % rs.stn, sender=s, announce=False)
238 1
	pf('  Number of channels: %s' % rs.numchns, sender=s, announce=False)
239 1
	pf('  Transmission freq.: %s ms/packet' % rs.tf, sender=s, announce=False)
240 1
	pf('   Transmission rate: %s packets/sec' % rs.tr, sender=s, announce=False)
241 1
	pf('  Samples per second: %s sps' % rs.sps, sender=s, announce=False)
242 1
	if rs.inv:
243 1
		pf('           Inventory: %s' % rs.inv.get_contents()['stations'][0],
244
			   sender=s, announce=False)
245
246
247 1
def msg_alarm(event_time):
248
	'''
249
	This function constructs the ``ALARM`` message as a bytes object.
250
	Currently this is only used by :py:class:`rsudp.p_producer.Producer`
251
	to construct alarm queue messages.
252
253
	For example:
254
255
	.. code-block:: python
256
257
		>>> from obspy import UTCDateTime
258
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
259
		>>> msg_alarm(ti)
260
		b'ALARM 2020-01-01T00:00:00.599Z'
261
262
	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
263
	:rtype: bytes
264
	:return: the ``ALARM`` message, ready to be put on the queue
265
	'''
266 1
	return b'ALARM %s' % bytes(str(event_time), 'utf-8')
267
268
269 1
def msg_reset(reset_time):
270
	'''
271
	This function constructs the ``RESET`` message as a bytes object.
272
	Currently this is only used by :py:class:`rsudp.p_producer.Producer`
273
	to construct reset queue messages.
274
275
	For example:
276
277
	.. code-block:: python
278
279
		>>> from obspy import UTCDateTime
280
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
281
		>>> msg_reset(ti)
282
		b'RESET 2020-01-01T00:00:00.599Z'
283
284
	:param obspy.core.utcdatetime.UTCDateTime reset_time: the datetime object to serialize and convert to bytes
285
	:rtype: bytes
286
	:return: the ``RESET`` message, ready to be put on the queue
287
	'''
288 1
	return b'RESET %s' % bytes(str(reset_time), 'utf-8')
289
290
291 1
def msg_imgpath(event_time, figname):
292
	'''
293
	This function constructs the ``IMGPATH`` message as a bytes object.
294
	Currently this is only used by :py:class:`rsudp.c_plot.Plot`
295
	to construct queue messages containing timestamp and saved image path.
296
297
	For example:
298
299
	.. code-block:: python
300
301
		>>> from obspy import UTCDateTime
302
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
303
		>>> path = '/home/pi/rsudp/screenshots/test.png'
304
		>>> msg_imgpath(ti, path)
305
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
306
307
	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
308
	:param str figname: the figure path as a string
309
	:rtype: bytes
310
	:return: the ``IMGPATH`` message, ready to be put on the queue
311
	'''
312 1
	return b'IMGPATH %s %s' % (bytes(str(event_time), 'utf-8'), bytes(str(figname), 'utf-8'))
313
314
315 1
def msg_term():
316
	'''
317
	This function constructs the simple ``TERM`` message as a bytes object.
318
319
	.. code-block:: python
320
321
		>>> msg_term()
322
		b'TERM'
323
324
325
	:rtype: bytes
326
	:return: the ``TERM`` message
327
	'''
328 1
	return b'TERM'
329
330
331 1
def get_msg_time(msg):
332
	'''
333
	This function gets the time from ``ALARM``, ``RESET``,
334
	and ``IMGPATH`` messages as a UTCDateTime object.
335
336
	For example:
337
338
	.. code-block:: python
339
340
		>>> from obspy import UTCDateTime
341
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
342
		>>> path = '/home/pi/rsudp/screenshots/test.png'
343
		>>> msg = msg_imgpath(ti, path)
344
		>>> msg
345
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
346
		>>> get_msg_time(msg)
347
		UTCDateTime(2020, 1, 1, 0, 0, 0, 599000)
348
349
	:param bytes msg: the bytes-formatted queue message to decode
350
	:rtype: obspy.core.utcdatetime.UTCDateTime
351
	:return: the time embedded in the message
352
	'''
353 1
	return rs.UTCDateTime.strptime(msg.decode('utf-8').split(' ')[1], '%Y-%m-%dT%H:%M:%S.%fZ')
354
355
356 1
def get_msg_path(msg):
357
	'''
358
	This function gets the path from ``IMGPATH`` messages as a string.
359
360
	For example:
361
362
	.. code-block:: python
363
364
		>>> from obspy import UTCDateTime
365
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
366
		>>> path = '/home/pi/rsudp/screenshots/test.png'
367
		>>> msg = msg_imgpath(ti, path)
368
		>>> msg
369
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
370
		>>> get_msg_path(msg)
371
		'/home/pi/rsudp/screenshots/test.png'
372
373
	:param bytes msg: the bytes-formatted queue message to decode
374
	:rtype: str
375
	:return: the path embedded in the message
376
	'''
377 1
	return msg.decode('utf-8').split(' ')[2]
378
379
380 1
def get_scap_dir():
381
	'''
382
	This function returns the screen capture directory from the init function.
383
	This allows the variable to be more threadsafe.
384
385
	.. code-block:: python
386
387
		>>> get_scap_dir()
388
		'/home/pi/rsudp/screenshots/'
389
390
	:return: the path of the screenshot directory
391
	'''
392 1
	return rsudp.scap_dir
393
394
395 1
def deconv_vel_inst(self, trace, output):
396
	'''
397
	.. role:: pycode(code)
398
		:language: python
399
	
400
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
401
	for velocity channels.
402
403
	:param self self: The self object of the sub-consumer class calling this function.
404
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
405
	'''
406 1
	if self.deconv not in 'CHAN':
407
		trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
408
								output=output, water_level=4.5, taper=False)
409
	else:
410 1
		trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
411
								output='VEL', water_level=4.5, taper=False)
412 1
	if 'ACC' in self.deconv:
413
		trace.data = rs.np.gradient(trace.data, 1)
414 1
	elif 'GRAV' in self.deconv:
415
		trace.data = rs.np.gradient(trace.data, 1) / rs.g
416
		trace.stats.units = 'Earth gravity'
417 1
	elif 'DISP' in self.deconv:
418
		trace.data = rs.np.cumsum(trace.data)
419
		trace.taper(max_percentage=0.1, side='left', max_length=1)
420
		trace.detrend(type='demean')
421
	else:
422 1
		trace.stats.units = 'Velocity'
423
424
425 1
def deconv_acc_inst(self, trace, output):
426
	'''
427
	.. role:: pycode(code)
428
		:language: python
429
	
430
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
431
	for acceleration channels.
432
433
	:param self self: The self object of the sub-consumer class calling this function.
434
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
435
	'''
436 1
	if self.deconv not in 'CHAN':
437
		trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
438
								output=output, water_level=4.5, taper=False)
439
	else:
440 1
		trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
441
								output='ACC', water_level=4.5, taper=False)
442 1
	if 'VEL' in self.deconv:
443
		trace.data = rs.np.cumsum(trace.data)
444
		trace.detrend(type='demean')
445 1
	elif 'DISP' in self.deconv:
446
		trace.data = rs.np.cumsum(rs.np.cumsum(trace.data))
447
		trace.detrend(type='linear')
448 1
	elif 'GRAV' in self.deconv:
449
		trace.data = trace.data / rs.g
450
		trace.stats.units = 'Earth gravity'
451
	else:
452 1
		trace.stats.units = 'Acceleration'
453 1
	if ('ACC' not in self.deconv) and ('CHAN' not in self.deconv):
454
		trace.taper(max_percentage=0.1, side='left', max_length=1)
455
456
457 1
def deconv_rbm_inst(self, trace, output):
458
	'''
459
	.. role:: pycode(code)
460
		:language: python
461
	
462
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
463
	for Raspberry Boom pressure transducer channels.
464
465
	.. note::
466
467
		The Raspberry Boom pressure transducer does not currently have a
468
		deconvolution function. The Raspberry Shake team is working on a
469
		calibration for the Boom, but until then Boom units are given in
470
		counts.
471
472
	:param self self: The self object of the sub-consumer class calling this function.
473
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
474
	'''
475
	trace.stats.units = ' counts'
476
477
478 1
def deconvolve(self):
479
	'''
480
	.. role:: pycode(code)
481
		:language: python
482
	
483
	A central helper function for sub-consumers (i.e. :py:class:`rsudp.c_plot.Plot` or :py:class:`rsudp.c_alert.Alert`)
484
	that need to deconvolve their raw data to metric units.
485
	Consumers with :py:class:`obspy.core.stream.Stream` objects in :pycode:`self.stream` can use this to deconvolve data
486
	if this library's :pycode:`rsudp.raspberryshake.inv` variable
487
	contains a valid :py:class:`obspy.core.inventory.inventory.Inventory` object.
488
489
	:param self self: The self object of the sub-consumer class calling this function. Must contain :pycode:`self.stream` as a :py:class:`obspy.core.stream.Stream` object.
490
	'''
491 1
	acc_channels = ['ENE', 'ENN', 'ENZ']
492 1
	vel_channels = ['EHE', 'EHN', 'EHZ', 'SHZ']
493 1
	rbm_channels = ['HDF']
494
495 1
	self.stream = self.raw.copy()
496 1
	for trace in self.stream:
497 1
		trace.stats.units = self.units
498 1
		output = 'ACC' if self.deconv == 'GRAV' else self.deconv	# if conversion is to gravity
499 1
		if self.deconv:
500 1
			if trace.stats.channel in vel_channels:
501 1
				deconv_vel_inst(self, trace, output)	# geophone channels
502
503 1
			elif trace.stats.channel in acc_channels:
504 1
				deconv_acc_inst(self, trace, output)	# accelerometer channels
505
506
			elif trace.stats.channel in rbm_channels:
507
				deconv_rbm_inst(self, trace, output)	# this is the Boom channel
508
509
			else:
510
				trace.stats.units = ' counts'	# this is a new one
511
512
		else:
513
			trace.stats.units = ' counts'		# this is not being deconvolved
514
515
516 1
def resolve_extra_text(extra_text, max_len, sender='helpers'):
517
	'''
518
	.. role:: pycode(code)
519
		:language: python
520
521
	.. versionadded:: 1.0.3
522
523
	A central helper function for the :class:`rsudp.c_telegram.Tweeter`
524
	and :class:`rsudp.c_telegram.Telegrammer` classes that checks whether
525
	the :pycode:`"extra_text"` parameter (in the settings file) is of appropriate
526
	length. This is done to avoid errors when posting alerts.
527
	The function will truncate longer messages.
528
529
	:param str extra_text: String of additional characters to post as part of the alert message (longer messages will be truncated).
530
	:param str max_len: Upper limit of characters accepted in message (280 for Twitter, 4096 for Telegram).
531
	:param str sender: String identifying the origin of the use of this function (:pycode:`self.sender` in the source function).
532
	:rtype: str
533
	:return: the message string to be incorporated
534
535
	'''
536 1
	allowable_len = max_len - 177	# length of string allowable given maximum message text & region
537 1
	if ((extra_text == '') or (extra_text == None) or (extra_text == False)):
538 1
		return ''
539
	else:
540
		extra_text = str(extra_text)
541
		len_ex_txt = len(extra_text)
542
543
		if len_ex_txt > allowable_len:
544
			printW('extra_text parameter is longer than allowable (%s chars) and will be truncated. Please keep extra_text at or below %s characters.' % (len_ex_txt, allowable_len), sender=sender)
545
			extra_text = extra_text[:allowable_len]
546
547
		return ' %s' % (extra_text)
548
549