Passed
Push — master ( fe5590...f40c33 )
by Ian
06:56
created

build.rsudp.test.cancel_tests()   A

Complexity

Conditions 4

Size

Total Lines 26
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 14
dl 0
loc 26
rs 9.7
c 0
b 0
f 0
cc 4
nop 4
1
import os, sys
2
from rsudp import COLOR, printM, printW, printE
3
from queue import Queue
4
import socket
5
import json
6
import time
7
import pkg_resources as pr
8
9
SENDER = 'test.py'
10
TEST = {
11
	# permissions
12
	'p_log_dir':			['log directory               ', False],
13
	'p_log_std':			['stdout logging              ', False],
14
	'p_log_file':			['logging to file             ', False],
15
	'p_output_dirs':		['output directory structure  ', False],
16
	'p_screenshot_dir':		['screenshot directory        ', False],
17
	'p_data_dir':			['data directory              ', False],
18
19
	# network
20
	'n_port':				['port                        ', False],
21
	'n_internet':			['internet                    ', False],
22
	'n_inventory':			['inventory fetch             ', False],
23
24
	# dependencies
25
	'd_pydub':				['pydub dependencies          ', False],
26
	'd_matplotlib':			['matplotlib backend          ', False],
27
28
	# core
29
	'c_data':				['receiving data              ', False],
30
	'c_processing':			['processing data             ', False],
31
	'c_ALARM':				['ALARM message               ', False],
32
	'c_RESET':				['RESET message               ', False],
33
	'c_IMGPATH':			['IMGPATH message             ', False],
34
	'c_TERM':				['TERM message                ', False],
35
}
36
37
TRANS = {
38
	True: COLOR['green'] + 'PASS' + COLOR['white'],
39
	False: COLOR['red'] + 'FAIL' + COLOR['white']
40
}
41
42
PORT = 18888
43
44
def make_test_settings(settings, inet=False):
45
	'''
46
	Get the default settings and return settings for testing.
47
48
	The default settings are modified in the following way:
49
50
	======================================== ===================
51
	Setting                                  Value
52
	======================================== ===================
53
	 ``settings['settings']['station']``      ``'R24FA'``
54
	 ``settings['alert']['threshold']``       ``2``
55
	 ``settings['alert']['reset']``           ``0.5``
56
	 ``settings['alert']['lowpass']``         ``9``
57
	 ``settings['alert']['highpass']``        ``0.8``
58
	 ``settings['plot']['channels']``         ``['all']``
59
	 ``settings['plot']['duration']``         ``60``
60
	 ``settings['plot']['deconvolve']``       ``True``
61
	 ``settings['plot']['units']``            ``'CHAN'``
62
	 ``settings['plot']['eq_screenshots']``   ``True``
63
	 ``settings['alertsound']['enabled']``    ``True``
64
	======================================== ===================
65
66
	.. note::
67
68
		If there is no internet connection detected, the station
69
		name will default to ``'Z0000'`` so that no time is wasted
70
		trying to download an inventory from the Raspberry Shake
71
		FDSN service.
72
73
	:param dict settings: settings dictionary (will be modified from :ref:`defaults`)
74
	:param bool inet: whether or not the internet test passed
75
	:rtype: dict
76
	:return: settings dictionary to test with
77
	'''
78
	settings = json.loads(settings)
79
80
	settings['settings']['port'] = PORT
81
	if inet:
82
		settings['settings']['station'] = 'R24FA'
83
	else:
84
		settings['settings']['station'] = 'Z0000'
85
86
87
	settings['alert']['threshold'] = 2
88
	settings['alert']['reset'] = 0.5
89
	settings['alert']['lowpass'] = 9
90
	settings['alert']['highpass'] = 0.8
91
92
	settings['plot']['channels'] = ['all']
93
	settings['plot']['duration'] = 60
94
	settings['plot']['deconvolve'] = True
95
	settings['plot']['units'] = 'CHAN'
96
	settings['plot']['eq_screenshots'] = True
97
98
	settings['alertsound']['enabled'] = True
99
100
	return settings
101
102
103
def cancel_tests(settings, MPL, plot, quiet):
104
	'''
105
	Cancel some tests if they don't need to be run.
106
107
	:param dict settings: the dictionary of settings for program execution
108
	:param bool plot: whether or not to plot (``False`` == no plot)
109
	:param bool quiet: whether or not to play sounds (``True`` == no sound)
110
	:rtype: dict
111
	:return: settings dictionary to test with
112
	'''
113
	if plot:
114
		if MPL:
115
			TEST['d_matplotlib'][1] = True
116
		else:
117
			printW('matplotlib backend failed to load')
118
	else:
119
		settings['plot']['enabled'] = False
120
		del TEST['d_matplotlib']
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable TEST does not seem to be defined.
Loading history...
121
		del TEST['c_IMGPATH']
122
		printM('Plot is disabled')
123
124
	if quiet:
125
		settings['alertsound']['enabled'] = False
126
		del TEST['d_pydub']
127
		printM('Alert sound is disabled')
128
	return settings
129
130
131
def permissions(dp):
132
	'''
133
	Test write permissions for the specified directory.
134
135
	:param str dp: the directory path to test permissions for
136
	:rtype: bool
137
	:return: if ``True``, the test was successful, ``False`` otherwise
138
	'''
139
	dp = os.path.join(dp, 'test')
140
	try:
141
		with open(dp, 'w') as f:
142
			f.write('testing\n')
143
		os.remove(dp)
144
		return True
145
	except Exception as e:
146
		printE(e)
147
		return False
148
149
def datadir_permissions(testdir):
150
	'''
151
	Test write permissions in the data directory (``./data`` by default)
152
153
	:param str testdir: The directory to test permissions for
154
	:rtype: bool
155
	:return: the output of :py:func:`rsudp.test.permissions`
156
157
	'''
158
	return permissions('%s/data/' % testdir)
159
160
def ss_permissions(testdir):
161
	'''
162
	Test write permissions in the screenshots directory (``./screenshots`` by default)
163
164
	:param str testdir: The directory to test permissions for
165
	:rtype: bool
166
	:return: the output of :py:func:`rsudp.test.permissions`
167
168
	'''
169
	return permissions('%s/screenshots/' % testdir)
170
171
def logdir_permissions(logdir='/tmp/rsudp'):
172
	'''
173
	Test write permissions in the log directory (``/tmp/rsudp`` by default)
174
175
	:param str logdir: The log directory to test permissions for
176
	:rtype: bool
177
	:return: the output of :py:func:`rsudp.test.permissions`
178
	'''
179
	return permissions(logdir)
180
181
def is_connected(hostname):
182
	'''
183
	Test for an internet connection. 
184
185
	:param str hostname: The hostname to test with
186
	:rtype: bool
187
	:return: ``True`` if connection is successful, ``False`` otherwise
188
	'''
189
	try:
190
		# see if we can resolve the host name -- tells us if there is
191
		# a DNS listening
192
		host = socket.gethostbyname(hostname)
193
		# connect to the host -- tells us if the host is actually
194
		# reachable
195
		s = socket.create_connection((host, 80), 2)
196
		s.close()
197
		return True
198
	except:
199
		pass
200
	return False
201
202