# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=assignment-from-none
||
16 | |||
17 | from __future__ import absolute_import |
||
18 | import eventlet |
||
19 | from kombu.mixins import ConsumerMixin |
||
20 | from kombu import Connection |
||
21 | |||
22 | from st2common import log as logging |
||
23 | from st2common.persistence.trigger import Trigger |
||
24 | from st2common.transport import reactor, publishers |
||
25 | from st2common.transport import utils as transport_utils |
||
26 | import st2common.util.queues as queue_utils |
||
27 | |||
# Module-level logger for this file (st2common wrapper around stdlib logging).
LOG = logging.getLogger(__name__)
||
29 | |||
30 | |||
class TriggerWatcher(ConsumerMixin):
    """
    Consumes TriggerDB create/update/delete (CUD) messages from the trigger
    watch queue and dispatches each one to the matching handler callable.
    """

    # Sleep to co-operatively yield after processing each message.
    sleep_interval = 0

    def __init__(self, create_handler, update_handler, delete_handler,
                 trigger_types=None, queue_suffix=None, exclusive=False):
        """
        :param create_handler: Function which is called on TriggerDB create event.
        :type create_handler: ``callable``

        :param update_handler: Function which is called on TriggerDB update event.
        :type update_handler: ``callable``

        :param delete_handler: Function which is called on TriggerDB delete event.
        :type delete_handler: ``callable``

        :param trigger_types: If provided, handler function will only be called
                              if the trigger in the message payload is included
                              in this list.
        :type trigger_types: ``list``

        :param exclusive: If the Q is exclusive to a specific connection which is then
                          single connection created by TriggerWatcher. When the connection
                          breaks the Q is removed by the message broker.
        :type exclusive: ``bool``
        """
        # TODO: Handle trigger type filtering using routing key
        self._create_handler = create_handler
        self._update_handler = update_handler
        self._delete_handler = delete_handler
        self._trigger_types = trigger_types
        self._trigger_watch_q = self._get_queue(queue_suffix, exclusive=exclusive)

        self.connection = None
        self._load_thread = None
        self._updates_thread = None

        # Routing key -> handler map used by process_task for dispatch.
        self._handlers = {
            publishers.CREATE_RK: create_handler,
            publishers.UPDATE_RK: update_handler,
            publishers.DELETE_RK: delete_handler
        }

    def get_consumers(self, Consumer, channel):
        # ConsumerMixin hook: a single consumer on the trigger watch queue.
        return [Consumer(queues=[self._trigger_watch_q],
                         accept=['pickle'],
                         callbacks=[self.process_task])]

    def process_task(self, body, message):
        """
        Dispatch a single queue message to the handler registered for its
        routing key, then ack the message (always, even on handler failure).
        """
        LOG.debug('process_task')
        LOG.debug(' body: %s', body)
        LOG.debug(' message.properties: %s', message.properties)
        LOG.debug(' message.delivery_info: %s', message.delivery_info)

        routing_key = message.delivery_info.get('routing_key', '')
        handler = self._handlers.get(routing_key, None)

        try:
            if not handler:
                LOG.debug('Skipping message %s as no handler was found.', message)
                return

            # Optional filtering by the trigger "type" attribute of the payload.
            trigger_type = getattr(body, 'type', None)
            if self._trigger_types and trigger_type not in self._trigger_types:
                LOG.debug('Skipping message %s since trigger_type doesn\'t match (type=%s)',
                          message, trigger_type)
                return

            try:
                handler(body)
            except Exception as e:
                # NOTE: Use str(e) - Exception.message was removed in Python 3
                # and would raise AttributeError here.
                LOG.exception('Handling failed. Message body: %s. Exception: %s',
                              body, str(e))
        finally:
            # Ack even on failure so a bad message doesn't get redelivered forever.
            message.ack()

        eventlet.sleep(self.sleep_interval)

    def start(self):
        """
        Open the broker connection and spawn the consumer and DB-load
        green threads. On failure, log and release the connection (if any).
        """
        try:
            self.connection = Connection(transport_utils.get_messaging_urls())
            self._updates_thread = eventlet.spawn(self.run)
            self._load_thread = eventlet.spawn(self._load_triggers_from_db)
        except Exception:
            # NOTE: Was a bare "except:"; narrowed so SystemExit/KeyboardInterrupt
            # propagate. Guard release() - connection may still be None if
            # Connection() itself raised.
            LOG.exception('Failed to start watcher.')
            if self.connection:
                self.connection.release()

    def stop(self):
        """Kill both green threads and release the broker connection."""
        try:
            # eventlet.kill() returns None; the assignment clears the refs
            # (hence the module-level pylint assignment-from-none disable).
            self._updates_thread = eventlet.kill(self._updates_thread)
            self._load_thread = eventlet.kill(self._load_thread)
        finally:
            self.connection.release()

    # Note: We sleep after we consume a message so we give a chance to other
    # green threads to run. If we don't do that, ConsumerMixin will block on
    # waiting for a message on the queue.

    def on_consume_end(self, connection, channel):
        super(TriggerWatcher, self).on_consume_end(connection=connection,
                                                   channel=channel)
        eventlet.sleep(seconds=self.sleep_interval)

    def on_iteration(self):
        super(TriggerWatcher, self).on_iteration()
        eventlet.sleep(seconds=self.sleep_interval)

    def _load_triggers_from_db(self):
        """
        Replay existing TriggerDB objects from the database through the
        create handler so the watcher sees triggers created before it started.
        """
        # NOTE: Guard against trigger_types=None (the documented default) -
        # iterating None would raise TypeError in this green thread.
        for trigger_type in (self._trigger_types or []):
            for trigger in Trigger.query(type=trigger_type):
                LOG.debug('Found existing trigger: %s in db.', trigger)
                self._handlers[publishers.CREATE_RK](trigger)

    @staticmethod
    def _get_queue(queue_suffix, exclusive):
        # Random UUID in the suffix so each watcher instance gets its own queue.
        queue_name = queue_utils.get_queue_name(queue_name_base='st2.trigger.watch',
                                                queue_name_suffix=queue_suffix,
                                                add_random_uuid_to_suffix=True
                                                )
        return reactor.get_trigger_cud_queue(queue_name, routing_key='#', exclusive=exclusive)
||
151 |