Completed
Pull Request — master (#514)
by
unknown
05:49
created

ElasticsearchCountSensor.setup()   A

Complexity

Conditions 2

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 17
rs 9.4285
1
from st2reactor.sensor.base import PollingSensor
2
from elasticsearch import Elasticsearch
3
import json
4
5
class ElasticsearchCountSensor(PollingSensor):
    """
    Polling sensor that dispatches a trigger when an Elasticsearch query
    matches more documents than a configured threshold.

    On every poll the sensor counts the documents that match the configured
    query and whose ``@timestamp`` is within the last ``query_window``
    seconds. When that count is greater than ``count_threshold`` the
    ``elasticsearch.count_event`` trigger is dispatched.

    Configuration keys read from ``self._config``:
        * host            - Elasticsearch host.
        * port            - Elasticsearch port.
        * query_window    - look-back window in seconds (required).
        * query_string    - JSON-encoded query clause (default ``'{}'``).
        * count_threshold - count that must be exceeded (default ``1``).
        * index           - index to search (default ``'_all'``).

    Attributes supplied by the sensor framework:
    * self.sensor_service
        - provides utilities like
            get_logger() for writing to logs.
            dispatch() for dispatching triggers into the system.
    * self._config
        - contains configuration that was specified as
          config.yaml in the pack.
    * self._poll_interval
        - indicates the interval between two successive poll() calls.
    """

    def setup(self):
        """Read the configuration and create the Elasticsearch client.

        Raises:
            ValueError: if ``query_window`` is missing or cannot be converted
                to an integer, or if ``query_string`` is not valid JSON.
            Exception: if the Elasticsearch client cannot be created.
        """
        self.LOG = self.sensor_service.get_logger(__name__)
        self._trigger_ref = "elasticsearch.count_event"

        self.host = self._config.get('host', None)
        self.port = self._config.get('port', None)
        self.index = self._config.get('index', '_all')
        self.count_threshold = self._config.get('count_threshold', 1)

        # The window has no sensible default: fail with a clear message
        # instead of the TypeError that "%i" % None would produce.
        window = self._config.get('query_window', None)
        if window is None:
            raise ValueError("The 'query_window' config option is required "
                             "(number of seconds to look back).")
        # int() also accepts a numeric string such as "60" from the config.
        self.query_window = "%is" % int(window)

        self.query_string = self._config.get('query_string', '{}')
        self.query = json.loads(self.query_string)

        self.es = None
        # NOTE(review): the client constructor may not open a connection by
        # itself, so an unreachable server might only surface in poll() -
        # confirm against the elasticsearch client documentation.
        try:
            self.es = Elasticsearch([{'host': self.host, 'port': self.port}])
        except Exception:
            # %s (not %i) so that a missing or non-integer port cannot raise
            # a second error here and hide the original failure.
            message = ("Could not connect to elasticsearch. %s:%s"
                       % (self.host, self.port))
            self.LOG.exception(message)
            raise Exception(message)

    def poll(self):
        """Run the count query and dispatch the trigger above the threshold.

        The dispatched payload has a single ``results`` key holding the
        ``hits`` section of the search response plus the executed query
        under ``query``.
        """
        query_payload = {
            "query": {
                "bool": {
                    "must": [self.query],
                    "filter": {
                        "range": {
                            "@timestamp": {
                                "gte": "now-%s" % self.query_window,
                            },
                        },
                    },
                },
            },
        }
        # size=0: only the hit count is needed, not the documents.
        data = self.es.search(index=self.index, body=query_payload, size=0)

        # Tolerate a response without a "hits" section.
        hits = data.get('hits') or {}
        if self._extract_total(hits) > self.count_threshold:
            # Copy so the search response itself is not modified.
            results = dict(hits)
            results['query'] = query_payload
            self.LOG.info("Dispatching trigger")
            self.sensor_service.dispatch(trigger=self._trigger_ref,
                                         payload={'results': results})

    @staticmethod
    def _extract_total(hits):
        """Return the numeric hit count from a ``hits`` response section.

        Handles both a plain integer ``total`` and the object form
        ``{"value": N, ...}``; a missing or empty total counts as 0.
        """
        total = hits.get('total', 0)
        if isinstance(total, dict):
            total = total.get('value', 0)
        return total or 0

    def cleanup(self):
        """Called when the st2 system goes down; nothing to release here."""
        # This is the place for cleanup operations such as closing
        # connections to external systems.
        pass

    def add_trigger(self, trigger):
        """Called when a trigger is created; no action is needed."""
        pass

    def update_trigger(self, trigger):
        """Called when a trigger is updated; no action is needed."""
        pass

    def remove_trigger(self, trigger):
        """Called when a trigger is deleted; no action is needed."""
        pass
69