# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
||
15 | |||
16 | from __future__ import absolute_import |
||
17 | import fnmatch |
||
18 | |||
19 | import eventlet |
||
20 | |||
21 | from kombu import Connection |
||
22 | from kombu.mixins import ConsumerMixin |
||
23 | from oslo_config import cfg |
||
24 | |||
25 | from st2common.models.api.action import LiveActionAPI |
||
26 | from st2common.models.api.execution import ActionExecutionAPI |
||
27 | from st2common.models.api.execution import ActionExecutionOutputAPI |
||
28 | from st2common.transport import utils as transport_utils |
||
29 | from st2common.transport.queues import STREAM_ANNOUNCEMENT_WORK_QUEUE |
||
30 | from st2common.transport.queues import STREAM_EXECUTION_ALL_WORK_QUEUE |
||
31 | from st2common.transport.queues import STREAM_EXECUTION_UPDATE_WORK_QUEUE |
||
32 | from st2common.transport.queues import STREAM_LIVEACTION_WORK_QUEUE |
||
33 | from st2common.transport.queues import STREAM_EXECUTION_OUTPUT_QUEUE |
||
34 | from st2common import log as logging |
||
35 | |||
# Public names exported by this module.
__all__ = [
    'StreamListener',
    'ExecutionOutputListener',

    'get_listener',
    'get_listener_if_set'
]

LOG = logging.getLogger(__name__)

# Module-level singletons holding the already instantiated listeners.
# Both start out unset (None) and are assigned by get_listener().
_stream_listener = None
_execution_output_listener = None
||
50 | |||
51 | |||
class BaseListener(ConsumerMixin):
    """
    Base class for listeners which fan message bus events out to per-client queues.

    Every call to generator() registers its own eventlet queue in ``self.queues`` and emit()
    puts each event on all of the registered queues. Subclasses define which message bus
    queues are consumed by implementing get_consumers().
    """

    def __init__(self, connection):
        """
        :param connection: Connection object stored on the instance as ``self.connection``.
        """
        self.connection = connection
        self.queues = []
        self._stopped = False

    def get_consumers(self, consumer, channel):
        """
        Return the list of consumers for this listener. Needs to be implemented by subclasses.
        """
        raise NotImplementedError('get_consumers() is not implemented')

    def processor(self, model=None):
        """
        Build a message callback which emits every received message as an event.

        The event name is "<exchange>__<routing_key>", taken from the message delivery info.

        :param model: Optional API model class. When provided, the message body is converted
                      using ``model.from_model()`` (honoring the ``api.mask_secrets`` config
                      option) before it's emitted.

        :return: Callable taking ``(body, message)``.
        """
        def process(body, message):
            meta = message.delivery_info
            event_name = '%s__%s' % (meta.get('exchange'), meta.get('routing_key'))

            try:
                if model:
                    body = model.from_model(body, mask_secrets=cfg.CONF.api.mask_secrets)

                self.emit(event_name, body)
            finally:
                # Message is acknowledged even if the conversion or the emit failed
                message.ack()

        return process

    def emit(self, event, body):
        """
        Put the ``(event, body)`` pair on every queue registered by generator().
        """
        pack = (event, body)
        for queue in self.queues:
            queue.put(pack)

    def generator(self, events=None, action_refs=None, execution_ids=None):
        """
        Yield events emitted by this listener, optionally filtered.

        An empty string is yielded first, then ``(event_name, body)`` tuples for every event
        which passes all the filters. When no message arrives within the
        ``stream.heartbeat`` timeout, None is yielded instead. The loop exits once shutdown()
        has been called and the queue is unregistered when the generator finishes or is closed.

        :param events: Optional list of event name globs (fnmatch style). Falsy value means no
                       filtering on the event name.
        :param action_refs: Optional collection of action references to include.
        :param execution_ids: Optional collection of execution IDs to include.
        """
        queue = eventlet.Queue()
        # Seed the queue with an empty message so that the consumer receives something right
        # away (presumably to open the stream before the first real event arrives).
        queue.put('')
        self.queues.append(queue)

        try:
            while not self._stopped:
                try:
                    # TODO: Move to common option
                    message = queue.get(timeout=cfg.CONF.stream.heartbeat)

                    if not message:
                        yield message
                        continue

                    event_name, body = message
                    # TODO: We now do late filtering, but this could also be performed on the
                    # message bus level if we modified our exchange layout and utilize routing
                    # keys
                    # Filter on event name
                    include_event = self._should_include_event(event_names_whitelist=events,
                                                               event_name=event_name)
                    if not include_event:
                        # Lazy logger arguments avoid string formatting on this per-message
                        # path when debug logging is disabled
                        LOG.debug('Skipping event "%s"', event_name)
                        continue

                    # Filter on action ref
                    action_ref = self._get_action_ref_for_body(body=body)
                    if action_refs and action_ref not in action_refs:
                        LOG.debug('Skipping event "%s" with action_ref "%s"', event_name,
                                  action_ref)
                        continue

                    # Filter on execution id
                    execution_id = self._get_execution_id_for_body(body=body)
                    if execution_ids and execution_id not in execution_ids:
                        LOG.debug('Skipping event "%s" with execution_id "%s"', event_name,
                                  execution_id)
                        continue

                    yield message
                except eventlet.queue.Empty:
                    # Nothing arrived within the heartbeat interval
                    yield
        finally:
            self.queues.remove(queue)

    def shutdown(self):
        """
        Flag the listener as stopped so that running generator() loops exit.
        """
        self._stopped = True

    def _should_include_event(self, event_names_whitelist, event_name):
        """
        Return True if particular event should be included based on the event names filter.

        An empty / unset whitelist includes every event. Otherwise the event name needs to
        match at least one of the whitelist globs.
        """
        if not event_names_whitelist:
            return True

        return any(fnmatch.fnmatch(event_name, event_name_filter_glob)
                   for event_name_filter_glob in event_names_whitelist)

    def _get_action_ref_for_body(self, body):
        """
        Retrieve action_ref for the provided message body.

        :return: Action reference or None if the body is empty or of an unsupported type.
        """
        if not body:
            return None

        action_ref = None

        if isinstance(body, ActionExecutionAPI):
            action_ref = body.action.get('ref', None) if body.action else None
        elif isinstance(body, LiveActionAPI):
            action_ref = body.action
        elif isinstance(body, ActionExecutionOutputAPI):
            action_ref = body.action_ref

        return action_ref

    def _get_execution_id_for_body(self, body):
        """
        Retrieve execution ID for the provided message body.

        :return: Execution ID or None if the body is empty, is a live action or is of an
                 unsupported type.
        """
        if not body:
            return None

        execution_id = None

        if isinstance(body, ActionExecutionAPI):
            execution_id = str(body.id)
        elif isinstance(body, LiveActionAPI):
            execution_id = None
        elif isinstance(body, ActionExecutionOutputAPI):
            execution_id = body.execution_id

        return execution_id
||
175 | |||
176 | |||
class StreamListener(BaseListener):
    """
    Listener used inside stream service.

    It listens to all the events (announcements, executions, live actions and execution
    output).
    """

    def get_consumers(self, consumer, channel):
        """
        Return one consumer per stream work queue.
        """
        # (queue, API model) pairs. None means the message body is emitted without conversion.
        subscriptions = [
            (STREAM_ANNOUNCEMENT_WORK_QUEUE, None),
            (STREAM_EXECUTION_ALL_WORK_QUEUE, ActionExecutionAPI),
            (STREAM_LIVEACTION_WORK_QUEUE, LiveActionAPI),
            (STREAM_EXECUTION_OUTPUT_QUEUE, ActionExecutionOutputAPI),
        ]

        # NOTE(review): only pickle serialized messages are accepted which means everything
        # able to publish to these queues needs to be trusted.
        return [
            consumer(queues=[work_queue],
                     accept=['pickle'],
                     callbacks=[self.processor(api_model)])
            for work_queue, api_model in subscriptions
        ]
||
202 | |||
203 | |||
class ExecutionOutputListener(BaseListener):
    """
    Listener emitting action execution output event.

    Only listens to action execution work and output queue.
    """

    def get_consumers(self, consumer, channel):
        """
        Return consumers for the execution update and the execution output queue.
        """
        # (queue, API model) pairs consumed by this listener
        subscriptions = [
            (STREAM_EXECUTION_UPDATE_WORK_QUEUE, ActionExecutionAPI),
            (STREAM_EXECUTION_OUTPUT_QUEUE, ActionExecutionOutputAPI),
        ]

        # NOTE(review): only pickle serialized messages are accepted which means everything
        # able to publish to these queues needs to be trusted.
        return [
            consumer(queues=[work_queue],
                     accept=['pickle'],
                     callbacks=[self.processor(api_model)])
            for work_queue, api_model in subscriptions
        ]
||
221 | |||
222 | |||
def listen(listener):
    """
    Run the provided listener and shut it down once run() returns or raises.

    :param listener: Object exposing ``run()`` and ``shutdown()`` methods.
    """
    try:
        listener.run()
    finally:
        # Always executed, also when run() raises, so the listener gets flagged as stopped
        listener.shutdown()
||
228 | |||
229 | |||
def get_listener(name):
    """
    Return the listener singleton for the provided name, creating and starting it on first use.

    :param name: Listener name - either "stream" or "execution_output".

    :raises ValueError: If an unknown listener name is provided.
    """
    global _stream_listener
    global _execution_output_listener

    if name == 'stream':
        if _stream_listener:
            return _stream_listener

        # NOTE(review): the connection is released when the with block exits while the
        # listener keeps running in the spawned green thread - presumably the listener
        # establishes its own connection inside run(); confirm against kombu ConsumerMixin.
        with Connection(transport_utils.get_messaging_urls()) as connection:
            _stream_listener = StreamListener(connection)
            eventlet.spawn_n(listen, _stream_listener)

        return _stream_listener

    if name == 'execution_output':
        if _execution_output_listener:
            return _execution_output_listener

        with Connection(transport_utils.get_messaging_urls()) as connection:
            _execution_output_listener = ExecutionOutputListener(connection)
            eventlet.spawn_n(listen, _execution_output_listener)

        return _execution_output_listener

    raise ValueError('Invalid listener name: %s' % (name))
||
248 | |||
249 | |||
def get_listener_if_set(name):
    """
    Return the listener singleton for the provided name without instantiating it.

    :param name: Listener name - either "stream" or "execution_output".

    :return: Listener instance or None if the listener hasn't been created yet.

    :raises ValueError: If an unknown listener name is provided.
    """
    # The module-level references are only read here so no "global" declaration is needed.
    if name == 'stream':
        return _stream_listener
    elif name == 'execution_output':
        return _execution_output_listener
    else:
        raise ValueError('Invalid listener name: %s' % (name))
||
260 |