Passed
Push — master ( 82f613...a5d438 )
by Plexxi
03:34
created

ActionsQueueConsumer   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 47
Duplicated Lines 0 %

Importance

Changes 5
Bugs 0 Features 0
Metric Value
c 5
b 0
f 0
dl 0
loc 47
rs 10
wmc 6

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 12 1
A shutdown() 0 3 1
A process() 0 20 4
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import abc
17
import eventlet
18
import six
19
20
from kombu.mixins import ConsumerMixin
21
from oslo_config import cfg
22
23
from st2common import log as logging
24
from st2common.util.greenpooldispatch import BufferedDispatcher
25
26
# Public API exported by this module.
__all__ = [
    'QueueConsumer',
    'StagedQueueConsumer',
    'ActionsQueueConsumer',

    'MessageHandler',
    'StagedMessageHandler'
]

# Module-level logger (st2common wrapper around stdlib logging).
LOG = logging.getLogger(__name__)
36
37
38
class QueueConsumer(ConsumerMixin):
    """
    Base kombu queue consumer which validates each received message's type and
    dispatches it to the provided handler via a BufferedDispatcher green-thread pool.
    """

    def __init__(self, connection, queues, handler):
        self.connection = connection
        self._dispatcher = BufferedDispatcher()
        self._queues = queues
        self._handler = handler

    def shutdown(self):
        self._dispatcher.shutdown()

    def get_consumers(self, Consumer, channel):
        consumer = Consumer(queues=self._queues, accept=['pickle'], callbacks=[self.process])

        # use prefetch_count=1 for fair dispatch. This way workers that finish an item get the next
        # task and the work does not get queued behind any single large item.
        consumer.qos(prefetch_count=1)

        return [consumer]

    def process(self, body, message):
        """
        Validate the payload type and hand it off to the dispatcher pool.
        The message is always acked, even on failure, so it is not redelivered.
        """
        try:
            if not isinstance(body, self._handler.message_type):
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))

            self._dispatcher.dispatch(self._process_message, body)
        except Exception:
            # NOTE: Catch Exception rather than using a bare "except:" so that
            # SystemExit / KeyboardInterrupt / eventlet control-flow exceptions propagate.
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
        finally:
            # At this point we will always ack a message.
            message.ack()

    def _process_message(self, body):
        # Runs inside a dispatcher green thread; log (don't raise) on handler failure.
        try:
            self._handler.process(body)
        except Exception:
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
74
75
76
class StagedQueueConsumer(QueueConsumer):
    """
    Used by ``StagedMessageHandler`` to effectively manage it 2 step message handling.
    """

    def process(self, body, message):
        """
        Two-step processing: run the handler's synchronous ``pre_ack_process`` first,
        then dispatch its response asynchronously. The message is always acked.
        """
        try:
            if not isinstance(body, self._handler.message_type):
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))
            # Step 1: synchronous pre-ack stage (e.g. persist the message) before acking.
            response = self._handler.pre_ack_process(body)
            # Step 2: asynchronous processing of whatever pre_ack_process returned.
            self._dispatcher.dispatch(self._process_message, response)
        except Exception:
            # NOTE: Catch Exception rather than using a bare "except:" so that
            # SystemExit / KeyboardInterrupt / eventlet control-flow exceptions propagate.
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
        finally:
            # At this point we will always ack a message.
            message.ack()
92
93
94
class ActionsQueueConsumer(QueueConsumer):
    """
    Special Queue Consumer for action runner which uses multiple BufferedDispatcher pools:

    1. For regular (non-workflow) actions
    2. One for workflow actions

    This way we can ensure workflow actions never block non-workflow actions.
    """

    def __init__(self, connection, queues, handler):
        # NOTE: QueueConsumer.__init__ is intentionally not called here. It would
        # allocate a default BufferedDispatcher (self._dispatcher) which this class
        # never uses - two dedicated dispatcher pools are created below instead.
        self.connection = connection

        self._queues = queues
        self._handler = handler

        workflows_pool_size = cfg.CONF.actionrunner.workflows_pool_size
        actions_pool_size = cfg.CONF.actionrunner.actions_pool_size
        self._workflows_dispatcher = BufferedDispatcher(dispatch_pool_size=workflows_pool_size,
                                                        name='workflows-dispatcher')
        self._actions_dispatcher = BufferedDispatcher(dispatch_pool_size=actions_pool_size,
                                                      name='actions-dispatcher')

    def process(self, body, message):
        """
        Route the message to the workflow or the regular-action dispatcher pool based
        on the payload's ``action_is_workflow`` attribute. The message is always acked.
        """
        try:
            if not isinstance(body, self._handler.message_type):
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))

            action_is_workflow = getattr(body, 'action_is_workflow', False)
            if action_is_workflow:
                # Use workflow dispatcher queue
                dispatcher = self._workflows_dispatcher
            else:
                # Use queue for regular (non-workflow) actions
                dispatcher = self._actions_dispatcher

            LOG.debug('Using BufferedDispatcher pool: "%s"', str(dispatcher))
            dispatcher.dispatch(self._process_message, body)
        except Exception:
            # NOTE: Catch Exception rather than using a bare "except:" so that
            # SystemExit / KeyboardInterrupt / eventlet control-flow exceptions propagate.
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
        finally:
            # At this point we will always ack a message.
            message.ack()

    def shutdown(self):
        # Shut down both pools (overrides the base class which only has one).
        self._workflows_dispatcher.shutdown()
        self._actions_dispatcher.shutdown()
141
142
143
@six.add_metaclass(abc.ABCMeta)
class MessageHandler(object):
    """
    Abstract base which runs a queue consumer in a background green thread and
    delegates each received message to :meth:`process`.
    """

    # Expected payload type; concrete subclasses are expected to set this.
    message_type = None

    def __init__(self, connection, queues):
        self._queue_consumer = self.get_queue_consumer(connection=connection, queues=queues)
        self._consumer_thread = None

    def start(self, wait=False):
        # Spawn the consumer loop in a green thread; optionally block until it exits.
        LOG.info('Starting %s...', self.__class__.__name__)
        self._consumer_thread = eventlet.spawn(self._queue_consumer.run)

        if not wait:
            return
        self.wait()

    def wait(self):
        # Block the caller until the consumer green thread finishes.
        self._consumer_thread.wait()

    def shutdown(self):
        LOG.info('Shutting down %s...', self.__class__.__name__)
        self._queue_consumer.shutdown()

    @abc.abstractmethod
    def process(self, message):
        # Subclasses implement the actual message handling.
        pass

    def get_queue_consumer(self, connection, queues):
        # Factory hook - subclasses may return a specialized consumer.
        return QueueConsumer(connection=connection, queues=queues, handler=self)
172
173
174
@six.add_metaclass(abc.ABCMeta)
class StagedMessageHandler(MessageHandler):
    """
    MessageHandler which deals with messages in two steps:

        1. ``pre_ack_process``: called on the handler before the message is acked.
        2. ``process``: called after the message has been acked.

    This two-step approach gives the handler a chance to do some work (e.g. save the
    message to the DB) before acknowledging, and then perform the remaining processing
    asynchronously. That way, even if the handler or its owning process is taken down,
    the system still keeps track of the message.
    """

    @abc.abstractmethod
    def pre_ack_process(self, message):
        """
        Called before a message is acknowledged. Good place to track the message via a
        DB entry or some other applicable mechanism.

        The return value of this method is passed into the ``process`` method, so a
        processed version of the message can be carried forward. It is always possible
        to simply return ``message`` and have ``process`` handle the original message.
        """
        pass

    def get_queue_consumer(self, connection, queues):
        # Use the staged consumer so pre_ack_process runs before the ack.
        return StagedQueueConsumer(connection=connection, queues=queues, handler=self)
199