1
|
|
|
import json |
2
|
|
|
import pickle |
3
|
|
|
|
4
|
|
|
import eventlet |
5
|
|
|
import pika |
6
|
|
|
|
7
|
|
|
from pika.credentials import PlainCredentials |
8
|
|
|
|
9
|
|
|
from st2reactor.sensor.base import Sensor |
10
|
|
|
|
11
|
|
|
DESERIALIZATION_FUNCTIONS = { |
12
|
|
|
'json': json.loads, |
13
|
|
|
'pickle': pickle.loads |
14
|
|
|
} |
15
|
|
|
|
16
|
|
|
|
17
|
|
|
class RabbitMQQueueSensor(Sensor): |
18
|
|
|
"""Sensor which monitors a RabbitMQ queue for new messages |
19
|
|
|
|
20
|
|
|
This is a RabbitMQ Queue sensor i.e. it works on the simplest RabbitMQ |
21
|
|
|
messaging model as described in https://www.rabbitmq.com/tutorials/tutorial-one-python.html. |
22
|
|
|
|
23
|
|
|
It is capable of simultaneously consuming from multiple queues. Each message is |
24
|
|
|
dispatched to stackstorm as a `rabbitmq.new_message` TriggerInstance. |
25
|
|
|
""" |
26
|
|
|
def __init__(self, sensor_service, config=None): |
27
|
|
|
super(RabbitMQQueueSensor, self).__init__(sensor_service=sensor_service, config=config) |
28
|
|
|
|
29
|
|
|
self._logger = self._sensor_service.get_logger(name=self.__class__.__name__) |
30
|
|
|
self.host = self._config['sensor_config']['host'] |
31
|
|
|
self.username = self._config['sensor_config']['username'] |
32
|
|
|
self.password = self._config['sensor_config']['password'] |
33
|
|
|
|
34
|
|
|
queue_sensor_config = self._config['sensor_config']['rabbitmq_queue_sensor'] |
35
|
|
|
self.queues = queue_sensor_config['queues'] |
36
|
|
|
if not isinstance(self.queues, list): |
37
|
|
|
self.queues = [self.queues] |
38
|
|
|
self.deserialization_method = queue_sensor_config['deserialization_method'] |
39
|
|
|
|
40
|
|
|
supported_methods = DESERIALIZATION_FUNCTIONS.keys() |
41
|
|
|
if self.deserialization_method and self.deserialization_method not in supported_methods: |
42
|
|
|
raise ValueError('Invalid deserialization method specified: %s' % |
43
|
|
|
(self.deserialization_method)) |
44
|
|
|
|
45
|
|
|
self.conn = None |
46
|
|
|
self.channel = None |
47
|
|
|
|
48
|
|
|
def run(self): |
49
|
|
|
self._logger.info('Starting to consume messages from RabbitMQ for %s', self.queues) |
50
|
|
|
# run in an eventlet in-order to yield correctly |
51
|
|
|
gt = eventlet.spawn(self.channel.start_consuming) |
52
|
|
|
# wait else the sensor will quit |
53
|
|
|
gt.wait() |
54
|
|
|
|
55
|
|
|
def cleanup(self): |
56
|
|
|
if self.conn: |
57
|
|
|
self.conn.close() |
58
|
|
|
|
59
|
|
|
def setup(self): |
60
|
|
|
if self.username and self.password: |
61
|
|
|
credentials = PlainCredentials(username=self.username, password=self.password) |
62
|
|
|
connection_params = pika.ConnectionParameters(host=self.host, credentials=credentials) |
63
|
|
|
else: |
64
|
|
|
connection_params = pika.ConnectionParameters(host=self.host) |
65
|
|
|
|
66
|
|
|
self.conn = pika.BlockingConnection(connection_params) |
67
|
|
|
self.channel = self.conn.channel() |
68
|
|
|
self.channel.basic_qos(prefetch_count=1) |
69
|
|
|
|
70
|
|
|
# Setup Qs for listening |
71
|
|
|
for queue in self.queues: |
72
|
|
|
self.channel.queue_declare(queue=queue, durable=True) |
73
|
|
|
|
74
|
|
|
def callback(ch, method, properties, body): |
75
|
|
|
self._dispatch_trigger(ch, method, properties, body, queue) |
76
|
|
|
|
77
|
|
|
self.channel.basic_consume(callback, queue=queue) |
78
|
|
|
|
79
|
|
|
def _dispatch_trigger(self, ch, method, properties, body, queue): |
80
|
|
|
body = self._deserialize_body(body=body) |
81
|
|
|
self._logger.debug('Received message for queue %s with body %s', queue, body) |
82
|
|
|
|
83
|
|
|
payload = {"queue": queue, "body": body} |
84
|
|
|
try: |
85
|
|
|
self._sensor_service.dispatch(trigger="rabbitmq.new_message", payload=payload) |
86
|
|
|
finally: |
87
|
|
|
self.channel.basic_ack(delivery_tag=method.delivery_tag) |
88
|
|
|
|
89
|
|
|
def update_trigger(self, trigger): |
90
|
|
|
pass |
91
|
|
|
|
92
|
|
|
def add_trigger(self, trigger): |
93
|
|
|
pass |
94
|
|
|
|
95
|
|
|
def remove_trigger(self, trigger): |
96
|
|
|
pass |
97
|
|
|
|
98
|
|
|
def _deserialize_body(self, body): |
99
|
|
|
if not self.deserialization_method: |
100
|
|
|
return body |
101
|
|
|
|
102
|
|
|
deserialization_func = DESERIALIZATION_FUNCTIONS[self.deserialization_method] |
103
|
|
|
|
104
|
|
|
try: |
105
|
|
|
body = deserialization_func(body) |
106
|
|
|
except Exception: |
107
|
|
|
pass |
108
|
|
|
|
109
|
|
|
return body |
110
|
|
|
|