|
1
|
|
|
# Licensed to the StackStorm, Inc ('StackStorm') under one or more |
|
2
|
|
|
# contributor license agreements. See the NOTICE file distributed with |
|
3
|
|
|
# this work for additional information regarding copyright ownership. |
|
4
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0 |
|
5
|
|
|
# (the "License"); you may not use this file except in compliance with |
|
6
|
|
|
# the License. You may obtain a copy of the License at |
|
7
|
|
|
# |
|
8
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0 |
|
9
|
|
|
# |
|
10
|
|
|
# Unless required by applicable law or agreed to in writing, software |
|
11
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, |
|
12
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13
|
|
|
# See the License for the specific language governing permissions and |
|
14
|
|
|
# limitations under the License. |
|
15
|
|
|
|
|
16
|
|
|
import copy |
|
17
|
|
|
import eventlet |
|
18
|
|
|
import traceback |
|
19
|
|
|
import uuid |
|
20
|
|
|
import datetime |
|
21
|
|
|
|
|
22
|
|
|
from jsonschema import exceptions as json_schema_exc |
|
23
|
|
|
|
|
24
|
|
|
from st2common.runners.base import ActionRunner |
|
25
|
|
|
from st2common import log as logging |
|
26
|
|
|
from st2common.constants import action as action_constants |
|
27
|
|
|
from st2common.constants import pack as pack_constants |
|
28
|
|
|
from st2common.constants import keyvalue as kv_constants |
|
29
|
|
|
from st2common.content.loader import MetaLoader |
|
30
|
|
|
from st2common.exceptions import action as action_exc |
|
31
|
|
|
from st2common.exceptions import actionrunner as runner_exc |
|
32
|
|
|
from st2common.exceptions import db as db_exc |
|
33
|
|
|
from st2common.models.api.notification import NotificationsHelper |
|
34
|
|
|
from st2common.models.db.liveaction import LiveActionDB |
|
35
|
|
|
from st2common.models.system import actionchain |
|
36
|
|
|
from st2common.models.utils import action_param_utils |
|
37
|
|
|
from st2common.persistence.execution import ActionExecution |
|
38
|
|
|
from st2common.persistence.liveaction import LiveAction |
|
39
|
|
|
from st2common.services import action as action_service |
|
40
|
|
|
from st2common.services import keyvalues as kv_service |
|
41
|
|
|
from st2common.util import action_db as action_db_util |
|
42
|
|
|
from st2common.util import isotime |
|
43
|
|
|
from st2common.util import date as date_utils |
|
44
|
|
|
from st2common.util import jinja as jinja_utils |
|
45
|
|
|
from st2common.util import param as param_utils |
|
46
|
|
|
from st2common.util.config_loader import get_config |
|
47
|
|
|
|
|
48
|
|
|
|
|
49
|
|
|
LOG = logging.getLogger(__name__) |
|
50
|
|
|
RESULTS_KEY = '__results' |
|
51
|
|
|
JINJA_START_MARKERS = [ |
|
52
|
|
|
'{{', |
|
53
|
|
|
'{%' |
|
54
|
|
|
] |
|
55
|
|
|
PUBLISHED_VARS_KEY = 'published' |
|
56
|
|
|
|
|
57
|
|
|
|
|
58
|
|
|
class ChainHolder(object): |
|
59
|
|
|
|
|
60
|
|
|
def __init__(self, chainspec, chainname): |
|
61
|
|
|
self.actionchain = actionchain.ActionChain(**chainspec) |
|
62
|
|
|
self.chainname = chainname |
|
63
|
|
|
|
|
64
|
|
|
if not self.actionchain.default: |
|
65
|
|
|
default = self._get_default(self.actionchain) |
|
66
|
|
|
self.actionchain.default = default |
|
67
|
|
|
|
|
68
|
|
|
LOG.debug('Using %s as default for %s.', self.actionchain.default, self.chainname) |
|
69
|
|
|
if not self.actionchain.default: |
|
70
|
|
|
raise Exception('Failed to find default node in %s.' % (self.chainname)) |
|
71
|
|
|
|
|
72
|
|
|
self.vars = {} |
|
73
|
|
|
|
|
74
|
|
|
def init_vars(self, action_parameters): |
|
75
|
|
|
if self.actionchain.vars: |
|
76
|
|
|
self.vars = self._get_rendered_vars(self.actionchain.vars, |
|
77
|
|
|
action_parameters=action_parameters) |
|
78
|
|
|
|
|
79
|
|
|
def restore_vars(self, ctx_vars): |
|
80
|
|
|
self.vars.update(copy.deepcopy(ctx_vars)) |
|
81
|
|
|
|
|
82
|
|
|
def validate(self): |
|
83
|
|
|
""" |
|
84
|
|
|
Function which performs a simple compile time validation. |
|
85
|
|
|
|
|
86
|
|
|
Keep in mind that some variables are only resolved during run time which means we can |
|
87
|
|
|
perform only simple validation during compile / create time. |
|
88
|
|
|
""" |
|
89
|
|
|
all_nodes = self._get_all_nodes(action_chain=self.actionchain) |
|
90
|
|
|
|
|
91
|
|
|
for node in self.actionchain.chain: |
|
92
|
|
|
on_success_node_name = node.on_success |
|
93
|
|
|
on_failure_node_name = node.on_failure |
|
94
|
|
|
|
|
95
|
|
|
# Check "on-success" path |
|
96
|
|
|
valid_name = self._is_valid_node_name(all_node_names=all_nodes, |
|
97
|
|
|
node_name=on_success_node_name) |
|
98
|
|
|
if not valid_name: |
|
99
|
|
|
msg = ('Unable to find node with name "%s" referenced in "on-success" in ' |
|
100
|
|
|
'task "%s".' % (on_success_node_name, node.name)) |
|
101
|
|
|
raise ValueError(msg) |
|
102
|
|
|
|
|
103
|
|
|
# Check "on-failure" path |
|
104
|
|
|
valid_name = self._is_valid_node_name(all_node_names=all_nodes, |
|
105
|
|
|
node_name=on_failure_node_name) |
|
106
|
|
|
if not valid_name: |
|
107
|
|
|
msg = ('Unable to find node with name "%s" referenced in "on-failure" in ' |
|
108
|
|
|
'task "%s".' % (on_failure_node_name, node.name)) |
|
109
|
|
|
raise ValueError(msg) |
|
110
|
|
|
|
|
111
|
|
|
# check if node specified in default is valid. |
|
112
|
|
|
if self.actionchain.default: |
|
113
|
|
|
valid_name = self._is_valid_node_name(all_node_names=all_nodes, |
|
114
|
|
|
node_name=self.actionchain.default) |
|
115
|
|
|
if not valid_name: |
|
116
|
|
|
msg = ('Unable to find node with name "%s" referenced in "default".' % |
|
117
|
|
|
self.actionchain.default) |
|
118
|
|
|
raise ValueError(msg) |
|
119
|
|
|
return True |
|
120
|
|
|
|
|
121
|
|
|
@staticmethod |
|
122
|
|
|
def _get_default(action_chain): |
|
123
|
|
|
# default is defined |
|
124
|
|
|
if action_chain.default: |
|
125
|
|
|
return action_chain.default |
|
126
|
|
|
# no nodes in chain |
|
127
|
|
|
if not action_chain.chain: |
|
128
|
|
|
return None |
|
129
|
|
|
# The first node with no references is the default node. Assumptions |
|
130
|
|
|
# that support this are : |
|
131
|
|
|
# 1. There are no loops in the chain. Even if there are loops there is |
|
132
|
|
|
# at least 1 node which does not end up in this loop. |
|
133
|
|
|
# 2. There are no fragments in the chain. |
|
134
|
|
|
all_nodes = ChainHolder._get_all_nodes(action_chain=action_chain) |
|
135
|
|
|
node_names = set(all_nodes) |
|
136
|
|
|
on_success_nodes = ChainHolder._get_all_on_success_nodes(action_chain=action_chain) |
|
137
|
|
|
on_failure_nodes = ChainHolder._get_all_on_failure_nodes(action_chain=action_chain) |
|
138
|
|
|
referenced_nodes = on_success_nodes | on_failure_nodes |
|
139
|
|
|
possible_default_nodes = node_names - referenced_nodes |
|
140
|
|
|
if possible_default_nodes: |
|
141
|
|
|
# This is to preserve order. set([..]) does not preserve the order so iterate |
|
142
|
|
|
# over original array. |
|
143
|
|
|
for node in all_nodes: |
|
144
|
|
|
if node in possible_default_nodes: |
|
145
|
|
|
return node |
|
146
|
|
|
# If no node is found assume the first node in the chain list to be default. |
|
147
|
|
|
return action_chain.chain[0].name |
|
148
|
|
|
|
|
149
|
|
|
@staticmethod |
|
150
|
|
|
def _get_all_nodes(action_chain): |
|
151
|
|
|
""" |
|
152
|
|
|
Return names for all the nodes in the chain. |
|
153
|
|
|
""" |
|
154
|
|
|
all_nodes = [node.name for node in action_chain.chain] |
|
155
|
|
|
return all_nodes |
|
156
|
|
|
|
|
157
|
|
|
@staticmethod |
|
158
|
|
|
def _get_all_on_success_nodes(action_chain): |
|
159
|
|
|
""" |
|
160
|
|
|
Return names for all the tasks referenced in "on-success". |
|
161
|
|
|
""" |
|
162
|
|
|
on_success_nodes = set([node.on_success for node in action_chain.chain]) |
|
163
|
|
|
return on_success_nodes |
|
164
|
|
|
|
|
165
|
|
|
@staticmethod |
|
166
|
|
|
def _get_all_on_failure_nodes(action_chain): |
|
167
|
|
|
""" |
|
168
|
|
|
Return names for all the tasks referenced in "on-failure". |
|
169
|
|
|
""" |
|
170
|
|
|
on_failure_nodes = set([node.on_failure for node in action_chain.chain]) |
|
171
|
|
|
return on_failure_nodes |
|
172
|
|
|
|
|
173
|
|
|
def _is_valid_node_name(self, all_node_names, node_name): |
|
174
|
|
|
""" |
|
175
|
|
|
Function which validates that the provided node name is defined in the workflow definition |
|
176
|
|
|
and it's valid. |
|
177
|
|
|
|
|
178
|
|
|
Keep in mind that we can only perform validation for task names which don't include jinja |
|
179
|
|
|
expressions since those are rendered at run time. |
|
180
|
|
|
""" |
|
181
|
|
|
if not node_name: |
|
182
|
|
|
# This task name needs to be resolved during run time so we cant validate the name now |
|
183
|
|
|
return True |
|
184
|
|
|
|
|
185
|
|
|
is_jinja_expression = jinja_utils.is_jinja_expression(value=node_name) |
|
186
|
|
|
if is_jinja_expression: |
|
187
|
|
|
# This task name needs to be resolved during run time so we cant validate the name |
|
188
|
|
|
# now |
|
189
|
|
|
return True |
|
190
|
|
|
|
|
191
|
|
|
return node_name in all_node_names |
|
192
|
|
|
|
|
193
|
|
|
@staticmethod |
|
194
|
|
|
def _get_rendered_vars(vars, action_parameters): |
|
195
|
|
|
if not vars: |
|
196
|
|
|
return {} |
|
197
|
|
|
context = {} |
|
198
|
|
|
context.update({ |
|
199
|
|
|
kv_constants.DATASTORE_PARENT_SCOPE: { |
|
200
|
|
|
kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
|
201
|
|
|
scope=kv_constants.FULL_SYSTEM_SCOPE) |
|
202
|
|
|
} |
|
203
|
|
|
}) |
|
204
|
|
|
context.update(action_parameters) |
|
205
|
|
|
LOG.info('Rendering action chain vars. Mapping = %s; Context = %s', vars, context) |
|
206
|
|
|
return jinja_utils.render_values(mapping=vars, context=context) |
|
207
|
|
|
|
|
208
|
|
|
def get_node(self, node_name=None, raise_on_failure=False): |
|
209
|
|
|
if not node_name: |
|
210
|
|
|
return None |
|
211
|
|
|
for node in self.actionchain.chain: |
|
212
|
|
|
if node.name == node_name: |
|
213
|
|
|
return node |
|
214
|
|
|
if raise_on_failure: |
|
215
|
|
|
raise runner_exc.ActionRunnerException( |
|
216
|
|
|
'Unable to find node with name "%s".' % (node_name)) |
|
217
|
|
|
return None |
|
218
|
|
|
|
|
219
|
|
|
def get_next_node(self, curr_node_name=None, condition='on-success'): |
|
220
|
|
|
if not curr_node_name: |
|
221
|
|
|
return self.get_node(self.actionchain.default) |
|
222
|
|
|
current_node = self.get_node(curr_node_name) |
|
223
|
|
|
if condition == 'on-success': |
|
224
|
|
|
return self.get_node(current_node.on_success, raise_on_failure=True) |
|
225
|
|
|
elif condition == 'on-failure': |
|
226
|
|
|
return self.get_node(current_node.on_failure, raise_on_failure=True) |
|
227
|
|
|
raise runner_exc.ActionRunnerException('Unknown condition %s.' % condition) |
|
228
|
|
|
|
|
229
|
|
|
|
|
230
|
|
|
class ActionChainRunner(ActionRunner): |
|
231
|
|
|
|
|
232
|
|
|
def __init__(self, runner_id): |
|
233
|
|
|
super(ActionChainRunner, self).__init__(runner_id=runner_id) |
|
234
|
|
|
self.chain_holder = None |
|
235
|
|
|
self._meta_loader = MetaLoader() |
|
236
|
|
|
self._skip_notify_tasks = [] |
|
237
|
|
|
self._display_published = True |
|
238
|
|
|
self._chain_notify = None |
|
239
|
|
|
|
|
240
|
|
|
def pre_run(self): |
|
241
|
|
|
super(ActionChainRunner, self).pre_run() |
|
242
|
|
|
|
|
243
|
|
|
chainspec_file = self.entry_point |
|
244
|
|
|
LOG.debug('Reading action chain from %s for action %s.', chainspec_file, |
|
245
|
|
|
self.action) |
|
246
|
|
|
|
|
247
|
|
|
try: |
|
248
|
|
|
chainspec = self._meta_loader.load(file_path=chainspec_file, |
|
249
|
|
|
expected_type=dict) |
|
250
|
|
|
except Exception as e: |
|
251
|
|
|
message = ('Failed to parse action chain definition from "%s": %s' % |
|
252
|
|
|
(chainspec_file, str(e))) |
|
253
|
|
|
LOG.exception('Failed to load action chain definition.') |
|
254
|
|
|
raise runner_exc.ActionRunnerPreRunError(message) |
|
255
|
|
|
|
|
256
|
|
|
try: |
|
257
|
|
|
self.chain_holder = ChainHolder(chainspec, self.action_name) |
|
258
|
|
|
except json_schema_exc.ValidationError as e: |
|
259
|
|
|
# preserve the whole nasty jsonschema message as that is better to get to the |
|
260
|
|
|
# root cause |
|
261
|
|
|
message = str(e) |
|
262
|
|
|
LOG.exception('Failed to instantiate ActionChain.') |
|
263
|
|
|
raise runner_exc.ActionRunnerPreRunError(message) |
|
264
|
|
|
except Exception as e: |
|
265
|
|
|
message = e.message or str(e) |
|
266
|
|
|
LOG.exception('Failed to instantiate ActionChain.') |
|
267
|
|
|
raise runner_exc.ActionRunnerPreRunError(message) |
|
268
|
|
|
|
|
269
|
|
|
# Runner attributes are set lazily. So these steps |
|
270
|
|
|
# should happen outside the constructor. |
|
271
|
|
|
if getattr(self, 'liveaction', None): |
|
272
|
|
|
self._chain_notify = getattr(self.liveaction, 'notify', None) |
|
273
|
|
|
if self.runner_parameters: |
|
274
|
|
|
self._skip_notify_tasks = self.runner_parameters.get('skip_notify', []) |
|
275
|
|
|
self._display_published = self.runner_parameters.get('display_published', True) |
|
276
|
|
|
|
|
277
|
|
|
# Perform some pre-run chain validation |
|
278
|
|
|
try: |
|
279
|
|
|
self.chain_holder.validate() |
|
280
|
|
|
except Exception as e: |
|
281
|
|
|
raise runner_exc.ActionRunnerPreRunError(e.message) |
|
282
|
|
|
|
|
283
|
|
|
def run(self, action_parameters): |
|
284
|
|
|
# Run the action chain. |
|
285
|
|
|
return self._run_chain(action_parameters) |
|
286
|
|
|
|
|
287
|
|
|
def pause(self): |
|
288
|
|
|
# Identify the list of action executions that are workflows and cascade pause. |
|
289
|
|
|
for child_exec_id in self.execution.children: |
|
290
|
|
|
child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True) |
|
291
|
|
|
if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and |
|
292
|
|
|
child_exec.status == action_constants.LIVEACTION_STATUS_RUNNING): |
|
293
|
|
|
action_service.request_pause( |
|
294
|
|
|
LiveAction.get(id=child_exec.liveaction['id']), |
|
295
|
|
|
self.context.get('user', None) |
|
296
|
|
|
) |
|
297
|
|
|
|
|
298
|
|
|
return ( |
|
299
|
|
|
action_constants.LIVEACTION_STATUS_PAUSING, |
|
300
|
|
|
self.liveaction.result, |
|
301
|
|
|
self.liveaction.context |
|
302
|
|
|
) |
|
303
|
|
|
|
|
304
|
|
|
def resume(self): |
|
305
|
|
|
# Restore runner and action parameters since they are not provided on resume. |
|
306
|
|
|
runner_parameters, action_parameters = param_utils.render_final_params( |
|
307
|
|
|
self.runner_type_db.runner_parameters, |
|
308
|
|
|
self.action.parameters, |
|
309
|
|
|
self.liveaction.parameters, |
|
310
|
|
|
self.liveaction.context |
|
311
|
|
|
) |
|
312
|
|
|
|
|
313
|
|
|
# Assign runner parameters needed for pre-run. |
|
314
|
|
|
if runner_parameters: |
|
315
|
|
|
self.runner_parameters = runner_parameters |
|
316
|
|
|
|
|
317
|
|
|
# Restore chain holder if it is not initialized. |
|
318
|
|
|
if not self.chain_holder: |
|
319
|
|
|
self.pre_run() |
|
320
|
|
|
|
|
321
|
|
|
# Change the status of the liveaction from resuming to running. |
|
322
|
|
|
self.liveaction = action_service.update_status( |
|
323
|
|
|
self.liveaction, |
|
324
|
|
|
action_constants.LIVEACTION_STATUS_RUNNING, |
|
325
|
|
|
publish=False |
|
326
|
|
|
) |
|
327
|
|
|
|
|
328
|
|
|
# Run the action chain. |
|
329
|
|
|
return self._run_chain(action_parameters, resuming=True) |
|
330
|
|
|
|
|
331
|
|
|
def _run_chain(self, action_parameters, resuming=False): |
|
332
|
|
|
# Set chain status to fail unless explicitly set to succeed. |
|
333
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
|
334
|
|
|
|
|
335
|
|
|
# Result holds the final result that the chain store in the database. |
|
336
|
|
|
result = {'tasks': []} |
|
337
|
|
|
|
|
338
|
|
|
# Save published variables into the result if specified. |
|
339
|
|
|
if self._display_published: |
|
340
|
|
|
result[PUBLISHED_VARS_KEY] = {} |
|
341
|
|
|
|
|
342
|
|
|
context_result = {} # Holds result which is used for the template context purposes |
|
343
|
|
|
top_level_error = None # Stores a reference to a top level error |
|
344
|
|
|
action_node = None |
|
345
|
|
|
last_task = None |
|
346
|
|
|
|
|
347
|
|
|
try: |
|
348
|
|
|
# Initialize vars with the action parameters. |
|
349
|
|
|
# This allows action parameers to be referenced from vars. |
|
350
|
|
|
self.chain_holder.init_vars(action_parameters) |
|
351
|
|
|
except Exception as e: |
|
352
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
|
353
|
|
|
m = 'Failed initializing ``vars`` in chain.' |
|
354
|
|
|
LOG.exception(m) |
|
355
|
|
|
top_level_error = self._format_error(e, m) |
|
356
|
|
|
result.update(top_level_error) |
|
357
|
|
|
return (chain_status, result, None) |
|
358
|
|
|
|
|
359
|
|
|
# Restore state on resuming an existing chain execution. |
|
360
|
|
|
if resuming: |
|
361
|
|
|
# Restore vars is any from the liveaction. |
|
362
|
|
|
ctx_vars = self.liveaction.context.pop('vars', {}) |
|
363
|
|
|
self.chain_holder.restore_vars(ctx_vars) |
|
364
|
|
|
|
|
365
|
|
|
# Restore result if any from the liveaction. |
|
366
|
|
|
if self.liveaction and hasattr(self.liveaction, 'result') and self.liveaction.result: |
|
367
|
|
|
result = self.liveaction.result |
|
368
|
|
|
|
|
369
|
|
|
# Initialize or rebuild existing context_result from liveaction |
|
370
|
|
|
# which holds the result used for resolving context in Jinja template. |
|
371
|
|
|
for task in result.get('tasks', []): |
|
372
|
|
|
context_result[task['name']] = task['result'] |
|
373
|
|
|
|
|
374
|
|
|
# Restore or initialize the top_level_error |
|
375
|
|
|
# that stores a reference to a top level error. |
|
376
|
|
|
if 'error' in result or 'traceback' in result: |
|
377
|
|
|
top_level_error = { |
|
378
|
|
|
'error': result.get('error'), |
|
379
|
|
|
'traceback': result.get('traceback') |
|
380
|
|
|
} |
|
381
|
|
|
|
|
382
|
|
|
# If there are no executed tasks in the chain, then get the first node. |
|
383
|
|
|
if len(result['tasks']) <= 0: |
|
384
|
|
|
try: |
|
385
|
|
|
action_node = self.chain_holder.get_next_node() |
|
386
|
|
|
except Exception as e: |
|
387
|
|
|
m = 'Failed to get starting node "%s".', action_node.name |
|
388
|
|
|
LOG.exception(m) |
|
389
|
|
|
top_level_error = self._format_error(e, m) |
|
390
|
|
|
|
|
391
|
|
|
# If there are no action node to run next, then mark the chain successful. |
|
392
|
|
|
if not action_node: |
|
393
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED |
|
394
|
|
|
|
|
395
|
|
|
# Otherwise, figure out the last task executed and |
|
396
|
|
|
# its state to determine where to begin executing. |
|
397
|
|
|
else: |
|
398
|
|
|
last_task = result['tasks'][-1] |
|
399
|
|
|
action_node = self.chain_holder.get_node(last_task['name']) |
|
400
|
|
|
liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id']) |
|
401
|
|
|
|
|
402
|
|
|
# If the liveaction of the last task has changed, update the result entry. |
|
403
|
|
|
if liveaction.status != last_task['state']: |
|
404
|
|
|
updated_task_result = self._get_updated_action_exec_result( |
|
405
|
|
|
action_node, liveaction, last_task) |
|
406
|
|
|
del result['tasks'][-1] |
|
407
|
|
|
result['tasks'].append(updated_task_result) |
|
408
|
|
|
|
|
409
|
|
|
# If the last task was canceled, then canceled the chain altogether. |
|
410
|
|
|
if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED: |
|
411
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_CANCELED |
|
412
|
|
|
return (chain_status, result, None) |
|
413
|
|
|
|
|
414
|
|
|
# If the last task was paused, then stay on this action node. |
|
415
|
|
|
# This is explicitly put here for clarity. |
|
416
|
|
|
if liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED: |
|
417
|
|
|
pass |
|
418
|
|
|
|
|
419
|
|
|
# If the last task succeeded, then get the next on-success action node. |
|
420
|
|
|
if liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED: |
|
421
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED |
|
422
|
|
|
action_node = self.chain_holder.get_next_node( |
|
423
|
|
|
last_task['name'], condition='on-success') |
|
424
|
|
|
|
|
425
|
|
|
# If the last task failed, then get the next on-failure action node. |
|
426
|
|
|
if liveaction.status in action_constants.LIVEACTION_FAILED_STATES: |
|
427
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
|
428
|
|
|
action_node = self.chain_holder.get_next_node( |
|
429
|
|
|
last_task['name'], condition='on-failure') |
|
430
|
|
|
|
|
431
|
|
|
# Setup parent context. |
|
432
|
|
|
parent_context = { |
|
433
|
|
|
'execution_id': self.execution_id |
|
434
|
|
|
} |
|
435
|
|
|
|
|
436
|
|
|
if getattr(self.liveaction, 'context', None): |
|
437
|
|
|
parent_context.update(self.liveaction.context) |
|
438
|
|
|
|
|
439
|
|
|
# Run the action chain until there are no more tasks. |
|
440
|
|
|
while action_node: |
|
441
|
|
|
error = None |
|
442
|
|
|
liveaction = None |
|
443
|
|
|
last_task = result['tasks'][-1] if len(result['tasks']) > 0 else None |
|
444
|
|
|
created_at = date_utils.get_datetime_utc_now() |
|
445
|
|
|
|
|
446
|
|
|
try: |
|
447
|
|
|
# If last task was paused, then fetch the liveaction and resume it first. |
|
448
|
|
|
if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED: |
|
449
|
|
|
liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id']) |
|
450
|
|
|
del result['tasks'][-1] |
|
451
|
|
|
else: |
|
452
|
|
|
liveaction = self._get_next_action( |
|
453
|
|
|
action_node=action_node, parent_context=parent_context, |
|
454
|
|
|
action_params=action_parameters, context_result=context_result) |
|
455
|
|
|
except action_exc.InvalidActionReferencedException as e: |
|
456
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
|
457
|
|
|
m = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' % |
|
458
|
|
|
(action_node.name, action_node.ref)) |
|
459
|
|
|
LOG.exception(m) |
|
460
|
|
|
top_level_error = self._format_error(e, m) |
|
461
|
|
|
break |
|
462
|
|
|
except action_exc.ParameterRenderingFailedException as e: |
|
463
|
|
|
# Rendering parameters failed before we even got to running this action, |
|
464
|
|
|
# abort and fail the whole action chain |
|
465
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
|
466
|
|
|
m = 'Failed to run task "%s". Parameter rendering failed.' % action_node.name |
|
467
|
|
|
LOG.exception(m) |
|
468
|
|
|
top_level_error = self._format_error(e, m) |
|
469
|
|
|
break |
|
470
|
|
|
except db_exc.StackStormDBObjectNotFoundError as e: |
|
471
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
|
472
|
|
|
m = 'Failed to resume task "%s". Unable to find liveaction.' % action_node.name |
|
473
|
|
|
LOG.exception(m) |
|
474
|
|
|
top_level_error = self._format_error(e, m) |
|
475
|
|
|
break |
|
476
|
|
|
|
|
477
|
|
|
try: |
|
478
|
|
|
# If last task was paused, then fetch the liveaction and resume it first. |
|
479
|
|
|
if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED: |
|
480
|
|
|
LOG.info('Resume task %s for chain %s.', action_node.name, self.liveaction.id) |
|
481
|
|
|
liveaction = self._resume_action(liveaction) |
|
482
|
|
|
else: |
|
483
|
|
|
LOG.info('Run task %s for chain %s.', action_node.name, self.liveaction.id) |
|
484
|
|
|
liveaction = self._run_action(liveaction) |
|
485
|
|
|
except Exception as e: |
|
486
|
|
|
# Save the traceback and error message |
|
487
|
|
|
m = 'Failed running task "%s".' % action_node.name |
|
488
|
|
|
LOG.exception(m) |
|
489
|
|
|
error = self._format_error(e, m) |
|
490
|
|
|
context_result[action_node.name] = error |
|
491
|
|
|
else: |
|
492
|
|
|
# Update context result |
|
493
|
|
|
context_result[action_node.name] = liveaction.result |
|
494
|
|
|
|
|
495
|
|
|
# Render and publish variables |
|
496
|
|
|
rendered_publish_vars = ActionChainRunner._render_publish_vars( |
|
497
|
|
|
action_node=action_node, action_parameters=action_parameters, |
|
498
|
|
|
execution_result=liveaction.result, previous_execution_results=context_result, |
|
499
|
|
|
chain_vars=self.chain_holder.vars) |
|
500
|
|
|
|
|
501
|
|
|
if rendered_publish_vars: |
|
502
|
|
|
self.chain_holder.vars.update(rendered_publish_vars) |
|
503
|
|
|
if self._display_published: |
|
504
|
|
|
result[PUBLISHED_VARS_KEY].update(rendered_publish_vars) |
|
505
|
|
|
finally: |
|
506
|
|
|
# Record result and resolve a next node based on the task success or failure |
|
507
|
|
|
updated_at = date_utils.get_datetime_utc_now() |
|
508
|
|
|
|
|
509
|
|
|
task_result = self._format_action_exec_result( |
|
510
|
|
|
action_node, |
|
511
|
|
|
liveaction, |
|
512
|
|
|
created_at, |
|
513
|
|
|
updated_at, |
|
514
|
|
|
error=error |
|
515
|
|
|
) |
|
516
|
|
|
|
|
517
|
|
|
result['tasks'].append(task_result) |
|
518
|
|
|
|
|
519
|
|
|
try: |
|
520
|
|
|
if not liveaction: |
|
521
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
|
522
|
|
|
action_node = self.chain_holder.get_next_node( |
|
523
|
|
|
action_node.name, condition='on-failure') |
|
524
|
|
|
elif liveaction.status == action_constants.LIVEACTION_STATUS_TIMED_OUT: |
|
525
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_TIMED_OUT |
|
526
|
|
|
action_node = self.chain_holder.get_next_node( |
|
527
|
|
|
action_node.name, condition='on-failure') |
|
528
|
|
|
elif liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED: |
|
529
|
|
|
LOG.info('Chain execution (%s) canceled because task "%s" is canceled.', |
|
530
|
|
|
self.liveaction_id, action_node.name) |
|
531
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_CANCELED |
|
532
|
|
|
action_node = None |
|
533
|
|
|
elif liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED: |
|
534
|
|
|
LOG.info('Chain execution (%s) paused because task "%s" is paused.', |
|
535
|
|
|
self.liveaction_id, action_node.name) |
|
536
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_PAUSED |
|
537
|
|
|
self._save_vars() |
|
538
|
|
|
action_node = None |
|
539
|
|
|
elif liveaction.status in action_constants.LIVEACTION_FAILED_STATES: |
|
540
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
|
541
|
|
|
action_node = self.chain_holder.get_next_node( |
|
542
|
|
|
action_node.name, condition='on-failure') |
|
543
|
|
|
elif liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED: |
|
544
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED |
|
545
|
|
|
action_node = self.chain_holder.get_next_node( |
|
546
|
|
|
action_node.name, condition='on-success') |
|
547
|
|
|
else: |
|
548
|
|
|
action_node = None |
|
549
|
|
|
except Exception as e: |
|
550
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_FAILED |
|
551
|
|
|
m = 'Failed to get next node "%s".' % action_node.name |
|
552
|
|
|
LOG.exception(m) |
|
553
|
|
|
top_level_error = self._format_error(e, m) |
|
554
|
|
|
action_node = None |
|
555
|
|
|
break |
|
556
|
|
|
|
|
557
|
|
|
if action_service.is_action_canceled_or_canceling(self.liveaction.id): |
|
558
|
|
|
LOG.info('Chain execution (%s) canceled by user.', self.liveaction.id) |
|
559
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_CANCELED |
|
560
|
|
|
return (chain_status, result, None) |
|
561
|
|
|
|
|
562
|
|
|
if action_service.is_action_paused_or_pausing(self.liveaction.id): |
|
563
|
|
|
LOG.info('Chain execution (%s) paused by user.', self.liveaction.id) |
|
564
|
|
|
chain_status = action_constants.LIVEACTION_STATUS_PAUSED |
|
565
|
|
|
self._save_vars() |
|
566
|
|
|
return (chain_status, result, self.liveaction.context) |
|
567
|
|
|
|
|
568
|
|
|
if top_level_error and isinstance(top_level_error, dict): |
|
569
|
|
|
result.update(top_level_error) |
|
570
|
|
|
|
|
571
|
|
|
return (chain_status, result, self.liveaction.context) |
|
572
|
|
|
|
|
573
|
|
|
def _format_error(self, e, msg): |
|
574
|
|
|
return { |
|
575
|
|
|
'error': '%s. %s' % (msg, str(e)), |
|
576
|
|
|
'traceback': traceback.format_exc(10) |
|
577
|
|
|
} |
|
578
|
|
|
|
|
579
|
|
|
def _save_vars(self): |
|
580
|
|
|
# Save the context vars in the liveaction context. |
|
581
|
|
|
self.liveaction.context['vars'] = self.chain_holder.vars |
|
582
|
|
|
|
|
583
|
|
|
@staticmethod |
|
584
|
|
|
def _render_publish_vars(action_node, action_parameters, execution_result, |
|
585
|
|
|
previous_execution_results, chain_vars): |
|
586
|
|
|
""" |
|
587
|
|
|
If no output is specified on the action_node the output is the entire execution_result. |
|
588
|
|
|
If any output is specified then only those variables are published as output of an |
|
589
|
|
|
execution of this action_node. |
|
590
|
|
|
The output variable can refer to a variable from the execution_result, |
|
591
|
|
|
previous_execution_results or chain_vars. |
|
592
|
|
|
""" |
|
593
|
|
|
if not action_node.publish: |
|
594
|
|
|
return {} |
|
595
|
|
|
|
|
596
|
|
|
context = {} |
|
597
|
|
|
context.update(action_parameters) |
|
598
|
|
|
context.update({action_node.name: execution_result}) |
|
599
|
|
|
context.update(previous_execution_results) |
|
600
|
|
|
context.update(chain_vars) |
|
601
|
|
|
context.update({RESULTS_KEY: previous_execution_results}) |
|
602
|
|
|
|
|
603
|
|
|
context.update({ |
|
604
|
|
|
kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
|
605
|
|
|
scope=kv_constants.SYSTEM_SCOPE) |
|
606
|
|
|
}) |
|
607
|
|
|
|
|
608
|
|
|
context.update({ |
|
609
|
|
|
kv_constants.DATASTORE_PARENT_SCOPE: { |
|
610
|
|
|
kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
|
611
|
|
|
scope=kv_constants.FULL_SYSTEM_SCOPE) |
|
612
|
|
|
} |
|
613
|
|
|
}) |
|
614
|
|
|
|
|
615
|
|
|
try: |
|
616
|
|
|
rendered_result = jinja_utils.render_values(mapping=action_node.publish, |
|
617
|
|
|
context=context) |
|
618
|
|
|
except Exception as e: |
|
619
|
|
|
key = getattr(e, 'key', None) |
|
620
|
|
|
value = getattr(e, 'value', None) |
|
621
|
|
|
msg = ('Failed rendering value for publish parameter "%s" in task "%s" ' |
|
622
|
|
|
'(template string=%s): %s' % (key, action_node.name, value, str(e))) |
|
623
|
|
|
raise action_exc.ParameterRenderingFailedException(msg) |
|
624
|
|
|
|
|
625
|
|
|
return rendered_result |
|
626
|
|
|
|
|
627
|
|
|
@staticmethod |
|
628
|
|
|
def _resolve_params(action_node, original_parameters, results, chain_vars, chain_context): |
|
629
|
|
|
# setup context with original parameters and the intermediate results. |
|
630
|
|
|
chain_parent = chain_context.get('parent', {}) |
|
631
|
|
|
pack = chain_parent.get('pack') |
|
632
|
|
|
user = chain_parent.get('user') |
|
633
|
|
|
|
|
634
|
|
|
config = get_config(pack, user) |
|
635
|
|
|
|
|
636
|
|
|
context = {} |
|
637
|
|
|
context.update(original_parameters) |
|
638
|
|
|
context.update(results) |
|
639
|
|
|
context.update(chain_vars) |
|
640
|
|
|
context.update({RESULTS_KEY: results}) |
|
641
|
|
|
|
|
642
|
|
|
context.update({ |
|
643
|
|
|
kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
|
644
|
|
|
scope=kv_constants.SYSTEM_SCOPE) |
|
645
|
|
|
}) |
|
646
|
|
|
|
|
647
|
|
|
context.update({ |
|
648
|
|
|
kv_constants.DATASTORE_PARENT_SCOPE: { |
|
649
|
|
|
kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
|
650
|
|
|
scope=kv_constants.FULL_SYSTEM_SCOPE) |
|
651
|
|
|
} |
|
652
|
|
|
}) |
|
653
|
|
|
context.update({action_constants.ACTION_CONTEXT_KV_PREFIX: chain_context}) |
|
654
|
|
|
context.update({pack_constants.PACK_CONFIG_CONTEXT_KV_PREFIX: config}) |
|
655
|
|
|
try: |
|
656
|
|
|
rendered_params = jinja_utils.render_values(mapping=action_node.get_parameters(), |
|
657
|
|
|
context=context) |
|
658
|
|
|
except Exception as e: |
|
659
|
|
|
LOG.exception('Jinja rendering for parameter "%s" failed.' % (e.key)) |
|
660
|
|
|
|
|
661
|
|
|
key = getattr(e, 'key', None) |
|
662
|
|
|
value = getattr(e, 'value', None) |
|
663
|
|
|
msg = ('Failed rendering value for action parameter "%s" in task "%s" ' |
|
664
|
|
|
'(template string=%s): %s') % (key, action_node.name, value, str(e)) |
|
665
|
|
|
raise action_exc.ParameterRenderingFailedException(msg) |
|
666
|
|
|
LOG.debug('Rendered params: %s: Type: %s', rendered_params, type(rendered_params)) |
|
667
|
|
|
return rendered_params |
|
668
|
|
|
|
|
669
|
|
|
def _get_next_action(self, action_node, parent_context, action_params, context_result): |
|
670
|
|
|
# Verify that the referenced action exists |
|
671
|
|
|
# TODO: We do another lookup in cast_param, refactor to reduce number of lookups |
|
672
|
|
|
task_name = action_node.name |
|
673
|
|
|
action_ref = action_node.ref |
|
674
|
|
|
action_db = action_db_util.get_action_by_ref(ref=action_ref) |
|
675
|
|
|
|
|
676
|
|
|
if not action_db: |
|
677
|
|
|
error = 'Task :: %s - Action with ref %s not registered.' % (task_name, action_ref) |
|
678
|
|
|
raise action_exc.InvalidActionReferencedException(error) |
|
679
|
|
|
|
|
680
|
|
|
resolved_params = ActionChainRunner._resolve_params( |
|
681
|
|
|
action_node=action_node, original_parameters=action_params, |
|
682
|
|
|
results=context_result, chain_vars=self.chain_holder.vars, |
|
683
|
|
|
chain_context={'parent': parent_context}) |
|
684
|
|
|
|
|
685
|
|
|
liveaction = self._build_liveaction_object( |
|
686
|
|
|
action_node=action_node, |
|
687
|
|
|
resolved_params=resolved_params, |
|
688
|
|
|
parent_context=parent_context) |
|
689
|
|
|
|
|
690
|
|
|
return liveaction |
|
691
|
|
|
|
|
692
|
|
|
def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0): |
|
693
|
|
|
""" |
|
694
|
|
|
:param sleep_delay: Number of seconds to wait during "is completed" polls. |
|
695
|
|
|
:type sleep_delay: ``float`` |
|
696
|
|
|
""" |
|
697
|
|
|
try: |
|
698
|
|
|
liveaction, _ = action_service.request(liveaction) |
|
699
|
|
|
except Exception as e: |
|
700
|
|
|
liveaction.status = action_constants.LIVEACTION_STATUS_FAILED |
|
701
|
|
|
LOG.exception('Failed to schedule liveaction.') |
|
702
|
|
|
raise e |
|
703
|
|
|
|
|
704
|
|
|
while (wait_for_completion and liveaction.status not in ( |
|
705
|
|
|
action_constants.LIVEACTION_COMPLETED_STATES + |
|
706
|
|
|
[action_constants.LIVEACTION_STATUS_PAUSED])): |
|
707
|
|
|
eventlet.sleep(sleep_delay) |
|
708
|
|
|
liveaction = action_db_util.get_liveaction_by_id(liveaction.id) |
|
709
|
|
|
|
|
710
|
|
|
return liveaction |
|
711
|
|
|
|
|
712
|
|
|
def _resume_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0): |
|
713
|
|
|
""" |
|
714
|
|
|
:param sleep_delay: Number of seconds to wait during "is completed" polls. |
|
715
|
|
|
:type sleep_delay: ``float`` |
|
716
|
|
|
""" |
|
717
|
|
|
try: |
|
718
|
|
|
user = self.context.get('user', None) |
|
719
|
|
|
liveaction, _ = action_service.request_resume(liveaction, user) |
|
720
|
|
|
except Exception as e: |
|
721
|
|
|
liveaction.status = action_constants.LIVEACTION_STATUS_FAILED |
|
722
|
|
|
LOG.exception('Failed to schedule liveaction.') |
|
723
|
|
|
raise e |
|
724
|
|
|
|
|
725
|
|
|
while (wait_for_completion and liveaction.status not in ( |
|
726
|
|
|
action_constants.LIVEACTION_COMPLETED_STATES + |
|
727
|
|
|
[action_constants.LIVEACTION_STATUS_PAUSED])): |
|
728
|
|
|
eventlet.sleep(sleep_delay) |
|
729
|
|
|
liveaction = action_db_util.get_liveaction_by_id(liveaction.id) |
|
730
|
|
|
|
|
731
|
|
|
return liveaction |
|
732
|
|
|
|
|
733
|
|
|
def _build_liveaction_object(self, action_node, resolved_params, parent_context): |
|
734
|
|
|
liveaction = LiveActionDB(action=action_node.ref) |
|
735
|
|
|
|
|
736
|
|
|
# Setup notify for task in chain. |
|
737
|
|
|
notify = self._get_notify(action_node) |
|
738
|
|
|
if notify: |
|
739
|
|
|
liveaction.notify = notify |
|
740
|
|
|
LOG.debug('%s: Task notify set to: %s', action_node.name, liveaction.notify) |
|
741
|
|
|
|
|
742
|
|
|
liveaction.context = { |
|
743
|
|
|
'parent': parent_context, |
|
744
|
|
|
'chain': vars(action_node) |
|
745
|
|
|
} |
|
746
|
|
|
liveaction.parameters = action_param_utils.cast_params(action_ref=action_node.ref, |
|
747
|
|
|
params=resolved_params) |
|
748
|
|
|
return liveaction |
|
749
|
|
|
|
|
750
|
|
|
def _get_notify(self, action_node): |
|
751
|
|
|
if action_node.name not in self._skip_notify_tasks: |
|
752
|
|
|
if action_node.notify: |
|
753
|
|
|
task_notify = NotificationsHelper.to_model(action_node.notify) |
|
754
|
|
|
return task_notify |
|
755
|
|
|
elif self._chain_notify: |
|
756
|
|
|
return self._chain_notify |
|
757
|
|
|
|
|
758
|
|
|
return None |
|
759
|
|
|
|
|
760
|
|
|
def _get_updated_action_exec_result(self, action_node, liveaction, prev_task_result): |
|
761
|
|
|
if liveaction.status in action_constants.LIVEACTION_COMPLETED_STATES: |
|
762
|
|
|
created_at = isotime.parse(prev_task_result['created_at']) |
|
763
|
|
|
updated_at = liveaction.end_timestamp |
|
764
|
|
|
else: |
|
765
|
|
|
created_at = isotime.parse(prev_task_result['created_at']) |
|
766
|
|
|
updated_at = isotime.parse(prev_task_result['updated_at']) |
|
767
|
|
|
|
|
768
|
|
|
return self._format_action_exec_result(action_node, liveaction, created_at, updated_at) |
|
769
|
|
|
|
|
770
|
|
|
def _format_action_exec_result(self, action_node, liveaction_db, created_at, updated_at, |
|
771
|
|
|
error=None): |
|
772
|
|
|
""" |
|
773
|
|
|
Format ActionExecution result so it can be used in the final action result output. |
|
774
|
|
|
|
|
775
|
|
|
:rtype: ``dict`` |
|
776
|
|
|
""" |
|
777
|
|
|
assert isinstance(created_at, datetime.datetime) |
|
778
|
|
|
assert isinstance(updated_at, datetime.datetime) |
|
779
|
|
|
|
|
780
|
|
|
result = {} |
|
781
|
|
|
|
|
782
|
|
|
execution_db = None |
|
783
|
|
|
if liveaction_db: |
|
784
|
|
|
execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id)) |
|
785
|
|
|
|
|
786
|
|
|
result['id'] = action_node.name |
|
787
|
|
|
result['name'] = action_node.name |
|
788
|
|
|
result['execution_id'] = str(execution_db.id) if execution_db else None |
|
789
|
|
|
result['liveaction_id'] = str(liveaction_db.id) if liveaction_db else None |
|
790
|
|
|
result['workflow'] = None |
|
791
|
|
|
|
|
792
|
|
|
result['created_at'] = isotime.format(dt=created_at) |
|
793
|
|
|
result['updated_at'] = isotime.format(dt=updated_at) |
|
794
|
|
|
|
|
795
|
|
|
if error or not liveaction_db: |
|
796
|
|
|
result['state'] = action_constants.LIVEACTION_STATUS_FAILED |
|
797
|
|
|
else: |
|
798
|
|
|
result['state'] = liveaction_db.status |
|
799
|
|
|
|
|
800
|
|
|
if error: |
|
801
|
|
|
result['result'] = error |
|
802
|
|
|
else: |
|
803
|
|
|
result['result'] = liveaction_db.result |
|
804
|
|
|
|
|
805
|
|
|
return result |
|
806
|
|
|
|
|
807
|
|
|
|
|
808
|
|
|
def get_runner(): |
|
809
|
|
|
return ActionChainRunner(str(uuid.uuid4())) |
|
810
|
|
|
|