# Copyright 2019 Extreme Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14 | |||
15 | from __future__ import absolute_import |
||
16 | |||
17 | import socket |
||
18 | |||
19 | import six |
||
20 | import retrying |
||
21 | from oslo_config import cfg |
||
22 | from kombu.serialization import register |
||
23 | from kombu.serialization import pickle |
||
24 | from kombu.serialization import pickle_protocol |
||
25 | from kombu.serialization import pickle_loads |
||
26 | |||
27 | from st2common import log as logging |
||
28 | from st2common.transport import utils as transport_utils |
||
29 | from st2common.transport.actionexecutionstate import ACTIONEXECUTIONSTATE_XCHG |
||
30 | from st2common.transport.announcement import ANNOUNCEMENT_XCHG |
||
31 | from st2common.transport.connection_retry_wrapper import ConnectionRetryWrapper |
||
32 | from st2common.transport.execution import EXECUTION_XCHG |
||
33 | from st2common.transport.liveaction import LIVEACTION_XCHG, LIVEACTION_STATUS_MGMT_XCHG |
||
34 | from st2common.transport.reactor import SENSOR_CUD_XCHG |
||
35 | from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG |
||
36 | from st2common.transport import reactor |
||
37 | from st2common.transport.workflow import WORKFLOW_EXECUTION_XCHG |
||
38 | from st2common.transport.workflow import WORKFLOW_EXECUTION_STATUS_MGMT_XCHG |
||
39 | from st2common.transport.queues import ACTIONSCHEDULER_REQUEST_QUEUE |
||
40 | from st2common.transport.queues import ACTIONRUNNER_WORK_QUEUE |
||
41 | from st2common.transport.queues import ACTIONRUNNER_CANCEL_QUEUE |
||
42 | from st2common.transport.queues import NOTIFIER_ACTIONUPDATE_WORK_QUEUE |
||
43 | from st2common.transport.queues import RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE |
||
44 | from st2common.transport.queues import RULESENGINE_WORK_QUEUE |
||
45 | from st2common.transport.queues import STREAM_ANNOUNCEMENT_WORK_QUEUE |
||
46 | from st2common.transport.queues import STREAM_EXECUTION_ALL_WORK_QUEUE |
||
47 | from st2common.transport.queues import STREAM_LIVEACTION_WORK_QUEUE |
||
48 | from st2common.transport.queues import STREAM_EXECUTION_OUTPUT_QUEUE |
||
49 | from st2common.transport.queues import WORKFLOW_EXECUTION_WORK_QUEUE |
||
50 | from st2common.transport.queues import WORKFLOW_EXECUTION_RESUME_QUEUE |
||
51 | |||
LOG = logging.getLogger('st2common.transport.bootstrap')

__all__ = [
    'register_exchanges',
    'register_exchanges_with_retry',
    'register_kombu_serializers',

    'EXCHANGES',
    'QUEUES'
]

# Exchanges which are declared up-front when a service starts up.
EXCHANGES = [
    ACTIONEXECUTIONSTATE_XCHG,
    ANNOUNCEMENT_XCHG,
    EXECUTION_XCHG,
    LIVEACTION_XCHG,
    LIVEACTION_STATUS_MGMT_XCHG,
    TRIGGER_CUD_XCHG,
    TRIGGER_INSTANCE_XCHG,
    SENSOR_CUD_XCHG,
    WORKFLOW_EXECUTION_XCHG,
    WORKFLOW_EXECUTION_STATUS_MGMT_XCHG
]

# Queues which are declared and bound up-front on service startup.
# Declaring (and binding) them eagerly guarantees that published messages get
# routed and are not lost even when no consumer is online yet.
QUEUES = [
    ACTIONSCHEDULER_REQUEST_QUEUE,
    ACTIONRUNNER_WORK_QUEUE,
    ACTIONRUNNER_CANCEL_QUEUE,
    NOTIFIER_ACTIONUPDATE_WORK_QUEUE,
    RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE,
    RULESENGINE_WORK_QUEUE,

    STREAM_ANNOUNCEMENT_WORK_QUEUE,
    STREAM_EXECUTION_ALL_WORK_QUEUE,
    STREAM_LIVEACTION_WORK_QUEUE,
    STREAM_EXECUTION_OUTPUT_QUEUE,

    WORKFLOW_EXECUTION_WORK_QUEUE,
    WORKFLOW_EXECUTION_RESUME_QUEUE,

    # These queues are normally created lazily on some class init, but they
    # still need to be pre-declared here for the redis Kombu backend to work.
    reactor.get_trigger_cud_queue(name='st2.preinit', routing_key='init'),
    reactor.get_sensor_cud_queue(name='st2.preinit', routing_key='init')
]
||
101 | |||
102 | |||
def _do_register_exchange(exchange, connection, channel, retry_wrapper):
    """
    Declare a single exchange on the broker.

    Failures are logged and swallowed so one bad exchange doesn't abort
    registration of the remaining ones.

    :param exchange: Kombu exchange object to declare.
    :param connection: Broker connection used by the retry wrapper.
    :param channel: Channel on which the declare is issued.
    :param retry_wrapper: ConnectionRetryWrapper used to retry recoverable errors.
    """
    try:
        declare_kwargs = {
            'exchange': exchange.name,
            'type': exchange.type,
            'durable': exchange.durable,
            'auto_delete': exchange.auto_delete,
            'arguments': exchange.arguments,
            'nowait': False,
            'passive': False
        }
        # Route the declare through the retry wrapper so transient,
        # recoverable broker errors are retried instead of failing outright.
        retry_wrapper.ensured(connection=connection,
                              obj=channel,
                              to_ensure_func=channel.exchange_declare,
                              **declare_kwargs)
        LOG.debug('Registered exchange %s (%s).' % (exchange.name, str(declare_kwargs)))
    except Exception:
        LOG.exception('Failed to register exchange: %s.', exchange.name)
||
122 | |||
123 | |||
def _do_predeclare_queue(channel, queue):
    """
    Declare (and bind) a single queue on the provided channel.

    :param channel: Channel on which the queue is declared.
    :param queue: Unbound Kombu queue object.
    :return: The bound queue on success, ``None`` if the declaration failed
             (failures are logged and swallowed).
    """
    LOG.debug('Predeclaring queue for exchange "%s"' % (queue.exchange.name))

    result = None

    try:
        # Binding the queue to the channel gives us a declarable object.
        result = queue(channel)
        result.declare(nowait=False)
        LOG.debug('Predeclared queue for exchange "%s"' % (queue.exchange.name))
    except Exception:
        LOG.exception('Failed to predeclare queue for exchange "%s"' % (queue.exchange.name))

    return result
||
137 | |||
138 | |||
def register_exchanges():
    """
    Declare every exchange in ``EXCHANGES`` and pre-declare every queue in
    ``QUEUES`` so messages can be routed before any consumer comes online.
    """
    LOG.debug('Registering exchanges...')
    connection_urls = transport_utils.get_messaging_urls()

    with transport_utils.get_connection() as conn:
        # ConnectionRetryWrapper copes with RabbitMQ clustering, fail-over, etc.
        retry_wrapper = ConnectionRetryWrapper(cluster_size=len(connection_urls), logger=LOG)

        def _declare_exchanges(connection, channel):
            # One declare per exchange; each declare retries independently.
            for exchange in EXCHANGES:
                _do_register_exchange(exchange=exchange, connection=connection,
                                      channel=channel, retry_wrapper=retry_wrapper)

        retry_wrapper.run(connection=conn, wrapped_callback=_declare_exchanges)

        def _declare_queues(connection, channel):
            for queue in QUEUES:
                _do_predeclare_queue(channel=channel, queue=queue)

        retry_wrapper.run(connection=conn, wrapped_callback=_declare_queues)
||
159 | |||
160 | |||
def register_exchanges_with_retry():
    """
    Same as :func:`register_exchanges`, but retried on socket-level errors
    using the retry parameters from the ``messaging`` config section.
    """
    def _retry_on_socket_error(exception):
        # Only socket errors (broker not reachable yet, etc.) are retried.
        return isinstance(exception, socket.error)

    retrying_obj = retrying.Retrying(
        retry_on_exception=_retry_on_socket_error,
        wait_fixed=cfg.CONF.messaging.connection_retry_wait,
        stop_max_attempt_number=cfg.CONF.messaging.connection_retries
    )
    return retrying_obj.call(register_exchanges)
||
171 | |||
172 | |||
def register_kombu_serializers():
    """
    Register our custom pickle serializer which knows how to handle UTF-8 (non
    ascii) messages.

    Default kombu pickle de-serializer calls .encode() on the bytes object
    without providing an encoding. This means it defaults to "ascii" and fails
    with a UnicodeDecode error on non-ascii payloads.

    https://github.com/celery/kombu/blob/3.0/kombu/utils/encoding.py#L47
    """
    def pickle_dumps(obj, dumper=pickle.dumps):
        return dumper(obj, protocol=pickle_protocol)

    if six.PY3:
        def str_to_bytes(s):
            # Python 3: encode str payloads to bytes before unpickling.
            if isinstance(s, str):
                return s.encode('utf-8')
            return s

        def unpickle(s):
            return pickle_loads(str_to_bytes(s))
    else:
        def str_to_bytes(s):  # noqa
            # Python 2: unicode payloads need an explicit utf-8 encode.
            if isinstance(s, unicode):  # noqa
                return s.encode('utf-8')
            return s
        unpickle = pickle_loads  # noqa

    register('pickle', pickle_dumps, unpickle,
             content_type='application/x-python-serialize',
             content_encoding='binary')
||
204 |