Passed
Branch master (7ffd9c)
by Ian
04:15
created

build.rsudp.raspberryshake.msg_imgpath()   A

Complexity

Conditions 1

Size

Total Lines 22
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 22
rs 10
c 0
b 0
f 0
1
from datetime import datetime, timedelta
2
import time
3
import math
4
import numpy as np
5
import sys, os, platform
6
import socket as s
7
import signal
8
from obspy import UTCDateTime
9
from obspy.core.stream import Stream
10
from obspy import read_inventory
11
from obspy.geodetics.flinnengdahl import FlinnEngdahl
12
from obspy.core.trace import Trace
13
from rsudp import printM, printW, printE
14
from requests.exceptions import HTTPError
15
from threading import Thread
16
17
initd, sockopen = False, False
18
qsize = 2048 			# max queue size
19
port = 8888				# default listening port
20
to = 10					# socket test timeout
21
firstaddr = ''			# the first address data is received from
22
inv = False				# station inventory
23
INVWARN = False			# warning when inventory attachment fails
24
region = False
25
producer = False 		# flag for producer status
26
stn = 'Z0000'			# station name
27
net = 'AM'				# network (this will always be AM)
28
chns = []				# list of channels
29
numchns = 0
30
31
tf = None				# transmission frequency in ms
32
tr = None				# transmission rate in packets per second
33
sps = None				# samples per second
34
35
# conversion units
36
# 		'name',	: ['pretty name', 'unit display']
37
UNITS = {'ACC'	: ['Acceleration', 'm/s$^2$'],
38
		 'GRAV'	: ['Earth gravity', ' g'],
39
		 'VEL'	: ['Velocity', 'm/s'],
40
		 'DISP'	: ['Displacement', 'm'],
41
		 'CHAN'	: ['channel-specific', ' Counts']}
42
43
g = 9.81	# earth gravity in m/s2
44
45
46
# get an IP to report to the user
47
# from https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib
48
def get_ip():
49
	'''
50
	.. |so_ip| raw:: html
51
52
		<a href="https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib" target="_blank">this stackoverflow answer</a>
53
54
55
	Return a reliable network IP to report to the user when there is no data received.
56
	This helps the user set their Raspberry Shake's datacast streams to point to the correct location
57
	if the library raises a "no data received" error.
58
	Solution adapted from |so_ip|.
59
60
	.. code-block:: python
61
62
		>>> get_ip()
63
		'192.168.1.23'
64
65
	:rtype: str
66
	:return: The network IP of the machine that this program is running on
67
	'''
68
69
	testsock = s.socket(s.AF_INET, s.SOCK_DGRAM)
70
	try:
71
		# doesn't even have to be reachable
72
		testsock.connect(('10.255.255.255', 1))
73
		IP = testsock.getsockname()[0]
74
	except:
75
		IP = '127.0.0.1'
76
	finally:
77
		testsock.close()
78
	return IP
79
80
ip = get_ip()
81
82
# construct a socket
83
socket_type =  s.SOCK_DGRAM
84
sock = s.socket(s.AF_INET, socket_type)
85
if platform.system() not in 'Windows':
86
    sock.setsockopt(s.SOL_SOCKET, s.SO_REUSEADDR, 1)
87
88
def handler(signum, frame, ip=ip):
89
	'''
90
	The signal handler for the nodata alarm.
91
92
	:param int signum: signal number
93
	:param int frame: frame number
94
	:param str ip: the IP of the box this program is running on (i.e. the device the Raspberry Shake should send data to)
95
	:raise IOError: on UNIX systems if no data is received
96
	'''
97
	global port
98
	printE('No data received in %s seconds; aborting.' % (to), sender='Init')
99
	printE('Check that the Shake is forwarding data to:', sender='Init', announce=False, spaces=True)
100
	printE('IP address: %s    Port: %s' % (ip, port), sender='Init', announce=False, spaces=True)
101
	printE('and that no firewall exists between the Shake and this computer.', sender='Init', announce=False, spaces=True)
102
	raise IOError('No data received')
103
104
105
def initRSlib(dport=port, rsstn='Z0000', timeout=10):
106
	'''
107
	.. role:: pycode(code)
108
		:language: python
109
110
	Initializes this library (:py:func:`rsudp.raspberryshake`).
111
	Set values for data port, station, network, and port timeout prior to opening the socket.
112
	Calls both :py:func:`rsudp.raspberryshake.openSOCK` and :py:func:`rsudp.raspberryshake.set_params`.
113
114
	.. code-block:: python
115
116
		>>> import rsudp.raspberryshake as rs
117
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
118
119
	The library is now initialized:
120
121
	.. code-block:: python
122
123
		>>> rs.initd
124
		True
125
126
	:param int dport: The local port the Raspberry Shake is sending UDP data packets to. Defaults to :pycode:`8888`.
127
	:param str rsstn: The name of the station (something like :pycode:`'RCB43'` or :pycode:`'S0CDE'`)
128
	:param int timeout: The number of seconds for :py:func:`rsudp.raspberryshake.set_params` to wait for data before an error is raised (zero for unlimited wait)
129
130
	:rtype: str
131
	:return: The instrument channel as a string
132
133
	'''
134
	global port, stn, to, initd, port
135
	global producer
136
	sender = 'Init'
137
	printM('Initializing.', sender)
138
	try:						# set port value first
139
		if dport == int(dport):
140
			port = int(dport)
141
		else:
142
			port = int(dport)
143
			printW('Supplied port value was converted to integer. Non-integer port numbers are invalid.')
144
	except Exception as e:
145
		printE('Details - %s' % e)
146
147
	try:						# set station name
148
		if len(rsstn) == 5:
149
			stn = str(rsstn).upper()
150
		else:
151
			stn = str(rsstn).upper()
152
			printW('Station name does not follow Raspberry Shake naming convention.')
153
	except ValueError as e:
154
		printE('Invalid station name supplied. Details: %s' % e)
155
		printE('reverting to station name Z0000', announce=False, spaces=True)
156
	except Exception as e:
157
		printE('Details - %s' % e)
158
	
159
	try:						# set timeout value 
160
		to = int(timeout)
161
	except ValueError as e:
162
		printW('You likely supplied a non-integer as the timeout value. Your value was: %s'
163
				% timeout)
164
		printW('Continuing with default timeout of %s sec'
165
				% (to), announce=False, spaces=True)
166
		printW('details: %s' % e, announce=False, spaces=True)
167
	except Exception as e:
168
		printE('Details - %s' % e)
169
170
	initd = True				# if initialization goes correctly, set initd to true
171
	openSOCK()					# open a socket
172
	printM('Waiting for UDP data on port %s...' % (port), sender)
173
	set_params()				# get data and set parameters
174
175
def openSOCK(host=''):
176
	'''
177
	.. role:: pycode(code)
178
		:language: python
179
180
	Initialize a socket at the port specified by :pycode:`rsudp.raspberryshake.port`.
181
	Called by :py:func:`rsudp.raspberryshake.initRSlib`, must be done before :py:func:`rsudp.raspberryshake.set_params`.
182
183
	:param str host: self-referential location at which to open a listening port (defaults to :pycode:`''` which resolves to :pycode:`'localhost'`)
184
	:raise IOError: if the library is not initialized (:py:func:`rsudp.raspberryshake.initRSlib`) prior to running this function
185
	:raise OSError: if the program cannot bind to the specified port number
186
187
	'''
188
	global sockopen
189
	sockopen = False
190
	if initd:
191
		HP = '%s:%s' % ('localhost',port)
192
		printM("Opening socket on %s (HOST:PORT)"
193
				% HP, 'openSOCK')
194
		try:
195
			sock.bind((host, port))
196
			sockopen = True
197
		except Exception as e:
198
			printE('Could not bind to port %s. Is another program using it?' % port)
199
			printE('Detail: %s' % e, announce=False)
200
			raise OSError(e)
201
	else:
202
		raise IOError("Before opening a socket, you must initialize this raspberryshake library by calling initRSlib(dport=XXXXX, rssta='R0E05') first.")
203
204
def set_params():
205
	'''
206
	.. role:: pycode(code)
207
		:language: python
208
209
	Read a data packet off the port.
210
	Called by :py:func:`rsudp.raspberryshake.initRSlib`,
211
	must be done after :py:func:`rsudp.raspberryshake.openSOCK`
212
	but before :py:func:`rsudp.raspberryshake.getDATA`.
213
	Will wait :pycode:`rsudp.raspberryshake.to` seconds for data before raising a no data exception
214
	(only available with UNIX socket types).
215
216
	'''
217
	global to, firstaddr
218
	if os.name not in 'nt': 	# signal alarm not available on windows
219
		signal.signal(signal.SIGALRM, handler)
220
		signal.alarm(to)		# alarm time set with timeout value
221
	data, (firstaddr, connport) = sock.recvfrom(2048)
222
	if os.name not in 'nt':
223
		signal.alarm(0)			# once data has been received, turn alarm completely off
224
	to = 0						# otherwise it erroneously triggers after keyboardinterrupt
225
	getTR(getCHNS()[0])
226
	getSR(tf, data)
227
	getTTLCHN()
228
	printM('Available channels: %s' % chns, 'Init')
229
	get_inventory()
230
231
def getDATA():
232
	'''
233
	Read a data packet off the port.
234
235
	In this example, we get a Shake 1Dv7 data packet:
236
237
	.. code-block:: python
238
239
		>>> import rsudp.raspberryshake as rs
240
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
241
		>>> d = rs.getDATA()
242
		>>> d
243
		b"{'EHZ', 1582315130.292, 14168, 14927, 16112, 17537, 18052, 17477,
244
		15418, 13716, 15604, 17825, 19637, 20985, 17325, 10439, 11510, 17678,
245
		20027, 20207, 18481, 15916, 13836, 13073, 14462, 17628, 19388}"
246
247
248
	:rtype: bytes
249
	:return: Returns a data packet as an encoded bytes object.
250
251
	:raise IOError: if no socket is open (:py:func:`rsudp.raspberryshake.openSOCK`) prior to running this function
252
	:raise IOError: if the library is not initialized (:py:func:`rsudp.raspberryshake.initRSlib`) prior to running this function
253
254
	'''
255
	global to, firstaddr
256
	if sockopen:
257
		return sock.recv(4096)
258
	else:
259
		if initd:
260
			raise IOError("No socket is open. Please open a socket using this library's openSOCK() function.")
261
		else:
262
			raise IOError('No socket is open. Please initialize the library using initRSlib() then open a socket using openSOCK().')
263
	
264
def getCHN(DP):
265
	'''
266
	Extract the channel information from the data packet.
267
	Requires :py:func:`rsudp.raspberryshake.getDATA` packet as argument.
268
269
	In this example, we get the channel code from a Shake 1Dv7 data packet:
270
271
	.. code-block:: python
272
273
		>>> import rsudp.raspberryshake as rs
274
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
275
		>>> d = rs.getDATA()
276
		>>> rs.getCHN(d)
277
		'EHZ'
278
279
	:param DP: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse channel information from
280
	:type DP: bytes
281
	:rtype: str
282
	:return: Returns the instrument channel as a string.
283
	'''
284
	return str(DP.decode('utf-8').split(",")[0][1:]).strip("\'")
285
	
286
def getTIME(DP):
287
	'''
288
	Extract the timestamp from the data packet.
289
	Timestamp is seconds since 1970-01-01 00:00:00Z,
290
	which can be passed directly to an :py:class:`obspy.core.utcdatetime.UTCDateTime` object:
291
292
	In this example, we get the timestamp of a Shake 1Dv7 data packet and convert it to a UTCDateTime:
293
294
	.. code-block:: python
295
296
		>>> import rsudp.raspberryshake as rs
297
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
298
		>>> from obspy import UTCDateTime
299
		>>> d = rs.getDATA()
300
		>>> t = rs.getTIME(d)
301
		>>> t
302
		1582315130.292
303
		>>> dt = obspy.UTCDateTime(t, precision=3)
304
		>>> dt
305
		UTCDateTime(2020, 2, 21, 19, 58, 50, 292000)
306
307
	:param DP: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse time information from
308
	:type DP: bytes
309
	:rtype: float
310
	:return: Timestamp in decimal seconds since 1970-01-01 00:00:00Z
311
	'''
312
	return float(DP.split(b",")[1])
313
314
def getSTREAM(DP):
315
	'''
316
	Get the samples in a data packet as a list object.
317
	Requires :py:func:`rsudp.raspberryshake.getDATA` packet as argument.
318
319
	In this example, we get a list of samples from a Shake 1Dv7 data packet:
320
321
	.. code-block:: python
322
323
		>>> import rsudp.raspberryshake as rs
324
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
325
		>>> d = rs.getDATA()
326
		>>> s = rs.getSTREAM(d)
327
		>>> s
328
		[14168, 14927, 16112, 17537, 18052, 17477, 15418, 13716, 15604,
329
		 17825, 19637, 20985, 17325, 10439, 11510, 17678, 20027, 20207,
330
		 18481, 15916, 13836, 13073, 14462, 17628, 19388]
331
332
	:param DP: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse stream information from
333
	:type DP: bytes
334
	:rtype: list
335
	:return: List of data samples in the packet
336
	'''
337
	return list(map(int, DP.decode('utf-8').replace('}','').split(',')[2:]))
338
339
def getTR(chn):				# DP transmission rate in msecs
340
	'''
341
	Get the transmission rate in milliseconds between consecutive packets from the same channel.
342
	Must wait to receive a second packet from the same channel.
343
	Requires a :py:func:`rsudp.raspberryshake.getCHN` or a channel name string as argument.
344
345
	In this example, we calculate the transmission frequency of a Shake 1Dv7:
346
347
	.. code-block:: python
348
349
		>>> import rsudp.raspberryshake as rs
350
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
351
		>>> d = rs.getDATA()
352
		>>> tr = rs.getTR(rs.getCHN(d))
353
		>>> tr
354
		250
355
356
	:param chn: The seismic instrument channel (:py:func:`rsudp.raspberryshake.getCHN`) to calculate transmission rate information from
357
	:type chn: str
358
	:rtype: int
359
	:return: Transmission rate in milliseconds between consecutive packets from a specific channel
360
	'''
361
	global tf, tr
362
	timeP1, timeP2 = 0.0, 0.0
363
	done = False
364
	while not done:
365
		DP = getDATA()
366
		CHAN = getCHN(DP)
367
		if CHAN == chn:
368
			if timeP1 == 0.0:
369
				timeP1 = getTIME(DP)
370
			else:
371
				timeP2 = getTIME(DP)
372
				done = True
373
	TR = timeP2*1000 - timeP1*1000
374
	tf = int(TR)
375
	tr = int(1000 / TR)
376
	return tf
377
378
def getSR(TR, DP):
379
	'''
380
	Get the sample rate in samples per second.
381
	Requires an integer transmission frequency and a data packet as arguments.
382
383
	In this example, we calculate the number of samples per second from a Shake 1Dv7:
384
385
	.. code-block:: python
386
387
		>>> import rsudp.raspberryshake as rs
388
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
389
		>>> d = rs.getDATA()
390
		>>> tr = rs.getTR(rs.getCHN(d))
391
		>>> tr
392
		250
393
		>>> sps = rs.getSR(tr, d)
394
		>>> sps
395
		100
396
397
398
	:param TR: The transmission frequency (:py:func:`rsudp.raspberryshake.getTR`) in milliseconds between packets
399
	:type TR: int
400
	:param DP: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) calculate sample rate information from
401
	:type DP: bytes
402
	:rtype: int
403
	:return: The sample rate in samples per second from a specific channel
404
	'''
405
	global sps
406
	sps = int((DP.count(b",") - 1) * 1000 / TR)
407
	return sps
408
	
409
def getCHNS():
410
	'''
411
	Get a list of channels sent to the port.
412
413
	In this example, we list channels from a Boom:
414
415
	.. code-block:: python
416
417
		>>> import rsudp.raspberryshake as rs
418
		>>> rs.initRSlib(dport=8888, rsstn='R940D')
419
		>>> rs.getCHNS()
420
		['EHZ', 'HDF']
421
422
423
	:rtype: list
424
	:return: The list of channels being sent to the port (from the single IP address sending data)
425
	'''
426
	global chns
427
	chdict = {'EHZ': False, 'EHN': False, 'EHE': False,
428
			  'ENZ': False, 'ENN': False, 'ENE': False, 'HDF': False}
429
	firstCHN = ''
430
	done = False
431
	sim = 0
432
	while not done:
433
		DP = getDATA()
434
		if firstCHN == '':
435
			firstCHN = getCHN(DP)
436
			chns.append(firstCHN)
437
			continue
438
		nextCHN = getCHN(DP)
439
		if firstCHN == nextCHN:
440
			if sim > 1:
441
				done = True
442
				continue
443
			sim += 1
444
		else:
445
			chns.append(nextCHN)
446
	for ch in chns:
447
		chdict[ch] = True
448
	chns = []
449
	for ch in chdict:
450
		if chdict[ch] == True:
451
			chns.append(ch)
452
	return chns
453
454
def getTTLCHN():
455
	'''
456
	Calculate total number of channels received,
457
	by counting the number of channels returned by :py:func:`rsudp.raspberryshake.getCHNS`.
458
459
	In this example, we get the number of channels from a Shake & Boom:
460
461
	.. code-block:: python
462
463
		>>> import rsudp.raspberryshake as rs
464
		>>> rs.initRSlib(dport=8888, rsstn='R940D')
465
		>>> rs.getTTLCHN()
466
		2
467
468
	:rtype: int
469
	:return: The number of channels being sent to the port (from the single IP address sending data)
470
	'''
471
	global numchns
472
	numchns = len(getCHNS())
473
	return numchns
474
475
476
def get_inventory(sender='get_inventory'):
477
	'''
478
	.. role:: pycode(code)
479
		:language: python
480
481
	Downloads the station inventory from the Raspberry Shake FDSN and stores
482
	it as an :py:class:`obspy.core.inventory.inventory.Inventory` object which is available globally.
483
484
	In this example, we get the R940D station inventory from the Raspberry Shake FDSN:
485
486
	.. code-block:: python
487
488
		>>> import rsudp.raspberryshake as rs
489
		>>> rs.initRSlib(dport=8888, rsstn='R940D')
490
		>>> inv = rs.get_inventory()
491
		>>> print(inv)
492
		Inventory created at 2020-02-21T20:37:34.246777Z
493
			Sending institution: SeisComP3 (gempa testbed)
494
			Contains:
495
				Networks (1):
496
					AM
497
				Stations (1):
498
					AM.R940D (Raspberry Shake Citizen Science Station)
499
				Channels (2):
500
					AM.R940D.00.EHZ, AM.R940D.00.HDF
501
502
503
	:param sender: `(optional)` The name of the function calling the :py:func:`rsudp.printM` logging function
504
	:type str: str or None
505
	:rtype: obspy.core.inventory.inventory.Inventory or bool
506
	:return: The inventory of the Raspberry Shake station in the :pycode:`rsudp.raspberryshake.stn` variable.
507
	'''
508
	global inv, stn, region
509
	sender = 'get_inventory'
510
	if 'Z0000' in stn:
511
		printW('No station name given, continuing without inventory.',
512
				sender)
513
		inv = False
514
	else:
515
		try:
516
			printM('Fetching inventory for station %s.%s from Raspberry Shake FDSN.'
517
					% (net, stn), sender)
518
			url = 'https://fdsnws.raspberryshakedata.com/fdsnws/station/1/query?network=%s&station=%s&level=resp&nodata=404&format=xml' % (
519
				   net, stn)#, str(UTCDateTime.now()-timedelta(seconds=14400)))
520
			inv = read_inventory(url)
521
			region = FlinnEngdahl().get_region(inv[0][0].longitude, inv[0][0].latitude)
522
			printM('Inventory fetch successful. Station region is %s' % (region), sender)
523
		except (IndexError, HTTPError):
524
			printW('No inventory found for %s. Are you forwarding your Shake data?' % stn, sender)
525
			printW('Deconvolution will only be available if data forwarding is on.', sender, spaces=True)
526
			printW('Access the config page of the web front end for details.', sender, spaces=True)
527
			printW('More info at https://manual.raspberryshake.org/quickstart.html', sender, spaces=True)
528
			inv = False
529
			region = False
530
		except Exception as e:
531
			printE('Inventory fetch failed!', sender)
532
			printE('Error detail: %s' % e, sender, spaces=True)
533
			inv = False
534
			region = False
535
	return inv
536
537
538
def make_trace(d):
539
	'''
540
	Makes a trace and assigns it some values using a data packet.
541
542
	In this example, we make a trace object with some RS 1Dv7 data:
543
544
	.. code-block:: python
545
546
		>>> import rsudp.raspberryshake as rs
547
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
548
		>>> d = rs.getDATA()
549
		>>> t = rs.make_trace(d)
550
		>>> print(t)
551
		AM.R3BCF.00.EHZ | 2020-02-21T19:58:50.292000Z - 2020-02-21T19:58:50.532000Z | 100.0 Hz, 25 samples
552
553
	:param d: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse Trace information from
554
	:type d: bytes
555
	:rtype: obspy.core.trace.Trace
556
	:return: A fully formed Trace object to build a Stream with
557
	'''
558
	global INVWARN
559
	ch = getCHN(d)						# channel
560
	if ch:
561
		t = getTIME(d)				# unix epoch time since 1970-01-01 00:00:00Z; "timestamp" in obspy
562
		st = getSTREAM(d)				# samples in data packet in list [] format
563
		tr = Trace(data=np.ma.MaskedArray(st, dtype=np.int32))	# create empty trace
564
		tr.stats.network = net			# assign values
565
		tr.stats.location = '00'
566
		tr.stats.station = stn
567
		tr.stats.channel = ch
568
		tr.stats.sampling_rate = sps
569
		tr.stats.starttime = UTCDateTime(t, precision=3)
570
		if inv:
571
			try:
572
				tr.stats.response = inv.get_response(tr.id, tr.stats.starttime)
573
			except Exception as e:
574
				if not INVWARN:
575
					INVWARN = True
576
					printE(e, sender='make_trace')
577
					printE('Could not attach inventory response.', sender='make_trace')
578
					printE('Are you sure you set the station name correctly?', spaces=True, sender='make_trace')
579
					printE('This could indicate a mismatch in the number of data channels', spaces=True, sender='make_trace')
580
					printE('between the inventory and the stream. For example,', spaces=True, sender='make_trace')
581
					printE('if you are receiving RS4D data, please make sure', spaces=True, sender='make_trace')
582
					printE('the inventory you download has 4 channels.', spaces=True, sender='make_trace')
583
				else:
584
					pass
585
		return tr
586
587
588
# Then make repeated calls to this, to continue adding trace data to the stream
589
def update_stream(stream, d, **kwargs):
590
	'''
591
	Returns an updated Stream object with new data, merged down to one trace per available channel.
592
	Most sub-consumers call this each time they receive data packets in order to keep their obspy stream current.
593
594
	In this example, we make a stream object with some RS 1Dv7 data:
595
596
	.. code-block:: python
597
598
		>>> import rsudp.raspberryshake as rs
599
		>>> from obspy.core.stream import Stream
600
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
601
		>>> s = Stream()
602
		>>> d = rs.getDATA()
603
		>>> t = rs.make_trace(d)
604
		>>> s = rs.update_stream(s, d)
605
		>>> print(s)
606
		1 Trace(s) in Stream:
607
		AM.R3BCF.00.EHZ | 2020-02-21T19:58:50.292000Z - 2020-02-21T19:58:50.532000Z | 100.0 Hz, 25 samples
608
609
610
	:param obspy.core.stream.Stream stream: The stream to update
611
	:param d: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse Stream information from
612
	:type d: bytes
613
	:rtype: obspy.core.stream.Stream
614
	:return: A seismic data stream
615
	'''
616
	while True:
617
		try:
618
			return stream.append(make_trace(d)).merge(**kwargs)
619
		except TypeError:
620
			pass
621
622
623
def copy(orig):
624
	"""
625
	True-copy a stream by creating a new stream and copying old attributes to it.
626
	This is necessary because the old stream accumulates *something* that causes
627
	CPU usage to increase over time as more data is added. This is a bug in obspy
628
	that I intend to find--or at the very least report--but until then this hack
629
	works fine and is plenty fast enough.
630
631
	In this example, we make a stream object with some RS 1Dv7 data and then copy it to a new stream:
632
633
	.. code-block:: python
634
635
		>>> import rsudp.raspberryshake as rs
636
		>>> from obspy.core.stream import Stream
637
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
638
		>>> s = Stream()
639
		>>> d = rs.getDATA()
640
		>>> t = rs.make_trace(d)
641
		>>> s = rs.update_stream(s, d)
642
		>>> s
643
		1 Trace(s) in Stream:
644
		AM.R3BCF.00.EHZ | 2020-02-21T19:58:50.292000Z - 2020-02-21T19:58:50.532000Z | 100.0 Hz, 25 samples
645
		>>> s = rs.copy(s)
646
		>>> s
647
		1 Trace(s) in Stream:
648
		AM.R3BCF.00.EHZ | 2020-02-21T19:58:50.292000Z - 2020-02-21T19:58:50.532000Z | 100.0 Hz, 25 samples
649
650
651
	:param obspy.core.stream.Stream orig: The data stream to copy information from
652
	:rtype: obspy.core.stream.Stream
653
	:return: A low-memory copy of the passed data stream
654
655
	"""
656
	stream = Stream()
657
	for t in range(len(orig)):
658
		trace = Trace(data=orig[t].data)
659
		trace.stats.network = orig[t].stats.network
660
		trace.stats.location = orig[t].stats.location
661
		trace.stats.station = orig[t].stats.station
662
		trace.stats.channel = orig[t].stats.channel
663
		trace.stats.sampling_rate = orig[t].stats.sampling_rate
664
		trace.stats.starttime = orig[t].stats.starttime
665
		stream.append(trace).merge(fill_value=None)
666
	return stream.copy()
667
668
669
def fsec(ti):
670
	'''
671
	.. versionadded:: 0.4.3
672
673
	The Raspberry Shake records at hundredths-of-a-second precision.
674
	In order to report time at this precision, we need to do some time-fu.
675
676
	This function rounds the microsecond fraction of a
677
	:py:class:`obspy.core.utcdatetime.UTCDateTime`
678
	depending on its precision, so that it accurately reflects the Raspberry Shake's
679
	event measurement precision.
680
681
	This is necessary because datetime objects in Python are strange and confusing, and
682
	strftime doesn't support fractional returns, only the full integer microsecond field
683
	which is an integer right-padded with zeroes. This function uses the ``precision``
684
	of a datetime object.
685
686
	For example:
687
688
	.. code-block:: python
689
690
		>>> from obspy import UTCDateTime
691
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
692
		>>> fsec(ti)
693
		UTCDateTime(2020, 1, 1, 0, 0, 0, 600000)
694
695
	:param ti: time object to convert microseconds for
696
	:type ti: obspy.core.utcdatetime.UTCDateTime
697
	:return: the hundredth-of-a-second rounded version of the time object passed (precision is 0.01 second)
698
	:rtype: obspy.core.utcdatetime.UTCDateTime
699
	'''
700
	# time in python is weird and confusing, but luckily obspy is better than Python
701
	# at dealing with datetimes. all we need to do is tell it what precision we want
702
	# and it handles the rounding for us.
703
	return UTCDateTime(ti, precision=2)
704
705
706
def msg_alarm(event_time):
707
	'''
708
	This function constructs the ``ALARM`` message as a bytes object.
709
	Currently this is only used by :py:class:`rsudp.p_producer.Producer`
710
	to construct alarm queue messages.
711
712
	For example:
713
714
	.. code-block:: python
715
716
		>>> from obspy import UTCDateTime
717
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
718
		>>> msg_alarm(ti)
719
		b'ALARM 2020-01-01T00:00:00.599Z'
720
721
	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
722
	:rtype: bytes
723
	:return: the ``ALARM`` message, ready to be put on the queue
724
	'''
725
	return b'ALARM %s' % bytes(str(event_time), 'utf-8')
726
727
728
def msg_reset(reset_time):
729
	'''
730
	This function constructs the ``RESET`` message as a bytes object.
731
	Currently this is only used by :py:class:`rsudp.p_producer.Producer`
732
	to construct reset queue messages.
733
734
	For example:
735
736
	.. code-block:: python
737
738
		>>> from obspy import UTCDateTime
739
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
740
		>>> msg_reset(ti)
741
		b'RESET 2020-01-01T00:00:00.599Z'
742
743
	:param obspy.core.utcdatetime.UTCDateTime reset_time: the datetime object to serialize and convert to bytes
744
	:rtype: bytes
745
	:return: the ``RESET`` message, ready to be put on the queue
746
	'''
747
	return b'RESET %s' % bytes(str(reset_time), 'utf-8')
748
749
750
def msg_imgpath(event_time, figname):
751
	'''
752
	This function constructs the ``IMGPATH`` message as a bytes object.
753
	Currently this is only used by :py:class:`rsudp.c_plot.Plot`
754
	to construct queue messages containing timestamp and saved image path.
755
756
	For example:
757
758
	.. code-block:: python
759
760
		>>> from obspy import UTCDateTime
761
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
762
		>>> path = '/home/pi/rsudp/screenshots/test.png'
763
		>>> msg_imgpath(ti, path)
764
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
765
766
	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
767
	:param str figname: the figure path as a string
768
	:rtype: bytes
769
	:return: the ``IMGPATH`` message, ready to be put on the queue
770
	'''
771
	return b'IMGPATH %s %s' % (bytes(str(event_time), 'utf-8'), bytes(str(figname), 'utf-8'))
772
773
774
def set_channels(self, cha):
775
	'''
776
	This function sets the channels available for plotting. Allowed units are as follows:
777
778
	- ``["SHZ", "EHZ", "EHN", "EHE"]`` - velocity channels
779
	- ``["ENZ", "ENN", "ENE"]`` - acceleration channels
780
	- ``["HDF"]`` - pressure transducer channel
781
	- ``["all"]`` - all available channels
782
783
	So for example, if you wanted to display the two vertical channels of a Shake 4D,
784
	(geophone and vertical accelerometer) you could specify:
785
786
	``["EHZ", "ENZ"]``
787
788
	You can also specify partial channel names.
789
	So for example, the following will display at least one channel from any
790
	Raspberry Shake instrument:
791
792
	``["HZ", "HDF"]``
793
794
	Or if you wanted to display only vertical channels from a RS4D,
795
	you could specify
796
797
	``["Z"]``
798
799
	which would match both ``"EHZ"`` and ``"ENZ"``.
800
801
	:param self self: self object of the class calling this function
802
	:param cha: the channel or list of channels to plot
803
	:type cha: list or str
804
	'''
805
	cha = chns if ('all' in cha) else cha
806
	cha = list(cha) if isinstance(cha, str) else cha
807
	for c in chns:
808
		n = 0
809
		for uch in cha:
810
			if (uch.upper() in c) and (c not in str(self.chans)):
811
				self.chans.append(c)
812
			n += 1
813
	if len(self.chans) < 1:
814
			self.chans = chns
815
816
817
def msg_term():
818
	'''
819
	This function constructs the simple ``TERM`` message as a bytes object.
820
821
	.. code-block:: python
822
823
		>>> msg_term()
824
		b'TERM'
825
826
827
	:rtype: bytes
828
	:return: the ``TERM`` message
829
	'''
830
	return b'TERM'
831
832
833
def get_msg_time(msg):
834
	'''
835
	This function gets the time from ``ALARM``, ``RESET``,
836
	and ``IMGPATH`` messages as a UTCDateTime object.
837
838
	For example:
839
840
	.. code-block:: python
841
842
		>>> from obspy import UTCDateTime
843
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
844
		>>> path = '/home/pi/rsudp/screenshots/test.png'
845
		>>> msg = msg_imgpath(ti, path)
846
		>>> msg
847
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
848
		>>> get_msg_time(msg)
849
		UTCDateTime(2020, 1, 1, 0, 0, 0, 599000)
850
851
	:param bytes msg: the bytes-formatted queue message to decode
852
	:rtype: obspy.core.utcdatetime.UTCDateTime
853
	:return: the time embedded in the message
854
	'''
855
	return UTCDateTime.strptime(msg.decode('utf-8').split(' ')[1], '%Y-%m-%dT%H:%M:%S.%fZ')
856
857
858
def get_msg_path(msg):
859
	'''
860
	This function gets the path from ``IMGPATH`` messages as a string.
861
862
	For example:
863
864
	.. code-block:: python
865
866
		>>> from obspy import UTCDateTime
867
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
868
		>>> path = '/home/pi/rsudp/screenshots/test.png'
869
		>>> msg = msg_imgpath(ti, path)
870
		>>> msg
871
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
872
		>>> get_msg_path(msg)
873
		'/home/pi/rsudp/screenshots/test.png'
874
875
	:param bytes msg: the bytes-formatted queue message to decode
876
	:rtype: str
877
	:return: the path embedded in the message
878
	'''
879
	return msg.decode('utf-8').split(' ')[2]
880
881
882
def deconv_vel_inst(self, trace, output):
883
	'''
884
	.. role:: pycode(code)
885
		:language: python
886
	
887
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
888
	for velocity channels.
889
890
	:param self self: The self object of the sub-consumer class calling this function.
891
	:param osbpy.core.trace.Trace trace: the trace object instance to deconvolve
892
	'''
893
	if self.deconv not in 'CHAN':
894
		trace.remove_response(inventory=inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
895
								output=output, water_level=4.5, taper=False)
896
	else:
897
		trace.remove_response(inventory=inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
898
								output='VEL', water_level=4.5, taper=False)
899
	if 'ACC' in self.deconv:
900
		trace.data = np.gradient(trace.data, 1)
901
	elif 'GRAV' in self.deconv:
902
		trace.data = np.gradient(trace.data, 1) / g
903
		trace.stats.units = 'Earth gravity'
904
	elif 'DISP' in self.deconv:
905
		trace.data = np.cumsum(trace.data)
906
		trace.taper(max_percentage=0.1, side='left', max_length=1)
907
		trace.detrend(type='demean')
908
	else:
909
		trace.stats.units = 'Velocity'
910
911
912
def deconv_acc_inst(self, trace, output):
913
	'''
914
	.. role:: pycode(code)
915
		:language: python
916
	
917
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
918
	for acceleration channels.
919
920
	:param self self: The self object of the sub-consumer class calling this function.
921
	:param osbpy.core.trace.Trace trace: the trace object instance to deconvolve
922
	'''
923
	if self.deconv not in 'CHAN':
924
		trace.remove_response(inventory=inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
925
								output=output, water_level=4.5, taper=False)
926
	else:
927
		trace.remove_response(inventory=inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
928
								output='ACC', water_level=4.5, taper=False)
929
	if 'VEL' in self.deconv:
930
		trace.data = np.cumsum(trace.data)
931
		trace.detrend(type='demean')
932
	elif 'DISP' in self.deconv:
933
		trace.data = np.cumsum(np.cumsum(trace.data))
934
		trace.detrend(type='linear')
935
	elif 'GRAV' in self.deconv:
936
		trace.data = trace.data / g
937
		trace.stats.units = 'Earth gravity'
938
	else:
939
		trace.stats.units = 'Acceleration'
940
	if ('ACC' not in self.deconv) and ('CHAN' not in self.deconv):
941
		trace.taper(max_percentage=0.1, side='left', max_length=1)
942
943
944
def deconv_rbm_inst(self, trace, output):
945
	'''
946
	.. role:: pycode(code)
947
		:language: python
948
	
949
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
950
	for Raspberry Boom pressure transducer channels.
951
952
	.. note::
953
954
		The Raspberry Boom pressure transducer does not currently have a
955
		deconvolution function. The Raspberry Shake team is working on a
956
		calibration for the Boom, but until then Boom units are given in
957
		counts.
958
959
	:param self self: The self object of the sub-consumer class calling this function.
960
	:param osbpy.core.trace.Trace trace: the trace object instance to deconvolve
961
	'''
962
	trace.stats.units = ' counts'
963
964
965
def deconvolve(self):
966
	'''
967
	.. role:: pycode(code)
968
		:language: python
969
	
970
	A central helper function for sub-consumers (i.e. :py:class:`rsudp.c_plot.Plot` or :py:class:`rsudp.c_alert.Alert`)
971
	that need to deconvolve their raw data to metric units.
972
	Consumers with :py:class:`obspy.core.stream.Stream` objects in :pycode:`self.stream` can use this to deconvolve data
973
	if this library's :pycode:`rsudp.raspberryshake.inv` variable
974
	contains a valid :py:class:`obspy.core.inventory.inventory.Inventory` object.
975
976
	:param self self: The self object of the sub-consumer class calling this function. Must contain :pycode:`self.stream` as a :py:class:`obspy.core.stream.Stream` object.
977
	'''
978
	acc_channels = ['ENE', 'ENN', 'ENZ']
979
	vel_channels = ['EHE', 'EHN', 'EHZ', 'SHZ']
980
	rbm_channels = ['HDF']
981
982
	self.stream = self.raw.copy()
983
	for trace in self.stream:
984
		trace.stats.units = self.units
985
		output = 'ACC' if self.deconv == 'GRAV' else self.deconv	# if conversion is to gravity
986
		if self.deconv:
987
			if trace.stats.channel in vel_channels:
988
				deconv_vel_inst(self, trace, output)	# geophone channels
989
990
			elif trace.stats.channel in acc_channels:
991
				deconv_acc_inst(self, trace, output)	# accelerometer channels
992
993
			elif trace.stats.channel in rbm_channels:
994
				deconv_rbm_inst(self, trace, output)	# this is the Boom channel
995
996
			else:
997
				trace.stats.units = ' counts'	# this is a new one
998
999
		else:
1000
			trace.stats.units = ' counts'		# this is not being deconvolved
1001
1002
1003
class ConsumerThread(Thread):
1004
	'''
1005
	The default consumer thread setup.
1006
	Import this consumer and easily create your own consumer modules!
1007
	This class modifies the :py:class:`threading.Thread` object to
1008
	include some settings that all rsudp consumers need,
1009
	some of which the :py:class:`rsudp.p_producer.Producer`
1010
	needs in order to function.
1011
1012
	Currently, the modifications that this module makes to
1013
	:py:class:`threading.Thread` objects are:
1014
1015
	.. code-block:: python
1016
1017
		self.sender = 'ConsumerThread'  # module name used in logging
1018
		self.alarm = False              # the Producer reads this to set the ``ALARM`` state
1019
		self.alarm_reset = False        # the Producer reads this to set the ``RESET`` state
1020
		self.alive = True               # this is used to keep the main ``for`` loop running
1021
1022
	For more information on creating your own consumer threads,
1023
	see :ref:`add_your_own`.
1024
1025
	'''
1026
	def __init__(self):
1027
		super().__init__()
1028
		self.sender = 'ConsumerThread'	# used in logging
1029
		self.alarm = False				# the producer reads this
1030
		self.alarm_reset = False		# the producer reads this
1031
		self.alive = True				# this is used to keep the main for loop running
1032
1033
1034
if __name__ == '__main__':
1035
	pass
1036