1
|
|
|
from st2reactor.sensor.base import PollingSensor |
2
|
|
|
from elasticsearch import Elasticsearch |
3
|
|
|
import json |
4
|
|
|
|
5
|
|
|
class ElasticsearchCountSensor(PollingSensor): |
6
|
|
|
""" |
7
|
|
|
* self.sensor_service |
8
|
|
|
- provides utilities like |
9
|
|
|
get_logger() for writing to logs. |
10
|
|
|
dispatch() for dispatching triggers into the system. |
11
|
|
|
* self._config |
12
|
|
|
- contains configuration that was specified as |
13
|
|
|
config.yaml in the pack. |
14
|
|
|
* self._poll_interval |
15
|
|
|
- indicates the interval between two successive poll() calls. |
16
|
|
|
""" |
17
|
|
|
|
18
|
|
|
def setup(self): |
19
|
|
|
self.host = self._config.get('host', None) |
20
|
|
|
self.port = self._config.get('port', None) |
21
|
|
|
self.query_window = "%is" % self._config.get('query_window', None) |
22
|
|
|
self.query_string = self._config.get('query_string', '{}') |
23
|
|
|
self.count_threshold = self._config.get('count_threshold', 1) |
24
|
|
|
self.index = self._config.get('index', '_all') |
25
|
|
|
self._trigger_ref="elasticsearch.count_event" |
26
|
|
|
self.LOG = self.sensor_service.get_logger(__name__) |
27
|
|
|
self.query = json.loads(self.query_string) |
28
|
|
|
self.es = None |
29
|
|
|
|
30
|
|
|
try: |
31
|
|
|
self.es = Elasticsearch([{'host': self.host, 'port': self.port}]) |
32
|
|
|
except: |
33
|
|
|
self.LOG.exception("Could not connect to elasticsearch. %s:%i" % (self.host, self.port)) |
34
|
|
|
raise Exception("Could not connect to elasticsearch. %s:%i" % (self.host, self.port)) |
35
|
|
|
|
36
|
|
|
def poll(self): |
37
|
|
|
query_payload={"query": { |
38
|
|
|
"bool": { |
39
|
|
|
"must": [self.query], |
40
|
|
|
"filter": { |
41
|
|
|
"range": { |
42
|
|
|
"@timestamp": { "gte": "now-%s" % self.query_window}}}}}} |
43
|
|
|
data = self.es.search(index=self.index, body=query_payload, size=0) |
44
|
|
|
|
45
|
|
|
hits = data.get('hits', None) |
46
|
|
|
if hits.get('total', 0) > self.count_threshold: |
47
|
|
|
payload = {} |
48
|
|
|
payload['results'] = hits |
49
|
|
|
payload['results']['query'] = query_payload |
50
|
|
|
self.LOG.info("Dispatching trigger") |
51
|
|
|
self.sensor_service.dispatch(trigger=self._trigger_ref, payload=payload) |
52
|
|
|
|
53
|
|
|
def cleanup(self): |
54
|
|
|
# This is called when the st2 system goes down. You can perform cleanup operations like |
55
|
|
|
# closing the connections to external system here. |
56
|
|
|
pass |
57
|
|
|
|
58
|
|
|
def add_trigger(self, trigger): |
59
|
|
|
# This method is called when trigger is created |
60
|
|
|
pass |
61
|
|
|
|
62
|
|
|
def update_trigger(self, trigger): |
63
|
|
|
# This method is called when trigger is updated |
64
|
|
|
pass |
65
|
|
|
|
66
|
|
|
def remove_trigger(self, trigger): |
67
|
|
|
# This method is called when trigger is deleted |
68
|
|
|
pass |
69
|
|
|
|