Passed
Push — tim/handle-sensor-tracebacks ( ee2b60 )
by
unknown
03:23
created

st2stream.Listener.process()   A

Complexity

Conditions 2

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 12
rs 9.4285
cc 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import eventlet
17
18
from kombu import Connection, Queue
19
from kombu.mixins import ConsumerMixin
20
from oslo_config import cfg
21
22
from st2common.models.api.action import LiveActionAPI
23
from st2common.models.api.execution import ActionExecutionAPI
24
from st2common.transport import announcement, liveaction, execution, publishers
25
from st2common.transport import utils as transport_utils
26
from st2common import log as logging
27
28
# Public API of this module.
__all__ = [
    'get_listener',
    'get_listener_if_set'
]

LOG = logging.getLogger(__name__)

# Module-level Listener singleton; stays None until get_listener() creates it.
_listener = None
36
37
38
class Listener(ConsumerMixin):
    """Consume messages from the bus and fan them out to local subscribers.

    Each subscriber is an ``eventlet.Queue`` registered by ``generator()``;
    every message received by one of the consumers is pushed to all of the
    registered queues as an ``(event_name, body)`` tuple.
    """

    def __init__(self, connection):
        """
        :param connection: Connection object stored on the instance as
                           ``self.connection``.
        """
        self.connection = connection
        # One eventlet.Queue per active generator() call.
        self.queues = []
        # Set by shutdown(); checked by generator() to end its loop.
        self._stopped = False

    def get_consumers(self, consumer, channel):
        """
        Build the consumers for announcements, executions and live actions.

        All three queues are exclusive and bound with ``publishers.ANY_RK``.

        NOTE(review): ``accept=['pickle']`` allows pickled message bodies.
        Unpickling is only safe if everything publishing to the broker is
        trusted - confirm this holds for the deployment.

        :param consumer: Callable used to create each consumer.
        :param channel: Not used by this implementation.

        :return: List with one consumer per source.
        """
        return [
            # Announcements are forwarded as-is (no API model conversion).
            consumer(queues=[announcement.get_queue(routing_key=publishers.ANY_RK,
                                                    exclusive=True)],
                     accept=['pickle'],
                     callbacks=[self.processor()]),

            consumer(queues=[execution.get_queue(routing_key=publishers.ANY_RK,
                                                 exclusive=True)],
                     accept=['pickle'],
                     callbacks=[self.processor(ActionExecutionAPI)]),

            consumer(queues=[Queue(None,
                                   liveaction.LIVEACTION_XCHG,
                                   routing_key=publishers.ANY_RK,
                                   exclusive=True)],
                     accept=['pickle'],
                     callbacks=[self.processor(LiveActionAPI)])
        ]

    def processor(self, model=None):
        """
        Return a message callback bound to the given API model.

        :param model: Optional class exposing ``from_model(body, **kwargs)``.
                      When provided, the message body is converted with it
                      before being emitted.

        :return: Callable taking ``(body, message)``.
        """
        def process(body, message):
            from_model_kwargs = {'mask_secrets': cfg.CONF.api.mask_secrets}
            meta = message.delivery_info
            # Event name has the form "<exchange>__<routing_key>".
            event_name = '%s__%s' % (meta.get('exchange'), meta.get('routing_key'))

            try:
                if model:
                    body = model.from_model(body, **from_model_kwargs)

                self.emit(event_name, body)
            finally:
                # Acknowledge even if conversion or emit raised; the exception
                # still propagates to the caller afterwards.
                message.ack()

        return process

    def emit(self, event, body):
        """
        Put ``(event, body)`` on every registered subscriber queue.

        :param event: Event name.
        :param body: Event payload.
        """
        pack = (event, body)
        for queue in self.queues:
            queue.put(pack)

    def generator(self):
        """
        Yield events for a single subscriber until the listener is stopped.

        A dedicated queue is registered for the lifetime of the generator and
        removed again when the generator finishes or is closed.

        :return: Generator yielding ``(event, body)`` tuples, or ``None`` when
                 no event arrived within ``cfg.CONF.stream.heartbeat``.
        """
        queue = eventlet.Queue()
        self.queues.append(queue)
        try:
            while not self._stopped:
                try:
                    yield queue.get(timeout=cfg.CONF.stream.heartbeat)
                except eventlet.queue.Empty:
                    # Timed out waiting for an event: yield an empty value so
                    # the consumer of this generator still gets control.
                    yield
        finally:
            self.queues.remove(queue)

    def shutdown(self):
        """Flag the listener as stopped so ``generator()`` loops terminate."""
        self._stopped = True
100
101
102
def listen(listener):
    """
    Run the listener and make sure it is shut down when ``run()`` exits.

    :param listener: Object exposing ``run()`` and ``shutdown()``.

    Any exception raised by ``run()`` propagates after ``shutdown()`` is
    called.
    """
    try:
        listener.run()
    finally:
        listener.shutdown()
107
108
109
def get_listener():
    """
    Return the module-level Listener, creating and starting it on first use.

    On the first call a Listener is created, stored in the module-level
    ``_listener`` variable and ``listen`` is spawned for it via
    ``eventlet.spawn_n``. Later calls return the stored instance.

    :return: The shared Listener instance.
    """
    # "global" is required here because the function rebinds the module-level
    # singleton; it is the only place where ``_listener`` is written.
    global _listener
    if not _listener:
        # NOTE(review): the ``with`` block exits right after ``spawn_n`` is
        # called, while the listener keeps a reference to ``conn``. Presumably
        # the consumer loop establishes its own connection from it - confirm.
        with Connection(transport_utils.get_messaging_urls()) as conn:
            _listener = Listener(conn)
            eventlet.spawn_n(listen, _listener)
    return _listener
116
117
118
def get_listener_if_set():
    """
    Return the module-level Listener without creating one.

    :return: The value of ``_listener``; ``None`` if ``get_listener()`` has
             not created a Listener yet.
    """
    # Read-only access to the module-level name needs no "global" statement.
    return _listener
121