build.rsudp.c_custom.Custom.run()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 17
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 16.3253

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 17
ccs 1
cts 12
cp 0.0833
rs 9.8
c 0
b 0
f 0
cc 4
nop 1
crap 16.3253
1 1
import sys, os
2 1
from rsudp import printM, printW, printE
3 1
from rsudp.raspberryshake import ConsumerThread
4 1
from rsudp.test import TEST
5
6
7 1
class Custom(ConsumerThread):
	"""
	.. versionadded:: 0.4.3

	.. |lineendings_howto| raw:: html

		<a href="https://stackoverflow.com/questions/17579553/windows-command-to-convert-unix-line-endings" target="_blank">this stackoverflow question</a>

	.. |lineendings_wiki| raw:: html

		<a href="https://en.wikipedia.org/wiki/Newline" target="_blank">here</a>

	.. role:: json(code)
		:language: json

	A consumer class that runs custom code from a python file passed to it.
	Please read the disclaimers and warnings at :ref:`customcode` prior to using this module.

	.. warning::

		If you are running Windows and have code you want to pass to the :py:func:`exec` function,
		Python requires that your newline characters are in the UNIX style (:code:`\\n`),
		not the standard Windows style (:code:`\\r\\n`).
		To convert, follow the instructions in one of the answers to |lineendings_howto|.
		If you're not sure what this means, please read about newline/line ending characters |lineendings_wiki|.
		If you are certain that your code file has no Windows newlines, you can set :json:`"win_override"` to true.

		Read more warnings about this module at :ref:`customcode`.

	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`.
	:param codefile: string of the python (.py) file to run, or False if none.
	:type codefile: str or bool
	:param bool win_ovr: user check to make sure that line ending format is correct (see warning above)
	:param bool testing: if True, a successful code execution is recorded in ``TEST['c_custom']``.

	"""

	def __init__(self, q=False, codefile=False, win_ovr=False, testing=False):
		"""
		Initializes the custom code execution thread.

		Validates the code file path, refuses to start on Windows unless
		``win_ovr`` is set (exit status 2), and exits if no queue is given.
		"""
		super().__init__()
		self.sender = 'Custom'
		self.alive = True
		self.testing = testing
		# stays False unless a usable path is supplied below
		self.codefile = False
		self.win_ovr = win_ovr

		if codefile:
			expanded = os.path.expanduser(codefile)
			# the path must exist and carry a file extension to be accepted
			if os.path.exists(expanded) and os.path.splitext(codefile)[1]:
				self.codefile = expanded
				printM('Custom code file to run: %s' % self.codefile, sender=self.sender)
			else:
				printW('No python file exists at %s. No custom code will be run during alarms.' % codefile, sender=self.sender)
		else:
			printW('No custom code file set. No custom code will be run during alarms.', sender=self.sender)

		# equality, not substring membership: `os.name in 'nt'` would also
		# match any substring of 'nt'
		if (os.name == 'nt') and (not self.win_ovr):
			printE('Using Windows with custom alert code! Your code MUST have UNIX/Mac newline characters!')
			printE('Please use a conversion tool like dos2unix to convert line endings', spaces=True)
			printE('(https://en.wikipedia.org/wiki/Unix2dos) to make your code file', spaces=True)
			printE('readable to the Python interpreter.', spaces=True)
			printE('Once you have done that, please set "win_override" to true', spaces=True)
			printE('in the settings file.', spaces=True)
			printE('(see also footnote [1] on this page: https://docs.python.org/3/library/functions.html#id2)', spaces=True)
			printE('THREAD EXITING, please correct and restart!', self.sender, spaces=True)
			sys.exit(2)

		if q:
			self.queue = q
		else:
			printE('no queue passed to the consumer thread! We will exit now!',
				   self.sender)
			sys.stdout.flush()
			self.alive = False
			sys.exit()

		printM('Starting.', self.sender)

	def exec_code(self):
		"""
		Reads the file at ``self.codefile`` and executes its contents.

		Any exception raised while reading, compiling, or running the code is
		reported with ``printE`` and not re-raised. When ``self.testing`` is
		set, a successful run is flagged in ``TEST['c_custom']``.
		"""
		if not self.codefile:
			printW('No code to run, codefile variable not set correctly.', sender=self.sender)
			return

		# if the user has set a code file
		printM('Executing code from file: %s' % self.codefile, sender=self.sender)
		try:
			# self.codefile holds a path, so load the source text first;
			# passing the path itself to exec() would try to run the path
			# string as python code and always fail
			with open(self.codefile, 'r') as f:
				source = f.read()
			# NOTE(security): this runs arbitrary code from a user-configured
			# file with the privileges of this process.
			# compiling with the real filename gives meaningful tracebacks
			exec(compile(source, self.codefile, 'exec'))
			if self.testing:
				TEST['c_custom'][1] = True
		except Exception as e:
			# deliberately broad: faulty user code must not kill the thread
			printE('Code execution failed. Error: %s' % e, sender=self.sender, announce=False)

	def run(self):
		"""
		Reads data from the queue and executes self.codefile if it sees an ``ALARM`` message.
		Quits if it sees a ``TERM`` message.
		"""
		while True:
			d = self.queue.get()
			self.queue.task_done()
			msg = str(d)
			if 'TERM' in msg:
				self.alive = False
				printM('Exiting.', self.sender)
				sys.exit()
			elif 'ALARM' in msg:
				printM('Got ALARM message...', sender=self.sender)
				self.exec_code()
121