build.rsudp.c_consumer.Consumer.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 14
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 8
dl 0
loc 14
ccs 8
cts 8
cp 1
rs 10
c 0
b 0
f 0
cc 1
nop 4
crap 1
1 1
import sys
2 1
from threading import Thread
3 1
from rsudp import printM, printW, printE
4 1
from rsudp.test import TEST
5
6
7 1
class Consumer(Thread):
	"""
	The main consumer process. This consumer reads
	queue messages from the :class:`rsudp.p_producer.Producer`
	and distributes those messages to each sub-consumer in ``destinations``.

	:param queue.Queue queue: queue of data and messages sent by :class:`rsudp.p_producer.Producer`
	:param list destinations: list of :py:class:`queue.Queue` objects to pass data to
	:param bool testing: if ``True``, the ``x_masterqueue`` entry of :py:data:`rsudp.test.TEST` is marked as passed once a message has been distributed
	"""

	def __init__(self, queue, destinations, testing=False):
		"""
		Initializes the main consumer.

		Stores the source queue, the destination queues and the testing
		flag on the instance, then announces startup via ``printM``.
		"""
		super().__init__()

		self.sender = 'Consumer'
		self.queue = queue
		self.destinations = destinations
		# run() keeps looping for as long as this flag stays truthy
		self.running = True
		self.testing = testing

		printM('Starting.', self.sender)

	def run(self):
		"""
		Distributes queue objects to execute various other tasks: for example,
		it may be used to populate ObsPy streams for various things like
		plotting, alert triggers, and ground motion calculation.

		Every item read from ``self.queue`` is put on each queue in
		``self.destinations``. The loop ends when ``self.running`` becomes
		falsy or when an item whose string form contains ``TERM`` has been
		forwarded.

		:return: the exception that interrupted the loop, if one occurred (only visible when ``run()`` is called directly; the value is discarded when the thread is launched with ``start()``)
		:raise SystemExit: after the loop finishes without an error
		"""
		try:
			while self.running:
				p = self.queue.get()
				# the item is acknowledged as soon as it is fetched,
				# before it has been forwarded
				self.queue.task_done()

				for q in self.destinations:
					q.put(p)

				# the termination message is forwarded above before the loop
				# stops, so every destination receives it as well
				if 'TERM' in str(p):
					printM('Exiting.', self.sender)
					break

				if self.testing:
					TEST['x_masterqueue'][1] = True

		except Exception as e:
			# Report the failure: the return value of run() is dropped when the
			# thread is launched with start(), so returning the exception alone
			# would let this thread stop without leaving any trace.
			printE('Stopped by unexpected error - {}: {}'.format(type(e).__name__, e), self.sender)
			return e

		# raises SystemExit to finish run()
		sys.exit()
58