Test Setup Failed
Pull Request — master (#4154)
by W
04:10
created

Notifier.process()   B

Complexity

Conditions 6

Size

Total Lines 25

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
c 0
b 0
f 0
dl 0
loc 25
rs 7.5384
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
from datetime import datetime
18
import json
19
20
from kombu import Connection
21
from oslo_config import cfg
22
23
from st2common import log as logging
24
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
25
from st2common.constants.action import LIVEACTION_STATUS_PAUSED
26
from st2common.constants.action import LIVEACTION_FAILED_STATES
27
from st2common.constants.action import LIVEACTION_COMPLETED_STATES
28
from st2common.constants.triggers import INTERNAL_TRIGGER_TYPES
29
from st2common.models.api.trace import TraceContext
30
from st2common.models.db.execution import ActionExecutionDB
31
from st2common.persistence.action import Action
32
from st2common.persistence.liveaction import LiveAction
33
from st2common.persistence.policy import Policy
34
from st2common import policies
35
from st2common.models.system.common import ResourceReference
36
from st2common.persistence.execution import ActionExecution
37
from st2common.services import trace as trace_service
38
from st2common.services import workflows as wf_svc
39
from st2common.transport import consumers
40
from st2common.transport import utils as transport_utils
41
from st2common.transport.reactor import TriggerDispatcher
42
from st2common.util import isotime
43
from st2common.util import jinja as jinja_utils
44
from st2common.constants.action import ACTION_CONTEXT_KV_PREFIX
45
from st2common.constants.action import ACTION_PARAMETERS_KV_PREFIX
46
from st2common.constants.action import ACTION_RESULTS_KV_PREFIX
47
from st2common.constants.keyvalue import FULL_SYSTEM_SCOPE, SYSTEM_SCOPE, DATASTORE_PARENT_SCOPE
48
from st2common.services.keyvalues import KeyValueLookup
49
from st2common.transport.queues import NOTIFIER_ACTIONUPDATE_WORK_QUEUE
50
51
# Public names exported by this module.
__all__ = ['Notifier', 'get_notifier']

LOG = logging.getLogger(__name__)

# Evaluated once, when this module is imported.
ACTION_SENSOR_ENABLED = cfg.CONF.action_sensor.enable

# XXX: The two trigger types are selected purely by their position in the list registered
# under the 'action' key. Fix this nasty positional dependency.
_ACTION_TRIGGER_TYPES = INTERNAL_TRIGGER_TYPES['action']
ACTION_TRIGGER_TYPE = _ACTION_TRIGGER_TYPES[0]
NOTIFY_TRIGGER_TYPE = _ACTION_TRIGGER_TYPES[1]
62
63
64
class Notifier(consumers.MessageHandler):
    """
    Message handler which reacts to action execution updates.

    For an execution which has reached a completed state the handler applies the post-run
    policies registered for the action, dispatches the notify triggers configured on the
    live action, dispatches the generic action trigger and, for executions which carry an
    ``orchestra`` context, hands the execution over to the workflow service.
    """
    message_type = ActionExecutionDB

    def __init__(self, connection, queues, trigger_dispatcher=None):
        """
        :param connection: Connection which is passed through to the base message handler.
        :param queues: Queues which are passed through to the base message handler.
        :param trigger_dispatcher: Dispatcher used to emit triggers. When not provided, a new
                                   ``TriggerDispatcher`` using this module's logger is created.
        """
        super(Notifier, self).__init__(connection, queues)
        if not trigger_dispatcher:
            trigger_dispatcher = TriggerDispatcher(LOG)
        self._trigger_dispatcher = trigger_dispatcher
        # String references ("<pack>.<name>" style, as built by ResourceReference) of the two
        # internal triggers this handler dispatches.
        self._notify_trigger = ResourceReference.to_string_reference(
            pack=NOTIFY_TRIGGER_TYPE['pack'],
            name=NOTIFY_TRIGGER_TYPE['name'])
        self._action_trigger = ResourceReference.to_string_reference(
            pack=ACTION_TRIGGER_TYPE['pack'],
            name=ACTION_TRIGGER_TYPE['name'])

    def process(self, execution_db):
        """
        Handle a single action execution update.

        Executions which are not in a completed state are skipped (after paused orchestra
        executions have been reported to the workflow service).

        :param execution_db: Execution the update was received for.
        :type execution_db: :class:`ActionExecutionDB`
        """
        execution_id = str(execution_db.id)
        extra = {'execution': execution_db}
        LOG.debug('Processing execution %s', execution_id, extra=extra)

        # A paused execution is not a completed one, so the pause has to be reported before
        # the completed-state check below returns early.
        if ('orchestra' in execution_db.context and
                execution_db.status == LIVEACTION_STATUS_PAUSED):
            wf_svc.handle_action_execution_pause(execution_db)

        if execution_db.status not in LIVEACTION_COMPLETED_STATES:
            LOG.debug('Skipping processing of execution %s since it\'s not in a completed state',
                      execution_id, extra=extra)
            return

        liveaction_id = execution_db.liveaction['id']
        liveaction_db = LiveAction.get_by_id(liveaction_id)
        self._apply_post_run_policies(liveaction_db=liveaction_db)

        # NOTE: _post_notify_triggers raises when dispatching to any route failed. In that case
        # the generic trigger and the workflow completion handling below are not reached.
        if liveaction_db.notify is not None:
            self._post_notify_triggers(liveaction_db=liveaction_db, execution_db=execution_db)

        self._post_generic_trigger(liveaction_db=liveaction_db, execution_db=execution_db)

        if 'orchestra' in liveaction_db.context:
            wf_svc.handle_action_execution_completion(execution_db)

    def _get_execution_for_liveaction(self, liveaction):
        """
        Retrieve the execution which references the provided live action.

        :return: The matching execution, or ``None`` when the lookup returns a false value.
        """
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
        return execution or None

    def _post_notify_triggers(self, liveaction_db=None, execution_db=None):
        """
        Dispatch notify triggers for every subsection of the live action's ``notify``
        attribute which applies to the live action's status.

        ``on_complete`` is always considered, ``on_success`` only for succeeded live actions
        and ``on_failure`` only for live actions in one of the failed states.
        """
        notify = getattr(liveaction_db, 'notify', None)

        if not notify:
            return

        if notify.on_complete:
            self._post_notify_subsection_triggers(
                liveaction_db=liveaction_db, execution_db=execution_db,
                notify_subsection=notify.on_complete,
                default_message_suffix='completed.')
        if liveaction_db.status == LIVEACTION_STATUS_SUCCEEDED and notify.on_success:
            self._post_notify_subsection_triggers(
                liveaction_db=liveaction_db, execution_db=execution_db,
                notify_subsection=notify.on_success,
                default_message_suffix='succeeded.')
        if liveaction_db.status in LIVEACTION_FAILED_STATES and notify.on_failure:
            self._post_notify_subsection_triggers(
                liveaction_db=liveaction_db, execution_db=execution_db,
                notify_subsection=notify.on_failure,
                default_message_suffix='failed.')

    def _post_notify_subsection_triggers(self, liveaction_db=None, execution_db=None,
                                         notify_subsection=None,
                                         default_message_suffix=None):
        """
        Dispatch one notify trigger per route configured on the provided notify subsection.

        Nothing is dispatched when the subsection defines no routes.

        :param notify_subsection: Object providing ``routes`` (or the deprecated ``channels``),
                                  ``message`` and ``data`` attributes.
        :param default_message_suffix: Suffix of the message which is used when the subsection
                                       defines no message.
        :type default_message_suffix: ``str``

        :raises Exception: If dispatching failed for at least one route. Dispatching to the
                           remaining routes is still attempted before the exception is raised.
        """
        # "channels" is the deprecated name of "routes". Both lookups use a default so the
        # fallback also works for a subsection which has no "routes" attribute at all.
        routes = (getattr(notify_subsection, 'routes', None) or
                  getattr(notify_subsection, 'channels', None))

        execution_id = str(execution_db.id)

        if not routes:
            return

        message = notify_subsection.message or (
            'Action ' + liveaction_db.action + ' ' + default_message_suffix)
        data = notify_subsection.data or {}

        jinja_context = self._build_jinja_context(
            liveaction_db=liveaction_db, execution_db=execution_db
        )

        # Rendering is best effort - on failure the un-rendered value is sent.
        try:
            message = self._transform_message(message=message,
                                              context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `message`.')

        try:
            data = self._transform_data(data=data, context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `data`.')

        # At this point convert result to a string. This restricts the rules engine's
        # ability to introspect the result. On the other hand at least a JSON usable
        # result is sent as part of the notification. If Jinja is required to convert
        # to a string representation it uses str(...) which makes it impossible to
        # parse the result as JSON any longer.
        # TODO: Use to_serializable_dict
        data['result'] = json.dumps(liveaction_db.result)

        payload = {
            'message': message,
            'data': data,
            'execution_id': execution_id,
            'status': liveaction_db.status,
            'start_timestamp': isotime.format(liveaction_db.start_timestamp)
        }

        try:
            payload['end_timestamp'] = isotime.format(liveaction_db.end_timestamp)
        except AttributeError:
            # This can be raised if liveaction.end_timestamp is None, which is caused
            # when policy cancels a request due to concurrency.
            # In this case, use the current UTC time instead.
            payload['end_timestamp'] = isotime.format(datetime.utcnow())

        payload['action_ref'] = liveaction_db.action
        payload['runner_ref'] = self._get_runner_ref(liveaction_db.action)

        trace_context = self._get_trace_context(execution_id=execution_id)

        failed_routes = []
        for route in routes:
            try:
                # Every dispatch receives its own copy so a payload which was already handed
                # to the dispatcher is never modified by a later iteration.
                route_payload = dict(payload)
                route_payload['route'] = route
                # Deprecated. Only for backward compatibility reasons.
                route_payload['channel'] = route
                LOG.debug('POSTing %s for %s. Payload - %s.', NOTIFY_TRIGGER_TYPE['name'],
                          liveaction_db.id, route_payload)
                self._trigger_dispatcher.dispatch(self._notify_trigger, payload=route_payload,
                                                  trace_context=trace_context)
            except Exception:
                # Keep going so one bad route doesn't prevent notifications to the others.
                LOG.exception('Failed dispatching notification to route "%s".', route)
                failed_routes.append(route)

        if failed_routes:
            raise Exception('Failed notifications to routes: %s' % ', '.join(failed_routes))

    def _build_jinja_context(self, liveaction_db, execution_db):
        """
        Build the context which is used to render the notification message and data.

        The context exposes a system scoped datastore lookup, the live action parameters,
        the live action context and the execution result.

        :rtype: ``dict``
        """
        context = {}
        context.update({
            DATASTORE_PARENT_SCOPE: {
                SYSTEM_SCOPE: KeyValueLookup(scope=FULL_SYSTEM_SCOPE)
            }
        })
        context.update({ACTION_PARAMETERS_KV_PREFIX: liveaction_db.parameters})
        context.update({ACTION_CONTEXT_KV_PREFIX: liveaction_db.context})
        context.update({ACTION_RESULTS_KV_PREFIX: execution_db.result})
        return context

    def _transform_message(self, message, context=None):
        """
        Render the provided message using the provided context.

        :return: Rendered message, or the original message if the rendered mapping contains
                 no ``message`` key.
        """
        mapping = {'message': message}
        context = context or {}
        rendered = jinja_utils.render_values(mapping=mapping, context=context)
        return rendered.get('message', message)

    def _transform_data(self, data, context=None):
        """
        Render the values of the provided data mapping using the provided context.
        """
        return jinja_utils.render_values(mapping=data, context=context)

    def _get_trace_context(self, execution_id):
        """
        Retrieve the trace context for the provided execution.

        :return: ``TraceContext`` built from the stored trace, or ``None`` if the execution
                 has no trace.
        """
        trace_db = trace_service.get_trace_db_by_action_execution(
            action_execution_id=execution_id)
        if trace_db:
            return TraceContext(id_=str(trace_db.id), trace_tag=trace_db.trace_tag)
        # If no trace_context is found then do not create a new one here. If necessary
        # it shall be created downstream. Sure this is impl leakage of some sort.
        return None

    def _post_generic_trigger(self, liveaction_db=None, execution_db=None):
        """
        Dispatch the generic action trigger for the provided live action / execution pair.

        Nothing is dispatched when ``ACTION_SENSOR_ENABLED`` is false.
        """
        if not ACTION_SENSOR_ENABLED:
            LOG.debug('Action trigger is disabled, skipping trigger dispatch...')
            return

        execution_id = str(execution_db.id)
        payload = {'execution_id': execution_id,
                   'status': liveaction_db.status,
                   'start_timestamp': str(liveaction_db.start_timestamp),
                   # deprecate 'action_name' at some point and switch to 'action_ref'
                   'action_name': liveaction_db.action,
                   'action_ref': liveaction_db.action,
                   'runner_ref': self._get_runner_ref(liveaction_db.action),
                   'parameters': liveaction_db.get_masked_parameters(),
                   'result': liveaction_db.result}
        # Use execution_id to extract trace rather than liveaction. execution_id
        # will look-up an exact TraceDB while liveaction depending on context
        # may not end up going to the DB.
        trace_context = self._get_trace_context(execution_id=execution_id)
        LOG.debug('POSTing %s for %s. Payload - %s. TraceContext - %s',
                  ACTION_TRIGGER_TYPE['name'], liveaction_db.id, payload, trace_context)
        self._trigger_dispatcher.dispatch(self._action_trigger, payload=payload,
                                          trace_context=trace_context)

    def _apply_post_run_policies(self, liveaction_db):
        """
        Apply all the enabled policies which are defined for the action of the live action.

        A policy which fails while being applied is logged and skipped.

        :return: Live action as returned by the last successfully applied policy (the
                 provided live action if no policy was applied).
        """
        # Apply policies defined for the action.
        policy_dbs = Policy.query(resource_ref=liveaction_db.action, enabled=True)
        LOG.debug('Applying %s post_run policies', len(policy_dbs))

        for policy_db in policy_dbs:
            driver = policies.get_driver(policy_db.ref,
                                         policy_db.policy_type,
                                         **policy_db.parameters)

            try:
                LOG.debug('Applying post_run policy "%s" (%s) for liveaction %s',
                          policy_db.ref, policy_db.policy_type, str(liveaction_db.id))
                liveaction_db = driver.apply_after(liveaction_db)
            except Exception:
                LOG.exception('An exception occurred while applying policy "%s".', policy_db.ref)

        return liveaction_db

    def _get_runner_ref(self, action_ref):
        """
        Retrieve a runner reference for the provided action.

        :rtype: ``str``
        """
        action = Action.get_by_ref(action_ref)
        return action['runner_type']['name']
288
289
290
def get_notifier():
    """
    Create a ``Notifier`` which consumes the action update work queue.

    The notifier is constructed inside the connection's ``with`` block, so the connection
    context manager has already exited by the time the notifier is handed to the caller.

    :rtype: :class:`Notifier`
    """
    messaging_urls = transport_utils.get_messaging_urls()
    with Connection(messaging_urls) as conn:
        queues = [NOTIFIER_ACTIONUPDATE_WORK_QUEUE]
        dispatcher = TriggerDispatcher(LOG)
        notifier = Notifier(conn, queues, trigger_dispatcher=dispatcher)
    return notifier
294