Completed
Pull Request — master (#2643)
by Manas
05:59
created

_compose_pre_ack_process_response()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 5
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from kombu import Connection
17
18
from st2common import log as logging
19
from st2common.constants.trace import TRACE_CONTEXT, TRACE_ID
20
from st2common.constants import triggers as trigger_constants
21
from st2common.util import date as date_utils
22
from st2common.services import trace as trace_service
23
from st2common.transport import consumers, reactor
24
from st2common.transport import utils as transport_utils
25
import st2reactor.container.utils as container_utils
26
from st2reactor.rules.engine import RulesEngine
27
28
29
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
30
31
# Queue that get_worker() hands to TriggerInstanceDispatcher for consumption.
# NOTE(review): routing_key='#' presumably matches every routing key (topic
# wildcard) - confirm against reactor.get_trigger_instances_queue.
RULESENGINE_WORK_Q = reactor.get_trigger_instances_queue(
32
    name='st2.trigger_instances_dispatch.rules_engine', routing_key='#')
33
34
35
class TriggerInstanceDispatcher(consumers.StagedMessageHandler):
    """
    Staged message handler which creates a TriggerInstance for every incoming
    trigger message and passes it to the rules engine.

    Handling is split in two stages: ``pre_ack_process`` runs before the
    message is acknowledged and ``process`` consumes whatever
    ``pre_ack_process`` returned.
    """
    message_type = dict

    def __init__(self, connection, queues):
        """
        :param connection: Messaging connection forwarded to the parent handler.
        :param queues: Queues to consume from, forwarded to the parent handler.
        """
        super(TriggerInstanceDispatcher, self).__init__(connection, queues)
        self.rules_engine = RulesEngine()

    def pre_ack_process(self, message):
        """
        TriggerInstance from message is created prior to acknowledging the message.
        This gets us a way to not acknowledge messages.

        :param message: Dispatched message; must contain the ``trigger`` and
                        ``payload`` keys.
        :type message: ``dict``

        :return: Response composed by ``_compose_pre_ack_process_response``.
        :rtype: ``dict``
        """
        trigger = message['trigger']
        payload = message['payload']

        # Accommodate for not being able to create a TriggerInstance if a TriggerDB
        # is not found.
        # NOTE(review): raise_on_no_trigger=True presumably makes the helper raise
        # in that case so the message is not acknowledged - confirm in
        # container_utils.create_trigger_instance.
        trigger_instance = container_utils.create_trigger_instance(
            trigger,
            payload or {},
            date_utils.get_datetime_utc_now(),
            raise_on_no_trigger=True)

        # The helper is an instance method, so it has to be resolved through self;
        # a bare name lookup raises NameError.
        return self._compose_pre_ack_process_response(trigger_instance, message)

    def process(self, pre_ack_response):
        """
        Record trace information for the trigger instance and run it through the
        rules engine, updating the trigger instance status along the way.

        Failures during handling are logged and swallowed after the trigger
        instance has been marked as failed.

        :param pre_ack_response: Value returned by ``pre_ack_process``.
        :type pre_ack_response: ``dict``

        :raises ValueError: If the response carries no trigger instance.
        """
        # Instance method - must be called through self (see pre_ack_process).
        trigger_instance, message = self._decompose_pre_ack_process_response(
            pre_ack_response)
        if not trigger_instance:
            raise ValueError('No trigger_instance provided for processing.')

        try:
            # Use trace_context from the message and if not found create a new context
            # and use the trigger_instance.id as trace_tag.
            trace_context = message.get(TRACE_CONTEXT, None)
            if not trace_context:
                trace_context = {
                    TRACE_ID: 'trigger_instance-%s' % str(trigger_instance.id)
                }
            # add a trace or update an existing trace with trigger_instance
            trace_service.add_or_update_given_trace_context(
                trace_context=trace_context,
                trigger_instances=[
                    trace_service.get_trace_component_for_trigger_instance(trigger_instance)
                ])

            container_utils.update_trigger_instance_status(
                trigger_instance, trigger_constants.TRIGGER_INSTANCE_PROCESSING)
            self.rules_engine.handle_trigger_instance(trigger_instance)
            container_utils.update_trigger_instance_status(
                trigger_instance, trigger_constants.TRIGGER_INSTANCE_PROCESSED)
        except Exception:
            # Only regular errors are handled here; interpreter-level signals such
            # as KeyboardInterrupt / SystemExit are allowed to propagate.
            # TODO : Capture the reason for failure.
            container_utils.update_trigger_instance_status(
                trigger_instance, trigger_constants.TRIGGER_INSTANCE_PROCESSING_FAILED)
            # This could be a large message but at least in case of an exception
            # we get to see more context.
            # Beyond this point code cannot really handle the exception anyway so
            # eating up the exception.
            LOG.exception('Failed to handle trigger_instance %s.', trigger_instance)
            return

    def _compose_pre_ack_process_response(self, trigger_instance, message):
        """
        Codify response of the pre_ack_process method.

        :return: Dict with the ``trigger_instance`` and ``message`` keys.
        :rtype: ``dict``
        """
        return {'trigger_instance': trigger_instance, 'message': message}

    def _decompose_pre_ack_process_response(self, response):
        """
        Break-down response of pre_ack_process into constituents for simpler consumption.

        :return: ``(trigger_instance, message)``; a missing key yields ``None``
                 in its position.
        :rtype: ``tuple``
        """
        return response.get('trigger_instance', None), response.get('message', None)
108
109
110
def get_worker():
    """
    Build the TriggerInstanceDispatcher which consumes RULESENGINE_WORK_Q.

    :return: Dispatcher created with a connection to the configured messaging URLs.
    :rtype: ``TriggerInstanceDispatcher``
    """
    messaging_urls = transport_utils.get_messaging_urls()
    # NOTE(review): the connection context is exited before the worker is handed
    # back to the caller - confirm the consumer re-establishes it on use.
    with Connection(messaging_urls) as connection:
        worker = TriggerInstanceDispatcher(connection, [RULESENGINE_WORK_Q])
    return worker
113