Completed
Pull Request — master (#2643)
by Manas
15:19 queued 09:14
created

MessageHandler._get_queue_consumer()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 2
rs 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import abc
17
import eventlet
18
import six
19
20
from kombu.mixins import ConsumerMixin
21
22
from st2common import log as logging
23
from st2common.util.greenpooldispatch import BufferedDispatcher
24
25
26
# Module-level logger, named after this module, shared by the classes below.
LOG = logging.getLogger(__name__)
27
28
29
class QueueConsumer(ConsumerMixin):
    """
    Consumer which type-checks each received body and hands it to a handler.

    Bodies are checked against ``handler.message_type`` and then passed to
    ``handler.process`` through a ``BufferedDispatcher``. A message is acked as soon as it has
    been handed to the dispatcher (or has failed the type check); the outcome of the handler
    itself does not influence the ack.
    """

    def __init__(self, connection, queues, handler):
        """
        :param connection: Connection object, stored on ``self.connection``
                           (presumably read by ``ConsumerMixin`` - confirm).
        :param queues: Queues passed to the ``Consumer`` factory in ``get_consumers``.
        :param handler: Object exposing ``message_type`` and ``process(body)``.
        """
        self.connection = connection
        self._dispatcher = BufferedDispatcher()
        self._queues = queues
        self._handler = handler

    def shutdown(self):
        """
        Shut down the dispatcher used to run the handler.
        """
        self._dispatcher.shutdown()

    def get_consumers(self, Consumer, channel):
        """
        Build the single consumer bound to the configured queues.

        :param Consumer: Consumer factory; called with ``queues``, ``accept`` and ``callbacks``.
        :param channel: Unused by this implementation.

        :return: List containing the one configured consumer.
        """
        # NOTE(review): accepting 'pickle' means message bodies are unpickled on receipt.
        # Unpickling data from an untrusted source can execute arbitrary code, so this is only
        # safe if every publisher on the bus is trusted - confirm.
        consumer = Consumer(queues=self._queues, accept=['pickle'], callbacks=[self.process])

        # use prefetch_count=1 for fair dispatch. This way workers that finish an item get the next
        # task and the work does not get queued behind any single large item.
        consumer.qos(prefetch_count=1)

        return [consumer]

    def process(self, body, message):
        """
        Consumer callback: validate the body, dispatch it to the handler and ack the message.

        :param body: Decoded message payload; must be an instance of ``handler.message_type``.
        :param message: Message object; ``ack()`` is called on it after the dispatch attempt.
        """
        try:
            if not isinstance(body, self._handler.message_type):
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))

            self._dispatcher.dispatch(self._process_message, body)
        except Exception:
            # Only regular errors are logged and swallowed. SystemExit, KeyboardInterrupt and
            # other BaseException subclasses propagate so that shutdown requests are honoured
            # instead of being logged as message failures.
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)

        message.ack()

    def _process_message(self, body):
        """
        Run the handler on a single body, logging (not raising) regular failures.

        :param body: Payload previously validated by ``process``.
        """
        try:
            self._handler.process(body)
        except Exception:
            # A failing handler must not take down the dispatcher; non-Exception
            # BaseExceptions are deliberately left to propagate.
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
64
65
66
class StagedQueueConsumer(QueueConsumer):
    """
    Consumer which runs the handler's ``pre_ack_process`` step before dispatching.

    The value returned by ``pre_ack_process`` (not the original body) is what gets handed to the
    dispatcher for the handler's ``process`` step.
    """

    def process(self, body, message):
        """
        Consumer callback: validate the body, pre-process it, dispatch the result and ack.

        :param body: Decoded message payload; must be an instance of ``handler.message_type``.
        :param message: Message object; ``ack()`` is called on it after the dispatch attempt.
        """
        try:
            if not isinstance(body, self._handler.message_type):
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))
            # Runs synchronously, before the message is acked.
            processed_body = self._handler.pre_ack_process(body)
            self._dispatcher.dispatch(self._process_message, processed_body)
        except Exception:
            # Only regular errors are logged and swallowed. SystemExit, KeyboardInterrupt and
            # other BaseException subclasses propagate so that shutdown requests are honoured
            # instead of being logged as message failures.
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
        # At this point we will always ack a specific message.
        message.ack()
78
79
80
@six.add_metaclass(abc.ABCMeta)
class MessageHandler(object):
    """
    Abstract base for handlers fed by a queue consumer.

    Subclasses implement ``process``. The handler owns the consumer created by
    ``_get_queue_consumer`` and runs it via ``eventlet.spawn``.
    """

    # Type handed to isinstance() by the consumer when validating payloads. Defaults to None,
    # so concrete handlers are presumably expected to override it - confirm.
    message_type = None

    def __init__(self, connection, queues):
        """
        :param connection: Connection forwarded to the queue consumer.
        :param queues: Queues forwarded to the queue consumer.
        """
        self._queue_consumer = self._get_queue_consumer(connection, queues)
        # Populated by start(); stays None until then.
        self._consumer_thread = None

    def start(self, wait=False):
        """
        Spawn the consumer's ``run`` loop.

        :param wait: When truthy, block in ``wait()`` after spawning.
        """
        handler_name = self.__class__.__name__
        LOG.info('Starting %s...', handler_name)

        consumer_loop = self._queue_consumer.run
        self._consumer_thread = eventlet.spawn(consumer_loop)

        if not wait:
            return

        self.wait()

    def wait(self):
        """
        Block on the thread created by ``start``.
        """
        consumer_thread = self._consumer_thread
        consumer_thread.wait()

    def shutdown(self):
        """
        Log the shutdown and shut down the queue consumer.
        """
        handler_name = self.__class__.__name__
        LOG.info('Shutting down %s...', handler_name)
        self._queue_consumer.shutdown()

    @abc.abstractmethod
    def process(self, message):
        """
        Handle a single message. Must be implemented by subclasses.

        :param message: Payload delivered by the queue consumer.
        """

    def _get_queue_consumer(self, connection, queues):
        """
        Create the consumer which feeds this handler.

        :return: ``QueueConsumer`` bound to this handler.
        """
        consumer = QueueConsumer(connection, queues, self)
        return consumer
108
109
110
@six.add_metaclass(abc.ABCMeta)
class StagedMessageHandler(MessageHandler):
    """
    Abstract handler with an additional ``pre_ack_process`` step.

    Uses a ``StagedQueueConsumer`` instead of the plain ``QueueConsumer``.
    """

    @abc.abstractmethod
    def pre_ack_process(self, message):
        """
        Pre-process a message. Must be implemented by subclasses.

        :param message: Payload delivered by the queue consumer.
        """

    def _get_queue_consumer(self, connection, queues):
        """
        Create the staged consumer which feeds this handler.

        :return: ``StagedQueueConsumer`` bound to this handler.
        """
        staged_consumer = StagedQueueConsumer(connection, queues, self)
        return staged_consumer
119