build.rsudp.t_testdata   A
last analyzed

Complexity

Total Complexity 14

Size/Duplication

Total Lines 133
Duplicated Lines 0 %

Test Coverage

Coverage 92.65%

Importance

Changes 0
Metric Value
wmc 14
eloc 72
dl 0
loc 133
ccs 63
cts 68
cp 0.9265
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
B TestData.run() 0 43 6
B TestData.send() 0 30 6
A TestData.__init__() 0 17 1
A TestData._getq() 0 12 1
1 1
import os, sys
2 1
from threading import Thread
3 1
import socket as s
4 1
from rsudp import printM, printW, helpers
5 1
import rsudp.raspberryshake as rs
6 1
from rsudp.test import TEST
7 1
import time
8 1
from queue import Empty
9
10
11 1
class TestData(Thread):
12
	'''
13
	.. versionadded:: 0.4.3
14
15
	A simple module that reads lines formatted as Raspberry Shake UDP packets
16
	from a file on disk, and sends them to the specified localhost port.
17
	Designed to quit on seeing a ``TERM`` string as the last line of the file
18
	or when an ``ENDTEST`` packet arrives on this thread's queue.
19
20
	For a diagram of ``TestData``'s position in the data hierarchy, see
21
	:ref:`testing_flow`.
22
23
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
24
	:param str data_file: data file to read from disk
25
	:param port: network port to pass UDP data to (at ``localhost`` address)
26
	:type port: str or int
27
	'''
28 1
	def __init__(self, q, data_file, port):
29
		"""
30
		Initializes the data supplier thread.
31
		"""
32 1
		super().__init__()
33 1
		self.sender = 'TestData'
34 1
		self.data_file = data_file
35 1
		self.port = port
36 1
		self.addr = 'localhost'
37 1
		self.speed = 0
38 1
		self.pos = 0
39 1
		self.queue = q
40 1
		self.sock = False
41 1
		self.alive = True
42
43 1
		printW('Sending test data from %s'
44
			   % self.data_file, sender=self.sender, announce=False)
45
46 1
	def send(self):
47
		'''
48
		Send the latest line in the open file to the specified port at localhost.
49
		If the next line's timestamp is the same,
50
		that line will also be sent immediately.
51
		If the next line does not contain the same timestamp,
52
		the program will seek back to the last line read
53
		and then break for a new loop.
54
55
		If the line contains ``TERM``, the program will set ``self.alive = False``
56
		and prepare to exit.
57
		'''
58 1
		l = self.f.readline()
59 1
		if ('TERM' in l.decode('utf-8')) or (l.decode('utf-8') == ''):
60 1
			printM('End of file.', self.sender)
61 1
			self.alive = False
62
		else:
63 1
			ts = rs.getTIME(l)
64 1
			self.sock.sendto(l, (self.addr, self.port))
65
66 1
			while True:
67 1
				self.pos = self.f.tell()
68 1
				l = self.f.readline()
69 1
				if 'TERM' in l.decode('utf-8'):
70 1
					break
71 1
				if rs.getTIME(l) == ts:
72 1
					self.sock.sendto(l, (self.addr, self.port))
73
				else:
74 1
					self.f.seek(self.pos)
75 1
					break
76
77 1
	def _getq(self):
78
		'''
79
		Gets a data packet from the queue and returns it.
80
		If no packet is immediately available, an ``Empty`` exception
81
		will be raised.
82
83
		:return: a bytes-encoded queue message
84
		:rtype: bytes
85
		'''
86 1
		q = self.queue.get_nowait()
87
		self.queue.task_done()
88
		return q
89
90 1
	def run(self):
91
		'''
92
		Start the thread. First, opens a file, determines the speed of data flow,
93
		then opens a socket and begins sending data at that transmission rate.
94
95
		Continues sending data until an ``ENDTEST`` packet arrives on the queue,
96
		or until the reader reaches the end of the file.
97
		Then, sends a ``TERM`` message to the localhost port and exits.
98
		'''
99 1
		self.f = open(self.data_file, 'rb')
100 1
		self.f.seek(0)
101 1
		l = self.f.readline()
102 1
		l2 = self.f.readline()
103 1
		while (rs.getTIME(l2) == rs.getTIME(l)):
104 1
			l2 = self.f.readline()
105
106 1
		self.f.seek(0)
107
108 1
		self.speed = rs.getTIME(l2) - rs.getTIME(l)
109
110 1
		printW('Opening test socket...', sender=self.sender, announce=False)
111 1
		socket_type = s.SOCK_DGRAM if os.name in 'nt' else s.SOCK_DGRAM | s.SO_REUSEADDR
112 1
		self.sock = s.socket(s.AF_INET, socket_type)
113
114 1
		printW('Sending data to %s:%s every %s seconds'
115
			   % (self.addr, self.port, self.speed),
116
			   sender=self.sender, announce=False)
117
118 1
		while self.alive:
119 1
			try:
120 1
				q = self._getq()
121
				if q.decode('utf-8') in 'ENDTEST':
122
					self.alive = False
123
					break
124 1
			except Empty:
125 1
				self.send()
126 1
				time.sleep(self.speed)
127 1
				TEST['x_send'][1] = True
128
129 1
		self.f.close()
130 1
		self.sock.sendto(helpers.msg_term(), (self.addr, self.port))
131 1
		printW('Exiting.', self.sender, announce=False)
132
		sys.exit()
133