Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2common/st2common/transport/consumers.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import abc
18
import eventlet
19
import six
20
21
from kombu.mixins import ConsumerMixin
22
from oslo_config import cfg
23
24
from st2common import log as logging
25
from st2common.util.greenpooldispatch import BufferedDispatcher
26
27
__all__ = [
28
    'QueueConsumer',
29
    'StagedQueueConsumer',
30
    'ActionsQueueConsumer',
31
32
    'MessageHandler',
33
    'StagedMessageHandler'
34
]
35
36
LOG = logging.getLogger(__name__)
37
38
39
class QueueConsumer(ConsumerMixin):
40
    def __init__(self, connection, queues, handler):
41
        self.connection = connection
42
        self._dispatcher = BufferedDispatcher()
43
        self._queues = queues
44
        self._handler = handler
45
46
    def shutdown(self):
47
        self._dispatcher.shutdown()
48
49
    def get_consumers(self, Consumer, channel):
50
        consumer = Consumer(queues=self._queues, accept=['pickle'], callbacks=[self.process])
51
52
        # use prefetch_count=1 for fair dispatch. This way workers that finish an item get the next
53
        # task and the work does not get queued behind any single large item.
54
        consumer.qos(prefetch_count=1)
55
56
        return [consumer]
57
58
    def process(self, body, message):
59
        try:
60
            if not isinstance(body, self._handler.message_type):
61
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))
62
63
            self._dispatcher.dispatch(self._process_message, body)
64
        except:
65
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
66
        finally:
67
            # At this point we will always ack a message.
68
            message.ack()
69
70
    def _process_message(self, body):
71
        try:
72
            self._handler.process(body)
73
        except:
74
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
75
76
77
class StagedQueueConsumer(QueueConsumer):
78
    """
79
    Used by ``StagedMessageHandler`` to effectively manage it 2 step message handling.
80
    """
81
82
    def process(self, body, message):
83
        try:
84
            if not isinstance(body, self._handler.message_type):
85
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))
86
            response = self._handler.pre_ack_process(body)
87
            self._dispatcher.dispatch(self._process_message, response)
88
        except:
89
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
90
        finally:
91
            # At this point we will always ack a message.
92
            message.ack()
93
94
95
class ActionsQueueConsumer(QueueConsumer):
96
    """
97
    Special Queue Consumer for action runner which uses multiple BufferedDispatcher pools:
98
99
    1. For regular (non-workflow) actions
100
    2. One for workflow actions
101
102
    This way we can ensure workflow actions never block non-workflow actions.
103
    """
104
105
    def __init__(self, connection, queues, handler):
0 ignored issues
show
The __init__ method of the super-class QueueConsumer is not called.

It is generally advisable to initialize the super-class by calling its __init__ method:

class SomeParent:
    def __init__(self):
        self.x = 1

class SomeChild(SomeParent):
    def __init__(self):
        # Initialize the super class
        SomeParent.__init__(self)
Loading history...
106
        self.connection = connection
107
108
        self._queues = queues
109
        self._handler = handler
110
111
        workflows_pool_size = cfg.CONF.actionrunner.workflows_pool_size
112
        actions_pool_size = cfg.CONF.actionrunner.actions_pool_size
113
        self._workflows_dispatcher = BufferedDispatcher(dispatch_pool_size=workflows_pool_size,
114
                                                        name='workflows-dispatcher')
115
        self._actions_dispatcher = BufferedDispatcher(dispatch_pool_size=actions_pool_size,
116
                                                      name='actions-dispatcher')
117
118
    def process(self, body, message):
119
        try:
120
            if not isinstance(body, self._handler.message_type):
121
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))
122
123
            action_is_workflow = getattr(body, 'action_is_workflow', False)
124
            if action_is_workflow:
125
                # Use workflow dispatcher queue
126
                dispatcher = self._workflows_dispatcher
127
            else:
128
                # Use queue for regular or workflow actions
129
                dispatcher = self._actions_dispatcher
130
131
            LOG.debug('Using BufferedDispatcher pool: "%s"', str(dispatcher))
132
            dispatcher.dispatch(self._process_message, body)
133
        except:
134
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
135
        finally:
136
            # At this point we will always ack a message.
137
            message.ack()
138
139
    def shutdown(self):
140
        self._workflows_dispatcher.shutdown()
141
        self._actions_dispatcher.shutdown()
142
143
144
class VariableMessageQueueConsumer(QueueConsumer):
145
    """
146
    Used by ``VariableMessageHandler`` to processes multiple message types.
147
    """
148
149
    def process(self, body, message):
150
        try:
151
            if not self._handler.message_types.get(type(body)):
152
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))
153
154
            self._dispatcher.dispatch(self._process_message, body)
155
        except:
156
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
157
        finally:
158
            # At this point we will always ack a message.
159
            message.ack()
160
161
162
@six.add_metaclass(abc.ABCMeta)
163
class MessageHandler(object):
164
    message_type = None
165
166
    def __init__(self, connection, queues):
167
        self._queue_consumer = self.get_queue_consumer(connection=connection,
168
                                                       queues=queues)
169
        self._consumer_thread = None
170
171
    def start(self, wait=False):
172
        LOG.info('Starting %s...', self.__class__.__name__)
173
        self._consumer_thread = eventlet.spawn(self._queue_consumer.run)
174
175
        if wait:
176
            self.wait()
177
178
    def wait(self):
179
        self._consumer_thread.wait()
180
181
    def shutdown(self):
182
        LOG.info('Shutting down %s...', self.__class__.__name__)
183
        self._queue_consumer.shutdown()
184
185
    @abc.abstractmethod
186
    def process(self, message):
187
        pass
188
189
    def get_queue_consumer(self, connection, queues):
190
        return QueueConsumer(connection=connection, queues=queues, handler=self)
191
192
193
@six.add_metaclass(abc.ABCMeta)
194
class StagedMessageHandler(MessageHandler):
195
    """
196
    MessageHandler to deal with messages in 2 steps.
197
        1. pre_ack_process : This is called on the handler before ack-ing the message.
198
        2. process: Called after ack-in the messages
199
    This 2 step approach provides a way for the handler to do some hadling like saving to DB etc
200
    before acknowleding and then performing future processing async. This way even if the handler
201
    or owning process is taken down system will still maintain track of the message.
202
    """
203
204
    @abc.abstractmethod
205
    def pre_ack_process(self, message):
206
        """
207
        Called before acknowleding a message. Good place to track the message via a DB entry or some
208
        other applicable mechnism.
209
210
        The reponse of this method is passed into the ``process`` method. This was whatever is the
211
        processed version of the message can be moved forward. It is always possible to simply
212
        return ``message`` and have ``process`` handle the original message.
213
        """
214
        pass
215
216
    def get_queue_consumer(self, connection, queues):
217
        return StagedQueueConsumer(connection=connection, queues=queues, handler=self)
218
219
220
@six.add_metaclass(abc.ABCMeta)
221
class VariableMessageHandler(MessageHandler):
222
    """
223
    VariableMessageHandler processes multiple message types.
224
    """
225
226
    def get_queue_consumer(self, connection, queues):
227
        return VariableMessageQueueConsumer(connection=connection, queues=queues, handler=self)
228