Passed
Pull Request — master (#64)
by
unknown
06:49
created

build.rsudp.raspberryshake.update_stream()   A

Complexity

Conditions 3

Size

Total Lines 32
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 3.3332

Importance

Changes 0
Metric Value
eloc 6
dl 0
loc 32
ccs 4
cts 6
cp 0.6667
rs 10
c 0
b 0
f 0
cc 3
nop 3
crap 3.3332
1 1
import numpy as np
2 1
import os, platform
3 1
import socket as s
4 1
import signal
5 1
from obspy import UTCDateTime
6 1
from obspy.core.stream import Stream
7 1
from obspy import read_inventory, read
8 1
from obspy.geodetics.flinnengdahl import FlinnEngdahl
9 1
from obspy.core.trace import Trace
10 1
from rsudp import printM, printW, printE
11 1
from requests.exceptions import HTTPError
12 1
from threading import Thread
13 1
from . import __version__
14
15 1
# Module-level state shared by the functions below.
initd, sockopen = False, False	# library initialized? / socket bound?
qsize = 2048			# max queue size
port = 8888				# default listening port
to = 10					# socket test timeout (seconds)
firstaddr = ''			# the first address data is received from
inv = False				# station inventory (False until one has been fetched)
INVWARN = False			# set once the "could not attach response" warning was shown
region = False			# station region derived from the inventory (False if unknown)
producer = False		# flag for producer status
stn = 'Z0000'			# station name
net = 'AM'				# network (this will always be AM)
chns = []				# list of channels
numchns = 0				# number of channels in chns

tf = None				# transmission frequency in ms
tr = None				# transmission rate in packets per second
sps = None				# samples per second

# conversion units
# 		'name',	: ['pretty name', 'unit display']
UNITS = {'ACC'	: ['Acceleration', 'm/s$^2$'],
		 'GRAV'	: ['Earth gravity', ' g'],
		 'VEL'	: ['Velocity', 'm/s'],
		 'DISP'	: ['Displacement', 'm'],
		 'CHAN'	: ['channel-specific', ' Counts']}

g = 9.81	# earth gravity in m/s2
42
43
44
# get an IP to report to the user
45
# from https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib
46 1
def get_ip():
	'''
	.. |so_ip| raw:: html

		<a href="https://stackoverflow.com/questions/166506/finding-local-ip-addresses-using-pythons-stdlib" target="_blank">this stackoverflow answer</a>

	Return a reliable network IP to report to the user when there is no data received.
	This helps the user set their Raspberry Shake's datacast streams to point to the correct location
	if the library raises a "no data received" error.
	Solution adapted from |so_ip|.

	.. code-block:: python

		>>> get_ip()
		'192.168.1.23'

	If the address cannot be determined, ``'127.0.0.1'`` is returned instead.

	:rtype: str
	:return: The network IP of the machine that this program is running on
	'''
	testsock = s.socket(s.AF_INET, s.SOCK_DGRAM)
	try:
		# doesn't even have to be reachable
		testsock.connect(('10.255.255.255', 1))
		return testsock.getsockname()[0]
	except Exception:
		# best effort: fall back to loopback, but let KeyboardInterrupt/SystemExit through
		return '127.0.0.1'
	finally:
		testsock.close()
77
78 1
ip = get_ip()	# resolved once at import; used as the default `ip` of the no-data signal handler
79
80
# construct a socket
81 1
socket_type = s.SOCK_DGRAM
sock = s.socket(s.AF_INET, socket_type)
# Compare for equality: `not in 'Windows'` was a substring test, which would
# also treat an empty/unknown platform string as if it were Windows.
if platform.system() != 'Windows':
	sock.setsockopt(s.SOL_SOCKET, s.SO_REUSEADDR, 1)
85
86 1
def handler(signum, frame, ip=ip):
	'''
	The signal handler for the nodata alarm.
	Logs the address and port the Shake should be forwarding data to, then aborts by raising.

	:param int signum: signal number
	:param frame: the stack frame passed in by the :py:mod:`signal` machinery (unused)
	:param str ip: the IP of the box this program is running on (i.e. the device the Raspberry Shake should send data to)
	:raise IOError: on UNIX systems if no data is received
	'''
	# `port` and `to` are only read here, so no `global` declaration is needed
	printE('No data received in %s seconds; aborting.' % (to), sender='Init')
	printE('Check that the Shake is forwarding data to:', sender='Init', announce=False, spaces=True)
	printE('IP address: %s    Port: %s' % (ip, port), sender='Init', announce=False, spaces=True)
	printE('and that no firewall exists between the Shake and this computer.', sender='Init', announce=False, spaces=True)
	raise IOError('No data received')
101
102
103 1
def initRSlib(dport=port, rsstn='Z0000', timeout=10):
	'''
	.. role:: pycode(code)
		:language: python

	Initializes this library (:py:func:`rsudp.raspberryshake`).
	Set values for data port, station name, and port timeout prior to opening the socket.
	Calls both :py:func:`rsudp.raspberryshake.openSOCK` and :py:func:`rsudp.raspberryshake.set_params`.

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')

	The library is now initialized:

	.. code-block:: python

		>>> rs.initd
		True

	An argument that cannot be converted is logged and the corresponding
	module-level value is left as it was. This function does not return a value.

	:param int dport: The local port the Raspberry Shake is sending UDP data packets to. Defaults to :pycode:`8888`.
	:param str rsstn: The name of the station (something like :pycode:`'RCB43'` or :pycode:`'S0CDE'`)
	:param int timeout: The number of seconds for :py:func:`rsudp.raspberryshake.set_params` to wait for data before an error is raised (zero for unlimited wait)

	'''
	global port, stn, to, initd
	sender = 'RS lib'
	printM('Initializing rsudp v %s.' % (__version__), sender)

	try:						# set port value first
		intport = int(dport)
		was_integer = (dport == intport)
		port = intport
		if not was_integer:
			printW('Supplied port value was converted to integer. Non-integer port numbers are invalid.')
	except Exception as e:
		printE('Details - %s' % e)

	try:						# set station name
		namelen = len(rsstn)	# evaluated first so a non-sized value leaves stn untouched
		stn = str(rsstn).upper()
		if namelen != 5:
			printW('Station name does not follow Raspberry Shake naming convention.')
	except ValueError as e:
		printE('Invalid station name supplied. Details: %s' % e)
		printE('reverting to station name Z0000', announce=False, spaces=True)
		stn = 'Z0000'			# actually do what the message above announces
	except Exception as e:
		printE('Details - %s' % e)

	try:						# set timeout value
		to = int(timeout)
	except ValueError as e:
		printW('You likely supplied a non-integer as the timeout value. Your value was: %s'
				% timeout)
		printW('Continuing with default timeout of %s sec'
				% (to), announce=False, spaces=True)
		printW('details: %s' % e, announce=False, spaces=True)
	except Exception as e:
		printE('Details - %s' % e)

	initd = True				# openSOCK() refuses to run unless this is set
	openSOCK()					# open a socket
	printM('Waiting for UDP data on port %s...' % (port), sender)
	set_params()				# get data and set parameters
172
173 1
def openSOCK(host=''):
	'''
	.. role:: pycode(code)
		:language: python

	Initialize a socket at the port specified by :pycode:`rsudp.raspberryshake.port`.
	Called by :py:func:`rsudp.raspberryshake.initRSlib`, must be done before :py:func:`rsudp.raspberryshake.set_params`.

	On success the module-level flag :pycode:`rsudp.raspberryshake.sockopen` is set to :pycode:`True`.

	:param str host: local address to bind the listening socket to (defaults to :pycode:`''`, which is passed to ``bind`` unchanged and logged as :pycode:`'localhost'`)
	:raise IOError: if the library is not initialized (:py:func:`rsudp.raspberryshake.initRSlib`) prior to running this function
	:raise OSError: if the program cannot bind to the specified port number

	'''
	global sockopen
	sockopen = False

	if not initd:
		raise IOError("Before opening a socket, you must initialize this raspberryshake library by calling initRSlib(dport=XXXXX, rsstn='R0E05') first.")

	# report the host actually requested; keep 'localhost' for the default ''
	HP = '%s:%s' % (host or 'localhost', port)
	printM("Opening socket on %s (HOST:PORT)"
			% HP, 'openSOCK')
	try:
		sock.bind((host, port))
	except Exception as e:
		printE('Could not bind to port %s. Is another program using it?' % port)
		printE('Detail: %s' % e, announce=False)
		raise OSError(e) from e		# keep the original traceback attached
	sockopen = True
203
204 1
def set_params():
	'''
	.. role:: pycode(code)
		:language: python

	Read a data packet off the port and use it to set the module-level stream parameters.
	Called by :py:func:`rsudp.raspberryshake.initRSlib`,
	must be done after :py:func:`rsudp.raspberryshake.openSOCK`
	but before :py:func:`rsudp.raspberryshake.getDATA`.
	Will wait :pycode:`rsudp.raspberryshake.to` seconds for data before raising a no data exception
	(only available with UNIX socket types).

	After the first packet arrives this calls, in order,
	:py:func:`rsudp.raspberryshake.getCHNS`, :py:func:`rsudp.raspberryshake.getTR`,
	:py:func:`rsudp.raspberryshake.getSR`, :py:func:`rsudp.raspberryshake.getTTLCHN`
	and :py:func:`rsudp.raspberryshake.get_inventory`.

	'''
	global to, firstaddr
	use_alarm = os.name != 'nt'		# signal alarm not available on windows
	if use_alarm:
		signal.signal(signal.SIGALRM, handler)
		signal.alarm(to)			# alarm time set with timeout value
	try:
		# same buffer size as getDATA(), so this packet is read the same way as later ones
		data, (firstaddr, connport) = sock.recvfrom(4096)
	finally:
		if use_alarm:
			signal.alarm(0)			# turn alarm completely off, whether or not data arrived
	to = 0							# otherwise it erroneously triggers after keyboardinterrupt
	getTR(getCHNS()[0])
	getSR(tf, data)
	getTTLCHN()
	printM('Available channels: %s' % chns, 'Init')
	get_inventory()
230
231 1
def getDATA():
	'''
	Read a data packet off the port.

	In this example, we get a Shake 1Dv7 data packet:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> d = rs.getDATA()
		>>> d
		b"{'EHZ', 1582315130.292, 14168, 14927, 16112, 17537, 18052, 17477,
		15418, 13716, 15604, 17825, 19637, 20985, 17325, 10439, 11510, 17678,
		20027, 20207, 18481, 15916, 13836, 13073, 14462, 17628, 19388}"


	:rtype: bytes
	:return: Returns a data packet as an encoded bytes object.

	:raise IOError: if no socket is open (:py:func:`rsudp.raspberryshake.openSOCK`) prior to running this function
	:raise IOError: if the library is not initialized (:py:func:`rsudp.raspberryshake.initRSlib`) prior to running this function

	'''
	if sockopen:
		return sock.recv(4096)
	# no socket: tell the caller which setup step is missing
	if initd:
		raise IOError("No socket is open. Please open a socket using this library's openSOCK() function.")
	raise IOError('No socket is open. Please initialize the library using initRSlib() then open a socket using openSOCK().')
263
	
264 1
def getCHN(DP):
	'''
	Extract the channel information from the data packet.
	Requires :py:func:`rsudp.raspberryshake.getDATA` packet as argument.

	In this example, we get the channel code from a Shake 1Dv7 data packet:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> d = rs.getDATA()
		>>> rs.getCHN(d)
		'EHZ'

	:param DP: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse channel information from
	:type DP: bytes
	:rtype: str
	:return: Returns the instrument channel as a string.
	'''
	# first comma-separated field looks like {'EHZ' -- drop the brace, then the quotes
	first_field = DP.decode('utf-8').partition(',')[0]
	return first_field[1:].strip("'")
285
	
286 1
def getTIME(DP):
	'''
	Extract the timestamp from the data packet.
	Timestamp is seconds since 1970-01-01 00:00:00Z,
	which can be passed directly to an :py:class:`obspy.core.utcdatetime.UTCDateTime` object:

	In this example, we get the timestamp of a Shake 1Dv7 data packet and convert it to a UTCDateTime:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> from obspy import UTCDateTime
		>>> d = rs.getDATA()
		>>> t = rs.getTIME(d)
		>>> t
		1582315130.292
		>>> dt = UTCDateTime(t, precision=3)
		>>> dt
		UTCDateTime(2020, 2, 21, 19, 58, 50, 292000)

	:param DP: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse time information from
	:type DP: bytes
	:rtype: float
	:return: Timestamp in decimal seconds since 1970-01-01 00:00:00Z
	'''
	# the timestamp is the second comma-separated field; no need to split the samples
	return float(DP.split(b",", 2)[1])
313
314 1
def getSTREAM(DP):
	'''
	Get the samples in a data packet as a list object.
	Requires :py:func:`rsudp.raspberryshake.getDATA` packet as argument.

	In this example, we get a list of samples from a Shake 1Dv7 data packet:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> d = rs.getDATA()
		>>> s = rs.getSTREAM(d)
		>>> s
		[14168, 14927, 16112, 17537, 18052, 17477, 15418, 13716, 15604,
		 17825, 19637, 20985, 17325, 10439, 11510, 17678, 20027, 20207,
		 18481, 15916, 13836, 13073, 14462, 17628, 19388]

	:param DP: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse stream information from
	:type DP: bytes
	:rtype: list
	:return: List of data samples in the packet
	'''
	# fields 0 and 1 are channel and timestamp; everything after that is a sample
	fields = DP.decode('utf-8').replace('}', '').split(',')
	return [int(sample) for sample in fields[2:]]
338
339 1
def getTR(chn):				# DP transmission rate in msecs
	'''
	Get the transmission rate in milliseconds between consecutive packets from the same channel.
	Must wait to receive a second packet from the same channel.
	Requires a :py:func:`rsudp.raspberryshake.getCHN` or a channel name string as argument.

	In this example, we calculate the transmission frequency of a Shake 1Dv7:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> d = rs.getDATA()
		>>> tr = rs.getTR(rs.getCHN(d))
		>>> tr
		250

	Also stores the result in the module-level ``tf`` (milliseconds between packets)
	and ``tr`` (packets per second).

	:param chn: The seismic instrument channel (:py:func:`rsudp.raspberryshake.getCHN`) to calculate transmission rate information from
	:type chn: str
	:rtype: int
	:return: Transmission rate in milliseconds between consecutive packets from a specific channel
	:raise ZeroDivisionError: if both packets carry the same timestamp
	'''
	global tf, tr
	timeP1 = None				# None (not 0.0) marks "first packet not seen yet"
	while True:
		DP = getDATA()
		if getCHN(DP) != chn:
			continue			# packet from another channel
		if timeP1 is None:
			timeP1 = getTIME(DP)
		else:
			timeP2 = getTIME(DP)
			break
	TR = timeP2*1000 - timeP1*1000
	# The timestamps are floats, so TR can land a hair below or above the true
	# interval (e.g. 249.9998); round instead of truncating.
	tf = int(round(TR))
	tr = int(round(1000 / TR))
	return tf
377
378 1
def getSR(TR, DP):
	'''
	Get the sample rate in samples per second.
	Requires an integer transmission frequency and a data packet as arguments.

	In this example, we calculate the number of samples per second from a Shake 1Dv7:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> d = rs.getDATA()
		>>> tr = rs.getTR(rs.getCHN(d))
		>>> tr
		250
		>>> sps = rs.getSR(tr, d)
		>>> sps
		100

	The result is also stored in the module-level ``sps``.

	:param TR: The transmission frequency (:py:func:`rsudp.raspberryshake.getTR`) in milliseconds between packets
	:type TR: int
	:param DP: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) calculate sample rate information from
	:type DP: bytes
	:rtype: int
	:return: The sample rate in samples per second from a specific channel
	:raise ZeroDivisionError: if ``TR`` is zero
	'''
	global sps
	# one comma follows the channel and one follows the timestamp, so the
	# number of samples is the comma count minus one
	samples = DP.count(b",") - 1
	sps = int(samples * 1000 / TR)
	return sps
408
	
409 1
def getCHNS():
	'''
	Get a list of channels sent to the port.

	In this example, we list channels from a Boom:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R940D')
		>>> rs.getCHNS()
		['EHZ', 'HDF']

	Packets are read until the first channel seen has come around again
	several times. The result replaces the module-level ``chns`` list.

	:rtype: list
	:return: The list of channels being sent to the port (from the single IP address sending data)
	'''
	global chns
	# Work on a private copy so the shared list is only replaced once, at the end.
	# It is seeded with the current contents because channels found by earlier
	# calls have always been carried over into the result.
	found = list(chns)
	firstCHN = ''
	repeats = 0
	while True:
		ch = getCHN(getDATA())
		if firstCHN == '':
			firstCHN = ch
			found.append(ch)
		elif ch != firstCHN:
			found.append(ch)
		elif repeats > 1:
			break				# first channel has repeated enough times
		else:
			repeats += 1

	# known channels come out in this fixed order; any other name is added after them
	present = {'EHZ': False, 'EHN': False, 'EHE': False,
			   'ENZ': False, 'ENN': False, 'ENE': False, 'HDF': False}
	for ch in found:
		present[ch] = True
	chns = [ch for ch, seen in present.items() if seen]
	return chns
453
454 1
def getTTLCHN():
	'''
	Calculate total number of channels received,
	by counting the number of channels returned by :py:func:`rsudp.raspberryshake.getCHNS`.

	In this example, we get the number of channels from a Shake & Boom:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R940D')
		>>> rs.getTTLCHN()
		2

	The count is also stored in the module-level ``numchns``.

	:rtype: int
	:return: The number of channels being sent to the port (from the single IP address sending data)
	'''
	global numchns
	numchns = len(getCHNS())	# note: getCHNS() reads fresh packets each time it is called
	return numchns
474
475
476 1
def get_inventory(sender='get_inventory'):
	'''
	.. role:: pycode(code)
		:language: python

	Downloads the station inventory from the Raspberry Shake FDSN and stores
	it as an :py:class:`obspy.core.inventory.inventory.Inventory` object which is available globally.

	In this example, we get the R940D station inventory from the Raspberry Shake FDSN:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R940D')
		>>> inv = rs.get_inventory()
		>>> print(inv)
		Inventory created at 2020-02-21T20:37:34.246777Z
			Sending institution: SeisComP3 (gempa testbed)
			Contains:
				Networks (1):
					AM
				Stations (1):
					AM.R940D (Raspberry Shake Citizen Science Station)
				Channels (2):
					AM.R940D.00.EHZ, AM.R940D.00.HDF

	If the station name is still the placeholder ``Z0000``, or the fetch fails,
	a message is logged and :pycode:`False` is stored and returned instead.

	:param sender: `(optional)` The name of the function calling the :py:func:`rsudp.printM` logging function
	:type sender: str or None
	:rtype: obspy.core.inventory.inventory.Inventory or bool
	:return: The inventory of the Raspberry Shake station in the :pycode:`rsudp.raspberryshake.stn` variable.
	'''
	global inv, region
	# `sender` is used as passed in; it is no longer overwritten with a fixed name
	if 'Z0000' in stn:
		printW('No station name given, continuing without inventory.',
				sender)
		inv = False
		return inv

	try:
		printM('Fetching inventory for station %s.%s from Raspberry Shake FDSN.'
				% (net, stn), sender)
		url = 'https://fdsnws.raspberryshakedata.com/fdsnws/station/1/query?network=%s&station=%s&level=resp&nodata=404&format=xml' % (
			   net, stn)
		inv = read_inventory(url)
		# region is looked up from the coordinates of the last station in the first network
		region = FlinnEngdahl().get_region(inv[0][-1].longitude, inv[0][-1].latitude)
		printM('Inventory fetch successful. Station region is %s' % (region), sender)
	except (IndexError, HTTPError):
		printW('No inventory found for %s. Are you forwarding your Shake data?' % stn, sender)
		printW('Deconvolution will only be available if data forwarding is on.', sender, spaces=True)
		printW('Access the config page of the web front end for details.', sender, spaces=True)
		printW('More info at https://manual.raspberryshake.org/quickstart.html', sender, spaces=True)
		inv = False
		region = False
	except Exception as e:
		printE('Inventory fetch failed!', sender)
		printE('Error detail: %s' % e, sender, spaces=True)
		inv = False
		region = False
	return inv
536
537
538 1
def make_trace(d):
	'''
	Makes a trace and assigns it some values using a data packet.

	In this example, we make a trace object with some RS 1Dv7 data:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> d = rs.getDATA()
		>>> t = rs.make_trace(d)
		>>> print(t)
		AM.R3BCF.00.EHZ | 2020-02-21T19:58:50.292000Z - 2020-02-21T19:58:50.532000Z | 100.0 Hz, 25 samples

	If no channel name can be read from the packet, nothing is built and
	``None`` is returned.

	:param d: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse Trace information from
	:type d: bytes
	:rtype: obspy.core.trace.Trace
	:return: A fully formed Trace object to build a Stream with
	'''
	global INVWARN
	ch = getCHN(d)						# channel
	if not ch:
		return None

	t = getTIME(d)						# unix epoch time since 1970-01-01 00:00:00Z; "timestamp" in obspy
	st = getSTREAM(d)					# samples in data packet in list [] format
	# named `trace` so the module-level `tr` (transmission rate) is not shadowed
	trace = Trace(data=np.ma.MaskedArray(st, dtype=np.int32))
	trace.stats.network = net			# assign values
	trace.stats.location = '00'
	trace.stats.station = stn
	trace.stats.channel = ch
	trace.stats.sampling_rate = sps
	trace.stats.starttime = UTCDateTime(t, precision=3)
	if inv:
		try:
			trace.stats.response = inv.get_response(trace.id, trace.stats.starttime)
		except Exception as e:
			if not INVWARN:				# only complain once, not once per packet
				INVWARN = True
				printE(e, sender='make_trace')
				printE('Could not attach inventory response.', sender='make_trace')
				printE('Are you sure you set the station name correctly?', spaces=True, sender='make_trace')
				printE('This could indicate a mismatch in the number of data channels', spaces=True, sender='make_trace')
				printE('between the inventory and the stream. For example,', spaces=True, sender='make_trace')
				printE('if you are receiving RS4D data, please make sure', spaces=True, sender='make_trace')
				printE('the inventory you download has 4 channels.', spaces=True, sender='make_trace')
	return trace
586
587
588
# Then make repeated calls to this, to continue adding trace data to the stream
589 1
def update_stream(stream, d, **kwargs):
	'''
	Returns an updated Stream object with new data, merged down to one trace per available channel.
	Most sub-consumers call this each time they receive data packets in order to keep their obspy stream current.

	In this example, we make a stream object with some RS 1Dv7 data:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> from obspy.core.stream import Stream
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> s = Stream()
		>>> d = rs.getDATA()
		>>> t = rs.make_trace(d)
		>>> s = rs.update_stream(s, d)
		>>> print(s)
		1 Trace(s) in Stream:
		AM.R3BCF.00.EHZ | 2020-02-21T19:58:50.292000Z - 2020-02-21T19:58:50.532000Z | 100.0 Hz, 25 samples

	If :py:func:`rsudp.raspberryshake.make_trace` cannot build a trace from the
	packet (it returns ``None``), the stream is returned unchanged.

	:param obspy.core.stream.Stream stream: The stream to update
	:param d: The Raspberry Shake UDP data packet (:py:func:`rsudp.raspberryshake.getDATA`) to parse Stream information from
	:type d: bytes
	:param kwargs: passed through unchanged to the stream's ``merge`` method
	:rtype: obspy.core.stream.Stream
	:return: A seismic data stream
	'''
	# Build and append exactly once, outside the retry loop. Previously a None
	# trace made append() raise TypeError on every pass, so the loop never ended,
	# and every retry appended the same trace again.
	trace = make_trace(d)
	if trace is None:
		return stream
	stream.append(trace)
	while True:
		try:
			return stream.merge(**kwargs)
		except TypeError:
			# NOTE(review): retry is kept from the original and is unbounded;
			# a TypeError that never clears would still spin here -- confirm intent
			pass
621
622
623 1
def copy(orig):
	"""
	True-copy a stream by creating a new stream and copying old attributes to it.
	This is necessary because the old stream accumulates *something* that causes
	CPU usage to increase over time as more data is added. This is a bug in obspy
	that I intend to find--or at the very least report--but until then this hack
	works fine and is plenty fast enough.

	In this example, we make a stream object with some RS 1Dv7 data and then copy it to a new stream:

	.. code-block:: python

		>>> import rsudp.raspberryshake as rs
		>>> from obspy.core.stream import Stream
		>>> rs.initRSlib(dport=8888, rsstn='R3BCF')
		>>> s = Stream()
		>>> d = rs.getDATA()
		>>> t = rs.make_trace(d)
		>>> s = rs.update_stream(s, d)
		>>> s
		1 Trace(s) in Stream:
		AM.R3BCF.00.EHZ | 2020-02-21T19:58:50.292000Z - 2020-02-21T19:58:50.532000Z | 100.0 Hz, 25 samples
		>>> s = rs.copy(s)
		>>> s
		1 Trace(s) in Stream:
		AM.R3BCF.00.EHZ | 2020-02-21T19:58:50.292000Z - 2020-02-21T19:58:50.532000Z | 100.0 Hz, 25 samples

	Only the data and the six identifying stats listed in the code are carried
	over; anything else attached to the original traces is not copied.

	:param obspy.core.stream.Stream orig: The data stream to copy information from
	:rtype: obspy.core.stream.Stream
	:return: A low-memory copy of the passed data stream

	"""
	# stats copied onto each rebuilt trace, in this order
	carried = ('network', 'location', 'station', 'channel',
			   'sampling_rate', 'starttime')
	stream = Stream()
	for source in orig:
		trace = Trace(data=source.data)
		for key in carried:
			setattr(trace.stats, key, getattr(source.stats, key))
		stream.append(trace).merge(fill_value=None)
	return stream.copy()
667
668
669 1
class ConsumerThread(Thread):
	'''
	The default consumer thread setup.
	Import this consumer and easily create your own consumer modules!
	This class modifies the :py:class:`threading.Thread` object to
	include some settings that all rsudp consumers need,
	some of which the :py:class:`rsudp.p_producer.Producer`
	needs in order to function.

	Currently, the modifications that this module makes to
	:py:class:`threading.Thread` objects are:

	.. code-block:: python

		self.sender = 'ConsumerThread'  # module name used in logging
		self.alarm = False              # the Producer reads this to set the ``ALARM`` state
		self.alarm_reset = False        # the Producer reads this to set the ``RESET`` state
		self.alive = True               # this is used to keep the main ``for`` loop running

	For more information on creating your own consumer threads,
	see :ref:`add_your_own`.

	'''
	def __init__(self):
		'''
		Set up the thread and give the shared consumer attributes their defaults.
		'''
		super().__init__()
		self.sender = 'ConsumerThread'	# used in logging
		self.alarm = False				# the producer reads this
		self.alarm_reset = False		# the producer reads this
		self.alive = True				# this is used to keep the main for loop running
698
699
700
if __name__ == '__main__':
	pass	# library module: nothing to do when run directly
702