Completed
Pull Request — master (#2643)
by Manas
06:21
created

MessageHandler._get_queue_consumer()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 2
rs 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import abc
17
import eventlet
18
import six
19
20
from kombu.mixins import ConsumerMixin
21
22
from st2common import log as logging
23
from st2common.util.greenpooldispatch import BufferedDispatcher
24
25
26
LOG = logging.getLogger(__name__)
27
28
29
class QueueConsumer(ConsumerMixin):
30
    def __init__(self, connection, queues, handler):
31
        self.connection = connection
32
        self._dispatcher = BufferedDispatcher()
33
        self._queues = queues
34
        self._handler = handler
35
36
    def shutdown(self):
37
        self._dispatcher.shutdown()
38
39
    def get_consumers(self, Consumer, channel):
40
        consumer = Consumer(queues=self._queues, accept=['pickle'], callbacks=[self.process])
41
42
        # use prefetch_count=1 for fair dispatch. This way workers that finish an item get the next
43
        # task and the work does not get queued behind any single large item.
44
        consumer.qos(prefetch_count=1)
45
46
        return [consumer]
47
48
    def process(self, body, message):
49
        try:
50
            if not isinstance(body, self._handler.message_type):
51
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))
52
53
            self._dispatcher.dispatch(self._process_message, body)
54
        except:
55
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
56
        finally:
57
            # At this point we will always ack a message.
58
            message.ack()
59
60
    def _process_message(self, body):
61
        try:
62
            self._handler.process(body)
63
        except:
64
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
65
66
67
class StagedQueueConsumer(QueueConsumer):
68
69
    def process(self, body, message):
70
        try:
71
            if not isinstance(body, self._handler.message_type):
72
                raise TypeError('Received an unexpected type "%s" for payload.' % type(body))
73
            processed_body = self._handler.pre_ack_process(body)
74
            self._dispatcher.dispatch(self._process_message, processed_body)
75
        except:
76
            LOG.exception('%s failed to process message: %s', self.__class__.__name__, body)
77
        finally:
78
            # At this point we will always ack a message.
79
            message.ack()
80
81
82
@six.add_metaclass(abc.ABCMeta)
83
class MessageHandler(object):
84
    message_type = None
85
86
    def __init__(self, connection, queues):
87
        self._queue_consumer = self._get_queue_consumer(connection, queues)
88
        self._consumer_thread = None
89
90
    def start(self, wait=False):
91
        LOG.info('Starting %s...', self.__class__.__name__)
92
        self._consumer_thread = eventlet.spawn(self._queue_consumer.run)
93
94
        if wait:
95
            self.wait()
96
97
    def wait(self):
98
        self._consumer_thread.wait()
99
100
    def shutdown(self):
101
        LOG.info('Shutting down %s...', self.__class__.__name__)
102
        self._queue_consumer.shutdown()
103
104
    @abc.abstractmethod
105
    def process(self, message):
106
        pass
107
108
    def _get_queue_consumer(self, connection, queues):
109
        return QueueConsumer(connection, queues, self)
110
111
112
@six.add_metaclass(abc.ABCMeta)
113
class StagedMessageHandler(MessageHandler):
114
115
    @abc.abstractmethod
116
    def pre_ack_process(self, message):
117
        pass
118
119
    def _get_queue_consumer(self, connection, queues):
120
        return StagedQueueConsumer(connection, queues, self)
121