Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2common/st2common/stream/listener.py (2 issues)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import fnmatch
18
19
import eventlet
20
21
from kombu import Connection
22
from kombu.mixins import ConsumerMixin
23
from oslo_config import cfg
24
25
from st2common.models.api.action import LiveActionAPI
26
from st2common.models.api.execution import ActionExecutionAPI
27
from st2common.models.api.execution import ActionExecutionOutputAPI
28
from st2common.transport import utils as transport_utils
29
from st2common.transport.queues import STREAM_ANNOUNCEMENT_WORK_QUEUE
30
from st2common.transport.queues import STREAM_EXECUTION_ALL_WORK_QUEUE
31
from st2common.transport.queues import STREAM_EXECUTION_UPDATE_WORK_QUEUE
32
from st2common.transport.queues import STREAM_LIVEACTION_WORK_QUEUE
33
from st2common.transport.queues import STREAM_EXECUTION_OUTPUT_QUEUE
34
from st2common import log as logging
35
36
# Names exported by this module
__all__ = [
    'StreamListener',
    'ExecutionOutputListener',

    'get_listener',
    'get_listener_if_set'
]

LOG = logging.getLogger(__name__)


# Stores references to instantiated listeners
_stream_listener = None
_execution_output_listener = None
50
51
52
class BaseListener(ConsumerMixin):
    """
    Base class for listeners which fan received message bus events out to per-consumer queues.

    Subclasses must implement get_consumers().
    """

    def __init__(self, connection):
        """
        :param connection: Connection object; stored on the instance as ``self.connection``.
        """
        self.connection = connection
        # One queue per active generator() call; emit() puts every event on all of them.
        self.queues = []
        # Set to True by shutdown(); generator() checks it on every loop iteration.
        self._stopped = False

    def get_consumers(self, consumer, channel):
        """
        Needs to be overridden by the subclasses.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError('get_consumers() is not implemented')

    def processor(self, model=None):
        """
        Build a callback which emits every received message as an event.

        :param model: Optional class exposing ``from_model()``. When provided, the message body
                      is converted with it (passing ``cfg.CONF.api.mask_secrets``) before it is
                      emitted.

        :return: Callable which takes ``(body, message)`` arguments.
        """
        def process(body, message):
            meta = message.delivery_info
            # Event name has the following format: <exchange>__<routing key>
            event_name = '%s__%s' % (meta.get('exchange'), meta.get('routing_key'))

            try:
                if model:
                    body = model.from_model(body, mask_secrets=cfg.CONF.api.mask_secrets)

                self.emit(event_name, body)
            finally:
                # Message is acknowledged even if the conversion or emit above fails
                message.ack()

        return process

    def emit(self, event, body):
        """
        Put ``(event, body)`` tuple on every queue registered by generator().
        """
        pack = (event, body)
        for queue in self.queues:
            queue.put(pack)

    def generator(self, events=None, action_refs=None, execution_ids=None):
        """
        Generator which yields events emitted by this listener until shutdown() is called.

        It yields an empty string first, then ``(event_name, body)`` tuples for events which pass
        all the filters, and ``None`` each time no message arrives within the
        ``cfg.CONF.stream.heartbeat`` timeout.

        :param events: Optional list of event name glob patterns. When provided, only events
                       matching at least one of the patterns are yielded.
        :param action_refs: Optional collection of action refs. When provided, only events whose
                            action ref is in the collection are yielded.
        :param execution_ids: Optional collection of execution ids. When provided, only events
                              whose execution id is in the collection are yielded.
        """
        queue = eventlet.Queue()
        # Empty message is queued up front so the consumer receives something right away
        queue.put('')
        self.queues.append(queue)

        try:
            while not self._stopped:
                try:
                    # TODO: Move to common option
                    message = queue.get(timeout=cfg.CONF.stream.heartbeat)

                    if not message:
                        yield message
                        continue

                    event_name, body = message
                    # TODO: We now do late filtering, but this could also be performed on the
                    # message bus level if we modified our exchange layout and utilize routing keys
                    # Filter on event name
                    include_event = self._should_include_event(event_names_whitelist=events,
                                                               event_name=event_name)
                    if not include_event:
                        LOG.debug('Skipping event "%s"', event_name)
                        continue

                    # Filter on action ref
                    action_ref = self._get_action_ref_for_body(body=body)
                    if action_refs and action_ref not in action_refs:
                        LOG.debug('Skipping event "%s" with action_ref "%s"', event_name,
                                  action_ref)
                        continue

                    # Filter on execution id
                    execution_id = self._get_execution_id_for_body(body=body)
                    if execution_ids and execution_id not in execution_ids:
                        LOG.debug('Skipping event "%s" with execution_id "%s"', event_name,
                                  execution_id)
                        continue

                    yield message
                except eventlet.queue.Empty:
                    # No message arrived before the timeout expired
                    yield
        finally:
            # Unregister the queue so emit() stops feeding a consumer which is gone
            self.queues.remove(queue)

    def shutdown(self):
        """
        Mark the listener as stopped so the generator() loops exit on their next iteration.
        """
        self._stopped = True

    def _should_include_event(self, event_names_whitelist, event_name):
        """
        Return True if particular event should be included based on the event names filter.
        """
        if not event_names_whitelist:
            return True

        for event_name_filter_glob in event_names_whitelist:
            if fnmatch.fnmatch(event_name, event_name_filter_glob):
                return True

        return False

    def _get_action_ref_for_body(self, body):
        """
        Retrieve action_ref for the provided message body.

        :return: Action ref, or None if the body is empty or of an unrecognized type.
        """
        if not body:
            return None

        action_ref = None

        if isinstance(body, ActionExecutionAPI):
            action_ref = body.action.get('ref', None) if body.action else None
        elif isinstance(body, LiveActionAPI):
            action_ref = body.action
        elif isinstance(body, ActionExecutionOutputAPI):
            action_ref = body.action_ref

        return action_ref

    def _get_execution_id_for_body(self, body):
        """
        Retrieve execution id for the provided message body.

        :return: Execution id, or None if the body is empty, is a LiveActionAPI object or is of
                 an unrecognized type.
        """
        if not body:
            return None

        execution_id = None

        if isinstance(body, ActionExecutionAPI):
            execution_id = str(body.id)
        elif isinstance(body, LiveActionAPI):
            # No execution id is derived from the live action objects
            execution_id = None
        elif isinstance(body, ActionExecutionOutputAPI):
            execution_id = body.execution_id

        return execution_id
175
176
177
class StreamListener(BaseListener):
    """
    Listener used inside stream service.

    It listens to all the events.
    """

    def get_consumers(self, consumer, channel):
        """
        Return consumers for the announcement, execution, live action and execution output
        queues.

        Announcement messages are emitted as-is, the other ones are converted to the
        corresponding API model first.
        """
        # NOTE(review): Messages are accepted in pickle format. Unpickling is only safe when
        # everything publishing to the message bus is trusted - confirm that holds here.
        return [
            consumer(queues=[STREAM_ANNOUNCEMENT_WORK_QUEUE],
                     accept=['pickle'],
                     callbacks=[self.processor()]),

            consumer(queues=[STREAM_EXECUTION_ALL_WORK_QUEUE],
                     accept=['pickle'],
                     callbacks=[self.processor(ActionExecutionAPI)]),

            consumer(queues=[STREAM_LIVEACTION_WORK_QUEUE],
                     accept=['pickle'],
                     callbacks=[self.processor(LiveActionAPI)]),

            consumer(queues=[STREAM_EXECUTION_OUTPUT_QUEUE],
                     accept=['pickle'],
                     callbacks=[self.processor(ActionExecutionOutputAPI)])
        ]
202
203
204
class ExecutionOutputListener(BaseListener):
    """
    Listener emitting action execution output event.

    Only listens to action execution work and output queue.
    """

    def get_consumers(self, consumer, channel):
        """
        Return consumers for the execution update and execution output queues.

        Messages from both queues are converted to the corresponding API model before they are
        emitted.
        """
        # NOTE(review): Messages are accepted in pickle format. Unpickling is only safe when
        # everything publishing to the message bus is trusted - confirm that holds here.
        return [
            consumer(queues=[STREAM_EXECUTION_UPDATE_WORK_QUEUE],
                     accept=['pickle'],
                     callbacks=[self.processor(ActionExecutionAPI)]),

            consumer(queues=[STREAM_EXECUTION_OUTPUT_QUEUE],
                     accept=['pickle'],
                     callbacks=[self.processor(ActionExecutionOutputAPI)])
        ]
221
222
223
def listen(listener):
    """
    Run the provided listener and make sure it is shut down once run() returns or raises.

    :param listener: Object exposing ``run()`` and ``shutdown()`` methods.
    """
    try:
        listener.run()
    finally:
        # Runs on a clean return as well as on an exception (which is then propagated)
        listener.shutdown()
228
229
230
def get_listener(name):
    """
    Return the listener with the provided name, instantiating and starting it on first use.

    The instance is stored in a module level variable and re-used by the subsequent calls.

    :param name: Listener name - either ``stream`` or ``execution_output``.

    :raises ValueError: If an unknown listener name is provided.
    """
    global _stream_listener
    global _execution_output_listener

    # NOTE(review): The "with" block releases ``conn`` right after the greenthread is spawned.
    # Presumably that is fine because ConsumerMixin works on its own clone of the connection -
    # confirm against the kombu version in use.
    if name == 'stream':
        if _stream_listener is None:
            with Connection(transport_utils.get_messaging_urls()) as conn:
                _stream_listener = StreamListener(conn)
                eventlet.spawn_n(listen, _stream_listener)
        return _stream_listener
    elif name == 'execution_output':
        if _execution_output_listener is None:
            with Connection(transport_utils.get_messaging_urls()) as conn:
                _execution_output_listener = ExecutionOutputListener(conn)
                eventlet.spawn_n(listen, _execution_output_listener)
        return _execution_output_listener
    else:
        raise ValueError('Invalid listener name: %s' % (name))
248
249
250
def get_listener_if_set(name):
    """
    Return the listener with the provided name if it has already been instantiated.

    Unlike get_listener(), this function never instantiates a listener.

    :param name: Listener name - either ``stream`` or ``execution_output``.

    :return: Listener instance, or None if it has not been instantiated yet.

    :raises ValueError: If an unknown listener name is provided.
    """
    # Module level variables are only read here so no "global" statement is needed
    if name == 'stream':
        return _stream_listener
    elif name == 'execution_output':
        return _execution_output_listener
    else:
        raise ValueError('Invalid listener name: %s' % (name))
260