Test Failed
Push — master ( f06717...a2792c )
by Tomaz
01:48 queued 10s
created

st2common/st2common/transport/bootstrap_utils.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import socket
19
20
import six
21
import retrying
22
from oslo_config import cfg
23
from kombu.serialization import register
24
from kombu.serialization import pickle
25
from kombu.serialization import pickle_protocol
26
from kombu.serialization import pickle_loads
27
28
from st2common import log as logging
29
from st2common.transport import utils as transport_utils
30
from st2common.transport.actionexecutionstate import ACTIONEXECUTIONSTATE_XCHG
31
from st2common.transport.announcement import ANNOUNCEMENT_XCHG
32
from st2common.transport.connection_retry_wrapper import ConnectionRetryWrapper
33
from st2common.transport.execution import EXECUTION_XCHG
34
from st2common.transport.liveaction import LIVEACTION_XCHG, LIVEACTION_STATUS_MGMT_XCHG
35
from st2common.transport.reactor import SENSOR_CUD_XCHG
36
from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG
37
from st2common.transport import reactor
38
from st2common.transport.workflow import WORKFLOW_EXECUTION_XCHG
39
from st2common.transport.workflow import WORKFLOW_EXECUTION_STATUS_MGMT_XCHG
40
from st2common.transport.queues import ACTIONSCHEDULER_REQUEST_QUEUE
41
from st2common.transport.queues import ACTIONRUNNER_WORK_QUEUE
42
from st2common.transport.queues import ACTIONRUNNER_CANCEL_QUEUE
43
from st2common.transport.queues import NOTIFIER_ACTIONUPDATE_WORK_QUEUE
44
from st2common.transport.queues import RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE
45
from st2common.transport.queues import RULESENGINE_WORK_QUEUE
46
from st2common.transport.queues import STREAM_ANNOUNCEMENT_WORK_QUEUE
47
from st2common.transport.queues import STREAM_EXECUTION_ALL_WORK_QUEUE
48
from st2common.transport.queues import STREAM_LIVEACTION_WORK_QUEUE
49
from st2common.transport.queues import STREAM_EXECUTION_OUTPUT_QUEUE
50
from st2common.transport.queues import WORKFLOW_EXECUTION_WORK_QUEUE
51
from st2common.transport.queues import WORKFLOW_EXECUTION_RESUME_QUEUE
52
53
LOG = logging.getLogger('st2common.transport.bootstrap')
54
55
__all__ = [
56
    'register_exchanges',
57
    'register_exchanges_with_retry',
58
    'register_kombu_serializers',
59
60
    'EXCHANGES',
61
    'QUEUES'
62
]
63
64
# List of exchanges which are pre-declared on service set up.
65
EXCHANGES = [
66
    ACTIONEXECUTIONSTATE_XCHG,
67
    ANNOUNCEMENT_XCHG,
68
    EXECUTION_XCHG,
69
    LIVEACTION_XCHG,
70
    LIVEACTION_STATUS_MGMT_XCHG,
71
    TRIGGER_CUD_XCHG,
72
    TRIGGER_INSTANCE_XCHG,
73
    SENSOR_CUD_XCHG,
74
    WORKFLOW_EXECUTION_XCHG,
75
    WORKFLOW_EXECUTION_STATUS_MGMT_XCHG
76
]
77
78
# List of queues which are pre-declared on service startup.
79
# All the queues need to be declared and bound up front so we can guarantee messages get routed
80
# and don't get lost even if there are no consumers online
81
QUEUES = [
82
    ACTIONSCHEDULER_REQUEST_QUEUE,
83
    ACTIONRUNNER_WORK_QUEUE,
84
    ACTIONRUNNER_CANCEL_QUEUE,
85
    NOTIFIER_ACTIONUPDATE_WORK_QUEUE,
86
    RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE,
87
    RULESENGINE_WORK_QUEUE,
88
89
    STREAM_ANNOUNCEMENT_WORK_QUEUE,
90
    STREAM_EXECUTION_ALL_WORK_QUEUE,
91
    STREAM_LIVEACTION_WORK_QUEUE,
92
    STREAM_EXECUTION_OUTPUT_QUEUE,
93
94
    WORKFLOW_EXECUTION_WORK_QUEUE,
95
    WORKFLOW_EXECUTION_RESUME_QUEUE,
96
97
    # Those queues are dynamically / late created on some class init but we still need to
98
    # pre-declare them for redis Kombu backend to work.
99
    reactor.get_trigger_cud_queue(name='st2.preinit', routing_key='init'),
100
    reactor.get_sensor_cud_queue(name='st2.preinit', routing_key='init')
101
]
102
103
104
def _do_register_exchange(exchange, connection, channel, retry_wrapper):
105
    try:
106
        kwargs = {
107
            'exchange': exchange.name,
108
            'type': exchange.type,
109
            'durable': exchange.durable,
110
            'auto_delete': exchange.auto_delete,
111
            'arguments': exchange.arguments,
112
            'nowait': False,
113
            'passive': False
114
        }
115
        # Use the retry wrapper to increase resiliency in recoverable errors.
116
        retry_wrapper.ensured(connection=connection,
117
                              obj=channel,
118
                              to_ensure_func=channel.exchange_declare,
119
                              **kwargs)
120
        LOG.debug('Registered exchange %s (%s).' % (exchange.name, str(kwargs)))
121
    except Exception:
122
        LOG.exception('Failed to register exchange: %s.', exchange.name)
123
124
125
def _do_predeclare_queue(channel, queue):
126
    LOG.debug('Predeclaring queue for exchange "%s"' % (queue.exchange.name))
127
128
    bound_queue = None
129
130
    try:
131
        bound_queue = queue(channel)
132
        bound_queue.declare(nowait=False)
133
        LOG.debug('Predeclared queue for exchange "%s"' % (queue.exchange.name))
134
    except Exception:
135
        LOG.exception('Failed to predeclare queue for exchange "%s"' % (queue.exchange.name))
136
137
    return bound_queue
138
139
140
def register_exchanges():
141
    LOG.debug('Registering exchanges...')
142
    connection_urls = transport_utils.get_messaging_urls()
143
144
    with transport_utils.get_connection() as conn:
145
        # Use ConnectionRetryWrapper to deal with rmq clustering etc.
146
        retry_wrapper = ConnectionRetryWrapper(cluster_size=len(connection_urls), logger=LOG)
147
148
        def wrapped_register_exchanges(connection, channel):
149
            for exchange in EXCHANGES:
150
                _do_register_exchange(exchange=exchange, connection=connection, channel=channel,
151
                                      retry_wrapper=retry_wrapper)
152
153
        retry_wrapper.run(connection=conn, wrapped_callback=wrapped_register_exchanges)
154
155
        def wrapped_predeclare_queues(connection, channel):
156
            for queue in QUEUES:
157
                _do_predeclare_queue(channel=channel, queue=queue)
158
159
        retry_wrapper.run(connection=conn, wrapped_callback=wrapped_predeclare_queues)
160
161
162
def register_exchanges_with_retry():
163
    def retry_if_io_error(exception):
164
        return isinstance(exception, socket.error)
165
166
    retrying_obj = retrying.Retrying(
167
        retry_on_exception=retry_if_io_error,
168
        wait_fixed=cfg.CONF.messaging.connection_retry_wait,
169
        stop_max_attempt_number=cfg.CONF.messaging.connection_retries
170
    )
171
    return retrying_obj.call(register_exchanges)
172
173
174
def register_kombu_serializers():
175
    """
176
    Register our custom pickle serializer which knows how to handle UTF-8 (non
177
    ascii) messages.
178
179
    Default kombu pickle de-serializer calls .encode() on the bytes object without providing an
180
    encoding. This means it default to "ascii" and fail with UnicodeDecode error.
181
182
    https://github.com/celery/kombu/blob/3.0/kombu/utils/encoding.py#L47
183
    """
184
    def pickle_dumps(obj, dumper=pickle.dumps):
185
        return dumper(obj, protocol=pickle_protocol)
186
187
    if six.PY3:
188
        def str_to_bytes(s):
189
            if isinstance(s, str):
190
                return s.encode('utf-8')
191
            return s
192
193
        def unpickle(s):
194
            return pickle_loads(str_to_bytes(s))
195
    else:
196
        def str_to_bytes(s):                # noqa
197
            if isinstance(s, unicode):
0 ignored issues
show
Comprehensibility Best Practice introduced by
Undefined variable 'unicode'
Loading history...
198
                return s.encode('utf-8')
199
            return s
200
        unpickle = pickle_loads  # noqa
201
202
    register('pickle', pickle_dumps, unpickle,
203
             content_type='application/x-python-serialize',
204
             content_encoding='binary')
205