Passed
Push — master ( e96c3d...44474a )
by Ian
06:20
created

build.rsudp.t_testdata.TestData.run()   B

Complexity

Conditions 6

Size

Total Lines 42
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 28
nop 1
dl 0
loc 42
rs 8.2746
c 0
b 0
f 0
1
import os, sys
2
from threading import Thread, Timer
3
import socket as s
4
from rsudp import printM, printW, helpers
5
import rsudp.raspberryshake as rs
6
import time
7
from queue import Empty
8
9
10
class TestData(Thread):
11
	'''
12
	.. versionadded:: 0.4.3
13
14
	A simple module that reads lines formatted as Raspberry Shake UDP packets
15
	from a file on disk, and sends them to the specified localhost port.
16
	Designed to quit on seeing a ``TERM`` string as the last line of the file
17
	or when an ``ENDTEST`` packet arrives on this thread's queue.
18
19
	For a diagram of ``TestData``'s position in the data hierarchy, see
20
	:ref:`testing_flow`.
21
22
	:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer`
23
	:param str data_file: data file to read from disk
24
	:param port: network port to pass UDP data to (at ``localhost`` address)
25
	:type port: str or int
26
	'''
27
	def __init__(self, q, data_file, port):
28
		"""
29
		Initializes the data supplier thread.
30
		"""
31
		super().__init__()
32
		self.sender = 'TestData'
33
		self.data_file = data_file
34
		self.port = port
35
		self.addr = 'localhost'
36
		self.speed = 0
37
		self.pos = 0
38
		self.queue = q
39
		self.sock = False
40
		self.alive = True
41
42
		printW('Sending test data from %s'
43
			   % self.data_file, sender=self.sender, announce=False)
44
45
	def send(self):
46
		'''
47
		Send the latest line in the open file to the specified port at localhost.
48
		If the next line's timestamp is the same,
49
		that line will also be sent immediately.
50
		If the next line does not contain the same timestamp,
51
		the program will seek back to the last line read
52
		and then break for a new loop.
53
54
		If the line contains ``TERM``, the program will set ``self.alive = False``
55
		and prepare to exit.
56
		'''
57
		l = self.f.readline()
58
		if ('TERM' in l.decode('utf-8')) or (l.decode('utf-8') == ''):
59
			printM('End of file.', self.sender)
60
			self.alive = False
61
		else:
62
			ts = rs.getTIME(l)
63
			self.sock.sendto(l, (self.addr, self.port))
64
65
			while True:
66
				self.pos = self.f.tell()
67
				l = self.f.readline()
68
				if 'TERM' in l.decode('utf-8'):
69
					break
70
				if rs.getTIME(l) == ts:
71
					self.sock.sendto(l, (self.addr, self.port))
72
				else:
73
					self.f.seek(self.pos)
74
					break
75
76
	def _getq(self):
77
		'''
78
		Gets a data packet from the queue and returns it.
79
		If no packet is immediately available, an ``Empty`` exception
80
		will be raised.
81
82
		:return: a bytes-encoded queue message
83
		:rtype: bytes
84
		'''
85
		q = self.queue.get_nowait()
86
		self.queue.task_done()
87
		return q
88
89
	def run(self):
90
		'''
91
		Start the thread. First, opens a file, determines the speed of data flow,
92
		then opens a socket and begins sending data at that transmission rate.
93
94
		Continues sending data until an ``ENDTEST`` packet arrives on the queue,
95
		or until the reader reaches the end of the file.
96
		Then, sends a ``TERM`` message to the localhost port and exits.
97
		'''
98
		self.f = open(self.data_file, 'rb')
99
		self.f.seek(0)
100
		l = self.f.readline()
101
		l2 = self.f.readline()
102
		while (rs.getTIME(l2) == rs.getTIME(l)):
103
			l2 = self.f.readline()
104
105
		self.f.seek(0)
106
107
		self.speed = rs.getTIME(l2) - rs.getTIME(l)
108
109
		printW('Opening test socket...', sender=self.sender, announce=False)
110
		socket_type = s.SOCK_DGRAM if os.name in 'nt' else s.SOCK_DGRAM | s.SO_REUSEADDR
111
		self.sock = s.socket(s.AF_INET, socket_type)
112
113
		printW('Sending data to %s:%s every %s seconds'
114
			   % (self.addr, self.port, self.speed),
115
			   sender=self.sender, announce=False)
116
117
		while self.alive:
118
			try:
119
				q = self._getq()
120
				if q.decode('utf-8') in 'ENDTEST':
121
					self.alive = False
122
					break
123
			except Empty:
124
				self.send()
125
				time.sleep(self.speed)
126
127
		self.f.close()
128
		self.sock.sendto(helpers.msg_term(), (self.addr, self.port))
129
		printW('Exiting.', self.sender, announce=False)
130
		sys.exit()
131