Passed
Push — master ( 0ed23a...45db06 )
by Ian
04:37 queued 12s
created

build.rsudp.c_testing.Testing._img_test()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 5
rs 10
c 0
b 0
f 0
cc 2
nop 1
1
import sys, os
2
from rsudp.raspberryshake import ConsumerThread
3
import rsudp.raspberryshake as rs
4
from rsudp import printM, printW, helpers
5
from rsudp import ms_path
6
import rsudp.test as t
7
8
IMGPATH = False
9
10
class Testing(rs.ConsumerThread):
	'''
	.. versionadded:: 0.4.3

	This is the test consumer thread.
	It operates just like a normal consumer,
	but its only function is to run tests for data processing
	and message passing.

	For a diagram of ``TestData``'s position in the data hierarchy, see
	:ref:`testing_flow`.

	Currently it records the results of 8 tests from
	:py:mod:`rsudp.test`:

	.. code-block:: python

		TEST['n_inventory']
		TEST['c_data']
		TEST['c_processing']
		TEST['c_TERM']
		TEST['c_ALARM']
		TEST['c_RESET']
		TEST['c_IMGPATH']
		TEST['c_img']

	These tests represent inventory fetch, data packet reception,
	stream processing, the reception of the four current message types
	(``TERM``, ``ALARM``, ``RESET``, and ``IMGPATH``), and a check that the
	file named in the ``IMGPATH`` message exists on disk.

	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
	'''
	def __init__(self, q):
		"""
		Initializes the test consumer thread.

		:param queue.Queue q: queue this thread reads data packets and messages from
		"""
		super().__init__()
		self.sender = 'Testing'
		self.alive = True		# run() loops until a TERM message clears this
		self.queue = q

		self.stream = rs.Stream()
		self.cha = rs.chns

		printW('Starting test consumer.', sender=self.sender, announce=False)

	def _getq(self):
		'''
		Blocks until an item is available on the queue, marks it as done,
		and returns it.

		:rtype: bytes
		:return: The item taken from the queue.
		'''
		d = self.queue.get(True, timeout=None)
		self.queue.task_done()
		return d

	def _getd(self):
		'''
		Reads one item from the queue. If its channel is one of ``self.cha``
		the stream is updated with it; otherwise it is treated as a message
		and handed to :py:func:`_messagetests`.
		'''
		d = self._getq()

		if rs.getCHN(d) in self.cha:
			self.stream = rs.update_stream(stream=self.stream, d=d, fill_value='latest')
		else:
			self._messagetests(d)

	def _datatests(self, d):
		'''
		Run tests on a data packet to see if it can be processed into a stream object.
		If so, mark the data and processing tests passed.

		:param bytes d: a data packet from the queue

		'''

		if rs.getCHN(d) in self.cha:
			t.TEST['c_data'][1] = True
			self.stream = rs.update_stream(stream=self.stream, d=d, fill_value='latest')
			# only reached if update_stream() did not raise
			t.TEST['c_processing'][1] = True

	def _messagetests(self, d):
		'''
		Run tests on a message to see if a specific one has been passed.
		If so, mark the test passed.

		A ``TERM`` message additionally sets ``self.alive`` to ``False``, and an
		``IMGPATH`` message stores its path in the module-level ``IMGPATH``
		variable for later use by :py:func:`_img_test`.

		:param bytes d: a data packet from the queue

		'''
		global IMGPATH
		# the message type is detected by substring match on the text form
		# of the packet; the checks below are ordered and only the first hit wins
		msg = str(d)

		if 'TERM' in msg:
			printM('Got TERM message...', sender=self.sender)
			t.TEST['c_TERM'][1] = True
			self.alive = False

		elif 'ALARM' in msg:
			printM('Got ALARM message with time %s' % (
				   helpers.fsec(helpers.get_msg_time(d))
				   ), sender=self.sender)
			t.TEST['c_ALARM'][1] = True

		elif 'RESET' in msg:
			printM('Got RESET message with time %s' % (
				   helpers.fsec(helpers.get_msg_time(d))
				   ), sender=self.sender)
			t.TEST['c_RESET'][1] = True

		elif 'IMGPATH' in msg:
			printM('Got IMGPATH message with time %s' % (
				   helpers.fsec(helpers.get_msg_time(d))
				   ), sender=self.sender)
			IMGPATH = helpers.get_msg_path(d)
			printM('and path %s' % (IMGPATH), sender=self.sender)
			t.TEST['c_IMGPATH'][1] = True

	def _img_test(self):
		'''
		Check whether the file named by the last ``IMGPATH`` message exists,
		record the result in ``TEST['c_img']``, and, if it does exist, rename
		it in place with a ``test.`` prefix on the file name.

		If no ``IMGPATH`` message was received, or the file is missing,
		the test is recorded as not passed and no rename is attempted.
		'''
		if not t.TEST['c_img']:
			return

		# IMGPATH is still False when no IMGPATH message ever arrived.
		# os.path.exists(False) would treat it as file descriptor 0 and
		# os.path.dirname(False) would raise TypeError, so bail out early.
		if not IMGPATH:
			t.TEST['c_img'][1] = False
			return

		found = os.path.exists(IMGPATH)
		t.TEST['c_img'][1] = found
		if found:
			# only rename a file that is really there; os.replace() raises
			# FileNotFoundError on a missing source
			dn, fn = os.path.dirname(IMGPATH), os.path.basename(IMGPATH)
			os.replace(IMGPATH, os.path.join(dn, 'test.' + fn))

	def run(self):
		'''
		Start the testing thread and run until ``self.alive == False``.

		The inventory test is marked passed if ``rs.inv`` is truthy, the first
		queue item is run through the data tests, and every following item is
		processed by :py:func:`_getd` until a ``TERM`` message arrives.
		The image test runs last, just before the thread exits.

		'''
		if rs.inv:
			t.TEST['n_inventory'][1] = True
		self._datatests(self._getq())

		while self.alive:
			self._getd()

		self._img_test()

		printW('Exiting.', sender=self.sender, announce=False)
		sys.exit()
149