Completed
Pull Request — master (#468)
by Manas
02:12
created

RabbitMQQueueSensor   A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 88
Duplicated Lines 0 %
Metric Value
dl 0
loc 88
rs 10
wmc 19

9 Methods

Rating   Name   Duplication   Size   Complexity  
A cleanup() 0 3 2
A _deserialize_body() 0 12 3
A add_trigger() 0 2 1
A callback() 0 8 1
A remove_trigger() 0 2 1
B setup() 0 17 5
A run() 0 5 1
A __init__() 0 20 4
A update_trigger() 0 2 1
1
import json
2
import pickle
3
4
import eventlet
5
import pika
6
7
from pika.credentials import PlainCredentials
8
9
from st2reactor.sensor.base import Sensor
10
11
# Maps the configured ``deserialization_method`` name to the callable used to
# decode a raw message body.
#
# SECURITY NOTE: ``pickle.loads`` can execute arbitrary code while decoding.
# Only select 'pickle' when every publisher on the consumed queues is trusted;
# prefer 'json' otherwise.
DESERIALIZATION_FUNCTIONS = {
    'json': json.loads,
    'pickle': pickle.loads,
}
15
16
17
class RabbitMQQueueSensor(Sensor):
    """Sensor which monitors a RabbitMQ queue for new messages

    This is a RabbitMQ Queue sensor i.e. it works on the simplest RabbitMQ
    messaging model as described in https://www.rabbitmq.com/tutorials/tutorial-one-python.html.

    It is capable of simultaneously consuming from multiple queues. Each message is
    dispatched to stackstorm as a `rabbitmq.new_message` TriggerInstance.
    """

    def __init__(self, sensor_service, config=None):
        """Read connection and queue settings from the sensor config.

        :param sensor_service: Service object handed to the base ``Sensor``;
            used later to dispatch triggers.
        :param config: Configuration handed to the base ``Sensor``. The values
            are read back from ``self._config['sensor_config']``.
        :raises ValueError: If a deserialization method is configured but is
            not one of the keys of ``DESERIALIZATION_FUNCTIONS``.
        """
        super(RabbitMQQueueSensor, self).__init__(sensor_service=sensor_service, config=config)

        sensor_config = self._config['sensor_config']
        self.host = sensor_config['host']
        self.username = sensor_config['username']
        self.password = sensor_config['password']

        queue_sensor_config = sensor_config['rabbitmq_queue_sensor']

        # A single queue may be given as a bare value; normalise it to a list.
        self.queues = queue_sensor_config['queues']
        if not isinstance(self.queues, list):
            self.queues = [self.queues]

        # A falsy method means "deliver the raw body"; anything else must be a
        # known key of DESERIALIZATION_FUNCTIONS.
        self.deserialization_method = queue_sensor_config['deserialization_method']
        if (self.deserialization_method and
                self.deserialization_method not in DESERIALIZATION_FUNCTIONS):
            raise ValueError('Invalid deserialization method specified: %s' %
                             (self.deserialization_method))

        # Both are populated by setup().
        self.conn = None
        self.channel = None

    def run(self):
        """Consume messages until the consuming green thread finishes."""
        # run in an eventlet in-order to yield correctly
        gt = eventlet.spawn(self.channel.start_consuming)
        # wait else the sensor will quit
        gt.wait()

    def cleanup(self):
        """Close the RabbitMQ connection if one was opened."""
        if self.conn:
            self.conn.close()

    def setup(self):
        """Open the connection and channel and register one consumer per queue."""
        if self.username and self.password:
            credentials = PlainCredentials(username=self.username, password=self.password)
            connection_params = pika.ConnectionParameters(host=self.host, credentials=credentials)
        else:
            # No (complete) credentials configured: connect with pika's defaults.
            connection_params = pika.ConnectionParameters(host=self.host)

        self.conn = pika.BlockingConnection(connection_params)
        self.channel = self.conn.channel()
        self.channel.basic_qos(prefetch_count=1)

        # Setup Qs for listening
        for queue in self.queues:
            self.channel.queue_declare(queue=queue, durable=True)
            # The consumer is built by a factory so that each one keeps its own
            # queue name. A lambda defined directly in this loop would look up
            # ``queue`` when invoked and always see the last queue of the loop.
            self.channel.basic_consume(self._make_consumer(queue), queue=queue)

    def _make_consumer(self, queue):
        """Return a consumer callback bound to the given queue name."""
        def consumer(ch, method, properties, body):
            return self.callback(ch, method, properties, body, queue)

        return consumer

    def callback(self, ch, method, properties, body, queue):
        """Dispatch one received message as a ``rabbitmq.new_message`` trigger.

        :param ch: Channel argument passed by the consumer (unused here).
        :param method: Delivery metadata; its ``delivery_tag`` is used to ack.
        :param properties: Message properties passed by the consumer (unused here).
        :param body: Raw message body; deserialized if a method is configured.
        :param queue: Name of the queue the consumer was registered on.
        """
        body = self._deserialize_body(body=body)
        payload = {"queue": queue, "body": body}

        try:
            self._sensor_service.dispatch(trigger="rabbitmq.new_message", payload=payload)
        finally:
            # The message is acknowledged whether or not dispatch raised.
            self.channel.basic_ack(delivery_tag=method.delivery_tag)

    def update_trigger(self, trigger):
        """No-op trigger hook."""
        pass

    def add_trigger(self, trigger):
        """No-op trigger hook."""
        pass

    def remove_trigger(self, trigger):
        """No-op trigger hook."""
        pass

    def _deserialize_body(self, body):
        """Decode ``body`` with the configured method, best effort.

        :param body: Raw message body.
        :return: The decoded object, or ``body`` unchanged when no method is
            configured or decoding fails.
        """
        if not self.deserialization_method:
            return body

        deserialization_func = DESERIALIZATION_FUNCTIONS[self.deserialization_method]

        try:
            return deserialization_func(body)
        except Exception:
            # Deliberate best effort: a body that cannot be decoded is passed
            # through as-is instead of failing the consumer.
            return body
105