Passed
Push — master ( 7ddc3b...6d37f6 )
by
unknown
03:15
created

Notifier.process()   A

Complexity

Conditions 3

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 3
c 2
b 0
f 0
dl 0
loc 18
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from datetime import datetime
17
import json
18
19
from kombu import Connection
20
from oslo_config import cfg
21
22
from st2common import log as logging
23
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
24
from st2common.constants.action import LIVEACTION_FAILED_STATES
25
from st2common.constants.action import LIVEACTION_COMPLETED_STATES
26
from st2common.constants.triggers import INTERNAL_TRIGGER_TYPES
27
from st2common.models.api.trace import TraceContext
28
from st2common.models.db.execution import ActionExecutionDB
29
from st2common.persistence.action import Action
30
from st2common.persistence.liveaction import LiveAction
31
from st2common.persistence.policy import Policy
32
from st2common import policies
33
from st2common.models.system.common import ResourceReference
34
from st2common.persistence.execution import ActionExecution
35
from st2common.services import trace as trace_service
36
from st2common.transport import consumers, execution, publishers
37
from st2common.transport import utils as transport_utils
38
from st2common.transport.reactor import TriggerDispatcher
39
from st2common.util import isotime
40
from st2common.util import jinja as jinja_utils
41
from st2common.constants.action import ACTION_CONTEXT_KV_PREFIX
42
from st2common.constants.action import ACTION_PARAMETERS_KV_PREFIX
43
from st2common.constants.action import ACTION_RESULTS_KV_PREFIX
44
from st2common.constants.keyvalue import FULL_SYSTEM_SCOPE, SYSTEM_SCOPE, DATASTORE_PARENT_SCOPE
45
from st2common.services.keyvalues import KeyValueLookup
46
47
# Names exported by ``from <module> import *``.
__all__ = ['Notifier', 'get_notifier']

LOG = logging.getLogger(__name__)

# Queue handed to the ``Notifier`` built by ``get_notifier``; it is obtained with the
# publishers' UPDATE routing key.
ACTIONUPDATE_WORK_Q = execution.get_queue(
    'st2.notifiers.execution.work',
    routing_key=publishers.UPDATE_RK
)

# Evaluated once, when this module is imported.
ACTION_SENSOR_ENABLED = cfg.CONF.action_sensor.enable

# XXX: Fix this nasty positional dependency.
# Index 0 is used as the generic action trigger type and index 1 as the notify
# trigger type.
ACTION_TRIGGER_TYPE = INTERNAL_TRIGGER_TYPES['action'][0]
NOTIFY_TRIGGER_TYPE = INTERNAL_TRIGGER_TYPES['action'][1]
61
62
63
class Notifier(consumers.MessageHandler):
64
    message_type = ActionExecutionDB
65
66
    def __init__(self, connection, queues, trigger_dispatcher=None):
        """
        Initialise the handler and pre-compute the trigger references it dispatches to.

        :param connection: Messaging connection, passed through to the base handler.
        :param queues: Queues to consume from, passed through to the base handler.
        :param trigger_dispatcher: Dispatcher used to emit triggers. When falsy, a
                                   ``TriggerDispatcher`` using this module's logger is
                                   created instead.
        """
        super(Notifier, self).__init__(connection, queues)
        self._trigger_dispatcher = trigger_dispatcher or TriggerDispatcher(LOG)

        # Both references are built once here and reused for every dispatch.
        to_reference = ResourceReference.to_string_reference
        self._notify_trigger = to_reference(pack=NOTIFY_TRIGGER_TYPE['pack'],
                                            name=NOTIFY_TRIGGER_TYPE['name'])
        self._action_trigger = to_reference(pack=ACTION_TRIGGER_TYPE['pack'],
                                            name=ACTION_TRIGGER_TYPE['name'])
77
78
    def process(self, execution_db):
        """
        Handle a single execution update.

        Executions which are not in a completed state are skipped. For completed ones the
        liveaction is loaded, post-run policies are applied, notify triggers are posted
        when the liveaction carries notify settings and finally the generic action
        trigger is posted.

        :param execution_db: Execution to process.
        """
        exec_id = str(execution_db.id)
        log_extra = {'execution': execution_db}
        LOG.debug('Processing execution %s', exec_id, extra=log_extra)

        is_completed = execution_db.status in LIVEACTION_COMPLETED_STATES
        if not is_completed:
            LOG.debug("Skipping processing of execution %s since it's not in a completed state" %
                      (exec_id), extra=log_extra)
            return

        liveaction_db = LiveAction.get_by_id(execution_db.liveaction['id'])

        # NOTE(review): the object returned by _apply_post_run_policies is not used here;
        # the triggers below are built from the instance loaded above - confirm intended.
        self._apply_post_run_policies(liveaction_db=liveaction_db)

        trigger_kwargs = {'liveaction_db': liveaction_db, 'execution_db': execution_db}

        if liveaction_db.notify is not None:
            self._post_notify_triggers(**trigger_kwargs)

        self._post_generic_trigger(**trigger_kwargs)
96
97
    def _get_execution_for_liveaction(self, liveaction):
        """
        Look up the execution which references the provided liveaction.

        :param liveaction: Liveaction whose ``id`` is used for the lookup.

        :return: The matching execution, or ``None`` when the lookup result is falsy.
        """
        # The local is deliberately not called ``execution``: that name is already bound
        # at module level to the imported ``st2common.transport.execution`` module and
        # shadowing it inside the method invites accidental misuse.
        execution_db = ActionExecution.get(liveaction__id=str(liveaction.id))

        if not execution_db:
            return None

        return execution_db
104
105
    def _post_notify_triggers(self, liveaction_db=None, execution_db=None):
        """
        Post notify triggers for every notify subsection which applies to the liveaction.

        ``on_complete`` is always considered, ``on_success`` only when the liveaction
        succeeded and ``on_failure`` only when its status is one of the failed states.
        Subsections which are not set are skipped.

        :param liveaction_db: Liveaction whose ``notify`` settings are inspected.
        :param execution_db: Execution matching ``liveaction_db``.
        """
        notify = getattr(liveaction_db, 'notify', None)

        if not notify:
            return

        def _post_subsection(subsection, suffix):
            # Thin wrapper so the shared keyword arguments are spelled out only once.
            self._post_notify_subsection_triggers(
                liveaction_db=liveaction_db, execution_db=execution_db,
                notify_subsection=subsection,
                default_message_suffix=suffix)

        if notify.on_complete:
            _post_subsection(notify.on_complete, 'completed.')

        has_succeeded = liveaction_db.status == LIVEACTION_STATUS_SUCCEEDED
        if has_succeeded and notify.on_success:
            _post_subsection(notify.on_success, 'succeeded.')

        has_failed = liveaction_db.status in LIVEACTION_FAILED_STATES
        if has_failed and notify.on_failure:
            _post_subsection(notify.on_failure, 'failed.')
126
127
    def _post_notify_subsection_triggers(self, liveaction_db=None, execution_db=None,
                                         notify_subsection=None,
                                         default_message_suffix=None):
        """
        Dispatch one notify trigger per route configured on a notify subsection.

        The subsection's message and data are rendered through Jinja (a rendering failure
        is logged and the unrendered value is used instead), combined with details of the
        execution into a payload and dispatched once for every route. Nothing is
        dispatched when the subsection has no routes.

        :param liveaction_db: Liveaction the notification is about.
        :param execution_db: Execution matching ``liveaction_db``.
        :param notify_subsection: Notify subsection being processed; ``routes`` (or
                                  ``channels``), ``message`` and ``data`` are read from it.
        :param default_message_suffix: Suffix of the default message which is used when
                                       the subsection defines no message.
        :type default_message_suffix: ``str``

        :raises Exception: If dispatching failed for one or more routes. Every route is
                           attempted before the exception is raised.
        """
        # Both lookups use a default so a subsection which only exposes ``channels``
        # falls through to it instead of raising AttributeError on ``routes``.
        routes = (getattr(notify_subsection, 'routes', None) or
                  getattr(notify_subsection, 'channels', None))

        execution_id = str(execution_db.id)

        if not routes:
            return

        message = notify_subsection.message or (
            'Action ' + liveaction_db.action + ' ' + default_message_suffix)
        # Work on a copy: 'result' is added below and, when rendering fails, ``data``
        # would otherwise still be the object stored on the notify subsection.
        data = dict(notify_subsection.data or {})

        jinja_context = self._build_jinja_context(
            liveaction_db=liveaction_db, execution_db=execution_db
        )

        # Rendering is best effort - on failure the unrendered value is sent.
        try:
            message = self._transform_message(message=message,
                                              context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `message`.')

        try:
            data = self._transform_data(data=data, context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `data`.')

        # At this point convert result to a string. This restricts the rules engine's
        # ability to introspect the result. On the other hand at least a JSON usable
        # result is sent as part of the notification. If Jinja is required to convert
        # to a string representation it uses str(...) which makes it impossible to
        # parse the result as JSON any longer.
        # TODO: Use to_serializable_dict
        data['result'] = json.dumps(liveaction_db.result)

        payload = {
            'message': message,
            'data': data,
            'execution_id': execution_id,
            'status': liveaction_db.status,
            'start_timestamp': isotime.format(liveaction_db.start_timestamp)
        }

        try:
            payload['end_timestamp'] = isotime.format(liveaction_db.end_timestamp)
        except AttributeError:
            # This can be raised if liveaction.end_timestamp is None, which is caused
            # when policy cancels a request due to concurrency.
            # In this case, use the current UTC time instead.
            payload['end_timestamp'] = isotime.format(datetime.utcnow())

        payload['action_ref'] = liveaction_db.action
        payload['runner_ref'] = self._get_runner_ref(liveaction_db.action)

        trace_context = self._get_trace_context(execution_id=execution_id)

        failed_routes = []
        for route in routes:
            # Every dispatch gets its own payload so a dispatcher which keeps a reference
            # to the dict never observes the route of a later iteration.
            route_payload = dict(payload)
            route_payload['route'] = route
            # Deprecated. Only for backward compatibility reasons.
            route_payload['channel'] = route

            try:
                LOG.debug('POSTing %s for %s. Payload - %s.', NOTIFY_TRIGGER_TYPE['name'],
                          liveaction_db.id, route_payload)
                self._trigger_dispatcher.dispatch(self._notify_trigger, payload=route_payload,
                                                  trace_context=trace_context)
            except Exception:
                # Keep going so the remaining routes are still notified, but leave a
                # trace of what went wrong for this one.
                LOG.exception('Failed to dispatch %s for route "%s".',
                              NOTIFY_TRIGGER_TYPE['name'], route)
                failed_routes.append(route)

        if failed_routes:
            raise Exception('Failed notifications to routes: %s' %
                            ', '.join(str(route) for route in failed_routes))
198
199
    def _build_jinja_context(self, liveaction_db, execution_db):
        """
        Assemble the context used to render notify messages and data.

        The context exposes a system scoped ``KeyValueLookup`` under the datastore parent
        scope, the liveaction's parameters and context, and the execution's result, each
        under its respective prefix constant.

        :param liveaction_db: Liveaction providing ``parameters`` and ``context``.
        :param execution_db: Execution providing ``result``.

        :rtype: ``dict``
        """
        system_lookup = KeyValueLookup(scope=FULL_SYSTEM_SCOPE)

        return {
            DATASTORE_PARENT_SCOPE: {SYSTEM_SCOPE: system_lookup},
            ACTION_PARAMETERS_KV_PREFIX: liveaction_db.parameters,
            ACTION_CONTEXT_KV_PREFIX: liveaction_db.context,
            ACTION_RESULTS_KV_PREFIX: execution_db.result
        }
210
211
    def _transform_message(self, message, context=None):
        """
        Render ``message`` with the provided context.

        :param message: Message to render.
        :param context: Rendering context; an empty dict is used when falsy.

        :return: The rendered message, or ``message`` itself when the rendered mapping
                 has no ``message`` key.
        """
        rendered = jinja_utils.render_values(mapping={'message': message},
                                             context=context or {})
        return rendered.get('message', message)
216
217
    def _transform_data(self, data, context=None):
        """
        Render the values of ``data`` with the provided context.

        :param data: Mapping to render.
        :param context: Rendering context, passed through unchanged.

        :return: Whatever ``jinja_utils.render_values`` returns for ``data``.
        """
        rendered_data = jinja_utils.render_values(mapping=data, context=context)
        return rendered_data
219
220
    def _get_trace_context(self, execution_id):
        """
        Build a trace context for the trace associated with the provided execution.

        :param execution_id: ID of the execution to look the trace up by.

        :return: ``TraceContext`` for the found trace, ``None`` when no trace is found.
        """
        trace_db = trace_service.get_trace_db_by_action_execution(
            action_execution_id=execution_id)

        if not trace_db:
            # If no trace_context is found then do not create a new one here. If necessary
            # it shall be created downstream. Sure this is impl leakage of some sort.
            return None

        trace_id = str(trace_db.id)
        return TraceContext(id_=trace_id, trace_tag=trace_db.trace_tag)
228
229
    def _post_generic_trigger(self, liveaction_db=None, execution_db=None):
        """
        Dispatch the generic action trigger for a processed execution.

        Nothing is dispatched when ``ACTION_SENSOR_ENABLED`` is falsy.

        :param liveaction_db: Liveaction the payload is built from.
        :param execution_db: Execution whose ID is put in the payload and used to look up
                             the trace context.
        """
        if not ACTION_SENSOR_ENABLED:
            LOG.debug('Action trigger is disabled, skipping trigger dispatch...')
            return

        exec_id = str(execution_db.id)

        payload = {'execution_id': exec_id}
        payload['status'] = liveaction_db.status
        payload['start_timestamp'] = str(liveaction_db.start_timestamp)
        # deprecate 'action_name' at some point and switch to 'action_ref'
        payload['action_name'] = liveaction_db.action
        payload['action_ref'] = liveaction_db.action
        payload['runner_ref'] = self._get_runner_ref(liveaction_db.action)
        payload['parameters'] = liveaction_db.get_masked_parameters()
        payload['result'] = liveaction_db.result

        # Use execution_id to extract trace rather than liveaction. execution_id
        # will look-up an exact TraceDB while liveaction depending on context
        # may not end up going to the DB.
        trace_context = self._get_trace_context(execution_id=exec_id)

        LOG.debug('POSTing %s for %s. Payload - %s. TraceContext - %s',
                  ACTION_TRIGGER_TYPE['name'], liveaction_db.id, payload, trace_context)
        self._trigger_dispatcher.dispatch(self._action_trigger,
                                          payload=payload,
                                          trace_context=trace_context)
252
253
    def _apply_post_run_policies(self, liveaction_db):
        """
        Apply every enabled policy defined for the liveaction's action.

        Policies are applied best effort: a failure to load or apply one policy is logged
        and the remaining policies are still applied.

        :param liveaction_db: Liveaction to apply the policies to.

        :return: The liveaction as returned by the last successfully applied policy, or
                 the provided ``liveaction_db`` when no policy was applied.
        """
        # Apply policies defined for the action.
        policy_dbs = Policy.query(resource_ref=liveaction_db.action, enabled=True)
        LOG.debug('Applying %s post_run policies', len(policy_dbs))

        for policy_db in policy_dbs:
            try:
                LOG.debug('Applying post_run policy "%s" (%s) for liveaction %s',
                          policy_db.ref, policy_db.policy_type, str(liveaction_db.id))
                # The driver lookup is inside the ``try`` so a policy whose driver can't
                # be loaded is logged and skipped like any other failing policy.
                driver = policies.get_driver(policy_db.ref,
                                             policy_db.policy_type,
                                             **policy_db.parameters)
                liveaction_db = driver.apply_after(liveaction_db)
            except Exception:
                # ``Exception`` rather than a bare ``except`` so SystemExit and
                # KeyboardInterrupt still propagate.
                LOG.exception('An exception occurred while applying policy "%s".', policy_db.ref)

        return liveaction_db
271
272
    def _get_runner_ref(self, action_ref):
        """
        Look up the action by its reference and return the name of its runner type.

        :param action_ref: Reference of the action to look up.

        :rtype: ``str``
        """
        action_db = Action.get_by_ref(action_ref)
        runner_type = action_db['runner_type']
        return runner_type['name']
280
281
282
def get_notifier():
    """
    Create a ``Notifier`` which consumes from ``ACTIONUPDATE_WORK_Q``.

    The notifier is constructed inside a ``Connection`` context manager, so the ``with``
    block has already exited by the time the notifier reaches the caller.

    :rtype: :class:`Notifier`
    """
    messaging_urls = transport_utils.get_messaging_urls()

    with Connection(messaging_urls) as connection:
        dispatcher = TriggerDispatcher(LOG)
        notifier = Notifier(connection, [ACTIONUPDATE_WORK_Q], trigger_dispatcher=dispatcher)

    return notifier
285