Completed
Pull Request — master (#544)
by
unknown
05:43
created

EventsConsumerSensor   A

Complexity

Total Complexity 20

Size/Duplication

Total Lines 78
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 78
rs 10
c 0
b 0
f 0
wmc 20
1
# pylint: disable=super-on-old-class
2
from operator import itemgetter
3
from urlparse import urljoin
4
5
import requests
6
7
from st2reactor.sensor.base import PollingSensor
8
9
10
class EventsConsumerSensor(PollingSensor):
11
    def __init__(self, sensor_service, config=None):
12
        super(EventsConsumerSensor, self).__init__(sensor_service=sensor_service,
13
                                                   config=config)
14
        self._trigger_ref = 'opscenter.event'
15
16
        self._logger = self._sensor_service.get_logger(__name__)
17
        self._base_url = self._config['opscenter_base_url']
18
19
        if not self._base_url.endswith('/'):
20
            self._base_url = self._base_url + '/'
21
22
        self._cluster_id = self._config['cluster_id']
23
        self._events_url = self._cluster_id + '/events'
24
        self._last_timestamp = None
25
26
    def setup(self):
27
        pass
28
29
    def poll(self):
30
        last_timestamp = self._get_last_timestamp()
31
        events = self._query_events(last_timestamp)
32
33
        if events:
34
            events.sort(key=itemgetter('timestamp'), reverse=True)
35
            last_timestamp = events[0]['timestamp']
36
            self._set_last_timestamp(last_timestamp)
37
38
            for event in events:
39
                self._dispatch_trigger_for_event(event=event)
40
41
    def cleanup(self):
42
        pass
43
44
    def add_trigger(self, trigger):
45
        pass
46
47
    def update_trigger(self, trigger):
48
        pass
49
50
    def remove_trigger(self, trigger):
51
        pass
52
53
    def _query_events(self, timestamp=None, count_per_batch=50):
54
        params = {}
55
        params['count'] = count_per_batch
56
        params['reverse'] = '0'  # gets all newer events > timestamp.
57
        if timestamp:
58
            params['timestamp'] = timestamp
59
60
        all_events = []
61
        done = False
62
        while not done:
63
            events = requests.get(self._get_events_url(), params=params).json()
64
            all_events.extend(events)
65
            done = (len(events) < count_per_batch)
66
67
        return all_events
68
69
    def _get_events_url(self):
70
        return urljoin(self._base_url, self._events_url)
71
72
    def _get_last_timestamp(self):
73
        if not self._last_timestamp and hasattr(self._sensor_service, 'get_value'):
74
            self._last_timestamp = long(self._sensor_service.get_value(name='last_timestamp'))
75
76
        return self._last_timestamp
77
78
    def _set_last_timestamp(self, last_timestamp):
79
        self._last_timestamp = str(last_timestamp)
80
81
        if hasattr(self._sensor_service, 'set_value'):
82
            self._sensor_service.set_value(name='last_timestamp', value=last_timestamp)
83
84
    def _dispatch_trigger_for_event(self, event):
85
        trigger = self._trigger_ref
86
        payload = event
87
        self._sensor_service.dispatch(trigger=trigger, payload=payload)
88