Passed
Push — master ( f1fe9e...5c5de8 )
by
unknown
03:44
created

StreamListener   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 24
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 24
rs 10
wmc 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A get_consumers() 0 17 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import eventlet
17
18
from kombu import Connection
19
from kombu.mixins import ConsumerMixin
20
from oslo_config import cfg
21
22
from st2common.models.api.action import LiveActionAPI
23
from st2common.models.api.execution import ActionExecutionAPI
24
from st2common.models.api.execution import ActionExecutionOutputAPI
25
from st2common.transport import utils as transport_utils
26
from st2common.transport.queues import STREAM_ANNOUNCEMENT_WORK_QUEUE
27
from st2common.transport.queues import STREAM_EXECUTION_ALL_WORK_QUEUE
28
from st2common.transport.queues import STREAM_EXECUTION_UPDATE_WORK_QUEUE
29
from st2common.transport.queues import STREAM_LIVEACTION_WORK_QUEUE
30
from st2common.transport.queues import STREAM_EXECUTION_OUTPUT_QUEUE
31
from st2common import log as logging
32
33
__all__ = [
34
    'StreamListener',
35
    'ExecutionOutputListener',
36
37
    'get_listener',
38
    'get_listener_if_set'
39
]
40
41
LOG = logging.getLogger(__name__)
42
43
44
# Stores references to instantiated listeners
45
_stream_listener = None
46
_execution_output_listener = None
47
48
49
class BaseListener(ConsumerMixin):
50
51
    def __init__(self, connection):
52
        self.connection = connection
53
        self.queues = []
54
        self._stopped = False
55
56
    def get_consumers(self, consumer, channel):
57
        raise NotImplementedError('get_consumers() is not implemented')
58
59
    def processor(self, model=None):
60
        def process(body, message):
61
            meta = message.delivery_info
62
            event_name = '%s__%s' % (meta.get('exchange'), meta.get('routing_key'))
63
64
            try:
65
                if model:
66
                    body = model.from_model(body, mask_secrets=cfg.CONF.api.mask_secrets)
67
68
                self.emit(event_name, body)
69
            finally:
70
                message.ack()
71
72
        return process
73
74
    def emit(self, event, body):
75
        pack = (event, body)
76
        for queue in self.queues:
77
            queue.put(pack)
78
79
    def generator(self, events=None, action_refs=None, execution_ids=None):
80
        queue = eventlet.Queue()
81
        queue.put('')
82
        self.queues.append(queue)
83
84
        try:
85
            while not self._stopped:
86
                try:
87
                    # TODO: Move to common option
88
                    message = queue.get(timeout=cfg.CONF.stream.heartbeat)
89
90
                    if not message:
91
                        yield message
92
                        continue
93
94
                    event_name, body = message
95
                    # TODO: We now do late filtering, but this could also be performed on the
96
                    # message bus level if we modified our exchange layout and utilize routing keys
97
                    # Filter on event name
98
                    if events and event_name not in events:
99
                        LOG.debug('Skipping event "%s"' % (event_name))
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
100
                        continue
101
102
                    # Filter on action ref
103
                    action_ref = self._get_action_ref_for_body(body=body)
104
                    if action_refs and action_ref not in action_refs:
105
                        LOG.debug('Skipping event "%s" with action_ref "%s"' % (event_name,
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
106
                                                                                action_ref))
107
                        continue
108
109
                    # Filter on execution id
110
                    execution_id = self._get_execution_id_for_body(body=body)
111
                    if execution_ids and execution_id not in execution_ids:
112
                        LOG.debug('Skipping event "%s" with execution_id "%s"' % (event_name,
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
113
                                                                                  execution_id))
114
                        continue
115
116
                    yield message
117
                except eventlet.queue.Empty:
118
                    yield
119
        finally:
120
            self.queues.remove(queue)
121
122
    def shutdown(self):
123
        self._stopped = True
124
125
    def _get_action_ref_for_body(self, body):
126
        """
127
        Retrieve action_ref for the provided message body.
128
        """
129
        if not body:
130
            return None
131
132
        action_ref = None
133
134
        if isinstance(body, ActionExecutionAPI):
135
            action_ref = body.action.get('ref', None) if body.action else None
136
        elif isinstance(body, LiveActionAPI):
137
            action_ref = body.action
138
        elif isinstance(body, (ActionExecutionOutputAPI)):
139
            action_ref = body.action_ref
140
141
        return action_ref
142
143
    def _get_execution_id_for_body(self, body):
144
        if not body:
145
            return None
146
147
        execution_id = None
148
149
        if isinstance(body, ActionExecutionAPI):
150
            execution_id = str(body.id)
151
        elif isinstance(body, LiveActionAPI):
152
            execution_id = None
153
        elif isinstance(body, (ActionExecutionOutputAPI)):
154
            execution_id = body.execution_id
155
156
        return execution_id
157
158
159
class StreamListener(BaseListener):
160
    """
161
    Listener used inside stream service.
162
163
    It listenes to all the events.
164
    """
165
166
    def get_consumers(self, consumer, channel):
167
        return [
168
            consumer(queues=[STREAM_ANNOUNCEMENT_WORK_QUEUE],
169
                     accept=['pickle'],
170
                     callbacks=[self.processor()]),
171
172
            consumer(queues=[STREAM_EXECUTION_ALL_WORK_QUEUE],
173
                     accept=['pickle'],
174
                     callbacks=[self.processor(ActionExecutionAPI)]),
175
176
            consumer(queues=[STREAM_LIVEACTION_WORK_QUEUE],
177
                     accept=['pickle'],
178
                     callbacks=[self.processor(LiveActionAPI)]),
179
180
            consumer(queues=[STREAM_EXECUTION_OUTPUT_QUEUE],
181
                     accept=['pickle'],
182
                     callbacks=[self.processor(ActionExecutionOutputAPI)])
183
        ]
184
185
186
class ExecutionOutputListener(BaseListener):
187
    """
188
    Listener emitting action execution output event.
189
190
    Only listens to action execution work and output queue.
191
    """
192
193
    def get_consumers(self, consumer, channel):
194
        return [
195
            consumer(queues=[STREAM_EXECUTION_UPDATE_WORK_QUEUE],
196
                     accept=['pickle'],
197
                     callbacks=[self.processor(ActionExecutionAPI)]),
198
199
            consumer(queues=[STREAM_EXECUTION_OUTPUT_QUEUE],
200
                     accept=['pickle'],
201
                     callbacks=[self.processor(ActionExecutionOutputAPI)])
202
        ]
203
204
205
def listen(listener):
206
    try:
207
        listener.run()
208
    finally:
209
        listener.shutdown()
210
211
212
def get_listener(name):
213
    global _stream_listener
0 ignored issues
show
Coding Style introduced by
Usage of the global statement should be avoided.

Usage of global can make code hard to read and test, its usage is generally not recommended unless you are dealing with legacy code.

Loading history...
214
    global _execution_output_listener
0 ignored issues
show
Coding Style introduced by
Usage of the global statement should be avoided.

Usage of global can make code hard to read and test, its usage is generally not recommended unless you are dealing with legacy code.

Loading history...
215
216
    if name == 'stream':
217
        if not _stream_listener:
218
            with Connection(transport_utils.get_messaging_urls()) as conn:
219
                _stream_listener = StreamListener(conn)
220
                eventlet.spawn_n(listen, _stream_listener)
221
        return _stream_listener
222
    elif name == 'execution_output':
223
        if not _execution_output_listener:
224
            with Connection(transport_utils.get_messaging_urls()) as conn:
225
                _execution_output_listener = ExecutionOutputListener(conn)
226
                eventlet.spawn_n(listen, _execution_output_listener)
227
        return _execution_output_listener
228
    else:
229
        raise ValueError('Invalid listener name: %s' % (name))
230
231
232
def get_listener_if_set(name):
233
    global _stream_listener
0 ignored issues
show
Unused Code introduced by
The variable _stream_listener was imported from global scope, but was never written to.
Loading history...
234
    global _execution_output_listener
0 ignored issues
show
Unused Code introduced by
The variable _execution_output_listener was imported from global scope, but was never written to.
Loading history...
235
236
    if name == 'stream':
237
        return _stream_listener
238
    elif name == 'execution_output':
239
        return _execution_output_listener
240
    else:
241
        raise ValueError('Invalid listener name: %s' % (name))
242