Completed
Pull Request — master (#468)
by Manas
02:14
created

RabbitMQQueueSensor.remove_trigger()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 2
rs 10
1
import json
2
import pickle
3
4
import eventlet
5
import pika
6
7
from pika.credentials import PlainCredentials
8
9
from st2reactor.sensor.base import Sensor
10
11
# Maps the configured ``deserialization_method`` name to the callable used to
# decode a raw message body.
#
# SECURITY NOTE(review): ``pickle.loads`` can execute arbitrary code while
# unpickling. Only enable the 'pickle' method when every publisher on the
# broker is trusted; prefer 'json' otherwise.
DESERIALIZATION_FUNCTIONS = {
    'json': json.loads,
    'pickle': pickle.loads,
}
15
16
17
class RabbitMQQueueSensor(Sensor):
    """Sensor which monitors a RabbitMQ queue for new messages.

    This is a RabbitMQ Queue sensor i.e. it works on the simplest RabbitMQ
    messaging model as described in
    https://www.rabbitmq.com/tutorials/tutorial-one-python.html.

    It is capable of simultaneously consuming from multiple queues. Each message is
    dispatched to stackstorm as a `rabbitmq.new_message` TriggerInstance.
    """

    def __init__(self, sensor_service, config=None):
        """Read connection and queue settings from the sensor configuration.

        :param sensor_service: Service object used to obtain a logger and to
            dispatch triggers.
        :param config: Configuration dict; must contain a ``sensor_config``
            section with ``host``, ``username``, ``password`` and a
            ``rabbitmq_queue_sensor`` sub-section (``queues``,
            ``deserialization_method``).
        :raises ValueError: If a deserialization method is configured but is
            not one of the supported ones.
        """
        super(RabbitMQQueueSensor, self).__init__(sensor_service=sensor_service, config=config)

        self._logger = self._sensor_service.get_logger(name=self.__class__.__name__)

        sensor_config = self._config['sensor_config']
        self.host = sensor_config['host']
        self.username = sensor_config['username']
        self.password = sensor_config['password']

        queue_sensor_config = sensor_config['rabbitmq_queue_sensor']
        self.queues = queue_sensor_config['queues']
        # Accept a single queue name as well as a list of names.
        if not isinstance(self.queues, list):
            self.queues = [self.queues]
        self.deserialization_method = queue_sensor_config['deserialization_method']

        # A falsy method means "pass the raw body through", so only validate
        # when a method has actually been configured.
        if (self.deserialization_method and
                self.deserialization_method not in DESERIALIZATION_FUNCTIONS):
            raise ValueError('Invalid deserialization method specified: %s' %
                             (self.deserialization_method))

        # Both are populated by setup().
        self.conn = None
        self.channel = None

    def run(self):
        """Consume messages until the consuming loop exits.

        Expects setup() to have created ``self.channel`` beforehand.
        """
        self._logger.info('Starting to consume messages from RabbitMQ for %s', self.queues)
        # run in an eventlet in-order to yield correctly
        gt = eventlet.spawn(self.channel.start_consuming)
        # wait else the sensor will quit
        gt.wait()

    def cleanup(self):
        """Close the RabbitMQ connection if one was opened."""
        if self.conn:
            self.conn.close()

    def setup(self):
        """Connect to RabbitMQ and register a consumer for every configured queue."""
        # Credentials are only sent when both username and password are set.
        if self.username and self.password:
            credentials = PlainCredentials(username=self.username, password=self.password)
            connection_params = pika.ConnectionParameters(host=self.host, credentials=credentials)
        else:
            connection_params = pika.ConnectionParameters(host=self.host)

        self.conn = pika.BlockingConnection(connection_params)
        self.channel = self.conn.channel()
        self.channel.basic_qos(prefetch_count=1)

        # Setup Qs for listening
        for queue in self.queues:
            self.channel.queue_declare(queue=queue, durable=True)
            # The callback must be built per queue: a closure defined directly
            # in this loop would read ``queue`` when invoked, i.e. after the
            # loop has finished, and every consumer would report the last
            # queue name.
            self.channel.basic_consume(self._make_callback(queue), queue=queue)

    def _make_callback(self, queue):
        """Return a consumer callback permanently bound to ``queue``."""
        def callback(ch, method, properties, body):
            self._dispatch_trigger(ch, method, properties, body, queue)

        return callback

    def _dispatch_trigger(self, ch, method, properties, body, queue):
        """Deserialize a received message, dispatch it as a trigger and ack it.

        :param ch: Channel the message was delivered on (unused).
        :param method: Delivery metadata; its ``delivery_tag`` is acknowledged.
        :param properties: Message properties (unused).
        :param body: Raw message body.
        :param queue: Name of the queue the message was consumed from.
        """
        body = self._deserialize_body(body=body)
        self._logger.debug('Received message for queue %s with body %s', queue, body)

        payload = {"queue": queue, "body": body}
        try:
            self._sensor_service.dispatch(trigger="rabbitmq.new_message", payload=payload)
        finally:
            # Acknowledge even when dispatch raises, so the message is not
            # left unacknowledged on the channel.
            self.channel.basic_ack(delivery_tag=method.delivery_tag)

    def update_trigger(self, trigger):
        """No-op: this sensor does not react to trigger updates."""
        pass

    def add_trigger(self, trigger):
        """No-op: this sensor does not react to trigger additions."""
        pass

    def remove_trigger(self, trigger):
        """No-op: this sensor does not react to trigger removals."""
        pass

    def _deserialize_body(self, body):
        """Decode ``body`` with the configured deserialization method.

        :param body: Raw message body.
        :return: The deserialized object, or ``body`` unchanged when no method
            is configured or when deserialization fails.
        """
        if not self.deserialization_method:
            return body

        deserialization_func = DESERIALIZATION_FUNCTIONS[self.deserialization_method]

        try:
            return deserialization_func(body)
        except Exception:
            # Best effort: fall back to the raw body, but leave a trace so
            # malformed messages can be diagnosed.
            self._logger.warning('Failed to deserialize message body using "%s"; '
                                 'dispatching the raw body instead',
                                 self.deserialization_method, exc_info=True)
            return body
110