# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
15 | |||
16 | from __future__ import absolute_import |
||
17 | |||
18 | import socket |
||
19 | |||
20 | import six |
||
21 | import retrying |
||
22 | from oslo_config import cfg |
||
23 | from kombu import Connection |
||
24 | from kombu.serialization import register |
||
25 | from kombu.serialization import pickle |
||
26 | from kombu.serialization import pickle_protocol |
||
27 | from kombu.serialization import pickle_loads |
||
28 | |||
29 | from st2common import log as logging |
||
30 | from st2common.transport import utils as transport_utils |
||
31 | from st2common.transport.actionexecutionstate import ACTIONEXECUTIONSTATE_XCHG |
||
32 | from st2common.transport.announcement import ANNOUNCEMENT_XCHG |
||
33 | from st2common.transport.connection_retry_wrapper import ConnectionRetryWrapper |
||
34 | from st2common.transport.execution import EXECUTION_XCHG |
||
35 | from st2common.transport.liveaction import LIVEACTION_XCHG, LIVEACTION_STATUS_MGMT_XCHG |
||
36 | from st2common.transport.reactor import SENSOR_CUD_XCHG |
||
37 | from st2common.transport.reactor import TRIGGER_CUD_XCHG, TRIGGER_INSTANCE_XCHG |
||
38 | from st2common.transport import reactor |
||
39 | from st2common.transport.workflow import WORKFLOW_EXECUTION_XCHG |
||
40 | from st2common.transport.workflow import WORKFLOW_EXECUTION_STATUS_MGMT_XCHG |
||
41 | from st2common.transport.queues import ACTIONSCHEDULER_REQUEST_QUEUE |
||
42 | from st2common.transport.queues import ACTIONRUNNER_WORK_QUEUE |
||
43 | from st2common.transport.queues import ACTIONRUNNER_CANCEL_QUEUE |
||
44 | from st2common.transport.queues import NOTIFIER_ACTIONUPDATE_WORK_QUEUE |
||
45 | from st2common.transport.queues import RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE |
||
46 | from st2common.transport.queues import RULESENGINE_WORK_QUEUE |
||
47 | from st2common.transport.queues import STREAM_ANNOUNCEMENT_WORK_QUEUE |
||
48 | from st2common.transport.queues import STREAM_EXECUTION_ALL_WORK_QUEUE |
||
49 | from st2common.transport.queues import STREAM_LIVEACTION_WORK_QUEUE |
||
50 | from st2common.transport.queues import STREAM_EXECUTION_OUTPUT_QUEUE |
||
51 | from st2common.transport.queues import WORKFLOW_EXECUTION_WORK_QUEUE |
||
52 | from st2common.transport.queues import WORKFLOW_EXECUTION_RESUME_QUEUE |
||
53 | |||
54 | LOG = logging.getLogger('st2common.transport.bootstrap') |
||
55 | |||
56 | __all__ = [ |
||
57 | 'register_exchanges', |
||
58 | 'register_exchanges_with_retry', |
||
59 | 'register_kombu_serializers', |
||
60 | |||
61 | 'EXCHANGES', |
||
62 | 'QUEUES' |
||
63 | ] |
||
64 | |||
65 | # List of exchanges which are pre-declared on service set up. |
||
66 | EXCHANGES = [ |
||
67 | ACTIONEXECUTIONSTATE_XCHG, |
||
68 | ANNOUNCEMENT_XCHG, |
||
69 | EXECUTION_XCHG, |
||
70 | LIVEACTION_XCHG, |
||
71 | LIVEACTION_STATUS_MGMT_XCHG, |
||
72 | TRIGGER_CUD_XCHG, |
||
73 | TRIGGER_INSTANCE_XCHG, |
||
74 | SENSOR_CUD_XCHG, |
||
75 | WORKFLOW_EXECUTION_XCHG, |
||
76 | WORKFLOW_EXECUTION_STATUS_MGMT_XCHG |
||
77 | ] |
||
78 | |||
79 | # List of queues which are pre-declared on service startup. |
||
80 | # All the queues need to be declared and bound up front so we can guarantee messages get routed |
||
81 | # and don't get lost even if there are no consumers online |
||
82 | QUEUES = [ |
||
83 | ACTIONSCHEDULER_REQUEST_QUEUE, |
||
84 | ACTIONRUNNER_WORK_QUEUE, |
||
85 | ACTIONRUNNER_CANCEL_QUEUE, |
||
86 | NOTIFIER_ACTIONUPDATE_WORK_QUEUE, |
||
87 | RESULTSTRACKER_ACTIONSTATE_WORK_QUEUE, |
||
88 | RULESENGINE_WORK_QUEUE, |
||
89 | |||
90 | STREAM_ANNOUNCEMENT_WORK_QUEUE, |
||
91 | STREAM_EXECUTION_ALL_WORK_QUEUE, |
||
92 | STREAM_LIVEACTION_WORK_QUEUE, |
||
93 | STREAM_EXECUTION_OUTPUT_QUEUE, |
||
94 | |||
95 | WORKFLOW_EXECUTION_WORK_QUEUE, |
||
96 | WORKFLOW_EXECUTION_RESUME_QUEUE, |
||
97 | |||
98 | # Those queues are dynamically / late created on some class init but we still need to |
||
99 | # pre-declare them for redis Kombu backend to work. |
||
100 | reactor.get_trigger_cud_queue(name='st2.preinit', routing_key='init'), |
||
101 | reactor.get_sensor_cud_queue(name='st2.preinit', routing_key='init') |
||
102 | ] |
||
103 | |||
104 | |||
105 | def _do_register_exchange(exchange, connection, channel, retry_wrapper): |
||
106 | try: |
||
107 | kwargs = { |
||
108 | 'exchange': exchange.name, |
||
109 | 'type': exchange.type, |
||
110 | 'durable': exchange.durable, |
||
111 | 'auto_delete': exchange.auto_delete, |
||
112 | 'arguments': exchange.arguments, |
||
113 | 'nowait': False, |
||
114 | 'passive': False |
||
115 | } |
||
116 | # Use the retry wrapper to increase resiliency in recoverable errors. |
||
117 | retry_wrapper.ensured(connection=connection, |
||
118 | obj=channel, |
||
119 | to_ensure_func=channel.exchange_declare, |
||
120 | **kwargs) |
||
121 | LOG.debug('Registered exchange %s (%s).' % (exchange.name, str(kwargs))) |
||
122 | except Exception: |
||
123 | LOG.exception('Failed to register exchange: %s.', exchange.name) |
||
124 | |||
125 | |||
126 | def _do_predeclare_queue(channel, queue): |
||
127 | LOG.debug('Predeclaring queue for exchange "%s"' % (queue.exchange.name)) |
||
128 | |||
129 | bound_queue = None |
||
130 | |||
131 | try: |
||
132 | bound_queue = queue(channel) |
||
133 | bound_queue.declare(nowait=False) |
||
134 | LOG.debug('Predeclared queue for exchange "%s"' % (queue.exchange.name)) |
||
135 | except Exception: |
||
136 | LOG.exception('Failed to predeclare queue for exchange "%s"' % (queue.exchange.name)) |
||
137 | |||
138 | return bound_queue |
||
139 | |||
140 | |||
141 | def register_exchanges(): |
||
142 | LOG.debug('Registering exchanges...') |
||
143 | connection_urls = transport_utils.get_messaging_urls() |
||
144 | with Connection(connection_urls) as conn: |
||
145 | # Use ConnectionRetryWrapper to deal with rmq clustering etc. |
||
146 | retry_wrapper = ConnectionRetryWrapper(cluster_size=len(connection_urls), logger=LOG) |
||
147 | |||
148 | def wrapped_register_exchanges(connection, channel): |
||
149 | for exchange in EXCHANGES: |
||
150 | _do_register_exchange(exchange=exchange, connection=connection, channel=channel, |
||
151 | retry_wrapper=retry_wrapper) |
||
152 | |||
153 | retry_wrapper.run(connection=conn, wrapped_callback=wrapped_register_exchanges) |
||
154 | |||
155 | def wrapped_predeclare_queues(connection, channel): |
||
156 | for queue in QUEUES: |
||
157 | _do_predeclare_queue(channel=channel, queue=queue) |
||
158 | |||
159 | retry_wrapper.run(connection=conn, wrapped_callback=wrapped_predeclare_queues) |
||
160 | |||
161 | |||
162 | def register_exchanges_with_retry(): |
||
163 | def retry_if_io_error(exception): |
||
164 | return isinstance(exception, socket.error) |
||
165 | |||
166 | retrying_obj = retrying.Retrying( |
||
167 | retry_on_exception=retry_if_io_error, |
||
168 | wait_fixed=cfg.CONF.messaging.connection_retry_wait, |
||
169 | stop_max_attempt_number=cfg.CONF.messaging.connection_retries |
||
170 | ) |
||
171 | return retrying_obj.call(register_exchanges) |
||
172 | |||
173 | |||
174 | def register_kombu_serializers(): |
||
175 | """ |
||
176 | Register our custom pickle serializer which knows how to handle UTF-8 (non |
||
177 | ascii) messages. |
||
178 | |||
179 | Default kombu pickle de-serializer calls .encode() on the bytes object without providing an |
||
180 | encoding. This means it default to "ascii" and fail with UnicodeDecode error. |
||
181 | |||
182 | https://github.com/celery/kombu/blob/3.0/kombu/utils/encoding.py#L47 |
||
183 | """ |
||
184 | def pickle_dumps(obj, dumper=pickle.dumps): |
||
185 | return dumper(obj, protocol=pickle_protocol) |
||
186 | |||
187 | if six.PY3: |
||
188 | def str_to_bytes(s): |
||
189 | if isinstance(s, str): |
||
190 | return s.encode('utf-8') |
||
191 | return s |
||
192 | |||
193 | def unpickle(s): |
||
194 | return pickle_loads(str_to_bytes(s)) |
||
195 | else: |
||
196 | def str_to_bytes(s): # noqa |
||
197 | if isinstance(s, unicode): |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
Loading history...
|
|||
198 | return s.encode('utf-8') |
||
199 | return s |
||
200 | unpickle = pickle_loads # noqa |
||
201 | |||
202 | register('pickle', pickle_dumps, unpickle, |
||
203 | content_type='application/x-python-serialize', |
||
204 | content_encoding='binary') |
||
205 |