1
|
|
|
# Licensed to the StackStorm, Inc ('StackStorm') under one or more |
2
|
|
|
# contributor license agreements. See the NOTICE file distributed with |
3
|
|
|
# this work for additional information regarding copyright ownership. |
4
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0 |
5
|
|
|
# (the "License"); you may not use this file except in compliance with |
6
|
|
|
# the License. You may obtain a copy of the License at |
7
|
|
|
# |
8
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0 |
9
|
|
|
# |
10
|
|
|
# Unless required by applicable law or agreed to in writing, software |
11
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, |
12
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13
|
|
|
# See the License for the specific language governing permissions and |
14
|
|
|
# limitations under the License. |
15
|
|
|
|
16
|
|
|
from __future__ import absolute_import |
17
|
|
|
import copy |
18
|
|
|
import eventlet |
19
|
|
|
import traceback |
20
|
|
|
import uuid |
21
|
|
|
import datetime |
22
|
|
|
|
23
|
|
|
from jsonschema import exceptions as json_schema_exc |
24
|
|
|
|
25
|
|
|
from st2common.runners.base import ActionRunner |
26
|
|
|
from st2common.runners.base import get_metadata as get_runner_metadata |
27
|
|
|
from st2common import log as logging |
28
|
|
|
from st2common.constants import action as action_constants |
29
|
|
|
from st2common.constants import pack as pack_constants |
30
|
|
|
from st2common.constants import keyvalue as kv_constants |
31
|
|
|
from st2common.content.loader import MetaLoader |
32
|
|
|
from st2common.exceptions import action as action_exc |
33
|
|
|
from st2common.exceptions import actionrunner as runner_exc |
34
|
|
|
from st2common.exceptions import db as db_exc |
35
|
|
|
from st2common.models.api.notification import NotificationsHelper |
36
|
|
|
from st2common.models.db.liveaction import LiveActionDB |
37
|
|
|
from st2common.models.system import actionchain |
38
|
|
|
from st2common.models.utils import action_param_utils |
39
|
|
|
from st2common.persistence.execution import ActionExecution |
40
|
|
|
from st2common.persistence.liveaction import LiveAction |
41
|
|
|
from st2common.services import action as action_service |
42
|
|
|
from st2common.services import keyvalues as kv_service |
43
|
|
|
from st2common.util import action_db as action_db_util |
44
|
|
|
from st2common.util import isotime |
45
|
|
|
from st2common.util import date as date_utils |
46
|
|
|
from st2common.util import jinja as jinja_utils |
47
|
|
|
from st2common.util import param as param_utils |
48
|
|
|
from st2common.util.config_loader import get_config |
49
|
|
|
|
50
|
|
|
__all__ = [ |
51
|
|
|
'ActionChainRunner', |
52
|
|
|
'ChainHolder', |
53
|
|
|
|
54
|
|
|
'get_runner', |
55
|
|
|
'get_metadata' |
56
|
|
|
] |
57
|
|
|
|
58
|
|
|
LOG = logging.getLogger(__name__) |
59
|
|
|
|
60
|
|
|
RESULTS_KEY = '__results' |
61
|
|
|
JINJA_START_MARKERS = [ |
62
|
|
|
'{{', |
63
|
|
|
'{%' |
64
|
|
|
] |
65
|
|
|
PUBLISHED_VARS_KEY = 'published' |
66
|
|
|
|
67
|
|
|
|
68
|
|
|
class ChainHolder(object): |
69
|
|
|
|
70
|
|
|
def __init__(self, chainspec, chainname): |
71
|
|
|
self.actionchain = actionchain.ActionChain(**chainspec) |
72
|
|
|
self.chainname = chainname |
73
|
|
|
|
74
|
|
|
if not self.actionchain.default: |
75
|
|
|
default = self._get_default(self.actionchain) |
76
|
|
|
self.actionchain.default = default |
77
|
|
|
|
78
|
|
|
LOG.debug('Using %s as default for %s.', self.actionchain.default, self.chainname) |
79
|
|
|
if not self.actionchain.default: |
80
|
|
|
raise Exception('Failed to find default node in %s.' % (self.chainname)) |
81
|
|
|
|
82
|
|
|
self.vars = {} |
83
|
|
|
|
84
|
|
|
def init_vars(self, action_parameters): |
85
|
|
|
if self.actionchain.vars: |
86
|
|
|
self.vars = self._get_rendered_vars(self.actionchain.vars, |
87
|
|
|
action_parameters=action_parameters) |
88
|
|
|
|
89
|
|
|
def restore_vars(self, ctx_vars): |
90
|
|
|
self.vars.update(copy.deepcopy(ctx_vars)) |
91
|
|
|
|
92
|
|
|
def validate(self): |
93
|
|
|
""" |
94
|
|
|
Function which performs a simple compile time validation. |
95
|
|
|
|
96
|
|
|
Keep in mind that some variables are only resolved during run time which means we can |
97
|
|
|
perform only simple validation during compile / create time. |
98
|
|
|
""" |
99
|
|
|
all_nodes = self._get_all_nodes(action_chain=self.actionchain) |
100
|
|
|
|
101
|
|
|
for node in self.actionchain.chain: |
102
|
|
|
on_success_node_name = node.on_success |
103
|
|
|
on_failure_node_name = node.on_failure |
104
|
|
|
|
105
|
|
|
# Check "on-success" path |
106
|
|
|
valid_name = self._is_valid_node_name(all_node_names=all_nodes, |
107
|
|
|
node_name=on_success_node_name) |
108
|
|
|
if not valid_name: |
109
|
|
|
msg = ('Unable to find node with name "%s" referenced in "on-success" in ' |
110
|
|
|
'task "%s".' % (on_success_node_name, node.name)) |
111
|
|
|
raise ValueError(msg) |
112
|
|
|
|
113
|
|
|
# Check "on-failure" path |
114
|
|
|
valid_name = self._is_valid_node_name(all_node_names=all_nodes, |
115
|
|
|
node_name=on_failure_node_name) |
116
|
|
|
if not valid_name: |
117
|
|
|
msg = ('Unable to find node with name "%s" referenced in "on-failure" in ' |
118
|
|
|
'task "%s".' % (on_failure_node_name, node.name)) |
119
|
|
|
raise ValueError(msg) |
120
|
|
|
|
121
|
|
|
# check if node specified in default is valid. |
122
|
|
|
if self.actionchain.default: |
123
|
|
|
valid_name = self._is_valid_node_name(all_node_names=all_nodes, |
124
|
|
|
node_name=self.actionchain.default) |
125
|
|
|
if not valid_name: |
126
|
|
|
msg = ('Unable to find node with name "%s" referenced in "default".' % |
127
|
|
|
self.actionchain.default) |
128
|
|
|
raise ValueError(msg) |
129
|
|
|
return True |
130
|
|
|
|
131
|
|
|
@staticmethod |
132
|
|
|
def _get_default(action_chain): |
133
|
|
|
# default is defined |
134
|
|
|
if action_chain.default: |
135
|
|
|
return action_chain.default |
136
|
|
|
# no nodes in chain |
137
|
|
|
if not action_chain.chain: |
138
|
|
|
return None |
139
|
|
|
# The first node with no references is the default node. Assumptions |
140
|
|
|
# that support this are : |
141
|
|
|
# 1. There are no loops in the chain. Even if there are loops there is |
142
|
|
|
# at least 1 node which does not end up in this loop. |
143
|
|
|
# 2. There are no fragments in the chain. |
144
|
|
|
all_nodes = ChainHolder._get_all_nodes(action_chain=action_chain) |
145
|
|
|
node_names = set(all_nodes) |
146
|
|
|
on_success_nodes = ChainHolder._get_all_on_success_nodes(action_chain=action_chain) |
147
|
|
|
on_failure_nodes = ChainHolder._get_all_on_failure_nodes(action_chain=action_chain) |
148
|
|
|
referenced_nodes = on_success_nodes | on_failure_nodes |
149
|
|
|
possible_default_nodes = node_names - referenced_nodes |
150
|
|
|
if possible_default_nodes: |
151
|
|
|
# This is to preserve order. set([..]) does not preserve the order so iterate |
152
|
|
|
# over original array. |
153
|
|
|
for node in all_nodes: |
154
|
|
|
if node in possible_default_nodes: |
155
|
|
|
return node |
156
|
|
|
# If no node is found assume the first node in the chain list to be default. |
157
|
|
|
return action_chain.chain[0].name |
158
|
|
|
|
159
|
|
|
@staticmethod |
160
|
|
|
def _get_all_nodes(action_chain): |
161
|
|
|
""" |
162
|
|
|
Return names for all the nodes in the chain. |
163
|
|
|
""" |
164
|
|
|
all_nodes = [node.name for node in action_chain.chain] |
165
|
|
|
return all_nodes |
166
|
|
|
|
167
|
|
|
@staticmethod |
168
|
|
|
def _get_all_on_success_nodes(action_chain): |
169
|
|
|
""" |
170
|
|
|
Return names for all the tasks referenced in "on-success". |
171
|
|
|
""" |
172
|
|
|
on_success_nodes = set([node.on_success for node in action_chain.chain]) |
173
|
|
|
return on_success_nodes |
174
|
|
|
|
175
|
|
|
@staticmethod |
176
|
|
|
def _get_all_on_failure_nodes(action_chain): |
177
|
|
|
""" |
178
|
|
|
Return names for all the tasks referenced in "on-failure". |
179
|
|
|
""" |
180
|
|
|
on_failure_nodes = set([node.on_failure for node in action_chain.chain]) |
181
|
|
|
return on_failure_nodes |
182
|
|
|
|
183
|
|
|
def _is_valid_node_name(self, all_node_names, node_name): |
184
|
|
|
""" |
185
|
|
|
Function which validates that the provided node name is defined in the workflow definition |
186
|
|
|
and it's valid. |
187
|
|
|
|
188
|
|
|
Keep in mind that we can only perform validation for task names which don't include jinja |
189
|
|
|
expressions since those are rendered at run time. |
190
|
|
|
""" |
191
|
|
|
if not node_name: |
192
|
|
|
# This task name needs to be resolved during run time so we cant validate the name now |
193
|
|
|
return True |
194
|
|
|
|
195
|
|
|
is_jinja_expression = jinja_utils.is_jinja_expression(value=node_name) |
196
|
|
|
if is_jinja_expression: |
197
|
|
|
# This task name needs to be resolved during run time so we cant validate the name |
198
|
|
|
# now |
199
|
|
|
return True |
200
|
|
|
|
201
|
|
|
return node_name in all_node_names |
202
|
|
|
|
203
|
|
|
@staticmethod |
204
|
|
|
def _get_rendered_vars(vars, action_parameters): |
|
|
|
|
205
|
|
|
if not vars: |
206
|
|
|
return {} |
207
|
|
|
context = {} |
208
|
|
|
context.update({ |
209
|
|
|
kv_constants.DATASTORE_PARENT_SCOPE: { |
210
|
|
|
kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
211
|
|
|
scope=kv_constants.FULL_SYSTEM_SCOPE) |
212
|
|
|
} |
213
|
|
|
}) |
214
|
|
|
context.update(action_parameters) |
215
|
|
|
LOG.info('Rendering action chain vars. Mapping = %s; Context = %s', vars, context) |
216
|
|
|
return jinja_utils.render_values(mapping=vars, context=context) |
217
|
|
|
|
218
|
|
|
def get_node(self, node_name=None, raise_on_failure=False): |
219
|
|
|
if not node_name: |
220
|
|
|
return None |
221
|
|
|
for node in self.actionchain.chain: |
222
|
|
|
if node.name == node_name: |
223
|
|
|
return node |
224
|
|
|
if raise_on_failure: |
225
|
|
|
raise runner_exc.ActionRunnerException( |
226
|
|
|
'Unable to find node with name "%s".' % (node_name)) |
227
|
|
|
return None |
228
|
|
|
|
229
|
|
|
def get_next_node(self, curr_node_name=None, condition='on-success'): |
230
|
|
|
if not curr_node_name: |
231
|
|
|
return self.get_node(self.actionchain.default) |
232
|
|
|
current_node = self.get_node(curr_node_name) |
233
|
|
|
if condition == 'on-success': |
234
|
|
|
return self.get_node(current_node.on_success, raise_on_failure=True) |
235
|
|
|
elif condition == 'on-failure': |
236
|
|
|
return self.get_node(current_node.on_failure, raise_on_failure=True) |
237
|
|
|
raise runner_exc.ActionRunnerException('Unknown condition %s.' % condition) |
238
|
|
|
|
239
|
|
|
|
240
|
|
|
class ActionChainRunner(ActionRunner): |
241
|
|
|
|
242
|
|
|
def __init__(self, runner_id): |
243
|
|
|
super(ActionChainRunner, self).__init__(runner_id=runner_id) |
244
|
|
|
self.chain_holder = None |
245
|
|
|
self._meta_loader = MetaLoader() |
246
|
|
|
self._skip_notify_tasks = [] |
247
|
|
|
self._display_published = True |
248
|
|
|
self._chain_notify = None |
249
|
|
|
|
250
|
|
|
def pre_run(self): |
251
|
|
|
super(ActionChainRunner, self).pre_run() |
252
|
|
|
|
253
|
|
|
chainspec_file = self.entry_point |
254
|
|
|
LOG.debug('Reading action chain from %s for action %s.', chainspec_file, |
255
|
|
|
self.action) |
256
|
|
|
|
257
|
|
|
try: |
258
|
|
|
chainspec = self._meta_loader.load(file_path=chainspec_file, |
259
|
|
|
expected_type=dict) |
260
|
|
|
except Exception as e: |
261
|
|
|
message = ('Failed to parse action chain definition from "%s": %s' % |
262
|
|
|
(chainspec_file, str(e))) |
263
|
|
|
LOG.exception('Failed to load action chain definition.') |
264
|
|
|
raise runner_exc.ActionRunnerPreRunError(message) |
265
|
|
|
|
266
|
|
|
try: |
267
|
|
|
self.chain_holder = ChainHolder(chainspec, self.action_name) |
268
|
|
|
except json_schema_exc.ValidationError as e: |
269
|
|
|
# preserve the whole nasty jsonschema message as that is better to get to the |
270
|
|
|
# root cause |
271
|
|
|
message = str(e) |
272
|
|
|
LOG.exception('Failed to instantiate ActionChain.') |
273
|
|
|
raise runner_exc.ActionRunnerPreRunError(message) |
274
|
|
|
except Exception as e: |
275
|
|
|
message = str(e) |
276
|
|
|
LOG.exception('Failed to instantiate ActionChain.') |
277
|
|
|
raise runner_exc.ActionRunnerPreRunError(message) |
278
|
|
|
|
279
|
|
|
# Runner attributes are set lazily. So these steps |
280
|
|
|
# should happen outside the constructor. |
281
|
|
|
if getattr(self, 'liveaction', None): |
282
|
|
|
self._chain_notify = getattr(self.liveaction, 'notify', None) |
283
|
|
|
if self.runner_parameters: |
284
|
|
|
self._skip_notify_tasks = self.runner_parameters.get('skip_notify', []) |
285
|
|
|
self._display_published = self.runner_parameters.get('display_published', True) |
286
|
|
|
|
287
|
|
|
# Perform some pre-run chain validation |
288
|
|
|
try: |
289
|
|
|
self.chain_holder.validate() |
290
|
|
|
except Exception as e: |
291
|
|
|
raise runner_exc.ActionRunnerPreRunError(str(e)) |
292
|
|
|
|
293
|
|
|
def run(self, action_parameters): |
294
|
|
|
# Run the action chain. |
295
|
|
|
return self._run_chain(action_parameters) |
296
|
|
|
|
297
|
|
|
def cancel(self): |
298
|
|
|
# Identify the list of action executions that are workflows and cascade pause. |
299
|
|
|
for child_exec_id in self.execution.children: |
300
|
|
|
child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True) |
301
|
|
|
if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and |
302
|
|
|
child_exec.status in action_constants.LIVEACTION_CANCELABLE_STATES): |
303
|
|
|
action_service.request_cancellation( |
304
|
|
|
LiveAction.get(id=child_exec.liveaction['id']), |
305
|
|
|
self.context.get('user', None) |
306
|
|
|
) |
307
|
|
|
|
308
|
|
|
return ( |
309
|
|
|
action_constants.LIVEACTION_STATUS_CANCELING, |
310
|
|
|
self.liveaction.result, |
311
|
|
|
self.liveaction.context |
312
|
|
|
) |
313
|
|
|
|
314
|
|
|
def pause(self): |
315
|
|
|
# Identify the list of action executions that are workflows and cascade pause. |
316
|
|
|
for child_exec_id in self.execution.children: |
317
|
|
|
child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True) |
318
|
|
|
if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and |
319
|
|
|
child_exec.status == action_constants.LIVEACTION_STATUS_RUNNING): |
320
|
|
|
action_service.request_pause( |
321
|
|
|
LiveAction.get(id=child_exec.liveaction['id']), |
322
|
|
|
self.context.get('user', None) |
323
|
|
|
) |
324
|
|
|
|
325
|
|
|
return ( |
326
|
|
|
action_constants.LIVEACTION_STATUS_PAUSING, |
327
|
|
|
self.liveaction.result, |
328
|
|
|
self.liveaction.context |
329
|
|
|
) |
330
|
|
|
|
331
|
|
|
def resume(self): |
332
|
|
|
# Restore runner and action parameters since they are not provided on resume. |
333
|
|
|
runner_parameters, action_parameters = param_utils.render_final_params( |
334
|
|
|
self.runner_type.runner_parameters, |
335
|
|
|
self.action.parameters, |
336
|
|
|
self.liveaction.parameters, |
|
|
|
|
337
|
|
|
self.liveaction.context |
|
|
|
|
338
|
|
|
) |
339
|
|
|
|
340
|
|
|
# Assign runner parameters needed for pre-run. |
341
|
|
|
if runner_parameters: |
342
|
|
|
self.runner_parameters = runner_parameters |
343
|
|
|
|
344
|
|
|
# Restore chain holder if it is not initialized. |
345
|
|
|
if not self.chain_holder: |
346
|
|
|
self.pre_run() |
347
|
|
|
|
348
|
|
|
# Change the status of the liveaction from resuming to running. |
349
|
|
|
self.liveaction = action_service.update_status( |
350
|
|
|
self.liveaction, |
351
|
|
|
action_constants.LIVEACTION_STATUS_RUNNING, |
352
|
|
|
publish=False |
353
|
|
|
) |
354
|
|
|
|
355
|
|
|
# Run the action chain. |
356
|
|
|
return self._run_chain(action_parameters, resuming=True) |
357
|
|
|
|
358
|
|
|
def _run_chain(self, action_parameters, resuming=False): |
359
|
|
|
# Set chain status to fail unless explicitly set to succeed. |
360
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
361
|
|
|
|
362
|
|
|
# Result holds the final result that the chain store in the database. |
363
|
|
|
result = {'tasks': []} |
364
|
|
|
|
365
|
|
|
# Save published variables into the result if specified. |
366
|
|
|
if self._display_published: |
367
|
|
|
result[PUBLISHED_VARS_KEY] = {} |
368
|
|
|
|
369
|
|
|
context_result = {} # Holds result which is used for the template context purposes |
370
|
|
|
top_level_error = None # Stores a reference to a top level error |
371
|
|
|
action_node = None |
372
|
|
|
last_task = None |
373
|
|
|
|
374
|
|
|
try: |
375
|
|
|
# Initialize vars with the action parameters. |
376
|
|
|
# This allows action parameers to be referenced from vars. |
377
|
|
|
self.chain_holder.init_vars(action_parameters) |
378
|
|
|
except Exception as e: |
379
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
380
|
|
|
m = 'Failed initializing ``vars`` in chain.' |
381
|
|
|
LOG.exception(m) |
382
|
|
|
top_level_error = self._format_error(e, m) |
383
|
|
|
result.update(top_level_error) |
384
|
|
|
return (chain_status, result, None) |
385
|
|
|
|
386
|
|
|
# Restore state on resuming an existing chain execution. |
387
|
|
|
if resuming: |
388
|
|
|
# Restore vars is any from the liveaction. |
389
|
|
|
ctx_vars = self.liveaction.context.pop('vars', {}) |
390
|
|
|
self.chain_holder.restore_vars(ctx_vars) |
391
|
|
|
|
392
|
|
|
# Restore result if any from the liveaction. |
393
|
|
|
if self.liveaction and hasattr(self.liveaction, 'result') and self.liveaction.result: |
394
|
|
|
result = self.liveaction.result |
395
|
|
|
|
396
|
|
|
# Initialize or rebuild existing context_result from liveaction |
397
|
|
|
# which holds the result used for resolving context in Jinja template. |
398
|
|
|
for task in result.get('tasks', []): |
399
|
|
|
context_result[task['name']] = task['result'] |
400
|
|
|
|
401
|
|
|
# Restore or initialize the top_level_error |
402
|
|
|
# that stores a reference to a top level error. |
403
|
|
|
if 'error' in result or 'traceback' in result: |
404
|
|
|
top_level_error = { |
405
|
|
|
'error': result.get('error'), |
406
|
|
|
'traceback': result.get('traceback') |
407
|
|
|
} |
408
|
|
|
|
409
|
|
|
# If there are no executed tasks in the chain, then get the first node. |
410
|
|
|
if len(result['tasks']) <= 0: |
411
|
|
|
try: |
412
|
|
|
action_node = self.chain_holder.get_next_node() |
413
|
|
|
except Exception as e: |
414
|
|
|
m = 'Failed to get starting node "%s".', action_node.name |
415
|
|
|
LOG.exception(m) |
416
|
|
|
top_level_error = self._format_error(e, m) |
417
|
|
|
|
418
|
|
|
# If there are no action node to run next, then mark the chain successful. |
419
|
|
|
if not action_node: |
420
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED |
421
|
|
|
|
422
|
|
|
# Otherwise, figure out the last task executed and |
423
|
|
|
# its state to determine where to begin executing. |
424
|
|
|
else: |
425
|
|
|
last_task = result['tasks'][-1] |
426
|
|
|
action_node = self.chain_holder.get_node(last_task['name']) |
427
|
|
|
liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id']) |
428
|
|
|
|
429
|
|
|
# If the liveaction of the last task has changed, update the result entry. |
430
|
|
|
if liveaction.status != last_task['state']: |
431
|
|
|
updated_task_result = self._get_updated_action_exec_result( |
432
|
|
|
action_node, liveaction, last_task) |
433
|
|
|
del result['tasks'][-1] |
434
|
|
|
result['tasks'].append(updated_task_result) |
435
|
|
|
|
436
|
|
|
# Also need to update context_result so the updated result |
437
|
|
|
# is available to Jinja expressions |
438
|
|
|
updated_task_name = updated_task_result['name'] |
439
|
|
|
context_result[updated_task_name]['result'] = updated_task_result['result'] |
440
|
|
|
|
441
|
|
|
# If the last task was canceled, then canceled the chain altogether. |
442
|
|
|
if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED: |
443
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_CANCELED |
444
|
|
|
return (chain_status, result, None) |
445
|
|
|
|
446
|
|
|
# If the last task was paused, then stay on this action node. |
447
|
|
|
# This is explicitly put here for clarity. |
448
|
|
|
if liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED: |
449
|
|
|
pass |
450
|
|
|
|
451
|
|
|
# If the last task succeeded, then get the next on-success action node. |
452
|
|
|
if liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED: |
453
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED |
454
|
|
|
action_node = self.chain_holder.get_next_node( |
455
|
|
|
last_task['name'], condition='on-success') |
456
|
|
|
|
457
|
|
|
# If the last task failed, then get the next on-failure action node. |
458
|
|
|
if liveaction.status in action_constants.LIVEACTION_FAILED_STATES: |
459
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
460
|
|
|
action_node = self.chain_holder.get_next_node( |
461
|
|
|
last_task['name'], condition='on-failure') |
462
|
|
|
|
463
|
|
|
# Setup parent context. |
464
|
|
|
parent_context = { |
465
|
|
|
'execution_id': self.execution_id |
466
|
|
|
} |
467
|
|
|
|
468
|
|
|
if getattr(self.liveaction, 'context', None): |
469
|
|
|
parent_context.update(self.liveaction.context) |
470
|
|
|
|
471
|
|
|
# Run the action chain until there are no more tasks. |
472
|
|
|
while action_node: |
473
|
|
|
error = None |
474
|
|
|
liveaction = None |
475
|
|
|
last_task = result['tasks'][-1] if len(result['tasks']) > 0 else None |
476
|
|
|
created_at = date_utils.get_datetime_utc_now() |
477
|
|
|
|
478
|
|
|
try: |
479
|
|
|
# If last task was paused, then fetch the liveaction and resume it first. |
480
|
|
|
if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED: |
481
|
|
|
liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id']) |
482
|
|
|
del result['tasks'][-1] |
483
|
|
|
else: |
484
|
|
|
liveaction = self._get_next_action( |
485
|
|
|
action_node=action_node, parent_context=parent_context, |
486
|
|
|
action_params=action_parameters, context_result=context_result) |
487
|
|
|
except action_exc.InvalidActionReferencedException as e: |
488
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
489
|
|
|
m = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' % |
490
|
|
|
(action_node.name, action_node.ref)) |
491
|
|
|
LOG.exception(m) |
492
|
|
|
top_level_error = self._format_error(e, m) |
493
|
|
|
break |
494
|
|
|
except action_exc.ParameterRenderingFailedException as e: |
495
|
|
|
# Rendering parameters failed before we even got to running this action, |
496
|
|
|
# abort and fail the whole action chain |
497
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
498
|
|
|
m = 'Failed to run task "%s". Parameter rendering failed.' % action_node.name |
499
|
|
|
LOG.exception(m) |
500
|
|
|
top_level_error = self._format_error(e, m) |
501
|
|
|
break |
502
|
|
|
except db_exc.StackStormDBObjectNotFoundError as e: |
503
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
504
|
|
|
m = 'Failed to resume task "%s". Unable to find liveaction.' % action_node.name |
505
|
|
|
LOG.exception(m) |
506
|
|
|
top_level_error = self._format_error(e, m) |
507
|
|
|
break |
508
|
|
|
|
509
|
|
|
try: |
510
|
|
|
# If last task was paused, then fetch the liveaction and resume it first. |
511
|
|
|
if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED: |
512
|
|
|
LOG.info('Resume task %s for chain %s.', action_node.name, self.liveaction.id) |
513
|
|
|
liveaction = self._resume_action(liveaction) |
514
|
|
|
else: |
515
|
|
|
LOG.info('Run task %s for chain %s.', action_node.name, self.liveaction.id) |
516
|
|
|
liveaction = self._run_action(liveaction) |
517
|
|
|
except Exception as e: |
518
|
|
|
# Save the traceback and error message |
519
|
|
|
m = 'Failed running task "%s".' % action_node.name |
520
|
|
|
LOG.exception(m) |
521
|
|
|
error = self._format_error(e, m) |
522
|
|
|
context_result[action_node.name] = error |
523
|
|
|
else: |
524
|
|
|
# Update context result |
525
|
|
|
context_result[action_node.name] = liveaction.result |
526
|
|
|
|
527
|
|
|
# Render and publish variables |
528
|
|
|
rendered_publish_vars = ActionChainRunner._render_publish_vars( |
529
|
|
|
action_node=action_node, action_parameters=action_parameters, |
530
|
|
|
execution_result=liveaction.result, previous_execution_results=context_result, |
531
|
|
|
chain_vars=self.chain_holder.vars) |
532
|
|
|
|
533
|
|
|
if rendered_publish_vars: |
534
|
|
|
self.chain_holder.vars.update(rendered_publish_vars) |
535
|
|
|
if self._display_published: |
536
|
|
|
result[PUBLISHED_VARS_KEY].update(rendered_publish_vars) |
537
|
|
|
finally: |
538
|
|
|
# Record result and resolve a next node based on the task success or failure |
539
|
|
|
updated_at = date_utils.get_datetime_utc_now() |
540
|
|
|
|
541
|
|
|
task_result = self._format_action_exec_result( |
542
|
|
|
action_node, |
543
|
|
|
liveaction, |
544
|
|
|
created_at, |
545
|
|
|
updated_at, |
546
|
|
|
error=error |
547
|
|
|
) |
548
|
|
|
|
549
|
|
|
result['tasks'].append(task_result) |
550
|
|
|
|
551
|
|
|
try: |
552
|
|
|
if not liveaction: |
553
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
554
|
|
|
action_node = self.chain_holder.get_next_node( |
555
|
|
|
action_node.name, condition='on-failure') |
556
|
|
|
elif liveaction.status == action_constants.LIVEACTION_STATUS_TIMED_OUT: |
557
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_TIMED_OUT |
558
|
|
|
action_node = self.chain_holder.get_next_node( |
559
|
|
|
action_node.name, condition='on-failure') |
560
|
|
|
elif liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED: |
561
|
|
|
LOG.info('Chain execution (%s) canceled because task "%s" is canceled.', |
562
|
|
|
self.liveaction_id, action_node.name) |
563
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_CANCELED |
564
|
|
|
action_node = None |
565
|
|
|
elif liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED: |
566
|
|
|
LOG.info('Chain execution (%s) paused because task "%s" is paused.', |
567
|
|
|
self.liveaction_id, action_node.name) |
568
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_PAUSED |
569
|
|
|
self._save_vars() |
570
|
|
|
action_node = None |
571
|
|
|
elif liveaction.status in action_constants.LIVEACTION_FAILED_STATES: |
572
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
573
|
|
|
action_node = self.chain_holder.get_next_node( |
574
|
|
|
action_node.name, condition='on-failure') |
575
|
|
|
elif liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED: |
576
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED |
577
|
|
|
action_node = self.chain_holder.get_next_node( |
578
|
|
|
action_node.name, condition='on-success') |
579
|
|
|
else: |
580
|
|
|
action_node = None |
581
|
|
|
except Exception as e: |
582
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
583
|
|
|
m = 'Failed to get next node "%s".' % action_node.name |
584
|
|
|
LOG.exception(m) |
585
|
|
|
top_level_error = self._format_error(e, m) |
586
|
|
|
action_node = None |
587
|
|
|
break |
|
|
|
|
588
|
|
|
|
589
|
|
|
if action_service.is_action_canceled_or_canceling(self.liveaction.id): |
590
|
|
|
LOG.info('Chain execution (%s) canceled by user.', self.liveaction.id) |
591
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_CANCELED |
592
|
|
|
return (chain_status, result, None) |
593
|
|
|
|
594
|
|
|
if action_service.is_action_paused_or_pausing(self.liveaction.id): |
595
|
|
|
LOG.info('Chain execution (%s) paused by user.', self.liveaction.id) |
596
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_PAUSED |
597
|
|
|
self._save_vars() |
598
|
|
|
return (chain_status, result, self.liveaction.context) |
599
|
|
|
|
600
|
|
|
if top_level_error and isinstance(top_level_error, dict): |
601
|
|
|
result.update(top_level_error) |
602
|
|
|
|
603
|
|
|
return (chain_status, result, self.liveaction.context) |
604
|
|
|
|
605
|
|
|
def _format_error(self, e, msg): |
606
|
|
|
return { |
607
|
|
|
'error': '%s. %s' % (msg, str(e)), |
608
|
|
|
'traceback': traceback.format_exc(10) |
609
|
|
|
} |
610
|
|
|
|
611
|
|
|
def _save_vars(self): |
612
|
|
|
# Save the context vars in the liveaction context. |
613
|
|
|
self.liveaction.context['vars'] = self.chain_holder.vars |
614
|
|
|
|
615
|
|
|
@staticmethod |
616
|
|
|
def _render_publish_vars(action_node, action_parameters, execution_result, |
617
|
|
|
previous_execution_results, chain_vars): |
618
|
|
|
""" |
619
|
|
|
If no output is specified on the action_node the output is the entire execution_result. |
620
|
|
|
If any output is specified then only those variables are published as output of an |
621
|
|
|
execution of this action_node. |
622
|
|
|
The output variable can refer to a variable from the execution_result, |
623
|
|
|
previous_execution_results or chain_vars. |
624
|
|
|
""" |
625
|
|
|
if not action_node.publish: |
626
|
|
|
return {} |
627
|
|
|
|
628
|
|
|
context = {} |
629
|
|
|
context.update(action_parameters) |
630
|
|
|
context.update({action_node.name: execution_result}) |
631
|
|
|
context.update(previous_execution_results) |
632
|
|
|
context.update(chain_vars) |
633
|
|
|
context.update({RESULTS_KEY: previous_execution_results}) |
634
|
|
|
|
635
|
|
|
context.update({ |
636
|
|
|
kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
637
|
|
|
scope=kv_constants.SYSTEM_SCOPE) |
638
|
|
|
}) |
639
|
|
|
|
640
|
|
|
context.update({ |
641
|
|
|
kv_constants.DATASTORE_PARENT_SCOPE: { |
642
|
|
|
kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
643
|
|
|
scope=kv_constants.FULL_SYSTEM_SCOPE) |
644
|
|
|
} |
645
|
|
|
}) |
646
|
|
|
|
647
|
|
|
try: |
648
|
|
|
rendered_result = jinja_utils.render_values(mapping=action_node.publish, |
649
|
|
|
context=context) |
650
|
|
|
except Exception as e: |
651
|
|
|
key = getattr(e, 'key', None) |
652
|
|
|
value = getattr(e, 'value', None) |
653
|
|
|
msg = ('Failed rendering value for publish parameter "%s" in task "%s" ' |
654
|
|
|
'(template string=%s): %s' % (key, action_node.name, value, str(e))) |
655
|
|
|
raise action_exc.ParameterRenderingFailedException(msg) |
656
|
|
|
|
657
|
|
|
return rendered_result |
658
|
|
|
|
659
|
|
|
@staticmethod |
660
|
|
|
def _resolve_params(action_node, original_parameters, results, chain_vars, chain_context): |
661
|
|
|
# setup context with original parameters and the intermediate results. |
662
|
|
|
chain_parent = chain_context.get('parent', {}) |
663
|
|
|
pack = chain_parent.get('pack') |
664
|
|
|
user = chain_parent.get('user') |
665
|
|
|
|
666
|
|
|
config = get_config(pack, user) |
667
|
|
|
|
668
|
|
|
context = {} |
669
|
|
|
context.update(original_parameters) |
670
|
|
|
context.update(results) |
671
|
|
|
context.update(chain_vars) |
672
|
|
|
context.update({RESULTS_KEY: results}) |
673
|
|
|
|
674
|
|
|
context.update({ |
675
|
|
|
kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
676
|
|
|
scope=kv_constants.SYSTEM_SCOPE) |
677
|
|
|
}) |
678
|
|
|
|
679
|
|
|
context.update({ |
680
|
|
|
kv_constants.DATASTORE_PARENT_SCOPE: { |
681
|
|
|
kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
682
|
|
|
scope=kv_constants.FULL_SYSTEM_SCOPE) |
683
|
|
|
} |
684
|
|
|
}) |
685
|
|
|
context.update({action_constants.ACTION_CONTEXT_KV_PREFIX: chain_context}) |
686
|
|
|
context.update({pack_constants.PACK_CONFIG_CONTEXT_KV_PREFIX: config}) |
687
|
|
|
try: |
688
|
|
|
rendered_params = jinja_utils.render_values(mapping=action_node.get_parameters(), |
689
|
|
|
context=context) |
690
|
|
|
except Exception as e: |
691
|
|
|
LOG.exception('Jinja rendering for parameter "%s" failed.' % (e.key)) |
692
|
|
|
|
693
|
|
|
key = getattr(e, 'key', None) |
694
|
|
|
value = getattr(e, 'value', None) |
695
|
|
|
msg = ('Failed rendering value for action parameter "%s" in task "%s" ' |
696
|
|
|
'(template string=%s): %s') % (key, action_node.name, value, str(e)) |
697
|
|
|
raise action_exc.ParameterRenderingFailedException(msg) |
698
|
|
|
LOG.debug('Rendered params: %s: Type: %s', rendered_params, type(rendered_params)) |
699
|
|
|
return rendered_params |
700
|
|
|
|
701
|
|
|
def _get_next_action(self, action_node, parent_context, action_params, context_result): |
702
|
|
|
# Verify that the referenced action exists |
703
|
|
|
# TODO: We do another lookup in cast_param, refactor to reduce number of lookups |
704
|
|
|
task_name = action_node.name |
705
|
|
|
action_ref = action_node.ref |
706
|
|
|
action_db = action_db_util.get_action_by_ref(ref=action_ref) |
707
|
|
|
|
708
|
|
|
if not action_db: |
709
|
|
|
error = 'Task :: %s - Action with ref %s not registered.' % (task_name, action_ref) |
710
|
|
|
raise action_exc.InvalidActionReferencedException(error) |
711
|
|
|
|
712
|
|
|
resolved_params = ActionChainRunner._resolve_params( |
713
|
|
|
action_node=action_node, original_parameters=action_params, |
714
|
|
|
results=context_result, chain_vars=self.chain_holder.vars, |
715
|
|
|
chain_context={'parent': parent_context}) |
716
|
|
|
|
717
|
|
|
liveaction = self._build_liveaction_object( |
718
|
|
|
action_node=action_node, |
719
|
|
|
resolved_params=resolved_params, |
720
|
|
|
parent_context=parent_context) |
721
|
|
|
|
722
|
|
|
return liveaction |
723
|
|
|
|
724
|
|
|
def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0): |
725
|
|
|
""" |
726
|
|
|
:param sleep_delay: Number of seconds to wait during "is completed" polls. |
727
|
|
|
:type sleep_delay: ``float`` |
728
|
|
|
""" |
729
|
|
|
try: |
730
|
|
|
liveaction, _ = action_service.request(liveaction) |
731
|
|
|
except Exception as e: |
732
|
|
|
liveaction.status = action_constants.LIVEACTION_STATUS_FAILED |
733
|
|
|
LOG.exception('Failed to schedule liveaction.') |
734
|
|
|
raise e |
735
|
|
|
|
736
|
|
|
while (wait_for_completion and liveaction.status not in ( |
737
|
|
|
action_constants.LIVEACTION_COMPLETED_STATES + |
738
|
|
|
[action_constants.LIVEACTION_STATUS_PAUSED])): |
739
|
|
|
eventlet.sleep(sleep_delay) |
740
|
|
|
liveaction = action_db_util.get_liveaction_by_id(liveaction.id) |
741
|
|
|
|
742
|
|
|
return liveaction |
743
|
|
|
|
744
|
|
|
def _resume_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0): |
745
|
|
|
""" |
746
|
|
|
:param sleep_delay: Number of seconds to wait during "is completed" polls. |
747
|
|
|
:type sleep_delay: ``float`` |
748
|
|
|
""" |
749
|
|
|
try: |
750
|
|
|
user = self.context.get('user', None) |
751
|
|
|
liveaction, _ = action_service.request_resume(liveaction, user) |
752
|
|
|
except Exception as e: |
753
|
|
|
liveaction.status = action_constants.LIVEACTION_STATUS_FAILED |
754
|
|
|
LOG.exception('Failed to schedule liveaction.') |
755
|
|
|
raise e |
756
|
|
|
|
757
|
|
|
while (wait_for_completion and liveaction.status not in ( |
758
|
|
|
action_constants.LIVEACTION_COMPLETED_STATES + |
759
|
|
|
[action_constants.LIVEACTION_STATUS_PAUSED])): |
760
|
|
|
eventlet.sleep(sleep_delay) |
761
|
|
|
liveaction = action_db_util.get_liveaction_by_id(liveaction.id) |
762
|
|
|
|
763
|
|
|
return liveaction |
764
|
|
|
|
765
|
|
|
def _build_liveaction_object(self, action_node, resolved_params, parent_context): |
766
|
|
|
liveaction = LiveActionDB(action=action_node.ref) |
767
|
|
|
|
768
|
|
|
# Setup notify for task in chain. |
769
|
|
|
notify = self._get_notify(action_node) |
770
|
|
|
if notify: |
771
|
|
|
liveaction.notify = notify |
772
|
|
|
LOG.debug('%s: Task notify set to: %s', action_node.name, liveaction.notify) |
773
|
|
|
|
774
|
|
|
liveaction.context = { |
775
|
|
|
'parent': parent_context, |
776
|
|
|
'chain': vars(action_node) |
777
|
|
|
} |
778
|
|
|
liveaction.parameters = action_param_utils.cast_params(action_ref=action_node.ref, |
779
|
|
|
params=resolved_params) |
780
|
|
|
return liveaction |
781
|
|
|
|
782
|
|
|
def _get_notify(self, action_node): |
783
|
|
|
if action_node.name not in self._skip_notify_tasks: |
784
|
|
|
if action_node.notify: |
785
|
|
|
task_notify = NotificationsHelper.to_model(action_node.notify) |
786
|
|
|
return task_notify |
787
|
|
|
elif self._chain_notify: |
788
|
|
|
return self._chain_notify |
789
|
|
|
|
790
|
|
|
return None |
791
|
|
|
|
792
|
|
|
def _get_updated_action_exec_result(self, action_node, liveaction, prev_task_result): |
793
|
|
|
if liveaction.status in action_constants.LIVEACTION_COMPLETED_STATES: |
794
|
|
|
created_at = isotime.parse(prev_task_result['created_at']) |
795
|
|
|
updated_at = liveaction.end_timestamp |
796
|
|
|
else: |
797
|
|
|
created_at = isotime.parse(prev_task_result['created_at']) |
798
|
|
|
updated_at = isotime.parse(prev_task_result['updated_at']) |
799
|
|
|
|
800
|
|
|
return self._format_action_exec_result(action_node, liveaction, created_at, updated_at) |
801
|
|
|
|
802
|
|
|
def _format_action_exec_result(self, action_node, liveaction_db, created_at, updated_at, |
803
|
|
|
error=None): |
804
|
|
|
""" |
805
|
|
|
Format ActionExecution result so it can be used in the final action result output. |
806
|
|
|
|
807
|
|
|
:rtype: ``dict`` |
808
|
|
|
""" |
809
|
|
|
assert isinstance(created_at, datetime.datetime) |
810
|
|
|
assert isinstance(updated_at, datetime.datetime) |
811
|
|
|
|
812
|
|
|
result = {} |
813
|
|
|
|
814
|
|
|
execution_db = None |
815
|
|
|
if liveaction_db: |
816
|
|
|
execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id)) |
817
|
|
|
|
818
|
|
|
result['id'] = action_node.name |
819
|
|
|
result['name'] = action_node.name |
820
|
|
|
result['execution_id'] = str(execution_db.id) if execution_db else None |
821
|
|
|
result['liveaction_id'] = str(liveaction_db.id) if liveaction_db else None |
822
|
|
|
result['workflow'] = None |
823
|
|
|
|
824
|
|
|
result['created_at'] = isotime.format(dt=created_at) |
825
|
|
|
result['updated_at'] = isotime.format(dt=updated_at) |
826
|
|
|
|
827
|
|
|
if error or not liveaction_db: |
828
|
|
|
result['state'] = action_constants.LIVEACTION_STATUS_FAILED |
829
|
|
|
else: |
830
|
|
|
result['state'] = liveaction_db.status |
831
|
|
|
|
832
|
|
|
if error: |
833
|
|
|
result['result'] = error |
834
|
|
|
else: |
835
|
|
|
result['result'] = liveaction_db.result |
836
|
|
|
|
837
|
|
|
return result |
838
|
|
|
|
839
|
|
|
|
840
|
|
|
def get_runner(): |
841
|
|
|
return ActionChainRunner(str(uuid.uuid4())) |
842
|
|
|
|
843
|
|
|
|
844
|
|
|
def get_metadata(): |
845
|
|
|
return get_runner_metadata('action_chain_runner') |
846
|
|
|
|
It is generally discouraged to redefine built-ins as this makes code very hard to read.