Passed
Pull Request — master (#55)
by
unknown
06:09
created

build.rsudp.helpers.dump_default()   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 4.048

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 11
ccs 1
cts 5
cp 0.2
rs 10
c 0
b 0
f 0
cc 2
nop 2
crap 4.048
1 1
import rsudp.raspberryshake as rs
2 1
from rsudp import COLOR, printM, printW
3 1
import os
4 1
import json
5
6
7 1
def dump_default(settings_loc, default_settings):
	'''
	Writes a default settings file to the given location.

	The file is opened for writing (an existing file is overwritten) and
	receives the settings text followed by one newline character.

	:param str settings_loc: The path of the settings JSON file to create.
	:param str default_settings: The settings text to write to the file.
	'''
	print('Creating a default settings file at %s' % settings_loc)
	with open(settings_loc, 'w+') as settings_file:
		# settings text first, then the terminating newline
		settings_file.writelines((default_settings, '\n'))
18
19
20 1
def default_settings(output_dir='%s/rsudp' % os.path.expanduser('~').replace('\\', '/'), verbose=True):
	'''
	Returns a formatted json string of default settings.

	The output directory is inserted into the ``"output_dir"`` field of the
	``"settings"`` section. Backslashes in ``output_dir`` are converted to
	forward slashes before insertion (the default value is built the same way),
	because a raw backslash inside a JSON string starts an escape sequence and
	would make the returned text unparseable.

	:param str output_dir: the user's specified output location. defaults to ``~/rsudp``.
	:param bool verbose: if ``True``, prints the output directory that was written into the settings.
	:return: default settings string in formatted json
	:rtype: str
	'''
	# normalise Windows-style separators so the result is always valid JSON
	output_dir = str(output_dir).replace('\\', '/')
	def_settings = r"""{
"settings": {
    "port": 8888,
    "station": "Z0000",
    "output_dir": "%s",
    "debug": true},
"printdata": {
    "enabled": false},
"write": {
    "enabled": false,
    "channels": ["all"]},
"plot": {
    "enabled": true,
    "duration": 90,
    "spectrogram": true,
    "fullscreen": false,
    "kiosk": false,
    "eq_screenshots": false,
    "channels": ["all"],
    "deconvolve": true,
    "units": "CHAN"},
"forward": {
    "enabled": false,
    "address": ["192.168.1.254"],
    "port": [8888],
    "channels": ["all"],
    "fwd_data": true,
    "fwd_alarms": false},
"alert": {
    "enabled": true,
    "channel": "HZ",
    "sta": 6,
    "lta": 30,
    "threshold": 3.95,
    "reset": 0.9,
    "highpass": 0.8,
    "lowpass": 9,
    "deconvolve": false,
    "units": "VEL"},
"alertsound": {
    "enabled": false,
    "mp3file": "doorbell"},
"custom": {
    "enabled": false,
    "codefile": "n/a",
    "win_override": false},
"tweets": {
    "enabled": false,
    "tweet_images": true,
    "api_key": "n/a",
    "api_secret": "n/a",
    "access_token": "n/a",
    "access_secret": "n/a",
    "extra_text": ""},
"telegram": {
    "enabled": false,
    "send_images": true,
    "token": "n/a",
    "chat_id": "n/a",
    "extra_text": ""},
"rsam": {
    "enabled": false,
    "quiet": true,
    "fwaddr": "192.168.1.254",
    "fwport": 8887,
    "fwformat": "LITE",
    "channel": "HZ",
    "interval": 10,
    "deconvolve": false,
    "units": "VEL"}
}

""" % (output_dir)
	if verbose:
		print('By default output_dir is set to %s' % output_dir)
	return def_settings
105
106
107 1
def read_settings(loc):
	'''
	Reads settings from a specific location.

	The path is expanded (``~``) and made absolute before the file is opened.
	Every backslash in the file contents is replaced with a forward slash
	before the text is parsed as JSON.

	If the contents cannot be read or parsed, an error message and a hint for
	rebuilding the file are printed and the program exits with status 2.
	A failure to open the file (for example, a missing file) is not caught
	here and propagates to the caller.

	:param str loc: location on disk to read json settings file from
	:return: settings dictionary read from JSON
	:rtype: dict
	:raises SystemExit: with code 2 if the file contents could not be loaded
	'''
	# ``sys.exit`` is always available, unlike the ``exit`` helper that the
	# ``site`` module only installs for interactive use
	import sys

	settings_loc = os.path.abspath(os.path.expanduser(loc)).replace('\\', '/')
	settings = None
	with open(settings_loc, 'r') as f:
		try:
			# backslashes would otherwise be treated as JSON escape characters
			data = f.read().replace('\\', '/')
			settings = json.loads(data)
		except Exception as e:
			print(COLOR['red'] + 'ERROR: Could not load settings file. Perhaps the JSON is malformed?' + COLOR['white'])
			print(COLOR['red'] + '       detail: %s' % e + COLOR['white'])
			print(COLOR['red'] + '       If you would like to overwrite and rebuild the file, you can enter the command below:' + COLOR['white'])
			print(COLOR['bold'] + '       shake_client -d %s' % loc + COLOR['white'])
			sys.exit(2)
	return settings
129
130
131 1
def set_channels(self, cha):
	'''
	This function sets the channels available for plotting. Allowed units are as follows:

	- ``["SHZ", "EHZ", "EHN", "EHE"]`` - velocity channels
	- ``["ENZ", "ENN", "ENE"]`` - acceleration channels
	- ``["HDF"]`` - pressure transducer channel
	- ``["all"]`` - all available channels

	So for example, if you wanted to display the two vertical channels of a Shake 4D,
	(geophone and vertical accelerometer) you could specify:

	``["EHZ", "ENZ"]``

	You can also specify partial channel names.
	So for example, the following will display at least one channel from any
	Raspberry Shake instrument:

	``["HZ", "HDF"]``

	Or if you wanted to display only vertical channels from a RS4D,
	you could specify

	``["Z"]``

	which would match both ``"EHZ"`` and ``"ENZ"``.

	A single channel may also be passed as a plain string (e.g. ``"EHZ"``);
	it is treated as a one-element list.

	Matching channels from ``rs.chns`` are appended to ``self.chans`` (which
	must already exist as a list) in the order they appear in ``rs.chns``.
	If nothing matched and ``self.chans`` is still empty, ``self.chans`` is
	set to ``rs.chns``.

	:param self self: self object of the class calling this function
	:param cha: the channel or list of channels to plot
	:type cha: list or str
	'''
	if 'all' in cha:
		cha = rs.chns
	elif isinstance(cha, str):
		# wrap rather than ``list(cha)``, which would split the name into
		# single characters and match nearly every channel
		cha = [cha]
	wanted = [uch.upper() for uch in cha]
	for c in rs.chns:
		# partial names match by substring; skip channels already selected
		if (c not in self.chans) and any(w in c for w in wanted):
			self.chans.append(c)
	if len(self.chans) < 1:
		# nothing matched: fall back to every available channel
		self.chans = rs.chns
172
173
174 1
def fsec(ti):
	'''
	.. versionadded:: 0.4.3

	The Raspberry Shake records at hundredths-of-a-second precision, so times
	reported by this program should be expressed at that precision as well.

	This function builds a new
	:py:class:`obspy.core.utcdatetime.UTCDateTime`
	from the one passed in, with its ``precision`` set to 2 decimal places.
	``strftime`` only offers the full, zero-padded integer microsecond field,
	so the ``precision`` of the time object is used instead.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> fsec(ti)
		UTCDateTime(2020, 1, 1, 0, 0, 0, 600000)

	:param ti: time object to convert microseconds for
	:type ti: obspy.core.utcdatetime.UTCDateTime
	:return: the hundredth-of-a-second version of the time object passed (precision is 0.01 second)
	:rtype: obspy.core.utcdatetime.UTCDateTime
	'''
	# obspy takes care of the precision handling; we only state how many
	# decimal places of a second we want
	hundredths = rs.UTCDateTime(ti, precision=2)
	return hundredths
209
210
211 1
def lesser_multiple(x, base=10):
	'''
	.. versionadded:: 1.0.3

	This function calculates the largest multiple of the base number ``base``
	that is less than or equal to the number ``x`` passed to it.

	This is useful for :func:`rsudp.packetize` when figuring out where to cut
	off samples when trying to fit them into packets.

	For example:

	.. code-block:: python

		>>> lesser_multiple(25)
		20
		>>> lesser_multiple(30)
		30
		>>> lesser_multiple(-15)
		-20

	:param x: the number to round down (anything accepted by :py:class:`float`)
	:param base: the number whose multiples are considered; only its magnitude matters
	:rtype: int
	:return: the largest multiple of ``base`` that does not exceed ``x``
	:raises ZeroDivisionError: if ``base`` is zero
	'''
	import math

	# multiples of ``base`` and ``-base`` are the same set
	step = abs(base)
	# floor (rather than truncation toward zero) keeps the result <= x
	# for negative x as well
	return int(step * math.floor(float(x) / step))
222
223
224 1
def conn_stats(TESTING=False):
	'''
	Prints a summary of the connection parameters held by
	:py:mod:`rsudp.raspberryshake`.

	Example:

	.. code-block:: python

		>>> conn_stats()
		2020-03-25 01:35:04 [conn_stats] Initialization stats:
		2020-03-25 01:35:04 [conn_stats]                 Port: 18069
		2020-03-25 01:35:04 [conn_stats]   Sending IP address: 192.168.0.4
		2020-03-25 01:35:04 [conn_stats]     Set station name: R24FA
		2020-03-25 01:35:04 [conn_stats]   Number of channels: 4
		2020-03-25 01:35:04 [conn_stats]   Transmission freq.: 250 ms/packet
		2020-03-25 01:35:04 [conn_stats]    Transmission rate: 4 packets/sec
		2020-03-25 01:35:04 [conn_stats]   Samples per second: 100 sps
		2020-03-25 01:35:04 [conn_stats]            Inventory: AM.R24FA (Raspberry Shake Citizen Science Station)

	The inventory line is only printed when an inventory is available.

	:param bool TESTING: if ``True``, text is printed to the console in yellow. if not, in white.
	'''
	s = 'conn_stats'
	if TESTING:
		pf = printW
	else:
		pf = printM

	# (message template, attribute of ``rs`` that fills it), in print order
	rows = (
		('                Port: %s', 'port'),
		('  Sending IP address: %s', 'firstaddr'),
		('    Set station name: %s', 'stn'),
		('  Number of channels: %s', 'numchns'),
		('  Transmission freq.: %s ms/packet', 'tf'),
		('   Transmission rate: %s packets/sec', 'tr'),
		('  Samples per second: %s sps', 'sps'),
	)

	pf('Initialization stats:', sender=s, announce=False)
	for template, attr in rows:
		pf(template % getattr(rs, attr), sender=s, announce=False)
	if rs.inv:
		station = rs.inv.get_contents()['stations'][0]
		pf('           Inventory: %s' % station,
			   sender=s, announce=False)
258
259
260 1
def msg_alarm(event_time):
	'''
	Builds the ``ALARM`` queue message as a bytes object.
	Currently this is only used by :py:class:`rsudp.p_producer.Producer`
	to construct alarm queue messages.

	The message is the word ``ALARM``, a space, and the string form of
	``event_time``, encoded as UTF-8.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> msg_alarm(ti)
		b'ALARM 2020-01-01T00:00:00.599Z'

	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
	:rtype: bytes
	:return: the ``ALARM`` message, ready to be put on the queue
	'''
	timestamp = str(event_time).encode('utf-8')
	return b'ALARM ' + timestamp
280
281
282 1
def msg_reset(reset_time):
	'''
	Builds the ``RESET`` queue message as a bytes object.
	Currently this is only used by :py:class:`rsudp.p_producer.Producer`
	to construct reset queue messages.

	The message is the word ``RESET``, a space, and the string form of
	``reset_time``, encoded as UTF-8.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> msg_reset(ti)
		b'RESET 2020-01-01T00:00:00.599Z'

	:param obspy.core.utcdatetime.UTCDateTime reset_time: the datetime object to serialize and convert to bytes
	:rtype: bytes
	:return: the ``RESET`` message, ready to be put on the queue
	'''
	timestamp = str(reset_time).encode('utf-8')
	return b'RESET ' + timestamp
302
303
304 1
def msg_imgpath(event_time, figname):
	'''
	Builds the ``IMGPATH`` queue message as a bytes object.
	Currently this is only used by :py:class:`rsudp.c_plot.Plot`
	to construct queue messages containing timestamp and saved image path.

	The message consists of the word ``IMGPATH``, the string form of
	``event_time``, and the string form of ``figname``, separated by single
	spaces and encoded as UTF-8.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> path = '/home/pi/rsudp/screenshots/test.png'
		>>> msg_imgpath(ti, path)
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'

	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
	:param str figname: the figure path as a string
	:rtype: bytes
	:return: the ``IMGPATH`` message, ready to be put on the queue
	'''
	timestamp = str(event_time).encode('utf-8')
	image_path = str(figname).encode('utf-8')
	return b' '.join((b'IMGPATH', timestamp, image_path))
326
327
328 1
def msg_term():
	'''
	Builds the ``TERM`` message as a bytes object.
	The message carries no payload.

	.. code-block:: python

		>>> msg_term()
		b'TERM'


	:rtype: bytes
	:return: the ``TERM`` message
	'''
	return 'TERM'.encode('utf-8')
342
343
344 1
def get_msg_time(msg):
	'''
	Extracts the time from ``ALARM``, ``RESET``,
	and ``IMGPATH`` messages as a UTCDateTime object.

	The message is decoded as UTF-8 and split on spaces; the second field is
	parsed with the format ``%Y-%m-%dT%H:%M:%S.%fZ``.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> path = '/home/pi/rsudp/screenshots/test.png'
		>>> msg = msg_imgpath(ti, path)
		>>> msg
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
		>>> get_msg_time(msg)
		UTCDateTime(2020, 1, 1, 0, 0, 0, 599000)

	:param bytes msg: the bytes-formatted queue message to decode
	:rtype: obspy.core.utcdatetime.UTCDateTime
	:return: the time embedded in the message
	'''
	fields = msg.decode('utf-8').split(' ')
	timestamp = fields[1]
	return rs.UTCDateTime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
367
368
369 1
def get_msg_path(msg):
	'''
	This function gets the path from ``IMGPATH`` messages as a string.

	The message is decoded as UTF-8 and split on its first two spaces only, so
	everything after the timestamp is returned as the path. This keeps paths
	that themselves contain spaces intact.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> path = '/home/pi/rsudp/screenshots/test.png'
		>>> msg = msg_imgpath(ti, path)
		>>> msg
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
		>>> get_msg_path(msg)
		'/home/pi/rsudp/screenshots/test.png'

	:param bytes msg: the bytes-formatted queue message to decode
	:rtype: str
	:return: the path embedded in the message
	:raises IndexError: if the message has no path field
	'''
	# maxsplit=2 -> [message type, timestamp, rest of message (the path)]
	return msg.decode('utf-8').split(' ', 2)[2]
391
392
393 1
def deconv_vel_inst(self, trace, output):
	'''
	.. role:: pycode(code)
		:language: python

	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
	for velocity channels.

	The instrument response is removed from the trace in place, using
	``output`` as the response output unit, or ``'VEL'`` when
	:pycode:`self.deconv` is ``'CHAN'``. The data are then post-processed
	according to :pycode:`self.deconv`:

	- ``ACC``: the gradient of the data is taken
	- ``GRAV``: the gradient of the data is taken and divided by ``rs.g``; units are labelled ``'Earth gravity'``
	- ``DISP``: the data are cumulatively summed, tapered on the left and demeaned
	- anything else: units are labelled ``'Velocity'``

	:param self self: The self object of the sub-consumer class calling this function. Must provide :pycode:`self.deconv` and :pycode:`self.sps`.
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
	:param str output: the output unit passed to the response removal when :pycode:`self.deconv` is not ``'CHAN'``
	'''
	# 'CHAN' requests the channel's own unit, which for these channels is velocity.
	# NOTE(review): ``not in 'CHAN'`` is a substring test kept from the original;
	# it is equivalent to ``!= 'CHAN'`` for the unit names handled below.
	resp_output = output if self.deconv not in 'CHAN' else 'VEL'
	trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
							output=resp_output, water_level=4.5, taper=False)
	if 'ACC' in self.deconv:
		trace.data = rs.np.gradient(trace.data, 1)
	elif 'GRAV' in self.deconv:
		trace.data = rs.np.gradient(trace.data, 1) / rs.g
		trace.stats.units = 'Earth gravity'
	elif 'DISP' in self.deconv:
		trace.data = rs.np.cumsum(trace.data)
		trace.taper(max_percentage=0.1, side='left', max_length=1)
		trace.detrend(type='demean')
	else:
		trace.stats.units = 'Velocity'
422
423
424 1
def deconv_acc_inst(self, trace, output):
	'''
	.. role:: pycode(code)
		:language: python

	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
	for acceleration channels.

	The instrument response is removed from the trace in place, using
	``output`` as the response output unit, or ``'ACC'`` when
	:pycode:`self.deconv` is ``'CHAN'``. The data are then post-processed
	according to :pycode:`self.deconv`:

	- ``VEL``: the data are cumulatively summed and demeaned
	- ``DISP``: the data are cumulatively summed twice and linearly detrended
	- ``GRAV``: the data are divided by ``rs.g``; units are labelled ``'Earth gravity'``
	- anything else: units are labelled ``'Acceleration'``

	Unless :pycode:`self.deconv` contains ``ACC`` or ``CHAN``, the left side of
	the trace is tapered afterwards.

	:param self self: The self object of the sub-consumer class calling this function. Must provide :pycode:`self.deconv` and :pycode:`self.sps`.
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
	:param str output: the output unit passed to the response removal when :pycode:`self.deconv` is not ``'CHAN'``
	'''
	mode = self.deconv
	# the channel's own unit is acceleration for these channels
	resp_output = 'ACC' if mode in 'CHAN' else output
	trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
							output=resp_output, water_level=4.5, taper=False)

	if 'VEL' in mode:
		trace.data = rs.np.cumsum(trace.data)
		trace.detrend(type='demean')
	elif 'DISP' in mode:
		once_summed = rs.np.cumsum(trace.data)
		trace.data = rs.np.cumsum(once_summed)
		trace.detrend(type='linear')
	elif 'GRAV' in mode:
		trace.data = trace.data / rs.g
		trace.stats.units = 'Earth gravity'
	else:
		trace.stats.units = 'Acceleration'

	untouched = ('ACC' in mode) or ('CHAN' in mode)
	if not untouched:
		trace.taper(max_percentage=0.1, side='left', max_length=1)
454
455
456 1
def deconv_rbm_inst(self, trace, output):
	'''
	.. role:: pycode(code)
		:language: python

	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
	for Raspberry Boom pressure transducer channels.

	.. note::

		The Raspberry Boom pressure transducer does not currently have a
		deconvolution function. The Raspberry Shake team is working on a
		calibration for the Boom, but until then Boom units are given in
		counts.

	The trace data are left untouched; only the unit label is set.

	:param self self: The self object of the sub-consumer class calling this function (not used here).
	:param obspy.core.trace.Trace trace: the trace object instance whose units are labelled
	:param output: not used here; accepted so the signature matches the other deconvolution helpers
	'''
	stats = trace.stats
	stats.units = ' counts'
475
476
477 1
def deconvolve(self):
	'''
	.. role:: pycode(code)
		:language: python

	A central helper function for sub-consumers (i.e. :py:class:`rsudp.c_plot.Plot` or :py:class:`rsudp.c_alert.Alert`)
	that need to deconvolve their raw data to metric units.
	Consumers with :py:class:`obspy.core.stream.Stream` objects in :pycode:`self.stream` can use this to deconvolve data
	if this library's :pycode:`rsudp.raspberryshake.inv` variable
	contains a valid :py:class:`obspy.core.inventory.inventory.Inventory` object.

	:pycode:`self.stream` is replaced with a copy of :pycode:`self.raw`, and each
	trace in it is handed to the helper that matches its channel name. Traces on
	unrecognised channels, and all traces when :pycode:`self.deconv` is falsy,
	are only labelled with the unit ``' counts'``.

	:param self self: The self object of the sub-consumer class calling this function. Must contain :pycode:`self.raw`, :pycode:`self.units` and :pycode:`self.deconv`; :pycode:`self.stream` is overwritten.
	'''
	acc_channels = ['ENE', 'ENN', 'ENZ']
	vel_channels = ['EHE', 'EHN', 'EHZ', 'SHZ']
	rbm_channels = ['HDF']

	self.stream = self.raw.copy()
	# the response is removed to acceleration when the conversion is to gravity;
	# this does not depend on the trace, so it is computed once
	output = 'ACC' if self.deconv == 'GRAV' else self.deconv
	for trace in self.stream:
		trace.stats.units = self.units
		if self.deconv:
			if trace.stats.channel in vel_channels:
				deconv_vel_inst(self, trace, output)	# geophone channels

			elif trace.stats.channel in acc_channels:
				deconv_acc_inst(self, trace, output)	# accelerometer channels

			elif trace.stats.channel in rbm_channels:
				deconv_rbm_inst(self, trace, output)	# this is the Boom channel

			else:
				trace.stats.units = ' counts'	# this is a new one

		else:
			trace.stats.units = ' counts'		# this is not being deconvolved
514
515
516 1
def resolve_extra_text(extra_text, max_len, sender='helpers'):
	'''
	.. role:: pycode(code)
		:language: python

	.. versionadded:: 1.0.3

	A central helper function for the :class:`rsudp.c_telegram.Tweeter`
	and :class:`rsudp.c_telegram.Telegrammer` classes that checks whether
	the :pycode:`"extra_text"` parameter (in the settings file) is of appropriate
	length. This is done to avoid errors when posting alerts.
	The function will truncate longer messages.

	177 characters of ``max_len`` are reserved for the rest of the alert
	message; the remainder (never less than zero) is the number of characters
	of ``extra_text`` that are kept. A warning is printed when truncation
	happens.

	:param str extra_text: String of additional characters to post as part of the alert message (longer messages will be truncated). An empty string, ``None`` or ``False`` means no extra text.
	:param int max_len: Upper limit of characters accepted in message (280 for Twitter, 4096 for Telegram).
	:param str sender: String identifying the origin of the use of this function (:pycode:`self.sender` in the source function).
	:rtype: str
	:return: the message string to be incorporated: an empty string if there is no extra text, otherwise the (possibly truncated) text preceded by a space

	'''
	# length of string allowable given maximum message text & region.
	# clamped at zero: a negative value would make the slice below count
	# from the end of the string instead of truncating it
	allowable_len = max(max_len - 177, 0)
	if extra_text in ('', None, False):
		return ''

	extra_text = str(extra_text)
	len_ex_txt = len(extra_text)

	if len_ex_txt > allowable_len:
		printW('extra_text parameter is longer than allowable (%s chars) and will be truncated. Please keep extra_text at or below %s characters.' % (len_ex_txt, allowable_len), sender=sender)
		extra_text = extra_text[:allowable_len]

	return ' %s' % (extra_text)
548