Completed
Pull Request — master (#2643)
by Manas
06:04
created

TriggerInstanceDispatcher.process()   A

Complexity

Conditions 3

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 13
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from kombu import Connection
17
18
from st2common import log as logging
19
from st2common.constants.trace import TRACE_CONTEXT, TRACE_ID
20
from st2common.util import date as date_utils
21
from st2common.services import trace as trace_service
22
from st2common.transport import consumers, reactor
23
from st2common.transport import utils as transport_utils
24
import st2reactor.container.utils as container_utils
25
from st2reactor.rules.engine import RulesEngine
26
27
28
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)

# Queue the rules engine dispatcher consumes trigger instance messages from.
# NOTE(review): routing_key '#' presumably acts as a match-all wildcard for the
# underlying exchange - confirm against reactor.get_trigger_instances_queue.
RULESENGINE_WORK_Q = reactor.get_trigger_instances_queue(
    name='st2.trigger_instances_dispatch.rules_engine',
    routing_key='#',
)
32
33
34
class TriggerInstanceDispatcher(consumers.StagedMessageHandler):
    '''
    Message handler which creates a TriggerInstance for every incoming trigger
    message and hands that instance over to the rules engine.

    The work is split in two stages: ``pre_ack_process`` builds and traces the
    TriggerInstance, ``process`` runs the rules engine against it.
    '''
    # Incoming messages are expected to be plain dicts.
    message_type = dict

    def __init__(self, connection, queues):
        '''
        :param connection: Messaging connection forwarded to the base handler.
        :param queues: Queues forwarded to the base handler.
        '''
        super(TriggerInstanceDispatcher, self).__init__(connection, queues)
        self.rules_engine = RulesEngine()

    def pre_ack_process(self, message):
        '''
        TriggerInstance from message is created prior to acknowledging the message. This
        gets us a way to not acknowledge messages.

        :param message: Dict which must contain the ``trigger`` and ``payload`` keys and
                        may contain a trace context under the ``TRACE_CONTEXT`` key.
        :return: The TriggerInstance created for the message.
        :raises KeyError: If ``trigger`` or ``payload`` is missing from the message.
        '''
        trigger = message['trigger']
        payload = message['payload']

        # Accommodate for not being able to create a TriggerInstance if a TriggerDB
        # is not found.
        trigger_instance = container_utils.create_trigger_instance(
            trigger,
            payload or {},
            date_utils.get_datetime_utc_now(),
            raise_on_no_trigger=True)

        # Use trace_context from the message and if not found create a new context
        # and use the trigger_instance.id as trace_tag.
        trace_context = message.get(TRACE_CONTEXT)
        if not trace_context:
            trace_context = {
                TRACE_ID: 'trigger_instance-%s' % str(trigger_instance.id)
            }

        # Add a trace or update an existing trace with trigger_instance.
        trace_service.add_or_update_given_trace_context(
            trace_context=trace_context,
            trigger_instances=[
                trace_service.get_trace_component_for_trigger_instance(trigger_instance)
            ])
        return trigger_instance

    def process(self, trigger_instance):
        '''
        Run the rules engine against the provided trigger instance.

        Failures raised by the rules engine are logged and swallowed since nothing
        beyond this point is able to handle them.

        :param trigger_instance: Value produced by ``pre_ack_process``.
        :raises ValueError: If no trigger_instance is provided.
        '''
        if not trigger_instance:
            raise ValueError('No trigger_instance provided for processing.')

        try:
            self.rules_engine.handle_trigger_instance(trigger_instance)
        except Exception:
            # Only ``Exception`` is caught so that interpreter level signals
            # (KeyboardInterrupt, SystemExit and other BaseException subclasses)
            # keep propagating instead of being silently eaten.
            #
            # This could be a large message but at least in case of an exception
            # we get to see more context.
            # Beyond this point code cannot really handle the exception anyway so
            # eating up the exception.
            LOG.exception('Failed to handle trigger_instance %s.', trigger_instance)
84
85
86
def get_worker():
    '''
    Build the TriggerInstanceDispatcher which consumes from RULESENGINE_WORK_Q.

    :return: A new TriggerInstanceDispatcher instance.
    '''
    messaging_urls = transport_utils.get_messaging_urls()
    # NOTE(review): the connection context manager exits before the dispatcher is
    # handed back to the caller; presumably the handler establishes its own
    # connection later on - confirm against the consumers module.
    with Connection(messaging_urls) as connection:
        dispatcher = TriggerInstanceDispatcher(connection, [RULESENGINE_WORK_Q])
    return dispatcher
89