1
|
|
|
import os, sys |
2
|
|
|
from threading import Thread, Timer |
3
|
|
|
import socket as s |
4
|
|
|
from rsudp import printM, printW, helpers |
5
|
|
|
import rsudp.raspberryshake as rs |
6
|
|
|
import time |
7
|
|
|
from queue import Empty |
8
|
|
|
|
9
|
|
|
|
10
|
|
|
class TestData(Thread): |
11
|
|
|
''' |
12
|
|
|
.. versionadded:: 0.4.3 |
13
|
|
|
|
14
|
|
|
A simple module that reads lines formatted as Raspberry Shake UDP packets |
15
|
|
|
from a file on disk, and sends them to the specified localhost port. |
16
|
|
|
Designed to quit on seeing a ``TERM`` string as the last line of the file |
17
|
|
|
or when an ``ENDTEST`` packet arrives on this thread's queue. |
18
|
|
|
|
19
|
|
|
For a diagram of ``TestData``'s position in the data hierarchy, see |
20
|
|
|
:ref:`testing_flow`. |
21
|
|
|
|
22
|
|
|
:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer` |
23
|
|
|
:param str data_file: data file to read from disk |
24
|
|
|
:param port: network port to pass UDP data to (at ``localhost`` address) |
25
|
|
|
:type port: str or int |
26
|
|
|
''' |
27
|
|
|
def __init__(self, q, data_file, port): |
28
|
|
|
""" |
29
|
|
|
Initializes the data supplier thread. |
30
|
|
|
""" |
31
|
|
|
super().__init__() |
32
|
|
|
self.sender = 'TestData' |
33
|
|
|
self.data_file = data_file |
34
|
|
|
self.port = port |
35
|
|
|
self.addr = 'localhost' |
36
|
|
|
self.speed = 0 |
37
|
|
|
self.pos = 0 |
38
|
|
|
self.queue = q |
39
|
|
|
self.sock = False |
40
|
|
|
self.alive = True |
41
|
|
|
|
42
|
|
|
printW('Sending test data from %s' |
43
|
|
|
% self.data_file, sender=self.sender, announce=False) |
44
|
|
|
|
45
|
|
|
def send(self): |
46
|
|
|
''' |
47
|
|
|
Send the latest line in the open file to the specified port at localhost. |
48
|
|
|
If the next line's timestamp is the same, |
49
|
|
|
that line will also be sent immediately. |
50
|
|
|
If the next line does not contain the same timestamp, |
51
|
|
|
the program will seek back to the last line read |
52
|
|
|
and then break for a new loop. |
53
|
|
|
|
54
|
|
|
If the line contains ``TERM``, the program will set ``self.alive = False`` |
55
|
|
|
and prepare to exit. |
56
|
|
|
''' |
57
|
|
|
l = self.f.readline() |
58
|
|
|
if ('TERM' in l.decode('utf-8')) or (l.decode('utf-8') == ''): |
59
|
|
|
printM('End of file.', self.sender) |
60
|
|
|
self.alive = False |
61
|
|
|
else: |
62
|
|
|
ts = rs.getTIME(l) |
63
|
|
|
self.sock.sendto(l, (self.addr, self.port)) |
64
|
|
|
|
65
|
|
|
while True: |
66
|
|
|
self.pos = self.f.tell() |
67
|
|
|
l = self.f.readline() |
68
|
|
|
if 'TERM' in l.decode('utf-8'): |
69
|
|
|
break |
70
|
|
|
if rs.getTIME(l) == ts: |
71
|
|
|
self.sock.sendto(l, (self.addr, self.port)) |
72
|
|
|
else: |
73
|
|
|
self.f.seek(self.pos) |
74
|
|
|
break |
75
|
|
|
|
76
|
|
|
def _getq(self): |
77
|
|
|
''' |
78
|
|
|
Gets a data packet from the queue and returns it. |
79
|
|
|
If no packet is immediately available, an ``Empty`` exception |
80
|
|
|
will be raised. |
81
|
|
|
|
82
|
|
|
:return: a bytes-encoded queue message |
83
|
|
|
:rtype: bytes |
84
|
|
|
''' |
85
|
|
|
q = self.queue.get_nowait() |
86
|
|
|
self.queue.task_done() |
87
|
|
|
return q |
88
|
|
|
|
89
|
|
|
def run(self): |
90
|
|
|
''' |
91
|
|
|
Start the thread. First, opens a file, determines the speed of data flow, |
92
|
|
|
then opens a socket and begins sending data at that transmission rate. |
93
|
|
|
|
94
|
|
|
Continues sending data until an ``ENDTEST`` packet arrives on the queue, |
95
|
|
|
or until the reader reaches the end of the file. |
96
|
|
|
Then, sends a ``TERM`` message to the localhost port and exits. |
97
|
|
|
''' |
98
|
|
|
self.f = open(self.data_file, 'rb') |
99
|
|
|
self.f.seek(0) |
100
|
|
|
l = self.f.readline() |
101
|
|
|
l2 = self.f.readline() |
102
|
|
|
while (rs.getTIME(l2) == rs.getTIME(l)): |
103
|
|
|
l2 = self.f.readline() |
104
|
|
|
|
105
|
|
|
self.f.seek(0) |
106
|
|
|
|
107
|
|
|
self.speed = rs.getTIME(l2) - rs.getTIME(l) |
108
|
|
|
|
109
|
|
|
printW('Opening test socket...', sender=self.sender, announce=False) |
110
|
|
|
socket_type = s.SOCK_DGRAM if os.name in 'nt' else s.SOCK_DGRAM | s.SO_REUSEADDR |
111
|
|
|
self.sock = s.socket(s.AF_INET, socket_type) |
112
|
|
|
|
113
|
|
|
printW('Sending data to %s:%s every %s seconds' |
114
|
|
|
% (self.addr, self.port, self.speed), |
115
|
|
|
sender=self.sender, announce=False) |
116
|
|
|
|
117
|
|
|
while self.alive: |
118
|
|
|
try: |
119
|
|
|
q = self._getq() |
120
|
|
|
if q.decode('utf-8') in 'ENDTEST': |
121
|
|
|
self.alive = False |
122
|
|
|
break |
123
|
|
|
except Empty: |
124
|
|
|
self.send() |
125
|
|
|
time.sleep(self.speed) |
126
|
|
|
|
127
|
|
|
self.f.close() |
128
|
|
|
self.sock.sendto(helpers.msg_term(), (self.addr, self.port)) |
129
|
|
|
printW('Exiting.', self.sender, announce=False) |
130
|
|
|
sys.exit() |
131
|
|
|
|