Completed
Pull Request — master (#2591)
by Manas
05:29
created

st2reactor.rules.RuleEnforcer._do_enforce()   A

Complexity

Conditions 1

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 18
rs 9.4285
cc 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
18
from st2common import log as logging
19
from st2common.constants import action as action_constants
20
from st2common.constants.trace import TRACE_CONTEXT
21
from st2common.models.api.trace import TraceContext
22
from st2common.models.db.liveaction import LiveActionDB
23
24
from st2common.models.db.rule_enforcement import RuleEnforcementDB
25
from st2common.models.utils import action_param_utils
26
from st2common.models.api.auth import get_system_username
27
from st2common.persistence.rule_enforcement import RuleEnforcement
28
from st2common.services import action as action_service
29
from st2common.services import trace as trace_service
30
from st2common.util import reference
31
from st2common.util import action_db as action_db_util
32
from st2reactor.rules.datatransform import get_transformer
33
34
35
# Module-level logger used by everything in this file.
LOG = logging.getLogger('st2reactor.ruleenforcement.enforce')

# Execution statuses that RuleEnforcer.enforce() accepts as "kicked off"; any
# other status is audited as a failed rule enforcement.
EXEC_KICKED_OFF_STATES = [
    action_constants.LIVEACTION_STATUS_SCHEDULED,
    action_constants.LIVEACTION_STATUS_REQUESTED,
]
39
40
41
class RuleEnforcer(object):
    """
    Enforce a single rule for a single trigger instance.

    Enforcing means: resolve the rule's action parameters against the trigger
    payload, update the trace, request the action execution and persist a
    ``RuleEnforcementDB`` record describing the outcome.
    """

    def __init__(self, trigger_instance, rule):
        """
        :param trigger_instance: Trigger instance the rule is enforced for. Its
                                 ``payload`` is handed to ``get_transformer``.
        :param rule: Rule to enforce.

        :raises ValueError: If building the transformer from the payload fails.
        """
        self.trigger_instance = trigger_instance
        self.rule = rule

        try:
            self.data_transformer = get_transformer(trigger_instance.payload)
        except Exception as e:
            message = ('Failed to template-ize trigger payload: %s. If the payload contains '
                       'special characters such as "{{" which dont\'t reference value in '
                       'a datastore, those characters need to be escaped' % (str(e)))
            raise ValueError(message)

    def get_resolved_parameters(self):
        """
        Run the rule's action parameters through the payload transformer.

        :raises ValueError: If the action referenced by the rule can't be found.

        :return: Whatever ``self.data_transformer`` returns for the rule's
                 action parameters.
        """
        # TODO: Refactor this to avoid additional lookup in cast_params
        # TODO: rename self.rule.action -> self.rule.action_exec_spec
        action_ref = self.rule.action['ref']
        action_db = action_db_util.get_action_by_ref(action_ref)
        if not action_db:
            raise ValueError('Action "%s" doesn\'t exist' % (action_ref))

        return self.data_transformer(self.rule.action.parameters)

    def enforce(self):
        """
        Enforce the rule and record the result.

        A ``RuleEnforcementDB`` is always written (best effort), whether or not
        the execution could be requested. Failures while requesting the
        execution are logged and recorded on the enforcement object instead of
        being propagated to the caller.

        :return: The execution returned by ``_do_enforce``, or ``None`` if
                 requesting it failed.
        """
        rule_spec = {'ref': self.rule.ref, 'id': str(self.rule.id), 'uid': self.rule.uid}
        enforcement_db = RuleEnforcementDB(trigger_instance_id=str(self.trigger_instance.id),
                                           rule=rule_spec)
        extra = {
            'trigger_instance_db': self.trigger_instance,
            'rule_db': self.rule
        }

        # Bind up front so the checks after the try block are safe even when
        # _do_enforce() raises before assigning a value.
        execution_db = None

        try:
            execution_db = self._do_enforce()
            enforcement_db.execution_id = str(execution_db.id)
            extra['execution_db'] = execution_db
        except Exception as e:
            # Record the failure reason in the RuleEnforcement. Generic
            # exceptions have no ``msg`` attribute, so stringify the exception
            # (same approach as __init__).
            enforcement_db.failure_reason = str(e)
            LOG.exception('Failed kicking off execution for rule %s.', self.rule, extra=extra)
        finally:
            self._update_enforcement(enforcement_db)

        if not execution_db or execution_db.status not in EXEC_KICKED_OFF_STATES:
            # NOTE(review): other code in this class reads the action spec via
            # ``ref``; confirm the spec also exposes ``name``.
            LOG.audit('Rule enforcement failed. Execution of Action %s failed. '
                      'TriggerInstance: %s and Rule: %s',
                      self.rule.action.name, self.trigger_instance, self.rule,
                      extra=extra)
        else:
            LOG.audit('Rule enforced. Execution %s, TriggerInstance %s and Rule %s.',
                      execution_db, self.trigger_instance, self.rule, extra=extra)

        return execution_db

    def _do_enforce(self):
        """
        Resolve parameters, update the trace and request the action execution.

        :return: Result of ``_invoke_action`` for the rule's action.
        """
        params = self.get_resolved_parameters()
        LOG.info('Invoking action %s for trigger_instance %s with params %s.',
                 self.rule.action.ref, self.trigger_instance.id,
                 json.dumps(params))

        # update trace before invoking the action.
        trace_context = self._update_trace()
        LOG.debug('Updated trace %s with rule %s.', trace_context, self.rule.id)

        context = {
            'trigger_instance': reference.get_ref_from_model(self.trigger_instance),
            'rule': reference.get_ref_from_model(self.rule),
            'user': get_system_username(),
            TRACE_CONTEXT: trace_context
        }

        return RuleEnforcer._invoke_action(self.rule.action, params, context)

    def _update_trace(self):
        """
        Add this rule to the trace associated with the trigger instance.

        :rtype: ``dict`` trace_context as a dict; could be None
        """
        trace_db = None
        try:
            trace_db = trace_service.get_trace_db_by_trigger_instance(self.trigger_instance)
        except Exception:
            # Best effort: a failed trace lookup must not block enforcement.
            LOG.exception('No Trace found for TriggerInstance %s.', self.trigger_instance.id)
            return None

        # This would signify some sort of coding error so assert.
        assert trace_db

        trace_db = trace_service.add_or_update_given_trace_db(
            trace_db=trace_db,
            rules=[
                trace_service.get_trace_component_for_rule(self.rule, self.trigger_instance)
            ])
        return vars(TraceContext(id_=str(trace_db.id), trace_tag=trace_db.trace_tag))

    def _update_enforcement(self, enforcement_db):
        """
        Persist the enforcement record; failures are logged, never raised.

        :param enforcement_db: Enforcement object to write.
        :type enforcement_db: :class:`RuleEnforcementDB`
        """
        try:
            RuleEnforcement.add_or_update(enforcement_db)
        except Exception:
            extra = {'enforcement_db': enforcement_db}
            LOG.exception('Failed writing enforcement model to db.', extra=extra)

    @staticmethod
    def _invoke_action(action_exec_spec, params, context=None):
        """
        Schedule an action execution.

        :type action_exec_spec: :class:`ActionExecutionSpecDB`

        :param params: Parameters to execute the action with.
        :type params: ``dict``

        :param context: Context stored on the created ``LiveActionDB``.

        :return: The second element of the pair returned by
                 ``action_service.request`` (the execution object).
        """
        action_ref = action_exec_spec['ref']

        # prior to shipping off the params cast them to the right type.
        params = action_param_utils.cast_params(action_ref, params)
        liveaction = LiveActionDB(action=action_ref, context=context, parameters=params)
        liveaction, execution = action_service.request(liveaction)

        return execution
162