Passed
Push — master ( f4a294...d6c02d )
by Ian
04:36
created

build.rsudp.p_producer.Producer._tasks()   A

Complexity

Conditions 5

Size

Total Lines 23
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 14
nop 1
dl 0
loc 23
rs 9.2333
c 0
b 0
f 0
1
import sys
2
from threading import Thread
3
from rsudp import printM, printW, printE
4
import rsudp.raspberryshake as RS
5
6
7
class Producer(Thread):
	'''
	Data Producer thread (see :ref:`producer-consumer`) which receives data from the port
	and puts it on the queue to be passed to the master consumer (:py:class:`rsudp.c_consumer.Consumer`).
	The producer also looks for flags in each consumer
	that indicate whether they are ``alive==False``. If so, the Producer will
	quit gracefully and put a TERM message on the queue, which should stop all running
	consumers.

	:param queue.Queue queue: The master queue, used to pass data to :py:class:`rsudp.c_consumer.Consumer`
	:param list threads: The list of :py:class:`threading.Thread` s to monitor for status changes
	'''

	def __init__(self, queue, threads):
		'''
		Initialize the Producer thread state and announce startup.

		:param queue.Queue queue: queue that accepted datagrams and control messages are put on
		:param list threads: threads whose ``alarm``, ``alarm_reset`` and ``alive`` flags are polled
		'''
		super().__init__()

		self.sender = 'Producer'	# name used as the sender label in log messages
		self.queue = queue
		self.threads = threads
		self.stop = False			# set to True to make run() leave its receive loop

		self.firstaddr = ''			# IP of the first sender seen; '' until a packet arrives
		self.blocked = []			# other sender IPs that have already been warned about

		printM('Starting.', self.sender)


	def _filter_sender(self, data, addr):
		'''
		Filter the message sender and put data on the consumer queue.

		The first address that sends a packet becomes the only accepted source.
		Packets from any other address are dropped, and a notice is printed once
		per address.

		:param bytes data: the raw datagram payload
		:param tuple addr: the sender address; only ``addr[0]`` (the IP) is inspected
		'''
		source = addr[0]
		if self.firstaddr == '':
			# lock on to whoever sends first
			self.firstaddr = source
			printM('Receiving UDP data from %s' % (self.firstaddr), self.sender)
		if (self.firstaddr != '') and (source == self.firstaddr):
			self.queue.put(data)
			# Compare the raw bytes instead of decoding: a payload that is not
			# valid UTF-8 would otherwise raise UnicodeDecodeError and kill
			# the thread, while the result for well-formed data is the same.
			if data == b'TERM':
				RS.producer = False
				self.stop = True
		elif source not in self.blocked:
			printM('Another IP (%s) is sending UDP data to this port. Ignoring...'
					% (source), self.sender)
			# remember the address so the notice is only printed once
			self.blocked.append(source)


	def _tasks(self):
		'''
		Execute tasks based on the states of sub-consumers.

		For every monitored thread: forward a pending alarm as an ALARM message,
		forward a pending alarm reset as a RESET message (clearing each flag after
		it is sent), and set the stop flag if the thread is no longer alive.
		'''
		for thread in self.threads:
			if thread.alarm:
				# the flag's value is handed to the message builder before being cleared
				self.queue.put(RS.msg_alarm(thread.alarm))
				printM('%s thread has indicated alarm state, sending ALARM message to queues'
						% thread.sender, sender=self.sender)
				# now re-arm the trigger
				thread.alarm = False
			if thread.alarm_reset:
				self.queue.put(RS.msg_reset(thread.alarm_reset))
				printM('%s thread has indicated alarm reset, sending RESET message to queues'
						% thread.sender, sender=self.sender)
				# re-arm the trigger
				thread.alarm_reset = False
			if not thread.alive:
				# if a thread stops, set the stop flag
				self.stop = True


	def run(self):
		'''
		Receive datagrams from ``RS.sock`` and feed them to the queue until stopped.

		Each iteration blocks on ``recvfrom``, passes the packet through
		:py:func:`_filter_sender`, then polls the monitored threads with
		:py:func:`_tasks`. The loop ends when ``RS.producer`` becomes false or
		the stop flag is set; a TERM message is then put on the queue and the
		thread exits.
		'''
		RS.producer = True
		while RS.producer:
			# the stop flag is only re-checked after a packet has been received
			data, addr = RS.sock.recvfrom(4096)
			self._filter_sender(data, addr)
			self._tasks()
			if self.stop:
				RS.producer = False
				break

		print()
		printM('Sending TERM signal to threads...', self.sender)
		self.queue.put(RS.msg_term())
		self.stop = True
		sys.exit()
102