Passed
Pull Request — master (#3640)
by Lakshmi
06:58
created

SensorWatcher.stop()   B

Complexity

Conditions 5

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 5
c 1
b 0
f 0
dl 0
loc 16
rs 8.5454
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
# pylint: disable=assignment-from-none
16
17
# XXX: This file has a lot of duplication with TriggerWatcher.
18
# XXX: Refactor.
19
20
import eventlet
21
from kombu.mixins import ConsumerMixin
22
from kombu import Connection
23
24
from st2common import log as logging
25
from st2common.transport import reactor, publishers
26
from st2common.transport import utils as transport_utils
27
import st2common.util.queues as queue_utils
28
29
LOG = logging.getLogger(__name__)
30
31
32
class SensorWatcher(ConsumerMixin):
    """
    Message bus consumer which dispatches sensor create / update / delete
    messages to the handler functions supplied by the caller.

    Messages are read from a dedicated queue built by :meth:`_get_queue` and are
    routed to a handler based on the routing key of the delivered message.
    """

    def __init__(self, create_handler, update_handler, delete_handler,
                 queue_suffix=None):
        """
        :param create_handler: Function which is called on SensorDB create event.
        :type create_handler: ``callable``

        :param update_handler: Function which is called on SensorDB update event.
        :type update_handler: ``callable``

        :param delete_handler: Function which is called on SensorDB delete event.
        :type delete_handler: ``callable``

        :param queue_suffix: Optional suffix which is passed through to the queue
                             name builder when naming the watcher queue.
        :type queue_suffix: ``str`` or ``None``
        """
        # TODO: Handle sensor type filtering using routing key
        self._create_handler = create_handler
        self._update_handler = update_handler
        self._delete_handler = delete_handler
        self._sensor_watcher_q = self._get_queue(queue_suffix)

        # Both are populated by start() and torn down by stop().
        self.connection = None
        self._updates_thread = None

        # Routing key -> handler dispatch table used by process_task().
        self._handlers = {
            publishers.CREATE_RK: create_handler,
            publishers.UPDATE_RK: update_handler,
            publishers.DELETE_RK: delete_handler
        }

    def get_consumers(self, Consumer, channel):
        """
        Build the list of consumers for the watcher queue.

        :param Consumer: Consumer factory used to construct the consumer.
        :param channel: Unused by this implementation.

        :return: List with a single consumer bound to the watcher queue which
                 invokes :meth:`process_task` for every message.
        :rtype: ``list``
        """
        # NOTE(review): accepting 'pickle' means message payloads are unpickled on
        # receipt, which is unsafe if untrusted parties can publish to the
        # broker - confirm the broker is trusted.
        consumers = [Consumer(queues=[self._sensor_watcher_q],
                              accept=['pickle'],
                              callbacks=[self.process_task])]
        return consumers

    def process_task(self, body, message):
        """
        Dispatch a single message to the handler registered for its routing key.

        Handler failures are logged and swallowed. The message is always
        acknowledged, even when no handler is found or the handler raises.

        :param body: Decoded message payload which is passed to the handler.
        :param message: Message object providing ``delivery_info``,
                        ``properties`` and ``ack()``.
        """
        LOG.debug('process_task')
        LOG.debug('     body: %s', body)
        LOG.debug('     message.properties: %s', message.properties)
        LOG.debug('     message.delivery_info: %s', message.delivery_info)

        routing_key = message.delivery_info.get('routing_key', '')
        handler = self._handlers.get(routing_key, None)

        try:
            if not handler:
                LOG.info('Skipping message %s as no handler was found.', message)
                return

            try:
                handler(body)
            except Exception as e:
                # Pass the exception itself instead of ``e.message``: that
                # attribute does not exist on Python 3 exceptions, so reading it
                # here would raise AttributeError and mask the original failure.
                LOG.exception('Handling failed. Message body: %s. Exception: %s',
                              body, e)
        finally:
            message.ack()

    def start(self):
        """
        Open a connection to the message bus and spawn a green thread which runs
        the consumer loop (``self.run``).

        Failures are logged and not re-raised.
        """
        try:
            self.connection = Connection(transport_utils.get_messaging_urls())
            self._updates_thread = eventlet.spawn(self.run)
        except Exception:
            LOG.exception('Failed to start sensor_watcher.')
            # The connection is still None if creating it is what failed, so
            # only release it when it actually exists.
            if self.connection:
                self.connection.release()

    def stop(self):
        """
        Kill the consumer green thread, delete the watcher queue and release the
        connection.

        Failure to delete the queue is logged and otherwise ignored. The
        connection is released even if an earlier step raises.
        """
        LOG.debug('Shutting down sensor watcher.')
        try:
            if self._updates_thread:
                eventlet.kill(self._updates_thread)
                # Drop the handle to the killed green thread.
                self._updates_thread = None

            if self.connection:
                channel = self.connection.channel()
                # Bind the queue declaration to a live channel so it can be
                # deleted on the broker.
                bound_sensor_watch_q = self._sensor_watcher_q(channel)
                try:
                    bound_sensor_watch_q.delete()
                except Exception:
                    # Best effort: keep the traceback so the cause is visible.
                    LOG.exception('Unable to delete sensor watcher queue: %s',
                                  self._sensor_watcher_q)
        finally:
            if self.connection:
                self.connection.release()

    @staticmethod
    def _get_queue(queue_suffix):
        """
        Build the queue object used to receive sensor create / update / delete
        messages.

        :param queue_suffix: Optional suffix passed to the queue name builder.
        :type queue_suffix: ``str`` or ``None``

        :return: Queue returned by ``reactor.get_sensor_cud_queue`` for the
                 generated name, using the ``#`` routing key.
        """
        queue_name = queue_utils.get_queue_name(queue_name_base='st2.sensor.watch',
                                                queue_name_suffix=queue_suffix,
                                                add_random_uuid_to_suffix=True
                                                )
        return reactor.get_sensor_cud_queue(queue_name, routing_key='#')
121