build.rsudp.p_producer   A
last analyzed

Complexity

Total Complexity 16

Size/Duplication

Total Lines 106
Duplicated Lines 0 %

Test Coverage

Coverage 92.86%

Importance

Changes 0
Metric Value
wmc 16
eloc 60
dl 0
loc 106
ccs 52
cts 56
cp 0.9286
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A Producer._tasks() 0 23 5
B Producer._filter_sender() 0 17 6
A Producer.__init__() 0 17 1
A Producer.run() 0 22 4
1 1
import sys
2 1
from threading import Thread
3 1
from rsudp import printM, printW, printE, helpers
4 1
import rsudp.raspberryshake as RS
5 1
from rsudp.test import TEST
6
7
8 1
class Producer(Thread):
	'''
	Data Producer thread (see :ref:`producer-consumer`) which receives data from the port
	and puts it on the queue to be passed to the master consumer (:py:class:`rsudp.c_consumer.Consumer`).
	The producer also looks for flags in each consumer
	that indicate whether they are ``alive==False``. If so, the Producer will
	quit gracefully and put a TERM message on the queue, which should stop all running
	consumers.

	:param queue.Queue queue: The master queue, used to pass data to :py:class:`rsudp.c_consumer.Consumer`
	:param list threads: The list of :py:class:`threading.Thread` s to monitor for status changes
	:param bool testing: If truthy, :py:func:`run` sets ``TEST['x_data'][1] = True`` after each processed packet
	'''

	def __init__(self, queue, threads, testing=False):
		'''
		Initialize the Producer thread.

		Stores the queue, the monitored thread list, and the testing flag, then
		resets the sender-filter state (no accepted sender yet, no blocked
		senders) and logs a start message.
		'''
		super().__init__()

		self.sender = 'Producer'
		self.queue = queue
		self.threads = threads
		self.stop = False
		self.testing = testing

		# IP of the first sender seen; '' means no sender has been accepted yet
		self.firstaddr = ''
		# IPs that have already triggered the "another IP" notice
		self.blocked = []

		printM('Starting.', self.sender)

	def _filter_sender(self, data, addr):
		'''
		Filter the message sender and put data on the consumer queue.

		The first address seen becomes the only accepted sender. Data from that
		address is put on the queue; a ``TERM`` payload additionally sets
		``RS.producer = False`` and ``self.stop = True``. Data from any other
		address is dropped, and a notice is printed the first time each such
		address is seen.

		:param bytes data: The raw payload of the received datagram
		:param tuple addr: The sender address; only ``addr[0]`` (the IP) is used
		'''
		sender_ip = addr[0]

		if self.firstaddr == '':
			self.firstaddr = sender_ip
			printM('Receiving UDP data from %s' % (self.firstaddr), self.sender)

		if (self.firstaddr != '') and (sender_ip == self.firstaddr):
			self.queue.put(data)
			# Compare raw bytes instead of decoding: decoding a payload that is not
			# valid UTF-8 would raise UnicodeDecodeError and kill this thread.
			if data == b'TERM':
				RS.producer = False
				self.stop = True
		else:
			if sender_ip not in self.blocked:
				printM('Another IP (%s) is sending UDP data to this port. Ignoring...'
						% (sender_ip), self.sender)
				self.blocked.append(sender_ip)

	def _tasks(self):
		'''
		Execute tasks based on the states of sub-consumers.

		For every monitored thread: forward a pending alarm or alarm reset to the
		queue (clearing the flag afterwards), and set ``self.stop`` if the thread
		reports that it is no longer alive.
		'''
		for thread in self.threads:
			if thread.alarm:
				# if there is an alarm in a sub thread, send the ALARM message to the queues
				self.queue.put(helpers.msg_alarm(thread.alarm))
				printM('%s thread has indicated alarm state, sending ALARM message to queues'
						% thread.sender, sender=self.sender)
				# now re-arm the trigger
				thread.alarm = False
			if thread.alarm_reset:
				# if there's an alarm_reset flag in a sub thread, send a RESET message
				self.queue.put(helpers.msg_reset(thread.alarm_reset))
				printM('%s thread has indicated alarm reset, sending RESET message to queues'
						% thread.sender, sender=self.sender)
				# re-arm the trigger
				thread.alarm_reset = False
			if not thread.alive:
				# if a thread stops, set the stop flag
				self.stop = True

	def run(self):
		'''
		Receive datagrams from ``RS.sock`` and feed them to the queue until stopped.

		Each iteration reads up to 4096 bytes, passes the packet through
		:py:func:`_filter_sender`, and then checks the monitored threads with
		:py:func:`_tasks`. The loop ends when ``RS.producer`` becomes false or
		``self.stop`` is set. On exit a TERM message is put on the queue and
		``sys.exit()`` is called, which raises ``SystemExit`` in this thread.
		'''
		RS.producer = True
		while RS.producer:
			data, addr = RS.sock.recvfrom(4096)
			self._filter_sender(data, addr)
			self._tasks()
			if self.stop:
				RS.producer = False
				break
			if self.testing:
				# record for the test harness that data was received and processed
				TEST['x_data'][1] = True

		print()
		printM('Sending TERM signal to threads...', self.sender)
		self.queue.put(helpers.msg_term())
		self.stop = True
		sys.exit()
106