1
|
|
|
from st2reactor.sensor.base import PollingSensor |
2
|
|
|
from elasticsearch import Elasticsearch |
3
|
|
|
import json |
4
|
|
|
import time |
5
|
|
|
|
6
|
|
|
|
7
|
|
|
class ElasticsearchCountSensor(PollingSensor): |
8
|
|
|
|
9
|
|
|
def setup(self): |
10
|
|
|
self.host = self._config.get('host', None) |
11
|
|
|
self.port = self._config.get('port', None) |
12
|
|
|
self.query_window = self._config.get('query_window', 60) |
13
|
|
|
self.query_string = self._config.get('query_string', '{}') |
14
|
|
|
self.cooldown_multiplier = self._config.get('cooldown_multiplier', 0) |
15
|
|
|
self.count_threshold = self._config.get('count_threshold', 0) |
16
|
|
|
self.index = self._config.get('index', '_all') |
17
|
|
|
self._trigger_ref="elasticsearch.count_event" |
18
|
|
|
self.LOG = self.sensor_service.get_logger(__name__) |
19
|
|
|
self.query = json.loads(self.query_string) |
20
|
|
|
self.es = None |
21
|
|
|
|
22
|
|
|
try: |
23
|
|
|
self.es = Elasticsearch([{'host': self.host, 'port': self.port}]) |
24
|
|
|
except: |
25
|
|
|
self.LOG.exception("Could not connect to elasticsearch. %s:%i" % |
26
|
|
|
(self.host, self.port)) |
27
|
|
|
raise Exception("Could not connect to elasticsearch. %s:%i" % |
28
|
|
|
(self.host, self.port)) |
29
|
|
|
|
30
|
|
|
def poll(self): |
31
|
|
|
query_payload = {"query": { |
32
|
|
|
"bool": { |
33
|
|
|
"must": [self.query], |
34
|
|
|
"filter": { |
35
|
|
|
"range": { |
36
|
|
|
"@timestamp": { |
37
|
|
|
"gte": "now-%ss" % self.query_window}}}}}} |
38
|
|
|
data = self.es.search(index=self.index, body=query_payload, size=0) |
39
|
|
|
|
40
|
|
|
hits = data.get('hits', None) |
41
|
|
|
if hits.get('total', 0) > self.count_threshold: |
42
|
|
|
payload = dict() |
43
|
|
|
payload['results'] = hits |
44
|
|
|
payload['results']['query'] = query_payload |
45
|
|
|
self.LOG.info("Dispatching trigger") |
46
|
|
|
self.sensor_service.dispatch(trigger=self._trigger_ref, |
47
|
|
|
payload=payload) |
48
|
|
|
cooldown = (self.query_window * self.cooldown_multiplier) |
49
|
|
|
self.LOG.info("Cooling down for %i seconds" % cooldown) |
50
|
|
|
time.sleep(cooldown) |
51
|
|
|
|
52
|
|
|
def cleanup(self): |
53
|
|
|
# This is called when the st2 system goes down. |
54
|
|
|
# You can perform cleanup operations like |
55
|
|
|
# closing the connections to external system here. |
56
|
|
|
pass |
57
|
|
|
|
58
|
|
|
def add_trigger(self, trigger): |
59
|
|
|
# This method is called when trigger is created |
60
|
|
|
pass |
61
|
|
|
|
62
|
|
|
def update_trigger(self, trigger): |
63
|
|
|
# This method is called when trigger is updated |
64
|
|
|
pass |
65
|
|
|
|
66
|
|
|
def remove_trigger(self, trigger): |
67
|
|
|
# This method is called when trigger is deleted |
68
|
|
|
pass |
69
|
|
|
|
70
|
|
|
|