build.rsudp.c_testing.Testing._getd()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 10
ccs 5
cts 5
cp 1
rs 10
c 0
b 0
f 0
cc 2
nop 1
crap 2
1 1
import sys, os
2 1
import rsudp.raspberryshake as rs
3 1
from rsudp import printM, printW, helpers
4 1
from rsudp import ms_path
5 1
import rsudp.test as t
6
7 1
# Module-level path of the image announced by an IMGPATH message.
# Stays False until Testing._messagetests stores a path here;
# Testing._img_test reads it after the main loop ends.
IMGPATH = False
8
9 1
class Testing(rs.ConsumerThread):
	'''
	.. versionadded:: 0.4.3

	This is the test consumer thread.
	It operates just like a normal consumer,
	but its only function is to run tests for data processing
	and message passing.

	For a diagram of ``TestData``'s position in the data hierarchy, see
	:ref:`testing_flow`.

	Currently it has the power to run seven tests from
	:py:mod:`rsudp.test`:

	.. code-block:: python

		TEST['n_inventory']
		TEST['x_processing']
		TEST['x_TERM']
		TEST['x_ALARM']
		TEST['x_RESET']
		TEST['x_IMGPATH']
		TEST['c_img']

	These tests represent inventory fetch, data packet reception,
	stream processing, and the reception of the four current message types:
	``TERM``, ``ALARM``, ``RESET``, and ``IMGPATH``.
	The ``c_img`` test records whether the file named by the last
	``IMGPATH`` message exists on disk once the thread stops.

	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
	'''
	def __init__(self, q):
		"""
		Initializes the test consumer thread.

		:param queue.Queue q: queue from which packets and messages are read
		"""
		super().__init__()
		self.sender = 'Testing'
		self.alive = True
		self.queue = q

		self.stream = rs.Stream()
		self.cha = rs.chns

		printW('Starting test consumer.', sender=self.sender, announce=False)

	def _getq(self):
		'''
		Blocks until an item is available on the queue, marks it as done,
		and returns it.

		:return: The next item (data packet or message) taken from the queue.
		'''
		d = self.queue.get(True, timeout=None)
		self.queue.task_done()
		return d

	def _getd(self):
		'''
		Reads one item from the queue. If its channel is one of
		``self.cha`` the stream is updated with it; otherwise the item
		is handed to :py:func:`_messagetests`.
		'''
		d = self._getq()

		if rs.getCHN(d) in self.cha:
			self.stream = rs.update_stream(stream=self.stream, d=d, fill_value='latest')
		else:
			self._messagetests(d)

	def _datatests(self, d):
		'''
		Run tests on a data packet to see if it can be processed into a stream object.
		If so, mark the processing test passed.

		:param bytes d: a data packet from the queue

		'''
		if rs.getCHN(d) in self.cha:
			self.stream = rs.update_stream(stream=self.stream, d=d, fill_value='latest')
			t.TEST['x_processing'][1] = True

	def _messagetests(self, d):
		'''
		Run tests on a message to see if a specific one has been passed.
		If so, mark the test passed.

		A ``TERM`` message additionally clears ``self.alive`` so that
		:py:func:`run` leaves its loop, and an ``IMGPATH`` message stores the
		reported path in the module-level ``IMGPATH`` variable.

		:param bytes d: a message from the queue

		'''
		global IMGPATH
		# stringify once; the checks are substring matches, tried in this order
		msg = str(d)

		if 'TERM' in msg:
			printM('Got TERM message...', sender=self.sender)
			t.TEST['x_TERM'][1] = True
			self.alive = False

		elif 'ALARM' in msg:
			printM('Got ALARM message with time %s' % (
				   helpers.fsec(helpers.get_msg_time(d))
				   ), sender=self.sender)
			t.TEST['x_ALARM'][1] = True

		elif 'RESET' in msg:
			printM('Got RESET message with time %s' % (
				   helpers.fsec(helpers.get_msg_time(d))
				   ), sender=self.sender)
			t.TEST['x_RESET'][1] = True

		elif 'IMGPATH' in msg:
			printM('Got IMGPATH message with time %s' % (
				   helpers.fsec(helpers.get_msg_time(d))
				   ), sender=self.sender)
			IMGPATH = helpers.get_msg_path(d)
			printM('and path %s' % (IMGPATH), sender=self.sender)
			t.TEST['x_IMGPATH'][1] = True

	def _img_test(self):
		'''
		Record whether the image named by the last ``IMGPATH`` message exists,
		and if it does, rename it with a ``test.`` prefix in the same directory.

		Does nothing when no ``IMGPATH`` message was received.
		'''
		if not (t.TEST['c_img'] and IMGPATH):
			return

		exists = os.path.exists(IMGPATH)
		t.TEST['c_img'][1] = exists
		# only rename a file that is really there; os.replace would raise
		# FileNotFoundError otherwise and the failed test would be masked by a crash
		if exists:
			dn, fn = os.path.dirname(IMGPATH), os.path.basename(IMGPATH)
			os.replace(IMGPATH, os.path.join(dn, 'test.' + fn))

	def run(self):
		'''
		Start the testing thread and run until ``self.alive == False``.

		The inventory test is marked passed if ``rs.inv`` is truthy, the first
		queue item is run through :py:func:`_datatests`, and every following
		item is handled by :py:func:`_getd`. After the loop ends the image
		test is evaluated and the thread exits.

		'''
		if rs.inv:
			t.TEST['n_inventory'][1] = True
		self._datatests(self._getq())

		while self.alive:
			self._getd()

		self._img_test()

		printW('Exiting.', sender=self.sender, announce=False)
		sys.exit()
147