Passed
Push — master ( f17c2c...8ccc33 )
by
unknown
03:42
created

_do_predeclare_queue()   A

Complexity

Conditions 2

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 2
c 2
b 0
f 0
dl 0
loc 13
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import socket
17
18
import retrying
19
from oslo_config import cfg
20
from kombu import Connection
21
22
from st2common import log as logging
23
from st2common.transport import utils as transport_utils
24
from st2common.transport.actionexecutionstate import ACTIONEXECUTIONSTATE_XCHG
25
from st2common.transport.announcement import ANNOUNCEMENT_XCHG
26
from st2common.transport.connection_retry_wrapper import ConnectionRetryWrapper
27
from st2common.transport.execution import EXECUTION_XCHG
28
from st2common.transport.liveaction import LIVEACTION_XCHG, LIVEACTION_STATUS_MGMT_XCHG
29
from st2common.transport.reactor import SENSOR_CUD_XCHG
30
from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG
31
from st2common.transport import actionexecutionstate
32
from st2common.transport import announcement
33
from st2common.transport import execution
34
from st2common.transport import liveaction
35
from st2common.transport import reactor
36
37
LOG = logging.getLogger('st2common.transport.bootstrap')
38
39
__all__ = [
40
    'register_exchanges',
41
42
    'EXCHANGES',
43
    'QUEUES'
44
]
45
46
# List of exchanges which are pre-declared on service set up.
47
EXCHANGES = [
48
    ACTIONEXECUTIONSTATE_XCHG,
49
    ANNOUNCEMENT_XCHG,
50
    EXECUTION_XCHG,
51
    LIVEACTION_XCHG,
52
    LIVEACTION_STATUS_MGMT_XCHG,
53
    TRIGGER_CUD_XCHG,
54
    TRIGGER_INSTANCE_XCHG,
55
    SENSOR_CUD_XCHG
56
]
57
58
# List of queues which are pre-declared on service set up.
59
# Because of the worker model used, this is required with some non-standard transports such as
60
# Redis one. Even though kombu requires queues to be pre-declared upfront in such scenarios,
61
# RabbitMQ transport doesn't require that.
62
QUEUES = [
63
    actionexecutionstate.get_queue(name='st2.preinit', routing_key='init'),
64
    announcement.get_queue(name='st2.preinit', routing_key='init'),
65
    execution.get_queue(name='st2.preinit', routing_key='init'),
66
    liveaction.get_queue(name='st2.preinit', routing_key='init'),
67
    liveaction.get_status_management_queue(name='st2.preinit', routing_key='init'),
68
    reactor.get_trigger_cud_queue(name='st2.preinit', routing_key='init'),
69
    reactor.get_trigger_instances_queue(name='st2.preinit', routing_key='init'),
70
    reactor.get_sensor_cud_queue(name='st2.preinit', routing_key='init'),
71
]
72
73
74
def _do_register_exchange(exchange, connection, channel, retry_wrapper):
75
    try:
76
        kwargs = {
77
            'exchange': exchange.name,
78
            'type': exchange.type,
79
            'durable': exchange.durable,
80
            'auto_delete': exchange.auto_delete,
81
            'arguments': exchange.arguments,
82
            'nowait': False,
83
            'passive': False
84
        }
85
        # Use the retry wrapper to increase resiliency in recoverable errors.
86
        retry_wrapper.ensured(connection=connection,
87
                              obj=channel,
88
                              to_ensure_func=channel.exchange_declare,
89
                              **kwargs)
90
        LOG.debug('Registered exchange %s (%s).' % (exchange.name, str(kwargs)))
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
91
    except Exception:
92
        LOG.exception('Failed to register exchange: %s.', exchange.name)
93
94
95
def _do_predeclare_queue(channel, queue):
96
    LOG.debug('Predeclaring queue for exchange "%s"' % (queue.exchange.name))
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
97
98
    bound_queue = None
99
100
    try:
101
        bound_queue = queue(channel)
102
        bound_queue.declare(nowait=False)
103
        LOG.debug('Predeclared queue for exchange "%s"' % (queue.exchange.name))
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
104
    except Exception:
105
        LOG.exception('Failed to predeclare queue for exchange "%s"' % (queue.exchange.name))
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
106
107
    return bound_queue
108
109
110
def register_exchanges():
111
    LOG.debug('Registering exchanges...')
112
    connection_urls = transport_utils.get_messaging_urls()
113
    with Connection(connection_urls) as conn:
114
        # Use ConnectionRetryWrapper to deal with rmq clustering etc.
115
        retry_wrapper = ConnectionRetryWrapper(cluster_size=len(connection_urls), logger=LOG)
116
117
        def wrapped_register_exchanges(connection, channel):
118
            for exchange in EXCHANGES:
119
                _do_register_exchange(exchange=exchange, connection=connection, channel=channel,
120
                                      retry_wrapper=retry_wrapper)
121
122
        retry_wrapper.run(connection=conn, wrapped_callback=wrapped_register_exchanges)
123
124
        def wrapped_predeclare_queues(connection, channel):
125
            for queue in QUEUES:
126
                _do_predeclare_queue(channel=channel, queue=queue)
127
128
        if cfg.CONF.messaging.predeclare_queues:
129
            retry_wrapper.run(connection=conn, wrapped_callback=wrapped_predeclare_queues)
130
131
132
def register_exchanges_with_retry():
133
    def retry_if_io_error(exception):
134
        return isinstance(exception, socket.error)
135
136
    retrying_obj = retrying.Retrying(
137
        retry_on_exception=retry_if_io_error,
138
        wait_fixed=cfg.CONF.messaging.connection_retry_wait,
139
        stop_max_attempt_number=cfg.CONF.messaging.connection_retries
140
    )
141
    return retrying_obj.call(register_exchanges)
142