| 1 |  |  | import sys | 
            
                                                                                                            
                            
            
                                    
            
            
                | 2 |  |  | from threading import Thread | 
            
                                                                                                            
                            
            
                                    
            
            
                | 3 |  |  | from rsudp import printM, printW, printE | 
            
                                                                                                            
                            
            
                                    
            
            
                | 4 |  |  | import rsudp.raspberryshake as RS | 
            
                                                                                                            
                            
            
                                    
            
            
                | 5 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 6 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 7 |  |  | class Producer(Thread): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 8 |  |  | 	''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 9 |  |  | 	Data Producer thread (see :ref:`producer-consumer`) which receives data from the port | 
            
                                                                                                            
                            
            
                                    
            
            
                | 10 |  |  | 	and puts it on the queue to be passed to the master consumer (:py:class:`rsudp.c_consumer.Consumer`). | 
            
                                                                                                            
                            
            
                                    
            
            
                | 11 |  |  | 	The producer also looks for flags in each consumer | 
            
                                                                                                            
                            
            
                                    
            
            
                | 12 |  |  | 	that indicate whether they are ``alive==False``. If so, the Producer will | 
            
                                                                                                            
                            
            
                                    
            
            
                | 13 |  |  | 	quit gracefully and put a TERM message on the queue, which should stop all running | 
            
                                                                                                            
                            
            
                                    
            
            
                | 14 |  |  | 	consumers. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 15 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 16 |  |  | 	:param queue.Queue queue: The master queue, used to pass data to :py:class:`rsudp.c_consumer.Consumer` | 
            
                                                                                                            
                            
            
                                    
            
            
                | 17 |  |  | 	:param list threads: The list of :py:class:`threading.Thread` s to monitor for status changes | 
            
                                                                                                            
                            
            
                                    
            
            
                | 18 |  |  | 	''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 19 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 20 |  |  | 	def __init__(self, queue, threads): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 21 |  |  | 		""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 22 |  |  | 		Initializing Producer thread.  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 23 |  |  | 		 | 
            
                                                                                                            
                            
            
                                    
            
            
                | 24 |  |  | 		""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 25 |  |  | 		super().__init__() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 26 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 27 |  |  | 		self.sender = 'Producer' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 28 |  |  | 		self.queue = queue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 29 |  |  | 		self.threads = threads | 
            
                                                                                                            
                            
            
                                    
            
            
                | 30 |  |  | 		self.stop = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 31 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 32 |  |  | 		self.firstaddr = '' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 33 |  |  | 		self.blocked = [] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 34 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 35 |  |  | 		printM('Starting.', self.sender) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 36 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 37 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 38 |  |  | 	def _filter_sender(self, data, addr): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 39 |  |  | 		''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 40 |  |  | 		Filter the message sender and put data on the consumer queue. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 41 |  |  | 		''' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 42 |  |  | 		if self.firstaddr == '': | 
            
                                                                                                            
                            
            
                                    
            
            
                | 43 |  |  | 			self.firstaddr = addr[0] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 44 |  |  | 			printM('Receiving UDP data from %s' % (self.firstaddr), self.sender) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 45 |  |  | 		if (self.firstaddr != '') and (addr[0] == self.firstaddr): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 46 |  |  | 			self.queue.put(data) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 47 |  |  | 			if data.decode('utf-8') == 'TERM': | 
            
                                                                                                            
                            
            
                                    
            
            
                | 48 |  |  | 				RS.producer = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 49 |  |  | 				self.stop = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 50 |  |  | 		else: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 51 |  |  | 			if addr[0] not in self.blocked: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 52 |  |  | 				printM('Another IP (%s) is sending UDP data to this port. Ignoring...' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 53 |  |  | 						% (addr[0]), self.sender) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 54 |  |  | 				self.blocked.append(addr[0]) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 55 |  |  |  | 
            
                                                                                                            
                                                                
            
                                    
            
            
                | 56 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 57 |  |  | 	def _tasks(self): | 
            
                                                                        
                            
            
                                    
            
            
                | 58 |  |  | 		''' | 
            
                                                                        
                            
            
                                    
            
            
                | 59 |  |  | 		Execute tasks based on the states of sub-consumers. | 
            
                                                                        
                            
            
                                    
            
            
                | 60 |  |  | 		''' | 
            
                                                                        
                            
            
                                    
            
            
                | 61 |  |  | 		for thread in self.threads: | 
            
                                                                        
                            
            
                                    
            
            
                | 62 |  |  | 			# for each thread here | 
            
                                                                        
                            
            
                                    
            
            
                | 63 |  |  | 			if thread.alarm: | 
            
                                                                        
                            
            
                                    
            
            
                | 64 |  |  | 				# if there is an alarm in a sub thread, send the ALARM message to the queues | 
            
                                                                        
                            
            
                                    
            
            
                | 65 |  |  | 				self.queue.put(RS.msg_alarm(thread.alarm)) | 
            
                                                                        
                            
            
                                    
            
            
                | 66 |  |  | 				printM('%s thread has indicated alarm state, sending ALARM message to queues' | 
            
                                                                        
                            
            
                                    
            
            
                | 67 |  |  | 						% thread.sender, sender=self.sender) | 
            
                                                                        
                            
            
                                    
            
            
                | 68 |  |  | 				# now re-arm the trigger | 
            
                                                                        
                            
            
                                    
            
            
                | 69 |  |  | 				thread.alarm = False | 
            
                                                                        
                            
            
                                    
            
            
                | 70 |  |  | 			if thread.alarm_reset: | 
            
                                                                        
                            
            
                                    
            
            
                | 71 |  |  | 				# if there's an alarm_reset flag in a sub thread, send a RESET message | 
            
                                                                        
                            
            
                                    
            
            
                | 72 |  |  | 				self.queue.put(RS.msg_reset(thread.alarm_reset)) | 
            
                                                                        
                            
            
                                    
            
            
                | 73 |  |  | 				printM('%s thread has indicated alarm reset, sending RESET message to queues' | 
            
                                                                        
                            
            
                                    
            
            
                | 74 |  |  | 						% thread.sender, sender=self.sender) | 
            
                                                                        
                            
            
                                    
            
            
                | 75 |  |  | 				# re-arm the trigger | 
            
                                                                        
                            
            
                                    
            
            
                | 76 |  |  | 				thread.alarm_reset = False | 
            
                                                                        
                            
            
                                    
            
            
                | 77 |  |  | 			if not thread.alive: | 
            
                                                                        
                            
            
                                    
            
            
                | 78 |  |  | 				# if a thread stops, set the stop flag | 
            
                                                                        
                            
            
                                    
            
            
                | 79 |  |  | 				self.stop = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 80 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 81 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 82 |  |  | 	def run(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 83 |  |  | 		""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 84 |  |  | 		Distributes queue objects to execute various other tasks: for example, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 85 |  |  | 		it may be used to populate ObsPy streams for various things like | 
            
                                                                                                            
                            
            
                                    
            
            
                | 86 |  |  | 		plotting, alert triggers, and ground motion calculation. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 87 |  |  | 		""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 88 |  |  | 		RS.producer = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 89 |  |  | 		while RS.producer: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 90 |  |  | 			data, addr = RS.sock.recvfrom(4096) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 91 |  |  | 			self._filter_sender(data, addr) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 92 |  |  | 			self._tasks() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 93 |  |  | 			if self.stop: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 94 |  |  | 				RS.producer = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 95 |  |  | 				break | 
            
                                                                                                            
                            
            
                                    
            
            
                | 96 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 97 |  |  | 		print() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 98 |  |  | 		printM('Sending TERM signal to threads...', self.sender) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 99 |  |  | 		self.queue.put(RS.msg_term()) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 100 |  |  | 		self.stop = True | 
            
                                                                                                            
                                                                
            
                                    
            
            
                | 101 |  |  | 		sys.exit() | 
            
                                                        
            
                                    
            
            
                | 102 |  |  |  |