1
|
|
|
from st2reactor.sensor.base import PollingSensor |
2
|
|
|
from elasticsearch import Elasticsearch |
3
|
|
|
import json |
4
|
|
|
import eventlet |
5
|
|
|
|
6
|
|
|
|
7
|
|
|
class ElasticsearchCountSensor(PollingSensor):
    """Dispatch a trigger when an Elasticsearch query matches too many documents.

    On every poll the configured query is combined with a range filter on
    ``@timestamp`` covering the last ``query_window`` seconds.  When the number
    of hits is strictly greater than ``count_threshold`` the
    ``elasticsearch.count_event`` trigger is dispatched, after which the sensor
    sleeps for ``query_window * cooldown_multiplier`` seconds.
    """

    def setup(self):
        """Read the sensor configuration and create the Elasticsearch client.

        Raises:
            ValueError: if ``query_string`` is not valid JSON (raised by
                ``json.loads``).
            Exception: anything raised by the ``Elasticsearch`` constructor is
                propagated unchanged.
        """
        self.host = self.config.get('host', None)
        self.port = self.config.get('port', None)
        self.query_window = self.config.get('query_window', 60)
        self.query_string = self.config.get('query_string', '{}')
        self.cooldown_multiplier = self.config.get('cooldown_multiplier', 0)
        self.count_threshold = self.config.get('count_threshold', 0)
        self.index = self.config.get('index', '_all')
        self._trigger_ref = "elasticsearch.count_event"
        self.LOG = self.sensor_service.get_logger(__name__)
        self.query = json.loads(self.query_string)

        # Leave ``es`` defined (as None) even if the client constructor raises,
        # so the attribute always exists on the instance.
        self.es = None
        self.es = Elasticsearch([{'host': self.host, 'port': self.port}])

    def poll(self):
        """Run the count query once and dispatch the trigger if over threshold."""
        query_payload = self._build_query_payload()
        data = self.es.search(index=self.index, body=query_payload)

        hits = data.get('hits', None)
        if not hits:
            # A response without a "hits" section cannot be evaluated; skip
            # this cycle instead of failing with an AttributeError on None.
            self.LOG.warning("Elasticsearch response contained no hits section")
            return

        if self._total_hits(hits) <= self.count_threshold:
            return

        # The dispatched results are the raw "hits" section of the response
        # with the executed query attached under the "query" key.
        hits['query'] = query_payload
        payload = {'results': hits}

        self.LOG.info("Dispatching trigger")
        self.sensor_service.dispatch(trigger=self._trigger_ref,
                                     payload=payload)

        cooldown = self.query_window * self.cooldown_multiplier
        self.LOG.info("Cooling down for %i seconds", cooldown)
        eventlet.sleep(cooldown)

    def _build_query_payload(self):
        """Return the search body: the configured query limited to the window."""
        time_filter = {
            "range": {
                "@timestamp": {
                    "gte": "now-%ss" % self.query_window,
                },
            },
        }
        return {
            "query": {
                "bool": {
                    "must": [self.query],
                    "filter": time_filter,
                },
            },
        }

    @staticmethod
    def _total_hits(hits):
        """Return the hit count from a ``hits`` section as a plain number.

        ``hits['total']`` may be a bare number or a mapping of the form
        ``{"value": N, ...}`` (the object form returned by Elasticsearch 7+);
        both are accepted.  A missing total counts as 0.
        """
        total = hits.get('total', 0)
        if isinstance(total, dict):
            total = total.get('value', 0)
        return total or 0

    def cleanup(self):
        """Hook for releasing resources; nothing to do for this sensor."""
        # This is called when the st2 system goes down.
        # You can perform cleanup operations like
        # closing the connections to external system here.
        pass

    def add_trigger(self, trigger):
        """Hook for trigger creation; intentionally a no-op."""
        # This method is called when trigger is created
        pass

    def update_trigger(self, trigger):
        """Hook for trigger updates; intentionally a no-op."""
        # This method is called when trigger is updated
        pass

    def remove_trigger(self, trigger):
        """Hook for trigger deletion; intentionally a no-op."""
        # This method is called when trigger is deleted
        pass
67
|
|
|
|