Passed
Pull Request — master (#3382)
by Lakshmi
created 05:24

TriggerWatcher.process_task()   C

Complexity
    Conditions: 7

Size
    Total Lines: 35

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0

Metric   Value
cc       7
dl       0
loc      35
rs       5.5
c        0
b        0
f        0
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=assignment-from-none

import eventlet
from kombu.mixins import ConsumerMixin
from kombu import Connection

from st2common import log as logging
from st2common.persistence.trigger import Trigger
from st2common.transport import reactor, publishers
from st2common.transport import utils as transport_utils
import st2common.util.queues as queue_utils

LOG = logging.getLogger(__name__)


class TriggerWatcher(ConsumerMixin):

    sleep_interval = 0  # sleep to co-operatively yield after processing each message

    def __init__(self, create_handler, update_handler, delete_handler,
                 trigger_types=None, queue_suffix=None, exclusive=False,
                 notify_disabled=True):
        """
        :param create_handler: Function which is called on TriggerDB create event.
        :type create_handler: ``callable``

        :param update_handler: Function which is called on TriggerDB update event.
        :type update_handler: ``callable``

        :param delete_handler: Function which is called on TriggerDB delete event.
        :type delete_handler: ``callable``

        :param trigger_types: If provided, the handler functions are only called
                              if the trigger in the message payload is included
                              in this list.
        :type trigger_types: ``list``

        :param queue_suffix: Suffix which is appended to the trigger watch queue name.
        :type queue_suffix: ``str``

        :param exclusive: If True, the queue is exclusive to the single connection
                          created by TriggerWatcher and is removed by the message
                          broker when that connection breaks.
        :type exclusive: ``bool``

        :param notify_disabled: If True, handler functions are also called for
                                triggers which are disabled.
        :type notify_disabled: ``bool``
        """
        # TODO: Handle trigger type filtering using routing key
        self._create_handler = create_handler
        self._update_handler = update_handler
        self._delete_handler = delete_handler
        self._trigger_types = trigger_types
        self._trigger_watch_q = self._get_queue(queue_suffix, exclusive=exclusive)
        self._notify_disabled = notify_disabled

        self.connection = None
        self._load_thread = None
        self._updates_thread = None

        self._handlers = {
            publishers.CREATE_RK: create_handler,
            publishers.UPDATE_RK: update_handler,
            publishers.DELETE_RK: delete_handler
        }

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self._trigger_watch_q],
                         accept=['pickle'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        LOG.debug('process_task')
        LOG.debug('     body: %s', body)
        LOG.debug('     message.properties: %s', message.properties)
        LOG.debug('     message.delivery_info: %s', message.delivery_info)

        routing_key = message.delivery_info.get('routing_key', '')
        handler = self._handlers.get(routing_key, None)

        try:
            if not handler:
                LOG.debug('Skipping message %s as no handler was found.', message)
                return

            trigger_type = getattr(body, 'type', None)
            if self._trigger_types and trigger_type not in self._trigger_types:
                LOG.debug('Skipping message %s since trigger_type doesn\'t match (type=%s)',
                          message, trigger_type)
                return

            enabled = getattr(body, 'enabled', True)
            if not enabled and not self._notify_disabled:
                LOG.debug('Skipping message as trigger is disabled and notifications'
                          ' for disabled triggers are turned off.')
                return

            try:
                handler(body)
            except Exception as e:
                LOG.exception('Handling failed. Message body: %s. Exception: %s',
                              body, str(e))
        finally:
            message.ack()

        eventlet.sleep(self.sleep_interval)

    def start(self):
        try:
            self.connection = Connection(transport_utils.get_messaging_urls())
            self._updates_thread = eventlet.spawn(self.run)
            self._load_thread = eventlet.spawn(self._load_triggers_from_db)
        except Exception:
            LOG.exception('Failed to start watcher.')
            if self.connection:
                self.connection.release()

    def stop(self):
        try:
            self._updates_thread = eventlet.kill(self._updates_thread)
            self._load_thread = eventlet.kill(self._load_thread)
        finally:
            self.connection.release()

    # Note: We sleep after we consume a message so we give a chance to other
    # green threads to run. If we don't do that, ConsumerMixin will block on
    # waiting for a message on the queue.

    def on_consume_end(self, connection, channel):
        super(TriggerWatcher, self).on_consume_end(connection=connection,
                                                   channel=channel)
        eventlet.sleep(seconds=self.sleep_interval)

    def on_iteration(self):
        super(TriggerWatcher, self).on_iteration()
        eventlet.sleep(seconds=self.sleep_interval)

    def _load_triggers_from_db(self):
        for trigger_type in self._trigger_types:
            for trigger in Trigger.query(type=trigger_type):
                # Use lazy formatting so arguments are only interpolated when
                # the debug level is actually enabled.
                LOG.debug('Found existing trigger: %s in db.', trigger)
                self._handlers[publishers.CREATE_RK](trigger)

    @staticmethod
    def _get_queue(queue_suffix, exclusive):
        queue_name = queue_utils.get_queue_name(queue_name_base='st2.trigger.watch',
                                                queue_name_suffix=queue_suffix,
                                                add_random_uuid_to_suffix=True)
        return reactor.get_trigger_cud_queue(queue_name, routing_key='#', exclusive=exclusive)
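For context, here is a minimal usage sketch of the class shown above. It is not part of the analysed file; the import path, handler names, trigger type reference and queue suffix are illustrative assumptions, and running it requires a configured st2common installation and a reachable message broker.

# Minimal usage sketch (illustrative only; import path and names are assumptions).
import eventlet

from st2common.services.triggerwatcher import TriggerWatcher  # assumed module path


def on_trigger_create(trigger_db):
    # Receives the TriggerDB object carried in the message body.
    print('created: %s' % trigger_db)


def on_trigger_update(trigger_db):
    print('updated: %s' % trigger_db)


def on_trigger_delete(trigger_db):
    print('deleted: %s' % trigger_db)


watcher = TriggerWatcher(create_handler=on_trigger_create,
                         update_handler=on_trigger_update,
                         delete_handler=on_trigger_delete,
                         trigger_types=['core.st2.webhook'],  # hypothetical type reference
                         queue_suffix='example',
                         exclusive=True)
watcher.start()         # spawns the consumer and the DB pre-load green threads
try:
    eventlet.sleep(60)  # let the watcher process trigger CUD messages for a while
finally:
    watcher.stop()      # kills the green threads and releases the connection

Because run() comes from kombu's ConsumerMixin, start() spawns it in an eventlet green thread, and the sleep_interval hooks in on_iteration() and on_consume_end() are what give other green threads a chance to run between messages.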