Completed
Pull Request — master (#2591)
by Manas
10:50 queued 04:53
created

st2reactor.rules.RuleEnforcer._do_enforce()   A

Complexity

Conditions 1

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 18
rs 9.4285
cc 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
18
from st2common import log as logging
19
from st2common.constants import action as action_constants
20
from st2common.constants.trace import TRACE_CONTEXT
21
from st2common.models.api.trace import TraceContext
22
from st2common.models.db.liveaction import LiveActionDB
23
24
from st2common.models.db.rule_enforcement import RuleEnforcementDB
25
from st2common.models.utils import action_param_utils
26
from st2common.models.api.auth import get_system_username
27
from st2common.persistence.rule_enforcement import RuleEnforcement
28
from st2common.services import action as action_service
29
from st2common.services import trace as trace_service
30
from st2common.util import reference
31
from st2common.util import action_db as action_db_util
32
from st2reactor.rules.datatransform import get_transformer
33
34
35
# Module-level logger used by every RuleEnforcer method.
LOG = logging.getLogger('st2reactor.ruleenforcement.enforce')

# Execution statuses which RuleEnforcer.enforce() treats as "the action was kicked off";
# any other status (or no execution at all) is audited as a failed enforcement.
EXEC_KICKED_OFF_STATES = [
    action_constants.LIVEACTION_STATUS_SCHEDULED,
    action_constants.LIVEACTION_STATUS_REQUESTED,
]
39
40
41
class RuleEnforcer(object):
    """
    Enforce one rule for one trigger instance.

    Enforcing means resolving the rule's action parameters against the trigger payload,
    requesting an execution of the rule's action and persisting a ``RuleEnforcementDB``
    record describing the outcome.
    """

    def __init__(self, trigger_instance, rule):
        """
        :param trigger_instance: Trigger instance being processed; its ``payload`` is handed to
                                 ``get_transformer`` to build the parameter transformer.
        :param rule: Rule to enforce for the trigger instance.

        :raises ValueError: If the transformer can't be built from the trigger payload.
        """
        self.trigger_instance = trigger_instance
        self.rule = rule

        try:
            self.data_transformer = get_transformer(trigger_instance.payload)
        except Exception as e:
            message = ('Failed to template-ize trigger payload: %s. If the payload contains '
                       'special characters such as "{{" which dont\'t reference value in '
                       'a datastore, those characters need to be escaped' % (str(e)))
            raise ValueError(message)

    def get_resolved_parameters(self):
        """
        Return the rule's action parameters after passing them through the data transformer.

        :raises ValueError: If the action referenced by the rule doesn't exist.
        """
        # TODO: Refactor this to avoid additional lookup in cast_params
        # TODO: rename self.rule.action -> self.rule.action_exec_spec
        action_ref = self.rule.action['ref']
        action_db = action_db_util.get_action_by_ref(action_ref)
        if not action_db:
            raise ValueError('Action "%s" doesn\'t exist' % (action_ref))

        return self.data_transformer(self.rule.action.parameters)

    def enforce(self):
        """
        Kick off the rule's action and record the result as a rule enforcement.

        An exception raised while kicking off the execution is not propagated: it is logged
        and stored as ``failure_reason`` on the enforcement record. The enforcement record is
        written in either case.

        :return: The execution returned by ``_do_enforce``, or ``None`` if kicking it off
                 failed.
        """
        rule_spec = {'ref': self.rule.ref, 'id': str(self.rule.id), 'uid': self.rule.uid}
        enforcement_db = RuleEnforcementDB(trigger_instance_id=str(self.trigger_instance.id),
                                           rule=rule_spec)
        extra = {
            'trigger_instance_db': self.trigger_instance,
            'rule_db': self.rule
        }
        execution_db = None
        try:
            execution_db = self._do_enforce()
            # pylint: disable=no-member
            enforcement_db.execution_id = str(execution_db.id)
            extra['execution_db'] = execution_db
        except Exception as e:
            # Record the failure reason in the RuleEnforcement. str(e) is used instead of
            # e.message since the latter is deprecated and missing on Python 3 exceptions.
            enforcement_db.failure_reason = str(e)
            LOG.exception('Failed kicking off execution for rule %s.', self.rule, extra=extra)
        finally:
            # Persist the enforcement regardless of success or failure.
            self._update_enforcement(enforcement_db)

        # pylint: disable=no-member
        if not execution_db or execution_db.status not in EXEC_KICKED_OFF_STATES:
            LOG.audit('Rule enforcement failed. Execution of Action %s failed. '
                      'TriggerInstance: %s and Rule: %s',
                      self.rule.action.ref, self.trigger_instance, self.rule,
                      extra=extra)
        else:
            LOG.audit('Rule enforced. Execution %s, TriggerInstance %s and Rule %s.',
                      execution_db, self.trigger_instance, self.rule, extra=extra)

        return execution_db

    def _do_enforce(self):
        """
        Resolve the parameters, update the trace and request the action execution.

        :return: Whatever ``_invoke_action`` returns for the requested execution.
        """
        params = self.get_resolved_parameters()
        LOG.info('Invoking action %s for trigger_instance %s with params %s.',
                 self.rule.action.ref, self.trigger_instance.id,
                 json.dumps(params))

        # update trace before invoking the action.
        trace_context = self._update_trace()
        LOG.debug('Updated trace %s with rule %s.', trace_context, self.rule.id)

        context = {
            'trigger_instance': reference.get_ref_from_model(self.trigger_instance),
            'rule': reference.get_ref_from_model(self.rule),
            'user': get_system_username(),
            TRACE_CONTEXT: trace_context
        }

        return RuleEnforcer._invoke_action(self.rule.action, params, context)

    def _update_trace(self):
        """
        Add this rule to the trace associated with the trigger instance.

        :rtype: ``dict`` trace_context as a dict; could be None
        """
        trace_db = None
        try:
            trace_db = trace_service.get_trace_db_by_trigger_instance(self.trigger_instance)
        except Exception:
            # A failed trace lookup is logged and must not prevent the enforcement.
            # Only Exception is caught so interpreter-exit signals still propagate.
            LOG.exception('No Trace found for TriggerInstance %s.', self.trigger_instance.id)
            return None

        # This would signify some sort of coding error so assert.
        assert trace_db

        trace_db = trace_service.add_or_update_given_trace_db(
            trace_db=trace_db,
            rules=[
                trace_service.get_trace_component_for_rule(self.rule, self.trigger_instance)
            ])
        return vars(TraceContext(id_=str(trace_db.id), trace_tag=trace_db.trace_tag))

    def _update_enforcement(self, enforcement_db):
        """
        Persist the enforcement record; a failure to write is logged and not propagated.

        :param enforcement_db: Enforcement record to store.
        """
        try:
            RuleEnforcement.add_or_update(enforcement_db)
        except Exception:
            extra = {'enforcement_db': enforcement_db}
            LOG.exception('Failed writing enforcement model to db.', extra=extra)

    @staticmethod
    def _invoke_action(action_exec_spec, params, context=None):
        """
        Schedule an action execution.

        :type action_exec_spec: :class:`ActionExecutionSpecDB`

        :param params: Parameters to execute the action with.
        :type params: ``dict``

        :param context: Context stored on the requested live action.
        :type context: ``dict``

        :return: The execution object returned by ``action_service.request``.
        """
        action_ref = action_exec_spec['ref']

        # prior to shipping off the params cast them to the right type.
        params = action_param_utils.cast_params(action_ref, params)
        liveaction = LiveActionDB(action=action_ref, context=context, parameters=params)
        liveaction, execution = action_service.request(liveaction)

        return execution
164