| 1 |  |  | """Kytos Buffer Classes, based on Python Queue.""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 2 |  |  | import logging | 
            
                                                                                                            
                            
            
                                    
            
            
                | 3 |  |  | from typing import Optional | 
            
                                                                                                            
                            
            
                                    
            
            
                | 4 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 5 |  |  | from janus import Queue | 
            
                                                                                                            
                            
            
                                    
            
            
                | 6 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 7 |  |  | LOG = logging.getLogger(__name__) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 8 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 9 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 10 |  |  | class KytosEventBuffer: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 11 |  |  |     """KytosEventBuffer represents a queue to store a set of KytosEvents.""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 12 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 13 |  |  |     def __init__(self, name, queue: Queue = None): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 14 |  |  |         """Contructor of KytosEventBuffer receive the parameters below. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 15 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 16 |  |  |         Args: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 17 |  |  |             name (string): name of KytosEventBuffer. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 18 |  |  |             event_base_class (class): Class of KytosEvent. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 19 |  |  |             maxsize (int): maxsize _queue producer buffer | 
            
                                                                                                            
                            
            
                                    
            
            
                | 20 |  |  |             queue_cls (class): queue class from janus | 
            
                                                                                                            
                            
            
                                    
            
            
                | 21 |  |  |         """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 22 |  |  |         self.name = name | 
            
                                                                                                            
                            
            
                                    
            
            
                | 23 |  |  |         self._queue = queue if queue is not None else Queue() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 24 |  |  |         self._reject_new_events = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 25 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 26 |  |  |     def put(self, event, timeout: Optional[float] = None): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 27 |  |  |         """Insert an event in KytosEventBuffer if reject_new_events is False. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 28 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 29 |  |  |         Reject new events is True when a kytos/core.shutdown message was | 
            
                                                                                                            
                            
            
                                    
            
            
                | 30 |  |  |         received. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 31 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 32 |  |  |         Args: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 33 |  |  |             event (:class:`~kytos.core.events.KytosEvent`): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 34 |  |  |                 KytosEvent sent to queue. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 35 |  |  |             timeout: Block if necessary until a free slot is available. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 36 |  |  |             If 'timeout' is a non-negative number, it blocks at most 'timeout' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 37 |  |  |             seconds and raises an Full exception if no free slot was available. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 38 |  |  |         """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 39 |  |  |         if not self._reject_new_events: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 40 |  |  |             self._queue.sync_q.put(event, timeout=timeout) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 41 |  |  |             LOG.debug('[buffer: %s] Added: %s', self.name, event.name) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 42 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 43 |  |  |         if event.name == "kytos/core.shutdown": | 
            
                                                                                                            
                            
            
                                    
            
            
                | 44 |  |  |             LOG.info('[buffer: %s] Stop mode enabled. Rejecting new events.', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 45 |  |  |                      self.name) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 46 |  |  |             self._reject_new_events = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 47 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 48 |  |  |     async def aput(self, event): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 49 |  |  |         """Insert a event in KytosEventBuffer if reject new events is False. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 50 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 51 |  |  |         Reject new events is True when a kytos/core.shutdown message was | 
            
                                                                                                            
                            
            
                                    
            
            
                | 52 |  |  |         received. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 53 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 54 |  |  |         Args: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 55 |  |  |             event (:class:`~kytos.core.events.KytosEvent`): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 56 |  |  |                 KytosEvent sent to queue. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 57 |  |  |         """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 58 |  |  |         if not self._reject_new_events: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 59 |  |  |             await self._queue.async_q.put(event) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 60 |  |  |             LOG.debug('[buffer: %s] Added: %s', self.name, event.name) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 61 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 62 |  |  |         if event.name == "kytos/core.shutdown": | 
            
                                                                                                            
                            
            
                                    
            
            
                | 63 |  |  |             LOG.info('[buffer: %s] Stop mode enabled. Rejecting new events.', | 
            
                                                                                                            
                            
            
                                    
            
            
                | 64 |  |  |                      self.name) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 65 |  |  |             self._reject_new_events = True | 
            
                                                                                                            
                            
            
                                    
            
            
                | 66 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 67 |  |  |     def get(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 68 |  |  |         """Remove and return a event from top of queue. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 69 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 70 |  |  |         Returns: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 71 |  |  |             :class:`~kytos.core.events.KytosEvent`: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 72 |  |  |                 Event removed from top of queue. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 73 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 74 |  |  |         """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 75 |  |  |         event = self._queue.sync_q.get() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 76 |  |  |         LOG.debug('[buffer: %s] Removed: %s', self.name, event.name) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 77 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 78 |  |  |         return event | 
            
                                                                                                            
                                                                
            
                                    
            
            
                | 79 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 80 |  |  |     async def aget(self): | 
            
                                                                        
                            
            
                                    
            
            
                | 81 |  |  |         """Remove and return a event from top of queue. | 
            
                                                                        
                            
            
                                    
            
            
                | 82 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 83 |  |  |         Returns: | 
            
                                                                        
                            
            
                                    
            
            
                | 84 |  |  |             :class:`~kytos.core.events.KytosEvent`: | 
            
                                                                        
                            
            
                                    
            
            
                | 85 |  |  |                 Event removed from top of queue. | 
            
                                                                        
                            
            
                                    
            
            
                | 86 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 87 |  |  |         """ | 
            
                                                                        
                            
            
                                    
            
            
                | 88 |  |  |         event = await self._queue.async_q.get() | 
            
                                                                        
                            
            
                                    
            
            
                | 89 |  |  |         LOG.debug('[buffer: %s] Removed: %s', self.name, event.name) | 
            
                                                                        
                            
            
                                    
            
            
                | 90 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 91 |  |  |         return event | 
            
                                                                                                            
                            
            
                                    
            
            
                | 92 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 93 |  |  |     def task_done(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 94 |  |  |         """Indicate that a formerly enqueued task is complete. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 95 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 96 |  |  |         If a :func:`~kytos.core.buffers.KytosEventBuffer.join` is currently | 
            
                                                                                                            
                            
            
                                    
            
            
                | 97 |  |  |         blocking, it will resume if all itens in KytosEventBuffer have been | 
            
                                                                                                            
                            
            
                                    
            
            
                | 98 |  |  |         processed (meaning that a task_done() call was received for every item | 
            
                                                                                                            
                            
            
                                    
            
            
                | 99 |  |  |         that had been put() into the KytosEventBuffer). | 
            
                                                                                                            
                            
            
                                    
            
            
                | 100 |  |  |         """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 101 |  |  |         self._queue.sync_q.task_done() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 102 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 103 |  |  |     def join(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 104 |  |  |         """Block until all events are gotten and processed. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 105 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 106 |  |  |         A item is processed when the method task_done is called. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 107 |  |  |         """ | 
            
                                                                                                            
                            
            
                                    
            
            
                | 108 |  |  |         self._queue.sync_q.join() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 109 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 110 |  |  |     def qsize(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 111 |  |  |         """Return the size of KytosEventBuffer.""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 112 |  |  |         return self._queue.sync_q.qsize() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 113 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 114 |  |  |     def empty(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 115 |  |  |         """Return True if KytosEventBuffer is empty.""" | 
            
                                                                                                            
                            
            
                                    
            
            
                | 116 |  |  |         return self._queue.sync_q.empty() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 117 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 118 |  |  |     def full(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 119 |  |  |         """Return True if KytosEventBuffer is full of KytosEvent.""" | 
            
                                                                                                            
                                                                
            
                                    
            
            
                | 120 |  |  |         return self._queue.sync_q.full() | 
            
                                                        
            
                                    
            
            
                | 121 |  |  |  |