1
|
|
|
# Licensed to the StackStorm, Inc ('StackStorm') under one or more |
2
|
|
|
# contributor license agreements. See the NOTICE file distributed with |
3
|
|
|
# this work for additional information regarding copyright ownership. |
4
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0 |
5
|
|
|
# (the "License"); you may not use this file except in compliance with |
6
|
|
|
# the License. You may obtain a copy of the License at |
7
|
|
|
# |
8
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0 |
9
|
|
|
# |
10
|
|
|
# Unless required by applicable law or agreed to in writing, software |
11
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, |
12
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
13
|
|
|
# See the License for the specific language governing permissions and |
14
|
|
|
# limitations under the License. |
15
|
|
|
|
16
|
|
|
import copy |
17
|
|
|
import eventlet |
18
|
|
|
import traceback |
19
|
|
|
import uuid |
20
|
|
|
import datetime |
21
|
|
|
|
22
|
|
|
from jsonschema import exceptions as json_schema_exc |
23
|
|
|
|
24
|
|
|
from st2common.runners.base import ActionRunner |
25
|
|
|
from st2common import log as logging |
26
|
|
|
from st2common.constants import action as action_constants |
27
|
|
|
from st2common.constants import pack as pack_constants |
28
|
|
|
from st2common.constants import keyvalue as kv_constants |
29
|
|
|
from st2common.content.loader import MetaLoader |
30
|
|
|
from st2common.exceptions import action as action_exc |
31
|
|
|
from st2common.exceptions import actionrunner as runner_exc |
32
|
|
|
from st2common.exceptions import db as db_exc |
33
|
|
|
from st2common.models.api.notification import NotificationsHelper |
34
|
|
|
from st2common.models.db.liveaction import LiveActionDB |
35
|
|
|
from st2common.models.system import actionchain |
36
|
|
|
from st2common.models.utils import action_param_utils |
37
|
|
|
from st2common.persistence.execution import ActionExecution |
38
|
|
|
from st2common.persistence.liveaction import LiveAction |
39
|
|
|
from st2common.services import action as action_service |
40
|
|
|
from st2common.services import keyvalues as kv_service |
41
|
|
|
from st2common.util import action_db as action_db_util |
42
|
|
|
from st2common.util import isotime |
43
|
|
|
from st2common.util import date as date_utils |
44
|
|
|
from st2common.util import jinja as jinja_utils |
45
|
|
|
from st2common.util import param as param_utils |
46
|
|
|
from st2common.util.config_loader import get_config |
47
|
|
|
|
48
|
|
|
|
49
|
|
|
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Context key under which the mapping of previous task results is exposed to Jinja
# when rendering task parameters and published variables.
RESULTS_KEY = '__results'
# Opening delimiters of Jinja expression ('{{') and statement ('{%') blocks.
# NOTE(review): not referenced in the visible part of this file - confirm it is still needed.
JINJA_START_MARKERS = [
    '{{',
    '{%'
]
# Key in the chain result dict which holds the variables published by the tasks.
PUBLISHED_VARS_KEY = 'published'
56
|
|
|
|
57
|
|
|
|
58
|
|
|
class ChainHolder(object):
    """
    Holds a parsed action chain definition together with the chain-level variables and offers
    helpers for validating the definition and for walking from one node to the next.
    """

    def __init__(self, chainspec, chainname):
        """
        :param chainspec: Chain definition; expanded as keyword arguments into ``ActionChain``.
        :type chainspec: ``dict``

        :param chainname: Name of the chain, used in log and error messages.
        :type chainname: ``str``
        """
        chain = actionchain.ActionChain(**chainspec)
        self.actionchain = chain
        self.chainname = chainname

        # No explicit default in the definition - try to work one out from the node graph.
        if not chain.default:
            chain.default = self._get_default(chain)

        LOG.debug('Using %s as default for %s.', chain.default, self.chainname)
        if not chain.default:
            raise Exception('Failed to find default node in %s.' % (self.chainname))

        self.vars = {}

    def init_vars(self, action_parameters):
        """
        Render the chain level ``vars`` (if the chain defines any) using the action parameters
        as part of the rendering context and store the outcome on ``self.vars``.
        """
        declared_vars = self.actionchain.vars
        if not declared_vars:
            return

        self.vars = self._get_rendered_vars(declared_vars,
                                            action_parameters=action_parameters)

    def restore_vars(self, ctx_vars):
        """
        Merge a deep copy of ``ctx_vars`` into the current chain vars.
        """
        restored = copy.deepcopy(ctx_vars)
        self.vars.update(restored)

    def validate(self):
        """
        Perform a simple create time validation of the chain definition.

        Only static node references can be verified here - references which are empty or
        which contain Jinja expressions are resolved at run time and are accepted as is.

        :return: ``True`` when every static reference points to a node defined in the chain.
        :raises ValueError: When an "on-success", "on-failure" or "default" reference names a
                            node which is not defined in the chain.
        """
        known_names = self._get_all_nodes(action_chain=self.actionchain)

        for task in self.actionchain.chain:
            # (referenced node name, error message template) for both transition paths.
            transitions = (
                (task.on_success,
                 'Unable to find node with name "%s" referenced in "on-success" in '
                 'task "%s".'),
                (task.on_failure,
                 'Unable to find node with name "%s" referenced in "on-failure" in '
                 'task "%s".'),
            )

            for target_name, template in transitions:
                if not self._is_valid_node_name(all_node_names=known_names,
                                                node_name=target_name):
                    raise ValueError(template % (target_name, task.name))

        # The default (starting) node has to exist as well.
        default_name = self.actionchain.default
        if default_name:
            if not self._is_valid_node_name(all_node_names=known_names,
                                            node_name=default_name):
                raise ValueError('Unable to find node with name "%s" referenced in "default".' %
                                 default_name)

        return True

    @staticmethod
    def _get_default(action_chain):
        """
        Return the name of the node the chain starts with, or ``None`` for an empty chain.

        An explicitly configured default wins. Otherwise the first node (in definition order)
        which no other node references in "on-success" / "on-failure" is used. This relies on
        the chain having no fragments and on at least one node being outside of any loop. When
        every node is referenced, the first node of the chain is used.
        """
        if action_chain.default:
            return action_chain.default

        if not action_chain.chain:
            return None

        ordered_names = ChainHolder._get_all_nodes(action_chain=action_chain)
        referenced = (ChainHolder._get_all_on_success_nodes(action_chain=action_chain) |
                      ChainHolder._get_all_on_failure_nodes(action_chain=action_chain))

        # Walk the names in definition order so the result does not depend on set ordering.
        for name in ordered_names:
            if name not in referenced:
                return name

        return action_chain.chain[0].name

    @staticmethod
    def _get_all_nodes(action_chain):
        """
        Return a list with the names of all the nodes in the chain, in definition order.
        """
        names = []
        for node in action_chain.chain:
            names.append(node.name)
        return names

    @staticmethod
    def _get_all_on_success_nodes(action_chain):
        """
        Return a set with the names of all the tasks referenced in "on-success".
        """
        return {node.on_success for node in action_chain.chain}

    @staticmethod
    def _get_all_on_failure_nodes(action_chain):
        """
        Return a set with the names of all the tasks referenced in "on-failure".
        """
        return {node.on_failure for node in action_chain.chain}

    def _is_valid_node_name(self, all_node_names, node_name):
        """
        Return ``True`` if ``node_name`` is acceptable as a node reference.

        Empty references and references which are Jinja expressions can only be resolved at
        run time, so they are always accepted. Any other name has to be present in
        ``all_node_names``.
        """
        if not node_name or jinja_utils.is_jinja_expression(value=node_name):
            # Resolved during run time, nothing we can check now.
            return True

        return node_name in all_node_names

    @staticmethod
    def _get_rendered_vars(vars, action_parameters):
        """
        Render the values in ``vars`` with Jinja and return the rendered mapping.

        The rendering context contains the system scoped datastore lookup (under the datastore
        parent scope key) and the action parameters. An empty ``vars`` results in ``{}``.
        """
        if not vars:
            return {}

        context = {
            kv_constants.DATASTORE_PARENT_SCOPE: {
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
            }
        }
        context.update(action_parameters)

        LOG.info('Rendering action chain vars. Mapping = %s; Context = %s', vars, context)
        return jinja_utils.render_values(mapping=vars, context=context)

    def get_node(self, node_name=None, raise_on_failure=False):
        """
        Return the first node of the chain called ``node_name``.

        :return: The matching node or ``None`` when ``node_name`` is empty or (with
                 ``raise_on_failure`` unset) when no node matches.
        :raises ActionRunnerException: When no node matches and ``raise_on_failure`` is set.
        """
        if not node_name:
            return None

        match = next((node for node in self.actionchain.chain if node.name == node_name), None)
        if match is not None:
            return match

        if raise_on_failure:
            raise runner_exc.ActionRunnerException(
                'Unable to find node with name "%s".' % (node_name))

        return None

    def get_next_node(self, curr_node_name=None, condition='on-success'):
        """
        Return the node which follows ``curr_node_name`` for the provided condition
        ("on-success" or "on-failure"). Without a current node the default node is returned.

        :raises ActionRunnerException: For an unknown condition or when the referenced next
                                       node is not defined in the chain.
        """
        if not curr_node_name:
            return self.get_node(self.actionchain.default)

        current = self.get_node(curr_node_name)

        if condition == 'on-failure':
            next_name = current.on_failure
        elif condition == 'on-success':
            next_name = current.on_success
        else:
            raise runner_exc.ActionRunnerException('Unknown condition %s.' % condition)

        return self.get_node(next_name, raise_on_failure=True)
228
|
|
|
|
229
|
|
|
|
230
|
|
|
class ActionChainRunner(ActionRunner): |
231
|
|
|
|
232
|
|
|
def __init__(self, runner_id):
    """
    Set up the runner with empty chain state. The chain holder and the notify related
    settings are populated later on, in ``pre_run``.

    :param runner_id: Runner id, passed through to the base runner class.
    """
    super(ActionChainRunner, self).__init__(runner_id=runner_id)

    # Not known until the chain definition is loaded in pre_run.
    self.chain_holder = None

    # Loader used to read the chain definition file.
    self._meta_loader = MetaLoader()

    # Defaults which pre_run may override from the runner parameters / liveaction.
    self._skip_notify_tasks = list()
    self._display_published = True
    self._chain_notify = None
239
|
|
|
|
240
|
|
|
def pre_run(self):
    """
    Load the chain definition from the entry point file, build the chain holder, pick up
    the notify related settings and validate the chain.

    :raises ActionRunnerPreRunError: When the definition can't be loaded, when the chain
                                     can't be instantiated or when validation fails.
    """
    super(ActionChainRunner, self).pre_run()

    chainspec_file = self.entry_point
    LOG.debug('Reading action chain from %s for action %s.', chainspec_file,
              self.action)

    try:
        chainspec = self._meta_loader.load(file_path=chainspec_file,
                                           expected_type=dict)
    except Exception as e:
        message = ('Failed to parse action chain definition from "%s": %s' %
                   (chainspec_file, str(e)))
        LOG.exception('Failed to load action chain definition.')
        raise runner_exc.ActionRunnerPreRunError(message)

    try:
        self.chain_holder = ChainHolder(chainspec, self.action_name)
    except json_schema_exc.ValidationError as e:
        # preserve the whole nasty jsonschema message as that is better to get to the
        # root cause
        message = str(e)
        LOG.exception('Failed to instantiate ActionChain.')
        raise runner_exc.ActionRunnerPreRunError(message)
    except Exception as e:
        # Exceptions only carry a "message" attribute on Python 2, so fall back to str(e)
        # instead of failing with an AttributeError which would hide the real problem.
        message = getattr(e, 'message', None) or str(e)
        LOG.exception('Failed to instantiate ActionChain.')
        raise runner_exc.ActionRunnerPreRunError(message)

    # Runner attributes are set lazily. So these steps
    # should happen outside the constructor.
    if getattr(self, 'liveaction', None):
        self._chain_notify = getattr(self.liveaction, 'notify', None)
    if self.runner_parameters:
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
        self._display_published = self.runner_parameters.get('display_published', True)

    # Perform some pre-run chain validation
    try:
        self.chain_holder.validate()
    except Exception as e:
        # Same Python 2 / 3 safe way of obtaining the message as above.
        raise runner_exc.ActionRunnerPreRunError(getattr(e, 'message', None) or str(e))
282
|
|
|
|
283
|
|
|
def run(self, action_parameters):
    """
    Run the action chain from the beginning.

    :param action_parameters: Parameters the chain was invoked with.

    :return: Whatever ``_run_chain`` returns for a fresh (non resuming) run.
    """
    outcome = self._run_chain(action_parameters)
    return outcome
286
|
|
|
|
287
|
|
|
def pause(self):
    """
    Cascade the pause to the child executions and report the chain as pausing.

    A pause is requested for every child execution whose runner is one of the workflow
    runner types and which is currently running.

    :return: Tuple of (pausing status, current liveaction result, current liveaction context).
    """
    for child_exec_id in self.execution.children:
        child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)

        # Only workflows are paused explicitly ...
        if child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES:
            # ... and only the ones which are running right now.
            if child_exec.status == action_constants.LIVEACTION_STATUS_RUNNING:
                child_liveaction = LiveAction.get(id=child_exec.liveaction['id'])
                requester = self.context.get('user', None)
                action_service.request_pause(child_liveaction, requester)

    status = action_constants.LIVEACTION_STATUS_PAUSING
    return (status, self.liveaction.result, self.liveaction.context)
303
|
|
|
|
304
|
|
|
def resume(self):
    """
    Resume a previously paused chain execution.

    Runner and action parameters are not handed in on resume, so they are rendered again
    from the runner type, the action and the liveaction. The chain holder is rebuilt via
    ``pre_run`` when missing, the liveaction is flipped to running and the chain is
    continued in resuming mode.

    :return: Whatever ``_run_chain`` returns.
    """
    rendered = param_utils.render_final_params(
        self.runner_type_db.runner_parameters,
        self.action.parameters,
        self.liveaction.parameters,
        self.liveaction.context
    )
    runner_parameters, action_parameters = rendered

    # pre_run reads the runner parameters, so they need to be in place before it is called.
    if runner_parameters:
        self.runner_parameters = runner_parameters

    # The chain holder is missing when this runner instance has not executed pre_run yet.
    if not self.chain_holder:
        self.pre_run()

    # Move the liveaction from resuming to running, without publishing the change.
    running = action_constants.LIVEACTION_STATUS_RUNNING
    self.liveaction = action_service.update_status(self.liveaction, running, publish=False)

    return self._run_chain(action_parameters, resuming=True)
330
|
|
|
|
331
|
|
|
def _run_chain(self, action_parameters, resuming=False):
    """
    Execute the tasks of the chain one after another until there is no next node, or until
    the chain is canceled or paused.

    :param action_parameters: Parameters the chain was invoked with. Used to initialize the
                              chain ``vars`` and as rendering context for the tasks.

    :param resuming: When set, the vars, the result and the top level error are restored
                     from ``self.liveaction`` before the chain continues.
    :type resuming: ``bool``

    :return: Tuple of (chain status, result dict, context). The context is ``None`` when
             initializing the vars fails or when the chain is canceled, otherwise it is the
             liveaction context.
    :rtype: ``tuple``
    """
    # Set chain status to fail unless explicitly set to succeed.
    chain_status = action_constants.LIVEACTION_STATUS_FAILED

    # Result holds the final result that the chain store in the database.
    result = {'tasks': []}

    # Save published variables into the result if specified.
    if self._display_published:
        result[PUBLISHED_VARS_KEY] = {}

    context_result = {}  # Holds result which is used for the template context purposes
    top_level_error = None  # Stores a reference to a top level error
    action_node = None
    last_task = None

    try:
        # Initialize vars with the action parameters.
        # This allows action parameters to be referenced from vars.
        self.chain_holder.init_vars(action_parameters)
    except Exception as e:
        chain_status = action_constants.LIVEACTION_STATUS_FAILED
        m = 'Failed initializing ``vars`` in chain.'
        LOG.exception(m)
        top_level_error = self._format_error(e, m)
        result.update(top_level_error)
        return (chain_status, result, None)

    # Restore state on resuming an existing chain execution.
    if resuming:
        # Restore vars if any from the liveaction.
        ctx_vars = self.liveaction.context.pop('vars', {})
        self.chain_holder.restore_vars(ctx_vars)

        # Restore result if any from the liveaction.
        if self.liveaction and hasattr(self.liveaction, 'result') and self.liveaction.result:
            result = self.liveaction.result

        # Initialize or rebuild existing context_result from liveaction
        # which holds the result used for resolving context in Jinja template.
        for task in result.get('tasks', []):
            context_result[task['name']] = task['result']

        # Restore or initialize the top_level_error
        # that stores a reference to a top level error.
        if 'error' in result or 'traceback' in result:
            top_level_error = {
                'error': result.get('error'),
                'traceback': result.get('traceback')
            }

    # If there are no executed tasks in the chain, then get the first node.
    if len(result['tasks']) <= 0:
        try:
            action_node = self.chain_holder.get_next_node()
        except Exception as e:
            # action_node is still None at this point, so report the configured default
            # node name. The chain status stays failed.
            m = ('Failed to get starting node "%s".' %
                 self.chain_holder.actionchain.default)
            LOG.exception(m)
            top_level_error = self._format_error(e, m)
        else:
            # If there is no action node to run next, then mark the chain successful.
            if not action_node:
                chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED

    # Otherwise, figure out the last task executed and
    # its state to determine where to begin executing.
    else:
        last_task = result['tasks'][-1]
        action_node = self.chain_holder.get_node(last_task['name'])
        liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id'])

        # If the liveaction of the last task has changed, update the result entry.
        if liveaction.status != last_task['state']:
            updated_task_result = self._get_updated_action_exec_result(
                action_node, liveaction, last_task)
            del result['tasks'][-1]
            result['tasks'].append(updated_task_result)

        # If the last task was canceled, then cancel the chain altogether.
        if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
            chain_status = action_constants.LIVEACTION_STATUS_CANCELED
            return (chain_status, result, None)

        # If the last task was paused, then stay on this action node.
        # This is explicitly put here for clarity.
        if liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
            pass

        # If the last task succeeded, then get the next on-success action node.
        if liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED:
            chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
            action_node = self.chain_holder.get_next_node(
                last_task['name'], condition='on-success')

        # If the last task failed, then get the next on-failure action node.
        if liveaction.status in action_constants.LIVEACTION_FAILED_STATES:
            chain_status = action_constants.LIVEACTION_STATUS_FAILED
            action_node = self.chain_holder.get_next_node(
                last_task['name'], condition='on-failure')

    # Setup parent context.
    parent_context = {
        'execution_id': self.execution_id
    }

    if getattr(self.liveaction, 'context', None):
        parent_context.update(self.liveaction.context)

    # Run the action chain until there are no more tasks.
    while action_node:
        error = None
        liveaction = None
        last_task = result['tasks'][-1] if len(result['tasks']) > 0 else None
        created_at = date_utils.get_datetime_utc_now()

        try:
            # If last task was paused, then fetch the liveaction and resume it first.
            if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED:
                liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id'])
                del result['tasks'][-1]
            else:
                liveaction = self._get_next_action(
                    action_node=action_node, parent_context=parent_context,
                    action_params=action_parameters, context_result=context_result)
        except action_exc.InvalidActionReferencedException as e:
            chain_status = action_constants.LIVEACTION_STATUS_FAILED
            m = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' %
                 (action_node.name, action_node.ref))
            LOG.exception(m)
            top_level_error = self._format_error(e, m)
            break
        except action_exc.ParameterRenderingFailedException as e:
            # Rendering parameters failed before we even got to running this action,
            # abort and fail the whole action chain
            chain_status = action_constants.LIVEACTION_STATUS_FAILED
            m = 'Failed to run task "%s". Parameter rendering failed.' % action_node.name
            LOG.exception(m)
            top_level_error = self._format_error(e, m)
            break
        except db_exc.StackStormDBObjectNotFoundError as e:
            chain_status = action_constants.LIVEACTION_STATUS_FAILED
            m = 'Failed to resume task "%s". Unable to find liveaction.' % action_node.name
            LOG.exception(m)
            top_level_error = self._format_error(e, m)
            break

        try:
            # If last task was paused, then resume its liveaction, otherwise run a new one.
            if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED:
                LOG.info('Resume task %s for chain %s.', action_node.name, self.liveaction.id)
                liveaction = self._resume_action(liveaction)
            else:
                LOG.info('Run task %s for chain %s.', action_node.name, self.liveaction.id)
                liveaction = self._run_action(liveaction)
        except Exception as e:
            # Save the traceback and error message
            m = 'Failed running task "%s".' % action_node.name
            LOG.exception(m)
            error = self._format_error(e, m)
            context_result[action_node.name] = error
        else:
            # Update context result
            context_result[action_node.name] = liveaction.result

            # Render and publish variables
            rendered_publish_vars = ActionChainRunner._render_publish_vars(
                action_node=action_node, action_parameters=action_parameters,
                execution_result=liveaction.result, previous_execution_results=context_result,
                chain_vars=self.chain_holder.vars)

            if rendered_publish_vars:
                self.chain_holder.vars.update(rendered_publish_vars)
                if self._display_published:
                    result[PUBLISHED_VARS_KEY].update(rendered_publish_vars)
        finally:
            # Record result and resolve a next node based on the task success or failure
            updated_at = date_utils.get_datetime_utc_now()

            task_result = self._format_action_exec_result(
                action_node,
                liveaction,
                created_at,
                updated_at,
                error=error
            )

            result['tasks'].append(task_result)

            try:
                if not liveaction:
                    chain_status = action_constants.LIVEACTION_STATUS_FAILED
                    action_node = self.chain_holder.get_next_node(
                        action_node.name, condition='on-failure')
                elif liveaction.status == action_constants.LIVEACTION_STATUS_TIMED_OUT:
                    chain_status = action_constants.LIVEACTION_STATUS_TIMED_OUT
                    action_node = self.chain_holder.get_next_node(
                        action_node.name, condition='on-failure')
                elif liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
                    LOG.info('Chain execution (%s) canceled because task "%s" is canceled.',
                             self.liveaction_id, action_node.name)
                    chain_status = action_constants.LIVEACTION_STATUS_CANCELED
                    action_node = None
                elif liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
                    LOG.info('Chain execution (%s) paused because task "%s" is paused.',
                             self.liveaction_id, action_node.name)
                    chain_status = action_constants.LIVEACTION_STATUS_PAUSED
                    self._save_vars()
                    action_node = None
                elif liveaction.status in action_constants.LIVEACTION_FAILED_STATES:
                    chain_status = action_constants.LIVEACTION_STATUS_FAILED
                    action_node = self.chain_holder.get_next_node(
                        action_node.name, condition='on-failure')
                elif liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED:
                    chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
                    action_node = self.chain_holder.get_next_node(
                        action_node.name, condition='on-success')
                else:
                    action_node = None
            except Exception as e:
                # The lookup failed before action_node got reassigned, so it still refers
                # to the task which has just been executed.
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
                m = 'Failed to get next node "%s".' % action_node.name
                LOG.exception(m)
                top_level_error = self._format_error(e, m)
                action_node = None
                break

        if action_service.is_action_canceled_or_canceling(self.liveaction.id):
            LOG.info('Chain execution (%s) canceled by user.', self.liveaction.id)
            chain_status = action_constants.LIVEACTION_STATUS_CANCELED
            return (chain_status, result, None)

        if action_service.is_action_paused_or_pausing(self.liveaction.id):
            LOG.info('Chain execution (%s) paused by user.', self.liveaction.id)
            chain_status = action_constants.LIVEACTION_STATUS_PAUSED
            self._save_vars()
            return (chain_status, result, self.liveaction.context)

    if top_level_error and isinstance(top_level_error, dict):
        result.update(top_level_error)

    return (chain_status, result, self.liveaction.context)
572
|
|
|
|
573
|
|
|
def _format_error(self, e, msg):
    """
    Build the error dict which is stored in the chain result.

    :param e: Exception which is being reported.
    :param msg: Human friendly message which is prefixed to the exception text.

    :return: Dict with the "error" text and the "traceback" (limited to 10 entries) of the
             exception which is currently being handled.
    :rtype: ``dict``
    """
    error_text = '%s. %s' % (msg, str(e))
    trace_text = traceback.format_exc(10)

    formatted = {}
    formatted['error'] = error_text
    formatted['traceback'] = trace_text
    return formatted
578
|
|
|
|
579
|
|
|
def _save_vars(self):
    """
    Store the current chain vars under the "vars" key of the liveaction context.
    """
    chain_vars = self.chain_holder.vars
    self.liveaction.context['vars'] = chain_vars
582
|
|
|
|
583
|
|
|
@staticmethod
def _render_publish_vars(action_node, action_parameters, execution_result,
                         previous_execution_results, chain_vars):
    """
    Render the variables which the task publishes.

    Nothing is published (``{}`` is returned) when the node has no ``publish`` section.
    Otherwise the values of the ``publish`` mapping are rendered with a context which is
    layered in this order (later layers win on a name clash): action parameters, result of
    this execution under the task name, previous execution results, chain vars, all the
    previous results under the results key and the datastore lookups.

    :raises ParameterRenderingFailedException: When rendering of a published value fails.
    """
    if not action_node.publish:
        return {}

    # The order of the layers is significant - a later layer overrides an earlier one.
    layers = (
        action_parameters,
        {action_node.name: execution_result},
        previous_execution_results,
        chain_vars,
        {RESULTS_KEY: previous_execution_results},
    )

    context = {}
    for layer in layers:
        context.update(layer)

    # Datastore access, both directly under the system scope and under the parent scope.
    context[kv_constants.SYSTEM_SCOPE] = kv_service.KeyValueLookup(
        scope=kv_constants.SYSTEM_SCOPE)
    context[kv_constants.DATASTORE_PARENT_SCOPE] = {
        kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
            scope=kv_constants.FULL_SYSTEM_SCOPE)
    }

    try:
        return jinja_utils.render_values(mapping=action_node.publish,
                                         context=context)
    except Exception as e:
        failed_key = getattr(e, 'key', None)
        failed_value = getattr(e, 'value', None)
        msg = ('Failed rendering value for publish parameter "%s" in task "%s" '
               '(template string=%s): %s' % (failed_key, action_node.name, failed_value,
                                             str(e)))
        raise action_exc.ParameterRenderingFailedException(msg)
626
|
|
|
|
627
|
|
|
@staticmethod
def _resolve_params(action_node, original_parameters, results, chain_vars, chain_context):
    """
    Render the parameters of a task with Jinja.

    The rendering context is layered in this order (later layers win on a name clash): the
    original chain parameters, the results of the previous tasks, the chain vars, all the
    results under the results key, the datastore lookups, the chain context and the pack
    config (loaded for the pack and user found under ``chain_context['parent']``).

    :param action_node: Node whose parameters (``get_parameters()``) are rendered.
    :param original_parameters: Parameters the chain was invoked with.
    :param results: Results of the previously executed tasks, keyed by task name.
    :param chain_vars: Current chain vars.
    :param chain_context: Chain context; the "parent" entry is used to look up the pack and
                          the user.

    :return: Rendered parameters.
    :raises ParameterRenderingFailedException: When rendering of a parameter fails.
    """
    # setup context with original parameters and the intermediate results.
    chain_parent = chain_context.get('parent', {})
    pack = chain_parent.get('pack')
    user = chain_parent.get('user')

    config = get_config(pack, user)

    context = {}
    context.update(original_parameters)
    context.update(results)
    context.update(chain_vars)
    context.update({RESULTS_KEY: results})

    context.update({
        kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
            scope=kv_constants.SYSTEM_SCOPE)
    })

    context.update({
        kv_constants.DATASTORE_PARENT_SCOPE: {
            kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
                scope=kv_constants.FULL_SYSTEM_SCOPE)
        }
    })
    context.update({action_constants.ACTION_CONTEXT_KV_PREFIX: chain_context})
    context.update({pack_constants.PACK_CONFIG_CONTEXT_KV_PREFIX: config})
    try:
        rendered_params = jinja_utils.render_values(mapping=action_node.get_parameters(),
                                                    context=context)
    except Exception as e:
        # Not every exception carries "key" / "value", so read them defensively before
        # they are used. Accessing e.key directly would raise an AttributeError here and
        # hide the actual rendering failure.
        key = getattr(e, 'key', None)
        value = getattr(e, 'value', None)

        LOG.exception('Jinja rendering for parameter "%s" failed.', key)

        msg = ('Failed rendering value for action parameter "%s" in task "%s" '
               '(template string=%s): %s') % (key, action_node.name, value, str(e))
        raise action_exc.ParameterRenderingFailedException(msg)
    LOG.debug('Rendered params: %s: Type: %s', rendered_params, type(rendered_params))
    return rendered_params
668
|
|
|
|
669
|
|
|
def _get_next_action(self, action_node, parent_context, action_params, context_result):
    """
    Build the liveaction object for the provided chain node.

    The referenced action has to be registered. The node parameters are rendered using the
    chain parameters, the results of the previous tasks and the chain vars.

    :return: Liveaction object as built by ``_build_liveaction_object``.
    :raises InvalidActionReferencedException: When the referenced action is not registered.
    """
    # TODO: We do another lookup in cast_param, refactor to reduce number of lookups
    task_name = action_node.name
    action_ref = action_node.ref

    # Bail out early when the node points to an action which doesn't exist.
    if not action_db_util.get_action_by_ref(ref=action_ref):
        raise action_exc.InvalidActionReferencedException(
            'Task :: %s - Action with ref %s not registered.' % (task_name, action_ref))

    chain_context = {'parent': parent_context}
    rendered = ActionChainRunner._resolve_params(
        action_node=action_node, original_parameters=action_params,
        results=context_result, chain_vars=self.chain_holder.vars,
        chain_context=chain_context)

    return self._build_liveaction_object(
        action_node=action_node,
        resolved_params=rendered,
        parent_context=parent_context)
691
|
|
|
|
692
|
|
|
def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
    """
    Request execution of the provided liveaction and, optionally, poll until it reaches
    a completed or paused state.

    :param liveaction: Liveaction to request.

    :param wait_for_completion: True to keep re-fetching the liveaction until its status
                                is one of the completed states or paused.
    :type wait_for_completion: ``bool``

    :param sleep_delay: Number of seconds to wait during "is completed" polls.
    :type sleep_delay: ``float``

    :return: The liveaction returned by the request or, when polling, the most recently
             fetched liveaction.
    """
    try:
        liveaction, _ = action_service.request(liveaction)
    except Exception:
        liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
        LOG.exception('Failed to schedule liveaction.')
        # Bare raise re-raises the active exception with its original traceback intact.
        raise

    if wait_for_completion:
        # The set of states which end the polling does not change between iterations.
        final_states = (action_constants.LIVEACTION_COMPLETED_STATES +
                        [action_constants.LIVEACTION_STATUS_PAUSED])

        while liveaction.status not in final_states:
            eventlet.sleep(sleep_delay)
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)

    return liveaction
711
|
|
|
|
712
|
|
|
def _resume_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
    """
    Request resume of the provided liveaction and, optionally, poll until it reaches
    a completed or paused state.

    The resume is requested on behalf of the ``user`` value found in the runner context
    (``None`` if the context has no such key).

    :param liveaction: Liveaction to resume.

    :param wait_for_completion: True to keep re-fetching the liveaction until its status
                                is one of the completed states or paused.
    :type wait_for_completion: ``bool``

    :param sleep_delay: Number of seconds to wait during "is completed" polls.
    :type sleep_delay: ``float``

    :return: The liveaction returned by the resume request or, when polling, the most
             recently fetched liveaction.
    """
    try:
        user = self.context.get('user', None)
        liveaction, _ = action_service.request_resume(liveaction, user)
    except Exception:
        liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
        # This is the resume path, so the message must not claim a scheduling failure.
        LOG.exception('Failed to resume liveaction.')
        # Bare raise re-raises the active exception with its original traceback intact.
        raise

    if wait_for_completion:
        # The set of states which end the polling does not change between iterations.
        final_states = (action_constants.LIVEACTION_COMPLETED_STATES +
                        [action_constants.LIVEACTION_STATUS_PAUSED])

        while liveaction.status not in final_states:
            eventlet.sleep(sleep_delay)
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)

    return liveaction
732
|
|
|
|
733
|
|
|
def _build_liveaction_object(self, action_node, resolved_params, parent_context):
    """
    Create a ``LiveActionDB`` object for the provided chain task.

    The object gets the notify settings returned by ``_get_notify`` (if any), a context
    holding the parent context and the attributes of the task, and the resolved
    parameters after they have been passed through ``cast_params``.

    :param action_node: Chain task the liveaction is built for.
    :param resolved_params: Resolved parameters for the task.
    :param parent_context: Context stored under the ``parent`` key.
    """
    ref = action_node.ref
    liveaction = LiveActionDB(action=ref)

    # Setup notify for task in chain.
    task_notify = self._get_notify(action_node)
    if task_notify:
        liveaction.notify = task_notify
        LOG.debug('%s: Task notify set to: %s', action_node.name, liveaction.notify)

    liveaction.context = dict(parent=parent_context, chain=vars(action_node))
    liveaction.parameters = action_param_utils.cast_params(action_ref=ref,
                                                           params=resolved_params)
    return liveaction
749
|
|
|
|
750
|
|
|
def _get_notify(self, action_node): |
751
|
|
|
if action_node.name not in self._skip_notify_tasks: |
752
|
|
|
if action_node.notify: |
753
|
|
|
task_notify = NotificationsHelper.to_model(action_node.notify) |
754
|
|
|
return task_notify |
755
|
|
|
elif self._chain_notify: |
756
|
|
|
return self._chain_notify |
757
|
|
|
|
758
|
|
|
return None |
759
|
|
|
|
760
|
|
|
def _get_updated_action_exec_result(self, action_node, liveaction, prev_task_result):
    """
    Re-format the result of a task using the current state of its liveaction.

    ``created_at`` is always taken from the previous task result. ``updated_at`` is the
    liveaction end timestamp if the liveaction is in one of the completed states,
    otherwise the ``updated_at`` value of the previous task result is kept.

    :param action_node: Chain task the result belongs to.
    :param liveaction: Liveaction of the task.
    :param prev_task_result: Previous result of the task (``created_at`` and
                             ``updated_at`` keys are read).
    :type prev_task_result: ``dict``

    :rtype: ``dict``
    """
    is_completed = liveaction.status in action_constants.LIVEACTION_COMPLETED_STATES
    created_at = isotime.parse(prev_task_result['created_at'])

    if is_completed:
        updated_at = liveaction.end_timestamp
    else:
        updated_at = isotime.parse(prev_task_result['updated_at'])

    return self._format_action_exec_result(action_node, liveaction, created_at, updated_at)
769
|
|
|
|
770
|
|
|
def _format_action_exec_result(self, action_node, liveaction_db, created_at, updated_at,
                               error=None):
    """
    Format ActionExecution result so it can be used in the final action result output.

    :param action_node: Chain task the result belongs to. Its name is used for both the
                        ``id`` and the ``name`` field.

    :param liveaction_db: Liveaction of the task or a falsy value if there is none. When
                          it is missing, the ids are set to ``None`` and the state is
                          reported as failed.

    :param created_at: Value formatted into the ``created_at`` field.
    :type created_at: ``datetime.datetime``

    :param updated_at: Value formatted into the ``updated_at`` field.
    :type updated_at: ``datetime.datetime``

    :param error: Optional error. When set, the state is reported as failed and the
                  error is stored as the result instead of the liveaction result.

    :rtype: ``dict``
    """
    assert isinstance(created_at, datetime.datetime)
    assert isinstance(updated_at, datetime.datetime)

    execution_db = None
    if liveaction_db:
        execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))

    result = {}
    result['id'] = action_node.name
    result['name'] = action_node.name
    result['execution_id'] = str(execution_db.id) if execution_db else None
    result['liveaction_id'] = str(liveaction_db.id) if liveaction_db else None
    result['workflow'] = None

    result['created_at'] = isotime.format(dt=created_at)
    result['updated_at'] = isotime.format(dt=updated_at)

    if error or not liveaction_db:
        result['state'] = action_constants.LIVEACTION_STATUS_FAILED
    else:
        result['state'] = liveaction_db.status

    if error:
        result['result'] = error
    elif liveaction_db:
        result['result'] = liveaction_db.result
    else:
        # No error and no liveaction: the state above is already "failed" and there is
        # no result to read, so don't dereference the missing liveaction.
        result['result'] = None

    return result
806
|
|
|
|
807
|
|
|
|
808
|
|
|
def get_runner():
    """
    Create a new ``ActionChainRunner``, passing it a freshly generated UUID4 string.
    """
    runner_id = str(uuid.uuid4())
    return ActionChainRunner(runner_id)
810
|
|
|
|