1
|
|
|
# Licensed to the StackStorm, Inc ('StackStorm') under one or more |
2
|
|
|
# contributor license agreements. See the NOTICE file distributed with |
3
|
|
|
# this work for additional information regarding copyright ownership. |
4
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0 |
5
|
|
|
# (the "License"); you may not use this file except in compliance with |
6
|
|
|
# the License. You may obtain a copy of the License at |
7
|
|
|
# |
8
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0 |
9
|
|
|
# |
10
|
|
|
# Unless required by applicable law or agreed to in writing, software |
11
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, |
12
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13
|
|
|
# See the License for the specific language governing permissions and |
14
|
|
|
# limitations under the License. |
15
|
|
|
|
16
|
|
|
from __future__ import absolute_import |
17
|
|
|
from datetime import datetime |
18
|
|
|
import json |
19
|
|
|
|
20
|
|
|
from kombu import Connection |
21
|
|
|
from oslo_config import cfg |
22
|
|
|
|
23
|
|
|
from st2common import log as logging |
24
|
|
|
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED |
25
|
|
|
from st2common.constants.action import LIVEACTION_STATUS_PAUSED |
26
|
|
|
from st2common.constants.action import LIVEACTION_FAILED_STATES |
27
|
|
|
from st2common.constants.action import LIVEACTION_COMPLETED_STATES |
28
|
|
|
from st2common.constants.triggers import INTERNAL_TRIGGER_TYPES |
29
|
|
|
from st2common.models.api.trace import TraceContext |
30
|
|
|
from st2common.models.db.execution import ActionExecutionDB |
31
|
|
|
from st2common.persistence.action import Action |
32
|
|
|
from st2common.persistence.liveaction import LiveAction |
33
|
|
|
from st2common.persistence.policy import Policy |
34
|
|
|
from st2common import policies |
35
|
|
|
from st2common.models.system.common import ResourceReference |
36
|
|
|
from st2common.persistence.execution import ActionExecution |
37
|
|
|
from st2common.services import trace as trace_service |
38
|
|
|
from st2common.services import workflows as wf_svc |
39
|
|
|
from st2common.transport import consumers |
40
|
|
|
from st2common.transport import utils as transport_utils |
41
|
|
|
from st2common.transport.reactor import TriggerDispatcher |
42
|
|
|
from st2common.util import isotime |
43
|
|
|
from st2common.util import jinja as jinja_utils |
44
|
|
|
from st2common.constants.action import ACTION_CONTEXT_KV_PREFIX |
45
|
|
|
from st2common.constants.action import ACTION_PARAMETERS_KV_PREFIX |
46
|
|
|
from st2common.constants.action import ACTION_RESULTS_KV_PREFIX |
47
|
|
|
from st2common.constants.keyvalue import FULL_SYSTEM_SCOPE, SYSTEM_SCOPE, DATASTORE_PARENT_SCOPE |
48
|
|
|
from st2common.services.keyvalues import KeyValueLookup |
49
|
|
|
from st2common.transport.queues import NOTIFIER_ACTIONUPDATE_WORK_QUEUE |
50
|
|
|
|
51
|
|
|
# Names exported by this module.
__all__ = ['Notifier', 'get_notifier']

LOG = logging.getLogger(__name__)

# Read once at import time; gates dispatching of the generic action trigger.
ACTION_SENSOR_ENABLED = cfg.CONF.action_sensor.enable

# XXX: Fix this nasty positional dependency. The two trigger types are picked
# by their position inside INTERNAL_TRIGGER_TYPES['action'].
_ACTION_TRIGGER_TYPES = INTERNAL_TRIGGER_TYPES['action']
ACTION_TRIGGER_TYPE = _ACTION_TRIGGER_TYPES[0]
NOTIFY_TRIGGER_TYPE = _ACTION_TRIGGER_TYPES[1]
62
|
|
|
|
63
|
|
|
|
64
|
|
|
class Notifier(consumers.MessageHandler):
    """
    Message handler which reacts to action execution updates.

    For an execution which reached a completed state it applies the post-run
    policies of the action, dispatches the notify trigger for every configured
    route and dispatches the generic action trigger.
    """

    message_type = ActionExecutionDB

    def __init__(self, connection, queues, trigger_dispatcher=None):
        """
        :param connection: Connection handed to the parent message handler.
        :param queues: Queues handed to the parent message handler.
        :param trigger_dispatcher: Dispatcher used to emit triggers. When not
                                   provided a ``TriggerDispatcher`` using the
                                   module logger is created.
        """
        super(Notifier, self).__init__(connection, queues)

        if not trigger_dispatcher:
            trigger_dispatcher = TriggerDispatcher(LOG)

        self._trigger_dispatcher = trigger_dispatcher
        self._notify_trigger = ResourceReference.to_string_reference(
            pack=NOTIFY_TRIGGER_TYPE['pack'],
            name=NOTIFY_TRIGGER_TYPE['name'])
        self._action_trigger = ResourceReference.to_string_reference(
            pack=ACTION_TRIGGER_TYPE['pack'],
            name=ACTION_TRIGGER_TYPE['name'])

    def process(self, execution_db):
        """
        Process a single action execution update.

        Executions which are not in one of ``LIVEACTION_COMPLETED_STATES`` are
        skipped (after the pause hook of the workflow service was invoked for
        paused executions which carry an 'orchestra' context).

        :param execution_db: Execution object to process.
        """
        execution_id = str(execution_db.id)
        extra = {'execution': execution_db}
        LOG.debug('Processing execution %s', execution_id, extra=extra)

        if ('orchestra' in execution_db.context and
                execution_db.status == LIVEACTION_STATUS_PAUSED):
            wf_svc.handle_action_execution_pause(execution_db)

        if execution_db.status not in LIVEACTION_COMPLETED_STATES:
            LOG.debug('Skipping processing of execution %s since it\'s not in a completed state',
                      execution_id, extra=extra)
            return

        liveaction_id = execution_db.liveaction['id']
        liveaction_db = LiveAction.get_by_id(liveaction_id)
        self._apply_post_run_policies(liveaction_db=liveaction_db)

        if liveaction_db.notify is not None:
            self._post_notify_triggers(liveaction_db=liveaction_db, execution_db=execution_db)

        self._post_generic_trigger(liveaction_db=liveaction_db, execution_db=execution_db)

        if 'orchestra' in liveaction_db.context:
            wf_svc.handle_action_execution_completion(execution_db)

    def _get_execution_for_liveaction(self, liveaction):
        """
        Retrieve the execution which references the provided liveaction.

        :return: The matching execution or ``None`` if the lookup result is
                 falsy.
        """
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))

        if not execution:
            return None

        return execution

    def _post_notify_triggers(self, liveaction_db=None, execution_db=None):
        """
        Dispatch notify triggers for every notify subsection which applies.

        ``on_complete`` is always honoured, ``on_success`` only when the
        liveaction succeeded and ``on_failure`` only when the liveaction status
        is one of ``LIVEACTION_FAILED_STATES``.
        """
        notify = getattr(liveaction_db, 'notify', None)

        if not notify:
            return

        if notify.on_complete:
            self._post_notify_subsection_triggers(
                liveaction_db=liveaction_db, execution_db=execution_db,
                notify_subsection=notify.on_complete,
                default_message_suffix='completed.')
        if liveaction_db.status == LIVEACTION_STATUS_SUCCEEDED and notify.on_success:
            self._post_notify_subsection_triggers(
                liveaction_db=liveaction_db, execution_db=execution_db,
                notify_subsection=notify.on_success,
                default_message_suffix='succeeded.')
        if liveaction_db.status in LIVEACTION_FAILED_STATES and notify.on_failure:
            self._post_notify_subsection_triggers(
                liveaction_db=liveaction_db, execution_db=execution_db,
                notify_subsection=notify.on_failure,
                default_message_suffix='failed.')

    def _post_notify_subsection_triggers(self, liveaction_db=None, execution_db=None,
                                         notify_subsection=None,
                                         default_message_suffix=None):
        """
        Dispatch the notify trigger once per route of a notify subsection.

        :param notify_subsection: Object exposing ``routes`` (or the deprecated
                                  ``channels``), ``message`` and ``data``.
        :param default_message_suffix: Suffix of the default message which is
                                       used when the subsection has no message.

        :raises Exception: When dispatching failed for at least one route. All
                           routes are attempted before the exception is raised.
        """
        # Fall back to the deprecated "channels" attribute, also when the
        # subsection has no "routes" attribute at all.
        routes = (getattr(notify_subsection, 'routes', None) or
                  getattr(notify_subsection, 'channels', None))

        execution_id = str(execution_db.id)

        if not routes:
            return

        payload = {}
        message = notify_subsection.message or (
            'Action ' + liveaction_db.action + ' ' + default_message_suffix)
        data = notify_subsection.data or {}

        jinja_context = self._build_jinja_context(
            liveaction_db=liveaction_db, execution_db=execution_db
        )

        # Rendering is best effort - on failure the raw (unrendered) value is
        # sent. Only ``Exception`` is caught so that interpreter exit signals
        # are not swallowed.
        try:
            message = self._transform_message(message=message,
                                              context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `message`.')

        try:
            data = self._transform_data(data=data, context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `data`.')

        # At this point convert result to a string. This restricts the rules engine's
        # ability to introspect the result. On the other hand, at least a JSON usable
        # result is sent as part of the notification. If jinja is required to convert
        # to a string representation it uses str(...) which makes it impossible to
        # parse the result as json any longer.
        # TODO: Use to_serializable_dict
        data['result'] = json.dumps(liveaction_db.result)

        payload['message'] = message
        payload['data'] = data
        payload['execution_id'] = execution_id
        payload['status'] = liveaction_db.status
        payload['start_timestamp'] = isotime.format(liveaction_db.start_timestamp)

        try:
            payload['end_timestamp'] = isotime.format(liveaction_db.end_timestamp)
        except AttributeError:
            # This can be raised if liveaction.end_timestamp is None, which is caused
            # when policy cancels a request due to concurrency.
            # In this case, use the current UTC time instead.
            payload['end_timestamp'] = isotime.format(datetime.utcnow())

        payload['action_ref'] = liveaction_db.action
        payload['runner_ref'] = self._get_runner_ref(liveaction_db.action)

        trace_context = self._get_trace_context(execution_id=execution_id)

        failed_routes = []
        for route in routes:
            try:
                payload['route'] = route
                # Deprecated. Only for backward compatibility reasons.
                payload['channel'] = route
                LOG.debug('POSTing %s for %s. Payload - %s.', NOTIFY_TRIGGER_TYPE['name'],
                          liveaction_db.id, payload)
                self._trigger_dispatcher.dispatch(self._notify_trigger, payload=payload,
                                                  trace_context=trace_context)
            except Exception:
                # Keep going so the remaining routes are still notified, but
                # record the root cause before it is lost.
                LOG.exception('Failed dispatching notification to route "%s".', route)
                failed_routes.append(route)

        if failed_routes:
            raise Exception('Failed notifications to routes: %s' % ', '.join(failed_routes))

    def _build_jinja_context(self, liveaction_db, execution_db):
        """
        Build the context which is used to render notify message and data.

        The context exposes the system scoped datastore lookup, the liveaction
        parameters, the liveaction context and the execution result.

        :rtype: ``dict``
        """
        context = {}
        context.update({
            DATASTORE_PARENT_SCOPE: {
                SYSTEM_SCOPE: KeyValueLookup(scope=FULL_SYSTEM_SCOPE)
            }
        })
        context.update({ACTION_PARAMETERS_KV_PREFIX: liveaction_db.parameters})
        context.update({ACTION_CONTEXT_KV_PREFIX: liveaction_db.context})
        context.update({ACTION_RESULTS_KV_PREFIX: execution_db.result})
        return context

    def _transform_message(self, message, context=None):
        """
        Render the provided message using the provided context.

        :return: Rendered message, or the original message if the rendered
                 mapping contains no 'message' key.
        """
        mapping = {'message': message}
        context = context or {}
        rendered = jinja_utils.render_values(mapping=mapping, context=context)
        return rendered.get('message', message)

    def _transform_data(self, data, context=None):
        """
        Render all the values of the provided data mapping.
        """
        return jinja_utils.render_values(mapping=data, context=context)

    def _get_trace_context(self, execution_id):
        """
        Retrieve the trace context for the provided execution id.

        :return: ``TraceContext`` for the matching trace or ``None`` when no
                 trace is found.
        """
        trace_db = trace_service.get_trace_db_by_action_execution(
            action_execution_id=execution_id)
        if trace_db:
            return TraceContext(id_=str(trace_db.id), trace_tag=trace_db.trace_tag)
        # If no trace_context is found then do not create a new one here. If necessary
        # it shall be created downstream. Sure this is impl leakage of some sort.
        return None

    def _post_generic_trigger(self, liveaction_db=None, execution_db=None):
        """
        Dispatch the generic action trigger for the provided execution.

        Nothing is dispatched when ``ACTION_SENSOR_ENABLED`` is false.
        """
        if not ACTION_SENSOR_ENABLED:
            LOG.debug('Action trigger is disabled, skipping trigger dispatch...')
            return

        execution_id = str(execution_db.id)
        payload = {'execution_id': execution_id,
                   'status': liveaction_db.status,
                   'start_timestamp': str(liveaction_db.start_timestamp),
                   # deprecate 'action_name' at some point and switch to 'action_ref'
                   'action_name': liveaction_db.action,
                   'action_ref': liveaction_db.action,
                   'runner_ref': self._get_runner_ref(liveaction_db.action),
                   'parameters': liveaction_db.get_masked_parameters(),
                   'result': liveaction_db.result}
        # Use execution_id to extract trace rather than liveaction. execution_id
        # will look-up an exact TraceDB while liveaction depending on context
        # may not end up going to the DB.
        trace_context = self._get_trace_context(execution_id=execution_id)
        LOG.debug('POSTing %s for %s. Payload - %s. TraceContext - %s',
                  ACTION_TRIGGER_TYPE['name'], liveaction_db.id, payload, trace_context)
        self._trigger_dispatcher.dispatch(self._action_trigger, payload=payload,
                                          trace_context=trace_context)

    def _apply_post_run_policies(self, liveaction_db):
        """
        Apply all the enabled policies which are defined for the action.

        A failure of a single policy is logged and does not prevent the
        remaining policies from being applied.

        :return: Liveaction object as returned by the last policy driver which
                 was applied successfully (the provided object if there is
                 none).
        """
        # Apply policies defined for the action.
        policy_dbs = Policy.query(resource_ref=liveaction_db.action, enabled=True)
        LOG.debug('Applying %s post_run policies', len(policy_dbs))

        for policy_db in policy_dbs:
            driver = policies.get_driver(policy_db.ref,
                                         policy_db.policy_type,
                                         **policy_db.parameters)

            try:
                LOG.debug('Applying post_run policy "%s" (%s) for liveaction %s',
                          policy_db.ref, policy_db.policy_type, str(liveaction_db.id))
                liveaction_db = driver.apply_after(liveaction_db)
            except Exception:
                LOG.exception('An exception occurred while applying policy "%s".', policy_db.ref)

        return liveaction_db

    def _get_runner_ref(self, action_ref):
        """
        Retrieve a runner reference for the provided action.

        :rtype: ``str``
        """
        action = Action.get_by_ref(action_ref)
        return action['runner_type']['name']
288
|
|
|
|
289
|
|
|
|
290
|
|
|
def get_notifier():
    """
    Create a ``Notifier`` bound to the notifier action update work queue.

    NOTE(review): The handler is returned from inside the ``with`` block, so
    the connection context manager is exited as this function returns.
    Presumably the connection is re-established lazily on use - confirm.

    :rtype: :class:`Notifier`
    """
    messaging_urls = transport_utils.get_messaging_urls()

    with Connection(messaging_urls) as conn:
        queues = [NOTIFIER_ACTIONUPDATE_WORK_QUEUE]
        dispatcher = TriggerDispatcher(LOG)
        return Notifier(conn, queues, trigger_dispatcher=dispatcher)
294
|
|
|
|