build.rsudp.client.main()   D
last analyzed

Complexity

Conditions 12

Size

Total Lines 95
Code Lines 42

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 145.2214

Importance

Changes 0
Metric Value
eloc 42
dl 0
loc 95
ccs 1
cts 39
cp 0.0256
rs 4.8
c 0
b 0
f 0
cc 12
nop 0
crap 145.2214

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

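Commonly applied refactorings include Extract Method; when many parameters or temporary variables are present, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object are also candidates.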

Complexity

Complex classes like build.rsudp.client.main() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import sys, os
2 1
import signal
3 1
import getopt
4 1
import time
5 1
import json
6 1
import traceback
7 1
from queue import Queue
8 1
from rsudp import printM, printW, printE, default_loc, init_dirs, settings_loc, add_debug_handler, start_logging
9 1
from rsudp import COLOR
10 1
import rsudp.helpers as H
11 1
import rsudp.test as T
12 1
import rsudp.raspberryshake as rs
13 1
from rsudp.packetize import packetize
14 1
from rsudp.c_consumer import Consumer
15 1
from rsudp.p_producer import Producer
16 1
from rsudp.c_printraw import PrintRaw
17 1
from rsudp.c_write import Write
18 1
from rsudp.c_plot import Plot, MPL
19 1
from rsudp.c_forward import Forward
20 1
from rsudp.c_alert import Alert
21 1
from rsudp.c_alertsound import AlertSound
22 1
from rsudp.c_custom import Custom
23 1
from rsudp.c_tweet import Tweeter
24 1
from rsudp.c_telegram import Telegrammer
25 1
from rsudp.c_rsam import RSAM
26 1
from rsudp.c_testing import Testing
27 1
from rsudp.t_testdata import TestData
28 1
import pkg_resources as pr
29
30
31 1
# queues handed to sub-consumers (appended by mk_q) and the threads to start (appended by mk_p)
DESTINATIONS = []
THREADS = []

# handles to long-lived objects; each stays False until something assigns it
PROD = PLOTTER = TELEGRAM = TWITTER = WRITER = SOUND = False

# test-mode flag and the queue on which the end-of-test marker is placed
TESTING = TESTQUEUE = False
# path of the packaged test data file
TESTFILE = pr.resource_filename('rsudp', os.path.join('test', 'testdata'))

# sender label attached to this module's log messages
SENDER = 'Main'
42
43 1
def handler(sig, frame):
	'''
	Signal handler installed through :py:func:`signal.signal`
	(``run`` registers it for ``SIGINT``).
	Sets the ``producer`` flag on :py:mod:`rsudp.raspberryshake` to ``False``.

	:param int sig: signal number supplied by the interpreter (unused)
	:param frame: stack frame supplied by the interpreter (unused)
	'''
	rs.producer = False
48
49 1
def _xit(code=0):
	'''
	End the program. Called after all running threads have stopped.

	In testing mode, an ``ENDTEST`` marker is placed on the test queue first.
	This function does not return; it raises :py:class:`SystemExit`
	through :py:func:`sys.exit`.

	:param int code: The process code to exit with. 0=OK, 1=ERROR.
	'''
	# TESTQUEUE is False until run() has created it, so only signal a real queue
	if TESTING and TESTQUEUE:
		TESTQUEUE.put(b'ENDTEST')

	# NOTE: the former ``for thread in THREADS: del thread`` loop was removed;
	# ``del`` on the loop variable only unbinds that name and had no effect
	# on the threads or on the THREADS list.

	printM('Shutdown successful.', sender=SENDER)
	print()
	sys.exit(code)
63
64 1
def test_mode(mode=None):
65
	'''
66
	Sets the TESTING global variable to ``True`` to indicate that
67
	testing-specific actions should be taken in routines.
68
69
	:param bool mode: if ``True`` or ``False``, sets testing mode state. if anything else, returns state only.
70
	:return: testing mode state
71
	:rtype: bool
72
	'''
73
	global TESTING
74 1
	if (mode == True) or (mode == False):
75 1
		TESTING = mode
76 1
	return TESTING
77
78
79 1
def mk_q():
	'''
	Create a queue with maximum size ``rs.qsize``, register it in
	``DESTINATIONS`` (the list handed to the master consumer
	:py:class:`rsudp.c_consumer.Consumer`), and hand it back.

	:rtype: queue.Queue
	:return: the newly registered queue, to be passed to a sub-consumer.
	'''
	DESTINATIONS.append(Queue(rs.qsize))
	return DESTINATIONS[-1]
91
92 1
def mk_p(proc):
	'''
	Register a thread in ``THREADS`` so that :py:func:`start` will start it
	and the producer receives it in its thread list.

	:param threading.Thread proc: The process thread to append to the list of threads.
	'''
	THREADS.append(proc)
99
100
101 1
def start():
	'''
	Start the master consumer, every registered thread, and the producer.
	Then either run the plotter in this thread or poll until the producer's
	``stop`` flag is set.
	'''
	global PROD

	# the master queue is shared by the producer and the master consumer
	master_q = Queue(rs.qsize)
	consumer = Consumer(master_q, DESTINATIONS, testing=TESTING)
	consumer.start()

	for worker in THREADS:
		worker.start()

	PROD = Producer(master_q, THREADS, testing=TESTING)
	PROD.start()

	if not (PLOTTER and MPL):
		# nothing to draw, so idle here until the producer flags a stop
		while not PROD.stop:
			time.sleep(0.1)
	else:
		# the plotter gets the master queue so it can issue a TERM signal if closed,
		# and it runs in this thread rather than a separate one
		PLOTTER.master_queue = master_q
		PLOTTER.run()

	# give threads time to exit
	time.sleep(0.5)
	PROD.stop = True
130
131
132 1
def _deconv_setting(section):
	'''
	Work out the deconvolution argument for one module from its settings section.

	:param dict section: settings sub-dictionary holding ``deconvolve`` and ``units`` keys
	:return: ``False`` when deconvolution is off; otherwise the upper-cased units if they appear in ``rs.UNITS``, else ``'CHAN'``
	:rtype: str or bool
	'''
	if not section['deconvolve']:
		return False
	units = section['units'].upper()
	return units if units in rs.UNITS else 'CHAN'


def run(settings, debug):
	'''
	Main setup function. Takes configuration values and passes them to
	the appropriate threads and functions.

	For each enabled settings section a queue is created with :py:func:`mk_q`
	and the matching consumer is registered with :py:func:`mk_p`; the plotter
	is the exception and is kept in ``PLOTTER`` because it runs in the main thread.
	Finally :py:func:`start` is called, and outside of testing mode the
	program exits through :py:func:`_xit`.

	:param dict settings: settings dictionary (see :ref:`defaults` for guidance)
	:param bool debug: whether or not to show debug output (should be turned off if starting as daemon)
	:raises KeyError: if the ``custom`` section is missing and ``alert.exec`` is the default ``eqAlert``
	'''
	global PLOTTER, SOUND, TESTQUEUE, WRITER, TWITTER, TELEGRAM

	# handler for the exit signal
	signal.signal(signal.SIGINT, handler)

	if TESTING:
		# initialize the test data to read information from file and put it on the port
		TESTQUEUE = Queue()		# separate from client library because this is not downstream of the producer
		tdata = TestData(q=TESTQUEUE, data_file=TESTFILE, port=settings['settings']['port'])
		tdata.start()

	# initialize the central library
	rs.initRSlib(dport=settings['settings']['port'],
				 rsstn=settings['settings']['station'])

	H.conn_stats(TESTING)
	if TESTING:
		T.TEST['n_port'][1] = True	# port has been opened
		if rs.sps == 0:
			printE('There is already a Raspberry Shake sending data to this port.', sender=SENDER)
			printE('For testing, please change the port in your settings file to an unused one.',
					sender=SENDER, spaces=True)
			_xit(1)

	output_dir = settings['settings']['output_dir']

	if settings['printdata']['enabled']:
		# set up queue and process
		q = mk_q()
		prnt = PrintRaw(q, testing=TESTING)
		mk_p(prnt)

	if settings['write']['enabled']:
		# set up queue and process
		cha = settings['write']['channels']
		q = mk_q()
		WRITER = Write(q=q, data_dir=output_dir,
					   cha=cha, testing=TESTING)
		mk_p(WRITER)

	if settings['plot']['enabled'] and MPL:
		# block until the library reports a non-zero channel count
		while rs.numchns == 0:
			time.sleep(0.01)
		cha = settings['plot']['channels']
		sec = settings['plot']['duration']
		spec = settings['plot']['spectrogram']
		full = settings['plot']['fullscreen']
		kiosk = settings['plot']['kiosk']
		screencap = settings['plot']['eq_screenshots']
		alert = settings['alert']['enabled']
		deconv = _deconv_setting(settings['plot'])
		pq = mk_q()
		PLOTTER = Plot(cha=cha, seconds=sec, spectrogram=spec,
						fullscreen=full, kiosk=kiosk, deconv=deconv, q=pq,
						screencap=screencap, alert=alert, testing=TESTING)
		# no mk_p() here because the plotter must be controlled by the main thread (this one)

	if settings['forward']['enabled']:
		# put settings in namespace
		addr = settings['forward']['address']
		port = settings['forward']['port']
		cha = settings['forward']['channels']
		fwd_data = settings['forward']['fwd_data']
		fwd_alarms = settings['forward']['fwd_alarms']
		# set up one queue and process per address/port pair
		if len(addr) == len(port):
			printM('Initializing %s Forward threads' % (len(addr)), sender=SENDER)
			for i, (fwd_addr, fwd_port) in enumerate(zip(addr, port)):
				q = mk_q()
				forward = Forward(num=i, addr=fwd_addr, port=int(fwd_port), cha=cha,
								  fwd_data=fwd_data, fwd_alarms=fwd_alarms,
								  q=q, testing=TESTING)
				mk_p(forward)
		else:
			printE('List length mismatch: %s addresses and %s ports in forward section of settings file' % (
										len(addr), len(port)), sender=SENDER)
			_xit(1)

	if settings['alert']['enabled']:
		# put settings in namespace
		sta = settings['alert']['sta']
		lta = settings['alert']['lta']
		thresh = settings['alert']['threshold']
		reset = settings['alert']['reset']
		bp = [settings['alert']['highpass'], settings['alert']['lowpass']]
		cha = settings['alert']['channel']
		deconv = _deconv_setting(settings['alert'])

		# set up queue and process
		q = mk_q()
		alrt = Alert(sta=sta, lta=lta, thresh=thresh, reset=reset, bp=bp,
					 cha=cha, debug=debug, q=q, testing=TESTING,
					 deconv=deconv)
		mk_p(alrt)

	if settings['alertsound']['enabled']:
		soundloc = os.path.expanduser(settings['alertsound']['mp3file'])
		# bare names from this list refer to sound files shipped inside the package
		if soundloc in ['doorbell', 'alarm', 'beeps', 'sonar']:
			soundloc = pr.resource_filename('rsudp', os.path.join('rs_sounds', '%s.mp3' % soundloc))

		q = mk_q()
		alsnd = AlertSound(q=q, testing=TESTING, soundloc=soundloc)
		mk_p(alsnd)

	runcustom = False
	try:
		codefile = False
		win_ovr = False
		if settings['custom']['enabled']:
			# put settings in namespace
			codefile = settings['custom']['codefile']
			win_ovr = settings['custom']['win_override']
			if codefile == 'n/a':
				codefile = False
			runcustom = True
	except KeyError:
		# settings without a (complete) custom section: fall back to the
		# code file named in the alert section, if one was configured there
		if settings['alert']['exec'] != 'eqAlert':
			printW('the custom code function has moved to its own module (rsudp.c_custom)', sender='Custom')
			codefile = settings['alert']['exec']
			win_ovr = settings['alert']['win_override']
			runcustom = True
		else:
			# re-raise the original KeyError unchanged instead of wrapping it
			raise
	if runcustom:
		# set up queue and process
		q = mk_q()
		cstm = Custom(q=q, codefile=codefile, win_ovr=win_ovr, testing=TESTING)
		mk_p(cstm)

	if settings['tweets']['enabled']:
		consumer_key = settings['tweets']['api_key']
		consumer_secret = settings['tweets']['api_secret']
		access_token = settings['tweets']['access_token']
		access_token_secret = settings['tweets']['access_secret']
		tweet_images = settings['tweets']['tweet_images']
		extra_text = settings['tweets']['extra_text']

		q = mk_q()
		TWITTER = Tweeter(q=q, consumer_key=consumer_key, consumer_secret=consumer_secret,
						access_token=access_token, access_token_secret=access_token_secret,
						tweet_images=tweet_images, extra_text=extra_text, testing=TESTING)
		mk_p(TWITTER)

	if settings['telegram']['enabled']:
		token = settings['telegram']['token']
		# comma-separated list of chat ids; strip each entry so that
		# "1, 2" does not produce an id with a leading space, and skip empties
		raw_ids = str(settings['telegram']['chat_id']).split(',')
		chat_ids = [cid.strip() for cid in raw_ids if cid.strip()]
		send_images = settings['telegram']['send_images']
		extra_text = settings['telegram']['extra_text']

		# one Telegrammer per chat id; TELEGRAM ends up holding the last one
		for chat_id in chat_ids:
			sender = "Telegram id %s" % (chat_id)
			q = mk_q()
			TELEGRAM = Telegrammer(q=q, token=token, chat_id=chat_id,
								   send_images=send_images, extra_text=extra_text,
								   sender=sender, testing=TESTING)
			mk_p(TELEGRAM)

	if settings['rsam']['enabled']:
		# put settings in namespace
		fwaddr = settings['rsam']['fwaddr']
		fwport = settings['rsam']['fwport']
		fwformat = settings['rsam']['fwformat']
		interval = settings['rsam']['interval']
		cha = settings['rsam']['channel']
		quiet = settings['rsam']['quiet']
		deconv = _deconv_setting(settings['rsam'])

		# set up queue and process
		q = mk_q()
		rsam = RSAM(q=q, interval=interval, cha=cha, deconv=deconv,
					fwaddr=fwaddr, fwport=fwport, fwformat=fwformat,
					quiet=quiet, testing=TESTING)

		mk_p(rsam)

	# start additional modules here!
	################################


	################################

	if TESTING:
		# initialize test consumer
		q = mk_q()
		test = Testing(q=q)
		mk_p(test)

	# start the producer, consumer, and activated modules
	start()

	PLOTTER = False
	if not TESTING:
		_xit()
	else:
		printW('Client has exited, ending tests...', sender=SENDER, announce=False)
365
366
367 1
def main():
	'''
	Loads settings to start the main client.
	Supply -h from the command line to see help text.

	Parses ``sys.argv`` with :py:mod:`getopt`. ``-h``, ``-i`` and ``-d`` perform
	their action and exit; ``-s`` selects a settings file. With no options the
	settings file at ``settings_loc`` is read, or created from the defaults if
	it does not exist yet. Exits with code 1 if the options cannot be parsed.
	'''

	hlp_txt = '''
###########################################
##     R A S P B E R R Y  S H A K E      ##
##              UDP Client               ##
##            by Ian Nesbitt             ##
##            GNU GPLv3 2020             ##
##                                       ##
## Do various tasks with Shake data      ##
## like plot, trigger alerts, and write  ##
## to miniSEED.                          ##
##                                       ##
##  Requires:                            ##
##  - numpy, obspy, matplotlib 3, pydub  ##
##                                       ##
###########################################

Usage: rs-client [ OPTIONS ]
where OPTIONS := {
    -h | --help
            display this help message
    -d | --dump=default or /path/to/settings/json
            dump the default settings to a JSON-formatted file
    -s | --settings=/path/to/settings/json
            specify the path to a JSON-formatted settings file
    }

rs-client with no arguments will start the program with
settings in %s
''' % settings_loc

	settings = json.loads(H.default_settings(verbose=False))
	# path reported in the log; replaced if -s points somewhere else
	settings_file = settings_loc

	# get arguments
	try:
		opts = getopt.getopt(sys.argv[1:], 'hid:s:',
			['help', 'install', 'dump=', 'settings=']
			)[0]
	except getopt.GetoptError as e:
		print(COLOR['red'] + 'ERROR: %s' % e + COLOR['white'])
		print(hlp_txt)
		# without this exit, ``opts`` would be undefined below
		sys.exit(1)

	if not opts:
		if not os.path.exists(settings_loc):
			print(COLOR['yellow'] + 'Could not find rsudp settings file, creating one at %s' % settings_loc + COLOR['white'])
			H.dump_default(settings_loc, H.default_settings())
		else:
			settings = H.read_settings(settings_loc)

	# getopt reports long options without the trailing '=' (e.g. '--dump')
	for o, a in opts:
		if o in ('-h', '--help'):
			print(hlp_txt)
			sys.exit(0)
		if o in ('-i', '--install'):
			# This is only meant to be used by the install script.
			os.makedirs(default_loc, exist_ok=True)
			H.dump_default(settings_loc, H.default_settings(output_dir='@@DIR@@', verbose=False))
			sys.exit(0)
		if o in ('-d', '--dump'):
			# Dump the settings to a file, specified after the `-d` flag,
			# or `-d default` to let the software decide where to put it.
			if a == 'default':
				os.makedirs(default_loc, exist_ok=True)
				H.dump_default(settings_loc, H.default_settings())
			else:
				H.dump_default(os.path.abspath(os.path.expanduser(a)), H.default_settings())
			sys.exit(0)
		if o in ('-s', '--settings'):
			# Start the program with a specific settings file, for example: `-s settings.json`.
			settings_file = os.path.abspath(os.path.expanduser(a))
			settings = H.read_settings(settings_file)

	debug = settings['settings']['debug']
	if debug:
		add_debug_handler()
	start_logging()

	printM('Using settings file: %s' % settings_file)

	odir = os.path.abspath(os.path.expanduser(settings['settings']['output_dir']))
	init_dirs(odir)
	if debug:
		printM('Output directory is: %s' % odir)

	run(settings, debug=debug)
462
463
464 1
def test():
	'''
	.. versionadded:: 0.4.3

	Set up tests, run modules, report test results.
	For a list of tests run, see :py:mod:`rsudp.test`.

	Parses ``sys.argv`` with :py:mod:`getopt`, switches the module into testing
	mode, runs the client with :py:func:`run`, then prints every entry of
	``T.TEST`` and exits through :py:func:`_xit` with code 1 if any entry's
	result is falsy, otherwise 0.
	'''
	global TESTFILE
	hlp_txt = '''
###########################################
##     R A S P B E R R Y  S H A K E      ##
##            Testing Module             ##
##            by Ian Nesbitt             ##
##            GNU GPLv3 2020             ##
##                                       ##
## Test settings with archived Shake     ##
## data to determine optimal             ##
## configuration.                        ##
##                                       ##
##  Requires:                            ##
##  - numpy, obspy, matplotlib 3         ##
##                                       ##
###########################################

Usage: rs-test [ OPTIONS ]
where OPTIONS := {
    -h | --help
            display this help message
    -f | --file=default or /path/to/data/file
            specify the path to a seismic data file
    -s | --settings=/path/to/settings/json
            specify the path to a JSON-formatted settings file
    -b | --no-plot
            "blind mode", used when there is no display
    -q | --no-sound
            "quiet mode", used when there is no audio device/ffmpeg
    }

rs-test with no arguments will start the test with
default settings and the data file at
%s
''' % (TESTFILE)

	test_mode(True)
	settings = H.default_settings(verbose=False)
	settings_are_default = True
	plot = True
	quiet = False
	customfile = False

	try:
		opts = getopt.getopt(sys.argv[1:], 'hf:s:bq',
			['help', 'file=', 'settings=', 'no-plot', 'no-sound']
			)[0]
	except getopt.GetoptError as e:
		print(COLOR['red'] + 'ERROR: %s' % e + COLOR['white'])
		print(hlp_txt)
		sys.exit(1)

	# parse options and arguments
	# (getopt reports long options without the trailing '=', e.g. '--file')
	for o, a in opts:
		if o in ('-h', '--help'):
			print(hlp_txt)
			sys.exit(0)
		if o in ('-f', '--file'):
			# The data file.
			data_path = os.path.expanduser(a)
			if os.path.exists(data_path):
				try:
					out = '%s.txt' % (data_path)
					packetize(inf=data_path, outf=out, testing=True)
					TESTFILE = out
					customfile = True # using a custom miniseed file for testing
				except Exception as e:
					print(hlp_txt)
					print(COLOR['red'] + 'ERROR: %s' % e + COLOR['white'])
					sys.exit(1)
			elif a != 'default':
				# 'default' is the documented way to ask for the packaged data;
				# any other missing path is reported instead of silently ignored
				print(COLOR['yellow'] + 'WARNING: could not find data file at %s, using default test data' % (a) + COLOR['white'])
		if o in ('-s', '--settings'):
			# Run the tests with a specific settings file instead of the defaults.
			settings_path = os.path.abspath(os.path.expanduser(a)).replace('\\', '/')
			if os.path.exists(settings_path):
				settings = H.read_settings(settings_path)
				settings_are_default = False
			else:
				print(COLOR['red'] + 'ERROR: could not find settings file at %s' % (a) + COLOR['white'])
				sys.exit(1)
		if o in ('-b', '--no-plot'):
			plot = False
		if o in ('-q', '--no-sound'):
			quiet = True

	if not customfile:
		# we are just using the default miniseed file
		packetize(inf=TESTFILE+'.ms', outf=TESTFILE, testing=True)

	T.TEST['n_internet'][1] = T.is_connected('www.google.com')

	if settings_are_default:
		settings = T.make_test_settings(settings=settings, inet=T.TEST['n_internet'][1])

	T.TEST['p_log_dir'][1] = T.logdir_permissions()
	T.TEST['p_log_file'][1] = start_logging(testing=True)
	T.TEST['p_log_std'][1] = add_debug_handler(testing=True)

	# the same expanded output directory feeds all three directory checks
	odir = os.path.expanduser(settings['settings']['output_dir'])
	T.TEST['p_output_dirs'][1] = init_dirs(odir)
	T.TEST['p_data_dir'][1] = T.datadir_permissions(odir)
	T.TEST['p_screenshot_dir'][1] = T.ss_permissions(odir)

	settings = T.cancel_tests(settings, MPL, plot, quiet)

	try:
		run(settings, debug=True)

		# client test
		ctest = 'client test'
		# NOTE(review): T.TEST['c_miniseed'] is indexed like a sequence elsewhere,
		# so this truth test presumably only checks that the entry is non-empty — confirm intent
		if (T.TEST['c_miniseed'] and WRITER):
			printM('Merging and testing MiniSEED file(s)...', sender=ctest)
			try:
				ms = rs.Stream()
				for outfile in WRITER.outfiles:
					if os.path.exists(outfile):
						T.TEST['c_miniseed'][1] = True
						ms = ms + rs.read(outfile)
						# rename the written file with a 'test.' prefix in the same directory
						dn, fn = os.path.dirname(outfile), os.path.basename(outfile)
						os.replace(outfile, os.path.join(dn, 'test.' + fn))
					else:
						raise FileNotFoundError('MiniSEED file not found: %s' % outfile)
				printM('Renamed test file(s).', sender=ctest)
				printM(str(ms.merge()))
			except Exception as e:
				printE(e)
				T.TEST['c_miniseed'][1] = False

	except Exception:
		# any failure while running the client ends the tests but still reports results
		printE(traceback.format_exc(), announce=False)
		printE('Ending tests.', sender=SENDER, announce=False)
		time.sleep(0.5)

	TESTQUEUE.put(b'ENDTEST')
	printW('Test finished.', sender=SENDER, announce=False)

	print()

	code = 0
	printM('Test results:')
	for key in T.TEST:
		entry = T.TEST[key]
		printM('%s: %s' % (entry[0], T.TRANS[entry[1]]))
		if not entry[1]:
			# if a test fails, change the system exit code to indicate an error occurred
			code = 1
	_xit(code)
620
621
622
# start the client when this file is executed directly as a script
if __name__ == '__main__':
	main()
624