Passed
Push — master ( e96c3d...44474a )
by Ian
06:20
created

build.rsudp.client.run()   F

Complexity

Conditions 35

Size

Total Lines 240
Code Lines 172

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 172
dl 0
loc 240
rs 0
c 0
b 0
f 0
cc 35
nop 2

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.rsudp.client.run() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import sys, os
2
import signal
3
import getopt
4
import time
5
import json
6
import re
7
import logging
8
import traceback
9
from queue import Queue
10
from rsudp import printM, printW, printE, default_loc, init_dirs, output_dir, add_debug_handler, start_logging
11
from rsudp import COLOR
12
import rsudp.helpers as H
13
import rsudp.test as T
14
import rsudp.raspberryshake as rs
15
from rsudp.packetize import packetize
16
from rsudp.c_consumer import Consumer
17
from rsudp.p_producer import Producer
18
from rsudp.c_printraw import PrintRaw
19
from rsudp.c_write import Write
20
from rsudp.c_plot import Plot, MPL
21
from rsudp.c_forward import Forward
22
from rsudp.c_alert import Alert
23
from rsudp.c_alertsound import AlertSound
24
from rsudp.c_custom import Custom
25
from rsudp.c_tweet import Tweeter
26
from rsudp.c_telegram import Telegrammer
27
from rsudp.c_rsam import RSAM
28
from rsudp.c_testing import Testing
29
from rsudp.t_testdata import TestData
30
import pkg_resources as pr
31
import fnmatch
32
try:
33
	from pydub import AudioSegment
34
	PYDUB_EXISTS = True
35
except ImportError:
36
	PYDUB_EXISTS = False
37
38
39
DESTINATIONS, THREADS = [], []
40
PROD = False
41
PLOTTER = False
42
SOUND = False
43
TESTING = False
44
TESTQUEUE = False
45
TESTFILE = pr.resource_filename('rsudp', os.path.join('test', 'testdata'))
46
SENDER = 'Main'
47
48
def handler(sig, frame):
49
	'''
50
	Function passed to :py:func:`signal.signal` to handle close events
51
	'''
52
	rs.producer = False
53
54
def _xit(code=0):
55
	'''
56
	End the program. Called after all running threads have stopped.
57
58
	:param int code: The process code to exit with. 0=OK, 1=ERROR.
59
	'''
60
	if TESTING:
61
		TESTQUEUE.put(b'ENDTEST')
62
	for thread in THREADS:
63
		del thread
64
	
65
	printM('Shutdown successful.', sender=SENDER)
66
	print()
67
	sys.exit(code)
68
69
def test_mode(mode=None):
70
	'''
71
	Sets the TESTING global variable to ``True`` to indicate that
72
	testing-specific actions should be taken in routines.
73
74
	:param bool mode: if ``True`` or ``False``, sets testing mode state. if anything else, returns state only.
75
	:return: testing mode state
76
	:rtype: bool
77
	'''
78
	global TESTING
79
	if (mode == True) or (mode == False):
80
		TESTING = mode
81
	return TESTING
82
83
84
def mk_q():
85
	'''
86
	Makes a queue and appends it to the :py:data:`destinations`
87
	variable to be passed to the master consumer thread
88
	:py:class:`rsudp.c_consumer.Consumer`.
89
90
	:rtype: queue.Queue
91
	:return: Returns the queue to pass to the sub-consumer.
92
	'''
93
	q = Queue(rs.qsize)
94
	DESTINATIONS.append(q)
95
	return q
96
97
def mk_p(proc):
98
	'''
99
	Appends a process to the list of threads to start and stop.
100
101
	:param threading.Thread proc: The process thread to append to the list of threads.
102
	'''
103
	THREADS.append(proc)
104
105
106
def start():
107
	'''
108
	Start Consumer, Threads, and Producer.
109
	'''
110
	global PROD, PLOTTER, THREADS, DESTINATIONS
111
	# master queue and consumer
112
	queue = Queue(rs.qsize)
113
	cons = Consumer(queue, DESTINATIONS)
114
	cons.start()
115
116
	for thread in THREADS:
117
		thread.start()
118
119
	PROD = Producer(queue, THREADS)
120
	PROD.start()
121
122
	if PLOTTER and MPL:
123
		# give the plotter the master queue
124
		# so that it can issue a TERM signal if closed
125
		PLOTTER.master_queue = queue
126
		# start plotting (in this thread, not a separate one)
127
		PLOTTER.run()
128
	else:
129
		while not PROD.stop:
130
			time.sleep(0.1) # wait until processes end
131
132
133
	time.sleep(0.5) # give threads time to exit
134
	PROD.stop = True
135
136
137
def run(settings, debug):
138
	'''
139
	Main setup function. Takes configuration values and passes them to
140
	the appropriate threads and functions.
141
142
	:param dict settings: settings dictionary (see :ref:`defaults` for guidance)
143
	:param bool debug: whether or not to show debug output (should be turned off if starting as daemon)
144
	'''
145
	global PLOTTER, SOUND
146
	# handler for the exit signal
147
	signal.signal(signal.SIGINT, handler)
148
149
	if TESTING:
150
		global TESTQUEUE
151
		# initialize the test data to read information from file and put it on the port
152
		TESTQUEUE = Queue()		# separate from client library because this is not downstream of the producer
153
		tdata = TestData(q=TESTQUEUE, data_file=TESTFILE, port=settings['settings']['port'])
154
		tdata.start()
155
156
	# initialize the central library
157
	rs.initRSlib(dport=settings['settings']['port'],
158
				 rsstn=settings['settings']['station'])
159
160
	H.conn_stats(TESTING)
161
	if TESTING:
162
		T.TEST['n_port'][1] = True	# port has been opened
163
		if rs.sps == 0:
164
			printE('There is already a Raspberry Shake sending data to this port.', sender=SENDER)
165
			printE('For testing, please change the port in your settings file to an unused one.',
166
					sender=SENDER, spaces=True)
167
			_xit(1)
168
169
170
	output_dir = settings['settings']['output_dir']
171
172
173
	if settings['printdata']['enabled']:
174
		# set up queue and process
175
		q = mk_q()
176
		prnt = PrintRaw(q)
177
		mk_p(prnt)
178
179
	if settings['write']['enabled']:
180
		# set up queue and process
181
		cha = settings['write']['channels']
182
		q = mk_q()
183
		writer = Write(q=q, cha=cha)
184
		mk_p(writer)
185
186
	if settings['plot']['enabled'] and MPL:
187
		while True:
188
			if rs.numchns == 0:
189
				time.sleep(0.01)
190
				continue
191
			else:
192
				break
193
		cha = settings['plot']['channels']
194
		sec = settings['plot']['duration']
195
		spec = settings['plot']['spectrogram']
196
		full = settings['plot']['fullscreen']
197
		kiosk = settings['plot']['kiosk']
198
		screencap = settings['plot']['eq_screenshots']
199
		alert = settings['alert']['enabled']
200
		if settings['plot']['deconvolve']:
201
			if settings['plot']['units'].upper() in rs.UNITS:
202
				deconv = settings['plot']['units'].upper()
203
			else:
204
				deconv = 'CHAN'
205
		else:
206
			deconv = False
207
		pq = mk_q()
208
		PLOTTER = Plot(cha=cha, seconds=sec, spectrogram=spec,
209
						fullscreen=full, kiosk=kiosk, deconv=deconv, q=pq,
210
						screencap=screencap, alert=alert)
211
		# no mk_p() here because the plotter must be controlled by the main thread (this one)
212
213
	if settings['forward']['enabled']:
214
		# put settings in namespace
215
		addr = settings['forward']['address']
216
		port = settings['forward']['port']
217
		cha = settings['forward']['channels']
218
		# set up queue and process
219
		q = mk_q()
220
		forward = Forward(addr=addr, port=port, cha=cha, q=q)
221
		mk_p(forward)
222
223
	if settings['alert']['enabled']:
224
		# put settings in namespace
225
		sta = settings['alert']['sta']
226
		lta = settings['alert']['lta']
227
		thresh = settings['alert']['threshold']
228
		reset = settings['alert']['reset']
229
		bp = [settings['alert']['highpass'], settings['alert']['lowpass']]
230
		cha = settings['alert']['channel']
231
		if settings['alert']['deconvolve']:
232
			if settings['alert']['units'].upper() in rs.UNITS:
233
				deconv = settings['alert']['units'].upper()
234
			else:
235
				deconv = 'CHAN'
236
		else:
237
			deconv = False
238
239
		# set up queue and process
240
		q = mk_q()
241
		alrt = Alert(sta=sta, lta=lta, thresh=thresh, reset=reset, bp=bp,
242
					 cha=cha, debug=debug, q=q,
243
					 deconv=deconv)
244
		mk_p(alrt)
245
246
	if settings['alertsound']['enabled']:
247
		sender = 'AlertSound'
248
		SOUND = False
249
		soundloc = False
250
		if PYDUB_EXISTS:
251
			soundloc = os.path.expanduser(os.path.expanduser(settings['alertsound']['mp3file']))
252
			if soundloc in ['doorbell', 'alarm', 'beeps', 'sonar']:
253
				soundloc = pr.resource_filename('rsudp', os.path.join('rs_sounds', '%s.mp3' % soundloc))
254
			if os.path.exists(soundloc):
255
				try:
256
					SOUND = AudioSegment.from_file(soundloc, format="mp3")
257
					printM('Loaded %.2f sec alert sound from %s' % (len(SOUND)/1000., soundloc), sender='AlertSound')
258
				except FileNotFoundError as e:
259
					printW("You have chosen to play a sound, but don't have ffmpeg or libav installed.", sender='AlertSound')
260
					printW('Sound playback requires one of these dependencies.', sender='AlertSound', spaces=True)
261
					printW("To install either dependency, follow the instructions at:", sender='AlertSound', spaces=True)
262
					printW('https://github.com/jiaaro/pydub#playback', sender='AlertSound', spaces=True)
263
					printW('The program will now continue without sound playback.', sender='AlertSound', spaces=True)
264
					SOUND = False
265
			else:
266
				printW("The file %s could not be found." % (soundloc), sender='AlertSound')
267
				printW('The program will now continue without sound playback.', sender='AlertSound', spaces=True)
268
		else:
269
			printW("You don't have pydub installed, so no sound will play.", sender='AlertSound')
270
			printW('To install pydub, follow the instructions at:', sender='AlertSound', spaces=True)
271
			printW('https://github.com/jiaaro/pydub#installation', sender='AlertSound', spaces=True)
272
			printW('Sound playback also requires you to install either ffmpeg or libav.', sender='AlertSound', spaces=True)
273
274
		q = mk_q()
275
		alsnd = AlertSound(q=q, sound=SOUND, soundloc=soundloc)
276
		mk_p(alsnd)
277
278
	runcustom = False
279
	try:
280
		f = False
281
		win_ovr = False
282
		if settings['custom']['enabled']:
283
			# put settings in namespace
284
			f = settings['custom']['codefile']
285
			win_ovr = settings['custom']['win_override']
286
			if f == 'n/a':
287
				f = False
288
			runcustom = True
289
	except KeyError as e:
290
		if settings['alert']['exec'] != 'eqAlert':
291
			printW('the custom code function has moved to its own module (rsudp.c_custom)', sender='Custom')
292
			f = settings['alert']['exec']
293
			win_ovr = settings['alert']['win_override']
294
			runcustom = True
295
		else:
296
			raise KeyError(e)
297
	if runcustom:
298
		# set up queue and process
299
		q = mk_q()
300
		cstm = Custom(q=q, codefile=f, win_ovr=win_ovr)
301
		mk_p(cstm)
302
303
304
	if settings['tweets']['enabled']:
305
		consumer_key = settings['tweets']['api_key']
306
		consumer_secret = settings['tweets']['api_secret']
307
		access_token = settings['tweets']['access_token']
308
		access_token_secret = settings['tweets']['access_secret']
309
		tweet_images = settings['tweets']['tweet_images']
310
311
		q = mk_q()
312
		tweet = Tweeter(q=q, consumer_key=consumer_key, consumer_secret=consumer_secret,
313
						access_token=access_token, access_token_secret=access_token_secret,
314
						tweet_images=tweet_images)
315
		mk_p(tweet)
316
317
	if settings['telegram']['enabled']:
318
		token = settings['telegram']['token']
319
		chat_ids = settings['telegram']['chat_id'].strip(' ').split(',')
320
		send_images = settings['telegram']['send_images']
321
		
322
		for chat_id in chat_ids:
323
			sender = "Telegram id %s" % (chat_id)
324
			q = mk_q()
325
			telegram = Telegrammer(q=q, token=token, chat_id=chat_id,
326
								   send_images=send_images,
327
								   sender=sender)
328
			mk_p(telegram)
329
330
	if settings['rsam']['enabled']:
331
		# put settings in namespace
332
		fwaddr = settings['rsam']['fwaddr']
333
		fwport = settings['rsam']['fwport']
334
		fwformat = settings['rsam']['fwformat']
335
		interval = settings['rsam']['interval']
336
		cha = settings['rsam']['channel']
337
		quiet = settings['rsam']['quiet']
338
		if settings['rsam']['deconvolve']:
339
			if settings['rsam']['units'].upper() in rs.UNITS:
340
				deconv = settings['rsam']['units'].upper()
341
			else:
342
				deconv = 'CHAN'
343
		else:
344
			deconv = False
345
346
		# set up queue and process
347
		q = mk_q()
348
		rsam = RSAM(q=q, debug=debug, interval=interval, cha=cha, deconv=deconv,
349
					fwaddr=fwaddr, fwport=fwport, fwformat=fwformat, quiet=quiet)
350
351
		mk_p(rsam)
352
353
354
	# start additional modules here!
355
	################################
356
357
358
	################################
359
360
	if TESTING:
361
		# initialize test consumer
362
		q = mk_q()
363
		test = Testing(q=q)
364
		mk_p(test)
365
366
367
	# start the producer, consumer, and activated modules
368
	start()
369
370
	PLOTTER = False
371
	if not TESTING:
372
		_xit()
373
	else:
374
		printW('Client has exited, ending tests...', sender=SENDER, announce=False)
375
		if SOUND:
376
			T.TEST['d_pydub'][1] = True
377
378
379
def main():
380
	'''
381
	Loads settings to start the main client.
382
	Supply -h from the command line to see help text.
383
	'''
384
	settings_loc = os.path.join(default_loc, 'rsudp_settings.json').replace('\\', '/')
385
386
	hlp_txt='''
387
###########################################
388
##     R A S P B E R R Y  S H A K E      ##
389
##              UDP Client               ##
390
##            by Ian Nesbitt             ##
391
##            GNU GPLv3 2020             ##
392
##                                       ##
393
## Do various tasks with Shake data      ##
394
## like plot, trigger alerts, and write  ##
395
## to miniSEED.                          ##
396
##                                       ##
397
##  Requires:                            ##
398
##  - numpy, obspy, matplotlib 3, pydub  ##
399
##                                       ##
400
###########################################
401
402
Usage: rs-client [ OPTIONS ]
403
where OPTIONS := {
404
    -h | --help
405
            display this help message
406
    -d | --dump=default or /path/to/settings/json
407
            dump the default settings to a JSON-formatted file
408
    -s | --settings=/path/to/settings/json
409
            specify the path to a JSON-formatted settings file
410
    }
411
412
rs-client with no arguments will start the program with
413
settings in %s
414
''' % settings_loc
415
416
417
	settings = json.loads(H.default_settings(verbose=False))
418
419
	# get arguments
420
	try:
421
		opts = getopt.getopt(sys.argv[1:], 'hid:s:',
422
			['help', 'install', 'dump=', 'settings=']
423
			)[0]
424
	except Exception as e:
425
		print(COLOR['red'] + 'ERROR: %s' % e + COLOR['white'])
426
		print(hlp_txt)
427
428
	if len(opts) == 0:
429
		if not os.path.exists(settings_loc):
430
			print(COLOR['yellow'] + 'Could not find rsudp settings file, creating one at %s' % settings_loc + COLOR['white'])
431
			H.dump_default(settings_loc, H.default_settings())
432
		else:
433
			settings = H.read_settings(settings_loc)
434
435
	for o, a in opts:
436
		if o in ('-h', '--help'):
437
			print(hlp_txt)
438
			exit(0)
439
		if o in ('-i', '--install'):
440
			'''
441
			This is only meant to be used by the install script.
442
			'''
443
			os.makedirs(default_loc, exist_ok=True)
444
			H.dump_default(settings_loc, H.default_settings(output_dir='@@DIR@@', verbose=False))
445
			exit(0)
446
		if o in ('-d', '--dump='):
447
			'''
448
			Dump the settings to a file, specified after the `-d` flag, or `-d default` to let the software decide where to put it.
449
			'''
450
			if str(a) in 'default':
451
				os.makedirs(default_loc, exist_ok=True)
452
				H.dump_default(settings_loc, H.default_settings())
453
			else:
454
				H.dump_default(os.path.abspath(os.path.expanduser(a)), H.default_settings())
455
			exit(0)
456
		if o in ('-s', 'settings='):
457
			'''
458
			Start the program with a specific settings file, for example: `-s settings.json`.
459
			'''
460
			settings = H.read_settings(a)
461
462
	start_logging()
463
	debug = settings['settings']['debug']
464
	if debug:
465
		add_debug_handler()
466
		printM('Logging initialized successfully.', sender=SENDER)
467
468
	printM('Using settings file: %s' % settings_loc)
469
470
	odir = os.path.abspath(os.path.expanduser(settings['settings']['output_dir']))
471
	init_dirs(odir)
472
	if debug:
473
		printM('Output directory is: %s' % odir)
474
475
	run(settings, debug=debug)
476
477
478
def test():
479
	'''
480
	.. versionadded:: 0.4.3
481
482
	Set up tests, run modules, report test results.
483
	For a list of tests run, see :py:mod:`rsudp.test`.
484
	'''
485
	global TESTFILE
486
	hlp_txt='''
487
###########################################
488
##     R A S P B E R R Y  S H A K E      ##
489
##            Testing Module             ##
490
##            by Ian Nesbitt             ##
491
##            GNU GPLv3 2020             ##
492
##                                       ##
493
## Test settings with archived Shake     ##
494
## data to determine optimal             ##
495
## configuration.                        ##
496
##                                       ##
497
##  Requires:                            ##
498
##  - numpy, obspy, matplotlib 3         ##
499
##                                       ##
500
###########################################
501
502
Usage: rs-test [ OPTIONS ]
503
where OPTIONS := {
504
    -h | --help
505
            display this help message
506
    -f | --file=default or /path/to/data/file
507
            specify the path to a seismic data file
508
    -s | --settings=/path/to/settings/json
509
            specify the path to a JSON-formatted settings file
510
    -b | --no-plot
511
            "blind mode", used when there is no display
512
    -q | --no-sound
513
            "quiet mode", used when there is no audio device/ffmpeg
514
    }
515
516
rs-test with no arguments will start the test with
517
default settings and the data file at
518
%s
519
''' % (TESTFILE)
520
521
	test_mode(True)
522
	settings = H.default_settings(verbose=False)
523
	settings_are_default = True
524
	plot = True
525
	quiet = False
526
527
	try:
528
		opts = getopt.getopt(sys.argv[1:], 'hf:s:bq',
529
			['help', 'file=', 'settings=', 'no-plot', 'no-sound']
530
			)[0]
531
	except Exception as e:
532
		print(COLOR['red'] + 'ERROR: %s' % e + COLOR['white'])
533
		print(hlp_txt)
534
		exit(1)
535
536
	for o, a in opts:
537
		# parse options and arguments
538
		if o in ('-h', '--help'):
539
			print(hlp_txt)
540
			exit(0)
541
		if o in ('-f', '--file='):
542
			'''
543
			The data file.
544
			'''
545
			a = os.path.expanduser(a)
546
			if os.path.exists(a):
547
				try:
548
					out = '%s.txt' % (a)
549
					packetize(inf=a, outf=out)
550
					TESTFILE = out
551
				except Exception as e:
552
					print(COLOR['red'] + 'ERROR: %s' % e + COLOR['white'])
553
					print(hlp_txt)
554
					exit(1)
555
		if o in ('-s', '--settings='):
556
			'''
557
			Dump the settings to a file, specified after the `-d` flag, or `-d default` to let the software decide where to put it.
558
			'''
559
			settings_loc = os.path.abspath(os.path.expanduser(a)).replace('\\', '/')
560
			if os.path.exists(settings_loc):
561
				settings = H.read_settings(settings_loc)
562
				settings_are_default = False
563
			else:
564
				print(COLOR['red'] + 'ERROR: could not find settings file at %s' % (a) + COLOR['white'])
565
				exit(1)
566
		if o in ('-b', '--no-plot'):
567
			plot = False
568
		if o in ('-q', '--no-sound'):
569
			quiet = True
570
571
572
	T.TEST['n_internet'][1] = T.is_connected('www.google.com')
573
574
	if settings_are_default:
575
		settings = T.make_test_settings(settings=settings, inet=T.TEST['n_internet'][1])
576
577
	T.TEST['p_log_dir'][1] = T.logdir_permissions()
578
	T.TEST['p_log_file'][1] = start_logging(testing=True)
579
	T.TEST['p_log_std'][1] = add_debug_handler(testing=True)
580
581
	T.TEST['p_output_dirs'][1] = init_dirs(os.path.expanduser(settings['settings']['output_dir']))
582
	T.TEST['p_data_dir'][1] = T.datadir_permissions(os.path.expanduser(settings['settings']['output_dir']))
583
	T.TEST['p_screenshot_dir'][1] = T.ss_permissions(os.path.expanduser(settings['settings']['output_dir']))
584
585
	settings = T.cancel_tests(settings, MPL, plot, quiet)
586
587
	try:
588
		run(settings, debug=True)
589
	except Exception as e:
590
		printE(traceback.format_exc(), announce=False)
591
		printE('Ending tests.', sender='test client', announce=False)
592
		time.sleep(0.5)
593
594
	TESTQUEUE.put(b'ENDTEST')
595
	printW('Test finished.', sender=SENDER, announce=False)
596
597
	print()
598
599
	code = 0
600
	printM('Test results:')
601
	for i in T.TEST:
602
		printM('%s: %s' % (T.TEST[i][0], T.TRANS[T.TEST[i][1]]))
603
		if not T.TEST[i][1]:
604
			# if a test fails, change the system exit code to indicate an error occurred
605
			code = 1
606
	_xit(code)
607
608
609
if __name__ == '__main__':
610
	main()
611