Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2common/st2common/services/triggerwatcher.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
# pylint: disable=assignment-from-none
16
17
from __future__ import absolute_import
18
import eventlet
19
from kombu.mixins import ConsumerMixin
20
from kombu import Connection
21
22
from st2common import log as logging
23
from st2common.persistence.trigger import Trigger
24
from st2common.transport import reactor, publishers
25
from st2common.transport import utils as transport_utils
26
import st2common.util.queues as queue_utils
27
28
LOG = logging.getLogger(__name__)
29
30
31
class TriggerWatcher(ConsumerMixin):
    """
    Kombu consumer which watches TriggerDB create / update / delete messages
    and dispatches each of them to the matching user supplied handler.

    When started, two green threads are spawned: one runs the consumer loop
    and the other one feeds triggers which already exist in the database to
    the create handler.
    """

    sleep_interval = 0  # sleep to co-operatively yield after processing each message

    def __init__(self, create_handler, update_handler, delete_handler,
                 trigger_types=None, queue_suffix=None, exclusive=False):
        """
        :param create_handler: Function which is called on TriggerDB create event.
        :type create_handler: ``callable``

        :param update_handler: Function which is called on TriggerDB update event.
        :type update_handler: ``callable``

        :param delete_handler: Function which is called on TriggerDB delete event.
        :type delete_handler: ``callable``

        :param trigger_types: If provided, handler function will only be called
                              if the trigger in the message payload is included
                              in this list.
        :type trigger_types: ``list``

        :param queue_suffix: Suffix which is passed to
                             ``queue_utils.get_queue_name`` when building the
                             name of the watch queue.
        :type queue_suffix: ``str``

        :param exclusive: If the Q is exclusive to a specific connection which is the
                          single connection created by TriggerWatcher. When the connection
                          breaks the Q is removed by the message broker.
        :type exclusive: ``bool``
        """
        # TODO: Handle trigger type filtering using routing key
        self._create_handler = create_handler
        self._update_handler = update_handler
        self._delete_handler = delete_handler
        self._trigger_types = trigger_types
        self._trigger_watch_q = self._get_queue(queue_suffix, exclusive=exclusive)

        # All three stay None until start() is called.
        self.connection = None
        self._load_thread = None
        self._updates_thread = None

        # Routing key -> handler lookup used by process_task.
        self._handlers = {
            publishers.CREATE_RK: create_handler,
            publishers.UPDATE_RK: update_handler,
            publishers.DELETE_RK: delete_handler
        }

    def get_consumers(self, Consumer, channel):
        """
        Return a single consumer bound to the watch queue which delivers every
        message to :meth:`process_task`.
        """
        # NOTE(review): the consumer accepts pickle serialized payloads.
        # Unpickling is only safe when every publisher which can reach the
        # broker is trusted - confirm this holds for the deployment.
        return [Consumer(queues=[self._trigger_watch_q],
                         accept=['pickle'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        """
        Dispatch a single message to the handler registered for its routing key.

        Messages with an unknown routing key, or with a trigger type which is
        not included in ``trigger_types``, are skipped. The message is always
        acknowledged, even if it was skipped or the handler raised.

        :param body: Decoded message payload which is passed to the handler.

        :param message: Message object providing ``delivery_info``,
                        ``properties`` and ``ack()``.
        """
        LOG.debug('process_task')
        LOG.debug('     body: %s', body)
        LOG.debug('     message.properties: %s', message.properties)
        LOG.debug('     message.delivery_info: %s', message.delivery_info)

        routing_key = message.delivery_info.get('routing_key', '')
        handler = self._handlers.get(routing_key, None)

        try:
            if not handler:
                LOG.debug('Skipping message %s as no handler was found.', message)
                return

            trigger_type = getattr(body, 'type', None)
            if self._trigger_types and trigger_type not in self._trigger_types:
                LOG.debug('Skipping message %s since trigger_type doesn\'t match (type=%s)',
                          message, trigger_type)
                return

            try:
                handler(body)
            except Exception as e:
                # Pass the exception itself instead of ``e.message`` - that
                # attribute doesn't exist on Python 3 exceptions and accessing
                # it would raise from inside of this error handler.
                LOG.exception('Handling failed. Message body: %s. Exception: %s',
                              body, e)
        finally:
            message.ack()

        eventlet.sleep(self.sleep_interval)

    def start(self):
        """
        Open the broker connection and spawn the consumer and the DB load
        green threads.

        A failure is logged and not re-raised; a connection which has already
        been created is released.
        """
        try:
            self.connection = Connection(transport_utils.get_messaging_urls())
            self._updates_thread = eventlet.spawn(self.run)
            self._load_thread = eventlet.spawn(self._load_triggers_from_db)
        except Exception:
            LOG.exception('Failed to start watcher.')
            # Connection is still None if creating it is what failed.
            if self.connection is not None:
                self.connection.release()

    def stop(self):
        """
        Kill the green threads spawned by :meth:`start` and release the
        connection. Safe to call on a watcher which was never started.
        """
        try:
            if self._updates_thread is not None:
                eventlet.kill(self._updates_thread)
                self._updates_thread = None

            if self._load_thread is not None:
                eventlet.kill(self._load_thread)
                self._load_thread = None
        finally:
            if self.connection is not None:
                self.connection.release()

    # Note: We sleep after we consume a message so we give a chance to other
    # green threads to run. If we don't do that, ConsumerMixin will block on
    # waiting for a message on the queue.

    def on_consume_end(self, connection, channel):
        super(TriggerWatcher, self).on_consume_end(connection=connection,
                                                   channel=channel)
        eventlet.sleep(seconds=self.sleep_interval)

    def on_iteration(self):
        super(TriggerWatcher, self).on_iteration()
        eventlet.sleep(seconds=self.sleep_interval)

    def _load_triggers_from_db(self):
        """
        Call the create handler for every trigger in the database whose type
        is included in ``trigger_types``. Does nothing if no trigger types
        were provided.
        """
        # trigger_types defaults to None which is not iterable.
        for trigger_type in self._trigger_types or []:
            for trigger in Trigger.query(type=trigger_type):
                LOG.debug('Found existing trigger: %s in db.', trigger)
                self._handlers[publishers.CREATE_RK](trigger)

    @staticmethod
    def _get_queue(queue_suffix, exclusive):
        """
        Build the trigger create / update / delete queue which is bound with
        the ``#`` routing key.

        :param queue_suffix: Suffix for the queue name; a random UUID is
                             requested to be added to it.

        :param exclusive: Passed through to ``reactor.get_trigger_cud_queue``.
        :type exclusive: ``bool``
        """
        queue_name = queue_utils.get_queue_name(queue_name_base='st2.trigger.watch',
                                                queue_name_suffix=queue_suffix,
                                                add_random_uuid_to_suffix=True
                                                )
        return reactor.get_trigger_cud_queue(queue_name, routing_key='#', exclusive=exclusive)
151