Completed
Pull Request — master (#514)
by
unknown
02:45
created

ElasticsearchCountSensor.remove_trigger()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 3
rs 10
1
from st2reactor.sensor.base import PollingSensor
2
from elasticsearch import Elasticsearch
3
import json
4
import time
5
6
7
class ElasticsearchCountSensor(PollingSensor):
    """Polling sensor that dispatches a trigger when a query matches too often.

    On every poll the configured query is run against Elasticsearch,
    restricted to documents whose ``@timestamp`` falls inside the last
    ``query_window`` seconds. When the number of hits is greater than
    ``count_threshold`` the ``elasticsearch.count_event`` trigger is
    dispatched, and the sensor then sleeps for
    ``query_window * cooldown_multiplier`` seconds.
    """

    def setup(self):
        """Read the sensor configuration and create the Elasticsearch client.

        Raises:
            ValueError: if ``query_string`` is not valid JSON.
            Exception: if the Elasticsearch client cannot be created.
        """
        self.host = self._config.get('host', None)
        self.port = self._config.get('port', None)
        self.query_window = self._config.get('query_window', 60)
        self.query_string = self._config.get('query_string', '{}')
        self.cooldown_multiplier = self._config.get('cooldown_multiplier', 0)
        self.count_threshold = self._config.get('count_threshold', 0)
        self.index = self._config.get('index', '_all')
        self._trigger_ref = "elasticsearch.count_event"
        self.LOG = self.sensor_service.get_logger(__name__)

        try:
            self.query = json.loads(self.query_string)
        except ValueError:
            # Log the offending value before propagating so a bad config is
            # easy to diagnose; the exception type is left untouched.
            self.LOG.exception("Invalid JSON in query_string: %s" %
                               (self.query_string,))
            raise

        self.es = None

        try:
            self.es = Elasticsearch([{'host': self.host, 'port': self.port}])
        except Exception:
            # %s rather than %i: host/port default to None, and %i would raise
            # a TypeError here that hides the original failure.
            message = ("Could not connect to elasticsearch. %s:%s" %
                       (self.host, self.port))
            self.LOG.exception(message)
            raise Exception(message)

    def _build_query_payload(self):
        """Return the search body: the configured query limited to the window."""
        return {"query": {
                  "bool": {
                    "must": [self.query],
                    "filter": {
                      "range": {
                        "@timestamp": {
                            "gte": "now-%ss" % self.query_window}}}}}}

    @staticmethod
    def _extract_total(hits):
        """Return the hit count from a ``hits`` section as a plain number.

        ``hits['total']`` is accepted either as a bare number or as an
        object of the form ``{"value": N, "relation": ...}``; a missing or
        null total counts as 0.
        """
        total = hits.get('total', 0)
        if isinstance(total, dict):
            total = total.get('value', 0)
        return total or 0

    def poll(self):
        """Run the count query and dispatch the trigger if over the threshold."""
        query_payload = self._build_query_payload()
        data = self.es.search(index=self.index, body=query_payload, size=0)

        # A response without a 'hits' section is treated as zero matches
        # instead of failing on None.
        hits = (data or {}).get('hits') or {}
        if self._extract_total(hits) <= self.count_threshold:
            return

        # Copy before adding the query so the client's response dict is not
        # mutated; the dispatched payload has the same content as before.
        results = dict(hits)
        results['query'] = query_payload

        self.LOG.info("Dispatching trigger")
        self.sensor_service.dispatch(trigger=self._trigger_ref,
                                     payload={'results': results})

        cooldown = self.query_window * self.cooldown_multiplier
        # time.sleep() rejects negative values, and a zero cooldown needs
        # neither a log line nor a sleep call.
        if cooldown > 0:
            self.LOG.info("Cooling down for %i seconds" % cooldown)
            time.sleep(cooldown)

    def cleanup(self):
        # This is called when the st2 system goes down.
        # You can perform cleanup operations like
        # closing the connections to external system here.
        pass

    def add_trigger(self, trigger):
        # This method is called when trigger is created
        pass

    def update_trigger(self, trigger):
        # This method is called when trigger is updated
        pass

    def remove_trigger(self, trigger):
        # This method is called when trigger is deleted
        pass
69
70