Completed
Pull Request — master (#2334)
by Edward
06:02
created

st2actions.notifier.Notifier   A

Complexity

Total Complexity 34

Size/Duplication

Total Lines 205
Duplicated Lines 0 %
Metric Value
wmc 34
dl 0
loc 205
rs 9.2

12 Methods

Rating   Name   Duplication   Size   Complexity  
C _post_notify_triggers() 0 21 7
A _transform_message() 0 5 1
A _apply_post_run_policies() 0 16 3
A _post_generic_trigger() 0 23 2
A __init__() 0 11 2
A _get_runner_ref() 0 8 1
A _get_trace_context() 0 8 2
A _build_jinja_context() 0 6 1
C _post_notify_subsection_triggers() 0 61 8
A _get_execution_for_liveaction() 0 7 2
B process() 0 23 4
A _transform_data() 0 2 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
18
from kombu import Connection
19
from oslo_config import cfg
20
21
from st2common import log as logging
22
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
23
from st2common.constants.action import LIVEACTION_FAILED_STATES
24
from st2common.constants.action import LIVEACTION_COMPLETED_STATES
25
from st2common.constants.triggers import INTERNAL_TRIGGER_TYPES
26
from st2common.models.api.trace import TraceContext
27
from st2common.models.db.liveaction import LiveActionDB
28
from st2common.persistence.action import Action
29
from st2common.persistence.policy import Policy
30
from st2common import policies
31
from st2common.models.system.common import ResourceReference
32
from st2common.persistence.execution import ActionExecution
33
from st2common.services import trace as trace_service
34
from st2common.transport import consumers, liveaction, publishers
35
from st2common.transport import utils as transport_utils
36
from st2common.transport.reactor import TriggerDispatcher
37
from st2common.util import isotime
38
from st2common.util import jinja as jinja_utils
39
from st2common.constants.action import ACTION_CONTEXT_KV_PREFIX
40
from st2common.constants.action import ACTION_PARAMETERS_KV_PREFIX
41
from st2common.constants.action import ACTION_RESULTS_KV_PREFIX
42
from st2common.constants.system import SYSTEM_KV_PREFIX
43
from st2common.services.keyvalues import KeyValueLookup
44
45
__all__ = [
46
    'Notifier',
47
    'get_notifier'
48
]
49
50
LOG = logging.getLogger(__name__)
51
52
ACTIONUPDATE_WORK_Q = liveaction.get_queue('st2.notifiers.work',
53
                                           routing_key=publishers.UPDATE_RK)
54
55
ACTION_SENSOR_ENABLED = cfg.CONF.action_sensor.enable
56
# XXX: Fix this nasty positional dependency.
57
ACTION_TRIGGER_TYPE = INTERNAL_TRIGGER_TYPES['action'][0]
58
NOTIFY_TRIGGER_TYPE = INTERNAL_TRIGGER_TYPES['action'][1]
59
60
61
class Notifier(consumers.MessageHandler):
62
    message_type = LiveActionDB
63
64
    def __init__(self, connection, queues, trigger_dispatcher=None):
65
        super(Notifier, self).__init__(connection, queues)
66
        if not trigger_dispatcher:
67
            trigger_dispatcher = TriggerDispatcher(LOG)
68
        self._trigger_dispatcher = trigger_dispatcher
69
        self._notify_trigger = ResourceReference.to_string_reference(
70
            pack=NOTIFY_TRIGGER_TYPE['pack'],
71
            name=NOTIFY_TRIGGER_TYPE['name'])
72
        self._action_trigger = ResourceReference.to_string_reference(
73
            pack=ACTION_TRIGGER_TYPE['pack'],
74
            name=ACTION_TRIGGER_TYPE['name'])
75
76
    def process(self, liveaction):
0 ignored issues
show
Comprehensibility Bug introduced by
liveaction is re-defining a name which is already available in the outer-scope (previously defined on line 34).

It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
Loading history...
77
        live_action_id = str(liveaction.id)
78
        extra = {'live_action_db': liveaction}
79
        LOG.debug('Processing liveaction %s', live_action_id, extra=extra)
80
81
        if liveaction.status not in LIVEACTION_COMPLETED_STATES:
82
            LOG.debug('Skipping processing of liveaction %s since it\'s not in a completed state' %
83
                      (live_action_id), extra=extra)
84
            return
85
86
        execution = self._get_execution_for_liveaction(liveaction)
87
88
        if not execution:
89
            LOG.exception('Execution object corresponding to LiveAction %s not found.',
90
                          live_action_id, extra=extra)
91
            return None
92
93
        self._apply_post_run_policies(liveaction=liveaction)
94
95
        if liveaction.notify is not None:
96
            self._post_notify_triggers(liveaction=liveaction, execution=execution)
97
98
        self._post_generic_trigger(liveaction=liveaction, execution=execution)
99
100
    def _get_execution_for_liveaction(self, liveaction):
0 ignored issues
show
Comprehensibility Bug introduced by
liveaction is re-defining a name which is already available in the outer-scope (previously defined on line 34).

It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
Loading history...
101
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
102
103
        if not execution:
104
            return None
105
106
        return execution
107
108
    def _post_notify_triggers(self, liveaction=None, execution=None):
0 ignored issues
show
Comprehensibility Bug introduced by
liveaction is re-defining a name which is already available in the outer-scope (previously defined on line 34).

It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
Loading history...
109
        notify = getattr(liveaction, 'notify', None)
110
111
        if not notify:
112
            return
113
114
        if notify.on_complete:
115
            self._post_notify_subsection_triggers(
116
                liveaction=liveaction, execution=execution,
117
                notify_subsection=notify.on_complete,
118
                default_message_suffix='completed.')
119
        if liveaction.status == LIVEACTION_STATUS_SUCCEEDED and notify.on_success:
120
            self._post_notify_subsection_triggers(
121
                liveaction=liveaction, execution=execution,
122
                notify_subsection=notify.on_success,
123
                default_message_suffix='succeeded.')
124
        if liveaction.status in LIVEACTION_FAILED_STATES and notify.on_failure:
125
            self._post_notify_subsection_triggers(
126
                liveaction=liveaction, execution=execution,
127
                notify_subsection=notify.on_failure,
128
                default_message_suffix='failed.')
129
130
    def _post_notify_subsection_triggers(self, liveaction=None, execution=None,
0 ignored issues
show
Comprehensibility Bug introduced by
liveaction is re-defining a name which is already available in the outer-scope (previously defined on line 34).

It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
Loading history...
131
                                         notify_subsection=None,
132
                                         default_message_suffix=None):
133
        routes = (getattr(notify_subsection, 'routes') or
134
                  getattr(notify_subsection, 'channels', None))
135
136
        execution_id = str(execution.id)
137
138
        if routes and len(routes) >= 1:
139
            payload = {}
140
            message = notify_subsection.message or (
141
                'Action ' + liveaction.action + ' ' + default_message_suffix)
142
            data = notify_subsection.data or {}
143
144
            jinja_context = self._build_jinja_context(liveaction=liveaction, execution=execution)
145
146
            try:
147
                message = self._transform_message(message=message,
148
                                                  context=jinja_context)
149
            except:
150
                LOG.exception('Failed (Jinja) transforming `message`.')
151
152
            try:
153
                data = self._transform_data(data=data, context=jinja_context)
154
            except:
155
                LOG.exception('Failed (Jinja) transforming `data`.')
156
157
            # At this point convert result to a string. This restricts the rulesengines
158
            # ability to introspect the result. On the other handle atleast a json usable
159
            # result is sent as part of the notification. If jinja is required to convert
160
            # to a string representation it uses str(...) which make it impossible to
161
            # parse the result as json any longer.
162
            # TODO: Use to_serializable_dict
163
            data['result'] = json.dumps(liveaction.result)
164
165
            payload['message'] = message
166
            payload['data'] = data
167
            payload['execution_id'] = execution_id
168
            payload['status'] = liveaction.status
169
            payload['start_timestamp'] = isotime.format(liveaction.start_timestamp)
170
            payload['end_timestamp'] = isotime.format(liveaction.end_timestamp)
171
            payload['action_ref'] = liveaction.action
172
            payload['runner_ref'] = self._get_runner_ref(liveaction.action)
173
174
            trace_context = self._get_trace_context(execution_id=execution_id)
175
176
            failed_routes = []
177
            for route in routes:
178
                try:
179
                    payload['route'] = route
180
                    # Deprecated. Only for backward compatibility reasons.
181
                    payload['channel'] = route
182
                    LOG.debug('POSTing %s for %s. Payload - %s.', NOTIFY_TRIGGER_TYPE['name'],
183
                              liveaction.id, payload)
184
                    self._trigger_dispatcher.dispatch(self._notify_trigger, payload=payload,
185
                                                      trace_context=trace_context)
186
                except:
187
                    failed_routes.append(route)
188
189
            if len(failed_routes) > 0:
190
                raise Exception('Failed notifications to routes: %s' % ', '.join(failed_routes))
191
192
    def _build_jinja_context(self, liveaction, execution):
0 ignored issues
show
Comprehensibility Bug introduced by
liveaction is re-defining a name which is already available in the outer-scope (previously defined on line 34).

It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
Loading history...
193
        context = {SYSTEM_KV_PREFIX: KeyValueLookup()}
194
        context.update({ACTION_PARAMETERS_KV_PREFIX: liveaction.parameters})
195
        context.update({ACTION_CONTEXT_KV_PREFIX: liveaction.context})
196
        context.update({ACTION_RESULTS_KV_PREFIX: execution.result})
197
        return context
198
199
    def _transform_message(self, message, context=None):
200
        mapping = {'message': message}
201
        context = context or {}
202
        return (jinja_utils.render_values(mapping=mapping, context=context)).get('message',
203
                                                                                 message)
204
205
    def _transform_data(self, data, context=None):
206
        return jinja_utils.render_values(mapping=data, context=context)
207
208
    def _get_trace_context(self, execution_id):
209
        trace_db = trace_service.get_trace_db_by_action_execution(
210
            action_execution_id=execution_id)
211
        if trace_db:
212
            return TraceContext(id_=str(trace_db.id), trace_tag=trace_db.trace_tag)
213
        # If no trace_context is found then do not create a new one here. If necessary
214
        # it shall be created downstream. Sure this is impl leakage of some sort.
215
        return None
216
217
    def _post_generic_trigger(self, liveaction=None, execution=None):
0 ignored issues
show
Comprehensibility Bug introduced by
liveaction is re-defining a name which is already available in the outer-scope (previously defined on line 34).

It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
Loading history...
218
        if not ACTION_SENSOR_ENABLED:
219
            LOG.debug('Action trigger is disabled, skipping trigger dispatch...')
220
            return
221
222
        execution_id = str(execution.id)
223
        payload = {'execution_id': execution_id,
224
                   'status': liveaction.status,
225
                   'start_timestamp': str(liveaction.start_timestamp),
226
                   # deprecate 'action_name' at some point and switch to 'action_ref'
227
                   'action_name': liveaction.action,
228
                   'action_ref': liveaction.action,
229
                   'runner_ref': self._get_runner_ref(liveaction.action),
230
                   'parameters': liveaction.get_masked_parameters(),
231
                   'result': liveaction.result}
232
        # Use execution_id to extract trace rather than liveaction. execution_id
233
        # will look-up an exact TraceDB while liveaction depending on context
234
        # may not end up going to the DB.
235
        trace_context = self._get_trace_context(execution_id=execution_id)
236
        LOG.debug('POSTing %s for %s. Payload - %s. TraceContext - %s',
237
                  ACTION_TRIGGER_TYPE['name'], liveaction.id, payload, trace_context)
238
        self._trigger_dispatcher.dispatch(self._action_trigger, payload=payload,
239
                                          trace_context=trace_context)
240
241
    def _apply_post_run_policies(self, liveaction=None):
0 ignored issues
show
Comprehensibility Bug introduced by
liveaction is re-defining a name which is already available in the outer-scope (previously defined on line 34).

It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior:

param = 5

class Foo:
    def __init__(self, param):   # "param" would be flagged here
        self.param = param
Loading history...
242
        # Apply policies defined for the action.
243
        policy_dbs = Policy.query(resource_ref=liveaction.action)
244
        LOG.debug('Applying %s post_run policies' % (len(policy_dbs)))
245
246
        for policy_db in policy_dbs:
247
            driver = policies.get_driver(policy_db.ref,
248
                                         policy_db.policy_type,
249
                                         **policy_db.parameters)
250
251
            try:
252
                LOG.debug('Applying post_run policy "%s" (%s) for liveaction %s' %
253
                          (policy_db.ref, policy_db.policy_type, str(liveaction.id)))
254
                liveaction = driver.apply_after(liveaction)
255
            except:
256
                LOG.exception('An exception occurred while applying policy "%s".', policy_db.ref)
257
258
    def _get_runner_ref(self, action_ref):
259
        """
260
        Retrieve a runner reference for the provided action.
261
262
        :rtype: ``str``
263
        """
264
        action = Action.get_by_ref(action_ref)
265
        return action['runner_type']['name']
266
267
268
def get_notifier():
269
    with Connection(transport_utils.get_messaging_urls()) as conn:
270
        return Notifier(conn, [ACTIONUPDATE_WORK_Q], trigger_dispatcher=TriggerDispatcher(LOG))
271