Passed
Push — convenience-scripts ( cc7f56 )
by Ian
10:17
created

build.rsudp.helpers   F

Complexity

Total Complexity 62

Size/Duplication

Total Lines 630
Duplicated Lines 0 %

Test Coverage

Coverage 55.41%

Importance

Changes 0
Metric Value
wmc 62
eloc 164
dl 0
loc 630
ccs 87
cts 157
cp 0.5541
rs 3.44
c 0
b 0
f 0

21 Functions

Rating   Name   Duplication   Size   Complexity  
A deconv_vel_inst() 0 28 5
B deconvolve() 0 36 7
A deconv_rbm_inst() 0 19 1
B deconv_acc_inst() 0 30 7
A dump_default() 0 11 2
A ep_tailf_log() 0 28 5
A ep_edit_settings() 0 37 4
A conn_stats() 0 34 3
A msg_imgpath() 0 22 1
A ep_cat_log() 0 14 2
A msg_term() 0 14 1
A default_settings() 0 85 2
A msg_reset() 0 20 1
A resolve_extra_text() 0 32 5
B set_channels() 0 41 8
A lesser_multiple() 0 11 1
A read_settings() 0 21 3
A msg_alarm() 0 20 1
A fsec() 0 35 1
A get_msg_path() 0 22 1
A get_msg_time() 0 23 1

How to fix   Complexity   

Complexity

Complex classes like build.rsudp.helpers often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1 1
import rsudp.raspberryshake as rs
2 1
from rsudp import COLOR, printM, printW, scap_dir, log_loc, settings_loc
3 1
import os
4 1
import subprocess
5 1
import platform
6 1
import json
7
8
9 1
def dump_default(settings_loc, default_settings):
	'''
	Write a settings string to disk, followed by a single trailing newline.

	The target file is opened in ``w+`` mode, so it is created if missing and
	truncated if it already exists. A short notice is printed before writing.

	:param str settings_loc: path of the settings JSON file to create.
	:param str default_settings: the settings text to write to the file.
	'''
	print('Creating a default settings file at %s' % settings_loc)
	with open(settings_loc, 'w+') as settings_file:
		# one write call: the settings text plus the terminating newline
		settings_file.write(default_settings + '\n')
20
21
22 1
def default_settings(output_dir='%s/rsudp' % os.path.expanduser('~').replace('\\', '/'), verbose=True):
	'''
	Build the default settings document as a formatted JSON string.

	The only value substituted into the template is ``output_dir``; every other
	setting is a fixed default.

	:param str output_dir: output location written into the ``settings`` section. defaults to ``~/rsudp`` (expanded when this module is imported, with backslashes converted to forward slashes).
	:param bool verbose: if ``True``, prints the chosen output directory.
	:return: default settings string in formatted json
	:rtype: str
	'''
	# raw template; the single %s placeholder receives output_dir
	template = r"""{
"settings": {
    "port": 8888,
    "station": "Z0000",
    "output_dir": "%s",
    "debug": true},
"printdata": {
    "enabled": false},
"write": {
    "enabled": false,
    "channels": ["all"]},
"plot": {
    "enabled": true,
    "duration": 90,
    "spectrogram": true,
    "fullscreen": false,
    "kiosk": false,
    "eq_screenshots": false,
    "channels": ["all"],
    "deconvolve": true,
    "units": "CHAN"},
"forward": {
    "enabled": false,
    "address": ["192.168.1.254"],
    "port": [8888],
    "channels": ["all"],
    "fwd_data": true,
    "fwd_alarms": false},
"alert": {
    "enabled": true,
    "channel": "HZ",
    "sta": 6,
    "lta": 30,
    "threshold": 3.95,
    "reset": 0.9,
    "highpass": 0.8,
    "lowpass": 9,
    "deconvolve": false,
    "units": "VEL"},
"alertsound": {
    "enabled": false,
    "mp3file": "doorbell"},
"custom": {
    "enabled": false,
    "codefile": "n/a",
    "win_override": false},
"tweets": {
    "enabled": false,
    "tweet_images": true,
    "api_key": "n/a",
    "api_secret": "n/a",
    "access_token": "n/a",
    "access_secret": "n/a",
    "extra_text": ""},
"telegram": {
    "enabled": false,
    "send_images": true,
    "token": "n/a",
    "chat_id": "n/a",
    "extra_text": ""},
"rsam": {
    "enabled": false,
    "quiet": true,
    "fwaddr": "192.168.1.254",
    "fwport": 8887,
    "fwformat": "LITE",
    "channel": "HZ",
    "interval": 10,
    "deconvolve": false,
    "units": "VEL"}
}

"""
	settings_json = template % (output_dir)
	if verbose:
		print('By default output_dir is set to %s' % output_dir)
	return settings_json
107
108
109 1
def read_settings(loc):
	'''
	Read a JSON settings file from disk and return the parsed contents.

	The path is user-expanded and made absolute. Every backslash in the file
	text is replaced with a forward slash before parsing, so Windows-style
	paths do not break the JSON decoder (note that this also rewrites any
	JSON escape sequences present in the file).

	If reading or parsing fails, an error message with recovery instructions
	is printed and the process exits with status ``2``.

	:param str loc: location on disk to read json settings file from
	:return: the parsed JSON content (a dictionary for a well-formed settings file)
	:rtype: dict
	:raises SystemExit: with exit code 2 when the file content cannot be loaded
	'''
	import sys	# imported locally so this block does not rely on a module-level import

	settings_loc = os.path.abspath(os.path.expanduser(loc)).replace('\\', '/')
	settings = None
	with open(settings_loc, 'r') as f:
		try:
			data = f.read().replace('\\', '/')
			settings = json.loads(data)
		except Exception as e:
			print(COLOR['red'] + 'ERROR: Could not load settings file. Perhaps the JSON is malformed?' + COLOR['white'])
			print(COLOR['red'] + '       detail: %s' % e + COLOR['white'])
			print(COLOR['red'] + '       If you would like to overwrite and rebuild the file, you can enter the command below:' + COLOR['white'])
			print(COLOR['bold'] + '       shake_client -d %s' % loc + COLOR['white'])
			# sys.exit instead of the builtin exit(): the latter is injected by the
			# site module for interactive use and may be missing (python -S, frozen apps)
			sys.exit(2)
	return settings
130
131
132 1
def set_channels(self, cha):
	'''
	Populate ``self.chans`` with the channels selected by ``cha``.

	Each entry of ``cha`` is upper-cased and matched as a substring against the
	channel names in ``rsudp.raspberryshake.chns``; every matching channel not
	already present is appended to ``self.chans``. Typical channel names are:

	- ``["SHZ", "EHZ", "EHN", "EHE"]`` - velocity channels
	- ``["ENZ", "ENN", "ENE"]`` - acceleration channels
	- ``["HDF"]`` - pressure transducer channel
	- ``["all"]`` - all available channels

	So for example, if you wanted to display the two vertical channels of a Shake 4D,
	(geophone and vertical accelerometer) you could specify:

	``["EHZ", "ENZ"]``

	Partial names work as well. The following matches at least one channel
	from any Raspberry Shake instrument:

	``["HZ", "HDF"]``

	and ``["Z"]`` matches both ``"EHZ"`` and ``"ENZ"``.

	A bare string is treated as a single channel pattern (``"EHZ"`` is the same
	as ``["EHZ"]``). If nothing matches, all available channels are used.

	:param self self: self object of the class calling this function; must have a ``chans`` list
	:param cha: the channel or list of channels to plot
	:type cha: list or str
	'''
	# a lone string is ONE pattern; list('EHZ') would split it into characters
	# and match nearly every channel
	if isinstance(cha, str):
		cha = [cha]
	if 'all' in cha:
		cha = rs.chns
	for c in rs.chns:
		for uch in cha:
			# direct membership test instead of searching the list's string repr
			if (uch.upper() in c) and (c not in self.chans):
				self.chans.append(c)
	if len(self.chans) < 1:
		# copy so later changes to self.chans cannot mutate the shared rs.chns list
		self.chans = list(rs.chns)
173
174
175 1
def fsec(ti):
	'''
	.. versionadded:: 0.4.3

	Return a copy of a time object limited to hundredths-of-a-second precision.

	The Raspberry Shake records at hundredths-of-a-second precision, and
	``strftime`` only exposes the full zero-padded microsecond field. Rather
	than manipulating microseconds by hand, this function builds a new
	:py:class:`obspy.core.utcdatetime.UTCDateTime` from ``ti`` with
	``precision=2`` and lets obspy handle the rounding.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> fsec(ti)
		UTCDateTime(2020, 1, 1, 0, 0, 0, 600000)

	:param ti: time object to convert microseconds for
	:type ti: obspy.core.utcdatetime.UTCDateTime
	:return: a new time object built from ``ti`` with a precision of 0.01 second
	:rtype: obspy.core.utcdatetime.UTCDateTime
	'''
	# obspy applies the requested precision; no manual microsecond math needed
	hundredths = rs.UTCDateTime(ti, precision=2)
	return hundredths
210
211
212 1
def lesser_multiple(x, base=10):
	'''
	.. versionadded:: 1.0.3

	Return the multiple of ``base`` obtained by truncating ``x / base`` toward
	zero. For non-negative ``x`` and positive ``base`` this is the largest
	multiple of ``base`` that does not exceed ``x``.

	This is useful for :func:`rsudp.packetize` when figuring out where to cut
	off samples when trying to fit them into packets.

	:param x: the number to round down (anything accepted by ``float()``)
	:param base: the number whose multiple is returned. defaults to 10.
	:return: the truncated multiple of ``base``
	:rtype: int
	'''
	whole_multiples = int(float(x) / base)	# int() drops the fractional part
	return int(base * whole_multiples)
223
224
225 1
def conn_stats(TESTING=False):
	'''
	Print a summary of the connection parameters held by
	:py:mod:`rsudp.raspberryshake`.

	Example:

	.. code-block:: python

		>>> conn_stats()
		2020-03-25 01:35:04 [conn_stats] Initialization stats:
		2020-03-25 01:35:04 [conn_stats]                 Port: 18069
		2020-03-25 01:35:04 [conn_stats]   Sending IP address: 192.168.0.4
		2020-03-25 01:35:04 [conn_stats]     Set station name: R24FA
		2020-03-25 01:35:04 [conn_stats]   Number of channels: 4
		2020-03-25 01:35:04 [conn_stats]   Transmission freq.: 250 ms/packet
		2020-03-25 01:35:04 [conn_stats]    Transmission rate: 4 packets/sec
		2020-03-25 01:35:04 [conn_stats]   Samples per second: 100 sps
		2020-03-25 01:35:04 [conn_stats]            Inventory: AM.R24FA (Raspberry Shake Citizen Science Station)

	:param bool TESTING: if ``True``, lines are emitted with ``printW`` instead of ``printM``.
	'''
	sender = 'conn_stats'
	if TESTING:
		emit = printW
	else:
		emit = printM

	emit('Initialization stats:', sender=sender, announce=False)

	# (line template, attribute of the raspberryshake module), in display order;
	# each attribute is read right before its line is printed
	rows = (
		('                Port: %s', 'port'),
		('  Sending IP address: %s', 'firstaddr'),
		('    Set station name: %s', 'stn'),
		('  Number of channels: %s', 'numchns'),
		('  Transmission freq.: %s ms/packet', 'tf'),
		('   Transmission rate: %s packets/sec', 'tr'),
		('  Samples per second: %s sps', 'sps'),
	)
	for template, attr in rows:
		emit(template % getattr(rs, attr), sender=sender, announce=False)

	# the inventory line is only shown when an inventory is available
	if rs.inv:
		station = rs.inv.get_contents()['stations'][0]
		emit('           Inventory: %s' % station,
			   sender=sender, announce=False)
259
260
261 1
def msg_alarm(event_time):
	'''
	Build the ``ALARM`` queue message as a bytes object.

	The message is the literal ``ALARM``, a space, and the UTF-8 encoded
	string form of ``event_time``.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> msg_alarm(ti)
		b'ALARM 2020-01-01T00:00:00.599Z'

	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
	:rtype: bytes
	:return: the ``ALARM`` message, ready to be put on the queue
	'''
	timestamp = str(event_time).encode('utf-8')
	return b'ALARM ' + timestamp
281
282
283 1
def msg_reset(reset_time):
	'''
	Build the ``RESET`` queue message as a bytes object.

	The message is the literal ``RESET``, a space, and the UTF-8 encoded
	string form of ``reset_time``.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> msg_reset(ti)
		b'RESET 2020-01-01T00:00:00.599Z'

	:param obspy.core.utcdatetime.UTCDateTime reset_time: the datetime object to serialize and convert to bytes
	:rtype: bytes
	:return: the ``RESET`` message, ready to be put on the queue
	'''
	timestamp = str(reset_time).encode('utf-8')
	return b'RESET ' + timestamp
303
304
305 1
def msg_imgpath(event_time, figname):
	'''
	Build the ``IMGPATH`` queue message as a bytes object.

	The message consists of three space-separated fields: the literal
	``IMGPATH``, the string form of ``event_time``, and the string form of
	``figname``, all UTF-8 encoded.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> path = '/home/pi/rsudp/screenshots/test.png'
		>>> msg_imgpath(ti, path)
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'

	:param obspy.core.utcdatetime.UTCDateTime event_time: the datetime object to serialize and convert to bytes
	:param str figname: the figure path as a string
	:rtype: bytes
	:return: the ``IMGPATH`` message, ready to be put on the queue
	'''
	fields = (
		b'IMGPATH',
		str(event_time).encode('utf-8'),
		str(figname).encode('utf-8'),
	)
	return b' '.join(fields)
327
328
329 1
def msg_term():
	'''
	Build the ``TERM`` queue message, a constant bytes object.

	.. code-block:: python

		>>> msg_term()
		b'TERM'

	:rtype: bytes
	:return: the ``TERM`` message
	'''
	term_message = b'TERM'
	return term_message
343
344
345 1
def get_msg_time(msg):
	'''
	Extract the timestamp from an ``ALARM``, ``RESET``, or ``IMGPATH``
	message as a UTCDateTime object.

	The message is decoded as UTF-8 and split on single spaces; the second
	field is parsed with the format ``%Y-%m-%dT%H:%M:%S.%fZ``.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> path = '/home/pi/rsudp/screenshots/test.png'
		>>> msg = msg_imgpath(ti, path)
		>>> msg
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
		>>> get_msg_time(msg)
		UTCDateTime(2020, 1, 1, 0, 0, 0, 599000)

	:param bytes msg: the bytes-formatted queue message to decode
	:rtype: obspy.core.utcdatetime.UTCDateTime
	:return: the time embedded in the message
	'''
	fields = msg.decode('utf-8').split(' ')
	timestamp = fields[1]	# field 0 is the message type
	return rs.UTCDateTime.strptime(timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
368
369
370 1
def get_msg_path(msg):
	'''
	Extract the image path from an ``IMGPATH`` message as a string.

	The message is decoded as UTF-8 and split into at most three fields
	(message type, timestamp, path), so a path that itself contains spaces
	is returned whole rather than cut at its first space.

	For example:

	.. code-block:: python

		>>> from obspy import UTCDateTime
		>>> ti = UTCDateTime(2020, 1, 1, 0, 0, 0, 599000, precision=3)
		>>> path = '/home/pi/rsudp/screenshots/test.png'
		>>> msg = msg_imgpath(ti, path)
		>>> msg
		b'IMGPATH 2020-01-01T00:00:00.599Z /home/pi/rsudp/screenshots/test.png'
		>>> get_msg_path(msg)
		'/home/pi/rsudp/screenshots/test.png'

	:param bytes msg: the bytes-formatted queue message to decode
	:rtype: str
	:return: the path embedded in the message
	:raises IndexError: if the message has fewer than three space-separated fields
	'''
	# maxsplit=2 keeps everything after the timestamp together as the path
	return msg.decode('utf-8').split(' ', 2)[2]
392
393
394 1
def deconv_vel_inst(self, trace, output):
	'''
	.. role:: pycode(code)
		:language: python

	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
	for velocity channels.

	Removes the instrument response from ``trace`` in place and then applies
	further processing and/or a unit label according to :pycode:`self.deconv`.

	:param self self: The self object of the sub-consumer class calling this function. Its ``deconv`` and ``sps`` attributes are read.
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
	:param str output: the ``output`` value handed to ``remove_response`` unless the channel-native setting is selected
	'''
	mode = self.deconv
	# NOTE(review): this is a substring test against 'CHAN' (kept as in the
	# original), so any fragment of 'CHAN' selects the native 'VEL' output
	response_output = 'VEL' if mode in 'CHAN' else output
	trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
							output=response_output, water_level=4.5, taper=False)

	# NOTE(review): for 'ACC'/'GRAV'/'DISP' the data is differentiated or summed
	# after the response was already removed with the requested output; confirm
	# that this post-processing is intended
	if 'ACC' in mode:
		trace.data = rs.np.gradient(trace.data, 1)
	elif 'GRAV' in mode:
		trace.data = rs.np.gradient(trace.data, 1) / rs.g
		trace.stats.units = 'Earth gravity'
	elif 'DISP' in mode:
		trace.data = rs.np.cumsum(trace.data)
		trace.taper(max_percentage=0.1, side='left', max_length=1)
		trace.detrend(type='demean')
	else:
		trace.stats.units = 'Velocity'
422
423
424 1
def deconv_acc_inst(self, trace, output):
	'''
	.. role:: pycode(code)
		:language: python

	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
	for acceleration channels.

	Removes the instrument response from ``trace`` in place and then applies
	further processing and/or a unit label according to :pycode:`self.deconv`.

	:param self self: The self object of the sub-consumer class calling this function. Its ``deconv`` and ``sps`` attributes are read.
	:param obspy.core.trace.Trace trace: the trace object instance to deconvolve
	:param str output: the ``output`` value handed to ``remove_response`` unless the channel-native setting is selected
	'''
	mode = self.deconv
	# NOTE(review): this is a substring test against 'CHAN' (kept as in the
	# original), so any fragment of 'CHAN' selects the native 'ACC' output
	response_output = 'ACC' if mode in 'CHAN' else output
	trace.remove_response(inventory=rs.inv, pre_filt=[0.1, 0.6, 0.95*self.sps, self.sps],
							output=response_output, water_level=4.5, taper=False)

	# NOTE(review): for 'VEL'/'DISP' the data is summed after the response was
	# already removed with the requested output; confirm this is intended
	if 'VEL' in mode:
		trace.data = rs.np.cumsum(trace.data)
		trace.detrend(type='demean')
	elif 'DISP' in mode:
		trace.data = rs.np.cumsum(rs.np.cumsum(trace.data))
		trace.detrend(type='linear')
	elif 'GRAV' in mode:
		trace.data = trace.data / rs.g
		trace.stats.units = 'Earth gravity'
	else:
		trace.stats.units = 'Acceleration'

	# every mode except 'ACC' and 'CHAN' gets a left-side taper
	if not (('ACC' in mode) or ('CHAN' in mode)):
		trace.taper(max_percentage=0.1, side='left', max_length=1)
454
455
456 1
def deconv_rbm_inst(self, trace, output):
	'''
	.. role:: pycode(code)
		:language: python

	A helper function for :py:func:`rsudp.raspberryshake.deconvolve`
	for Raspberry Boom pressure transducer channels.

	.. note::

		No deconvolution is performed for the Raspberry Boom pressure
		transducer here: the data is left untouched and only the unit
		label is set, to counts.

	:param self self: The self object of the sub-consumer class calling this function (unused).
	:param obspy.core.trace.Trace trace: the trace object instance to label
	:param output: unused; accepted so the signature matches the other deconvolution helpers
	'''
	stats = trace.stats
	stats.units = ' counts'
475
476
477 1
def deconvolve(self):
	'''
	.. role:: pycode(code)
		:language: python

	A central helper function for sub-consumers (i.e. :py:class:`rsudp.c_plot.Plot` or :py:class:`rsudp.c_alert.Alert`)
	that need to deconvolve their raw data to metric units.

	:pycode:`self.stream` is replaced with a copy of :pycode:`self.raw`, and each
	trace in it is dispatched by channel name to the matching helper
	(velocity, acceleration, or pressure transducer). Traces on unrecognized
	channels, and all traces when :pycode:`self.deconv` is falsy, are simply
	labelled as counts.

	:param self self: The self object of the sub-consumer class calling this function. Its ``raw``, ``units`` and ``deconv`` attributes are read and its ``stream`` attribute is overwritten.
	'''
	geophone_channels = ['EHE', 'EHN', 'EHZ', 'SHZ']
	accelerometer_channels = ['ENE', 'ENN', 'ENZ']
	boom_channels = ['HDF']

	self.stream = self.raw.copy()
	for trace in self.stream:
		trace.stats.units = self.units
		# conversion to gravity is requested from the response removal as acceleration
		output = 'ACC' if self.deconv == 'GRAV' else self.deconv

		if not self.deconv:
			trace.stats.units = ' counts'		# this is not being deconvolved
			continue

		if trace.stats.channel in geophone_channels:
			deconv_vel_inst(self, trace, output)	# geophone channels
		elif trace.stats.channel in accelerometer_channels:
			deconv_acc_inst(self, trace, output)	# accelerometer channels
		elif trace.stats.channel in boom_channels:
			deconv_rbm_inst(self, trace, output)	# this is the Boom channel
		else:
			trace.stats.units = ' counts'	# unrecognized channel
513
514
515 1
def resolve_extra_text(extra_text, max_len, sender='helpers'):
	'''
	.. role:: pycode(code)
		:language: python

	.. versionadded:: 1.0.3

	A central helper function for the Twitter and Telegram alert classes
	that checks whether the :pycode:`"extra_text"` parameter (in the settings
	file) is of appropriate length. This is done to avoid errors when
	posting alerts.

	177 characters of ``max_len`` are reserved for the rest of the alert
	message; text exceeding the remainder is truncated and a warning is
	printed.

	:param str extra_text: String of additional characters to post as part of the alert message (longer messages will be truncated). An empty string, ``None`` or ``False`` means no extra text.
	:param int max_len: Upper limit of characters accepted in message (280 for Twitter, 4096 for Telegram).
	:param str sender: String identifying the origin of the use of this function (:pycode:`self.sender` in the source function).
	:rtype: str
	:return: an empty string, or the (possibly truncated) extra text prefixed with a single space

	'''
	# length of string allowable given maximum message text & region
	allowable_len = max_len - 177

	# nothing configured: contribute nothing to the message
	if extra_text in ('', None, False):
		return ''

	text = str(extra_text)
	text_len = len(text)
	if text_len > allowable_len:
		printW('extra_text parameter is longer than allowable (%s chars) and will be truncated. Please keep extra_text at or below %s characters.' % (text_len, allowable_len), sender=sender)
		text = text[:allowable_len]

	return ' ' + text
547
548
549 1
def ep_edit_settings():
	'''
	.. versionadded:: 1.0.3

	Open the settings file with the operating system's default handler so
	that it can be edited. It is provided for convenience only.
	Advanced users may prefer to add an alias in place of this function.
	The alias **should** override the entrypoint command set in rsudp's
	``setup.py``.

	On Linux and MacOS, adding an alias may look like this:

	.. code-block:: bash

		# add the alias definition to the aliases file
		echo "alias rsudp-settings='nano ~/.config/rsudp/rsudp_settings.json'" >> .bash_aliases
		# then reload the console
		bash

	To add an alias on Windows via the command prompt is much more difficult.

	.. note::

		This function has been tested on multiple operating systems, but
		because each system's functionality and defaults may be different,
		proper operation cannot be guaranteed.

	:raises FileNotFoundError: if the settings file does not exist

	'''
	if not os.path.exists(settings_loc):
		raise FileNotFoundError('Settings file not found at %s' % settings_loc)

	system = platform.system()
	if system == 'Windows':
		os.startfile(settings_loc)
		return

	# MacOS ships "open"; everything else is treated as a linux variant
	opener = 'open' if system == 'Darwin' else 'xdg-open'
	subprocess.call((opener, settings_loc))
586
587 1
def ep_cat_log():
	'''
	.. versionadded:: 1.0.3

	This function uses a posix system's ``cat`` command to print messages in
	the rsudp log file. It is provided for convenience only.

	It is accessible via the console command ``rs-log`` on posix (Linux, MacOS)
	style operating systems. On any other system an error message is printed
	instead.
	'''
	if os.name == 'posix':
		subprocess.call(('cat', log_loc))
	else:
		# printE is not part of this module's rsudp import line, so calling it
		# unqualified raised NameError; import it where it is needed.
		# NOTE(review): assumes rsudp exports printE next to printM/printW - confirm
		from rsudp import printE
		printE('This command is only available on posix (Linux, MacOS) machines.')
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable printE does not seem to be defined.
Loading history...
601
602 1
def ep_tailf_log():
	'''
	.. versionadded:: 1.0.3

	This function uses the system's follow command to follow new
	messages added to the log file. It is provided for convenience only.
	It is the equivalent of ``tail -f <log file>`` on Linux/MacOS
	and ``Get-Content -Path <log file> -Wait`` (run through PowerShell)
	on Windows.

	It is accessible via the console command ``rs-tailf``.

	The function will run until it receives a keyboard interrupt (CTRL+C).
	'''
	if os.name == 'posix':
		try:
			print(COLOR['blue'] + 'Entering log follow (tail -f) mode.')
			print('New log messages will be printed until the console receives an interrupt (CTRL+C to end).' + COLOR['white'])
			subprocess.call(('tail','-f', log_loc))
		except KeyboardInterrupt:
			print()
			print(COLOR['blue'] + 'Quitting tail -f mode.' + COLOR['white'])
	elif os.name == 'nt':
		try:
			print('Entering log follow mode.')
			print('New log messages will be printed until the console receives an interrupt (CTRL+C to end).')
			# Get-Content is a PowerShell cmdlet, not an executable, so it has to be
			# run through powershell. The real log location is used instead of a
			# hard-coded path; single quotes are doubled to stay inside the
			# PowerShell string literal.
			quoted_log = str(log_loc).replace("'", "''")
			subprocess.call(('powershell', '-NoProfile', '-Command',
							 "Get-Content -Path '%s' -Wait" % quoted_log))
		except KeyboardInterrupt:
			# CTRL+C is the documented way to leave follow mode; exit quietly
			print()
			print('Quitting log follow mode.')
		except Exception as e:
			print('This function is not available on Windows. Error: %s' % (e))
630