1
|
|
|
from st2reactor.sensor.base import PollingSensor |
2
|
|
|
from elasticsearch import Elasticsearch |
3
|
|
|
import json |
4
|
|
|
import time |
5
|
|
|
|
6
|
|
|
class ElasticsearchCountSensor(PollingSensor):
    """Polling sensor that dispatches a trigger on a high document count.

    On every poll the sensor counts the documents that match the configured
    query and whose ``@timestamp`` lies within the last ``query_window``
    seconds. When that count is strictly greater than ``count_threshold``
    the ``elasticsearch.count_event`` trigger is dispatched and the sensor
    sleeps for ``query_window * cooldown_multiplier`` seconds.

    Configuration keys read in ``setup`` (defaults in parentheses):
    ``host`` (None), ``port`` (None), ``query_window`` (60),
    ``query_string`` ('{}'), ``cooldown_multiplier`` (0),
    ``count_threshold`` (0) and ``index`` ('_all').
    """

    def setup(self):
        """Read the sensor configuration and create the Elasticsearch client.

        Raises:
            ValueError: if ``query_string`` is a string that is not valid
                JSON (raised by ``json.loads``).
            Exception: if the Elasticsearch client cannot be created.
        """
        self.host = self._config.get('host', None)
        self.port = self._config.get('port', None)
        self.query_window = self._config.get('query_window', 60)
        self.query_string = self._config.get('query_string', '{}')
        self.cooldown_multiplier = self._config.get('cooldown_multiplier', 0)
        self.count_threshold = self._config.get('count_threshold', 0)
        self.index = self._config.get('index', '_all')
        self._trigger_ref = "elasticsearch.count_event"
        self.LOG = self.sensor_service.get_logger(__name__)

        # The query may arrive as a JSON string or as an already-parsed
        # mapping; json.loads only accepts the former.
        if isinstance(self.query_string, dict):
            self.query = self.query_string
        else:
            self.query = json.loads(self.query_string)

        self.es = None
        try:
            self.es = Elasticsearch([{'host': self.host, 'port': self.port}])
        except Exception:
            # %s rather than %i: the port defaults to None, and a failing
            # format here would hide the original error.
            message = "Could not connect to elasticsearch. %s:%s" % (
                self.host, self.port)
            self.LOG.exception(message)
            raise Exception(message)

    def poll(self):
        """Run the count query once and dispatch the trigger if needed.

        The dispatched payload has a single ``results`` key holding the
        ``hits`` section of the response plus the executed query under
        ``query``.
        """
        # An empty query (the default '{}') is left out of "must" instead of
        # being sent as an empty clause, so only the time filter applies.
        must_clauses = [self.query] if self.query else []
        query_payload = {
            "query": {
                "bool": {
                    "must": must_clauses,
                    "filter": {
                        "range": {
                            "@timestamp": {
                                "gte": "now-%ss" % self.query_window,
                            },
                        },
                    },
                },
            },
        }
        # size=0: only the total is needed, not the documents themselves.
        data = self.es.search(index=self.index, body=query_payload, size=0)

        hits = data.get('hits') or {}
        if self._total_hits(hits) <= self.count_threshold:
            return

        # Work on a copy so the response object is not modified.
        results = dict(hits)
        results['query'] = query_payload
        payload = {'results': results}

        self.LOG.info("Dispatching trigger")
        self.sensor_service.dispatch(trigger=self._trigger_ref,
                                     payload=payload)

        cooldown = self.query_window * self.cooldown_multiplier
        # time.sleep rejects negative values; nothing to wait for at <= 0.
        if cooldown > 0:
            self.LOG.info("Cooling down for %i seconds", cooldown)
            time.sleep(cooldown)

    @staticmethod
    def _total_hits(hits):
        """Return the total hit count from a ``hits`` response section.

        Accepts both a plain integer ``total`` and the object form
        ``{"value": N, ...}``; a missing or null total counts as 0.
        """
        total = hits.get('total', 0)
        if isinstance(total, dict):
            total = total.get('value', 0)
        return total or 0

    def cleanup(self):
        """Called when the st2 system goes down.

        Cleanup operations, such as closing connections to external
        systems, belong here. Nothing is done at present.
        """
        pass

    def add_trigger(self, trigger):
        """Called when a trigger is created. No action is taken."""
        pass

    def update_trigger(self, trigger):
        """Called when a trigger is updated. No action is taken."""
        pass

    def remove_trigger(self, trigger):
        """Called when a trigger is deleted. No action is taken."""
        pass
63
|
|
|
|