|
1
|
|
|
# Licensed to the StackStorm, Inc ('StackStorm') under one or more |
|
2
|
|
|
# contributor license agreements. See the NOTICE file distributed with |
|
3
|
|
|
# this work for additional information regarding copyright ownership. |
|
4
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0 |
|
5
|
|
|
# (the "License"); you may not use this file except in compliance with |
|
6
|
|
|
# the License. You may obtain a copy of the License at |
|
7
|
|
|
# |
|
8
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
9
|
|
|
# |
|
10
|
|
|
# Unless required by applicable law or agreed to in writing, software |
|
11
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, |
|
12
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13
|
|
|
# See the License for the specific language governing permissions and |
|
14
|
|
|
# limitations under the License. |
|
15
|
|
|
|
|
16
|
|
|
from datetime import datetime |
|
17
|
|
|
import json |
|
18
|
|
|
|
|
19
|
|
|
from kombu import Connection |
|
20
|
|
|
from oslo_config import cfg |
|
21
|
|
|
|
|
22
|
|
|
from st2common import log as logging |
|
23
|
|
|
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED |
|
24
|
|
|
from st2common.constants.action import LIVEACTION_FAILED_STATES |
|
25
|
|
|
from st2common.constants.action import LIVEACTION_COMPLETED_STATES |
|
26
|
|
|
from st2common.constants.triggers import INTERNAL_TRIGGER_TYPES |
|
27
|
|
|
from st2common.models.api.trace import TraceContext |
|
28
|
|
|
from st2common.models.db.execution import ActionExecutionDB |
|
29
|
|
|
from st2common.persistence.action import Action |
|
30
|
|
|
from st2common.persistence.liveaction import LiveAction |
|
31
|
|
|
from st2common.persistence.policy import Policy |
|
32
|
|
|
from st2common import policies |
|
33
|
|
|
from st2common.models.system.common import ResourceReference |
|
34
|
|
|
from st2common.persistence.execution import ActionExecution |
|
35
|
|
|
from st2common.services import trace as trace_service |
|
36
|
|
|
from st2common.transport import consumers, execution, liveaction, publishers |
|
|
|
|
|
|
37
|
|
|
from st2common.transport import utils as transport_utils |
|
38
|
|
|
from st2common.transport.reactor import TriggerDispatcher |
|
39
|
|
|
from st2common.util import isotime |
|
40
|
|
|
from st2common.util import jinja as jinja_utils |
|
41
|
|
|
from st2common.constants.action import ACTION_CONTEXT_KV_PREFIX |
|
42
|
|
|
from st2common.constants.action import ACTION_PARAMETERS_KV_PREFIX |
|
43
|
|
|
from st2common.constants.action import ACTION_RESULTS_KV_PREFIX |
|
44
|
|
|
from st2common.constants.keyvalue import FULL_SYSTEM_SCOPE, SYSTEM_SCOPE, DATASTORE_PARENT_SCOPE |
|
45
|
|
|
from st2common.services.keyvalues import KeyValueLookup |
|
46
|
|
|
|
|
47
|
|
|
# Names exported by this module.
__all__ = ['Notifier', 'get_notifier']

LOG = logging.getLogger(__name__)

# Work queue obtained from the execution transport, using the UPDATE routing key.
ACTIONUPDATE_WORK_Q = execution.get_queue(
    'st2.notifiers.work', routing_key=publishers.UPDATE_RK)

# Read once at import time; controls whether the generic action trigger is dispatched.
ACTION_SENSOR_ENABLED = cfg.CONF.action_sensor.enable
# XXX: Fix this nasty positional dependency.
# The two trigger types are picked by their position in INTERNAL_TRIGGER_TYPES['action'].
ACTION_TRIGGER_TYPE = INTERNAL_TRIGGER_TYPES['action'][0]
NOTIFY_TRIGGER_TYPE = INTERNAL_TRIGGER_TYPES['action'][1]
|
61
|
|
|
|
|
62
|
|
|
|
|
63
|
|
|
class Notifier(consumers.MessageHandler):
    """
    Message handler which reacts to action execution updates.

    For every execution which has reached a completed state it applies the enabled policies
    registered for the action, dispatches the notify trigger for each route configured on the
    liveaction and finally dispatches the generic action trigger.
    """

    message_type = ActionExecutionDB

    def __init__(self, connection, queues, trigger_dispatcher=None):
        """
        :param connection: Connection passed through to ``consumers.MessageHandler``.

        :param queues: Queues passed through to ``consumers.MessageHandler``.

        :param trigger_dispatcher: Dispatcher used to emit triggers. When it is not provided
                                   (or is falsy) a new ``TriggerDispatcher(LOG)`` is created.
        """
        super(Notifier, self).__init__(connection, queues)

        if not trigger_dispatcher:
            trigger_dispatcher = TriggerDispatcher(LOG)

        self._trigger_dispatcher = trigger_dispatcher

        # String references of the two trigger types, resolved once and reused on every dispatch.
        self._notify_trigger = ResourceReference.to_string_reference(
            pack=NOTIFY_TRIGGER_TYPE['pack'],
            name=NOTIFY_TRIGGER_TYPE['name'])
        self._action_trigger = ResourceReference.to_string_reference(
            pack=ACTION_TRIGGER_TYPE['pack'],
            name=ACTION_TRIGGER_TYPE['name'])

    def process(self, execution):
        """
        Handle a single execution update.

        Executions whose status is not in ``LIVEACTION_COMPLETED_STATES`` are skipped. For the
        others the liveaction is loaded by id, post-run policies are applied, notify triggers
        are posted when the liveaction has a ``notify`` section and the generic action trigger
        is posted last.

        :param execution: Execution object delivered to this handler (``message_type`` of this
                          class is ``ActionExecutionDB``).
        """
        execution_id = str(execution.id)
        extra = {'execution': execution}
        LOG.debug('Processing execution %s', execution_id, extra=extra)

        if execution.status not in LIVEACTION_COMPLETED_STATES:
            LOG.debug('Skipping processing of execution %s since it\'s not in a completed state',
                      execution_id, extra=extra)
            return

        liveaction_id = execution.liveaction['id']
        liveaction_db = LiveAction.get_by_id(liveaction_id)

        # Note: the object returned by the policies is intentionally not used here; the
        # liveaction loaded above is the one passed to the trigger helpers.
        self._apply_post_run_policies(liveaction_db=liveaction_db)

        if liveaction_db.notify is not None:
            self._post_notify_triggers(liveaction=liveaction_db, execution=execution)

        self._post_generic_trigger(liveaction=liveaction_db, execution=execution)

    def _get_execution_for_liveaction(self, liveaction):
        """
        Look up the execution which references the provided liveaction.

        :param liveaction: Object whose ``id`` is used for the lookup.

        :return: The value returned by ``ActionExecution.get`` or ``None`` if it is falsy.
        """
        execution = ActionExecution.get(liveaction__id=str(liveaction.id))
        return execution or None

    def _post_notify_triggers(self, liveaction=None, execution=None):
        """
        Post notify triggers for every applicable subsection of ``liveaction.notify``.

        ``on_complete`` is used whenever it is set, ``on_success`` only when the liveaction
        status is ``LIVEACTION_STATUS_SUCCEEDED`` and ``on_failure`` only when the status is in
        ``LIVEACTION_FAILED_STATES``. Nothing happens if the liveaction has no notify section.
        """
        notify = getattr(liveaction, 'notify', None)

        if not notify:
            return

        if notify.on_complete:
            self._post_notify_subsection_triggers(
                liveaction=liveaction, execution=execution,
                notify_subsection=notify.on_complete,
                default_message_suffix='completed.')

        if liveaction.status == LIVEACTION_STATUS_SUCCEEDED and notify.on_success:
            self._post_notify_subsection_triggers(
                liveaction=liveaction, execution=execution,
                notify_subsection=notify.on_success,
                default_message_suffix='succeeded.')

        if liveaction.status in LIVEACTION_FAILED_STATES and notify.on_failure:
            self._post_notify_subsection_triggers(
                liveaction=liveaction, execution=execution,
                notify_subsection=notify.on_failure,
                default_message_suffix='failed.')

    def _post_notify_subsection_triggers(self, liveaction=None, execution=None,
                                         notify_subsection=None,
                                         default_message_suffix=None):
        """
        Dispatch the notify trigger once per route of the provided notify subsection.

        :param liveaction: Liveaction the notification is about.

        :param execution: Execution which corresponds to the liveaction.

        :param notify_subsection: Subsection providing ``routes`` (or ``channels``),
                                  ``message`` and ``data``.

        :param default_message_suffix: Suffix of the default message which is used when the
                                       subsection defines no message.
        :type default_message_suffix: ``str``

        :raises Exception: If dispatching failed for one or more routes. All the routes are
                           attempted before the exception is raised.
        """
        # Both attributes are read with a default so a subsection which has no ``routes``
        # attribute still falls back to ``channels`` instead of raising AttributeError.
        routes = (getattr(notify_subsection, 'routes', None) or
                  getattr(notify_subsection, 'channels', None))

        execution_id = str(execution.id)

        if not routes:
            return

        message = notify_subsection.message or (
            'Action ' + liveaction.action + ' ' + default_message_suffix)

        # Work on a copy. ``result`` is added to this mapping below and, when the Jinja
        # transformation fails, the mapping would otherwise be the one stored on the subsection.
        data = dict(notify_subsection.data or {})

        jinja_context = self._build_jinja_context(liveaction=liveaction, execution=execution)

        # Rendering is best effort - on failure the unrendered value is sent.
        try:
            message = self._transform_message(message=message,
                                              context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `message`.')

        try:
            data = self._transform_data(data=data, context=jinja_context)
        except Exception:
            LOG.exception('Failed (Jinja) transforming `data`.')

        # At this point convert result to a string. This restricts the rules engine's
        # ability to introspect the result. On the other hand, at least a JSON usable
        # result is sent as part of the notification. If Jinja is required to convert
        # to a string representation it uses str(...) which makes it impossible to
        # parse the result as JSON any longer.
        # TODO: Use to_serializable_dict
        data['result'] = json.dumps(liveaction.result)

        payload = {
            'message': message,
            'data': data,
            'execution_id': execution_id,
            'status': liveaction.status,
            'start_timestamp': isotime.format(liveaction.start_timestamp)
        }

        try:
            payload['end_timestamp'] = isotime.format(liveaction.end_timestamp)
        except AttributeError:
            # This can be raised if liveaction.end_timestamp is None, which is caused
            # when policy cancels a request due to concurrency.
            # In this case, use the current UTC time instead.
            payload['end_timestamp'] = isotime.format(datetime.utcnow())

        payload['action_ref'] = liveaction.action
        payload['runner_ref'] = self._get_runner_ref(liveaction.action)

        trace_context = self._get_trace_context(execution_id=execution_id)

        failed_routes = []
        for route in routes:
            # Every route gets its own (shallow) copy of the payload so a payload which has
            # already been handed to the dispatcher is never modified for the next route.
            route_payload = dict(payload)
            route_payload['route'] = route
            # Deprecated. Only for backward compatibility reasons.
            route_payload['channel'] = route

            try:
                LOG.debug('POSTing %s for %s. Payload - %s.', NOTIFY_TRIGGER_TYPE['name'],
                          liveaction.id, route_payload)
                self._trigger_dispatcher.dispatch(self._notify_trigger, payload=route_payload,
                                                  trace_context=trace_context)
            except Exception:
                # Keep going so one bad route doesn't prevent the other notifications, but
                # leave a trace of what went wrong.
                LOG.exception('Failed dispatching %s for route %s.',
                              NOTIFY_TRIGGER_TYPE['name'], route)
                failed_routes.append(route)

        if failed_routes:
            raise Exception('Failed notifications to routes: %s' % ', '.join(failed_routes))

    def _build_jinja_context(self, liveaction, execution):
        """
        Build the context which is used to render the notification message and data.

        The context contains a system scoped ``KeyValueLookup`` nested under
        ``DATASTORE_PARENT_SCOPE``, the liveaction parameters, the liveaction context and the
        execution result, each stored under its corresponding prefix constant.

        :rtype: ``dict``
        """
        return {
            DATASTORE_PARENT_SCOPE: {
                SYSTEM_SCOPE: KeyValueLookup(scope=FULL_SYSTEM_SCOPE)
            },
            ACTION_PARAMETERS_KV_PREFIX: liveaction.parameters,
            ACTION_CONTEXT_KV_PREFIX: liveaction.context,
            ACTION_RESULTS_KV_PREFIX: execution.result
        }

    def _transform_message(self, message, context=None):
        """
        Render ``message`` using the provided context.

        :return: Rendered message or the original ``message`` if the rendered mapping contains
                 no ``message`` key.
        """
        mapping = {'message': message}
        context = context or {}
        rendered = jinja_utils.render_values(mapping=mapping, context=context)
        return rendered.get('message', message)

    def _transform_data(self, data, context=None):
        """
        Render the values of the ``data`` mapping using the provided context.

        :return: Whatever ``jinja_utils.render_values`` returns for the mapping.
        """
        return jinja_utils.render_values(mapping=data, context=context)

    def _get_trace_context(self, execution_id):
        """
        Retrieve the trace context for the provided execution id.

        :return: ``TraceContext`` built from the trace found for the execution or ``None`` if
                 no trace is found.
        """
        trace_db = trace_service.get_trace_db_by_action_execution(
            action_execution_id=execution_id)
        if trace_db:
            return TraceContext(id_=str(trace_db.id), trace_tag=trace_db.trace_tag)
        # If no trace_context is found then do not create a new one here. If necessary
        # it shall be created downstream. Sure this is impl leakage of some sort.
        return None

    def _post_generic_trigger(self, liveaction=None, execution=None):
        """
        Dispatch the generic action trigger for the provided liveaction / execution.

        Nothing is dispatched when ``ACTION_SENSOR_ENABLED`` is false.
        """
        if not ACTION_SENSOR_ENABLED:
            LOG.debug('Action trigger is disabled, skipping trigger dispatch...')
            return

        execution_id = str(execution.id)
        payload = {'execution_id': execution_id,
                   'status': liveaction.status,
                   'start_timestamp': str(liveaction.start_timestamp),
                   # deprecate 'action_name' at some point and switch to 'action_ref'
                   'action_name': liveaction.action,
                   'action_ref': liveaction.action,
                   'runner_ref': self._get_runner_ref(liveaction.action),
                   'parameters': liveaction.get_masked_parameters(),
                   'result': liveaction.result}
        # Use execution_id to extract trace rather than liveaction. execution_id
        # will look-up an exact TraceDB while liveaction depending on context
        # may not end up going to the DB.
        trace_context = self._get_trace_context(execution_id=execution_id)
        LOG.debug('POSTing %s for %s. Payload - %s. TraceContext - %s',
                  ACTION_TRIGGER_TYPE['name'], liveaction.id, payload, trace_context)
        self._trigger_dispatcher.dispatch(self._action_trigger, payload=payload,
                                          trace_context=trace_context)

    def _apply_post_run_policies(self, liveaction_db):
        """
        Apply ``apply_after`` of every enabled policy registered for the liveaction's action.

        A failure while applying a policy is logged and the remaining policies are still
        applied.

        :return: The liveaction as returned by the last successfully applied policy (or the
                 provided one if no policy was applied).
        """
        # Apply policies defined for the action.
        policy_dbs = Policy.query(resource_ref=liveaction_db.action, enabled=True)
        LOG.debug('Applying %s post_run policies', len(policy_dbs))

        for policy_db in policy_dbs:
            driver = policies.get_driver(policy_db.ref,
                                         policy_db.policy_type,
                                         **policy_db.parameters)

            try:
                LOG.debug('Applying post_run policy "%s" (%s) for liveaction %s',
                          policy_db.ref, policy_db.policy_type, str(liveaction_db.id))
                liveaction_db = driver.apply_after(liveaction_db)
            except Exception:
                LOG.exception('An exception occurred while applying policy "%s".', policy_db.ref)

        return liveaction_db

    def _get_runner_ref(self, action_ref):
        """
        Retrieve a runner reference for the provided action.

        :param action_ref: Reference of the action to look up.

        :rtype: ``str``
        """
        action = Action.get_by_ref(action_ref)
        return action['runner_type']['name']
|
278
|
|
|
|
|
279
|
|
|
|
|
280
|
|
|
def get_notifier():
    """
    Create a ``Notifier`` which consumes from ``ACTIONUPDATE_WORK_Q``.

    The notifier is given a connection created from the configured messaging URLs and a new
    ``TriggerDispatcher`` which logs through this module's logger.

    :rtype: :class:`Notifier`
    """
    messaging_urls = transport_utils.get_messaging_urls()

    # NOTE(review): the connection context manager is exited before the caller receives the
    # notifier - confirm the handler does not depend on the connection still being open.
    with Connection(messaging_urls) as conn:
        dispatcher = TriggerDispatcher(LOG)
        notifier = Notifier(conn, [ACTIONUPDATE_WORK_Q], trigger_dispatcher=dispatcher)

    return notifier
|
283
|
|
|
|