1
|
|
|
# Licensed to the StackStorm, Inc ('StackStorm') under one or more |
2
|
|
|
# contributor license agreements. See the NOTICE file distributed with |
3
|
|
|
# this work for additional information regarding copyright ownership. |
4
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0 |
5
|
|
|
# (the "License"); you may not use this file except in compliance with |
6
|
|
|
# the License. You may obtain a copy of the License at |
7
|
|
|
# |
8
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0 |
9
|
|
|
# |
10
|
|
|
# Unless required by applicable law or agreed to in writing, software |
11
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, |
12
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13
|
|
|
# See the License for the specific language governing permissions and |
14
|
|
|
# limitations under the License. |
15
|
|
|
|
16
|
|
|
from datetime import datetime |
17
|
|
|
import json |
18
|
|
|
|
19
|
|
|
from kombu import Connection |
20
|
|
|
from oslo_config import cfg |
21
|
|
|
|
22
|
|
|
from st2common import log as logging |
23
|
|
|
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED |
24
|
|
|
from st2common.constants.action import LIVEACTION_FAILED_STATES |
25
|
|
|
from st2common.constants.action import LIVEACTION_COMPLETED_STATES |
26
|
|
|
from st2common.constants.triggers import INTERNAL_TRIGGER_TYPES |
27
|
|
|
from st2common.models.api.trace import TraceContext |
28
|
|
|
from st2common.models.db.liveaction import LiveActionDB |
29
|
|
|
from st2common.persistence.action import Action |
30
|
|
|
from st2common.persistence.policy import Policy |
31
|
|
|
from st2common import policies |
32
|
|
|
from st2common.models.system.common import ResourceReference |
33
|
|
|
from st2common.persistence.execution import ActionExecution |
34
|
|
|
from st2common.services import trace as trace_service |
35
|
|
|
from st2common.transport import consumers, liveaction, publishers |
36
|
|
|
from st2common.transport import utils as transport_utils |
37
|
|
|
from st2common.transport.reactor import TriggerDispatcher |
38
|
|
|
from st2common.util import isotime |
39
|
|
|
from st2common.util import jinja as jinja_utils |
40
|
|
|
from st2common.constants.action import ACTION_CONTEXT_KV_PREFIX |
41
|
|
|
from st2common.constants.action import ACTION_PARAMETERS_KV_PREFIX |
42
|
|
|
from st2common.constants.action import ACTION_RESULTS_KV_PREFIX |
43
|
|
|
from st2common.constants.keyvalue import FULL_SYSTEM_SCOPE, SYSTEM_SCOPE, DATASTORE_PARENT_SCOPE |
44
|
|
|
from st2common.services.keyvalues import KeyValueLookup |
45
|
|
|
|
46
|
|
|
# Public names exported by this module.
__all__ = ['Notifier', 'get_notifier']

LOG = logging.getLogger(__name__)

# Queue the notifier consumes from; it is bound with the "update" routing key.
ACTIONUPDATE_WORK_Q = liveaction.get_queue(
    'st2.notifiers.work',
    routing_key=publishers.UPDATE_RK)

# NOTE: This option is read once, when the module is imported.
ACTION_SENSOR_ENABLED = cfg.CONF.action_sensor.enable

# XXX: Fix this nasty positional dependency - the two trigger types below are
# picked by their position inside INTERNAL_TRIGGER_TYPES['action'].
ACTION_TRIGGER_TYPE = INTERNAL_TRIGGER_TYPES['action'][0]
NOTIFY_TRIGGER_TYPE = INTERNAL_TRIGGER_TYPES['action'][1]
60
|
|
|
|
61
|
|
|
|
62
|
|
|
class Notifier(consumers.MessageHandler):
    """
    Message handler which post-processes live actions that reached a completed state.

    For each such live action it applies the post-run policies registered for the
    action, dispatches the notify trigger for every configured route (when the live
    action has a ``notify`` section) and finally dispatches the generic action trigger.

    NOTE: Several methods take a parameter named ``liveaction``. Inside those methods
    the parameter shadows the ``liveaction`` module imported from ``st2common.transport``.
    The name is kept because the argument is passed by keyword.
    """

    message_type = LiveActionDB

    def __init__(self, connection, queues, trigger_dispatcher=None):
        """
        :param connection: Connection passed through to the base message handler.
        :param queues: Queues passed through to the base message handler.
        :param trigger_dispatcher: Dispatcher used to emit triggers. When not provided
                                   (or falsy), a new ``TriggerDispatcher`` is created.
        """
        super(Notifier, self).__init__(connection, queues)
        if not trigger_dispatcher:
            trigger_dispatcher = TriggerDispatcher(LOG)
        self._trigger_dispatcher = trigger_dispatcher
        # String references of the two triggers this handler dispatches.
        self._notify_trigger = ResourceReference.to_string_reference(
            pack=NOTIFY_TRIGGER_TYPE['pack'],
            name=NOTIFY_TRIGGER_TYPE['name'])
        self._action_trigger = ResourceReference.to_string_reference(
            pack=ACTION_TRIGGER_TYPE['pack'],
            name=ACTION_TRIGGER_TYPE['name'])

    def process(self, liveaction):
        """
        Handle a single live action message.

        Live actions which are not in one of ``LIVEACTION_COMPLETED_STATES`` and live
        actions without a corresponding execution object are skipped.

        :param liveaction: Live action to process.
        """
        live_action_id = str(liveaction.id)
        extra = {'live_action_db': liveaction}
        LOG.debug('Processing liveaction %s', live_action_id, extra=extra)

        if liveaction.status not in LIVEACTION_COMPLETED_STATES:
            LOG.debug('Skipping processing of liveaction %s since it\'s not in a completed state',
                      live_action_id, extra=extra)
            return

        execution = self._get_execution_for_liveaction(liveaction)

        if not execution:
            # No exception is being handled here so LOG.error (and not LOG.exception,
            # which would attach an empty traceback) is the right call.
            LOG.error('Execution object corresponding to LiveAction %s not found.',
                      live_action_id, extra=extra)
            return

        self._apply_post_run_policies(liveaction_db=liveaction)

        if liveaction.notify is not None:
            self._post_notify_triggers(liveaction=liveaction, execution=execution)

        self._post_generic_trigger(liveaction=liveaction, execution=execution)

    def _get_execution_for_liveaction(self, liveaction):
        """
        Look up the execution object which references the provided live action.

        :return: The execution object or ``None`` if the lookup result is falsy.
        """
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
        return execution or None

    def _post_notify_triggers(self, liveaction=None, execution=None):
        """
        Dispatch notify triggers for every notify subsection which applies to the
        final status of the live action.

        ``on_complete`` is always considered, ``on_success`` only when the status is
        ``LIVEACTION_STATUS_SUCCEEDED`` and ``on_failure`` only when the status is one
        of ``LIVEACTION_FAILED_STATES``.
        """
        notify = getattr(liveaction, 'notify', None)

        if not notify:
            return

        if notify.on_complete:
            self._post_notify_subsection_triggers(
                liveaction=liveaction, execution=execution,
                notify_subsection=notify.on_complete,
                default_message_suffix='completed.')

        if liveaction.status == LIVEACTION_STATUS_SUCCEEDED and notify.on_success:
            self._post_notify_subsection_triggers(
                liveaction=liveaction, execution=execution,
                notify_subsection=notify.on_success,
                default_message_suffix='succeeded.')

        if liveaction.status in LIVEACTION_FAILED_STATES and notify.on_failure:
            self._post_notify_subsection_triggers(
                liveaction=liveaction, execution=execution,
                notify_subsection=notify.on_failure,
                default_message_suffix='failed.')

    def _post_notify_subsection_triggers(self, liveaction=None, execution=None,
                                         notify_subsection=None,
                                         default_message_suffix=None):
        """
        Dispatch the notify trigger once for every route of the provided subsection.

        :param liveaction: Live action the notification is about.
        :param execution: Execution object corresponding to the live action.
        :param notify_subsection: Notify subsection providing ``routes`` (or the
                                  deprecated ``channels``), ``message`` and ``data``.
        :param default_message_suffix: Suffix of the default message which is used when
                                       the subsection doesn't define a message.

        :raises Exception: If dispatching failed for one or more routes. All the routes
                           are attempted before the exception is raised.
        """
        # "channels" is the deprecated name for "routes". A default is provided for both
        # lookups so a subsection without "routes" falls back instead of raising.
        routes = (getattr(notify_subsection, 'routes', None) or
                  getattr(notify_subsection, 'channels', None))

        if not routes:
            return

        execution_id = str(execution.id)

        message = notify_subsection.message or (
            'Action ' + liveaction.action + ' ' + default_message_suffix)
        data = notify_subsection.data or {}

        jinja_context = self._build_jinja_context(liveaction=liveaction, execution=execution)

        # Rendering is best effort - on failure the unrendered value is sent.
        try:
            message = self._transform_message(message=message,
                                              context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `message`.')

        try:
            data = self._transform_data(data=data, context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `data`.')

        # Work on a shallow copy. If rendering failed, ``data`` still is the object
        # stored on the notify subsection and adding "result" to it would modify the
        # live action's notify definition.
        data = dict(data)

        # At this point convert result to a string. This restricts the rulesengines
        # ability to introspect the result. On the other hand at least a json usable
        # result is sent as part of the notification. If jinja is required to convert
        # to a string representation it uses str(...) which make it impossible to
        # parse the result as json any longer.
        # TODO: Use to_serializable_dict
        data['result'] = json.dumps(liveaction.result)

        payload = {
            'message': message,
            'data': data,
            'execution_id': execution_id,
            'status': liveaction.status,
            'start_timestamp': isotime.format(liveaction.start_timestamp)
        }

        try:
            payload['end_timestamp'] = isotime.format(liveaction.end_timestamp)
        except AttributeError:
            # This can be raised if liveaction.end_timestamp is None, which is caused
            # when policy cancels a request due to concurrency.
            # In this case, use the current UTC time instead.
            payload['end_timestamp'] = isotime.format(datetime.utcnow())

        payload['action_ref'] = liveaction.action
        payload['runner_ref'] = self._get_runner_ref(liveaction.action)

        trace_context = self._get_trace_context(execution_id=execution_id)

        failed_routes = []
        for route in routes:
            # Every dispatch receives its own dict so that later iterations can't alter
            # a payload which has already been handed to the dispatcher.
            # "channel" is deprecated and only included for backward compatibility.
            route_payload = dict(payload, route=route, channel=route)

            try:
                LOG.debug('POSTing %s for %s. Payload - %s.', NOTIFY_TRIGGER_TYPE['name'],
                          liveaction.id, route_payload)
                self._trigger_dispatcher.dispatch(self._notify_trigger, payload=route_payload,
                                                  trace_context=trace_context)
            except Exception:
                # Keep going so the remaining routes still get notified; failures are
                # reported together once all the routes have been attempted.
                LOG.exception('Failed to dispatch notification to route "%s".', route)
                failed_routes.append(route)

        if failed_routes:
            raise Exception('Failed notifications to routes: %s' %
                            ', '.join(str(route) for route in failed_routes))

    def _build_jinja_context(self, liveaction, execution):
        """
        Build the context which is used when rendering the notify message and data.

        The context exposes datastore lookups, the live action parameters, the live
        action context and the execution result.

        :rtype: ``dict``
        """
        return {
            SYSTEM_SCOPE: KeyValueLookup(scope=SYSTEM_SCOPE),
            DATASTORE_PARENT_SCOPE: {
                SYSTEM_SCOPE: KeyValueLookup(scope=FULL_SYSTEM_SCOPE)
            },
            ACTION_PARAMETERS_KV_PREFIX: liveaction.parameters,
            ACTION_CONTEXT_KV_PREFIX: liveaction.context,
            ACTION_RESULTS_KV_PREFIX: execution.result
        }

    def _transform_message(self, message, context=None):
        """
        Render the provided message using the provided context.

        :return: Rendered message, or the original message if the rendered mapping
                 contains no ``message`` key.
        """
        rendered = jinja_utils.render_values(mapping={'message': message},
                                             context=context or {})
        return rendered.get('message', message)

    def _transform_data(self, data, context=None):
        """
        Render the values of the provided data mapping using the provided context.
        """
        return jinja_utils.render_values(mapping=data, context=context)

    def _get_trace_context(self, execution_id):
        """
        Retrieve the trace context for the provided execution id.

        :return: ``TraceContext`` built from the stored trace, ``None`` if no trace
                 exists for the execution.
        """
        trace_db = trace_service.get_trace_db_by_action_execution(
            action_execution_id=execution_id)
        if trace_db:
            return TraceContext(id_=str(trace_db.id), trace_tag=trace_db.trace_tag)
        # If no trace_context is found then do not create a new one here. If necessary
        # it shall be created downstream. Sure this is impl leakage of some sort.
        return None

    def _post_generic_trigger(self, liveaction=None, execution=None):
        """
        Dispatch the generic action trigger for the provided live action.

        Nothing is dispatched when ``ACTION_SENSOR_ENABLED`` is falsy.
        """
        if not ACTION_SENSOR_ENABLED:
            LOG.debug('Action trigger is disabled, skipping trigger dispatch...')
            return

        execution_id = str(execution.id)
        payload = {'execution_id': execution_id,
                   'status': liveaction.status,
                   'start_timestamp': str(liveaction.start_timestamp),
                   # deprecate 'action_name' at some point and switch to 'action_ref'
                   'action_name': liveaction.action,
                   'action_ref': liveaction.action,
                   'runner_ref': self._get_runner_ref(liveaction.action),
                   'parameters': liveaction.get_masked_parameters(),
                   'result': liveaction.result}
        # Use execution_id to extract trace rather than liveaction. execution_id
        # will look-up an exact TraceDB while liveaction depending on context
        # may not end up going to the DB.
        trace_context = self._get_trace_context(execution_id=execution_id)
        LOG.debug('POSTing %s for %s. Payload - %s. TraceContext - %s',
                  ACTION_TRIGGER_TYPE['name'], liveaction.id, payload, trace_context)
        self._trigger_dispatcher.dispatch(self._action_trigger, payload=payload,
                                          trace_context=trace_context)

    def _apply_post_run_policies(self, liveaction_db):
        """
        Apply all the enabled policies defined for the action of the live action.

        A failure while applying one policy is logged and doesn't prevent the remaining
        policies from being applied.

        :return: Live action as returned by the last successfully applied policy.
        """
        # Apply policies defined for the action.
        policy_dbs = Policy.query(resource_ref=liveaction_db.action, enabled=True)
        LOG.debug('Applying %s post_run policies', len(policy_dbs))

        for policy_db in policy_dbs:
            driver = policies.get_driver(policy_db.ref,
                                         policy_db.policy_type,
                                         **policy_db.parameters)

            try:
                LOG.debug('Applying post_run policy "%s" (%s) for liveaction %s',
                          policy_db.ref, policy_db.policy_type, str(liveaction_db.id))
                liveaction_db = driver.apply_after(liveaction_db)
            except Exception:
                LOG.exception('An exception occurred while applying policy "%s".', policy_db.ref)

        return liveaction_db

    def _get_runner_ref(self, action_ref):
        """
        Retrieve a runner reference for the provided action.

        :rtype: ``str``
        """
        action = Action.get_by_ref(action_ref)
        return action['runner_type']['name']
283
|
|
|
|
284
|
|
|
|
285
|
|
|
def get_notifier():
    """
    Create a ``Notifier`` which consumes from ``ACTIONUPDATE_WORK_Q``.

    NOTE(review): The notifier is returned from inside the ``with`` block, so the
    connection's context manager exits as soon as this function returns. Presumably
    the connection is (re-)established on demand by the consumer - confirm.

    :rtype: :class:`Notifier`
    """
    messaging_urls = transport_utils.get_messaging_urls()

    with Connection(messaging_urls) as connection:
        work_queues = [ACTIONUPDATE_WORK_Q]
        dispatcher = TriggerDispatcher(LOG)
        return Notifier(connection, work_queues, trigger_dispatcher=dispatcher)
288
|
|
|
|
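It is generally bad practice to shadow names from an outer scope. In most cases this is done unintentionally and can lead to unexpected behavior. In this file, the `liveaction` parameter of several `Notifier` methods shadows the `liveaction` module imported from `st2common.transport`.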