1
|
|
|
# Licensed to the StackStorm, Inc ('StackStorm') under one or more |
2
|
|
|
# contributor license agreements. See the NOTICE file distributed with |
3
|
|
|
# this work for additional information regarding copyright ownership. |
4
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0 |
5
|
|
|
# (the "License"); you may not use this file except in compliance with |
6
|
|
|
# the License. You may obtain a copy of the License at |
7
|
|
|
# |
8
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0 |
9
|
|
|
# |
10
|
|
|
# Unless required by applicable law or agreed to in writing, software |
11
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, |
12
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13
|
|
|
# See the License for the specific language governing permissions and |
14
|
|
|
# limitations under the License. |
15
|
|
|
|
16
|
|
|
import eventlet |
17
|
|
|
import traceback |
18
|
|
|
import uuid |
19
|
|
|
import datetime |
20
|
|
|
|
21
|
|
|
from jsonschema import exceptions as json_schema_exceptions |
22
|
|
|
|
23
|
|
|
from st2actions.runners import ActionRunner |
24
|
|
|
from st2common import log as logging |
25
|
|
|
from st2common.constants.action import ACTION_CONTEXT_KV_PREFIX |
26
|
|
|
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED |
27
|
|
|
from st2common.constants.action import LIVEACTION_STATUS_TIMED_OUT |
28
|
|
|
from st2common.constants.action import LIVEACTION_STATUS_FAILED |
29
|
|
|
from st2common.constants.action import LIVEACTION_STATUS_CANCELED |
30
|
|
|
from st2common.constants.action import LIVEACTION_COMPLETED_STATES |
31
|
|
|
from st2common.constants.action import LIVEACTION_FAILED_STATES |
32
|
|
|
from st2common.constants.keyvalue import SYSTEM_SCOPE |
33
|
|
|
from st2common.content.loader import MetaLoader |
34
|
|
|
from st2common.exceptions.action import (ParameterRenderingFailedException, |
35
|
|
|
InvalidActionReferencedException) |
36
|
|
|
from st2common.exceptions import actionrunner as runnerexceptions |
37
|
|
|
from st2common.models.api.notification import NotificationsHelper |
38
|
|
|
from st2common.models.db.liveaction import LiveActionDB |
39
|
|
|
from st2common.models.system import actionchain |
40
|
|
|
from st2common.models.utils import action_param_utils |
41
|
|
|
from st2common.persistence.execution import ActionExecution |
42
|
|
|
from st2common.services import action as action_service |
43
|
|
|
from st2common.services.keyvalues import KeyValueLookup |
44
|
|
|
from st2common.util import action_db as action_db_util |
45
|
|
|
from st2common.util import isotime |
46
|
|
|
from st2common.util import date as date_utils |
47
|
|
|
from st2common.util import jinja as jinja_utils |
48
|
|
|
|
49
|
|
|
|
50
|
|
|
# Module-level logger shared by the chain holder and the runner.
LOG = logging.getLogger(__name__)

# Template context key under which the results of already executed tasks are exposed.
RESULTS_KEY = '__results'

# Opening delimiters of Jinja expressions ('{{') and statements ('{%').
JINJA_START_MARKERS = ['{{', '{%']

# Key in the chain result under which published variables are stored for display.
PUBLISHED_VARS_KEY = 'published'
57
|
|
|
|
58
|
|
|
|
59
|
|
|
class ChainHolder(object):
    """
    Holds a parsed action chain definition together with its rendered variables and
    provides helpers for validating the chain and walking from one node to the next.
    """

    def __init__(self, chainspec, chainname):
        """
        :param chainspec: Chain definition, passed as keyword arguments to ``ActionChain``.
        :type chainspec: ``dict``

        :param chainname: Name of the chain, used in log and error messages.

        :raises Exception: If no default (starting) node can be determined.
        """
        self.actionchain = actionchain.ActionChain(**chainspec)
        self.chainname = chainname

        # Fall back to an inferred starting node when the definition does not name one.
        if not self.actionchain.default:
            default = self._get_default(self.actionchain)
            self.actionchain.default = default

        LOG.debug('Using %s as default for %s.', self.actionchain.default, self.chainname)
        if not self.actionchain.default:
            raise Exception('Failed to find default node in %s.' % (self.chainname))

        self.vars = {}

    def init_vars(self, action_parameters):
        """
        Render the chain level ``vars`` (if any) using the provided action parameters as
        template context and store the result on ``self.vars``.
        """
        if self.actionchain.vars:
            self.vars = self._get_rendered_vars(self.actionchain.vars,
                                                action_parameters=action_parameters)

    def validate(self):
        """
        Function which performs a simple compile time validation.

        Keep in mind that some variables are only resolved during run time which means we can
        perform only simple validation during compile / create time.

        :raises ValueError: If "on-success", "on-failure" or "default" references a node
                            which is not defined in the chain.

        :rtype: ``bool``
        """
        all_nodes = self._get_all_nodes(action_chain=self.actionchain)

        for node in self.actionchain.chain:
            # Both transition paths are checked the same way; "on-success" goes first.
            transitions = (
                ('on-success', node.on_success),
                ('on-failure', node.on_failure),
            )

            for transition, referenced_name in transitions:
                valid_name = self._is_valid_node_name(all_node_names=all_nodes,
                                                      node_name=referenced_name)
                if not valid_name:
                    msg = ('Unable to find node with name "%s" referenced in "%s" in '
                           'task "%s".' % (referenced_name, transition, node.name))
                    raise ValueError(msg)

        # check if node specified in default is valid.
        if self.actionchain.default:
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
                                                  node_name=self.actionchain.default)
            if not valid_name:
                msg = ('Unable to find node with name "%s" referenced in "default".' %
                       self.actionchain.default)
                raise ValueError(msg)
        return True

    @staticmethod
    def _get_default(action_chain):
        """
        Return the name of the node the chain starts with, or ``None`` for an empty chain.
        """
        # default is defined
        if action_chain.default:
            return action_chain.default
        # no nodes in chain
        if not action_chain.chain:
            return None
        # The first node with no references is the default node. Assumptions
        # that support this are :
        # 1. There are no loops in the chain. Even if there are loops there is
        #    at least 1 node which does not end up in this loop.
        # 2. There are no fragments in the chain.
        all_nodes = ChainHolder._get_all_nodes(action_chain=action_chain)
        on_success_nodes = ChainHolder._get_all_on_success_nodes(action_chain=action_chain)
        on_failure_nodes = ChainHolder._get_all_on_failure_nodes(action_chain=action_chain)
        referenced_nodes = on_success_nodes | on_failure_nodes
        possible_default_nodes = set(all_nodes) - referenced_nodes
        # Sets do not preserve order, so iterate over the original list to pick the first
        # unreferenced node in definition order.
        for node in all_nodes:
            if node in possible_default_nodes:
                return node
        # If no node is found assume the first node in the chain list to be default.
        return action_chain.chain[0].name

    @staticmethod
    def _get_all_nodes(action_chain):
        """
        Return names for all the nodes in the chain.

        :rtype: ``list``
        """
        return [node.name for node in action_chain.chain]

    @staticmethod
    def _get_all_on_success_nodes(action_chain):
        """
        Return names for all the tasks referenced in "on-success".

        :rtype: ``set``
        """
        return set(node.on_success for node in action_chain.chain)

    @staticmethod
    def _get_all_on_failure_nodes(action_chain):
        """
        Return names for all the tasks referenced in "on-failure".

        :rtype: ``set``
        """
        return set(node.on_failure for node in action_chain.chain)

    def _is_valid_node_name(self, all_node_names, node_name):
        """
        Function which validates that the provided node name is defined in the workflow
        definition and it's valid.

        Keep in mind that we can only perform validation for task names which don't include
        jinja expressions since those are rendered at run time.

        :rtype: ``bool``
        """
        if not node_name:
            # Nothing is referenced, so there is nothing to validate.
            return True

        is_jinja_expression = jinja_utils.is_jinja_expression(value=node_name)
        if is_jinja_expression:
            # This task name needs to be resolved during run time so we cant validate the
            # name now
            return True

        return node_name in all_node_names

    @staticmethod
    def _get_rendered_vars(vars, action_parameters):
        """
        Render the chain ``vars`` mapping with the action parameters and the system scoped
        key value lookup as template context. Returns an empty dict when there are no vars.

        NOTE: The ``vars`` parameter shadows the built-in of the same name; the name is kept
        so existing callers keep working.
        """
        if not vars:
            return {}
        context = {SYSTEM_SCOPE: KeyValueLookup(scope=SYSTEM_SCOPE)}
        context.update(action_parameters)
        return jinja_utils.render_values(mapping=vars, context=context)

    def get_node(self, node_name=None, raise_on_failure=False):
        """
        Return the chain node with the provided name.

        :param node_name: Name of the node to look up. A falsy name always yields ``None``.

        :param raise_on_failure: When True, raise instead of returning ``None`` if a
                                 (non-empty) name does not match any node.
        :type raise_on_failure: ``bool``

        :raises ActionRunnerException: If the node is not found and ``raise_on_failure`` is
                                       set.
        """
        if not node_name:
            return None
        for node in self.actionchain.chain:
            if node.name == node_name:
                return node
        if raise_on_failure:
            raise runnerexceptions.ActionRunnerException('Unable to find node with name "%s".' %
                                                         (node_name))
        return None

    def get_next_node(self, curr_node_name=None, condition='on-success'):
        """
        Return the node which follows ``curr_node_name`` for the provided condition.

        Without a current node name the default (starting) node is returned. ``None`` is
        returned when the current node defines no transition for the condition.

        :param condition: Either ``on-success`` or ``on-failure``.
        :type condition: ``str``

        :raises ActionRunnerException: If the current or the referenced node doesn't exist
                                       or the condition is unknown.
        """
        if not curr_node_name:
            return self.get_node(self.actionchain.default)
        # Raise a descriptive error for an unknown current node instead of failing later
        # with an AttributeError on None.
        current_node = self.get_node(curr_node_name, raise_on_failure=True)
        if condition == 'on-success':
            return self.get_node(current_node.on_success, raise_on_failure=True)
        elif condition == 'on-failure':
            return self.get_node(current_node.on_failure, raise_on_failure=True)
        raise runnerexceptions.ActionRunnerException('Unknown condition %s.' % condition)
219
|
|
|
|
220
|
|
|
|
221
|
|
|
class ActionChainRunner(ActionRunner):
    """
    Runner which executes the tasks of an action chain one after another, following the
    "on-success" / "on-failure" transitions defined on each task.
    """

    def __init__(self, runner_id):
        super(ActionChainRunner, self).__init__(runner_id=runner_id)
        self.chain_holder = None
        self._meta_loader = MetaLoader()
        # Set to True once the chain execution has been canceled.
        self._stopped = False
        # Names of the tasks for which no notifications should be set up.
        self._skip_notify_tasks = []
        # Whether published variables are included in the chain result.
        self._display_published = False
        # Chain level notify settings, used for tasks without their own notify section.
        self._chain_notify = None

    def pre_run(self):
        """
        Load the chain definition from the entry point, build the ``ChainHolder``, read the
        runner level settings and validate the chain.

        :raises ActionRunnerPreRunError: If the definition can't be loaded, instantiated or
                                         validated.
        """
        super(ActionChainRunner, self).pre_run()

        chainspec_file = self.entry_point
        LOG.debug('Reading action chain from %s for action %s.', chainspec_file,
                  self.action)

        try:
            chainspec = self._meta_loader.load(file_path=chainspec_file,
                                               expected_type=dict)
        except Exception as e:
            message = ('Failed to parse action chain definition from "%s": %s' %
                       (chainspec_file, str(e)))
            LOG.exception('Failed to load action chain definition.')
            raise runnerexceptions.ActionRunnerPreRunError(message)

        try:
            self.chain_holder = ChainHolder(chainspec, self.action_name)
        except json_schema_exceptions.ValidationError as e:
            # preserve the whole nasty jsonschema message as that is better to get to the
            # root cause
            message = str(e)
            LOG.exception('Failed to instantiate ActionChain.')
            raise runnerexceptions.ActionRunnerPreRunError(message)
        except Exception as e:
            # Not every exception carries a "message" attribute, so fall back to str(e).
            message = getattr(e, 'message', None) or str(e)
            LOG.exception('Failed to instantiate ActionChain.')
            raise runnerexceptions.ActionRunnerPreRunError(message)

        # Runner attributes are set lazily. So these steps
        # should happen outside the constructor.
        if getattr(self, 'liveaction', None):
            self._chain_notify = getattr(self.liveaction, 'notify', None)
        if self.runner_parameters:
            self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
            self._display_published = self.runner_parameters.get('display_published', False)

        # Perform some pre-run chain validation
        try:
            self.chain_holder.validate()
        except Exception as e:
            message = getattr(e, 'message', None) or str(e)
            raise runnerexceptions.ActionRunnerPreRunError(message)

    def run(self, action_parameters):
        """
        Execute the chain, starting with the default node and following the transitions
        until there is no next node, a top level error occurs or the chain is canceled.

        :param action_parameters: Parameters of the chain action; used as template context.
        :type action_parameters: ``dict``

        :return: ``(status, result, None)`` where ``result`` contains a ``tasks`` list and,
                 depending on the outcome, ``published``, ``error`` and ``traceback`` keys.
        :rtype: ``tuple``
        """
        # holds final result we store.
        result = {'tasks': []}
        # published variables are to be stored for display.
        if self._display_published:
            result[PUBLISHED_VARS_KEY] = {}
        context_result = {}  # holds result which is used for the template context purposes
        top_level_error = None  # stores a reference to a top level error
        fail = True
        action_node = None

        try:
            # initialize vars once we have the action_parameters. This allows
            # vars to refer to action_parameters.
            self.chain_holder.init_vars(action_parameters)
            action_node = self.chain_holder.get_next_node()
        except Exception as e:
            # action_node is still None at this point (the lookup itself failed), so report
            # the configured default node name instead of dereferencing action_node.
            chain_spec = getattr(self.chain_holder, 'actionchain', None)
            node_name = getattr(chain_spec, 'default', None)
            LOG.exception('Failed to get starting node "%s".', node_name)

            error = ('Failed to get starting node "%s". Lookup failed: %s' %
                     (node_name, str(e)))
            trace = traceback.format_exc(10)
            top_level_error = {
                'error': error,
                'traceback': trace
            }

        parent_context = {
            'execution_id': self.execution_id
        }
        if getattr(self.liveaction, 'context', None):
            parent_context.update(self.liveaction.context)

        while action_node:
            fail = False
            timeout = False
            error = None
            liveaction = None

            created_at = date_utils.get_datetime_utc_now()

            try:
                liveaction = self._get_next_action(
                    action_node=action_node, parent_context=parent_context,
                    action_params=action_parameters, context_result=context_result)
            except InvalidActionReferencedException as e:
                error = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' %
                         (action_node.name, action_node.ref))
                LOG.exception(error)

                fail = True
                top_level_error = {
                    'error': error,
                    'traceback': traceback.format_exc(10)
                }
                break
            except ParameterRenderingFailedException as e:
                # Rendering parameters failed before we even got to running this action, abort
                # and fail the whole action chain
                LOG.exception('Failed to run action "%s".', action_node.name)

                fail = True
                error = ('Failed to run task "%s". Parameter rendering failed: %s' %
                         (action_node.name, str(e)))
                trace = traceback.format_exc(10)
                top_level_error = {
                    'error': error,
                    'traceback': trace
                }
                break

            try:
                liveaction = self._run_action(liveaction)
            except Exception as e:
                # Save the traceback and error message
                LOG.exception('Failure in running action "%s".', action_node.name)

                error = {
                    'error': 'Task "%s" failed: %s' % (action_node.name, str(e)),
                    'traceback': traceback.format_exc(10)
                }
                context_result[action_node.name] = error
            else:
                # Update context result
                context_result[action_node.name] = liveaction.result

                # Render and publish variables
                rendered_publish_vars = ActionChainRunner._render_publish_vars(
                    action_node=action_node, action_parameters=action_parameters,
                    execution_result=liveaction.result, previous_execution_results=context_result,
                    chain_vars=self.chain_holder.vars)

                if rendered_publish_vars:
                    self.chain_holder.vars.update(rendered_publish_vars)
                    if self._display_published:
                        result[PUBLISHED_VARS_KEY].update(rendered_publish_vars)
            finally:
                # Record result and resolve a next node based on the task success or failure
                updated_at = date_utils.get_datetime_utc_now()

                format_kwargs = {'action_node': action_node, 'liveaction_db': liveaction,
                                 'created_at': created_at, 'updated_at': updated_at}

                if error:
                    format_kwargs['error'] = error

                task_result = self._format_action_exec_result(**format_kwargs)
                result['tasks'].append(task_result)

                if self.liveaction_id:
                    self._stopped = action_service.is_action_canceled_or_canceling(
                        self.liveaction_id)

                if self._stopped:
                    LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id)
                    status = LIVEACTION_STATUS_CANCELED
                    return (status, result, None)

                try:
                    if not liveaction:
                        fail = True
                        action_node = self.chain_holder.get_next_node(action_node.name,
                                                                      condition='on-failure')
                    elif liveaction.status in LIVEACTION_FAILED_STATES:
                        # A timed out task makes the chain time out instead of fail.
                        if liveaction and liveaction.status == LIVEACTION_STATUS_TIMED_OUT:
                            timeout = True
                        else:
                            fail = True
                        action_node = self.chain_holder.get_next_node(action_node.name,
                                                                      condition='on-failure')
                    elif liveaction.status == LIVEACTION_STATUS_CANCELED:
                        # User canceled an action (task) in the workflow - cancel the execution
                        # of rest of the workflow
                        self._stopped = True
                        LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id)
                    elif liveaction.status == LIVEACTION_STATUS_SUCCEEDED:
                        action_node = self.chain_holder.get_next_node(action_node.name,
                                                                      condition='on-success')
                except Exception as e:
                    LOG.exception('Failed to get next node "%s".', action_node.name)

                    fail = True
                    error = ('Failed to get next node "%s". Lookup failed: %s' %
                             (action_node.name, str(e)))
                    trace = traceback.format_exc(10)
                    top_level_error = {
                        'error': error,
                        'traceback': trace
                    }
                    # reset action_node here so that chain breaks on failure.
                    action_node = None
                    break

                if self._stopped:
                    LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id)
                    status = LIVEACTION_STATUS_CANCELED
                    return (status, result, None)

        if fail:
            status = LIVEACTION_STATUS_FAILED
        elif timeout:
            status = LIVEACTION_STATUS_TIMED_OUT
        else:
            status = LIVEACTION_STATUS_SUCCEEDED

        if top_level_error:
            # Include top level error information
            result['error'] = top_level_error['error']
            result['traceback'] = top_level_error['traceback']

        return (status, result, None)

    @staticmethod
    def _render_publish_vars(action_node, action_parameters, execution_result,
                             previous_execution_results, chain_vars):
        """
        Render the variables listed under ``publish`` on the action node.

        The templates can refer to the action parameters, the result of this task (under
        the task name), results of the previous tasks and the chain vars. An empty dict is
        returned when the node doesn't publish anything.

        :raises ParameterRenderingFailedException: If rendering of a value fails.

        :rtype: ``dict``
        """
        if not action_node.publish:
            return {}

        # Later updates win on key collisions, so the order below is significant.
        context = {}
        context.update(action_parameters)
        context.update({action_node.name: execution_result})
        context.update(previous_execution_results)
        context.update(chain_vars)
        context.update({RESULTS_KEY: previous_execution_results})
        context.update({SYSTEM_SCOPE: KeyValueLookup(scope=SYSTEM_SCOPE)})

        try:
            rendered_result = jinja_utils.render_values(mapping=action_node.publish,
                                                        context=context)
        except Exception as e:
            key = getattr(e, 'key', None)
            value = getattr(e, 'value', None)
            msg = ('Failed rendering value for publish parameter "%s" in task "%s" '
                   '(template string=%s): %s' % (key, action_node.name, value, str(e)))
            raise ParameterRenderingFailedException(msg)

        return rendered_result

    @staticmethod
    def _resolve_params(action_node, original_parameters, results, chain_vars, chain_context):
        """
        Render the parameters of the action node.

        The template context is built from the original chain parameters, the intermediate
        task results, the chain vars, the system scoped key value lookup and the chain
        context.

        :raises ParameterRenderingFailedException: If rendering of a parameter fails.
        """
        # setup context with original parameters and the intermediate results.
        context = {}
        context.update(original_parameters)
        context.update(results)
        context.update(chain_vars)
        context.update({RESULTS_KEY: results})
        context.update({SYSTEM_SCOPE: KeyValueLookup(scope=SYSTEM_SCOPE)})
        context.update({ACTION_CONTEXT_KV_PREFIX: chain_context})
        try:
            rendered_params = jinja_utils.render_values(mapping=action_node.get_parameters(),
                                                        context=context)
        except Exception as e:
            # Not every exception carries "key" / "value", so read them defensively before
            # using them in the log and error messages.
            key = getattr(e, 'key', None)
            value = getattr(e, 'value', None)
            LOG.exception('Jinja rendering for parameter "%s" failed.', key)

            msg = ('Failed rendering value for action parameter "%s" in task "%s" '
                   '(template string=%s): %s') % (key, action_node.name, value, str(e))
            raise ParameterRenderingFailedException(msg)
        LOG.debug('Rendered params: %s: Type: %s', rendered_params, type(rendered_params))
        return rendered_params

    def _get_next_action(self, action_node, parent_context, action_params, context_result):
        """
        Build the liveaction object for the provided node.

        :raises InvalidActionReferencedException: If the action referenced by the node is
                                                  not registered.
        :raises ParameterRenderingFailedException: If the node parameters can't be rendered.
        """
        # Verify that the referenced action exists
        # TODO: We do another lookup in cast_param, refactor to reduce number of lookups
        task_name = action_node.name
        action_ref = action_node.ref
        action_db = action_db_util.get_action_by_ref(ref=action_ref)

        if not action_db:
            error = 'Task :: %s - Action with ref %s not registered.' % (task_name, action_ref)
            raise InvalidActionReferencedException(error)

        resolved_params = ActionChainRunner._resolve_params(
            action_node=action_node, original_parameters=action_params,
            results=context_result, chain_vars=self.chain_holder.vars,
            chain_context={'parent': parent_context})

        liveaction = self._build_liveaction_object(
            action_node=action_node,
            resolved_params=resolved_params,
            parent_context=parent_context)

        return liveaction

    def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
        """
        Request execution of the liveaction and (optionally) poll until it is completed.

        :param wait_for_completion: Whether to poll until the liveaction is completed.
        :type wait_for_completion: ``bool``

        :param sleep_delay: Number of seconds to wait during "is completed" polls.
        :type sleep_delay: ``float``

        :return: The requested (and, when waiting, the latest re-read) liveaction.
        """
        try:
            # request return canceled
            liveaction, _ = action_service.request(liveaction)
        except Exception:
            liveaction.status = LIVEACTION_STATUS_FAILED
            LOG.exception('Failed to schedule liveaction.')
            # Bare raise keeps the original traceback intact.
            raise

        while (wait_for_completion and liveaction.status not in LIVEACTION_COMPLETED_STATES):
            eventlet.sleep(sleep_delay)
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)

        return liveaction

    def _build_liveaction_object(self, action_node, resolved_params, parent_context):
        """
        Create a ``LiveActionDB`` for the node with notify settings, context and parameters
        cast for the referenced action.
        """
        liveaction = LiveActionDB(action=action_node.ref)

        # Setup notify for task in chain.
        notify = self._get_notify(action_node)
        if notify:
            liveaction.notify = notify
            LOG.debug('%s: Task notify set to: %s', action_node.name, liveaction.notify)

        liveaction.context = {
            'parent': parent_context,
            'chain': vars(action_node)
        }

        liveaction.parameters = action_param_utils.cast_params(action_ref=action_node.ref,
                                                               params=resolved_params)
        return liveaction

    def _get_notify(self, action_node):
        """
        Return notify settings for the node: the node's own settings if defined, otherwise
        the chain level ones. ``None`` is returned for tasks listed in ``skip_notify`` or
        when neither is set.
        """
        if action_node.name not in self._skip_notify_tasks:
            if action_node.notify:
                task_notify = NotificationsHelper.to_model(action_node.notify)
                return task_notify
            elif self._chain_notify:
                return self._chain_notify

        return None

    def _format_action_exec_result(self, action_node, liveaction_db, created_at, updated_at,
                                   error=None):
        """
        Format ActionExecution result so it can be used in the final action result output.

        :param liveaction_db: Liveaction of the task; may be ``None`` if it was never built.

        :param error: Error information which, when provided, is used as the task result
                      and marks the task as failed.

        :rtype: ``dict``
        """
        assert isinstance(created_at, datetime.datetime)
        assert isinstance(updated_at, datetime.datetime)

        result = {}

        execution_db = None
        if liveaction_db:
            execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))

        result['id'] = action_node.name
        result['name'] = action_node.name
        result['execution_id'] = str(execution_db.id) if execution_db else None
        result['workflow'] = None

        result['created_at'] = isotime.format(dt=created_at)
        result['updated_at'] = isotime.format(dt=updated_at)

        if error or not liveaction_db:
            result['state'] = LIVEACTION_STATUS_FAILED
        else:
            result['state'] = liveaction_db.status

        if error:
            result['result'] = error
        else:
            # Without a liveaction there is no result to report.
            result['result'] = liveaction_db.result if liveaction_db else None

        return result
609
|
|
|
|
610
|
|
|
|
611
|
|
|
def get_runner():
    """
    Return a new ``ActionChainRunner`` which uses a freshly generated UUID4 string as its
    runner id.
    """
    runner_id = str(uuid.uuid4())
    return ActionChainRunner(runner_id)
613
|
|
|
|
It is generally discouraged to redefine built-ins as this makes code very hard to read.