build.rsudp.c_printraw   A
last analyzed

Complexity

Total Complexity 7

Size/Duplication

Total Lines 58
Duplicated Lines 0 %

Test Coverage

Coverage 81.25%

Importance

Changes 0
Metric Value
wmc 7
eloc 33
dl 0
loc 58
ccs 26
cts 32
cp 0.8125
rs 10
c 0
b 0
f 0

2 Methods

Rating   Name   Duplication   Size   Complexity  
A PrintRaw.run() 0 21 5
A PrintRaw.__init__() 0 19 2
1 1
import sys
2 1
from rsudp.raspberryshake import ConsumerThread
3 1
from rsudp import printM, printW, printE
4 1
from rsudp.test import TEST
5
6
7 1
class PrintRaw(ConsumerThread):
8
	"""
9
	A sub-consumer class that simply prints incoming data to the terminal.
10
	This is enabled by setting the "printdata" > "enabled" parameter to `true`
11
	in the settings file. This is more of a debug feature than anything else,
12
	meant to be a way to check that data is flowing into the port as expected.
13
14
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
15
	"""
16
17 1
	def __init__(self, q=False, testing=False):
18
		"""
19
		Initializing the data printing process.
20
21
		"""
22 1
		super().__init__()
23 1
		self.sender = 'Print'
24 1
		self.alive = True
25 1
		self.testing = testing
26
27 1
		if q:
28 1
			self.queue = q
29
		else:
30
			printE('no queue passed to consumer! Thread will exit now!', self.sender)
31
			sys.stdout.flush()
32
			self.alive = False
33
			sys.exit()
34
35 1
		printM('Starting.', self.sender)
36
37 1
	def run(self):
38
		"""
39
		Reads data from the queue and print to stdout.
40
		"""
41 1
		while True:
42 1
			d = self.queue.get()
43 1
			self.queue.task_done()
44 1
			if 'TERM' in str(d):
45 1
				self.alive = False
46 1
				printM('Exiting.', self.sender)
47 1
				sys.exit()
48 1
			elif 'ALARM' in str(d):
49 1
				pass
50
			else:
51 1
				if not self.testing:
52
					print(str(d))
53
				else:
54 1
					TEST['c_print'][1] = True
55 1
			sys.stdout.flush()
56
57
		self.alive = False