1
|
1 |
|
import sys |
2
|
1 |
|
from threading import Thread |
3
|
1 |
|
from rsudp import printM, printW, printE, helpers |
4
|
1 |
|
import rsudp.raspberryshake as RS |
5
|
1 |
|
from rsudp.test import TEST |
6
|
|
|
|
7
|
|
|
|
8
|
1 |
|
class Producer(Thread):
    '''
    Data Producer thread (see :ref:`producer-consumer`) which receives data from the port
    and puts it on the queue to be passed to the master consumer (:py:class:`rsudp.c_consumer.Consumer`).
    The producer also looks for flags in each monitored thread
    that indicate whether they are ``alive==False``. If so, the Producer will
    quit gracefully and put a TERM message on the queue, which should stop all running
    consumers.

    :param queue.Queue queue: The master queue, used to pass data to :py:class:`rsudp.c_consumer.Consumer`
    :param list threads: The list of :py:class:`threading.Thread` s to monitor for status changes
    :param bool testing: If true, ``TEST['x_data'][1]`` is set to ``True`` from the receive loop in :py:func:`run`
    '''

    def __init__(self, queue, threads, testing=False):
        """
        Initializing Producer thread.

        Stores the queue, the list of monitored threads and the testing flag,
        and resets the sender-filter state (no accepted address yet, no
        ignored addresses logged).
        """
        super().__init__()

        self.sender = 'Producer'
        self.queue = queue
        self.threads = threads
        self.stop = False
        self.testing = testing

        # IP address of the first sender seen; '' until the first datagram arrives
        self.firstaddr = ''
        # other IP addresses that have already been reported as ignored
        self.blocked = []

        printM('Starting.', self.sender)

    def _filter_sender(self, data, addr):
        '''
        Filter the message sender and put data on the consumer queue.

        The first address a datagram is received from becomes the only
        accepted sender. Data from that address is queued; data from any other
        address is dropped, and a message is printed the first time each such
        address is seen.

        :param bytes data: The payload received from the socket
        :param tuple addr: The sender's address as returned by ``recvfrom``; only ``addr[0]`` is used
        '''
        origin = addr[0]

        if self.firstaddr == '':
            # lock on to whoever sends first
            self.firstaddr = origin
            printM('Receiving UDP data from %s' % (self.firstaddr), self.sender)

        if (self.firstaddr != '') and (origin == self.firstaddr):
            self.queue.put(data)
            # Compare raw bytes instead of decoding: identical result for any
            # payload that decodes as UTF-8, and a payload that does not decode
            # can no longer raise UnicodeDecodeError and kill this thread.
            if data == b'TERM':
                RS.producer = False
                self.stop = True
        else:
            # self.blocked only limits the message to once per address;
            # data from a foreign address is never queued either way
            if origin not in self.blocked:
                printM('Another IP (%s) is sending UDP data to this port. Ignoring...'
                       % (origin), self.sender)
                self.blocked.append(origin)

    def _tasks(self):
        '''
        Execute tasks based on the states of sub-consumers.

        For every monitored thread: forward a raised ``alarm`` or
        ``alarm_reset`` flag to the queue as an ALARM or RESET message and
        clear the flag, and set this producer's ``stop`` flag if the thread
        reports ``alive`` as false.
        '''
        for thread in self.threads:
            # for each thread here
            if thread.alarm:
                # if there is an alarm in a sub thread, send the ALARM message to the queues
                self.queue.put(helpers.msg_alarm(thread.alarm))
                printM('%s thread has indicated alarm state, sending ALARM message to queues'
                       % thread.sender, sender=self.sender)
                # now re-arm the trigger
                thread.alarm = False
            if thread.alarm_reset:
                # if there's an alarm_reset flag in a sub thread, send a RESET message
                self.queue.put(helpers.msg_reset(thread.alarm_reset))
                printM('%s thread has indicated alarm reset, sending RESET message to queues'
                       % thread.sender, sender=self.sender)
                # re-arm the trigger
                thread.alarm_reset = False
            if not thread.alive:
                # if a thread stops, set the stop flag
                self.stop = True

    def run(self):
        """
        Receive datagrams from ``RS.sock`` and feed them to the master queue.

        Each loop iteration reads one datagram (up to 4096 bytes), passes it
        through :py:func:`_filter_sender`, and then polls the monitored
        threads with :py:func:`_tasks`. The loop ends when ``RS.producer`` is
        cleared or the ``stop`` flag is set. On exit a TERM message is put on
        the queue and the thread terminates.
        """
        RS.producer = True
        while RS.producer:
            data, addr = RS.sock.recvfrom(4096)
            self._filter_sender(data, addr)
            self._tasks()
            if self.stop:
                RS.producer = False
                break
            if self.testing:
                # record for the test suite that a datagram was processed
                TEST['x_data'][1] = True

        print()
        printM('Sending TERM signal to threads...', self.sender)
        self.queue.put(helpers.msg_term())
        self.stop = True
        # raises SystemExit to finish run(); inside a Thread this ends only this thread
        sys.exit()
106
|
|
|
|