Passed
Push — master ( b90ced...a5e1e1 )
by Ian
06:05
created

build.rsudp.client.test()   F

Complexity

Conditions 19

Size

Total Lines 152
Code Lines 87

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 45
CRAP Score 53.6365

Importance

Changes 0
Metric Value
eloc 87
dl 0
loc 152
ccs 45
cts 83
cp 0.5422
rs 0.5999
c 0
b 0
f 0
cc 19
nop 0
crap 53.6365

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

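Commonly applied refactorings include Extract Method; if the method has many parameters or temporary variables, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object are also candidates.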

Complexity

Complex classes (or, as here, a complex function such as build.rsudp.client.test()) often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import sys, os
2 1
import signal
3 1
import getopt
4 1
import time
5 1
import json
6 1
import traceback
7 1
from queue import Queue
8 1
from rsudp import printM, printW, printE, default_loc, init_dirs, output_dir, add_debug_handler, start_logging
9 1
from rsudp import COLOR
10 1
import rsudp.helpers as H
11 1
import rsudp.test as T
12 1
import rsudp.raspberryshake as rs
13 1
from rsudp.packetize import packetize
14 1
from rsudp.c_consumer import Consumer
15 1
from rsudp.p_producer import Producer
16 1
from rsudp.c_printraw import PrintRaw
17 1
from rsudp.c_write import Write
18 1
from rsudp.c_plot import Plot, MPL
19 1
from rsudp.c_forward import Forward
20 1
from rsudp.c_alert import Alert
21 1
from rsudp.c_alertsound import AlertSound
22 1
from rsudp.c_custom import Custom
23 1
from rsudp.c_tweet import Tweeter
24 1
from rsudp.c_telegram import Telegrammer
25 1
from rsudp.c_rsam import RSAM
26 1
from rsudp.c_testing import Testing
27 1
from rsudp.t_testdata import TestData
28 1
import pkg_resources as pr
29
30
31 1
# DESTINATIONS collects the queues created by mk_q(); THREADS collects the
# worker threads registered with mk_p(). Both are handed to the consumer and
# producer in start().
DESTINATIONS, THREADS = [], []
# The handles below stay False until the corresponding object is created.
PROD = False		# Producer instance, assigned in start()
PLOTTER = False		# Plot instance, assigned in run() when plotting is enabled
TELEGRAM = False	# last Telegrammer instance created in run()
TWITTER = False		# Tweeter instance created in run()
WRITER = False		# Write instance created in run(); read by test()
SOUND = False		# NOTE(review): declared global in run() but never assigned in this file
TESTING = False		# testing mode flag, set through test_mode()
TESTQUEUE = False	# queue handed to TestData, created in run() when TESTING is set
# packaged test data file shipped with rsudp
TESTFILE = pr.resource_filename('rsudp', os.path.join('test', 'testdata'))
# name passed as sender= to the print helpers from this module
SENDER = 'Main'
42
43 1
def handler(sig, frame):
	'''
	Function passed to :py:func:`signal.signal` to handle close events.

	Sets ``rs.producer`` to ``False``.

	:param sig: signal number supplied by :py:func:`signal.signal` (unused)
	:param frame: current stack frame supplied by :py:func:`signal.signal` (unused)
	'''
	# NOTE(review): presumably the producer loop polls this flag and stops -- confirm in rsudp.raspberryshake
	rs.producer = False
48
49 1
def _xit(code=0):
	'''
	End the program. Called after all running threads have stopped.

	When testing mode is on and the test queue exists, ``b'ENDTEST'`` is put
	on it before exiting.

	:param int code: The process code to exit with. 0=OK, 1=ERROR.
	:raises SystemExit: always, via :py:func:`sys.exit`
	'''
	# TESTQUEUE is still False if run() never reached the point where it
	# creates the queue, so only signal the end of the test when it exists.
	if TESTING and TESTQUEUE:
		TESTQUEUE.put(b'ENDTEST')

	# The former ``for thread in THREADS: del thread`` loop only unbound its
	# own loop variable and never released anything, so it has been removed.

	printM('Shutdown successful.', sender=SENDER)
	print()
	sys.exit(code)
63
64 1
def test_mode(mode=None):
65
	'''
66
	Sets the TESTING global variable to ``True`` to indicate that
67
	testing-specific actions should be taken in routines.
68
69
	:param bool mode: if ``True`` or ``False``, sets testing mode state. if anything else, returns state only.
70
	:return: testing mode state
71
	:rtype: bool
72
	'''
73
	global TESTING
74 1
	if (mode == True) or (mode == False):
75 1
		TESTING = mode
76 1
	return TESTING
77
78
79 1
def mk_q():
	'''
	Makes a queue and appends it to the module-level ``DESTINATIONS``
	list, which :py:func:`start` passes to the master consumer thread
	:py:class:`rsudp.c_consumer.Consumer`.

	The queue is created with a maximum size of ``rs.qsize``.

	:rtype: queue.Queue
	:return: Returns the queue to pass to the sub-consumer.
	'''
	q = Queue(rs.qsize)
	DESTINATIONS.append(q)
	return q
91
92 1
def mk_p(proc):
	'''
	Appends a process to the list of threads to start and stop.

	The thread is only registered in the module-level ``THREADS`` list here;
	it is started later by :py:func:`start`.

	:param threading.Thread proc: The process thread to append to the list of threads.
	'''
	THREADS.append(proc)
99
100
101 1
def start():
	'''
	Start Consumer, Threads, and Producer.

	Creates the master queue, starts the consumer and every thread registered
	with :py:func:`mk_p`, then starts the producer. If a plotter exists and
	matplotlib is available, the plotter runs in the calling thread; otherwise
	the calling thread waits until the producer's ``stop`` flag is set.
	'''
	# only PROD is rebound here; the other module globals are merely read
	global PROD

	# master queue and consumer
	master_queue = Queue(rs.qsize)
	cons = Consumer(master_queue, DESTINATIONS, testing=TESTING)
	cons.start()

	for thread in THREADS:
		thread.start()

	PROD = Producer(master_queue, THREADS, testing=TESTING)
	PROD.start()

	if PLOTTER and MPL:
		# give the plotter the master queue
		# so that it can issue a TERM signal if closed
		PLOTTER.master_queue = master_queue
		# start plotting (in this thread, not a separate one)
		PLOTTER.run()
	else:
		while not PROD.stop:
			time.sleep(0.1) # wait until processes end

	time.sleep(0.5) # give threads time to exit
	PROD.stop = True
130
131
132 1
def _deconv_units(section):
	'''
	Work out the ``deconv`` argument of a module from its settings section.

	:param dict section: settings section holding the ``deconvolve`` and ``units`` keys
	:return: ``False`` when deconvolution is disabled, the upper-cased unit name when it is listed in ``rs.UNITS``, otherwise ``'CHAN'``
	'''
	if not section['deconvolve']:
		return False
	units = section['units'].upper()
	return units if units in rs.UNITS else 'CHAN'


def run(settings, debug):
	'''
	Main setup function. Takes configuration values and passes them to
	the appropriate threads and functions.

	Each enabled settings section gets its own queue (:py:func:`mk_q`) and
	thread (:py:func:`mk_p`); afterwards :py:func:`start` is called. Outside of
	testing mode the function ends the program through :py:func:`_xit`.

	:param dict settings: settings dictionary (see :ref:`defaults` for guidance)
	:param bool debug: whether or not to show debug output (should be turned off if starting as daemon)
	:raises KeyError: if neither a ``custom`` section nor a legacy custom ``exec`` entry in the ``alert`` section can be read
	'''
	global PLOTTER, SOUND, TESTQUEUE, WRITER, TWITTER, TELEGRAM

	# handler for the exit signal
	signal.signal(signal.SIGINT, handler)

	if TESTING:
		# initialize the test data to read information from file and put it on the port
		TESTQUEUE = Queue()		# separate from client library because this is not downstream of the producer
		tdata = TestData(q=TESTQUEUE, data_file=TESTFILE, port=settings['settings']['port'])
		tdata.start()

	# initialize the central library
	rs.initRSlib(dport=settings['settings']['port'],
				 rsstn=settings['settings']['station'])

	H.conn_stats(TESTING)
	if TESTING:
		T.TEST['n_port'][1] = True	# port has been opened
		if rs.sps == 0:
			printE('There is already a Raspberry Shake sending data to this port.', sender=SENDER)
			printE('For testing, please change the port in your settings file to an unused one.',
					sender=SENDER, spaces=True)
			_xit(1)

	# local name chosen so the ``output_dir`` imported from rsudp is not shadowed
	data_dir = settings['settings']['output_dir']

	if settings['printdata']['enabled']:
		# set up queue and process
		q = mk_q()
		prnt = PrintRaw(q, testing=TESTING)
		mk_p(prnt)

	if settings['write']['enabled']:
		# set up queue and process
		q = mk_q()
		WRITER = Write(q=q, data_dir=data_dir,
					   cha=settings['write']['channels'], testing=TESTING)
		mk_p(WRITER)

	if settings['plot']['enabled'] and MPL:
		# block until the library reports a nonzero channel count
		while rs.numchns == 0:
			time.sleep(0.01)
		plot_cfg = settings['plot']
		deconv = _deconv_units(plot_cfg)
		pq = mk_q()
		PLOTTER = Plot(cha=plot_cfg['channels'], seconds=plot_cfg['duration'],
						spectrogram=plot_cfg['spectrogram'],
						fullscreen=plot_cfg['fullscreen'], kiosk=plot_cfg['kiosk'],
						deconv=deconv, q=pq,
						screencap=plot_cfg['eq_screenshots'],
						alert=settings['alert']['enabled'], testing=TESTING)
		# no mk_p() here because the plotter must be controlled by the main thread (this one)

	if settings['forward']['enabled']:
		# put settings in namespace
		addr = settings['forward']['address']
		port = settings['forward']['port']
		cha = settings['forward']['channels']
		fwd_data = settings['forward']['fwd_data']
		fwd_alarms = settings['forward']['fwd_alarms']
		# set up one queue and process per address/port pair
		if len(addr) == len(port):
			printM('Initializing %s Forward threads' % (len(addr)), sender=SENDER)
			for num, (fwd_addr, fwd_port) in enumerate(zip(addr, port)):
				q = mk_q()
				forward = Forward(num=num, addr=fwd_addr, port=int(fwd_port), cha=cha,
								  fwd_data=fwd_data, fwd_alarms=fwd_alarms,
								  q=q, testing=TESTING)
				mk_p(forward)
		else:
			printE('List length mismatch: %s addresses and %s ports in forward section of settings file' % (
										len(addr), len(port)), sender=SENDER)
			_xit(1)

	if settings['alert']['enabled']:
		# put settings in namespace
		alert_cfg = settings['alert']
		bp = [alert_cfg['highpass'], alert_cfg['lowpass']]
		deconv = _deconv_units(alert_cfg)

		# set up queue and process
		q = mk_q()
		alrt = Alert(sta=alert_cfg['sta'], lta=alert_cfg['lta'],
					 thresh=alert_cfg['threshold'], reset=alert_cfg['reset'], bp=bp,
					 cha=alert_cfg['channel'], debug=debug, q=q, testing=TESTING,
					 deconv=deconv)
		mk_p(alrt)

	if settings['alertsound']['enabled']:
		soundloc = os.path.expanduser(settings['alertsound']['mp3file'])
		# these names select a sound file bundled with the package
		if soundloc in ['doorbell', 'alarm', 'beeps', 'sonar']:
			soundloc = pr.resource_filename('rsudp', os.path.join('rs_sounds', '%s.mp3' % soundloc))

		q = mk_q()
		alsnd = AlertSound(q=q, testing=TESTING, soundloc=soundloc)
		mk_p(alsnd)

	runcustom = False
	codefile = False
	win_ovr = False
	try:
		if settings['custom']['enabled']:
			# put settings in namespace
			codefile = settings['custom']['codefile']
			win_ovr = settings['custom']['win_override']
			if codefile == 'n/a':
				codefile = False
			runcustom = True
	except KeyError:
		# settings without a usable custom section: fall back to the exec
		# entry of the alert section unless it holds the 'eqAlert' value
		if settings['alert']['exec'] != 'eqAlert':
			printW('the custom code function has moved to its own module (rsudp.c_custom)', sender='Custom')
			codefile = settings['alert']['exec']
			win_ovr = settings['alert']['win_override']
			runcustom = True
		else:
			# re-raise the original error with its traceback intact
			raise
	if runcustom:
		# set up queue and process
		q = mk_q()
		cstm = Custom(q=q, codefile=codefile, win_ovr=win_ovr, testing=TESTING)
		mk_p(cstm)

	if settings['tweets']['enabled']:
		tweet_cfg = settings['tweets']

		q = mk_q()
		TWITTER = Tweeter(q=q, consumer_key=tweet_cfg['api_key'],
						consumer_secret=tweet_cfg['api_secret'],
						access_token=tweet_cfg['access_token'],
						access_token_secret=tweet_cfg['access_secret'],
						tweet_images=tweet_cfg['tweet_images'],
						extra_text=tweet_cfg['extra_text'], testing=TESTING)
		mk_p(TWITTER)

	if settings['telegram']['enabled']:
		token = settings['telegram']['token']
		# strip every id separately so that "id1, id2" does not yield " id2"
		chat_ids = [cid.strip() for cid in settings['telegram']['chat_id'].split(',')]
		send_images = settings['telegram']['send_images']
		extra_text = settings['telegram']['extra_text']

		# one queue and thread per chat id; TELEGRAM keeps the last one created
		for chat_id in chat_ids:
			sender = "Telegram id %s" % (chat_id)
			q = mk_q()
			TELEGRAM = Telegrammer(q=q, token=token, chat_id=chat_id,
								   send_images=send_images, extra_text=extra_text,
								   sender=sender, testing=TESTING)
			mk_p(TELEGRAM)

	if settings['rsam']['enabled']:
		# put settings in namespace
		rsam_cfg = settings['rsam']
		deconv = _deconv_units(rsam_cfg)

		# set up queue and process
		q = mk_q()
		rsam = RSAM(q=q, interval=rsam_cfg['interval'], cha=rsam_cfg['channel'],
					deconv=deconv, fwaddr=rsam_cfg['fwaddr'],
					fwport=rsam_cfg['fwport'], fwformat=rsam_cfg['fwformat'],
					quiet=rsam_cfg['quiet'], testing=TESTING)

		mk_p(rsam)

	# start additional modules here!
	################################


	################################

	if TESTING:
		# initialize test consumer
		q = mk_q()
		test = Testing(q=q)
		mk_p(test)

	# start the producer, consumer, and activated modules
	start()

	PLOTTER = False
	if not TESTING:
		_xit()
	else:
		printW('Client has exited, ending tests...', sender=SENDER, announce=False)
365
366
367 1
def main():
	'''
	Loads settings to start the main client.
	Supply -h from the command line to see help text.

	With no options, the settings file in the default location is read, or
	created from the defaults if it does not exist yet. ``-i`` and ``-d``
	write a settings file and exit; ``-s`` reads the given settings file.
	'''
	settings_loc = os.path.join(default_loc, 'rsudp_settings.json').replace('\\', '/')

	hlp_txt='''
###########################################
##     R A S P B E R R Y  S H A K E      ##
##              UDP Client               ##
##            by Ian Nesbitt             ##
##            GNU GPLv3 2020             ##
##                                       ##
## Do various tasks with Shake data      ##
## like plot, trigger alerts, and write  ##
## to miniSEED.                          ##
##                                       ##
##  Requires:                            ##
##  - numpy, obspy, matplotlib 3, pydub  ##
##                                       ##
###########################################

Usage: rs-client [ OPTIONS ]
where OPTIONS := {
    -h | --help
            display this help message
    -d | --dump=default or /path/to/settings/json
            dump the default settings to a JSON-formatted file
    -s | --settings=/path/to/settings/json
            specify the path to a JSON-formatted settings file
    }

rs-client with no arguments will start the program with
settings in %s
''' % settings_loc

	settings = json.loads(H.default_settings(verbose=False))
	# path reported in the startup message; replaced when -s names another file
	settings_file = settings_loc

	# get arguments
	try:
		opts = getopt.getopt(sys.argv[1:], 'hid:s:',
			['help', 'install', 'dump=', 'settings=']
			)[0]
	except getopt.GetoptError as e:
		print(COLOR['red'] + 'ERROR: %s' % e + COLOR['white'])
		print(hlp_txt)
		# opts is undefined at this point, so continuing would raise NameError
		sys.exit(1)

	if not opts:
		if not os.path.exists(settings_loc):
			print(COLOR['yellow'] + 'Could not find rsudp settings file, creating one at %s' % settings_loc + COLOR['white'])
			# same preparation as the -i and -d branches below
			os.makedirs(default_loc, exist_ok=True)
			H.dump_default(settings_loc, H.default_settings())
		else:
			settings = H.read_settings(settings_loc)

	# getopt reports long options without the trailing '=' of their declaration
	for o, a in opts:
		if o in ('-h', '--help'):
			print(hlp_txt)
			sys.exit(0)
		if o in ('-i', '--install'):
			# This is only meant to be used by the install script.
			os.makedirs(default_loc, exist_ok=True)
			H.dump_default(settings_loc, H.default_settings(output_dir='@@DIR@@', verbose=False))
			sys.exit(0)
		if o in ('-d', '--dump'):
			# Dump the settings to a file, specified after the `-d` flag,
			# or `-d default` to let the software decide where to put it.
			if str(a) == 'default':
				os.makedirs(default_loc, exist_ok=True)
				H.dump_default(settings_loc, H.default_settings())
			else:
				H.dump_default(os.path.abspath(os.path.expanduser(a)), H.default_settings())
			sys.exit(0)
		if o in ('-s', '--settings'):
			# Start the program with a specific settings file,
			# for example: `-s settings.json`.
			settings_file = os.path.abspath(os.path.expanduser(a)).replace('\\', '/')
			settings = H.read_settings(settings_file)

	debug = settings['settings']['debug']
	if debug:
		add_debug_handler()
	start_logging()

	printM('Using settings file: %s' % settings_file)

	odir = os.path.abspath(os.path.expanduser(settings['settings']['output_dir']))
	init_dirs(odir)
	if debug:
		printM('Output directory is: %s' % odir)

	run(settings, debug=debug)
463
464
465 1
def test():
	'''
	.. versionadded:: 0.4.3

	Set up tests, run modules, report test results.
	For a list of tests run, see :py:mod:`rsudp.test`.

	Turns testing mode on, parses the command line, records the outcome of
	the pre-flight checks in ``T.TEST``, runs the client with
	:py:func:`run`, and finally exits through :py:func:`_xit` with code 1 if
	any entry of ``T.TEST`` is falsy and 0 otherwise.
	'''
	global TESTFILE
	hlp_txt='''
###########################################
##     R A S P B E R R Y  S H A K E      ##
##            Testing Module             ##
##            by Ian Nesbitt             ##
##            GNU GPLv3 2020             ##
##                                       ##
## Test settings with archived Shake     ##
## data to determine optimal             ##
## configuration.                        ##
##                                       ##
##  Requires:                            ##
##  - numpy, obspy, matplotlib 3         ##
##                                       ##
###########################################

Usage: rs-test [ OPTIONS ]
where OPTIONS := {
    -h | --help
            display this help message
    -f | --file=default or /path/to/data/file
            specify the path to a seismic data file
    -s | --settings=/path/to/settings/json
            specify the path to a JSON-formatted settings file
    -b | --no-plot
            "blind mode", used when there is no display
    -q | --no-sound
            "quiet mode", used when there is no audio device/ffmpeg
    }

rs-test with no arguments will start the test with
default settings and the data file at
%s
''' % (TESTFILE)

	test_mode(True)
	settings = H.default_settings(verbose=False)
	settings_are_default = True
	plot = True
	quiet = False
	packetize(inf=TESTFILE+'.ms', outf=TESTFILE, testing=True)

	try:
		opts = getopt.getopt(sys.argv[1:], 'hf:s:bq',
			['help', 'file=', 'settings=', 'no-plot', 'no-sound']
			)[0]
	except getopt.GetoptError as e:
		print(COLOR['red'] + 'ERROR: %s' % e + COLOR['white'])
		print(hlp_txt)
		sys.exit(1)

	# parse options and arguments
	# (getopt reports long options without the trailing '=' of their declaration)
	for o, a in opts:
		if o in ('-h', '--help'):
			print(hlp_txt)
			sys.exit(0)
		if o in ('-f', '--file'):
			# The data file: packetize it and use the result as test data.
			data_file = os.path.expanduser(a)
			if os.path.exists(data_file):
				try:
					out = '%s.txt' % (data_file)
					packetize(inf=data_file, outf=out)
					TESTFILE = out
				except Exception as e:
					print(hlp_txt)
					print(COLOR['red'] + 'ERROR: %s' % e + COLOR['white'])
					sys.exit(1)
			else:
				# mirror the -s handling instead of silently testing the default data
				print(COLOR['red'] + 'ERROR: could not find data file at %s' % (a) + COLOR['white'])
				sys.exit(1)
		if o in ('-s', '--settings'):
			# Run the test with a specific settings file instead of the defaults.
			settings_loc = os.path.abspath(os.path.expanduser(a)).replace('\\', '/')
			if os.path.exists(settings_loc):
				settings = H.read_settings(settings_loc)
				settings_are_default = False
			else:
				print(COLOR['red'] + 'ERROR: could not find settings file at %s' % (a) + COLOR['white'])
				sys.exit(1)
		if o in ('-b', '--no-plot'):
			plot = False
		if o in ('-q', '--no-sound'):
			quiet = True

	T.TEST['n_internet'][1] = T.is_connected('www.google.com')

	if settings_are_default:
		settings = T.make_test_settings(settings=settings, inet=T.TEST['n_internet'][1])

	T.TEST['p_log_dir'][1] = T.logdir_permissions()
	T.TEST['p_log_file'][1] = start_logging(testing=True)
	T.TEST['p_log_std'][1] = add_debug_handler(testing=True)

	out_dir = os.path.expanduser(settings['settings']['output_dir'])
	T.TEST['p_output_dirs'][1] = init_dirs(out_dir)
	T.TEST['p_data_dir'][1] = T.datadir_permissions(out_dir)
	T.TEST['p_screenshot_dir'][1] = T.ss_permissions(out_dir)

	settings = T.cancel_tests(settings, MPL, plot, quiet)

	try:
		run(settings, debug=True)

		# client test
		ctest = 'client test'
		# NOTE(review): entries of T.TEST are indexed with [0] and [1] elsewhere,
		# so this checks the entry itself rather than its result -- confirm intent
		if T.TEST['c_miniseed']:
			printM('Merging and testing MiniSEED file(s)...', sender=ctest)
			try:
				ms = rs.Stream()
				for outfile in WRITER.outfiles:
					if os.path.exists(outfile):
						T.TEST['c_miniseed'][1] = True
						ms = ms + rs.read(outfile)
						# prefix the file name with 'test.'
						dn, fn = os.path.dirname(outfile), os.path.basename(outfile)
						os.replace(outfile, os.path.join(dn, 'test.' + fn))
					else:
						raise FileNotFoundError('MiniSEED file not found: %s' % outfile)
				printM('Renamed test file(s).', sender=ctest)
				printM(str(ms.merge()))
			except Exception as e:
				printE(e)
				T.TEST['c_miniseed'][1] = False

	except Exception:
		printE(traceback.format_exc(), announce=False)
		printE('Ending tests.', sender=SENDER, announce=False)
		time.sleep(0.5)

	# TESTQUEUE is still False if run() failed before creating the queue
	if TESTQUEUE:
		TESTQUEUE.put(b'ENDTEST')
	printW('Test finished.', sender=SENDER, announce=False)

	print()

	code = 0
	printM('Test results:')
	for key in T.TEST:
		description, passed = T.TEST[key][0], T.TEST[key][1]
		printM('%s: %s' % (description, T.TRANS[passed]))
		if not passed:
			# if a test fails, change the system exit code to indicate an error occurred
			code = 1
	_xit(code)
617
618
619
# start the client when this file is executed as a script
if __name__ == '__main__':
	main()
621