| 1 |  |  | # pylint: disable=super-on-old-class | 
            
                                                                                                            
                            
            
                                    
            
            
                | 2 |  |  | from operator import itemgetter | 
            
                                                                                                            
                            
            
                                    
            
            
                | 3 |  |  | from urlparse import urljoin | 
            
                                                                                                            
                            
            
                                    
            
            
                | 4 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 5 |  |  | import requests | 
            
                                                                                                            
                            
            
                                    
            
            
                | 6 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 7 |  |  | from st2reactor.sensor.base import PollingSensor | 
            
                                                                                                            
                            
            
                                    
            
            
                | 8 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 9 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 10 |  |  | class EventsConsumerSensor(PollingSensor): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 11 |  |  |     def __init__(self, sensor_service, config=None): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 12 |  |  |         super(EventsConsumerSensor, self).__init__(sensor_service=sensor_service, | 
            
                                                                                                            
                            
            
                                    
            
            
                | 13 |  |  |                                                    config=config) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 14 |  |  |         self._trigger_ref = 'opscenter.event' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 15 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 16 |  |  |         self._logger = self._sensor_service.get_logger(__name__) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 17 |  |  |         self._base_url = self._config['opscenter_base_url'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 18 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 19 |  |  |         if not self._base_url.endswith('/'): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 20 |  |  |             self._base_url = self._base_url + '/' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 21 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 22 |  |  |         self._cluster_id = self._config['cluster_id'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 23 |  |  |         self._events_url = self._cluster_id + '/events' | 
            
                                                                                                            
                            
            
                                    
            
            
                | 24 |  |  |         self._last_timestamp = None | 
            
                                                                                                            
                            
            
                                    
            
            
                | 25 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 26 |  |  |     def setup(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 27 |  |  |         pass | 
            
                                                                                                            
                            
            
                                    
            
            
                | 28 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 29 |  |  |     def poll(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 30 |  |  |         last_timestamp = self._get_last_timestamp() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 31 |  |  |         events = self._query_events(last_timestamp) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 32 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 33 |  |  |         if events: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 34 |  |  |             events.sort(key=itemgetter('timestamp'), reverse=True) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 35 |  |  |             last_timestamp = events[0]['timestamp'] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 36 |  |  |             self._set_last_timestamp(last_timestamp) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 37 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 38 |  |  |             for event in events: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 39 |  |  |                 self._dispatch_trigger_for_event(event=event) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 40 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 41 |  |  |     def cleanup(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 42 |  |  |         pass | 
            
                                                                                                            
                            
            
                                    
            
            
                | 43 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 44 |  |  |     def add_trigger(self, trigger): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 45 |  |  |         pass | 
            
                                                                                                            
                            
            
                                    
            
            
                | 46 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 47 |  |  |     def update_trigger(self, trigger): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 48 |  |  |         pass | 
            
                                                                                                            
                            
            
                                    
            
            
                | 49 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 50 |  |  |     def remove_trigger(self, trigger): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 51 |  |  |         pass | 
            
                                                                                                            
                            
            
                                    
            
            
                | 52 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 53 |  |  |     def _query_events(self, timestamp=None, count_per_batch=50): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 54 |  |  |         params = {} | 
            
                                                                                                            
                            
            
                                    
            
            
                | 55 |  |  |         params['count'] = count_per_batch | 
            
                                                                                                            
                            
            
                                    
            
            
                | 56 |  |  |         params['reverse'] = '0'  # gets all newer events > timestamp. | 
            
                                                                                                            
                            
            
                                    
            
            
                | 57 |  |  |         if timestamp: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 58 |  |  |             params['timestamp'] = timestamp | 
            
                                                                                                            
                            
            
                                    
            
            
                | 59 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 60 |  |  |         all_events = [] | 
            
                                                                                                            
                            
            
                                    
            
            
                | 61 |  |  |         done = False | 
            
                                                                                                            
                            
            
                                    
            
            
                | 62 |  |  |         while not done: | 
            
                                                                                                            
                            
            
                                    
            
            
                | 63 |  |  |             events = requests.get(self._get_events_url(), params=params).json() | 
            
                                                                                                            
                            
            
                                    
            
            
                | 64 |  |  |             all_events.extend(events) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 65 |  |  |             done = (len(events) < count_per_batch) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 66 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 67 |  |  |         return all_events | 
            
                                                                                                            
                            
            
                                    
            
            
                | 68 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 69 |  |  |     def _get_events_url(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 70 |  |  |         return urljoin(self._base_url, self._events_url) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 71 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 72 |  |  |     def _get_last_timestamp(self): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 73 |  |  |         if not self._last_timestamp and hasattr(self._sensor_service, 'get_value'): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 74 |  |  |             self._last_timestamp = long(self._sensor_service.get_value(name='last_timestamp')) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 75 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 76 |  |  |         return self._last_timestamp | 
            
                                                                                                            
                                                                
            
                                    
            
            
                | 77 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 78 |  |  |     def _set_last_timestamp(self, last_timestamp): | 
            
                                                                        
                            
            
                                    
            
            
                | 79 |  |  |         self._last_timestamp = str(last_timestamp) | 
            
                                                                        
                            
            
                                    
            
            
                | 80 |  |  |  | 
            
                                                                        
                            
            
                                    
            
            
                | 81 |  |  |         if hasattr(self._sensor_service, 'set_value'): | 
            
                                                                        
                            
            
                                    
            
            
                | 82 |  |  |             self._sensor_service.set_value(name='last_timestamp', value=last_timestamp) | 
            
                                                                                                            
                            
            
                                    
            
            
                | 83 |  |  |  | 
            
                                                                                                            
                            
            
                                    
            
            
                | 84 |  |  |     def _dispatch_trigger_for_event(self, event): | 
            
                                                                                                            
                            
            
                                    
            
            
                | 85 |  |  |         trigger = self._trigger_ref | 
            
                                                                                                            
                            
            
                                    
            
            
                | 86 |  |  |         payload = event | 
            
                                                                                                            
                                                                
            
                                    
            
            
                | 87 |  |  |         self._sensor_service.dispatch(trigger=trigger, payload=payload) | 
            
                                                        
            
                                    
            
            
                | 88 |  |  |  |