Completed
Pull Request — master (#2588)
by Manas
05:57
created

get_resolved_parameters()   A

Complexity

Conditions 2

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 9
rs 9.6666
cc 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
18
from st2common import log as logging
19
from st2common.constants import action as action_constants
20
from st2common.constants.trace import TRACE_CONTEXT
21
from st2common.models.api.trace import TraceContext
22
from st2common.models.db.liveaction import LiveActionDB
23
from st2common.models.db.rule_enforcement import RuleEnforcementDB
24
from st2common.models.utils import action_param_utils
25
from st2common.models.api.auth import get_system_username
26
from st2common.persistence.rule_enforcement import RuleEnforcement
27
from st2common.services import action as action_service
28
from st2common.services import trace as trace_service
29
from st2common.util import reference
30
from st2common.util import action_db as action_db_util
31
from st2reactor.rules.datatransform import get_transformer
32
33
34
LOG = logging.getLogger('st2reactor.ruleenforcement.enforce')
35
36
EXEC_KICKED_OFF_STATES = [action_constants.LIVEACTION_STATUS_SCHEDULED,
37
                          action_constants.LIVEACTION_STATUS_REQUESTED]
38
39
40
class RuleEnforcer(object):
41
    def __init__(self, trigger_instance, rule):
42
        self.trigger_instance = trigger_instance
43
        self.rule = rule
44
45
        try:
46
            self.data_transformer = get_transformer(trigger_instance.payload)
47
        except Exception as e:
48
            message = ('Failed to template-ize trigger payload: %s. If the payload contains '
49
                       'special characters such as "{{" which dont\'t reference value in '
50
                       'a datastore, those characters need to be escaped' % (str(e)))
51
            raise ValueError(message)
52
53
    def get_resolved_parameters(self):
54
        # TODO: Refactor this to avoid additional lookup in cast_params
55
        # TODO: rename self.rule.action -> self.rule.action_exec_spec
56
        action_ref = self.rule.action['ref']
57
        action_db = action_db_util.get_action_by_ref(action_ref)
58
        if not action_db:
59
            raise ValueError('Action "%s" doesn\'t exist' % (action_ref))
60
61
        return self.data_transformer(self.rule.action.parameters)
62
63
    def enforce(self):
64
        params = self.get_resolved_parameters()
65
        LOG.info('Invoking action %s for trigger_instance %s with params %s.',
66
                 self.rule.action.ref, self.trigger_instance.id,
67
                 json.dumps(params))
68
69
        # update trace before invoking the action.
70
        trace_context = self._update_trace()
71
        LOG.debug('Updated trace %s with rule %s.', trace_context, self.rule.id)
72
73
        context = {
74
            'trigger_instance': reference.get_ref_from_model(self.trigger_instance),
75
            'rule': reference.get_ref_from_model(self.rule),
76
            'user': get_system_username(),
77
            TRACE_CONTEXT: trace_context
78
        }
79
80
        extra = {'trigger_instance_db': self.trigger_instance, 'rule_db': self.rule}
81
        rule_spec = {'ref': self.rule.ref, 'id': str(self.rule.id), 'uid': self.rule.uid}
82
        enforcement_db = RuleEnforcementDB(trigger_instance_id=str(self.trigger_instance.id),
83
                                           rule=rule_spec)
84
        try:
85
            execution_db = RuleEnforcer._invoke_action(self.rule.action, params, context)
86
            # pylint: disable=no-member
87
            enforcement_db.execution_id = str(execution_db.id)
88
            # pylint: enable=no-member
89
        except:
90
            LOG.exception('Failed kicking off execution for rule %s.', self.rule, extra=extra)
91
            return None
92
        finally:
93
            self._update_enforcement(enforcement_db)
94
95
        extra['execution_db'] = execution_db
96
        # pylint: disable=no-member
97
        if execution_db.status not in EXEC_KICKED_OFF_STATES:
98
            # pylint: enable=no-member
99
            LOG.audit('Rule enforcement failed. Execution of Action %s failed. '
100
                      'TriggerInstance: %s and Rule: %s',
101
                      self.rule.action.name, self.trigger_instance, self.rule,
102
                      extra=extra)
103
            return execution_db
104
105
        LOG.audit('Rule enforced. Execution %s, TriggerInstance %s and Rule %s.',
106
                  execution_db, self.trigger_instance, self.rule, extra=extra)
107
108
        return execution_db
109
110
    def _update_trace(self):
111
        """
112
        :rtype: ``dict`` trace_context as a dict; could be None
113
        """
114
        trace_db = None
115
        try:
116
            trace_db = trace_service.get_trace_db_by_trigger_instance(self.trigger_instance)
117
        except:
118
            LOG.exception('No Trace found for TriggerInstance %s.', self.trigger_instance.id)
119
            return None
120
121
        # This would signify some sort of coding error so assert.
122
        assert trace_db
123
124
        trace_db = trace_service.add_or_update_given_trace_db(
125
            trace_db=trace_db,
126
            rules=[
127
                trace_service.get_trace_component_for_rule(self.rule, self.trigger_instance)
128
            ])
129
        return vars(TraceContext(id_=str(trace_db.id), trace_tag=trace_db.trace_tag))
130
131
    def _update_enforcement(self, enforcement_db):
132
        try:
133
            RuleEnforcement.add_or_update(enforcement_db)
134
        except:
135
            extra = {'enforcement_db': enforcement_db}
136
            LOG.exception('Failed writing enforcement model to db.', extra=extra)
137
138
    @staticmethod
139
    def _invoke_action(action_exec_spec, params, context=None):
140
        """
141
        Schedule an action execution.
142
143
        :type action_exec_spec: :class:`ActionExecutionSpecDB`
144
145
        :param params: Parameters to execute the action with.
146
        :type params: ``dict``
147
148
        :rtype: :class:`LiveActionDB` on successful schedueling, None otherwise.
149
        """
150
        action_ref = action_exec_spec['ref']
151
152
        # prior to shipping off the params cast them to the right type.
153
        params = action_param_utils.cast_params(action_ref, params)
154
        liveaction = LiveActionDB(action=action_ref, context=context, parameters=params)
155
        liveaction, execution = action_service.request(liveaction)
156
157
        return execution
158