1
|
1 |
|
import os, sys |
2
|
1 |
|
from threading import Thread |
3
|
1 |
|
import socket as s |
4
|
1 |
|
from rsudp import printM, printW, helpers |
5
|
1 |
|
import rsudp.raspberryshake as rs |
6
|
1 |
|
from rsudp.test import TEST |
7
|
1 |
|
import time |
8
|
1 |
|
from queue import Empty |
9
|
|
|
|
10
|
|
|
|
11
|
1 |
|
class TestData(Thread): |
12
|
|
|
''' |
13
|
|
|
.. versionadded:: 0.4.3 |
14
|
|
|
|
15
|
|
|
A simple module that reads lines formatted as Raspberry Shake UDP packets |
16
|
|
|
from a file on disk, and sends them to the specified localhost port. |
17
|
|
|
Designed to quit on seeing a ``TERM`` string as the last line of the file |
18
|
|
|
or when an ``ENDTEST`` packet arrives on this thread's queue. |
19
|
|
|
|
20
|
|
|
For a diagram of ``TestData``'s position in the data hierarchy, see |
21
|
|
|
:ref:`testing_flow`. |
22
|
|
|
|
23
|
|
|
:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer` |
24
|
|
|
:param str data_file: data file to read from disk |
25
|
|
|
:param port: network port to pass UDP data to (at ``localhost`` address) |
26
|
|
|
:type port: str or int |
27
|
|
|
''' |
28
|
1 |
|
def __init__(self, q, data_file, port): |
29
|
|
|
""" |
30
|
|
|
Initializes the data supplier thread. |
31
|
|
|
""" |
32
|
1 |
|
super().__init__() |
33
|
1 |
|
self.sender = 'TestData' |
34
|
1 |
|
self.data_file = data_file |
35
|
1 |
|
self.port = port |
36
|
1 |
|
self.addr = 'localhost' |
37
|
1 |
|
self.speed = 0 |
38
|
1 |
|
self.pos = 0 |
39
|
1 |
|
self.queue = q |
40
|
1 |
|
self.sock = False |
41
|
1 |
|
self.alive = True |
42
|
|
|
|
43
|
1 |
|
printW('Sending test data from %s' |
44
|
|
|
% self.data_file, sender=self.sender, announce=False) |
45
|
|
|
|
46
|
1 |
|
def send(self): |
47
|
|
|
''' |
48
|
|
|
Send the latest line in the open file to the specified port at localhost. |
49
|
|
|
If the next line's timestamp is the same, |
50
|
|
|
that line will also be sent immediately. |
51
|
|
|
If the next line does not contain the same timestamp, |
52
|
|
|
the program will seek back to the last line read |
53
|
|
|
and then break for a new loop. |
54
|
|
|
|
55
|
|
|
If the line contains ``TERM``, the program will set ``self.alive = False`` |
56
|
|
|
and prepare to exit. |
57
|
|
|
''' |
58
|
1 |
|
l = self.f.readline() |
59
|
1 |
|
if ('TERM' in l.decode('utf-8')) or (l.decode('utf-8') == ''): |
60
|
1 |
|
printM('End of file.', self.sender) |
61
|
1 |
|
self.alive = False |
62
|
|
|
else: |
63
|
1 |
|
ts = rs.getTIME(l) |
64
|
1 |
|
self.sock.sendto(l, (self.addr, self.port)) |
65
|
|
|
|
66
|
1 |
|
while True: |
67
|
1 |
|
self.pos = self.f.tell() |
68
|
1 |
|
l = self.f.readline() |
69
|
1 |
|
if 'TERM' in l.decode('utf-8'): |
70
|
1 |
|
break |
71
|
1 |
|
if rs.getTIME(l) == ts: |
72
|
1 |
|
self.sock.sendto(l, (self.addr, self.port)) |
73
|
|
|
else: |
74
|
1 |
|
self.f.seek(self.pos) |
75
|
1 |
|
break |
76
|
|
|
|
77
|
1 |
|
def _getq(self): |
78
|
|
|
''' |
79
|
|
|
Gets a data packet from the queue and returns it. |
80
|
|
|
If no packet is immediately available, an ``Empty`` exception |
81
|
|
|
will be raised. |
82
|
|
|
|
83
|
|
|
:return: a bytes-encoded queue message |
84
|
|
|
:rtype: bytes |
85
|
|
|
''' |
86
|
1 |
|
q = self.queue.get_nowait() |
87
|
|
|
self.queue.task_done() |
88
|
|
|
return q |
89
|
|
|
|
90
|
1 |
|
def run(self): |
91
|
|
|
''' |
92
|
|
|
Start the thread. First, opens a file, determines the speed of data flow, |
93
|
|
|
then opens a socket and begins sending data at that transmission rate. |
94
|
|
|
|
95
|
|
|
Continues sending data until an ``ENDTEST`` packet arrives on the queue, |
96
|
|
|
or until the reader reaches the end of the file. |
97
|
|
|
Then, sends a ``TERM`` message to the localhost port and exits. |
98
|
|
|
''' |
99
|
1 |
|
self.f = open(self.data_file, 'rb') |
100
|
1 |
|
self.f.seek(0) |
101
|
1 |
|
l = self.f.readline() |
102
|
1 |
|
l2 = self.f.readline() |
103
|
1 |
|
while (rs.getTIME(l2) == rs.getTIME(l)): |
104
|
1 |
|
l2 = self.f.readline() |
105
|
|
|
|
106
|
1 |
|
self.f.seek(0) |
107
|
|
|
|
108
|
1 |
|
self.speed = rs.getTIME(l2) - rs.getTIME(l) |
109
|
|
|
|
110
|
1 |
|
printW('Opening test socket...', sender=self.sender, announce=False) |
111
|
1 |
|
socket_type = s.SOCK_DGRAM if os.name in 'nt' else s.SOCK_DGRAM | s.SO_REUSEADDR |
112
|
1 |
|
self.sock = s.socket(s.AF_INET, socket_type) |
113
|
|
|
|
114
|
1 |
|
printW('Sending data to %s:%s every %s seconds' |
115
|
|
|
% (self.addr, self.port, self.speed), |
116
|
|
|
sender=self.sender, announce=False) |
117
|
|
|
|
118
|
1 |
|
while self.alive: |
119
|
1 |
|
try: |
120
|
1 |
|
q = self._getq() |
121
|
|
|
if q.decode('utf-8') in 'ENDTEST': |
122
|
|
|
self.alive = False |
123
|
|
|
break |
124
|
1 |
|
except Empty: |
125
|
1 |
|
self.send() |
126
|
1 |
|
time.sleep(self.speed) |
127
|
1 |
|
TEST['x_send'][1] = True |
128
|
|
|
|
129
|
1 |
|
self.f.close() |
130
|
1 |
|
self.sock.sendto(helpers.msg_term(), (self.addr, self.port)) |
131
|
1 |
|
printW('Exiting.', self.sender, announce=False) |
132
|
|
|
sys.exit() |
133
|
|
|
|