|
1
|
1 |
|
import os, sys |
|
2
|
1 |
|
from threading import Thread |
|
3
|
1 |
|
import socket as s |
|
4
|
1 |
|
from rsudp import printM, printW, helpers |
|
5
|
1 |
|
import rsudp.raspberryshake as rs |
|
6
|
1 |
|
from rsudp.test import TEST |
|
7
|
1 |
|
import time |
|
8
|
1 |
|
from queue import Empty |
|
9
|
|
|
|
|
10
|
|
|
|
|
11
|
1 |
|
class TestData(Thread): |
|
12
|
|
|
''' |
|
13
|
|
|
.. versionadded:: 0.4.3 |
|
14
|
|
|
|
|
15
|
|
|
A simple module that reads lines formatted as Raspberry Shake UDP packets |
|
16
|
|
|
from a file on disk, and sends them to the specified localhost port. |
|
17
|
|
|
Designed to quit on seeing a ``TERM`` string as the last line of the file |
|
18
|
|
|
or when an ``ENDTEST`` packet arrives on this thread's queue. |
|
19
|
|
|
|
|
20
|
|
|
For a diagram of ``TestData``'s position in the data hierarchy, see |
|
21
|
|
|
:ref:`testing_flow`. |
|
22
|
|
|
|
|
23
|
|
|
:param queue.Queue q: queue of data and messages sent by :class:`rsudp.c_consumer.Consumer` |
|
24
|
|
|
:param str data_file: data file to read from disk |
|
25
|
|
|
:param port: network port to pass UDP data to (at ``localhost`` address) |
|
26
|
|
|
:type port: str or int |
|
27
|
|
|
''' |
|
28
|
1 |
|
def __init__(self, q, data_file, port): |
|
29
|
|
|
""" |
|
30
|
|
|
Initializes the data supplier thread. |
|
31
|
|
|
""" |
|
32
|
1 |
|
super().__init__() |
|
33
|
1 |
|
self.sender = 'TestData' |
|
34
|
1 |
|
self.data_file = data_file |
|
35
|
1 |
|
self.port = port |
|
36
|
1 |
|
self.addr = 'localhost' |
|
37
|
1 |
|
self.speed = 0 |
|
38
|
1 |
|
self.pos = 0 |
|
39
|
1 |
|
self.queue = q |
|
40
|
1 |
|
self.sock = False |
|
41
|
1 |
|
self.alive = True |
|
42
|
|
|
|
|
43
|
1 |
|
printW('Sending test data from %s' |
|
44
|
|
|
% self.data_file, sender=self.sender, announce=False) |
|
45
|
|
|
|
|
46
|
1 |
|
def send(self): |
|
47
|
|
|
''' |
|
48
|
|
|
Send the latest line in the open file to the specified port at localhost. |
|
49
|
|
|
If the next line's timestamp is the same, |
|
50
|
|
|
that line will also be sent immediately. |
|
51
|
|
|
If the next line does not contain the same timestamp, |
|
52
|
|
|
the program will seek back to the last line read |
|
53
|
|
|
and then break for a new loop. |
|
54
|
|
|
|
|
55
|
|
|
If the line contains ``TERM``, the program will set ``self.alive = False`` |
|
56
|
|
|
and prepare to exit. |
|
57
|
|
|
''' |
|
58
|
1 |
|
l = self.f.readline() |
|
59
|
1 |
|
if ('TERM' in l.decode('utf-8')) or (l.decode('utf-8') == ''): |
|
60
|
1 |
|
printM('End of file.', self.sender) |
|
61
|
1 |
|
self.alive = False |
|
62
|
|
|
else: |
|
63
|
1 |
|
ts = rs.getTIME(l) |
|
64
|
1 |
|
self.sock.sendto(l, (self.addr, self.port)) |
|
65
|
|
|
|
|
66
|
1 |
|
while True: |
|
67
|
1 |
|
self.pos = self.f.tell() |
|
68
|
1 |
|
l = self.f.readline() |
|
69
|
1 |
|
if 'TERM' in l.decode('utf-8'): |
|
70
|
1 |
|
break |
|
71
|
1 |
|
if rs.getTIME(l) == ts: |
|
72
|
1 |
|
self.sock.sendto(l, (self.addr, self.port)) |
|
73
|
|
|
else: |
|
74
|
1 |
|
self.f.seek(self.pos) |
|
75
|
1 |
|
break |
|
76
|
|
|
|
|
77
|
1 |
|
def _getq(self): |
|
78
|
|
|
''' |
|
79
|
|
|
Gets a data packet from the queue and returns it. |
|
80
|
|
|
If no packet is immediately available, an ``Empty`` exception |
|
81
|
|
|
will be raised. |
|
82
|
|
|
|
|
83
|
|
|
:return: a bytes-encoded queue message |
|
84
|
|
|
:rtype: bytes |
|
85
|
|
|
''' |
|
86
|
1 |
|
q = self.queue.get_nowait() |
|
87
|
|
|
self.queue.task_done() |
|
88
|
|
|
return q |
|
89
|
|
|
|
|
90
|
1 |
|
def run(self): |
|
91
|
|
|
''' |
|
92
|
|
|
Start the thread. First, opens a file, determines the speed of data flow, |
|
93
|
|
|
then opens a socket and begins sending data at that transmission rate. |
|
94
|
|
|
|
|
95
|
|
|
Continues sending data until an ``ENDTEST`` packet arrives on the queue, |
|
96
|
|
|
or until the reader reaches the end of the file. |
|
97
|
|
|
Then, sends a ``TERM`` message to the localhost port and exits. |
|
98
|
|
|
''' |
|
99
|
1 |
|
self.f = open(self.data_file, 'rb') |
|
100
|
1 |
|
self.f.seek(0) |
|
101
|
1 |
|
l = self.f.readline() |
|
102
|
1 |
|
l2 = self.f.readline() |
|
103
|
1 |
|
while (rs.getTIME(l2) == rs.getTIME(l)): |
|
104
|
1 |
|
l2 = self.f.readline() |
|
105
|
|
|
|
|
106
|
1 |
|
self.f.seek(0) |
|
107
|
|
|
|
|
108
|
1 |
|
self.speed = rs.getTIME(l2) - rs.getTIME(l) |
|
109
|
|
|
|
|
110
|
1 |
|
printW('Opening test socket...', sender=self.sender, announce=False) |
|
111
|
1 |
|
socket_type = s.SOCK_DGRAM if os.name in 'nt' else s.SOCK_DGRAM | s.SO_REUSEADDR |
|
112
|
1 |
|
self.sock = s.socket(s.AF_INET, socket_type) |
|
113
|
|
|
|
|
114
|
1 |
|
printW('Sending data to %s:%s every %s seconds' |
|
115
|
|
|
% (self.addr, self.port, self.speed), |
|
116
|
|
|
sender=self.sender, announce=False) |
|
117
|
|
|
|
|
118
|
1 |
|
while self.alive: |
|
119
|
1 |
|
try: |
|
120
|
1 |
|
q = self._getq() |
|
121
|
|
|
if q.decode('utf-8') in 'ENDTEST': |
|
122
|
|
|
self.alive = False |
|
123
|
|
|
break |
|
124
|
1 |
|
except Empty: |
|
125
|
1 |
|
self.send() |
|
126
|
1 |
|
time.sleep(self.speed) |
|
127
|
1 |
|
TEST['x_send'][1] = True |
|
128
|
|
|
|
|
129
|
1 |
|
self.f.close() |
|
130
|
1 |
|
self.sock.sendto(helpers.msg_term(), (self.addr, self.port)) |
|
131
|
1 |
|
printW('Exiting.', self.sender, announce=False) |
|
132
|
|
|
sys.exit() |
|
133
|
|
|
|