Completed
Pull Request — master (#514)
by
unknown
03:00
created

ElasticsearchCountSensor.remove_trigger()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
c 2
b 0
f 0
dl 0
loc 3
rs 10
1
from st2reactor.sensor.base import PollingSensor
2
from elasticsearch import Elasticsearch
3
import json
4
import eventlet
5
6
7
class ElasticsearchCountSensor(PollingSensor):
    """Polling sensor that dispatches a trigger when a query matches too often.

    On every poll the configured query is run against Elasticsearch,
    restricted to documents whose ``@timestamp`` falls within the last
    ``query_window`` seconds. When the number of hits exceeds
    ``count_threshold`` the ``elasticsearch.count_event`` trigger is
    dispatched and the sensor sleeps for
    ``query_window * cooldown_multiplier`` seconds.
    """

    def setup(self):
        """Read the sensor configuration and create the Elasticsearch client.

        Config keys read (with defaults): ``host`` (None), ``port`` (None),
        ``query_window`` (60), ``query_string`` ('{}'),
        ``cooldown_multiplier`` (0), ``count_threshold`` (0) and
        ``index`` ('_all').

        Raises:
            ValueError: if ``query_string`` is not valid JSON.
        """
        self.host = self.config.get('host', None)
        self.port = self.config.get('port', None)
        self.query_window = self.config.get('query_window', 60)
        self.query_string = self.config.get('query_string', '{}')
        self.cooldown_multiplier = self.config.get('cooldown_multiplier', 0)
        self.count_threshold = self.config.get('count_threshold', 0)
        self.index = self.config.get('index', '_all')
        self._trigger_ref = "elasticsearch.count_event"
        self.LOG = self.sensor_service.get_logger(__name__)
        self.query = json.loads(self.query_string)
        # Any error raised while building the client propagates to the caller
        # unchanged; there is nothing useful to do with it here.
        self.es = Elasticsearch([{'host': self.host, 'port': self.port}])

    @staticmethod
    def _hit_count(hits):
        """Return the total hit count from a search response's ``hits`` section.

        Older Elasticsearch releases report ``total`` as a plain integer,
        newer ones as an object of the form ``{"value": N, "relation": ...}``.
        Both shapes are accepted; a missing total counts as 0.
        """
        total = hits.get('total', 0)
        if isinstance(total, dict):
            total = total.get('value', 0)
        return total or 0

    def poll(self):
        """Run the count query once and dispatch the trigger if it is exceeded.

        The trigger payload is ``{'results': <hits section plus 'query'>}``,
        where ``'query'`` holds the request body that was sent.
        """
        query_payload = {
            "query": {
                "bool": {
                    "must": [self.query],
                    "filter": {
                        "range": {
                            "@timestamp": {
                                "gte": "now-%ss" % self.query_window,
                            },
                        },
                    },
                },
            },
        }
        data = self.es.search(index=self.index, body=query_payload)

        hits = data.get('hits', None)
        if not hits:
            # Nothing to count; avoid calling .get() on None.
            return

        if self._hit_count(hits) <= self.count_threshold:
            return

        # Work on a shallow copy so the client's response object is not
        # modified when the query is attached.
        results = dict(hits)
        results['query'] = query_payload
        payload = {'results': results}

        self.LOG.info("Dispatching trigger")
        self.sensor_service.dispatch(trigger=self._trigger_ref,
                                     payload=payload)

        cooldown = self.query_window * self.cooldown_multiplier
        self.LOG.info("Cooling down for %i seconds", cooldown)
        eventlet.sleep(cooldown)

    def cleanup(self):
        # This is called when the st2 system goes down.
        # You can perform cleanup operations like
        # closing the connections to external system here.
        pass

    def add_trigger(self, trigger):
        # This method is called when trigger is created
        pass

    def update_trigger(self, trigger):
        # This method is called when trigger is updated
        pass

    def remove_trigger(self, trigger):
        # This method is called when trigger is deleted
        pass
67