Passed
Push — master ( 45db06...e21d39 )
by Ian
04:35 queued 12s
created

build.rsudp.c_consumer.Consumer.run()   B

Complexity

Conditions 6

Size

Total Lines 25
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 15
dl 0
loc 25
rs 8.6666
c 0
b 0
f 0
cc 6
nop 1
1
import sys
2
from threading import Thread
3
from rsudp import printM, printW, printE
4
from rsudp.test import TEST
5
6
7
class Consumer(Thread):
	"""
	The main consumer process. This consumer reads
	queue messages from the :class:`rsudp.p_producer.Producer`
	and distributes those messages to each sub-consumer in ``destinations``.

	:param queue.Queue queue: queue of data and messages sent by :class:`rsudp.p_producer.Producer`
	:param list destinations: list of :py:class:`queue.Queue` objects to pass data to
	:param bool testing: if ``True``, :py:func:`run` sets ``TEST['x_masterqueue'][1]`` to ``True`` after each forwarded non-terminating message
	"""

	def __init__(self, queue, destinations, testing=False):
		"""
		Initializes the main consumer and announces startup via ``printM``.
		"""
		super().__init__()

		self.sender = 'Consumer'
		self.queue = queue
		self.destinations = destinations
		# run() keeps looping while this flag is true
		self.running = True
		self.testing = testing

		printM('Starting.', self.sender)

	def run(self):
		"""
		Distributes queue objects to execute various other tasks: for example,
		it may be used to populate ObsPy streams for various things like
		plotting, alert triggers, and ground motion calculation.

		Each item read from ``self.queue`` is put on every queue in
		``self.destinations``. The loop ends when ``self.running`` becomes
		false or when the string form of an item contains ``'TERM'``
		(the terminating item is still forwarded before exiting).

		:return: the caught exception if distribution fails; the value is
			only visible to code calling ``run()`` directly, so the error is
			also reported with ``printE``
		:raise SystemExit: on normal loop exit, via ``sys.exit()``
		"""
		try:
			while self.running:
				p = self.queue.get()

				try:
					for q in self.destinations:
						q.put(p)
				finally:
					# acknowledge only after the item has been handed to the
					# destinations, so that queue.join() on the master queue
					# cannot return while the item is still undelivered
					self.queue.task_done()

				# substring match on the string form of the item
				if 'TERM' in str(p):
					printM('Exiting.', self.sender)
					break

				if self.testing:
					TEST['x_masterqueue'][1] = True

		except Exception as e:
			# Thread.run's return value is discarded, so report the failure
			# explicitly instead of letting it disappear
			printE('Unexpected error, stopping: %s' % e, self.sender)
			return e

		sys.exit()
58