Passed
Push — master ( e8a7db...5b26d7 )
by Plexxi
03:19
created

get_resolved_parameters()   A

Complexity

Conditions 2

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 9
rs 9.6666
cc 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
18
from st2common import log as logging
19
from st2common.constants import action as action_constants
20
from st2common.constants.trace import TRACE_CONTEXT
21
from st2common.models.api.trace import TraceContext
22
from st2common.models.db.liveaction import LiveActionDB
23
24
from st2common.models.db.rule_enforcement import RuleEnforcementDB
25
from st2common.models.utils import action_param_utils
26
from st2common.models.api.auth import get_system_username
27
from st2common.persistence.rule_enforcement import RuleEnforcement
28
from st2common.services import action as action_service
29
from st2common.services import trace as trace_service
30
from st2common.util import reference
31
from st2common.util import action_db as action_db_util
32
from st2reactor.rules.datatransform import get_transformer
33
34
35
# Logger used by everything in this module.
LOG = logging.getLogger('st2reactor.ruleenforcement.enforce')

# Live action statuses which RuleEnforcer.enforce() accepts as "execution was kicked off".
# An execution reporting any other status is audited as a failed rule enforcement.
EXEC_KICKED_OFF_STATES = [
    action_constants.LIVEACTION_STATUS_SCHEDULED,
    action_constants.LIVEACTION_STATUS_REQUESTED,
]
39
40
41
class RuleEnforcer(object):
    """
    Enforce a single rule for a single trigger instance.

    Enforcing means: render the rule's action parameters using the trigger payload, update the
    trace associated with the trigger instance, request an execution of the rule's action and
    persist a rule enforcement record describing the outcome.
    """

    def __init__(self, trigger_instance, rule):
        """
        :param trigger_instance: Trigger instance the rule is enforced for. Its ``payload`` is
                                 used to build the parameter transformer.

        :param rule: Rule to enforce.

        :raises ValueError: If a transformer can't be built from the trigger payload.
        """
        self.trigger_instance = trigger_instance
        self.rule = rule

        try:
            self.data_transformer = get_transformer(trigger_instance.payload)
        except Exception as e:
            message = ('Failed to template-ize trigger payload: %s. If the payload contains '
                       'special characters such as "{{" which don\'t reference value in '
                       'a datastore, those characters need to be escaped' % (str(e)))
            raise ValueError(message)

    def get_resolved_parameters(self):
        """
        Run the rule's action parameters through the payload transformer.

        :raises ValueError: If the action referenced by the rule can't be found.

        :return: Whatever the data transformer returns for ``self.rule.action.parameters``.
        """
        # TODO: Refactor this to avoid additional lookup in cast_params
        # TODO: rename self.rule.action -> self.rule.action_exec_spec
        action_ref = self.rule.action['ref']
        action_db = action_db_util.get_action_by_ref(action_ref)
        if not action_db:
            raise ValueError('Action "%s" doesn\'t exist' % (action_ref))

        return self.data_transformer(self.rule.action.parameters)

    def enforce(self):
        """
        Request an execution of the rule's action and record the enforcement.

        The enforcement record is written whether or not the execution could be requested; it
        only carries an ``execution_id`` when the request succeeded.

        :return: The execution object returned by :meth:`_invoke_action`, or ``None`` if
                 requesting the execution raised an exception. Note that an execution is also
                 returned when its status is not one of ``EXEC_KICKED_OFF_STATES``.
        """
        params = self.get_resolved_parameters()
        LOG.info('Invoking action %s for trigger_instance %s with params %s.',
                 self.rule.action.ref, self.trigger_instance.id,
                 json.dumps(params))

        # update trace before invoking the action.
        trace_context = self._update_trace()
        LOG.debug('Updated trace %s with rule %s.', trace_context, self.rule.id)

        context = {
            'trigger_instance': reference.get_ref_from_model(self.trigger_instance),
            'rule': reference.get_ref_from_model(self.rule),
            'user': get_system_username(),
            TRACE_CONTEXT: trace_context
        }

        extra = {'trigger_instance_db': self.trigger_instance, 'rule_db': self.rule}
        rule_spec = {'ref': self.rule.ref, 'id': str(self.rule.id), 'uid': self.rule.uid}
        enforcement_db = RuleEnforcementDB(trigger_instance_id=str(self.trigger_instance.id),
                                           rule=rule_spec)
        try:
            execution_db = RuleEnforcer._invoke_action(self.rule.action, params, context)
            # pylint: disable=no-member
            enforcement_db.execution_id = str(execution_db.id)
            # pylint: enable=no-member
        except Exception:
            # Only regular errors are treated as "failed to kick off"; interpreter-level
            # signals (SystemExit, KeyboardInterrupt, ...) are allowed to propagate.
            LOG.exception('Failed kicking off execution for rule %s.', self.rule, extra=extra)
            return None
        finally:
            # Runs on success, on failure and on propagation so the enforcement is always
            # recorded.
            self._update_enforcement(enforcement_db)

        extra['execution_db'] = execution_db
        # pylint: disable=no-member
        if execution_db.status not in EXEC_KICKED_OFF_STATES:
            # pylint: enable=no-member
            # NOTE(review): every other access to self.rule.action in this class uses ``ref`` /
            # ``parameters``; confirm the action spec really exposes ``name``, otherwise this
            # log call raises instead of auditing the failure.
            LOG.audit('Rule enforcement failed. Execution of Action %s failed. '
                      'TriggerInstance: %s and Rule: %s',
                      self.rule.action.name, self.trigger_instance, self.rule,
                      extra=extra)
            return execution_db

        LOG.audit('Rule enforced. Execution %s, TriggerInstance %s and Rule %s.',
                  execution_db, self.trigger_instance, self.rule, extra=extra)

        return execution_db

    def _update_trace(self):
        """
        Add this rule as a component of the trace which belongs to the trigger instance.

        :rtype: ``dict`` trace_context as a dict; could be None
        """
        trace_db = None
        try:
            trace_db = trace_service.get_trace_db_by_trigger_instance(self.trigger_instance)
        except Exception:
            # A failed lookup is not fatal for the enforcement; carry on without a trace
            # context.
            LOG.exception('No Trace found for TriggerInstance %s.', self.trigger_instance.id)
            return None

        # This would signify some sort of coding error so assert.
        assert trace_db

        trace_db = trace_service.add_or_update_given_trace_db(
            trace_db=trace_db,
            rules=[
                trace_service.get_trace_component_for_rule(self.rule, self.trigger_instance)
            ])
        return vars(TraceContext(id_=str(trace_db.id), trace_tag=trace_db.trace_tag))

    def _update_enforcement(self, enforcement_db):
        """
        Persist the enforcement record; failures are logged and not re-raised.

        :param enforcement_db: Enforcement record to write.
        :type enforcement_db: :class:`RuleEnforcementDB`
        """
        try:
            RuleEnforcement.add_or_update(enforcement_db)
        except Exception:
            extra = {'enforcement_db': enforcement_db}
            LOG.exception('Failed writing enforcement model to db.', extra=extra)

    @staticmethod
    def _invoke_action(action_exec_spec, params, context=None):
        """
        Schedule an action execution.

        :type action_exec_spec: :class:`ActionExecutionSpecDB`

        :param params: Parameters to execute the action with.
        :type params: ``dict``

        :param context: Context stored on the live action which is created for the request.
        :type context: ``dict``

        :return: The execution object returned by ``action_service.request`` (second item of
                 the returned pair). Errors raised while casting the parameters or requesting
                 the execution propagate to the caller.
        """
        action_ref = action_exec_spec['ref']

        # prior to shipping off the params cast them to the right type.
        params = action_param_utils.cast_params(action_ref, params)
        liveaction = LiveActionDB(action=action_ref, context=context, parameters=params)
        liveaction, execution = action_service.request(liveaction)

        return execution
159