|
1
|
|
|
from st2reactor.sensor.base import PollingSensor |
|
2
|
|
|
from elasticsearch import Elasticsearch |
|
3
|
|
|
import json |
|
4
|
|
|
|
|
5
|
|
|
class ElasticsearchCountSensor(PollingSensor):
    """
    Polling sensor that dispatches a trigger when too many documents match.

    On every poll() the configured query is run against Elasticsearch,
    restricted to documents whose ``@timestamp`` is within the last
    ``query_window`` seconds. When the number of matching documents is
    greater than ``count_threshold``, the ``elasticsearch.count_event``
    trigger is dispatched with the hit summary and the query that was sent.

    Configuration keys read from the pack config:
        host, port      - address of the Elasticsearch node.
        query_window    - look-back window in seconds (required).
        query_string    - query clause as a JSON string (default ``'{}'``).
        count_threshold - dispatch only when the count exceeds this (default 1).
        index           - index to search (default ``'_all'``).

    * self.sensor_service
        - provides utilities like
            get_logger() for writing to logs.
            dispatch() for dispatching triggers into the system.
    * self._config
        - contains configuration that was specified as
          config.yaml in the pack.
    * self._poll_interval
        - indicates the interval between two successive poll() calls.
    """

    def setup(self):
        """Read the pack configuration and create the Elasticsearch client.

        Raises:
            ValueError: if ``query_window`` is missing or not a number, or
                if ``query_string`` is not valid JSON.
            Exception: if the Elasticsearch client cannot be created.
        """
        # Grab the logger first so that later failures can be logged.
        self.LOG = self.sensor_service.get_logger(__name__)
        self._trigger_ref = "elasticsearch.count_event"

        self.host = self._config.get('host', None)
        self.port = self._config.get('port', None)
        self.query_string = self._config.get('query_string', '{}')
        self.count_threshold = self._config.get('count_threshold', 1)
        self.index = self._config.get('index', '_all')

        # Formatting None with "%i" would fail with an opaque TypeError, so
        # report the missing option explicitly instead.
        window = self._config.get('query_window', None)
        if window is None:
            raise ValueError(
                "Missing required config option 'query_window' (seconds).")
        # int() also accepts numeric strings such as "60" coming from YAML.
        self.query_window = "%is" % int(window)

        # The query may be supplied either as a JSON string or as an already
        # parsed mapping; only the former needs decoding.
        if isinstance(self.query_string, dict):
            self.query = self.query_string
        else:
            self.query = json.loads(self.query_string)

        self.es = None
        try:
            self.es = Elasticsearch([{'host': self.host, 'port': self.port}])
        except Exception:
            # "%s" for the port: it may be None or a string, and a formatting
            # error here would hide the original failure.
            message = "Could not connect to elasticsearch. %s:%s" % (
                self.host, self.port)
            self.LOG.exception(message)
            raise Exception(message)

    def poll(self):
        """Count recent matching documents and dispatch a trigger if needed.

        The trigger payload is ``{'results': hits}`` where ``hits`` is the
        ``hits`` section of the search response with the request body added
        under the ``query`` key.
        """
        # NOTE(review): recent Elasticsearch versions appear to reject an
        # empty ``{}`` clause - confirm. Omitting it leaves only the time
        # filter, which is what an empty query is meant to express.
        must_clauses = [self.query] if self.query else []
        query_payload = {
            "query": {
                "bool": {
                    "must": must_clauses,
                    "filter": {
                        "range": {
                            "@timestamp": {
                                "gte": "now-%s" % self.query_window,
                            },
                        },
                    },
                },
            },
        }

        # size=0: only the hit count is needed, not the documents themselves.
        data = self.es.search(index=self.index, body=query_payload, size=0)

        # Guard against a response without a "hits" section.
        hits = data.get('hits', None) or {}
        if self._total_hits(hits) > self.count_threshold:
            payload = {'results': hits}
            payload['results']['query'] = query_payload
            self.LOG.info("Dispatching trigger")
            self.sensor_service.dispatch(trigger=self._trigger_ref,
                                         payload=payload)

    @staticmethod
    def _total_hits(hits):
        """Return the hit count from a ``hits`` section as a number.

        Older Elasticsearch versions report ``total`` as a plain integer,
        while 7.x and later report ``{"value": N, "relation": ...}``.
        """
        total = hits.get('total', 0)
        if isinstance(total, dict):
            total = total.get('value', 0)
        return total or 0

    def cleanup(self):
        """Hook called when the st2 system goes down; nothing to release."""
        # This is called when the st2 system goes down. You can perform
        # cleanup operations like closing the connections to external
        # system here.
        pass

    def add_trigger(self, trigger):
        """Hook called when a trigger is created; no action required."""
        pass

    def update_trigger(self, trigger):
        """Hook called when a trigger is updated; no action required."""
        pass

    def remove_trigger(self, trigger):
        """Hook called when a trigger is deleted; no action required."""
        pass
|
69
|
|
|
|