Completed
Pull Request — master (#514)
by
unknown
02:57
created

ElasticsearchCountSensor.update_trigger()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 3
rs 10
1
from st2reactor.sensor.base import PollingSensor
2
from elasticsearch import Elasticsearch
3
import json
4
import time
5
6
class ElasticsearchCountSensor(PollingSensor):
    """Polling sensor that dispatches a trigger when a query matches too many documents.

    On every poll the configured query is run against Elasticsearch, restricted
    to documents whose ``@timestamp`` falls within the last ``query_window``
    seconds. When the number of hits is greater than ``count_threshold`` the
    ``elasticsearch.count_event`` trigger is dispatched, after which the sensor
    sleeps for ``query_window * cooldown_multiplier`` seconds.
    """

    def setup(self):
        """Read the sensor configuration and create the Elasticsearch client.

        Configuration keys read from ``self._config`` (with their defaults):
        ``host`` (None), ``port`` (None), ``query_window`` (60),
        ``query_string`` ('{}'), ``cooldown_multiplier`` (0),
        ``count_threshold`` (0) and ``index`` ('_all').

        Raises:
            Exception: if the Elasticsearch client cannot be created.
        """
        self.host = self._config.get('host', None)
        self.port = self._config.get('port', None)
        self.query_window = self._config.get('query_window', 60)
        self.query_string = self._config.get('query_string', '{}')
        self.cooldown_multiplier = self._config.get('cooldown_multiplier', 0)
        self.count_threshold = self._config.get('count_threshold', 0)
        self.index = self._config.get('index', '_all')
        self._trigger_ref = "elasticsearch.count_event"
        self.LOG = self.sensor_service.get_logger(__name__)
        # query_string is a JSON document; it becomes the single "must" clause in poll().
        self.query = json.loads(self.query_string)
        self.es = None

        try:
            self.es = Elasticsearch([{'host': self.host, 'port': self.port}])
        except Exception:
            # Format with %s rather than %i: host and port default to None, and
            # %i would raise a TypeError here that hides the original failure.
            message = "Could not connect to elasticsearch. %s:%s" % (self.host, self.port)
            self.LOG.exception(message)
            raise Exception(message)

    @staticmethod
    def _total_hits(hits):
        """Return the hit count from the ``hits`` section of a search response.

        ``hits['total']`` is a plain number in older Elasticsearch responses and
        an object of the form ``{"value": N, "relation": ...}`` in newer ones;
        both shapes are accepted. A missing or null total counts as 0.
        """
        total = hits.get('total', 0)
        if isinstance(total, dict):
            total = total.get('value', 0)
        return total or 0

    def poll(self):
        """Run the count query once and dispatch the trigger if the threshold is exceeded."""
        query_payload = {
            "query": {
                "bool": {
                    "must": [self.query],
                    "filter": {
                        "range": {
                            "@timestamp": {"gte": "now-%ss" % self.query_window},
                        },
                    },
                },
            },
        }
        # size=0: only the hit count is needed, not the matching documents.
        data = self.es.search(index=self.index, body=query_payload, size=0)

        # Tolerate a response without a "hits" section instead of failing on None.
        hits = data.get('hits') or {}
        if self._total_hits(hits) <= self.count_threshold:
            return

        # Copy so that attaching the query does not mutate the client's response.
        results = dict(hits)
        results['query'] = query_payload
        self.LOG.info("Dispatching trigger")
        self.sensor_service.dispatch(trigger=self._trigger_ref,
                                     payload={'results': results})

        cooldown = self.query_window * self.cooldown_multiplier
        # Only sleep for a positive cooldown: time.sleep() rejects negative
        # values, and a zero cooldown needs neither a pause nor a log line.
        if cooldown > 0:
            self.LOG.info("Cooling down for %i seconds", cooldown)
            time.sleep(cooldown)

    def cleanup(self):
        """Hook called when the st2 system goes down.

        Cleanup such as closing connections to external systems can be done
        here; this sensor currently does nothing.
        """
        pass

    def add_trigger(self, trigger):
        """Hook called when a trigger is created; no action is taken."""
        pass

    def update_trigger(self, trigger):
        """Hook called when a trigger is updated; no action is taken."""
        pass

    def remove_trigger(self, trigger):
        """Hook called when a trigger is deleted; no action is taken."""
        pass
63