Test Failed
Pull Request — master (#3496)
by Lakshmi
05:35
created

Notifier.process()   A

Complexity

Conditions 3

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 18
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from datetime import datetime
17
import json
18
19
from kombu import Connection
20
from oslo_config import cfg
21
22
from st2common import log as logging
23
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
24
from st2common.constants.action import LIVEACTION_FAILED_STATES
25
from st2common.constants.action import LIVEACTION_COMPLETED_STATES
26
from st2common.constants.triggers import INTERNAL_TRIGGER_TYPES
27
from st2common.models.api.trace import TraceContext
28
from st2common.models.db.execution import ActionExecutionDB
29
from st2common.persistence.action import Action
30
from st2common.persistence.liveaction import LiveAction
31
from st2common.persistence.policy import Policy
32
from st2common import policies
33
from st2common.models.system.common import ResourceReference
34
from st2common.persistence.execution import ActionExecution
35
from st2common.services import trace as trace_service
36
from st2common.transport import consumers, execution, liveaction, publishers
0 ignored issues
show
Unused Code introduced by
Unused liveaction imported from st2common.transport
Loading history...
37
from st2common.transport import utils as transport_utils
38
from st2common.transport.reactor import TriggerDispatcher
39
from st2common.util import isotime
40
from st2common.util import jinja as jinja_utils
41
from st2common.constants.action import ACTION_CONTEXT_KV_PREFIX
42
from st2common.constants.action import ACTION_PARAMETERS_KV_PREFIX
43
from st2common.constants.action import ACTION_RESULTS_KV_PREFIX
44
from st2common.constants.keyvalue import FULL_SYSTEM_SCOPE, SYSTEM_SCOPE, DATASTORE_PARENT_SCOPE
45
from st2common.services.keyvalues import KeyValueLookup
46
47
__all__ = ['Notifier', 'get_notifier']

# Module-level logger shared by the notifier and the default trigger dispatcher.
LOG = logging.getLogger(__name__)

# Work queue the notifier consumes from; it is obtained from the execution
# transport module using the "update" routing key.
ACTIONUPDATE_WORK_Q = execution.get_queue(
    'st2.notifiers.work', routing_key=publishers.UPDATE_RK)

# NOTE: this is evaluated once, when the module is imported.
ACTION_SENSOR_ENABLED = cfg.CONF.action_sensor.enable

# XXX: Fix this nasty positional dependency. The code relies on the generic
# action trigger being the first entry and the notify trigger being the second.
_ACTION_TRIGGER_TYPES = INTERNAL_TRIGGER_TYPES['action']
ACTION_TRIGGER_TYPE = _ACTION_TRIGGER_TYPES[0]
NOTIFY_TRIGGER_TYPE = _ACTION_TRIGGER_TYPES[1]
61
62
63
class Notifier(consumers.MessageHandler):
    """
    Message handler which reacts to action execution updates.

    For every execution which has reached a completed state the handler applies the post-run
    policies of the corresponding action, dispatches a notify trigger for each route
    configured on the liveaction and finally dispatches the generic action trigger.

    NOTE(review): static analysis flags the ``execution`` and ``liveaction`` parameters used
    by several methods below as shadowing the ``execution`` / ``liveaction`` modules imported
    from ``st2common.transport`` (it also reports the ``liveaction`` import as unused). The
    parameter names are kept because the methods are invoked with keyword arguments; none of
    the methods needs the shadowed modules.
    """
    message_type = ActionExecutionDB

    def __init__(self, connection, queues, trigger_dispatcher=None):
        """
        :param connection: Connection passed through to the base message handler.
        :param queues: Queues passed through to the base message handler.
        :param trigger_dispatcher: Dispatcher used to emit triggers. When not provided a
                                   ``TriggerDispatcher`` bound to the module logger is created.
        """
        super(Notifier, self).__init__(connection, queues)
        if not trigger_dispatcher:
            trigger_dispatcher = TriggerDispatcher(LOG)
        self._trigger_dispatcher = trigger_dispatcher
        # String references ("<pack>.<name>" style, as produced by ResourceReference) are
        # computed once since they are the same for every dispatch.
        self._notify_trigger = ResourceReference.to_string_reference(
            pack=NOTIFY_TRIGGER_TYPE['pack'],
            name=NOTIFY_TRIGGER_TYPE['name'])
        self._action_trigger = ResourceReference.to_string_reference(
            pack=ACTION_TRIGGER_TYPE['pack'],
            name=ACTION_TRIGGER_TYPE['name'])

    def process(self, execution):
        """
        Handle a single action execution message.

        Executions which are not in a completed state are skipped. For completed executions
        the liveaction is loaded, post-run policies are applied, notify triggers are posted
        (if the liveaction defines ``notify``) and the generic action trigger is posted.

        :param execution: Action execution object received from the queue.
        """
        execution_id = str(execution.id)
        extra = {'execution': execution}
        LOG.debug('Processing execution %s', execution_id, extra=extra)

        if execution.status not in LIVEACTION_COMPLETED_STATES:
            # Lazy logger arguments - the message is only formatted if DEBUG is enabled.
            LOG.debug('Skipping processing of execution %s since it\'s not in a '
                      'completed state', execution_id, extra=extra)
            return

        liveaction_id = execution.liveaction['id']
        liveaction_db = LiveAction.get_by_id(liveaction_id)
        self._apply_post_run_policies(liveaction_db=liveaction_db)

        if liveaction_db.notify is not None:
            self._post_notify_triggers(liveaction=liveaction_db, execution=execution)

        self._post_generic_trigger(liveaction=liveaction_db, execution=execution)

    def _get_execution_for_liveaction(self, liveaction):
        """
        Retrieve the action execution which belongs to the provided liveaction.

        :return: The execution object or ``None`` if the lookup returned a falsy value.
        """
        execution_db = ActionExecution.get(liveaction__id=str(liveaction.id))
        return execution_db or None

    def _post_notify_triggers(self, liveaction=None, execution=None):
        """
        Post notify triggers for every notify subsection which applies to the liveaction.

        ``on_complete`` always applies, ``on_success`` applies when the liveaction succeeded
        and ``on_failure`` applies when the liveaction status is one of the failed states.
        Nothing is posted when the liveaction has no (or an empty) ``notify`` attribute.
        """
        notify = getattr(liveaction, 'notify', None)

        if not notify:
            return

        if notify.on_complete:
            self._post_notify_subsection_triggers(
                liveaction=liveaction, execution=execution,
                notify_subsection=notify.on_complete,
                default_message_suffix='completed.')
        if liveaction.status == LIVEACTION_STATUS_SUCCEEDED and notify.on_success:
            self._post_notify_subsection_triggers(
                liveaction=liveaction, execution=execution,
                notify_subsection=notify.on_success,
                default_message_suffix='succeeded.')
        if liveaction.status in LIVEACTION_FAILED_STATES and notify.on_failure:
            self._post_notify_subsection_triggers(
                liveaction=liveaction, execution=execution,
                notify_subsection=notify.on_failure,
                default_message_suffix='failed.')

    def _post_notify_subsection_triggers(self, liveaction=None, execution=None,
                                         notify_subsection=None,
                                         default_message_suffix=None):
        """
        Dispatch one notify trigger per route defined on the provided notify subsection.

        :param liveaction: Liveaction the notification is about.
        :param execution: Execution which corresponds to the liveaction.
        :param notify_subsection: Notify subsection (``on_complete``, ``on_success`` or
                                  ``on_failure``) holding ``routes`` (or the deprecated
                                  ``channels``), ``message`` and ``data``.
        :param default_message_suffix: Suffix for the default message which is used when the
                                       subsection doesn't define a message.

        :raises Exception: If dispatching failed for one or more routes. All the routes are
                           attempted before the exception is raised.
        """
        # "channels" is the deprecated name for "routes". Both lookups default to None so a
        # subsection without a "routes" attribute falls back to "channels" instead of raising.
        routes = (getattr(notify_subsection, 'routes', None) or
                  getattr(notify_subsection, 'channels', None))

        execution_id = str(execution.id)

        if not routes:
            return

        # The default message is only built when the subsection doesn't provide one.
        message = notify_subsection.message or (
            'Action ' + liveaction.action + ' ' + default_message_suffix)
        data = notify_subsection.data or {}

        jinja_context = self._build_jinja_context(liveaction=liveaction, execution=execution)

        # Rendering is best-effort - on failure the un-rendered value is sent. Only
        # ``Exception`` is caught so that SystemExit / KeyboardInterrupt still propagate.
        try:
            message = self._transform_message(message=message,
                                              context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `message`.')

        try:
            data = self._transform_data(data=data, context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `data`.')

        # At this point convert result to a string. This restricts the rules engine's
        # ability to introspect the result. On the other hand at least a json usable
        # result is sent as part of the notification. If jinja is required to convert
        # to a string representation it uses str(...) which makes it impossible to
        # parse the result as json any longer.
        # TODO: Use to_serializable_dict
        data['result'] = json.dumps(liveaction.result)

        payload = {}
        payload['message'] = message
        payload['data'] = data
        payload['execution_id'] = execution_id
        payload['status'] = liveaction.status
        payload['start_timestamp'] = isotime.format(liveaction.start_timestamp)

        try:
            payload['end_timestamp'] = isotime.format(liveaction.end_timestamp)
        except AttributeError:
            # This can be raised if liveaction.end_timestamp is None, which is caused
            # when policy cancels a request due to concurrency.
            # In this case, use the current UTC time instead.
            payload['end_timestamp'] = isotime.format(datetime.utcnow())

        payload['action_ref'] = liveaction.action
        payload['runner_ref'] = self._get_runner_ref(liveaction.action)

        trace_context = self._get_trace_context(execution_id=execution_id)

        failed_routes = []
        for route in routes:
            try:
                payload['route'] = route
                # Deprecated. Only for backward compatibility reasons.
                payload['channel'] = route
                LOG.debug('POSTing %s for %s. Payload - %s.', NOTIFY_TRIGGER_TYPE['name'],
                          liveaction.id, payload)
                self._trigger_dispatcher.dispatch(self._notify_trigger, payload=payload,
                                                  trace_context=trace_context)
            except Exception:
                # Keep going so the remaining routes are still attempted, but record the
                # underlying error instead of silently dropping it.
                LOG.exception('Failed to dispatch notification to route "%s".', route)
                failed_routes.append(route)

        if failed_routes:
            raise Exception('Failed notifications to routes: %s' % ', '.join(failed_routes))

    def _build_jinja_context(self, liveaction, execution):
        """
        Build the context which is used when rendering notify ``message`` and ``data``.

        The context exposes a system scoped datastore lookup, the liveaction parameters, the
        liveaction context and the execution result under their respective prefixes.

        :rtype: ``dict``
        """
        return {
            DATASTORE_PARENT_SCOPE: {
                SYSTEM_SCOPE: KeyValueLookup(scope=FULL_SYSTEM_SCOPE)
            },
            ACTION_PARAMETERS_KV_PREFIX: liveaction.parameters,
            ACTION_CONTEXT_KV_PREFIX: liveaction.context,
            ACTION_RESULTS_KV_PREFIX: execution.result
        }

    def _transform_message(self, message, context=None):
        """
        Render ``message`` using the provided context.

        :return: Rendered message, or the original message if the rendered mapping
                 contains no ``message`` key.
        """
        rendered = jinja_utils.render_values(mapping={'message': message},
                                             context=context or {})
        return rendered.get('message', message)

    def _transform_data(self, data, context=None):
        """
        Render the values of the ``data`` mapping using the provided context.

        A missing context is treated as an empty one, same as in ``_transform_message``.
        """
        return jinja_utils.render_values(mapping=data, context=context or {})

    def _get_trace_context(self, execution_id):
        """
        Build a trace context for the trace which is associated with the execution.

        :return: ``TraceContext`` or ``None`` if no trace exists for the execution.
        """
        trace_db = trace_service.get_trace_db_by_action_execution(
            action_execution_id=execution_id)
        if trace_db:
            return TraceContext(id_=str(trace_db.id), trace_tag=trace_db.trace_tag)
        # If no trace_context is found then do not create a new one here. If necessary
        # it shall be created downstream. Sure this is impl leakage of some sort.
        return None

    def _post_generic_trigger(self, liveaction=None, execution=None):
        """
        Dispatch the generic action trigger for the provided liveaction / execution.

        Nothing is dispatched when the action sensor is disabled in the config.
        """
        if not ACTION_SENSOR_ENABLED:
            LOG.debug('Action trigger is disabled, skipping trigger dispatch...')
            return

        execution_id = str(execution.id)
        payload = {'execution_id': execution_id,
                   'status': liveaction.status,
                   'start_timestamp': str(liveaction.start_timestamp),
                   # deprecate 'action_name' at some point and switch to 'action_ref'
                   'action_name': liveaction.action,
                   'action_ref': liveaction.action,
                   'runner_ref': self._get_runner_ref(liveaction.action),
                   'parameters': liveaction.get_masked_parameters(),
                   'result': liveaction.result}
        # Use execution_id to extract trace rather than liveaction. execution_id
        # will look-up an exact TraceDB while liveaction depending on context
        # may not end up going to the DB.
        trace_context = self._get_trace_context(execution_id=execution_id)
        LOG.debug('POSTing %s for %s. Payload - %s. TraceContext - %s',
                  ACTION_TRIGGER_TYPE['name'], liveaction.id, payload, trace_context)
        self._trigger_dispatcher.dispatch(self._action_trigger, payload=payload,
                                          trace_context=trace_context)

    def _apply_post_run_policies(self, liveaction_db):
        """
        Apply all the enabled policies which are defined for the liveaction's action.

        A failure to apply one policy is logged and doesn't prevent the remaining policies
        from being applied.

        :return: Liveaction object as returned by the last successfully applied policy
                 driver (or the provided object if no policy was applied).
        """
        # Apply policies defined for the action.
        policy_dbs = Policy.query(resource_ref=liveaction_db.action, enabled=True)
        LOG.debug('Applying %s post_run policies', len(policy_dbs))

        for policy_db in policy_dbs:
            driver = policies.get_driver(policy_db.ref,
                                         policy_db.policy_type,
                                         **policy_db.parameters)

            try:
                LOG.debug('Applying post_run policy "%s" (%s) for liveaction %s',
                          policy_db.ref, policy_db.policy_type, str(liveaction_db.id))
                liveaction_db = driver.apply_after(liveaction_db)
            except Exception:
                LOG.exception('An exception occurred while applying policy "%s".', policy_db.ref)

        return liveaction_db

    def _get_runner_ref(self, action_ref):
        """
        Retrieve a runner reference for the provided action.

        :rtype: ``str``
        """
        action = Action.get_by_ref(action_ref)
        return action['runner_type']['name']
278
279
280
def get_notifier():
    """
    Create a ``Notifier`` which consumes from the action update work queue.

    NOTE(review): the ``with`` block is exited before the caller receives the notifier.
    Presumably the connection is re-established lazily once the consumer starts - confirm
    against the kombu ``Connection`` context manager semantics.

    :rtype: :class:`Notifier`
    """
    messaging_urls = transport_utils.get_messaging_urls()
    with Connection(messaging_urls) as conn:
        dispatcher = TriggerDispatcher(LOG)
        notifier = Notifier(conn, [ACTIONUPDATE_WORK_Q], trigger_dispatcher=dispatcher)
    return notifier
283