Completed
Pull Request — master (#2304)
by Arma
07:07
created

st2reactor.rules.TriggerInstanceDispatcher   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 47
Duplicated Lines 0 %
Metric Value
wmc 6
dl 0
loc 47
rs 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from kombu import Connection
17
18
from st2common import log as logging
19
from st2common.constants.trace import TRACE_CONTEXT, TRACE_ID
20
from st2common.util import date as date_utils
21
from st2common.services import trace as trace_service
22
from st2common.transport import consumers, reactor
23
from st2common.transport import utils as transport_utils
24
import st2reactor.container.utils as container_utils
25
from st2reactor.rules.engine import RulesEngine
26
27
28
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
29
30
# Queue from which the rules engine worker consumes trigger instances.
# NOTE(review): the '#' routing key presumably matches every routing key on a
# topic exchange -- confirm against reactor.get_trigger_instances_queue.
RULESENGINE_WORK_Q = reactor.get_trigger_instances_queue(
    routing_key='#',
    name='st2.trigger_instances_dispatch.rules_engine',
)
32
33
34
class TriggerInstanceDispatcher(consumers.MessageHandler):
    """Consume trigger messages and hand them to the rules engine.

    For every message a trigger instance is created, recorded in a trace and
    then passed to ``RulesEngine.handle_trigger_instance``.
    """

    # NOTE(review): presumably tells the base handler which payload type to
    # accept -- confirm against consumers.MessageHandler.
    message_type = dict

    def __init__(self, connection, queues):
        """Set up the handler and the rules engine it dispatches to.

        :param connection: Messaging connection forwarded to the base handler.
        :param queues: Queues to consume from, forwarded to the base handler.
        """
        super(TriggerInstanceDispatcher, self).__init__(connection, queues)
        self.rules_engine = RulesEngine()

    def process(self, instance):
        """Create a trigger instance for ``instance`` and dispatch it.

        Failures while creating the trigger instance, updating the trace or
        running the rules engine are logged and swallowed, so one bad message
        does not propagate an error to the caller. Only ``Exception``
        subclasses are swallowed; interpreter-exit signals such as
        ``SystemExit`` and ``KeyboardInterrupt`` propagate so the worker can
        shut down.

        :param instance: Message dict with ``'trigger'`` and ``'payload'``
                         keys and, optionally, a ``TRACE_CONTEXT`` entry.
        :raises KeyError: If ``'trigger'`` or ``'payload'`` is missing.
        """
        trigger = instance['trigger']
        payload = instance['payload']

        try:
            trigger_instance = container_utils.create_trigger_instance(
                trigger,
                payload or {},
                date_utils.get_datetime_utc_now(),
                raise_on_no_trigger=True)
        except Exception:
            # We got a trigger ref but we were unable to create a trigger instance.
            # This could be because a trigger object wasn't found in db for the ref.
            LOG.exception('Failed to create trigger_instance %s.', instance)
            return

        if not trigger_instance:
            # Nothing was created, so there is nothing to trace or dispatch.
            return

        try:
            # Use trace_context from the instance and if not found create a new
            # context and use the trigger_instance.id as trace_tag.
            trace_context = instance.get(TRACE_CONTEXT, None)
            if not trace_context:
                trace_context = {
                    TRACE_ID: 'trigger_instance-%s' % str(trigger_instance.id)
                }
            # Add a trace or update an existing trace with trigger_instance.
            trace_service.add_or_update_given_trace_context(
                trace_context=trace_context,
                trigger_instances=[
                    trace_service.get_trace_component_for_trigger_instance(trigger_instance)
                ])
            self.rules_engine.handle_trigger_instance(trigger_instance)
        except Exception:
            # This could be a large message but at least in case of an exception
            # we get to see more context.
            # Beyond this point code cannot really handle the exception anyway so
            # eating up the exception.
            LOG.exception('Failed to handle trigger_instance %s.', instance)
            return
81
82
83
def get_worker():
    """Build the dispatcher that consumes from the rules engine work queue.

    NOTE(review): the connection's ``with`` block exits as this function
    returns, so the handler receives a connection whose context has already
    been left. Presumably the consumer re-establishes it on demand -- confirm.

    :return: A ``TriggerInstanceDispatcher`` bound to ``RULESENGINE_WORK_Q``.
    """
    work_queues = [RULESENGINE_WORK_Q]
    messaging_urls = transport_utils.get_messaging_urls()
    with Connection(messaging_urls) as conn:
        return TriggerInstanceDispatcher(conn, work_queues)
86