1
|
|
|
# Licensed to the StackStorm, Inc ('StackStorm') under one or more |
2
|
|
|
# contributor license agreements. See the NOTICE file distributed with |
3
|
|
|
# this work for additional information regarding copyright ownership. |
4
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0 |
5
|
|
|
# (the "License"); you may not use this file except in compliance with |
6
|
|
|
# the License. You may obtain a copy of the License at |
7
|
|
|
# |
8
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0 |
9
|
|
|
# |
10
|
|
|
# Unless required by applicable law or agreed to in writing, software |
11
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, |
12
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13
|
|
|
# See the License for the specific language governing permissions and |
14
|
|
|
# limitations under the License. |
15
|
|
|
|
16
|
|
|
import json |
17
|
|
|
|
18
|
|
|
from kombu import Connection |
19
|
|
|
from oslo_config import cfg |
20
|
|
|
|
21
|
|
|
from st2common import log as logging |
22
|
|
|
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED |
23
|
|
|
from st2common.constants.action import LIVEACTION_FAILED_STATES |
24
|
|
|
from st2common.constants.action import LIVEACTION_COMPLETED_STATES |
25
|
|
|
from st2common.constants.triggers import INTERNAL_TRIGGER_TYPES |
26
|
|
|
from st2common.models.api.trace import TraceContext |
27
|
|
|
from st2common.models.db.liveaction import LiveActionDB |
28
|
|
|
from st2common.persistence.action import Action |
29
|
|
|
from st2common.persistence.policy import Policy |
30
|
|
|
from st2common import policies |
31
|
|
|
from st2common.models.system.common import ResourceReference |
32
|
|
|
from st2common.persistence.execution import ActionExecution |
33
|
|
|
from st2common.services import trace as trace_service |
34
|
|
|
from st2common.transport import consumers, liveaction, publishers |
35
|
|
|
from st2common.transport import utils as transport_utils |
36
|
|
|
from st2common.transport.reactor import TriggerDispatcher |
37
|
|
|
from st2common.util import isotime |
38
|
|
|
from st2common.util import jinja as jinja_utils |
39
|
|
|
from st2common.constants.action import ACTION_CONTEXT_KV_PREFIX |
40
|
|
|
from st2common.constants.action import ACTION_PARAMETERS_KV_PREFIX |
41
|
|
|
from st2common.constants.action import ACTION_RESULTS_KV_PREFIX |
42
|
|
|
from st2common.constants.system import SYSTEM_KV_PREFIX |
43
|
|
|
from st2common.services.keyvalues import KeyValueLookup |
44
|
|
|
|
45
|
|
|
__all__ = [ |
46
|
|
|
'Notifier', |
47
|
|
|
'get_notifier' |
48
|
|
|
] |
49
|
|
|
|
50
|
|
|
LOG = logging.getLogger(__name__) |
51
|
|
|
|
52
|
|
|
ACTIONUPDATE_WORK_Q = liveaction.get_queue('st2.notifiers.work', |
53
|
|
|
routing_key=publishers.UPDATE_RK) |
54
|
|
|
|
55
|
|
|
ACTION_SENSOR_ENABLED = cfg.CONF.action_sensor.enable |
56
|
|
|
# XXX: Fix this nasty positional dependency. |
57
|
|
|
ACTION_TRIGGER_TYPE = INTERNAL_TRIGGER_TYPES['action'][0] |
58
|
|
|
NOTIFY_TRIGGER_TYPE = INTERNAL_TRIGGER_TYPES['action'][1] |
59
|
|
|
|
60
|
|
|
|
61
|
|
|
class Notifier(consumers.MessageHandler): |
62
|
|
|
message_type = LiveActionDB |
63
|
|
|
|
64
|
|
|
def __init__(self, connection, queues, trigger_dispatcher=None): |
65
|
|
|
super(Notifier, self).__init__(connection, queues) |
66
|
|
|
if not trigger_dispatcher: |
67
|
|
|
trigger_dispatcher = TriggerDispatcher(LOG) |
68
|
|
|
self._trigger_dispatcher = trigger_dispatcher |
69
|
|
|
self._notify_trigger = ResourceReference.to_string_reference( |
70
|
|
|
pack=NOTIFY_TRIGGER_TYPE['pack'], |
71
|
|
|
name=NOTIFY_TRIGGER_TYPE['name']) |
72
|
|
|
self._action_trigger = ResourceReference.to_string_reference( |
73
|
|
|
pack=ACTION_TRIGGER_TYPE['pack'], |
74
|
|
|
name=ACTION_TRIGGER_TYPE['name']) |
75
|
|
|
|
76
|
|
|
def process(self, liveaction): |
|
|
|
|
77
|
|
|
live_action_id = str(liveaction.id) |
78
|
|
|
extra = {'live_action_db': liveaction} |
79
|
|
|
LOG.debug('Processing liveaction %s', live_action_id, extra=extra) |
80
|
|
|
|
81
|
|
|
if liveaction.status not in LIVEACTION_COMPLETED_STATES: |
82
|
|
|
LOG.debug('Skipping processing of liveaction %s since it\'s not in a completed state' % |
83
|
|
|
(live_action_id), extra=extra) |
84
|
|
|
return |
85
|
|
|
|
86
|
|
|
execution = self._get_execution_for_liveaction(liveaction) |
87
|
|
|
|
88
|
|
|
if not execution: |
89
|
|
|
LOG.exception('Execution object corresponding to LiveAction %s not found.', |
90
|
|
|
live_action_id, extra=extra) |
91
|
|
|
return None |
92
|
|
|
|
93
|
|
|
self._apply_post_run_policies(liveaction=liveaction) |
94
|
|
|
|
95
|
|
|
if liveaction.notify is not None: |
96
|
|
|
self._post_notify_triggers(liveaction=liveaction, execution=execution) |
97
|
|
|
|
98
|
|
|
self._post_generic_trigger(liveaction=liveaction, execution=execution) |
99
|
|
|
|
100
|
|
|
def _get_execution_for_liveaction(self, liveaction): |
|
|
|
|
101
|
|
|
execution = ActionExecution.get(liveaction__id=str(liveaction.id)) |
102
|
|
|
|
103
|
|
|
if not execution: |
104
|
|
|
return None |
105
|
|
|
|
106
|
|
|
return execution |
107
|
|
|
|
108
|
|
|
def _post_notify_triggers(self, liveaction=None, execution=None): |
|
|
|
|
109
|
|
|
notify = getattr(liveaction, 'notify', None) |
110
|
|
|
|
111
|
|
|
if not notify: |
112
|
|
|
return |
113
|
|
|
|
114
|
|
|
if notify.on_complete: |
115
|
|
|
self._post_notify_subsection_triggers( |
116
|
|
|
liveaction=liveaction, execution=execution, |
117
|
|
|
notify_subsection=notify.on_complete, |
118
|
|
|
default_message_suffix='completed.') |
119
|
|
|
if liveaction.status == LIVEACTION_STATUS_SUCCEEDED and notify.on_success: |
120
|
|
|
self._post_notify_subsection_triggers( |
121
|
|
|
liveaction=liveaction, execution=execution, |
122
|
|
|
notify_subsection=notify.on_success, |
123
|
|
|
default_message_suffix='succeeded.') |
124
|
|
|
if liveaction.status in LIVEACTION_FAILED_STATES and notify.on_failure: |
125
|
|
|
self._post_notify_subsection_triggers( |
126
|
|
|
liveaction=liveaction, execution=execution, |
127
|
|
|
notify_subsection=notify.on_failure, |
128
|
|
|
default_message_suffix='failed.') |
129
|
|
|
|
130
|
|
|
def _post_notify_subsection_triggers(self, liveaction=None, execution=None, |
|
|
|
|
131
|
|
|
notify_subsection=None, |
132
|
|
|
default_message_suffix=None): |
133
|
|
|
routes = (getattr(notify_subsection, 'routes') or |
134
|
|
|
getattr(notify_subsection, 'channels', None)) |
135
|
|
|
|
136
|
|
|
execution_id = str(execution.id) |
137
|
|
|
|
138
|
|
|
if routes and len(routes) >= 1: |
139
|
|
|
payload = {} |
140
|
|
|
message = notify_subsection.message or ( |
141
|
|
|
'Action ' + liveaction.action + ' ' + default_message_suffix) |
142
|
|
|
data = notify_subsection.data or {} |
143
|
|
|
|
144
|
|
|
jinja_context = self._build_jinja_context(liveaction=liveaction, execution=execution) |
145
|
|
|
|
146
|
|
|
try: |
147
|
|
|
message = self._transform_message(message=message, |
148
|
|
|
context=jinja_context) |
149
|
|
|
except: |
150
|
|
|
LOG.exception('Failed (Jinja) transforming `message`.') |
151
|
|
|
|
152
|
|
|
try: |
153
|
|
|
data = self._transform_data(data=data, context=jinja_context) |
154
|
|
|
except: |
155
|
|
|
LOG.exception('Failed (Jinja) transforming `data`.') |
156
|
|
|
|
157
|
|
|
# At this point convert result to a string. This restricts the rulesengines |
158
|
|
|
# ability to introspect the result. On the other handle atleast a json usable |
159
|
|
|
# result is sent as part of the notification. If jinja is required to convert |
160
|
|
|
# to a string representation it uses str(...) which make it impossible to |
161
|
|
|
# parse the result as json any longer. |
162
|
|
|
# TODO: Use to_serializable_dict |
163
|
|
|
data['result'] = json.dumps(liveaction.result) |
164
|
|
|
|
165
|
|
|
payload['message'] = message |
166
|
|
|
payload['data'] = data |
167
|
|
|
payload['execution_id'] = execution_id |
168
|
|
|
payload['status'] = liveaction.status |
169
|
|
|
payload['start_timestamp'] = isotime.format(liveaction.start_timestamp) |
170
|
|
|
payload['end_timestamp'] = isotime.format(liveaction.end_timestamp) |
171
|
|
|
payload['action_ref'] = liveaction.action |
172
|
|
|
payload['runner_ref'] = self._get_runner_ref(liveaction.action) |
173
|
|
|
|
174
|
|
|
trace_context = self._get_trace_context(execution_id=execution_id) |
175
|
|
|
|
176
|
|
|
failed_routes = [] |
177
|
|
|
for route in routes: |
178
|
|
|
try: |
179
|
|
|
payload['route'] = route |
180
|
|
|
# Deprecated. Only for backward compatibility reasons. |
181
|
|
|
payload['channel'] = route |
182
|
|
|
LOG.debug('POSTing %s for %s. Payload - %s.', NOTIFY_TRIGGER_TYPE['name'], |
183
|
|
|
liveaction.id, payload) |
184
|
|
|
self._trigger_dispatcher.dispatch(self._notify_trigger, payload=payload, |
185
|
|
|
trace_context=trace_context) |
186
|
|
|
except: |
187
|
|
|
failed_routes.append(route) |
188
|
|
|
|
189
|
|
|
if len(failed_routes) > 0: |
190
|
|
|
raise Exception('Failed notifications to routes: %s' % ', '.join(failed_routes)) |
191
|
|
|
|
192
|
|
|
def _build_jinja_context(self, liveaction, execution): |
|
|
|
|
193
|
|
|
context = {SYSTEM_KV_PREFIX: KeyValueLookup()} |
194
|
|
|
context.update({ACTION_PARAMETERS_KV_PREFIX: liveaction.parameters}) |
195
|
|
|
context.update({ACTION_CONTEXT_KV_PREFIX: liveaction.context}) |
196
|
|
|
context.update({ACTION_RESULTS_KV_PREFIX: execution.result}) |
197
|
|
|
return context |
198
|
|
|
|
199
|
|
|
def _transform_message(self, message, context=None): |
200
|
|
|
mapping = {'message': message} |
201
|
|
|
context = context or {} |
202
|
|
|
return (jinja_utils.render_values(mapping=mapping, context=context)).get('message', |
203
|
|
|
message) |
204
|
|
|
|
205
|
|
|
def _transform_data(self, data, context=None): |
206
|
|
|
return jinja_utils.render_values(mapping=data, context=context) |
207
|
|
|
|
208
|
|
|
def _get_trace_context(self, execution_id): |
209
|
|
|
trace_db = trace_service.get_trace_db_by_action_execution( |
210
|
|
|
action_execution_id=execution_id) |
211
|
|
|
if trace_db: |
212
|
|
|
return TraceContext(id_=str(trace_db.id), trace_tag=trace_db.trace_tag) |
213
|
|
|
# If no trace_context is found then do not create a new one here. If necessary |
214
|
|
|
# it shall be created downstream. Sure this is impl leakage of some sort. |
215
|
|
|
return None |
216
|
|
|
|
217
|
|
|
def _post_generic_trigger(self, liveaction=None, execution=None): |
|
|
|
|
218
|
|
|
if not ACTION_SENSOR_ENABLED: |
219
|
|
|
LOG.debug('Action trigger is disabled, skipping trigger dispatch...') |
220
|
|
|
return |
221
|
|
|
|
222
|
|
|
execution_id = str(execution.id) |
223
|
|
|
payload = {'execution_id': execution_id, |
224
|
|
|
'status': liveaction.status, |
225
|
|
|
'start_timestamp': str(liveaction.start_timestamp), |
226
|
|
|
# deprecate 'action_name' at some point and switch to 'action_ref' |
227
|
|
|
'action_name': liveaction.action, |
228
|
|
|
'action_ref': liveaction.action, |
229
|
|
|
'runner_ref': self._get_runner_ref(liveaction.action), |
230
|
|
|
'parameters': liveaction.get_masked_parameters(), |
231
|
|
|
'result': liveaction.result} |
232
|
|
|
# Use execution_id to extract trace rather than liveaction. execution_id |
233
|
|
|
# will look-up an exact TraceDB while liveaction depending on context |
234
|
|
|
# may not end up going to the DB. |
235
|
|
|
trace_context = self._get_trace_context(execution_id=execution_id) |
236
|
|
|
LOG.debug('POSTing %s for %s. Payload - %s. TraceContext - %s', |
237
|
|
|
ACTION_TRIGGER_TYPE['name'], liveaction.id, payload, trace_context) |
238
|
|
|
self._trigger_dispatcher.dispatch(self._action_trigger, payload=payload, |
239
|
|
|
trace_context=trace_context) |
240
|
|
|
|
241
|
|
|
def _apply_post_run_policies(self, liveaction=None): |
|
|
|
|
242
|
|
|
# Apply policies defined for the action. |
243
|
|
|
policy_dbs = Policy.query(resource_ref=liveaction.action) |
244
|
|
|
LOG.debug('Applying %s post_run policies' % (len(policy_dbs))) |
245
|
|
|
|
246
|
|
|
for policy_db in policy_dbs: |
247
|
|
|
driver = policies.get_driver(policy_db.ref, |
248
|
|
|
policy_db.policy_type, |
249
|
|
|
**policy_db.parameters) |
250
|
|
|
|
251
|
|
|
try: |
252
|
|
|
LOG.debug('Applying post_run policy "%s" (%s) for liveaction %s' % |
253
|
|
|
(policy_db.ref, policy_db.policy_type, str(liveaction.id))) |
254
|
|
|
liveaction = driver.apply_after(liveaction) |
255
|
|
|
except: |
256
|
|
|
LOG.exception('An exception occurred while applying policy "%s".', policy_db.ref) |
257
|
|
|
|
258
|
|
|
def _get_runner_ref(self, action_ref): |
259
|
|
|
""" |
260
|
|
|
Retrieve a runner reference for the provided action. |
261
|
|
|
|
262
|
|
|
:rtype: ``str`` |
263
|
|
|
""" |
264
|
|
|
action = Action.get_by_ref(action_ref) |
265
|
|
|
return action['runner_type']['name'] |
266
|
|
|
|
267
|
|
|
|
268
|
|
|
def get_notifier(): |
269
|
|
|
with Connection(transport_utils.get_messaging_urls()) as conn: |
270
|
|
|
return Notifier(conn, [ACTIONUPDATE_WORK_Q], trigger_dispatcher=TriggerDispatcher(LOG)) |
271
|
|
|
|
It is generally a bad practice to shadow variables from the outer-scope. In most cases, this is done unintentionally and might lead to unexpected behavior: