Passed
Push — master ( 5ff0cb...527ff7 )
by Ian
04:40
created

build.rsudp.raspberryshake   F

Complexity

Total Complexity 79

Size/Duplication

Total Lines 982
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 334
dl 0
loc 982
rs 2.08
c 0
b 0
f 0
wmc 79

28 Functions

Rating   Name   Duplication   Size   Complexity  
A deconv_vel_inst() 0 28 5
A get_ip() 0 31 2
B make_trace() 0 45 5
A openSOCK() 0 28 3
A msg_imgpath() 0 22 1
A set_params() 0 26 3
A get_msg_time() 0 23 1
A getTTLCHN() 0 20 1
A fsec() 0 35 1
A getDATA() 0 32 3
B deconv_acc_inst() 0 30 7
A handler() 0 15 1
A update_stream() 0 32 3
A getTIME() 0 27 1
A msg_reset() 0 20 1
B getCHNS() 0 44 8
A copy() 0 44 2
A getSTREAM() 0 24 1
B initRSlib() 0 69 8
A getTR() 0 38 4
A msg_term() 0 14 1
A getCHN() 0 21 1
A msg_alarm() 0 20 1
A get_msg_path() 0 22 1
A getSR() 0 30 1
A get_inventory() 0 60 4
A deconv_rbm_inst() 0 12 1
B deconvolve() 0 36 7

1 Method

Rating   Name   Duplication   Size   Complexity  
A ConsumerThread.__init__() 0 6 1

How to fix   Complexity   

Complexity

Complex classes like build.rsudp.raspberryshake often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields or methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
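As a hedged illustration of this advice, the `msg_*` helpers in the listing below share a prefix and could be grouped into one class. The sketch is not part of rsudp: the class name `QueueMessage` and its method names are hypothetical, and timestamps are passed as plain strings instead of obspy `UTCDateTime` objects so that the example runs with the standard library alone.

```python
class QueueMessage:
    """Hypothetical grouping of the ALARM/RESET/IMGPATH/TERM queue messages."""

    @staticmethod
    def alarm(event_time):
        # same byte layout as msg_alarm() in the listing
        return b'ALARM %s' % str(event_time).encode('utf-8')

    @staticmethod
    def reset(reset_time):
        # same byte layout as msg_reset()
        return b'RESET %s' % str(reset_time).encode('utf-8')

    @staticmethod
    def imgpath(event_time, figname):
        # same byte layout as msg_imgpath()
        return b'IMGPATH %s %s' % (str(event_time).encode('utf-8'),
                                   str(figname).encode('utf-8'))

    @staticmethod
    def term():
        return b'TERM'

    @staticmethod
    def time_of(msg):
        # second space-separated field; returned as a string in this sketch
        return msg.decode('utf-8').split(' ')[1]

    @staticmethod
    def path_of(msg):
        # third space-separated field (paths containing spaces are not handled)
        return msg.decode('utf-8').split(' ')[2]


msg = QueueMessage.imgpath('2020-01-01T00:00:00.599Z',
                           '/home/pi/rsudp/screenshots/test.png')
print(QueueMessage.time_of(msg), QueueMessage.path_of(msg))
```

Grouping the builders and parsers this way keeps the message format in one place; whether that is worth the churn for callers of the existing module-level functions is a judgment call.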

from datetime import datetime, timedelta
import time
import math
import numpy as np
import sys, os, platform
import socket as s
import signal
from obspy import UTCDateTime
from obspy.core.stream import Stream
from obspy import read_inventory
from obspy.geodetics.flinnengdahl import FlinnEngdahl
from obspy.core.trace import Trace
from rsudp import printM, printW, printE
from requests.exceptions import HTTPError
from threading import Thread

initd, sockopen = False, False
qsize = 2048 			# max queue size
port = 8888				# default listening port
to = 10					# socket test timeout
firstaddr = ''			# the first address data is received from
inv = False				# station inventory
region = False
producer = False 		# flag for producer status
stn = 'Z0000'			# station name
net = 'AM'				# network (this will always be AM)
chns = []				# list of channels
numchns = 0

tf = None				# transmission frequency in ms
tr = None				# transmission rate in packets per second
sps = None				# samples per second

# conversion units
# 		'name',	: ['pretty name', 'unit display']
UNITS = {'ACC'	: ['Acceleration', 'm/s$^2$'],
		 'GRAV'	: ['Earth gravity', ' g'],
		 'VEL'	: ['Velocity', 'm/s'],
		 'DISP'	: ['Displacement', 'm'],
		 'CHAN'	: ['channel-specific', ' Counts']}

g = 9.81	# earth gravity in m/s2


# get an IP to report to the user
# from https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib
def get_ip():
	'''
	.. |so_ip| raw:: html

		<a href="https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib" target="_blank">this stackoverflow answer</a>


	Return a reliable network IP to report to the user when there is no data received.
	This helps the user set their Raspberry Shake's datacast streams to point to the correct location
	if the library raises a "no data received" error.
	Solution adapted from |so_ip|.

	.. code-block:: python

		>>> get_ip()
		'192.168.1.23'

	:rtype: str
	:return: The network IP of the machine that this program is running on
	'''

	testsock = s.socket(s.AF_INET, s.SOCK_DGRAM)
	try:
		# doesn't even have to be reachable
		testsock.connect(('10.255.255.255', 1))
		IP = testsock.getsockname()[0]
	except Exception:
		# fall back to loopback if no route can be determined
		IP = '127.0.0.1'
	finally:
		testsock.close()
	return IP

ip = get_ip()

# construct a socket
socket_type =  s.SOCK_DGRAM
sock = s.socket(s.AF_INET, socket_type)
if platform.system() != 'Windows':	# address reuse is only requested off Windows
	sock.setsockopt(s.SOL_SOCKET, s.SO_REUSEADDR, 1)

def handler(signum, frame, ip=ip):
	'''
	The signal handler for the nodata alarm.

	:param int signum: signal number
	:param frame: the current stack frame, as supplied by the :py:mod:`signal` module
	:param str ip: the IP of the box this program is running on (i.e. the device the Raspberry Shake should send data to)
	:raise IOError: on UNIX systems if no data is received
	'''
	global port
	printE('No data received in %s seconds; aborting.' % (to), sender='Init')
	printE('Check that the Shake is forwarding data to:', sender='Init', announce=False, spaces=True)
	printE('IP address: %s    Port: %s' % (ip, port), sender='Init', announce=False, spaces=True)
	printE('and that no firewall exists between the Shake and this computer.', sender='Init', announce=False, spaces=True)
	raise IOError('No data received')


def initRSlib(dport=port, rsstn='Z0000', timeout=10):
	'''
	.. role:: pycode(code)
		:language: python

	Initializes this library (:py:mod:`rsudp.raspberryshake`).
	Set values for data port, station, network, and port timeout prior to opening the socket.
	Calls both :py:func:`rsudp.raspberryshake.openSOCK` and :py:func:`rsudp.raspberryshake.set_params`.

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')

	The library is now initialized:

	.. code-block:: python

		>>> rs.initd
		True

	:param int dport: The local port the Raspberry Shake is sending UDP data packets to. Defaults to :pycode:`8888`.
	:param str rsstn: The name of the station (something like :pycode:`'RCB43'` or :pycode:`'S0CDE'`)
	:param int timeout: The number of seconds for :py:func:`rsudp.raspberryshake.set_params` to wait for data before an error is raised (zero for unlimited wait)

	'''
	global port, stn, to, initd, producer
	sender = 'Init'
	printM('Initializing.', sender)
	try:						# set port value first
		if dport == int(dport):
			port = int(dport)
		else:
			port = int(dport)
			printW('Supplied port value was converted to integer. Non-integer port numbers are invalid.')
	except Exception as e:
		printE('Details - %s' % e)

	try:						# set station name
		if len(rsstn) == 5:
			stn = str(rsstn).upper()
		else:
			stn = str(rsstn).upper()
			printW('Station name does not follow Raspberry Shake naming convention.')
	except ValueError as e:
		printE('Invalid station name supplied. Details: %s' % e)
		printE('reverting to station name Z0000', announce=False, spaces=True)
	except Exception as e:
		printE('Details - %s' % e)

	try:						# set timeout value
		to = int(timeout)
	except ValueError as e:
		printW('You likely supplied a non-integer as the timeout value. Your value was: %s'
				% timeout)
		printW('Continuing with default timeout of %s sec'
				% (to), announce=False, spaces=True)
		printW('details: %s' % e, announce=False, spaces=True)
	except Exception as e:
		printE('Details - %s' % e)

	initd = True				# if initialization goes correctly, set initd to true
	openSOCK()					# open a socket
	printM('Waiting for UDP data on port %s...' % (port), sender)
	set_params()				# get data and set parameters

def openSOCK(host=''):
	'''
	.. role:: pycode(code)
		:language: python

	Initialize a socket at the port specified by :pycode:`rsudp.raspberryshake.port`.
	Called by :py:func:`rsudp.raspberryshake.initRSlib`, must be done before :py:func:`rsudp.raspberryshake.set_params`.

	:param str host: local address at which to open a listening port (defaults to :pycode:`''`, which binds to all available interfaces)
	:raise IOError: if the library is not initialized (:py:func:`rsudp.raspberryshake.initRSlib`) prior to running this function
	:raise OSError: if the program cannot bind to the specified port number

	'''
	global sockopen
	sockopen = False
	if initd:
		HP = '%s:%s' % ('localhost',port)
		printM("Opening socket on %s (HOST:PORT)"
				% HP, 'openSOCK')
		try:
			sock.bind((host, port))
			sockopen = True
		except Exception as e:
			printE('Could not bind to port %s. Is another program using it?' % port)
			printE('Detail: %s' % e, announce=False)
			raise OSError(e)
	else:
		raise IOError("Before opening a socket, you must initialize this raspberryshake library by calling initRSlib(dport=XXXXX, rsstn='R0E05') first.")

def set_params():
	'''
	.. role:: pycode(code)
		:language: python

	Read a data packet off the port.
	Called by :py:func:`rsudp.raspberryshake.initRSlib`,
	must be done after :py:func:`rsudp.raspberryshake.openSOCK`
	but before :py:func:`rsudp.raspberryshake.getDATA`.
	Will wait :pycode:`rsudp.raspberryshake.to` seconds for data before raising a no data exception
	(only available on UNIX-like systems, where ``SIGALRM`` exists).

	'''
	global to, firstaddr
	if os.name != 'nt': 	# signal alarm not available on windows
		signal.signal(signal.SIGALRM, handler)
		signal.alarm(to)		# alarm time set with timeout value
	data, (firstaddr, connport) = sock.recvfrom(2048)
	if os.name != 'nt':
		signal.alarm(0)			# once data has been received, turn alarm completely off
	to = 0						# otherwise it erroneously triggers after keyboardinterrupt
	getTR(getCHNS()[0])
	getSR(tf, data)
	getTTLCHN()
	printM('Available channels: %s' % chns, 'Init')
	get_inventory()

def getDATA():
	'''
	Read a data packet off the port.

	In this example, we get a Shake 1Dv7 data packet:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> d = rs.getDATA()
		>>> d
		b"{'EHZ', 1582315130.292, 14168, 14927, 16112, 17537, 18052, 17477,
		15418, 13716, 15604, 17825, 19637, 20985, 17325, 10439, 11510, 17678,
		20027, 20207, 18481, 15916, 13836, 13073, 14462, 17628, 19388}"


	:rtype: bytes
	:return: Returns a data packet as an encoded bytes object.

	:raise IOError: if no socket is open (:py:func:`rsudp.raspberryshake.openSOCK`) prior to running this function
	:raise IOError: if the library is not initialized (:py:func:`rsudp.raspberryshake.initRSlib`) prior to running this function

	'''
	global to, firstaddr
	if sockopen:
		return sock.recv(4096)
	else:
		if initd:
			raise IOError("No socket is open. Please open a socket using this library's openSOCK() function.")
		else:
			raise IOError('No socket is open. Please initialize the library using initRSlib() then open a socket using openSOCK().')

def getCHN(DP):
	'''
	Extract the channel information from the data packet.
	Requires :py:func:`rsudp.raspberryshake.getDATA` packet as argument.

	In this example, we get the channel code from a Shake 1Dv7 data packet:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> d = rs.getDATA()
		>>> rs.getCHN(d)
		'EHZ'

	:param DP: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse channel information from
	:type DP: bytes
	:rtype: str
	:return: Returns the instrument channel as a string.
	'''
	return str(DP.decode('utf-8').split(",")[0][1:]).strip("\'")

def getTIME(DP):
	'''
	Extract the timestamp from the data packet.
	Timestamp is seconds since 1970-01-01 00:00:00Z,
	which can be passed directly to an :py:class:`obspy.core.utcdatetime.UTCDateTime` object.

	In this example, we get the timestamp of a Shake 1Dv7 data packet and convert it to a UTCDateTime:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> from obspy import UTCDateTime
		>>> d = rs.getDATA()
		>>> t = rs.getTIME(d)
		>>> t
		1582315130.292
		>>> dt = UTCDateTime(t, precision=3)
		>>> dt
		UTCDateTime(2020, 2, 21, 19, 58, 50, 292000)

	:param DP: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse time information from
	:type DP: bytes
	:rtype: float
	:return: Timestamp in decimal seconds since 1970-01-01 00:00:00Z
	'''
	return float(DP.split(b",")[1])

def getSTREAM(DP):
	'''
	Get the samples in a data packet as a list object.
	Requires :py:func:`rsudp.raspberryshake.getDATA` packet as argument.

	In this example, we get a list of samples from a Shake 1Dv7 data packet:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> d = rs.getDATA()
		>>> s = rs.getSTREAM(d)
		>>> s
		[14168, 14927, 16112, 17537, 18052, 17477, 15418, 13716, 15604,
		 17825, 19637, 20985, 17325, 10439, 11510, 17678, 20027, 20207,
		 18481, 15916, 13836, 13073, 14462, 17628, 19388]

	:param DP: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse stream information from
	:type DP: bytes
	:rtype: list
	:return: List of data samples in the packet
	'''
	return list(map(int, DP.decode('utf-8').replace('}','').split(',')[2:]))
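
The three parsers above (`getCHN`, `getTIME`, `getSTREAM`) can be tried without a live Shake. The following is a minimal sketch, assuming the packet layout shown in the `getDATA()` docstring; `parse_packet` is a hypothetical helper that mirrors their string handling and is not part of rsudp.

```python
# Sample packet copied from the getDATA() docstring in this listing.
PACKET = (
    b"{'EHZ', 1582315130.292, 14168, 14927, 16112, 17537, 18052, 17477, "
    b"15418, 13716, 15604, 17825, 19637, 20985, 17325, 10439, 11510, 17678, "
    b"20027, 20207, 18481, 15916, 13836, 13073, 14462, 17628, 19388}"
)


def parse_packet(dp):
    """Hypothetical helper: split one packet into (channel, timestamp, samples)."""
    fields = dp.decode('utf-8').replace('}', '').split(',')
    channel = fields[0][1:].strip("'")       # "{'EHZ'" becomes "EHZ"
    timestamp = float(fields[1])             # seconds since the Unix epoch
    samples = [int(x) for x in fields[2:]]   # int() tolerates the leading spaces
    return channel, timestamp, samples


channel, timestamp, samples = parse_packet(PACKET)
print(channel, timestamp, len(samples))
```

Decoding once and splitting once avoids the three separate passes over the packet that the individual functions make, at the cost of a different call signature.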

def getTR(chn):				# DP transmission rate in msecs
	'''
	Get the transmission rate in milliseconds between consecutive packets from the same channel.
	Must wait to receive a second packet from the same channel.
	Requires a :py:func:`rsudp.raspberryshake.getCHN` or a channel name string as argument.

	In this example, we calculate the transmission frequency of a Shake 1Dv7:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> d = rs.getDATA()
		>>> tr = rs.getTR(rs.getCHN(d))
		>>> tr
		250

	:param chn: The seismic instrument channel (:py:func:`rsudp.raspberryshake.getCHN`) to calculate transmission rate information from
	:type chn: str
	:rtype: int
	:return: Transmission rate in milliseconds between consecutive packets from a specific channel
	'''
	global tf, tr
	timeP1, timeP2 = 0.0, 0.0
	done = False
	while not done:
		DP = getDATA()
		CHAN = getCHN(DP)
		if CHAN == chn:
			if timeP1 == 0.0:
				timeP1 = getTIME(DP)
			else:
				timeP2 = getTIME(DP)
				done = True
	TR = timeP2*1000 - timeP1*1000
	tf = int(TR)
	tr = int(1000 / TR)
	return tf

def getSR(TR, DP):
	'''
	Get the sample rate in samples per second.
	Requires an integer transmission frequency and a data packet as arguments.

	In this example, we calculate the number of samples per second from a Shake 1Dv7:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> d = rs.getDATA()
		>>> tr = rs.getTR(rs.getCHN(d))
		>>> tr
		250
		>>> sps = rs.getSR(tr, d)
		>>> sps
		100


	:param TR: The transmission frequency (:py:func:`rsudp.raspberryshake.getTR`) in milliseconds between packets
	:type TR: int
	:param DP: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to calculate sample rate information from
	:type DP: bytes
	:rtype: int
	:return: The sample rate in samples per second from a specific channel
	'''
	global sps
	sps = int((DP.count(b",") - 1) * 1000 / TR)
	return sps
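
The arithmetic in `getTR` and `getSR` can be checked by hand. A packet with n samples contains n + 1 commas (one after the channel, one after the timestamp, and n - 1 between samples), so the comma count minus one is the sample count. The sketch below reproduces that calculation offline; the function names are hypothetical, the packet is synthetic, and it uses `round()` for the interval where the listing truncates with `int()`.

```python
def transmission_interval_ms(t1, t2):
    """Milliseconds between two packet timestamps (rounded, unlike getTR)."""
    return round((t2 - t1) * 1000)


def sample_rate(packet, interval_ms):
    """Samples per second, counting commas the same way getSR() does."""
    n_samples = packet.count(b',') - 1
    return int(n_samples * 1000 / interval_ms)


# Synthetic 25-sample packet in the layout shown in the getDATA() docstring.
packet = (b"{'EHZ', 1582315130.292, "
          + b", ".join(b"%d" % i for i in range(25))
          + b"}")

# Two timestamps 0.25 s apart, as consecutive packets on one channel would be.
interval = transmission_interval_ms(1582315130.292, 1582315130.542)
print(interval, sample_rate(packet, interval))
```

With 25 samples every 250 ms, the rate works out to 25 * 1000 / 250 = 100 samples per second, matching the docstring example.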

def getCHNS():
	'''
	Get a list of channels sent to the port.

	In this example, we list channels from a Boom:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R940D')
		>>> rs.getCHNS()
		['EHZ', 'HDF']


	:rtype: list
	:return: The list of channels being sent to the port (from the single IP address sending data)
	'''
	global chns
	chdict = {'EHZ': False, 'EHN': False, 'EHE': False,
			  'ENZ': False, 'ENN': False, 'ENE': False, 'HDF': False}
	firstCHN = ''
	done = False
	sim = 0
	while not done:
		DP = getDATA()
		if firstCHN == '':
			firstCHN = getCHN(DP)
			chns.append(firstCHN)
			continue
		nextCHN = getCHN(DP)
		if firstCHN == nextCHN:
			if sim > 1:
				done = True
				continue
			sim += 1
		else:
			chns.append(nextCHN)
	for ch in chns:
		chdict[ch] = True
	chns = []
	for ch in chdict:
		if chdict[ch]:
			chns.append(ch)
	return chns

def getTTLCHN():
	'''
	Calculate total number of channels received,
	by counting the number of channels returned by :py:func:`rsudp.raspberryshake.getCHNS`.

	In this example, we get the number of channels from a Shake & Boom:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R940D')
		>>> rs.getTTLCHN()
		2

	:rtype: int
	:return: The number of channels being sent to the port (from the single IP address sending data)
	'''
	global numchns
	numchns = len(getCHNS())
	return numchns


def get_inventory(sender='get_inventory'):
	'''
	.. role:: pycode(code)
		:language: python

	Downloads the station inventory from the Raspberry Shake FDSN and stores
	it as an :py:class:`obspy.core.inventory.inventory.Inventory` object which is available globally.

	In this example, we get the R940D station inventory from the Raspberry Shake FDSN:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R940D')
		>>> inv = rs.get_inventory()
		>>> print(inv)
		Inventory created at 2020-02-21T20:37:34.246777Z
			Sending institution: SeisComP3 (gempa testbed)
			Contains:
				Networks (1):
					AM
				Stations (1):
					AM.R940D (Raspberry Shake Citizen Science Station)
				Channels (2):
					AM.R940D.00.EHZ, AM.R940D.00.HDF


	:param sender: `(optional)` The name of the function calling the :py:func:`rsudp.printM` logging function
	:type sender: str or None
	:rtype: obspy.core.inventory.inventory.Inventory or bool
	:return: The inventory of the Raspberry Shake station in the :pycode:`rsudp.raspberryshake.stn` variable.
	'''
	global inv, stn, region
	if 'Z0000' in stn:
		printW('No station name given, continuing without inventory.',
				sender)
		inv = False
	else:
		try:
			printM('Fetching inventory for station %s.%s from Raspberry Shake FDSN.'
					% (net, stn), sender)

			inv = read_inventory('https://fdsnws.raspberryshakedata.com/fdsnws/station/1/query?network=%s&station=%s&starttime=%s&level=resp&nodata=404&format=xml'
								 % (net, stn, str(UTCDateTime.now()-timedelta(seconds=14400))))
			region = FlinnEngdahl().get_region(inv[0][0].longitude, inv[0][0].latitude)
			printM('Inventory fetch successful. Station region is %s' % (region), sender)
		except (IndexError, HTTPError):
			printW('No inventory found for %s. Are you forwarding your Shake data?' % stn, sender)
			printW('Deconvolution will only be available if data forwarding is on.', sender, spaces=True)
			printW('Access the config page of the web front end for details.', sender, spaces=True)
			printW('More info at https://manual.raspberryshake.org/quickstart.html', sender, spaces=True)
			inv = False
			region = False
		except Exception as e:
			printE('Inventory fetch failed!', sender)
			printE('Error detail: %s' % e, sender, spaces=True)
			inv = False
			region = False
	return inv


def make_trace(d):
	'''
	Makes a trace and assigns it some values using a data packet.

	In this example, we make a trace object with some RS 1Dv7 data:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> d = rs.getDATA()
		>>> t = rs.make_trace(d)
		>>> print(t)
		AM.R3BCF.00.EHZ | 2020-02-21T19:58:50.292000Z - 2020-02-21T19:58:50.532000Z | 100.0 Hz, 25 samples

	:param d: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse Trace information from
	:type d: bytes
	:rtype: obspy.core.trace.Trace
	:return: A fully formed Trace object to build a Stream with
	'''
	global producer
	ch = getCHN(d)						# channel
	if ch:
		t = getTIME(d)				# unix epoch time since 1970-01-01 00:00:00Z; "timestamp" in obspy
		st = getSTREAM(d)				# samples in data packet in list [] format
		tr = Trace(data=np.ma.MaskedArray(st, dtype=np.int32))	# create a trace holding the packet samples
		tr.stats.network = net			# assign values
		tr.stats.location = '00'
		tr.stats.station = stn
		tr.stats.channel = ch
		tr.stats.sampling_rate = sps
		tr.stats.starttime = UTCDateTime(t, precision=3)
		if inv:
			try:
				tr.attach_response(inv)
			except Exception:
				if producer:
					printE('Could not attach inventory response.')
					printE('Are you sure you set the station name correctly?', spaces=True)
					printE('This could indicate a mismatch in the number of data channels', spaces=True)
					printE('between the inventory and the stream. For example,', spaces=True)
					printE('if you are receiving RS4D data, please make sure', spaces=True)
					printE('the inventory you download has 4 channels.', spaces=True)
				producer = False
		return tr


# Then make repeated calls to this, to continue adding trace data to the stream
def update_stream(stream, d, **kwargs):
	'''
	Returns an updated Stream object with new data, merged down to one trace per available channel.
	Most sub-consumers call this each time they receive data packets in order to keep their obspy stream current.

	In this example, we make a stream object with some RS 1Dv7 data:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> from obspy.core.stream import Stream
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> s = Stream()
		>>> d = rs.getDATA()
		>>> t = rs.make_trace(d)
		>>> s = rs.update_stream(s, d)
		>>> print(s)
		1 Trace(s) in Stream:
		AM.R3BCF.00.EHZ | 2020-02-21T19:58:50.292000Z - 2020-02-21T19:58:50.532000Z | 100.0 Hz, 25 samples


	:param obspy.core.stream.Stream stream: The stream to update
	:param d: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse Stream information from
	:type d: bytes
	:rtype: obspy.core.stream.Stream
	:return: A seismic data stream
	'''
	while True:
		try:
			return stream.append(make_trace(d)).merge(**kwargs)
		except TypeError:
			pass


def copy(orig):
	"""
	True-copy a stream by creating a new stream and copying old attributes to it.
	This is necessary because the old stream accumulates *something* that causes
	CPU usage to increase over time as more data is added. This is a bug in obspy
	that I intend to find--or at the very least report--but until then this hack
	works fine and is plenty fast enough.

	In this example, we make a stream object with some RS 1Dv7 data and then copy it to a new stream:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> from obspy.core.stream import Stream
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> s = Stream()
		>>> d = rs.getDATA()
		>>> t = rs.make_trace(d)
		>>> s = rs.update_stream(s, d)
		>>> s
		1 Trace(s) in Stream:
		AM.R3BCF.00.EHZ | 2020-02-21T19:58:50.292000Z - 2020-02-21T19:58:50.532000Z | 100.0 Hz, 25 samples
		>>> s = rs.copy(s)
		>>> s
		1 Trace(s) in Stream:
		AM.R3BCF.00.EHZ | 2020-02-21T19:58:50.292000Z - 2020-02-21T19:58:50.532000Z | 100.0 Hz, 25 samples


	:param obspy.core.stream.Stream orig: The data stream to copy information from
	:rtype: obspy.core.stream.Stream
	:return: A low-memory copy of the passed data stream

	"""
	stream = Stream()
	for t in range(len(orig)):
		trace = Trace(data=orig[t].data)
		trace.stats.network = orig[t].stats.network
		trace.stats.location = orig[t].stats.location
		trace.stats.station = orig[t].stats.station
		trace.stats.channel = orig[t].stats.channel
		trace.stats.sampling_rate = orig[t].stats.sampling_rate
		trace.stats.starttime = orig[t].stats.starttime
		stream.append(trace).merge(fill_value=None)
	return stream.copy()


def fsec(ti):
	'''
	.. versionadded:: 0.4.3

	The Raspberry Shake records at hundredths-of-a-second precision.
	In order to report time at this precision, we need to do some time-fu.

	This function rounds the microsecond fraction of a
	:py:class:`obspy.core.utcdatetime.UTCDateTime`
	depending on its precision, so that it accurately reflects the Raspberry Shake's
	event measurement precision.

	This is necessary because datetime objects in Python are strange and confusing, and
	strftime doesn't support fractional returns, only the full integer microsecond field
	which is an integer right-padded with zeroes. This function uses the ``precision``
	of a datetime object.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> fsec(ti)
		UTCDateTime(2020, 1, 1, 0, 0, 0, 600000)

	:param ti: time object to convert microseconds for
	:type ti: obspy.core.utcdatetime.UTCDateTime
	:return: the hundredth-of-a-second rounded version of the time object passed (precision is 0.01 second)
	:rtype: obspy.core.utcdatetime.UTCDateTime
	'''
	# time in python is weird and confusing, but luckily obspy is better than Python
	# at dealing with datetimes. all we need to do is tell it what precision we want
	# and it handles the rounding for us.
	return UTCDateTime(ti, precision=2)


def msg_alarm(event_time):
	'''
	This function constructs the ``ALARM`` message as a bytes object.
	Currently this is only used by :py:class:`rsudp.p_producer.Producer`
	to construct alarm queue messages.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> msg_alarm(ti)
		b'ALARM 2020-01-01T00:00:00.599Z'

	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
	:rtype: bytes
	:return: the ``ALARM`` message, ready to be put on the queue
	'''
	return b'ALARM %s' % bytes(str(event_time), 'utf-8')


def msg_reset(reset_time):
	'''
	This function constructs the ``RESET`` message as a bytes object.
	Currently this is only used by :py:class:`rsudp.p_producer.Producer`
	to construct reset queue messages.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> msg_reset(ti)
		b'RESET 2020-01-01T00:00:00.599Z'

	:param obspy.core.utcdatetime.UTCDateTime reset_time: the datetime object to serialize and convert to bytes
	:rtype: bytes
	:return: the ``RESET`` message, ready to be put on the queue
	'''
	return b'RESET %s' % bytes(str(reset_time), 'utf-8')


def msg_imgpath(event_time, figname):
	'''
	This function constructs the ``IMGPATH`` message as a bytes object.
	Currently this is only used by :py:class:`rsudp.c_plot.Plot`
	to construct queue messages containing timestamp and saved image path.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> path = '/home/pi/rsudp/screenshots/test.png'
		>>> msg_imgpath(ti, path)
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'

	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
	:param str figname: the figure path as a string
	:rtype: bytes
	:return: the ``IMGPATH`` message, ready to be put on the queue
	'''
	return b'IMGPATH %s %s' % (bytes(str(event_time), 'utf-8'), bytes(str(figname), 'utf-8'))


def msg_term():
	'''
	This function constructs the simple ``TERM`` message as a bytes object.

	.. code-block:: python

		>>> msg_term()
		b'TERM'


	:rtype: bytes
	:return: the ``TERM`` message
	'''
	return b'TERM'


def get_msg_time(msg):
	'''
	This function gets the time from ``ALARM``, ``RESET``,
	and ``IMGPATH`` messages as a UTCDateTime object.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> path = '/home/pi/rsudp/screenshots/test.png'
		>>> msg = msg_imgpath(ti, path)
		>>> msg
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
		>>> get_msg_time(msg)
		UTCDateTime(2020, 1, 1, 0, 0, 0, 599000)

	:param bytes msg: the bytes-formatted queue message to decode
	:rtype: obspy.core.utcdatetime.UTCDateTime
	:return: the time embedded in the message
	'''
	return UTCDateTime.strptime(msg.decode('utf-8').split(' ')[1], '%Y-%m-%dT%H:%M:%S.%fZ')


def get_msg_path(msg):
	'''
	This function gets the path from ``IMGPATH`` messages as a string.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> path = '/home/pi/rsudp/screenshots/test.png'
		>>> msg = msg_imgpath(ti, path)
		>>> msg
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
		>>> get_msg_path(msg)
		'/home/pi/rsudp/screenshots/test.png'

	:param bytes msg: the bytes-formatted queue message to decode
	:rtype: str
	:return: the path embedded in the message
	'''
	return msg.decode('utf-8').split(' ')[2]


def deconv_vel_inst(self, trace):
	'''
	.. role:: pycode(code)
		:language: python

	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
	for velocity channels.

	:param self self: The self object of the sub-consumer class calling this function. Must contain :pycode:`self.stream` as a :py:class:`obspy.core.stream.Stream` object.
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
	'''
	if self.deconv not in 'CHAN':
		# Scrutinizer issue (Comprehensibility, Best Practice):
		# the variable output does not seem to be defined.
		trace.remove_response(inventory=inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
								output=output, water_level=4.5, taper=False)
	else:
		trace.remove_response(inventory=inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
								output='VEL', water_level=4.5, taper=False)
	if 'ACC' in self.deconv:
		trace.data = np.gradient(trace.data, 1)
	elif 'GRAV' in self.deconv:
		trace.data = np.gradient(trace.data, 1) / g
		trace.stats.units = 'Earth gravity'
	elif 'DISP' in self.deconv:
		trace.data = np.cumsum(trace.data)
		trace.taper(max_percentage=0.1, side='left', max_length=1)
		trace.detrend(type='demean')
	else:
		trace.stats.units = 'Velocity'
863
864
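The helper above converts between velocity, acceleration, and displacement with `np.gradient` and `np.cumsum`. The following sketch illustrates those two operations on synthetic data. It is an illustration under stated assumptions rather than the library's method: the sample rate `sps`, the sine-wave input, and the scaling by the sample interval `dt` are all introduced here, whereas the helper itself works with unit sample spacing.

```python
import numpy as np

sps = 100                        # hypothetical sample rate in Hz
dt = 1.0 / sps                   # sample interval in seconds
t = np.arange(0, 1, dt)          # one second of samples
vel = np.sin(2 * np.pi * t)      # synthetic velocity: a 1 Hz sine wave

# differentiate velocity to get acceleration (central differences)
acc = np.gradient(vel, dt)

# integrate velocity to get displacement (running rectangle-rule sum),
# then remove the mean, as trace.detrend(type='demean') does above
disp = np.cumsum(vel) * dt
disp -= disp.mean()
```

Because a running sum accumulates any offset in the input, some detrending step after integration is generally needed to keep the result centered.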
865
def deconv_acc_inst(self, trace):
866
	'''
867
	.. role:: pycode(code)
868
		:language: python
869
	
870
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
871
	for acceleration channels.
872
873
	:param self self: The self object of the sub-consumer class calling this function. Must contain :pycode:`self.stream` as a :py:class:`obspy.core.stream.Stream` object.
874
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
875
	'''
876
	if self.deconv not in 'CHAN':
877
		trace.remove_response(inventory=inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
878
								output=('ACC' if self.deconv == 'GRAV' else self.deconv),	# gravity is derived from acceleration
								water_level=4.5, taper=False)
879
	else:
880
		trace.remove_response(inventory=inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
881
								output='ACC', water_level=4.5, taper=False)
882
	if 'VEL' in self.deconv:
883
		trace.data = np.cumsum(trace.data)
884
		trace.detrend(type='demean')
885
	elif 'DISP' in self.deconv:
886
		trace.data = np.cumsum(np.cumsum(trace.data))
887
		trace.detrend(type='linear')
888
	elif 'GRAV' in self.deconv:
889
		trace.data = trace.data / g
890
		trace.stats.units = 'Earth gravity'
891
	else:
892
		trace.stats.units = 'Acceleration'
893
	if ('ACC' not in self.deconv) and ('CHAN' not in self.deconv):
894
		trace.taper(max_percentage=0.1, side='left', max_length=1)
895
896
897
def deconv_rbm_inst(self, trace):
898
	'''
899
	.. role:: pycode(code)
900
		:language: python
901
	
902
	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
903
	for the Raspberry Boom (``HDF``) channel. The data are left in counts.
904
905
	:param self self: The self object of the sub-consumer class calling this function. Must contain :pycode:`self.stream` as a :py:class:`obspy.core.stream.Stream` object.
906
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
907
	'''
908
	trace.stats.units = ' counts'
909
910
911
def deconvolve(self):
912
	'''
913
	.. role:: pycode(code)
914
		:language: python
915
	
916
	A central helper function for sub-consumers (i.e. :py:class:`rsudp.c_plot.Plot` or :py:class:`rsudp.c_alert.Alert`)
917
	that need to deconvolve their raw data to metric units.
918
	Consumers with :py:class:`obspy.core.stream.Stream` objects in :pycode:`self.stream` can use this to deconvolve data
919
	if this library's :pycode:`rsudp.raspberryshake.inv` variable
920
	contains a valid :py:class:`obspy.core.inventory.inventory.Inventory` object.
921
922
	:param self self: The self object of the sub-consumer class calling this function. Must contain :pycode:`self.stream` as a :py:class:`obspy.core.stream.Stream` object.
923
	'''
924
	acc_channels = ['ENE', 'ENN', 'ENZ']
925
	vel_channels = ['EHE', 'EHN', 'EHZ', 'SHZ']
926
	rbm_channels = ['HDF']
927
928
	self.stream = self.raw.copy()
929
	for trace in self.stream:
930
		trace.stats.units = self.units
931
		output = 'ACC' if self.deconv == 'GRAV' else self.deconv	# gravity units are derived from an acceleration response
932
		if self.deconv:
933
			if trace.stats.channel in vel_channels:
934
				deconv_vel_inst(self, trace)	# geophone channels
935
936
			elif trace.stats.channel in acc_channels:
937
				deconv_acc_inst(self, trace)	# accelerometer channels
938
939
			elif trace.stats.channel in rbm_channels:
940
				deconv_rbm_inst(self, trace)	# this is the Boom channel
941
942
			else:
943
				trace.stats.units = ' counts'	# unrecognized channel code; leave in counts
944
945
		else:
946
			trace.stats.units = ' counts'		# this is not being deconvolved
947
948
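The dispatch in `deconvolve()` amounts to classifying each trace by its channel code. A minimal standalone sketch of that classification follows. The function name `instrument_kind` and the returned labels are hypothetical and exist only for illustration; the channel lists are copied from the function above.

```python
ACC_CHANNELS = ('ENE', 'ENN', 'ENZ')          # accelerometer channels
VEL_CHANNELS = ('EHE', 'EHN', 'EHZ', 'SHZ')   # geophone channels
RBM_CHANNELS = ('HDF',)                       # Boom channel

def instrument_kind(channel):
    """Return a label for the sensor type behind a channel code (hypothetical)."""
    if channel in VEL_CHANNELS:
        return 'geophone'
    if channel in ACC_CHANNELS:
        return 'accelerometer'
    if channel in RBM_CHANNELS:
        return 'boom'
    return 'unknown'   # deconvolve() leaves such channels in counts

kinds = [instrument_kind(c) for c in ('EHZ', 'ENN', 'HDF', 'XYZ')]
```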
949
class ConsumerThread(Thread):
950
	'''
951
	The default consumer thread setup.
952
	Import this consumer and easily create your own consumer modules!
953
	This class modifies the :py:class:`threading.Thread` object to
954
	include some settings that all rsudp consumers need,
955
	some of which the :py:class:`rsudp.p_producer.Producer`
956
	needs in order to function.
957
958
	Currently, the modifications that this class makes to
959
	:py:class:`threading.Thread` objects are:
960
961
	.. code-block:: python
962
963
		self.sender = 'ConsumerThread'  # module name used in logging
964
		self.alarm = False              # the Producer reads this to set the ``ALARM`` state
965
		self.alarm_reset = False        # the Producer reads this to set the ``RESET`` state
966
		self.alive = True               # this is used to keep the main ``for`` loop running
967
968
	For more information on creating your own consumer threads,
969
	see :ref:`add_your_own`.
970
971
	'''
972
	def __init__(self):
973
		super().__init__()
974
		self.sender = 'ConsumerThread'	# used in logging
975
		self.alarm = False				# the producer reads this
976
		self.alarm_reset = False		# the producer reads this
977
		self.alive = True				# this is used to keep the main for loop running
978
979
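As a rough illustration of how a consumer might build on the class above, here is a minimal sketch. To run without rsudp it redefines a stand-in `ConsumerThread` with the same four attributes. The `Counter` class, its `queue` and `count` attributes, and the message contents fed to it are all assumptions made for this example, not part of the library.

```python
from queue import Queue
from threading import Thread

class ConsumerThread(Thread):
    """Stand-in that mirrors the class above so the sketch is self-contained."""
    def __init__(self):
        super().__init__()
        self.sender = 'ConsumerThread'
        self.alarm = False
        self.alarm_reset = False
        self.alive = True

class Counter(ConsumerThread):
    """Hypothetical consumer that counts queue messages until it sees TERM."""
    def __init__(self, q):
        super().__init__()
        self.sender = 'Counter'   # name used in logging
        self.queue = q
        self.count = 0

    def run(self):
        while self.alive:
            msg = self.queue.get()          # blocks until a message arrives
            if msg.startswith(b'TERM'):     # assumed termination message
                self.alive = False
            else:
                self.count += 1

q = Queue()
consumer = Counter(q)
consumer.start()
for m in (b'ALARM 2020-01-01T00:00:00.599Z', b'RESET 2020-01-01T00:00:10.000Z', b'TERM'):
    q.put(m)
consumer.join(timeout=5)
```

Setting `self.alive = False` inside the loop lets the thread finish its current iteration and exit cleanly instead of being stopped from outside.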
980
if __name__ == '__main__':
981
	pass
982