| Total Complexity | 7 | 
| Total Lines | 58 | 
| Duplicated Lines | 0 % | 
| Changes | 0 | ||
| 1 | import sys  | 
            ||
| 2 | from threading import Thread  | 
            ||
| 3 | from rsudp import printM, printW, printE  | 
            ||
| 4 | from rsudp.test import TEST  | 
            ||
| 5 | |||
| 6 | |||
| 7 | class Consumer(Thread):  | 
            ||
| 8 | """  | 
            ||
| 9 | The main consumer process. This consumer reads  | 
            ||
| 10 | queue messages from the :class:`rsudp.p_producer.Producer`  | 
            ||
| 11 | and distributes those messages to each sub-consumer in ``destinations``.  | 
            ||
| 12 | |||
| 13 | :param queue.Queue queue: queue of data and messages sent by :class:`rsudp.p_producer.Producer`  | 
            ||
| 14 | :param list destinations: list of :py:class:`queue.Queue` objects to pass data to  | 
            ||
| 15 | """  | 
            ||
| 16 | |||
| 17 | |||
| 18 | def __init__(self, queue, destinations, testing=False):  | 
            ||
| 19 | """  | 
            ||
| 20 | Initializes the main consumer.  | 
            ||
| 21 | |||
| 22 | """  | 
            ||
| 23 | super().__init__()  | 
            ||
| 24 | |||
| 25 | self.sender = 'Consumer'  | 
            ||
| 26 | self.queue = queue  | 
            ||
| 27 | self.destinations = destinations  | 
            ||
| 28 | self.running = True  | 
            ||
| 29 | self.testing = testing  | 
            ||
| 30 | |||
| 31 | 		printM('Starting.', self.sender) | 
            ||
| 32 | |||
| 33 | def run(self):  | 
            ||
| 34 | """  | 
            ||
| 35 | Distributes queue objects to execute various other tasks: for example,  | 
            ||
| 36 | it may be used to populate ObsPy streams for various things like  | 
            ||
| 37 | plotting, alert triggers, and ground motion calculation.  | 
            ||
| 38 | """  | 
            ||
| 39 | try:  | 
            ||
| 40 | while self.running:  | 
            ||
| 41 | p = self.queue.get()  | 
            ||
| 42 | self.queue.task_done()  | 
            ||
| 43 | |||
| 44 | for q in self.destinations:  | 
            ||
| 45 | q.put(p)  | 
            ||
| 46 | |||
| 47 | if 'TERM' in str(p):  | 
            ||
| 48 | 					printM('Exiting.', self.sender) | 
            ||
| 49 | break  | 
            ||
| 50 | |||
| 51 | if self.testing:  | 
            ||
| 52 | TEST['x_masterqueue'][1] = True  | 
            ||
| 53 | |||
| 54 | except Exception as e:  | 
            ||
| 55 | return e  | 
            ||
| 56 | |||
| 57 | sys.exit()  | 
            ||
| 58 |