1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | from __future__ import absolute_import |
||
17 | import abc |
||
18 | import eventlet |
||
19 | import six |
||
20 | |||
21 | from kombu.mixins import ConsumerMixin |
||
22 | from oslo_config import cfg |
||
23 | |||
24 | from st2common import log as logging |
||
25 | from st2common.util.greenpooldispatch import BufferedDispatcher |
||
26 | |||
27 | __all__ = [ |
||
28 | 'QueueConsumer', |
||
29 | 'StagedQueueConsumer', |
||
30 | 'ActionsQueueConsumer', |
||
31 | |||
32 | 'MessageHandler', |
||
33 | 'StagedMessageHandler' |
||
34 | ] |
||
35 | |||
36 | LOG = logging.getLogger(__name__) |
||
37 | |||
38 | |||
class QueueConsumer(ConsumerMixin):
    """
    Kombu consumer which reads pickled payloads off the provided queues and hands
    each one to ``handler.process`` via a ``BufferedDispatcher`` green-thread pool.

    :param connection: Kombu broker connection.
    :param queues: List of queues to consume from.
    :param handler: ``MessageHandler`` instance; must expose ``message_type`` and
                    ``process``.
    """

    def __init__(self, connection, queues, handler):
        self.connection = connection
        self._dispatcher = BufferedDispatcher()
        self._queues = queues
        self._handler = handler

    def shutdown(self):
        # Stop the dispatcher pool.
        self._dispatcher.shutdown()

    def get_consumers(self, Consumer, channel):
        consumer = Consumer(queues=self._queues, accept=['pickle'], callbacks=[self.process])

        # use prefetch_count=1 for fair dispatch. This way workers that finish an item get the next
        # task and the work does not get queued behind any single large item.
        consumer.qos(prefetch_count=1)

        return [consumer]

    def process(self, body, message):
        """
        Kombu callback: validate the payload type and queue it for async processing.
        The message is always ack-ed, even on failure.
        """
        try:
            if not isinstance(body, self._handler.message_type):
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))

            self._dispatcher.dispatch(self._process_message, body)
        # NOTE: A bare "except:" here would also swallow SystemExit / KeyboardInterrupt /
        # eventlet GreenletExit and could mask shutdown, so we catch Exception explicitly.
        except Exception:
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
        finally:
            # At this point we will always ack a message.
            message.ack()

    def _process_message(self, body):
        # Runs inside a dispatcher green thread; errors are logged, never propagated.
        try:
            self._handler.process(body)
        except Exception:
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
||
75 | |||
76 | |||
class StagedQueueConsumer(QueueConsumer):
    """
    Used by ``StagedMessageHandler`` to effectively manage its 2-step message handling.
    """

    def process(self, body, message):
        """
        Validate the payload, run the synchronous pre-ack step, then queue the
        pre-ack result (not the original body) for async processing.
        """
        try:
            if not isinstance(body, self._handler.message_type):
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))
            # Stage 1 runs before the ack so the handler can durably track the
            # message (e.g. persist it) first; stage 2 processes its result async.
            response = self._handler.pre_ack_process(body)
            self._dispatcher.dispatch(self._process_message, response)
        # Catch Exception rather than using a bare "except:" so SystemExit /
        # KeyboardInterrupt are not swallowed.
        except Exception:
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
        finally:
            # At this point we will always ack a message.
            message.ack()
||
93 | |||
94 | |||
class ActionsQueueConsumer(QueueConsumer):
    """
    Special Queue Consumer for action runner which uses multiple BufferedDispatcher pools:

    1. For regular (non-workflow) actions
    2. One for workflow actions

    This way we can ensure workflow actions never block non-workflow actions.
    """

    def __init__(self, connection, queues, handler):
        # NOTE: We deliberately do NOT call super().__init__() - the base
        # constructor would create a default single BufferedDispatcher pool which
        # this class never uses; we maintain two dedicated pools instead.
        self.connection = connection

        self._queues = queues
        self._handler = handler

        # Pool sizes come from the actionrunner config section.
        workflows_pool_size = cfg.CONF.actionrunner.workflows_pool_size
        actions_pool_size = cfg.CONF.actionrunner.actions_pool_size
        self._workflows_dispatcher = BufferedDispatcher(dispatch_pool_size=workflows_pool_size,
                                                        name='workflows-dispatcher')
        self._actions_dispatcher = BufferedDispatcher(dispatch_pool_size=actions_pool_size,
                                                      name='actions-dispatcher')

    def process(self, body, message):
        """
        Validate the payload and route it to the workflow or the regular-action
        dispatcher pool based on the payload's ``action_is_workflow`` attribute.
        """
        try:
            if not isinstance(body, self._handler.message_type):
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))

            # Payloads without the attribute are treated as regular actions.
            action_is_workflow = getattr(body, 'action_is_workflow', False)
            if action_is_workflow:
                # Use the dedicated workflow dispatcher pool
                dispatcher = self._workflows_dispatcher
            else:
                # Use the dispatcher pool for regular (non-workflow) actions
                dispatcher = self._actions_dispatcher

            LOG.debug('Using BufferedDispatcher pool: "%s"', str(dispatcher))
            dispatcher.dispatch(self._process_message, body)
        # Catch Exception rather than using a bare "except:" so SystemExit /
        # KeyboardInterrupt are not swallowed.
        except Exception:
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
        finally:
            # At this point we will always ack a message.
            message.ack()

    def shutdown(self):
        # Shut down both dedicated pools (the base class only knows about one).
        self._workflows_dispatcher.shutdown()
        self._actions_dispatcher.shutdown()
||
142 | |||
143 | |||
class VariableMessageQueueConsumer(QueueConsumer):
    """
    Used by ``VariableMessageHandler`` to process multiple message types.
    """

    def process(self, body, message):
        """
        Validate the payload against the handler's ``message_types`` map and
        queue it for async processing. The message is always ack-ed.
        """
        try:
            # message_types maps payload class -> handling info; an unknown
            # payload class is rejected.
            if not self._handler.message_types.get(type(body)):
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))

            self._dispatcher.dispatch(self._process_message, body)
        # Catch Exception rather than using a bare "except:" so SystemExit /
        # KeyboardInterrupt are not swallowed.
        except Exception:
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
        finally:
            # At this point we will always ack a message.
            message.ack()
||
160 | |||
161 | |||
@six.add_metaclass(abc.ABCMeta)
class MessageHandler(object):
    """
    Abstract base class for message handlers. Owns a ``QueueConsumer`` which it
    runs in an eventlet green thread and which calls back into ``process`` for
    every accepted payload.
    """

    # Payload class this handler accepts; concrete subclasses override this.
    message_type = None

    def __init__(self, connection, queues):
        consumer = self.get_queue_consumer(connection=connection, queues=queues)
        self._queue_consumer = consumer
        self._consumer_thread = None

    def start(self, wait=False):
        """Spawn the consumer green thread; optionally block until it exits."""
        LOG.info('Starting %s...', self.__class__.__name__)
        self._consumer_thread = eventlet.spawn(self._queue_consumer.run)
        if not wait:
            return
        self.wait()

    def wait(self):
        # Block the caller until the consumer green thread finishes.
        self._consumer_thread.wait()

    def shutdown(self):
        LOG.info('Shutting down %s...', self.__class__.__name__)
        self._queue_consumer.shutdown()

    @abc.abstractmethod
    def process(self, message):
        """Handle a single validated message payload."""
        pass

    def get_queue_consumer(self, connection, queues):
        # Subclasses override this to supply a specialized consumer.
        return QueueConsumer(handler=self, connection=connection, queues=queues)
||
191 | |||
192 | |||
@six.add_metaclass(abc.ABCMeta)
class StagedMessageHandler(MessageHandler):
    """
    MessageHandler to deal with messages in 2 steps.
    1. pre_ack_process : This is called on the handler before ack-ing the message.
    2. process: Called after ack-ing the message.
    This 2 step approach provides a way for the handler to do some handling like saving to DB
    etc. before acknowledging and then performing further processing async. This way even if the
    handler or owning process is taken down the system will still maintain track of the message.
    """

    @abc.abstractmethod
    def pre_ack_process(self, message):
        """
        Called before acknowledging a message. Good place to track the message via a DB entry or
        some other applicable mechanism.

        The response of this method is passed into the ``process`` method. This way whatever is
        the processed version of the message can be moved forward. It is always possible to
        simply return ``message`` and have ``process`` handle the original message.
        """
        pass

    def get_queue_consumer(self, connection, queues):
        # Pair this handler with the 2-step consumer which invokes
        # pre_ack_process before ack-ing.
        return StagedQueueConsumer(connection=connection, queues=queues, handler=self)
||
218 | |||
219 | |||
@six.add_metaclass(abc.ABCMeta)
class VariableMessageHandler(MessageHandler):
    """
    Message handler capable of processing multiple message types.
    """

    def get_queue_consumer(self, connection, queues):
        # Use the consumer variant which validates payloads against the
        # handler's ``message_types`` map instead of a single ``message_type``.
        return VariableMessageQueueConsumer(handler=self, connection=connection,
                                            queues=queues)
||
228 |
NOTE(review): it is generally advisable to initialize the super-class by calling its
`__init__` method; `ActionsQueueConsumer.__init__` skips this deliberately to avoid
creating the base class's unused default dispatcher pool.