Issues (8)

st2common/st2common/transport/bootstrap_utils.py (1 issue)

1
# Copyright 2019 Extreme Networks, Inc.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
from __future__ import absolute_import
16
17
import socket
18
19
import six
20
import retrying
21
from oslo_config import cfg
22
from kombu.serialization import register
23
from kombu.serialization import pickle
24
from kombu.serialization import pickle_protocol
25
from kombu.serialization import pickle_loads
26
27
from st2common import log as logging
28
from st2common.transport import utils as transport_utils
29
from st2common.transport.actionexecutionstate import ACTIONEXECUTIONSTATE_XCHG
30
from st2common.transport.announcement import ANNOUNCEMENT_XCHG
31
from st2common.transport.connection_retry_wrapper import ConnectionRetryWrapper
32
from st2common.transport.execution import EXECUTION_XCHG
33
from st2common.transport.liveaction import LIVEACTION_XCHG, LIVEACTION_STATUS_MGMT_XCHG
34
from st2common.transport.reactor import SENSOR_CUD_XCHG
35
from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG
36
from st2common.transport import reactor
37
from st2common.transport.workflow import WORKFLOW_EXECUTION_XCHG
38
from st2common.transport.workflow import WORKFLOW_EXECUTION_STATUS_MGMT_XCHG
39
from st2common.transport.queues import ACTIONSCHEDULER_REQUEST_QUEUE
40
from st2common.transport.queues import ACTIONRUNNER_WORK_QUEUE
41
from st2common.transport.queues import ACTIONRUNNER_CANCEL_QUEUE
42
from st2common.transport.queues import NOTIFIER_ACTIONUPDATE_WORK_QUEUE
43
from st2common.transport.queues import RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE
44
from st2common.transport.queues import RULESENGINE_WORK_QUEUE
45
from st2common.transport.queues import STREAM_ANNOUNCEMENT_WORK_QUEUE
46
from st2common.transport.queues import STREAM_EXECUTION_ALL_WORK_QUEUE
47
from st2common.transport.queues import STREAM_LIVEACTION_WORK_QUEUE
48
from st2common.transport.queues import STREAM_EXECUTION_OUTPUT_QUEUE
49
from st2common.transport.queues import WORKFLOW_EXECUTION_WORK_QUEUE
50
from st2common.transport.queues import WORKFLOW_EXECUTION_RESUME_QUEUE
51
52
# Module-level logger used by every helper defined in this file.
LOG = logging.getLogger('st2common.transport.bootstrap')

# Public names exported by this module.
__all__ = [
    # Functions
    'register_exchanges',
    'register_exchanges_with_retry',
    'register_kombu_serializers',

    # Constants
    'EXCHANGES',
    'QUEUES',
]
62
63
# Exchanges which are pre-declared on service set up (see register_exchanges()).
EXCHANGES = [
    ACTIONEXECUTIONSTATE_XCHG,
    ANNOUNCEMENT_XCHG,
    EXECUTION_XCHG,
    LIVEACTION_XCHG,
    LIVEACTION_STATUS_MGMT_XCHG,
    TRIGGER_CUD_XCHG,
    TRIGGER_INSTANCE_XCHG,
    SENSOR_CUD_XCHG,
    WORKFLOW_EXECUTION_XCHG,
    WORKFLOW_EXECUTION_STATUS_MGMT_XCHG,
]
76
77
# Queues which are pre-declared on service startup.
# Every queue has to be declared and bound up front; that way messages are guaranteed to be
# routed and are not lost even when no consumer is online yet.
QUEUES = [
    ACTIONSCHEDULER_REQUEST_QUEUE,
    ACTIONRUNNER_WORK_QUEUE,
    ACTIONRUNNER_CANCEL_QUEUE,
    NOTIFIER_ACTIONUPDATE_WORK_QUEUE,
    RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE,
    RULESENGINE_WORK_QUEUE,

    STREAM_ANNOUNCEMENT_WORK_QUEUE,
    STREAM_EXECUTION_ALL_WORK_QUEUE,
    STREAM_LIVEACTION_WORK_QUEUE,
    STREAM_EXECUTION_OUTPUT_QUEUE,

    WORKFLOW_EXECUTION_WORK_QUEUE,
    WORKFLOW_EXECUTION_RESUME_QUEUE,

    # The two queues below are normally created late (dynamically, on some class init), but
    # they still have to be pre-declared here for the redis Kombu backend to work.
    reactor.get_trigger_cud_queue(name='st2.preinit', routing_key='init'),
    reactor.get_sensor_cud_queue(name='st2.preinit', routing_key='init'),
]
101
102
103
def _do_register_exchange(exchange, connection, channel, retry_wrapper):
    """
    Declare a single exchange on the provided channel.

    The declaration goes through ``retry_wrapper.ensured`` so recoverable errors are retried.
    Any exception raised along the way is logged and swallowed, which lets the caller carry on
    with the remaining exchanges.

    :param exchange: Exchange to declare. Its ``name``, ``type``, ``durable``, ``auto_delete``
                     and ``arguments`` attributes are read.
    :param connection: Connection handed to the retry wrapper.
    :param channel: Channel whose ``exchange_declare`` method performs the declaration.
    :param retry_wrapper: Object exposing ``ensured(connection, obj, to_ensure_func, **kwargs)``.
    """
    try:
        declare_kwargs = dict(
            exchange=exchange.name,
            type=exchange.type,
            durable=exchange.durable,
            auto_delete=exchange.auto_delete,
            arguments=exchange.arguments,
            nowait=False,
            passive=False,
        )

        # Going through the retry wrapper increases resiliency in recoverable errors.
        retry_wrapper.ensured(
            connection=connection,
            obj=channel,
            to_ensure_func=channel.exchange_declare,
            **declare_kwargs
        )

        message = 'Registered exchange %s (%s).' % (exchange.name, str(declare_kwargs))
        LOG.debug(message)
    except Exception:
        LOG.exception('Failed to register exchange: %s.', exchange.name)
122
123
124
def _do_predeclare_queue(channel, queue):
    """
    Bind ``queue`` to ``channel`` and declare it.

    Failures are logged and swallowed.

    :param channel: Channel the queue is bound to.
    :param queue: Unbound queue; it is called with the channel to obtain the bound queue.

    :return: The bound queue, or ``None`` when binding itself failed. When only the
             ``declare`` call failed, the bound (but undeclared) queue is still returned.
    """
    exchange_name = queue.exchange.name
    LOG.debug('Predeclaring queue for exchange "%s"' % (exchange_name))

    result = None

    try:
        result = queue(channel)
        result.declare(nowait=False)
    except Exception:
        LOG.exception('Failed to predeclare queue for exchange "%s"' % (exchange_name))
    else:
        LOG.debug('Predeclared queue for exchange "%s"' % (exchange_name))

    return result
137
138
139
def register_exchanges():
    """
    Declare every exchange in ``EXCHANGES`` and then pre-declare every queue in ``QUEUES``.

    Both steps run on one connection obtained from ``transport_utils.get_connection()`` and are
    driven through a ``ConnectionRetryWrapper`` sized to the number of configured messaging
    URLs.
    """
    LOG.debug('Registering exchanges...')
    messaging_urls = transport_utils.get_messaging_urls()

    with transport_utils.get_connection() as conn:
        # ConnectionRetryWrapper is used to deal with rmq clustering etc.
        retry_wrapper = ConnectionRetryWrapper(cluster_size=len(messaging_urls), logger=LOG)

        def wrapped_register_exchanges(connection, channel):
            for xchg in EXCHANGES:
                _do_register_exchange(exchange=xchg, connection=connection,
                                      channel=channel, retry_wrapper=retry_wrapper)

        def wrapped_predeclare_queues(connection, channel):
            for item in QUEUES:
                _do_predeclare_queue(channel=channel, queue=item)

        # Order matters: exchanges are declared first, queues afterwards.
        for callback in (wrapped_register_exchanges, wrapped_predeclare_queues):
            retry_wrapper.run(connection=conn, wrapped_callback=callback)
159
160
161
def register_exchanges_with_retry():
    """
    Run ``register_exchanges`` and retry it when it fails with ``socket.error``.

    The wait between attempts and the maximum number of attempts are read from the
    ``messaging.connection_retry_wait`` and ``messaging.connection_retries`` config options.

    :return: Whatever ``register_exchanges`` returns.
    """
    messaging_conf = cfg.CONF.messaging

    retrier = retrying.Retrying(
        # Only socket level errors trigger another attempt.
        retry_on_exception=lambda exc: isinstance(exc, socket.error),
        wait_fixed=messaging_conf.connection_retry_wait,
        stop_max_attempt_number=messaging_conf.connection_retries,
    )
    return retrier.call(register_exchanges)
171
172
173
def register_kombu_serializers():
    """
    Register our custom pickle serializer which knows how to handle UTF-8 (non
    ascii) messages.

    Default kombu pickle de-serializer calls .encode() on the bytes object without providing an
    encoding. This means it defaults to "ascii" and fails with a UnicodeDecode error.

    https://github.com/celery/kombu/blob/3.0/kombu/utils/encoding.py#L47

    The serializer is registered under the name "pickle" with content type
    "application/x-python-serialize" and content encoding "binary".
    """
    # NOTE(review): messages are de-serialized using pickle, which can execute arbitrary code
    # while loading. This is only acceptable as long as everything able to publish to the
    # message bus is trusted - confirm this holds for the deployment.

    def pickle_dumps(obj, dumper=pickle.dumps):
        return dumper(obj, protocol=pickle_protocol)

    if six.PY3:
        def str_to_bytes(s):
            # Text is explicitly encoded as UTF-8; anything else is passed through untouched.
            if isinstance(s, str):
                return s.encode('utf-8')
            return s

        def unpickle(s):
            return pickle_loads(str_to_bytes(s))
    else:
        # Under Python 2 the payload is handed to kombu's loader as is. The previous version
        # of this branch also defined a "str_to_bytes" helper which was never called and which
        # referenced the bare "unicode" builtin (reported as an undefined variable by static
        # analysis), so it has been removed.
        unpickle = pickle_loads

    register('pickle', pickle_dumps, unpickle,
             content_type='application/x-python-serialize',
             content_encoding='binary')
204