build.rsudp.helpers.deconv_vel_inst()   A
last analyzed

Complexity

Conditions 5

Size

Total Lines 28
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 8.125

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 28
ccs 7
cts 14
cp 0.5
rs 9.1333
c 0
b 0
f 0
cc 5
nop 3
crap 8.125
1 1
import rsudp.raspberryshake as rs
2 1
from rsudp import COLOR, printM, printW
3 1
import os
4 1
import json
5
6
7 1
def dump_default(settings_loc, default_settings):
	'''
	Write a default settings string to disk, terminated by a newline.

	The destination is announced on standard output before the file is opened.
	The file is opened in ``w+`` mode, so any existing contents are replaced.

	:param str settings_loc: The path of the settings JSON file to create.
	:param str default_settings: The settings text to write to the file.
	'''
	announcement = 'Creating a default settings file at %s' % settings_loc
	print(announcement)
	with open(settings_loc, 'w+') as settings_file:
		# one write call: the settings text plus the closing newline
		settings_file.write(default_settings + '\n')
18
19
20 1
def default_settings(output_dir='%s/rsudp' % os.path.expanduser('~').replace('\\', '/'), verbose=True):
	'''
	Build the default settings as a formatted JSON string.

	``output_dir`` is the only value substituted into the template; every other
	setting below is a fixed default. The value is inserted verbatim (it is not
	JSON-escaped).

	:param str output_dir: the user's specified output location. defaults to ``~/rsudp``.
	:param bool verbose: if ``True``, prints the chosen ``output_dir`` after the string is built.
	:return: default settings string in formatted json
	:rtype: str
	'''
	# raw template; the single %s placeholder receives output_dir
	template = r"""{
"settings": {
    "port": 8888,
    "station": "Z0000",
    "output_dir": "%s",
    "debug": true},
"printdata": {
    "enabled": false},
"write": {
    "enabled": false,
    "channels": ["all"]},
"plot": {
    "enabled": true,
    "duration": 90,
    "spectrogram": true,
    "fullscreen": false,
    "kiosk": false,
    "eq_screenshots": false,
    "channels": ["all"],
    "deconvolve": true,
    "units": "CHAN"},
"forward": {
    "enabled": false,
    "address": ["192.168.1.254"],
    "port": [8888],
    "channels": ["all"],
    "fwd_data": true,
    "fwd_alarms": false},
"alert": {
    "enabled": true,
    "channel": "HZ",
    "sta": 6,
    "lta": 30,
    "threshold": 3.95,
    "reset": 0.9,
    "highpass": 0.8,
    "lowpass": 9,
    "deconvolve": false,
    "units": "VEL"},
"alertsound": {
    "enabled": false,
    "mp3file": "doorbell"},
"custom": {
    "enabled": false,
    "codefile": "n/a",
    "win_override": false},
"tweets": {
    "enabled": false,
    "tweet_images": true,
    "api_key": "n/a",
    "api_secret": "n/a",
    "access_token": "n/a",
    "access_secret": "n/a",
    "extra_text": ""},
"telegram": {
    "enabled": false,
    "send_images": true,
    "token": "n/a",
    "chat_id": "n/a",
    "extra_text": ""},
"rsam": {
    "enabled": false,
    "quiet": true,
    "fwaddr": "192.168.1.254",
    "fwport": 8887,
    "fwformat": "LITE",
    "channel": "HZ",
    "interval": 10,
    "deconvolve": false,
    "units": "VEL"}
}

"""
	settings_text = template % (output_dir)
	if verbose:
		print('By default output_dir is set to %s' % output_dir)
	return settings_text
105
106
107 1
def read_settings(loc):
	'''
	Reads settings from a specific location.

	The path is user-expanded and made absolute before the file is opened.
	Every backslash in the file contents is replaced with a forward slash
	before parsing (presumably so that Windows-style paths do not produce
	invalid JSON escape sequences).

	If the contents cannot be read or parsed, an error message with
	instructions for rebuilding the file is printed and the program exits
	with status ``2``.

	:param str loc: location on disk to read json settings file from
	:return: settings dictionary read from JSON
	:rtype: dict
	:raises SystemExit: with exit code ``2`` if the file contents cannot be loaded
	:raises OSError: if the file cannot be opened
	'''
	# The bare ``exit`` builtin is installed by the ``site`` module for interactive
	# sessions and can be absent (``python -S``, frozen builds); sys.exit is always
	# available and raises the same SystemExit.
	import sys

	settings_loc = os.path.abspath(os.path.expanduser(loc)).replace('\\', '/')
	settings = None
	with open(settings_loc, 'r') as f:
		try:
			data = f.read().replace('\\', '/')
			settings = json.loads(data)
		except Exception as e:
			# user-facing boundary: report the problem and how to recover, then quit
			print(COLOR['red'] + 'ERROR: Could not load settings file. Perhaps the JSON is malformed?' + COLOR['white'])
			print(COLOR['red'] + '       detail: %s' % e + COLOR['white'])
			print(COLOR['red'] + '       If you would like to overwrite and rebuild the file, you can enter the command below:' + COLOR['white'])
			print(COLOR['bold'] + '       shake_client -d %s' % loc + COLOR['white'])
			sys.exit(2)
	return settings
128
129
130 1
def set_channels(self, cha):
	'''
	This function sets the channels available for plotting. Allowed channels are as follows:

	- ``["SHZ", "EHZ", "EHN", "EHE"]`` - velocity channels
	- ``["ENZ", "ENN", "ENE"]`` - acceleration channels
	- ``["HDF"]`` - pressure transducer channel
	- ``["all"]`` - all available channels

	So for example, if you wanted to display the two vertical channels of a Shake 4D,
	(geophone and vertical accelerometer) you could specify:

	``["EHZ", "ENZ"]``

	You can also specify partial channel names.
	So for example, the following will display at least one channel from any
	Raspberry Shake instrument:

	``["HZ", "HDF"]``

	Or if you wanted to display only vertical channels from a RS4D,
	you could specify

	``["Z"]``

	which would match both ``"EHZ"`` and ``"ENZ"``.

	Matching channels are appended to ``self.chans`` in the order they appear in
	``rs.chns``, skipping any that are already present. If nothing matches and
	``self.chans`` is still empty, it is set to a copy of all available channels.

	:param self self: self object of the class calling this function (must have a ``chans`` list)
	:param cha: the channel or list of channels to plot
	:type cha: list or str
	'''
	if 'all' in cha:
		requested = rs.chns
	elif isinstance(cha, str):
		# a bare string is ONE (possibly partial) channel name; list(cha) would
		# split it into single characters and match almost every channel
		requested = [cha]
	else:
		requested = cha

	for c in rs.chns:
		if c in self.chans:
			continue	# already selected, do not add it twice
		if any(uch.upper() in c for uch in requested):
			self.chans.append(c)

	if len(self.chans) < 1:
		# nothing matched: fall back to every channel. copy the list so later
		# changes to self.chans cannot modify the module-level rs.chns
		self.chans = list(rs.chns)
171
172
173 1
def fsec(ti):
	'''
	.. versionadded:: 0.4.3

	The Raspberry Shake records at hundredths-of-a-second precision.
	To report times at that precision, this function builds a new
	:py:class:`obspy.core.utcdatetime.UTCDateTime` from ``ti`` with
	``precision=2`` and lets obspy handle the rounding.

	This is needed because ``strftime`` only exposes the full integer
	microsecond field (right-padded with zeroes), not a fraction of
	chosen length.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> fsec(ti)
		UTCDateTime(2020, 1, 1, 0, 0, 0, 600000)

	:param ti: time object to convert microseconds for
	:type ti: obspy.core.utcdatetime.UTCDateTime
	:return: the hundredth-of-a-second rounded version of the time object passed (precision is 0.01 second)
	:rtype: obspy.core.utcdatetime.UTCDateTime
	'''
	# obspy does the rounding; we only state the precision we want
	hundredths = rs.UTCDateTime(ti, precision=2)
	return hundredths
208
209
210 1
def lesser_multiple(x, base=10):
	'''
	.. versionadded:: 1.0.3

	This function calculates the largest multiple of the base number ``base``
	that is less than or equal to the number ``x`` passed to it.

	This is useful for :func:`rsudp.packetize` when figuring out where to cut
	off samples when trying to fit them into packets.

	For example:

	.. code-block:: python

		>>> lesser_multiple(25)
		20
		>>> lesser_multiple(30)
		30
		>>> lesser_multiple(-15)
		-20

	:param x: the number to round down (anything accepted by :py:func:`float`)
	:param int base: the number whose multiple is returned (its sign is ignored)
	:return: the multiple of ``base`` at or below ``x``
	:rtype: int
	:raises ZeroDivisionError: if ``base`` is zero
	'''
	import math

	# floor (not truncation toward zero) so that negative x still yields a
	# multiple that does not exceed x
	step = abs(base)
	return int(step * math.floor(float(x) / step))
221
222
223 1
def conn_stats(TESTING=False):
	'''
	Print some stats about the connection.

	Each value is read from the ``rsudp.raspberryshake`` module (``rs``) and
	reported on its own line. The inventory line is only printed when
	``rs.inv`` is truthy.

	Example:

	.. code-block:: python

		>>> conn_stats()
		2020-03-25 01:35:04 [conn_stats] Initialization stats:
		2020-03-25 01:35:04 [conn_stats]                 Port: 18069
		2020-03-25 01:35:04 [conn_stats]   Sending IP address: 192.168.0.4
		2020-03-25 01:35:04 [conn_stats]     Set station name: R24FA
		2020-03-25 01:35:04 [conn_stats]   Number of channels: 4
		2020-03-25 01:35:04 [conn_stats]   Transmission freq.: 250 ms/packet
		2020-03-25 01:35:04 [conn_stats]    Transmission rate: 4 packets/sec
		2020-03-25 01:35:04 [conn_stats]   Samples per second: 100 sps
		2020-03-25 01:35:04 [conn_stats]            Inventory: AM.R24FA (Raspberry Shake Citizen Science Station)

	:param bool TESTING: if ``True``, lines are emitted with ``printW`` instead of ``printM`` (documented upstream as yellow instead of white console text).
	'''
	sender = 'conn_stats'
	emit = printW if TESTING else printM
	# (line template, name of the rs attribute reported on that line)
	rows = (
		('                Port: %s', 'port'),
		('  Sending IP address: %s', 'firstaddr'),
		('    Set station name: %s', 'stn'),
		('  Number of channels: %s', 'numchns'),
		('  Transmission freq.: %s ms/packet', 'tf'),
		('   Transmission rate: %s packets/sec', 'tr'),
		('  Samples per second: %s sps', 'sps'),
	)
	emit('Initialization stats:', sender=sender, announce=False)
	for template, attr in rows:
		# attribute is looked up right before its line is emitted
		emit(template % getattr(rs, attr), sender=sender, announce=False)
	if rs.inv:
		station = rs.inv.get_contents()['stations'][0]
		emit('           Inventory: %s' % station, sender=sender, announce=False)
257
258
259 1
def msg_alarm(event_time):
	'''
	Build the ``ALARM`` queue message as a bytes object.

	The message is the word ``ALARM``, a single space, and the UTF-8 encoded
	string form of ``event_time``.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> msg_alarm(ti)
		b'ALARM 2020-01-01T00:00:00.599Z'

	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
	:rtype: bytes
	:return: the ``ALARM`` message, ready to be put on the queue
	'''
	timestamp = str(event_time).encode('utf-8')
	return b' '.join((b'ALARM', timestamp))
279
280
281 1
def msg_reset(reset_time):
	'''
	Build the ``RESET`` queue message as a bytes object.

	The message is the word ``RESET``, a single space, and the UTF-8 encoded
	string form of ``reset_time``.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> msg_reset(ti)
		b'RESET 2020-01-01T00:00:00.599Z'

	:param obspy.core.utcdatetime.UTCDateTime reset_time: the datetime object to serialize and convert to bytes
	:rtype: bytes
	:return: the ``RESET`` message, ready to be put on the queue
	'''
	timestamp = str(reset_time).encode('utf-8')
	return b' '.join((b'RESET', timestamp))
301
302
303 1
def msg_imgpath(event_time, figname):
	'''
	Build the ``IMGPATH`` queue message as a bytes object.

	The message is the word ``IMGPATH`` followed by the string forms of
	``event_time`` and ``figname``, separated by single spaces and encoded
	as UTF-8.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> path = '/home/pi/rsudp/screenshots/test.png'
		>>> msg_imgpath(ti, path)
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'

	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
	:param str figname: the figure path as a string
	:rtype: bytes
	:return: the ``IMGPATH`` message, ready to be put on the queue
	'''
	timestamp = str(event_time).encode('utf-8')
	image_path = str(figname).encode('utf-8')
	return b' '.join((b'IMGPATH', timestamp, image_path))
325
326
327 1
def msg_term():
	'''
	Build the simple ``TERM`` message as a bytes object.

	.. code-block:: python

		>>> msg_term()
		b'TERM'

	:rtype: bytes
	:return: the ``TERM`` message
	'''
	term_message = b'TERM'
	return term_message
341
342
343 1
def get_msg_time(msg):
	'''
	Extract the time from ``ALARM``, ``RESET``, and ``IMGPATH`` messages
	as a UTCDateTime object.

	The message is decoded as UTF-8 and split on single spaces; the second
	field is parsed with the format ``%Y-%m-%dT%H:%M:%S.%fZ``.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> path = '/home/pi/rsudp/screenshots/test.png'
		>>> msg = msg_imgpath(ti, path)
		>>> msg
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
		>>> get_msg_time(msg)
		UTCDateTime(2020, 1, 1, 0, 0, 0, 599000)

	:param bytes msg: the bytes-formatted queue message to decode
	:rtype: obspy.core.utcdatetime.UTCDateTime
	:return: the time embedded in the message
	'''
	time_format = '%Y-%m-%dT%H:%M:%S.%fZ'
	fields = msg.decode('utf-8').split(' ')
	timestamp = fields[1]	# field 0 is the message type
	return rs.UTCDateTime.strptime(timestamp, time_format)
366
367
368 1
def get_msg_path(msg):
	'''
	This function gets the path from ``IMGPATH`` messages as a string.

	The message is decoded as UTF-8 and split on the first two spaces only,
	so everything after the timestamp is returned as the path. This keeps
	paths that themselves contain spaces intact.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> path = '/home/pi/rsudp/screenshots/test.png'
		>>> msg = msg_imgpath(ti, path)
		>>> msg
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
		>>> get_msg_path(msg)
		'/home/pi/rsudp/screenshots/test.png'

	:param bytes msg: the bytes-formatted queue message to decode
	:rtype: str
	:return: the path embedded in the message
	:raises IndexError: if the message has fewer than three space-separated fields
	'''
	# maxsplit=2 -> [type, timestamp, path]; an unlimited split would cut the
	# path at its first space
	return msg.decode('utf-8').split(' ', 2)[2]
390
391
392 1
def deconv_vel_inst(self, trace, output):
	'''
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
	for velocity channels.

	Removes the instrument response from ``trace`` in place, then applies a
	further conversion chosen by ``self.deconv``: a gradient for ``ACC`` and
	``GRAV`` (the latter also divided by ``rs.g``), a cumulative sum followed
	by a left taper and demean for ``DISP``. In every other case only the
	trace's unit label is set to ``'Velocity'``.

	:param self self: The self object of the sub-consumer class calling this function. ``self.deconv`` and ``self.sps`` are read.
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
	:param str output: the ``output`` argument handed to ``trace.remove_response``; replaced by ``'VEL'`` when ``self.deconv`` is ``'CHAN'``
	'''
	units = self.deconv
	# 'CHAN' requests the channel's own units, which for these channels is velocity
	response_output = 'VEL' if units in 'CHAN' else output
	trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
							output=response_output, water_level=4.5, taper=False)

	if 'ACC' in units:
		# NOTE(review): deconvolve() in this file passes output=self.deconv, so the
		# response was already removed with output='ACC' before this gradient is
		# taken (and likewise 'DISP' before the cumsum below) - confirm the second
		# differentiation/integration is intended
		trace.data = rs.np.gradient(trace.data, 1)
		return
	if 'GRAV' in units:
		trace.data = rs.np.gradient(trace.data, 1) / rs.g
		trace.stats.units = 'Earth gravity'
		return
	if 'DISP' in units:
		trace.data = rs.np.cumsum(trace.data)
		trace.taper(max_percentage=0.1, side='left', max_length=1)
		trace.detrend(type='demean')
		return
	trace.stats.units = 'Velocity'
420
421
422 1
def deconv_acc_inst(self, trace, output):
	'''
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
	for acceleration channels.

	Removes the instrument response from ``trace`` in place, then applies a
	further conversion chosen by ``self.deconv``: one cumulative sum and a
	demean for ``VEL``, two cumulative sums and a linear detrend for ``DISP``,
	division by ``rs.g`` for ``GRAV``. In every other case only the trace's
	unit label is set to ``'Acceleration'``. Unless ``self.deconv`` contains
	``ACC`` or ``CHAN``, a left taper is applied last.

	:param self self: The self object of the sub-consumer class calling this function. ``self.deconv`` and ``self.sps`` are read.
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
	:param str output: the ``output`` argument handed to ``trace.remove_response``; replaced by ``'ACC'`` when ``self.deconv`` is ``'CHAN'``
	'''
	units = self.deconv
	# 'CHAN' requests the channel's own units, which for these channels is acceleration
	response_output = 'ACC' if units in 'CHAN' else output
	trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
							output=response_output, water_level=4.5, taper=False)

	if 'VEL' in units:
		# NOTE(review): deconvolve() in this file passes output=self.deconv, so the
		# response was already removed with output='VEL' (or 'DISP' below) before
		# these cumulative sums - confirm the extra integration is intended
		trace.data = rs.np.cumsum(trace.data)
		trace.detrend(type='demean')
	elif 'DISP' in units:
		once_integrated = rs.np.cumsum(trace.data)
		trace.data = rs.np.cumsum(once_integrated)
		trace.detrend(type='linear')
	elif 'GRAV' in units:
		trace.data = trace.data / rs.g
		trace.stats.units = 'Earth gravity'
	else:
		trace.stats.units = 'Acceleration'

	# taper everything except plain acceleration and channel-native output
	if not ('ACC' in units or 'CHAN' in units):
		trace.taper(max_percentage=0.1, side='left', max_length=1)
452
453
454 1
def deconv_rbm_inst(self, trace, output):
	'''
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
	for Raspberry Boom pressure transducer channels.

	.. note::

		The Raspberry Boom pressure transducer does not currently have a
		deconvolution function. The Raspberry Shake team is working on a
		calibration for the Boom, but until then Boom units are given in
		counts.

	No data is modified; only the trace's unit label is set to ``' counts'``.

	:param self self: The self object of the sub-consumer class calling this function (not used here).
	:param obspy.core.trace.Trace trace: the trace object instance to label
	:param str output: accepted for signature parity with the other helpers; not used here
	'''
	stats = trace.stats
	stats.units = ' counts'
473
474
475 1
def deconvolve(self):
	'''
	.. role:: pycode(code)
		:language: python

	A central helper function for sub-consumers (i.e. :py:class:`rsudp.c_plot.Plot` or :py:class:`rsudp.c_alert.Alert`)
	that need to deconvolve their raw data to metric units.

	:pycode:`self.stream` is replaced with a copy of :pycode:`self.raw`, and each trace in it is
	handed to the helper matching its channel code (geophone, accelerometer, or Boom).
	Traces on unrecognized channels, and all traces when :pycode:`self.deconv` is falsy,
	are only labelled with the unit ``' counts'``.

	:param self self: The self object of the sub-consumer class calling this function. Must provide :pycode:`self.raw`, :pycode:`self.units` and :pycode:`self.deconv`.
	'''
	vel_channels = ('EHE', 'EHN', 'EHZ', 'SHZ')
	acc_channels = ('ENE', 'ENN', 'ENZ')
	rbm_channels = ('HDF',)

	self.stream = self.raw.copy()
	for trace in self.stream:
		trace.stats.units = self.units
		if not self.deconv:
			trace.stats.units = ' counts'		# this is not being deconvolved
			continue

		# gravity is derived from acceleration, so ask for ACC in that case
		if self.deconv == 'GRAV':
			output = 'ACC'
		else:
			output = self.deconv

		channel = trace.stats.channel
		if channel in vel_channels:
			deconv_vel_inst(self, trace, output)	# geophone channels
		elif channel in acc_channels:
			deconv_acc_inst(self, trace, output)	# accelerometer channels
		elif channel in rbm_channels:
			deconv_rbm_inst(self, trace, output)	# this is the Boom channel
		else:
			trace.stats.units = ' counts'	# this is a new one
511
512
513 1
def resolve_extra_text(extra_text, max_len, sender='helpers'):
	'''
	.. role:: pycode(code)
		:language: python

	.. versionadded:: 1.0.3

	A central helper function for the ``Tweeter`` and ``Telegrammer`` consumer
	classes that checks whether the :pycode:`"extra_text"` parameter (in the
	settings file) is of appropriate length. This is done to avoid errors when
	posting alerts. The function will truncate longer messages and print a warning.

	The number of characters available to ``extra_text`` is ``max_len - 177``
	(177 characters being set aside for the message text and region), and is
	never treated as less than zero.

	:param str extra_text: String of additional characters to post as part of the alert message (longer messages will be truncated). An empty string, ``None`` or ``False`` means no extra text.
	:param int max_len: Upper limit of characters accepted in message (280 for Twitter, 4096 for Telegram).
	:param str sender: String identifying the origin of the use of this function (:pycode:`self.sender` in the source function).
	:rtype: str
	:return: the message string to be incorporated: empty if there is no extra text, otherwise the (possibly truncated) text prefixed with one space

	'''
	# length of string allowable given maximum message text & region.
	# clamped at zero: a negative value used as a slice end would cut characters
	# off the END of the text instead of limiting its length
	allowable_len = max(0, max_len - 177)

	# equality (not identity) on purpose, matching the long-standing behavior
	if extra_text in ('', None, False):
		return ''

	extra_text = str(extra_text)
	len_ex_txt = len(extra_text)

	if len_ex_txt > allowable_len:
		printW('extra_text parameter is longer than allowable (%s chars) and will be truncated. Please keep extra_text at or below %s characters.' % (len_ex_txt, allowable_len), sender=sender)
		extra_text = extra_text[:allowable_len]

	return ' %s' % (extra_text)
545