Passed
Push — master ( fe5590...f40c33 )
by Ian
06:56
created

build.rsudp.client.handler()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
import sys, os
2
import signal
3
import getopt
4
import time
5
import json
6
import re
7
import logging
8
import traceback
9
from queue import Queue
10
from rsudp import printM, printW, printE, default_loc, init_dirs, output_dir, add_debug_handler, start_logging
11
from rsudp import COLOR
12
import rsudp.helpers as H
13
import rsudp.test as T
14
import rsudp.raspberryshake as rs
15
from rsudp.packetize import packetize
16
from rsudp.c_consumer import Consumer
17
from rsudp.p_producer import Producer
18
from rsudp.c_printraw import PrintRaw
19
from rsudp.c_write import Write
20
from rsudp.c_plot import Plot, MPL
21
from rsudp.c_forward import Forward
22
from rsudp.c_alert import Alert
23
from rsudp.c_alertsound import AlertSound
24
from rsudp.c_custom import Custom
25
from rsudp.c_tweet import Tweeter
26
from rsudp.c_telegram import Telegrammer
27
from rsudp.c_testing import Testing
28
from rsudp.t_testdata import TestData
29
import pkg_resources as pr
30
import fnmatch
31
try:
32
	from pydub import AudioSegment
33
	PYDUB_EXISTS = True
34
except ImportError:
35
	PYDUB_EXISTS = False
36
37
38
DESTINATIONS, THREADS = [], []
39
PROD = False
40
PLOTTER = False
41
SOUND = False
42
TESTING = False
43
TESTQUEUE = False
44
TESTFILE = pr.resource_filename('rsudp', os.path.join('test', 'testdata'))
45
SENDER = 'Main'
46
47
def handler(sig, frame):
48
	'''
49
	Function passed to :py:func:`signal.signal` to handle close events
50
	'''
51
	rs.producer = False
52
53
def _xit(code=0):
54
	'''
55
	End the program. Called after all running threads have stopped.
56
57
	:param int code: The process code to exit with. 0=OK, 1=ERROR.
58
	'''
59
	if TESTING:
60
		TESTQUEUE.put(b'ENDTEST')
61
	for thread in THREADS:
62
		del thread
63
	
64
	printM('Shutdown successful.', sender=SENDER)
65
	print()
66
	sys.exit(code)
67
68
def test_mode(mode=None):
69
	'''
70
	Sets the TESTING global variable to ``True`` to indicate that
71
	testing-specific actions should be taken in routines.
72
73
	:param bool mode: if ``True`` or ``False``, sets testing mode state. if anything else, returns state only.
74
	:return: testing mode state
75
	:rtype: bool
76
	'''
77
	global TESTING
78
	if (mode == True) or (mode == False):
79
		TESTING = mode
80
	return TESTING
81
82
83
def mk_q():
84
	'''
85
	Makes a queue and appends it to the :py:data:`destinations`
86
	variable to be passed to the master consumer thread
87
	:py:class:`rsudp.c_consumer.Consumer`.
88
89
	:rtype: queue.Queue
90
	:return: Returns the queue to pass to the sub-consumer.
91
	'''
92
	q = Queue(rs.qsize)
93
	DESTINATIONS.append(q)
94
	return q
95
96
def mk_p(proc):
97
	'''
98
	Appends a process to the list of threads to start and stop.
99
100
	:param threading.Thread proc: The process thread to append to the list of threads.
101
	'''
102
	THREADS.append(proc)
103
104
105
def start():
106
	'''
107
	Start Consumer, Threads, and Producer.
108
	'''
109
	global PROD, PLOTTER, THREADS, DESTINATIONS
110
	# master queue and consumer
111
	queue = Queue(rs.qsize)
112
	cons = Consumer(queue, DESTINATIONS)
113
	cons.start()
114
115
	for thread in THREADS:
116
		thread.start()
117
118
	PROD = Producer(queue, THREADS)
119
	PROD.start()
120
121
	if PLOTTER and MPL:
122
		# give the plotter the master queue
123
		# so that it can issue a TERM signal if closed
124
		PLOTTER.master_queue = queue
125
		# start plotting (in this thread, not a separate one)
126
		PLOTTER.run()
127
	else:
128
		while not PROD.stop:
129
			time.sleep(0.1) # wait until processes end
130
131
132
	time.sleep(0.5) # give threads time to exit
133
	PROD.stop = True
134
135
136
def run(settings, debug):
137
	'''
138
	Main setup function. Takes configuration values and passes them to
139
	the appropriate threads and functions.
140
141
	:param dict settings: settings dictionary (see :ref:`defaults` for guidance)
142
	:param bool debug: whether or not to show debug output (should be turned off if starting as daemon)
143
	'''
144
	global PLOTTER, SOUND
145
	# handler for the exit signal
146
	signal.signal(signal.SIGINT, handler)
147
148
	if TESTING:
149
		global TESTQUEUE
150
		# initialize the test data to read information from file and put it on the port
151
		TESTQUEUE = Queue()		# separate from client library because this is not downstream of the producer
152
		tdata = TestData(q=TESTQUEUE, data_file=TESTFILE, port=settings['settings']['port'])
153
		tdata.start()
154
155
	# initialize the central library
156
	rs.initRSlib(dport=settings['settings']['port'],
157
				 rsstn=settings['settings']['station'])
158
159
	H.conn_stats(TESTING)
160
	if TESTING:
161
		T.TEST['n_port'][1] = True	# port has been opened
162
		if rs.sps == 0:
163
			printE('There is already a Raspberry Shake sending data to this port.', sender=SENDER)
164
			printE('For testing, please change the port in your settings file to an unused one.',
165
					sender=SENDER, spaces=True)
166
			_xit(1)
167
168
169
	output_dir = settings['settings']['output_dir']
170
171
172
	if settings['printdata']['enabled']:
173
		# set up queue and process
174
		q = mk_q()
175
		prnt = PrintRaw(q)
176
		mk_p(prnt)
177
178
	if settings['write']['enabled']:
179
		# set up queue and process
180
		cha = settings['write']['channels']
181
		q = mk_q()
182
		writer = Write(q=q, cha=cha)
183
		mk_p(writer)
184
185
	if settings['plot']['enabled'] and MPL:
186
		while True:
187
			if rs.numchns == 0:
188
				time.sleep(0.01)
189
				continue
190
			else:
191
				break
192
		cha = settings['plot']['channels']
193
		sec = settings['plot']['duration']
194
		spec = settings['plot']['spectrogram']
195
		full = settings['plot']['fullscreen']
196
		kiosk = settings['plot']['kiosk']
197
		screencap = settings['plot']['eq_screenshots']
198
		alert = settings['alert']['enabled']
199
		if settings['plot']['deconvolve']:
200
			if settings['plot']['units'].upper() in rs.UNITS:
201
				deconv = settings['plot']['units'].upper()
202
			else:
203
				deconv = 'CHAN'
204
		else:
205
			deconv = False
206
		pq = mk_q()
207
		PLOTTER = Plot(cha=cha, seconds=sec, spectrogram=spec,
208
						fullscreen=full, kiosk=kiosk, deconv=deconv, q=pq,
209
						screencap=screencap, alert=alert)
210
		# no mk_p() here because the plotter must be controlled by the main thread (this one)
211
212
	if settings['forward']['enabled']:
213
		# put settings in namespace
214
		addr = settings['forward']['address']
215
		port = settings['forward']['port']
216
		cha = settings['forward']['channels']
217
		# set up queue and process
218
		q = mk_q()
219
		forward = Forward(addr=addr, port=port, cha=cha, q=q)
220
		mk_p(forward)
221
222
	if settings['alert']['enabled']:
223
		# put settings in namespace
224
		sta = settings['alert']['sta']
225
		lta = settings['alert']['lta']
226
		thresh = settings['alert']['threshold']
227
		reset = settings['alert']['reset']
228
		bp = [settings['alert']['highpass'], settings['alert']['lowpass']]
229
		cha = settings['alert']['channel']
230
		if settings['alert']['deconvolve']:
231
			if settings['alert']['units'].upper() in rs.UNITS:
232
				deconv = settings['alert']['units'].upper()
233
			else:
234
				deconv = 'CHAN'
235
		else:
236
			deconv = False
237
238
		# set up queue and process
239
		q = mk_q()
240
		alrt = Alert(sta=sta, lta=lta, thresh=thresh, reset=reset, bp=bp,
241
					 cha=cha, debug=debug, q=q,
242
					 deconv=deconv)
243
		mk_p(alrt)
244
245
	if settings['alertsound']['enabled']:
246
		sender = 'AlertSound'
247
		SOUND = False
248
		soundloc = False
249
		if PYDUB_EXISTS:
250
			soundloc = os.path.expanduser(os.path.expanduser(settings['alertsound']['mp3file']))
251
			if soundloc in ['doorbell', 'alarm', 'beeps', 'sonar']:
252
				soundloc = pr.resource_filename('rsudp', os.path.join('rs_sounds', '%s.mp3' % soundloc))
253
			if os.path.exists(soundloc):
254
				try:
255
					SOUND = AudioSegment.from_file(soundloc, format="mp3")
256
					printM('Loaded %.2f sec alert sound from %s' % (len(SOUND)/1000., soundloc), sender='AlertSound')
257
				except FileNotFoundError as e:
258
					printW("You have chosen to play a sound, but don't have ffmpeg or libav installed.", sender='AlertSound')
259
					printW('Sound playback requires one of these dependencies.', sender='AlertSound', spaces=True)
260
					printW("To install either dependency, follow the instructions at:", sender='AlertSound', spaces=True)
261
					printW('https://github.com/jiaaro/pydub#playback', sender='AlertSound', spaces=True)
262
					printW('The program will now continue without sound playback.', sender='AlertSound', spaces=True)
263
					SOUND = False
264
			else:
265
				printW("The file %s could not be found." % (soundloc), sender='AlertSound')
266
				printW('The program will now continue without sound playback.', sender='AlertSound', spaces=True)
267
		else:
268
			printW("You don't have pydub installed, so no sound will play.", sender='AlertSound')
269
			printW('To install pydub, follow the instructions at:', sender='AlertSound', spaces=True)
270
			printW('https://github.com/jiaaro/pydub#installation', sender='AlertSound', spaces=True)
271
			printW('Sound playback also requires you to install either ffmpeg or libav.', sender='AlertSound', spaces=True)
272
273
		q = mk_q()
274
		alsnd = AlertSound(q=q, sound=SOUND, soundloc=soundloc)
275
		mk_p(alsnd)
276
277
	runcustom = False
278
	try:
279
		f = False
280
		win_ovr = False
281
		if settings['custom']['enabled']:
282
			# put settings in namespace
283
			f = settings['custom']['codefile']
284
			win_ovr = settings['custom']['win_override']
285
			if f == 'n/a':
286
				f = False
287
			runcustom = True
288
	except KeyError as e:
289
		if settings['alert']['exec'] != 'eqAlert':
290
			printW('the custom code function has moved to its own module (rsudp.c_custom)', sender='Custom')
291
			f = settings['alert']['exec']
292
			win_ovr = settings['alert']['win_override']
293
			runcustom = True
294
		else:
295
			raise KeyError(e)
296
	if runcustom:
297
		# set up queue and process
298
		q = mk_q()
299
		cstm = Custom(q=q, codefile=f, win_ovr=win_ovr)
300
		mk_p(cstm)
301
302
303
	if settings['tweets']['enabled']:
304
		consumer_key = settings['tweets']['api_key']
305
		consumer_secret = settings['tweets']['api_secret']
306
		access_token = settings['tweets']['access_token']
307
		access_token_secret = settings['tweets']['access_secret']
308
		tweet_images = settings['tweets']['tweet_images']
309
310
		q = mk_q()
311
		tweet = Tweeter(q=q, consumer_key=consumer_key, consumer_secret=consumer_secret,
312
						access_token=access_token, access_token_secret=access_token_secret,
313
						tweet_images=tweet_images)
314
		mk_p(tweet)
315
316
	if settings['telegram']['enabled']:
317
		token = settings['telegram']['token']
318
		chat_id = settings['telegram']['chat_id']
319
		send_images = settings['telegram']['send_images']
320
321
		q = mk_q()
322
		telegram = Telegrammer(q=q, token=token, chat_id=chat_id,
323
							   send_images=send_images)
324
		mk_p(telegram)
325
326
	# start additional modules here!
327
	################################
328
329
330
	################################
331
332
	if TESTING:
333
		# initialize test consumer
334
		q = mk_q()
335
		test = Testing(q=q)
336
		mk_p(test)
337
338
339
	# start the producer, consumer, and activated modules
340
	start()
341
342
	PLOTTER = False
343
	if not TESTING:
344
		_xit()
345
	else:
346
		printW('Client has exited, ending tests...', sender=SENDER, announce=False)
347
		if SOUND:
348
			T.TEST['d_pydub'][1] = True
349
350
351
def main():
352
	'''
353
	Loads settings to start the main client.
354
	Supply -h from the command line to see help text.
355
	'''
356
	settings_loc = os.path.join(default_loc, 'rsudp_settings.json').replace('\\', '/')
357
358
	hlp_txt='''
359
###########################################
360
##     R A S P B E R R Y  S H A K E      ##
361
##              UDP Client               ##
362
##            by Ian Nesbitt             ##
363
##            GNU GPLv3 2020             ##
364
##                                       ##
365
## Do various tasks with Shake data      ##
366
## like plot, trigger alerts, and write  ##
367
## to miniSEED.                          ##
368
##                                       ##
369
##  Requires:                            ##
370
##  - numpy, obspy, matplotlib 3, pydub  ##
371
##                                       ##
372
###########################################
373
374
Usage: rs-client [ OPTIONS ]
375
where OPTIONS := {
376
    -h | --help
377
            display this help message
378
    -d | --dump=default or /path/to/settings/json
379
            dump the default settings to a JSON-formatted file
380
    -s | --settings=/path/to/settings/json
381
            specify the path to a JSON-formatted settings file
382
    }
383
384
rs-client with no arguments will start the program with
385
settings in %s
386
''' % settings_loc
387
388
389
	settings = json.loads(H.default_settings(verbose=False))
390
391
	# get arguments
392
	try:
393
		opts = getopt.getopt(sys.argv[1:], 'hid:s:',
394
			['help', 'install', 'dump=', 'settings=']
395
			)[0]
396
	except Exception as e:
397
		print(COLOR['red'] + 'ERROR: %s' % e + COLOR['white'])
398
		print(hlp_txt)
399
400
	if len(opts) == 0:
401
		if not os.path.exists(settings_loc):
402
			print(COLOR['yellow'] + 'Could not find rsudp settings file, creating one at %s' % settings_loc + COLOR['white'])
403
			H.dump_default(settings_loc, H.default_settings())
404
		else:
405
			settings = H.read_settings(settings_loc)
406
407
	for o, a in opts:
408
		if o in ('-h', '--help'):
409
			print(hlp_txt)
410
			exit(0)
411
		if o in ('-i', '--install'):
412
			'''
413
			This is only meant to be used by the install script.
414
			'''
415
			os.makedirs(default_loc, exist_ok=True)
416
			H.dump_default(settings_loc, H.default_settings(output_dir='@@DIR@@', verbose=False))
417
			exit(0)
418
		if o in ('-d', '--dump='):
419
			'''
420
			Dump the settings to a file, specified after the `-d` flag, or `-d default` to let the software decide where to put it.
421
			'''
422
			if str(a) in 'default':
423
				os.makedirs(default_loc, exist_ok=True)
424
				H.dump_default(settings_loc, H.default_settings())
425
			else:
426
				H.dump_default(os.path.abspath(os.path.expanduser(a)), H.default_settings())
427
			exit(0)
428
		if o in ('-s', 'settings='):
429
			'''
430
			Start the program with a specific settings file, for example: `-s settings.json`.
431
			'''
432
			settings = H.read_settings(a)
433
434
	start_logging()
435
	debug = settings['settings']['debug']
436
	if debug:
437
		add_debug_handler()
438
		printM('Logging initialized successfully.', sender=SENDER)
439
440
	printM('Using settings file: %s' % settings_loc)
441
442
	odir = os.path.abspath(os.path.expanduser(settings['settings']['output_dir']))
443
	init_dirs(odir)
444
	if debug:
445
		printM('Output directory is: %s' % odir)
446
447
	run(settings, debug=debug)
448
449
450
def test():
451
	'''
452
	.. versionadded:: 0.4.3
453
454
	Set up tests, run modules, report test results.
455
	For a list of tests run, see :py:mod:`rsudp.test`.
456
	'''
457
	global TESTFILE
458
	hlp_txt='''
459
###########################################
460
##     R A S P B E R R Y  S H A K E      ##
461
##            Testing Module             ##
462
##            by Ian Nesbitt             ##
463
##            GNU GPLv3 2020             ##
464
##                                       ##
465
## Test settings with archived Shake     ##
466
## data to determine optimal             ##
467
## configuration.                        ##
468
##                                       ##
469
##  Requires:                            ##
470
##  - numpy, obspy, matplotlib 3         ##
471
##                                       ##
472
###########################################
473
474
Usage: rs-test [ OPTIONS ]
475
where OPTIONS := {
476
    -h | --help
477
            display this help message
478
    -f | --file=default or /path/to/data/file
479
            specify the path to a seismic data file
480
    -s | --settings=/path/to/settings/json
481
            specify the path to a JSON-formatted settings file
482
    -b | --no-plot
483
            "blind mode", used when there is no display
484
    -q | --no-sound
485
            "quiet mode", used when there is no audio device/ffmpeg
486
    }
487
488
rs-test with no arguments will start the test with
489
default settings and the data file at
490
%s
491
''' % (TESTFILE)
492
493
	test_mode(True)
494
	settings = H.default_settings(verbose=False)
495
	settings_are_default = True
496
	plot = True
497
	quiet = False
498
499
	try:
500
		opts = getopt.getopt(sys.argv[1:], 'hf:s:bq',
501
			['help', 'file=', 'settings=', 'no-plot', 'no-sound']
502
			)[0]
503
	except Exception as e:
504
		print(COLOR['red'] + 'ERROR: %s' % e + COLOR['white'])
505
		print(hlp_txt)
506
		exit(1)
507
508
	for o, a in opts:
509
		# parse options and arguments
510
		if o in ('-h', '--help'):
511
			print(hlp_txt)
512
			exit(0)
513
		if o in ('-f', '--file='):
514
			'''
515
			The data file.
516
			'''
517
			a = os.path.expanduser(a)
518
			if os.path.exists(a):
519
				try:
520
					out = '%s.txt' % (a)
521
					packetize(inf=a, outf=out)
522
					TESTFILE = out
523
				except Exception as e:
524
					print(COLOR['red'] + 'ERROR: %s' % e + COLOR['white'])
525
					print(hlp_txt)
526
					exit(1)
527
		if o in ('-s', '--settings='):
528
			'''
529
			Dump the settings to a file, specified after the `-d` flag, or `-d default` to let the software decide where to put it.
530
			'''
531
			settings_loc = os.path.abspath(os.path.expanduser(a)).replace('\\', '/')
532
			if os.path.exists(settings_loc):
533
				settings = H.read_settings(settings_loc)
534
				settings_are_default = False
535
			else:
536
				print(COLOR['red'] + 'ERROR: could not find settings file at %s' % (a) + COLOR['white'])
537
				exit(1)
538
		if o in ('-b', '--no-plot'):
539
			plot = False
540
		if o in ('-q', '--no-sound'):
541
			quiet = True
542
543
544
	T.TEST['n_internet'][1] = T.is_connected('www.google.com')
545
546
	if settings_are_default:
547
		settings = T.make_test_settings(settings=settings, inet=T.TEST['n_internet'][1])
548
549
	T.TEST['p_log_dir'][1] = T.logdir_permissions()
550
	T.TEST['p_log_file'][1] = start_logging(testing=True)
551
	T.TEST['p_log_std'][1] = add_debug_handler(testing=True)
552
553
	T.TEST['p_output_dirs'][1] = init_dirs(os.path.expanduser(settings['settings']['output_dir']))
554
	T.TEST['p_data_dir'][1] = T.datadir_permissions(os.path.expanduser(settings['settings']['output_dir']))
555
	T.TEST['p_screenshot_dir'][1] = T.ss_permissions(os.path.expanduser(settings['settings']['output_dir']))
556
557
	settings = T.cancel_tests(settings, MPL, plot, quiet)
558
559
	try:
560
		run(settings, debug=True)
561
	except Exception as e:
562
		printE(traceback.format_exc(), announce=False)
563
		printE('Ending tests.', sender='test client', announce=False)
564
		time.sleep(0.5)
565
566
	TESTQUEUE.put(b'ENDTEST')
567
	printW('Test finished.', sender=SENDER, announce=False)
568
569
	print()
570
571
	code = 0
572
	printM('Test results:')
573
	for i in T.TEST:
574
		printM('%s: %s' % (T.TEST[i][0], T.TRANS[T.TEST[i][1]]))
575
		if not T.TEST[i][1]:
576
			# if a test fails, change the system exit code to indicate an error occurred
577
			code = 1
578
	_xit(code)
579
580
581
if __name__ == '__main__':
582
	main()
583