|
1
|
|
|
import sys |
|
2
|
|
|
from threading import Thread |
|
3
|
|
|
from rsudp import printM, printW, printE |
|
4
|
|
|
import rsudp.raspberryshake as RS |
|
5
|
|
|
|
|
6
|
|
|
|
|
7
|
|
|
class Producer(Thread):
    '''
    Data Producer thread (see :ref:`producer-consumer`) which receives data from the port
    and puts it on the queue to be passed to the master consumer (:py:class:`rsudp.c_consumer.Consumer`).
    The producer also looks for flags in each consumer
    that indicate whether they are ``alive==False``. If so, the Producer will
    quit gracefully and put a TERM message on the queue, which should stop all running
    consumers.

    Only datagrams coming from the first address seen are forwarded; any other
    address is reported once and then ignored.

    :param queue.Queue queue: The master queue, used to pass data to :py:class:`rsudp.c_consumer.Consumer`
    :param list threads: The list of :py:class:`threading.Thread` s to monitor for status changes
    '''

    def __init__(self, queue, threads):
        """
        Initialize the Producer thread.

        :param queue.Queue queue: queue that received data and control messages are put on
        :param list threads: threads whose ``alarm``, ``alarm_reset`` and ``alive`` flags are polled
        """
        super().__init__()

        self.sender = 'Producer'
        self.queue = queue
        self.threads = threads
        # set when the receive loop should end (TERM received or a monitored thread died)
        self.stop = False

        # address of the first sender seen; '' until the first datagram arrives
        self.firstaddr = ''
        # addresses that have already been reported as "another IP" (warn only once each)
        self.blocked = []

        printM('Starting.', self.sender)

    def _filter_sender(self, data, addr):
        '''
        Filter the message sender and put data on the consumer queue.

        The first address seen becomes the accepted sender. Data from that
        address is put on the queue; a payload equal to ``TERM`` additionally
        requests shutdown. Data from any other address is dropped, and the
        address is logged the first time it is seen.

        :param bytes data: raw payload as returned by ``recvfrom``
        :param tuple addr: sender address; only ``addr[0]`` is used
        '''
        if self.firstaddr == '':
            self.firstaddr = addr[0]
            printM('Receiving UDP data from %s' % (self.firstaddr), self.sender)
        if (self.firstaddr != '') and (addr[0] == self.firstaddr):
            self.queue.put(data)
            # Compare the raw bytes instead of decoding them: the payload is
            # arbitrary network input, and decoding a non-UTF-8 datagram would
            # raise UnicodeDecodeError and kill this thread before TERM is sent.
            if data == b'TERM':
                RS.producer = False
                self.stop = True
        else:
            if addr[0] not in self.blocked:
                printM('Another IP (%s) is sending UDP data to this port. Ignoring...'
                       % (addr[0]), self.sender)
                self.blocked.append(addr[0])

    def _tasks(self):
        '''
        Execute tasks based on the states of sub-consumers.

        For every monitored thread: forward a raised ``alarm`` or
        ``alarm_reset`` flag to the queue as an ALARM / RESET message and clear
        the flag, and set the stop flag if the thread reports ``alive`` as false.
        '''
        for thread in self.threads:
            # for each thread here
            if thread.alarm:
                # if there is an alarm in a sub thread, send the ALARM message to the queues
                self.queue.put(RS.msg_alarm(thread.alarm))
                printM('%s thread has indicated alarm state, sending ALARM message to queues'
                       % thread.sender, sender=self.sender)
                # now re-arm the trigger
                thread.alarm = False
            if thread.alarm_reset:
                # if there's an alarm_reset flag in a sub thread, send a RESET message
                self.queue.put(RS.msg_reset(thread.alarm_reset))
                printM('%s thread has indicated alarm reset, sending RESET message to queues'
                       % thread.sender, sender=self.sender)
                # re-arm the trigger
                thread.alarm_reset = False
            if not thread.alive:
                # if a thread stops, set the stop flag
                self.stop = True

    def run(self):
        """
        Receive datagrams from ``RS.sock`` and feed them to the queue until stopped.

        Each iteration reads one datagram (up to 4096 bytes), passes it through
        :py:func:`_filter_sender`, then polls the monitored threads with
        :py:func:`_tasks`. The loop ends when ``RS.producer`` becomes false or
        the stop flag is set. Afterwards a TERM message is put on the queue and
        the thread exits via ``sys.exit()``.
        """
        RS.producer = True
        while RS.producer:
            data, addr = RS.sock.recvfrom(4096)
            self._filter_sender(data, addr)
            self._tasks()
            if self.stop:
                RS.producer = False
                break

        print()
        printM('Sending TERM signal to threads...', self.sender)
        self.queue.put(RS.msg_term())
        self.stop = True
        sys.exit()
|
102
|
|
|
|