Passed
Pull Request — master (#61)
by
unknown
12:44
created

build.rsudp.c_custom.Custom.exec_code()   A

Complexity

Conditions 5

Size

Total Lines 16
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 24.2584

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 16
ccs 1
cts 12
cp 0.0833
rs 9.3333
c 0
b 0
f 0
cc 5
nop 1
crap 24.2584
1 1
import sys, os
2 1
from rsudp import printM, printW, printE
3 1
from rsudp.raspberryshake import ConsumerThread
4 1
from rsudp.test import TEST
5
6
7 1
class Custom(ConsumerThread):
	"""
	.. versionadded:: 0.4.3

	.. |lineendings_howto| raw:: html

		<a href="https://stackoverflow.com/questions/17579553/windows-command-to-convert-unix-line-endings" target="_blank">this stackoverflow question</a>

	.. |lineendings_wiki| raw:: html

		<a href="https://en.wikipedia.org/wiki/Newline" target="_blank">here</a>

	.. role:: json(code)
		:language: json


	A consumer class that runs custom code from a python file passed to it.
	Please read the disclaimers and warnings at :ref:`customcode` prior to using this module.

	.. warning::

		If you are running Windows and have code you want to pass to the :py:func:`exec` function,
		Python requires that your newline characters are in the UNIX style (:code:`\\n`),
		not the standard Windows style (:code:`\\r\\n`).
		To convert, follow the instructions in one of the answers to |lineendings_howto|.
		If you're not sure what this means, please read about newline/line ending characters |lineendings_wiki|.
		If you are certain that your code file has no Windows newlines, you can set :json:`"win_override"` to true.

		Read more warnings about this module at :ref:`customcode`.

	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`.
	:param codefile: string of the python (.py) file to run, or False if none.
	:type codefile: str or bool
	:param bool win_ovr: user check to make sure that line ending format is correct (see warning above)
	:param bool testing: if True, a successful code execution is recorded in ``TEST['c_custom']``.

	"""
	def __init__(self, q=False, codefile=False, win_ovr=False, testing=False):
		"""
		Initializes the custom code execution thread.

		Calls :py:func:`sys.exit` with status 2 when running on Windows without
		``win_ovr`` set, and calls :py:func:`sys.exit` when no queue is supplied.
		"""
		super().__init__()
		self.sender = 'Custom'
		self.alive = True
		self.testing = testing
		# stays False unless a usable code file is found below
		self.codefile = False
		self.win_ovr = win_ovr
		if codefile:
			expanded = os.path.expanduser(codefile)
			# the path must exist and the given name must carry a file extension
			if os.path.exists(expanded) and os.path.splitext(codefile)[1]:
				self.codefile = expanded
				printM('Custom code file to run: %s' % self.codefile, sender=self.sender)
			else:
				printW('No python file exists at %s. No custom code will be run during alarms.' % codefile, sender=self.sender)
		else:
			printW('No custom code file set. No custom code will be run during alarms.', sender=self.sender)

		# equality, not substring membership: ``os.name in 'nt'`` would also
		# be true for 'n', 't' and the empty string
		if os.name == 'nt' and not self.win_ovr:
			printE('Using Windows with custom alert code! Your code MUST have UNIX/Mac newline characters!')
			printE('Please use a conversion tool like dos2unix to convert line endings', spaces=True)
			printE('(https://en.wikipedia.org/wiki/Unix2dos) to make your code file', spaces=True)
			printE('readable to the Python interpreter.', spaces=True)
			printE('Once you have done that, please set "win_override" to true', spaces=True)
			printE('in the settings file.', spaces=True)
			printE('(see also footnote [1] on this page: https://docs.python.org/3/library/functions.html#id2)', spaces=True)
			printE('THREAD EXITING, please correct and restart!', self.sender, spaces=True)
			sys.exit(2)

		if q:
			self.queue = q
		else:
			printE('no queue passed to the consumer thread! We will exit now!',
				   self.sender)
			sys.stdout.flush()
			self.alive = False
			sys.exit()

		printM('Starting.', self.sender)

	def exec_code(self):
		"""
		Reads ``self.codefile`` and executes its contents with :py:func:`exec`.

		Any :py:class:`Exception` raised while reading or executing the file is
		caught and reported with ``printE``; it is not re-raised.
		If ``self.codefile`` is not set, a warning is printed and nothing is run.
		When ``self.testing`` is true and execution succeeds,
		``TEST['c_custom'][1]`` is set to True.
		"""
		if self.codefile:
			# if the user has set a code file
			printM('Executing code from file: %s' % self.codefile, sender=self.sender)
			try:
				# try to execute some code
				with open(self.codefile, 'r') as file:
					file_content = file.read()
				# NOTE(security): this runs arbitrary code from the user-supplied
				# file with the privileges of this process; only point it at
				# trusted files.
				exec(file_content)
				if self.testing:
					TEST['c_custom'][1] = True
			except Exception as e:
				# deliberately broad: a failure in user code must not kill this thread
				printE('Code execution failed. Error: %s' % e, sender=self.sender, announce=False)
		else:
			printW('No code to run, codefile variable not set correctly.', sender=self.sender)


	def run(self):
		"""
		Reads data from the queue and executes self.codefile if it sees an ``ALARM`` message.
		Quits if it sees a ``TERM`` message.
		"""
		while True:
			d = self.queue.get()
			# the item is marked done as soon as it is taken off the queue
			self.queue.task_done()
			msg = str(d)
			if 'TERM' in msg:
				self.alive = False
				printM('Exiting.', self.sender)
				# the only way out of the loop
				sys.exit()
			elif 'ALARM' in msg:
				printM('Got ALARM message...', sender=self.sender)
				self.exec_code()
123