Completed
Pull Request — master (#2643)
by Manas
15:19 queued 09:14
created

TriggerInstanceDispatcher.process()   A

Complexity

Conditions 3

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 18
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from kombu import Connection
17
18
from st2common import log as logging
19
from st2common.constants.trace import TRACE_CONTEXT, TRACE_ID
20
from st2common.constants import trigger as trigger_constants
21
from st2common.util import date as date_utils
22
from st2common.services import trace as trace_service
23
from st2common.transport import consumers, reactor
24
from st2common.transport import utils as transport_utils
25
import st2reactor.container.utils as container_utils
26
from st2reactor.rules.engine import RulesEngine
27
28
29
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)

# Queue handed to the TriggerInstanceDispatcher created by get_worker().
# NOTE(review): routing_key '#' is presumably a wildcard that matches every
# routing key - confirm against the exchange the queue is bound to.
RULESENGINE_WORK_Q = reactor.get_trigger_instances_queue(
    name='st2.trigger_instances_dispatch.rules_engine',
    routing_key='#'
)
33
34
35
class TriggerInstanceDispatcher(consumers.StagedMessageHandler):
    '''
    Message handler that converts trigger messages into TriggerInstances and
    runs them through the rules engine.

    ``pre_ack_process`` creates the TriggerInstance and records trace
    information; ``process`` hands the instance to the rules engine and keeps
    the instance status up to date.
    '''
    # NOTE(review): presumably read by the base handler to validate incoming
    # messages; pre_ack_process indexes the message like a dict.
    message_type = dict

    def __init__(self, connection, queues):
        '''
        :param connection: Messaging connection, passed through to the base handler.
        :param queues: Queues to consume from, passed through to the base handler.
        '''
        super(TriggerInstanceDispatcher, self).__init__(connection, queues)
        self.rules_engine = RulesEngine()

    def pre_ack_process(self, message):
        '''
        TriggerInstance from message is created prior to acknowledging the message.
        This gets us a way to not acknowledge messages.

        :param message: Dict with 'trigger' and 'payload' keys and an optional
                        trace context stored under TRACE_CONTEXT.

        :return: The newly created trigger instance.
        :raises KeyError: If 'trigger' or 'payload' is missing from the message.
        '''
        trigger = message['trigger']
        payload = message['payload']

        # Accommodate for not being able to create a TriggerInstance if a TriggerDB
        # is not found.
        trigger_instance = container_utils.create_trigger_instance(
            trigger,
            payload or {},
            date_utils.get_datetime_utc_now(),
            raise_on_no_trigger=True)
        # Use trace_context from the message and if not found create a new context
        # and use the trigger_instance.id as trace_tag.
        trace_context = message.get(TRACE_CONTEXT, None)
        if not trace_context:
            trace_context = {
                TRACE_ID: 'trigger_instance-%s' % str(trigger_instance.id)
            }
        # add a trace or update an existing trace with trigger_instance
        trace_service.add_or_update_given_trace_context(
            trace_context=trace_context,
            trigger_instances=[
                trace_service.get_trace_component_for_trigger_instance(trigger_instance)
            ])
        return trigger_instance

    def process(self, trigger_instance):
        '''
        Run the rules engine for the provided trigger instance.

        The instance is marked as processing before the rules engine runs and
        as processed afterwards. If any step raises an ``Exception`` the
        failure is logged, the instance is marked as failed and the exception
        is not propagated.

        :param trigger_instance: Instance returned by ``pre_ack_process``.

        :raises ValueError: If no trigger instance is provided.
        '''
        if not trigger_instance:
            raise ValueError('No trigger_instance provided for processing.')
        try:
            container_utils.update_trigger_instance_status(
                trigger_instance, trigger_constants.TRIGGER_INSTANCE_STATUS_PROCESSING)
            self.rules_engine.handle_trigger_instance(trigger_instance)
            container_utils.update_trigger_instance_status(
                trigger_instance, trigger_constants.TRIGGER_INSTANCE_STATUS_PROCESSED)
        except Exception:
            # Only Exception subclasses are handled here so that interpreter
            # level signals (KeyboardInterrupt, SystemExit) still propagate.
            #
            # Log before touching the status so the original failure is still
            # recorded if the status update below raises as well.
            # This could be a large message but at least in case of an exception
            # we get to see more context.
            # Beyond this point code cannot really handle the exception anyway so
            # eating up the exception.
            LOG.exception('Failed to handle trigger_instance %s.', trigger_instance)
            container_utils.update_trigger_instance_status(
                trigger_instance, trigger_constants.TRIGGER_INSTANCE_STATUS_PROCESSING_FAILED)
90
91
92
def get_worker():
    '''
    Build the TriggerInstanceDispatcher that consumes RULESENGINE_WORK_Q.

    The dispatcher is created inside the connection's ``with`` block, so the
    connection context is exited before the dispatcher reaches the caller.
    NOTE(review): presumably the connection is re-established on first use -
    confirm against the kombu Connection behaviour.

    :return: A new TriggerInstanceDispatcher instance.
    '''
    queues = [RULESENGINE_WORK_Q]
    urls = transport_utils.get_messaging_urls()
    with Connection(urls) as connection:
        dispatcher = TriggerInstanceDispatcher(connection, queues)
    return dispatcher
95