Passed
Push — develop ( c4b627...e5a1b6 )
by Plexxi
06:55 queued 04:37
created

Notifier._post_notify_subsection_triggers()   D

Complexity

Conditions 9

Size

Total Lines 69

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
c 0
b 0
f 0
dl 0
loc 69
rs 4.3212

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

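Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.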

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from datetime import datetime
17
import json
18
19
from kombu import Connection
20
from oslo_config import cfg
21
22
from st2common import log as logging
23
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
24
from st2common.constants.action import LIVEACTION_FAILED_STATES
25
from st2common.constants.action import LIVEACTION_COMPLETED_STATES
26
from st2common.constants.triggers import INTERNAL_TRIGGER_TYPES
27
from st2common.models.api.trace import TraceContext
28
from st2common.models.db.liveaction import LiveActionDB
29
from st2common.persistence.action import Action
30
from st2common.persistence.policy import Policy
31
from st2common import policies
32
from st2common.models.system.common import ResourceReference
33
from st2common.persistence.execution import ActionExecution
34
from st2common.services import trace as trace_service
35
from st2common.transport import consumers, liveaction, publishers
36
from st2common.transport import utils as transport_utils
37
from st2common.transport.reactor import TriggerDispatcher
38
from st2common.util import isotime
39
from st2common.util import jinja as jinja_utils
40
from st2common.constants.action import ACTION_CONTEXT_KV_PREFIX
41
from st2common.constants.action import ACTION_PARAMETERS_KV_PREFIX
42
from st2common.constants.action import ACTION_RESULTS_KV_PREFIX
43
from st2common.constants.keyvalue import FULL_SYSTEM_SCOPE, SYSTEM_SCOPE, DATASTORE_PARENT_SCOPE
44
from st2common.services.keyvalues import KeyValueLookup
45
46
# Names exported by this module.
__all__ = ['Notifier', 'get_notifier']

LOG = logging.getLogger(__name__)

# Queue handed to the Notifier built by get_notifier().
ACTIONUPDATE_WORK_Q = liveaction.get_queue(
    'st2.notifiers.work', routing_key=publishers.UPDATE_RK)

# Evaluated once at import time; checked before dispatching the generic action trigger.
ACTION_SENSOR_ENABLED = cfg.CONF.action_sensor.enable

# XXX: Fix this nasty positional dependency.
_ACTION_TRIGGER_TYPES = INTERNAL_TRIGGER_TYPES['action']
ACTION_TRIGGER_TYPE = _ACTION_TRIGGER_TYPES[0]
NOTIFY_TRIGGER_TYPE = _ACTION_TRIGGER_TYPES[1]
60
61
62
class Notifier(consumers.MessageHandler):
63
    message_type = LiveActionDB
64
65
    def __init__(self, connection, queues, trigger_dispatcher=None):
        """
        Set up the handler and pre-compute the trigger references it dispatches.

        :param connection: Passed through to the base class constructor.
        :param queues: Passed through to the base class constructor.
        :param trigger_dispatcher: Dispatcher used to emit triggers. When a falsy
                                   value is given a ``TriggerDispatcher(LOG)`` is
                                   created.
        """
        super(Notifier, self).__init__(connection, queues)
        self._trigger_dispatcher = trigger_dispatcher or TriggerDispatcher(LOG)

        # String references of the two trigger types this handler emits.
        notify_type = NOTIFY_TRIGGER_TYPE
        self._notify_trigger = ResourceReference.to_string_reference(
            pack=notify_type['pack'], name=notify_type['name'])
        action_type = ACTION_TRIGGER_TYPE
        self._action_trigger = ResourceReference.to_string_reference(
            pack=action_type['pack'], name=action_type['name'])
76
77
    def process(self, liveaction):
        """
        Handle a liveaction update: apply post-run policies and dispatch triggers.

        Liveactions whose status is not in ``LIVEACTION_COMPLETED_STATES`` and
        liveactions without a matching execution object are skipped.

        NOTE: the ``liveaction`` parameter shadows the ``liveaction`` transport
        module imported at the top of this file. The name is kept so existing
        callers are unaffected.

        :param liveaction: Liveaction to process.
        :type liveaction: ``LiveActionDB``
        """
        live_action_id = str(liveaction.id)
        extra = {'live_action_db': liveaction}
        LOG.debug('Processing liveaction %s', live_action_id, extra=extra)

        if liveaction.status not in LIVEACTION_COMPLETED_STATES:
            LOG.debug('Skipping processing of liveaction %s since it\'s not in a completed state',
                      live_action_id, extra=extra)
            return

        execution = self._get_execution_for_liveaction(liveaction)

        if not execution:
            # No exception is being handled at this point, so LOG.exception would
            # attach a meaningless empty traceback. Log a plain error instead.
            LOG.error('Execution object corresponding to LiveAction %s not found.',
                      live_action_id, extra=extra)
            return

        self._apply_post_run_policies(liveaction_db=liveaction)

        if liveaction.notify is not None:
            self._post_notify_triggers(liveaction=liveaction, execution=execution)

        self._post_generic_trigger(liveaction=liveaction, execution=execution)
100
101
    def _get_execution_for_liveaction(self, liveaction):
        """
        Look up the execution object whose liveaction id matches ``liveaction``.

        NOTE: the ``liveaction`` parameter shadows the ``liveaction`` transport
        module imported at the top of this file.

        :return: The execution object, or ``None`` when the lookup result is falsy.
        """
        found = ActionExecution.get(liveaction__id=str(liveaction.id))
        return found if found else None
108
109
    def _post_notify_triggers(self, liveaction=None, execution=None):
        """
        Dispatch notify triggers for every notify subsection that applies.

        ``on_complete`` is used whenever it is set, ``on_success`` only when the
        status equals ``LIVEACTION_STATUS_SUCCEEDED`` and ``on_failure`` only when
        the status is in ``LIVEACTION_FAILED_STATES``.

        NOTE: the ``liveaction`` parameter shadows the ``liveaction`` transport
        module imported at the top of this file.
        """
        notify = getattr(liveaction, 'notify', None)

        if not notify:
            return

        def _emit(subsection, suffix):
            # All three subsections are posted the same way; only the subsection
            # and the default message suffix differ.
            self._post_notify_subsection_triggers(
                liveaction=liveaction, execution=execution,
                notify_subsection=subsection,
                default_message_suffix=suffix)

        if notify.on_complete:
            _emit(notify.on_complete, 'completed.')

        if liveaction.status == LIVEACTION_STATUS_SUCCEEDED and notify.on_success:
            _emit(notify.on_success, 'succeeded.')

        if liveaction.status in LIVEACTION_FAILED_STATES and notify.on_failure:
            _emit(notify.on_failure, 'failed.')
130
131
    def _post_notify_subsection_triggers(self, liveaction=None, execution=None,
                                         notify_subsection=None,
                                         default_message_suffix=None):
        """
        Dispatch one notify trigger per route configured on ``notify_subsection``.

        Nothing is dispatched when the subsection has neither ``routes`` nor the
        deprecated ``channels``. Dispatch is attempted for every route even if an
        earlier one fails.

        NOTE: the ``liveaction`` parameter shadows the ``liveaction`` transport
        module imported at the top of this file.

        :param liveaction: Liveaction the notification is about.
        :param execution: Execution object corresponding to ``liveaction``.
        :param notify_subsection: Notify subsection providing ``routes`` (or
                                  ``channels``), ``message`` and ``data``.
        :param default_message_suffix: Suffix of the default message used when the
                                       subsection has no ``message``.
        :raises Exception: If dispatching failed for one or more routes.
        """
        # Both lookups take a default so that a subsection without ``routes``
        # falls through to the deprecated ``channels`` attribute instead of
        # raising AttributeError.
        routes = (getattr(notify_subsection, 'routes', None) or
                  getattr(notify_subsection, 'channels', None))

        execution_id = str(execution.id)

        if not routes:
            return

        payload = self._build_notify_payload(
            liveaction=liveaction, execution=execution, execution_id=execution_id,
            notify_subsection=notify_subsection,
            default_message_suffix=default_message_suffix)
        trace_context = self._get_trace_context(execution_id=execution_id)

        failed_routes = []
        for route in routes:
            try:
                # Each route gets its own dict so a dispatcher that keeps a
                # reference never sees a later route's values.
                # ``channel`` is deprecated. Only for backward compatibility reasons.
                route_payload = dict(payload, route=route, channel=route)
                LOG.debug('POSTing %s for %s. Payload - %s.', NOTIFY_TRIGGER_TYPE['name'],
                          liveaction.id, route_payload)
                self._trigger_dispatcher.dispatch(self._notify_trigger, payload=route_payload,
                                                  trace_context=trace_context)
            except Exception:
                # Keep going so the remaining routes are still notified, but leave
                # a traceback behind for the failed one.
                LOG.exception('Failed to dispatch notification to route "%s".', route)
                failed_routes.append(route)

        if failed_routes:
            raise Exception('Failed notifications to routes: %s' % ', '.join(failed_routes))

    def _render_notify_content(self, liveaction, execution, notify_subsection,
                               default_message_suffix):
        """
        Return the ``(message, data)`` pair of a notification, Jinja-rendered.

        If rendering either value fails, the failure is logged and the
        un-rendered value is used.
        """
        message = notify_subsection.message or (
            'Action ' + liveaction.action + ' ' + default_message_suffix)
        data = notify_subsection.data or {}

        jinja_context = self._build_jinja_context(liveaction=liveaction, execution=execution)

        try:
            message = self._transform_message(message=message,
                                              context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `message`.')

        try:
            data = self._transform_data(data=data, context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `data`.')

        return message, data

    def _build_notify_payload(self, liveaction, execution, execution_id, notify_subsection,
                              default_message_suffix):
        """
        Build the route-independent part of the notify trigger payload.
        """
        message, data = self._render_notify_content(
            liveaction=liveaction, execution=execution,
            notify_subsection=notify_subsection,
            default_message_suffix=default_message_suffix)

        # At this point convert result to a string. This restricts the rules engine's
        # ability to introspect the result. On the other hand at least a JSON usable
        # result is sent as part of the notification. If jinja is required to convert
        # to a string representation it uses str(...) which makes it impossible to
        # parse the result as JSON any longer.
        # TODO: Use to_serializable_dict
        data['result'] = json.dumps(liveaction.result)

        payload = {}
        payload['message'] = message
        payload['data'] = data
        payload['execution_id'] = execution_id
        payload['status'] = liveaction.status
        payload['start_timestamp'] = isotime.format(liveaction.start_timestamp)

        try:
            payload['end_timestamp'] = isotime.format(liveaction.end_timestamp)
        except AttributeError:
            # This can be raised if liveaction.end_timestamp is None, which is caused
            # when policy cancels a request due to concurrency.
            # In this case, use datetime.utcnow() instead.
            payload['end_timestamp'] = isotime.format(datetime.utcnow())

        payload['action_ref'] = liveaction.action
        payload['runner_ref'] = self._get_runner_ref(liveaction.action)
        return payload
200
201
    def _build_jinja_context(self, liveaction, execution):
        """
        Assemble the context used to render notification messages and data.

        The context holds datastore lookups plus the liveaction parameters, the
        liveaction context and the execution result.

        NOTE: the ``liveaction`` parameter shadows the ``liveaction`` transport
        module imported at the top of this file.

        :rtype: ``dict``
        """
        system_lookup = KeyValueLookup(scope=SYSTEM_SCOPE)
        full_system_lookup = KeyValueLookup(scope=FULL_SYSTEM_SCOPE)

        context = {SYSTEM_SCOPE: system_lookup}
        context[DATASTORE_PARENT_SCOPE] = {SYSTEM_SCOPE: full_system_lookup}
        context[ACTION_PARAMETERS_KV_PREFIX] = liveaction.parameters
        context[ACTION_CONTEXT_KV_PREFIX] = liveaction.context
        context[ACTION_RESULTS_KV_PREFIX] = execution.result
        return context
213
214
    def _transform_message(self, message, context=None):
        """
        Render ``message`` with ``jinja_utils.render_values``.

        :return: The rendered message, or ``message`` itself when the rendered
                 mapping has no ``message`` key.
        """
        rendered = jinja_utils.render_values(mapping={'message': message},
                                             context=context or {})
        return rendered.get('message', message)
219
220
    def _transform_data(self, data, context=None):
        """
        Render ``data`` with ``jinja_utils.render_values`` and return the result.
        """
        rendered = jinja_utils.render_values(mapping=data, context=context)
        return rendered
222
223
    def _get_trace_context(self, execution_id):
        """
        Return a ``TraceContext`` for the trace recorded for ``execution_id``.

        :return: ``TraceContext`` or ``None`` when no trace is found.
        """
        trace_db = trace_service.get_trace_db_by_action_execution(
            action_execution_id=execution_id)

        if not trace_db:
            # If no trace_context is found then do not create a new one here. If
            # necessary it shall be created downstream. Sure this is impl leakage
            # of some sort.
            return None

        return TraceContext(id_=str(trace_db.id), trace_tag=trace_db.trace_tag)
231
232
    def _post_generic_trigger(self, liveaction=None, execution=None):
        """
        Dispatch the generic action trigger for a liveaction.

        Does nothing except log a debug message when ``ACTION_SENSOR_ENABLED`` is
        falsy.

        NOTE: the ``liveaction`` parameter shadows the ``liveaction`` transport
        module imported at the top of this file.
        """
        if not ACTION_SENSOR_ENABLED:
            LOG.debug('Action trigger is disabled, skipping trigger dispatch...')
            return

        execution_id = str(execution.id)

        payload = {}
        payload['execution_id'] = execution_id
        payload['status'] = liveaction.status
        payload['start_timestamp'] = str(liveaction.start_timestamp)
        action_ref = liveaction.action
        # deprecate 'action_name' at some point and switch to 'action_ref'
        payload['action_name'] = action_ref
        payload['action_ref'] = action_ref
        payload['runner_ref'] = self._get_runner_ref(action_ref)
        payload['parameters'] = liveaction.get_masked_parameters()
        payload['result'] = liveaction.result

        # Use execution_id to extract trace rather than liveaction. execution_id
        # will look-up an exact TraceDB while liveaction depending on context
        # may not end up going to the DB.
        trace_context = self._get_trace_context(execution_id=execution_id)
        LOG.debug('POSTing %s for %s. Payload - %s. TraceContext - %s',
                  ACTION_TRIGGER_TYPE['name'], liveaction.id, payload, trace_context)
        self._trigger_dispatcher.dispatch(self._action_trigger, payload=payload,
                                          trace_context=trace_context)
255
256
    def _apply_post_run_policies(self, liveaction_db):
        """
        Apply every enabled policy defined for the liveaction's action.

        A policy whose ``apply_after`` raises is logged and skipped, and the
        remaining policies are still applied.

        :param liveaction_db: Liveaction the policies are applied to.
        :return: The liveaction as returned by the last policy applied
                 successfully, or ``liveaction_db`` unchanged if none was.
        """
        # Apply policies defined for the action.
        policy_dbs = Policy.query(resource_ref=liveaction_db.action, enabled=True)
        LOG.debug('Applying %s post_run policies', len(policy_dbs))

        for policy_db in policy_dbs:
            driver = policies.get_driver(policy_db.ref,
                                         policy_db.policy_type,
                                         **policy_db.parameters)

            try:
                LOG.debug('Applying post_run policy "%s" (%s) for liveaction %s',
                          policy_db.ref, policy_db.policy_type, str(liveaction_db.id))
                liveaction_db = driver.apply_after(liveaction_db)
            except Exception:
                # Deliberately best-effort, but only for ordinary errors: a bare
                # ``except:`` would also swallow SystemExit / KeyboardInterrupt.
                LOG.exception('An exception occurred while applying policy "%s".', policy_db.ref)

        return liveaction_db
274
275
    def _get_runner_ref(self, action_ref):
        """
        Look up the action for ``action_ref`` and return its runner type name.

        :rtype: ``str``
        """
        action_db = Action.get_by_ref(action_ref)
        runner_type = action_db['runner_type']
        return runner_type['name']
283
284
285
def get_notifier():
    """
    Build a ``Notifier`` bound to ``ACTIONUPDATE_WORK_Q``.

    :rtype: :class:`Notifier`
    """
    messaging_urls = transport_utils.get_messaging_urls()
    # NOTE(review): the Notifier is returned from inside the ``with`` block, so
    # the connection's context manager exits before the caller uses it.
    # Presumably kombu re-establishes the connection lazily - confirm.
    with Connection(messaging_urls) as connection:
        dispatcher = TriggerDispatcher(LOG)
        return Notifier(connection, [ACTIONUPDATE_WORK_Q], trigger_dispatcher=dispatcher)
288