1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | from __future__ import absolute_import |
||
17 | import copy |
||
18 | import eventlet |
||
19 | import traceback |
||
20 | import uuid |
||
21 | import datetime |
||
22 | |||
23 | from jsonschema import exceptions as json_schema_exc |
||
24 | |||
25 | from st2common.runners.base import ActionRunner |
||
26 | from st2common.runners.base import get_metadata as get_runner_metadata |
||
27 | from st2common import log as logging |
||
28 | from st2common.constants import action as action_constants |
||
29 | from st2common.constants import pack as pack_constants |
||
30 | from st2common.constants import keyvalue as kv_constants |
||
31 | from st2common.content.loader import MetaLoader |
||
32 | from st2common.exceptions import action as action_exc |
||
33 | from st2common.exceptions import actionrunner as runner_exc |
||
34 | from st2common.exceptions import db as db_exc |
||
35 | from st2common.models.api.notification import NotificationsHelper |
||
36 | from st2common.models.db.liveaction import LiveActionDB |
||
37 | from st2common.models.system import actionchain |
||
38 | from st2common.models.utils import action_param_utils |
||
39 | from st2common.persistence.execution import ActionExecution |
||
40 | from st2common.persistence.liveaction import LiveAction |
||
41 | from st2common.services import action as action_service |
||
42 | from st2common.services import keyvalues as kv_service |
||
43 | from st2common.util import action_db as action_db_util |
||
44 | from st2common.util import isotime |
||
45 | from st2common.util import date as date_utils |
||
46 | from st2common.util import jinja as jinja_utils |
||
47 | from st2common.util import param as param_utils |
||
48 | from st2common.util.config_loader import get_config |
||
49 | |||
50 | __all__ = [ |
||
51 | 'ActionChainRunner', |
||
52 | 'ChainHolder', |
||
53 | |||
54 | 'get_runner', |
||
55 | 'get_metadata' |
||
56 | ] |
||
57 | |||
58 | LOG = logging.getLogger(__name__) |
||
59 | |||
60 | RESULTS_KEY = '__results' |
||
61 | JINJA_START_MARKERS = [ |
||
62 | '{{', |
||
63 | '{%' |
||
64 | ] |
||
65 | PUBLISHED_VARS_KEY = 'published' |
||
66 | |||
67 | |||
68 | class ChainHolder(object): |
||
69 | |||
70 | def __init__(self, chainspec, chainname): |
||
71 | self.actionchain = actionchain.ActionChain(**chainspec) |
||
72 | self.chainname = chainname |
||
73 | |||
74 | if not self.actionchain.default: |
||
75 | default = self._get_default(self.actionchain) |
||
76 | self.actionchain.default = default |
||
77 | |||
78 | LOG.debug('Using %s as default for %s.', self.actionchain.default, self.chainname) |
||
79 | if not self.actionchain.default: |
||
80 | raise Exception('Failed to find default node in %s.' % (self.chainname)) |
||
81 | |||
82 | self.vars = {} |
||
83 | |||
84 | def init_vars(self, action_parameters): |
||
85 | if self.actionchain.vars: |
||
86 | self.vars = self._get_rendered_vars(self.actionchain.vars, |
||
87 | action_parameters=action_parameters) |
||
88 | |||
89 | def restore_vars(self, ctx_vars): |
||
90 | self.vars.update(copy.deepcopy(ctx_vars)) |
||
91 | |||
92 | def validate(self): |
||
93 | """ |
||
94 | Function which performs a simple compile time validation. |
||
95 | |||
96 | Keep in mind that some variables are only resolved during run time which means we can |
||
97 | perform only simple validation during compile / create time. |
||
98 | """ |
||
99 | all_nodes = self._get_all_nodes(action_chain=self.actionchain) |
||
100 | |||
101 | for node in self.actionchain.chain: |
||
102 | on_success_node_name = node.on_success |
||
103 | on_failure_node_name = node.on_failure |
||
104 | |||
105 | # Check "on-success" path |
||
106 | valid_name = self._is_valid_node_name(all_node_names=all_nodes, |
||
107 | node_name=on_success_node_name) |
||
108 | if not valid_name: |
||
109 | msg = ('Unable to find node with name "%s" referenced in "on-success" in ' |
||
110 | 'task "%s".' % (on_success_node_name, node.name)) |
||
111 | raise ValueError(msg) |
||
112 | |||
113 | # Check "on-failure" path |
||
114 | valid_name = self._is_valid_node_name(all_node_names=all_nodes, |
||
115 | node_name=on_failure_node_name) |
||
116 | if not valid_name: |
||
117 | msg = ('Unable to find node with name "%s" referenced in "on-failure" in ' |
||
118 | 'task "%s".' % (on_failure_node_name, node.name)) |
||
119 | raise ValueError(msg) |
||
120 | |||
121 | # check if node specified in default is valid. |
||
122 | if self.actionchain.default: |
||
123 | valid_name = self._is_valid_node_name(all_node_names=all_nodes, |
||
124 | node_name=self.actionchain.default) |
||
125 | if not valid_name: |
||
126 | msg = ('Unable to find node with name "%s" referenced in "default".' % |
||
127 | self.actionchain.default) |
||
128 | raise ValueError(msg) |
||
129 | return True |
||
130 | |||
131 | @staticmethod |
||
132 | def _get_default(action_chain): |
||
133 | # default is defined |
||
134 | if action_chain.default: |
||
135 | return action_chain.default |
||
136 | # no nodes in chain |
||
137 | if not action_chain.chain: |
||
138 | return None |
||
139 | # The first node with no references is the default node. Assumptions |
||
140 | # that support this are : |
||
141 | # 1. There are no loops in the chain. Even if there are loops there is |
||
142 | # at least 1 node which does not end up in this loop. |
||
143 | # 2. There are no fragments in the chain. |
||
144 | all_nodes = ChainHolder._get_all_nodes(action_chain=action_chain) |
||
145 | node_names = set(all_nodes) |
||
146 | on_success_nodes = ChainHolder._get_all_on_success_nodes(action_chain=action_chain) |
||
147 | on_failure_nodes = ChainHolder._get_all_on_failure_nodes(action_chain=action_chain) |
||
148 | referenced_nodes = on_success_nodes | on_failure_nodes |
||
149 | possible_default_nodes = node_names - referenced_nodes |
||
150 | if possible_default_nodes: |
||
151 | # This is to preserve order. set([..]) does not preserve the order so iterate |
||
152 | # over original array. |
||
153 | for node in all_nodes: |
||
154 | if node in possible_default_nodes: |
||
155 | return node |
||
156 | # If no node is found assume the first node in the chain list to be default. |
||
157 | return action_chain.chain[0].name |
||
158 | |||
159 | @staticmethod |
||
160 | def _get_all_nodes(action_chain): |
||
161 | """ |
||
162 | Return names for all the nodes in the chain. |
||
163 | """ |
||
164 | all_nodes = [node.name for node in action_chain.chain] |
||
165 | return all_nodes |
||
166 | |||
167 | @staticmethod |
||
168 | def _get_all_on_success_nodes(action_chain): |
||
169 | """ |
||
170 | Return names for all the tasks referenced in "on-success". |
||
171 | """ |
||
172 | on_success_nodes = set([node.on_success for node in action_chain.chain]) |
||
173 | return on_success_nodes |
||
174 | |||
175 | @staticmethod |
||
176 | def _get_all_on_failure_nodes(action_chain): |
||
177 | """ |
||
178 | Return names for all the tasks referenced in "on-failure". |
||
179 | """ |
||
180 | on_failure_nodes = set([node.on_failure for node in action_chain.chain]) |
||
181 | return on_failure_nodes |
||
182 | |||
183 | def _is_valid_node_name(self, all_node_names, node_name): |
||
184 | """ |
||
185 | Function which validates that the provided node name is defined in the workflow definition |
||
186 | and it's valid. |
||
187 | |||
188 | Keep in mind that we can only perform validation for task names which don't include jinja |
||
189 | expressions since those are rendered at run time. |
||
190 | """ |
||
191 | if not node_name: |
||
192 | # This task name needs to be resolved during run time so we cant validate the name now |
||
193 | return True |
||
194 | |||
195 | is_jinja_expression = jinja_utils.is_jinja_expression(value=node_name) |
||
196 | if is_jinja_expression: |
||
197 | # This task name needs to be resolved during run time so we cant validate the name |
||
198 | # now |
||
199 | return True |
||
200 | |||
201 | return node_name in all_node_names |
||
202 | |||
203 | @staticmethod |
||
204 | def _get_rendered_vars(vars, action_parameters): |
||
205 | if not vars: |
||
206 | return {} |
||
207 | context = {} |
||
208 | context.update({ |
||
209 | kv_constants.DATASTORE_PARENT_SCOPE: { |
||
210 | kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
||
211 | scope=kv_constants.FULL_SYSTEM_SCOPE) |
||
212 | } |
||
213 | }) |
||
214 | context.update(action_parameters) |
||
215 | LOG.info('Rendering action chain vars. Mapping = %s; Context = %s', vars, context) |
||
216 | return jinja_utils.render_values(mapping=vars, context=context) |
||
217 | |||
218 | def get_node(self, node_name=None, raise_on_failure=False): |
||
219 | if not node_name: |
||
220 | return None |
||
221 | for node in self.actionchain.chain: |
||
222 | if node.name == node_name: |
||
223 | return node |
||
224 | if raise_on_failure: |
||
225 | raise runner_exc.ActionRunnerException( |
||
226 | 'Unable to find node with name "%s".' % (node_name)) |
||
227 | return None |
||
228 | |||
229 | def get_next_node(self, curr_node_name=None, condition='on-success'): |
||
230 | if not curr_node_name: |
||
231 | return self.get_node(self.actionchain.default) |
||
232 | current_node = self.get_node(curr_node_name) |
||
233 | if condition == 'on-success': |
||
234 | return self.get_node(current_node.on_success, raise_on_failure=True) |
||
235 | elif condition == 'on-failure': |
||
236 | return self.get_node(current_node.on_failure, raise_on_failure=True) |
||
237 | raise runner_exc.ActionRunnerException('Unknown condition %s.' % condition) |
||
238 | |||
239 | |||
240 | class ActionChainRunner(ActionRunner): |
||
241 | |||
242 | def __init__(self, runner_id): |
||
243 | super(ActionChainRunner, self).__init__(runner_id=runner_id) |
||
244 | self.chain_holder = None |
||
245 | self._meta_loader = MetaLoader() |
||
246 | self._skip_notify_tasks = [] |
||
247 | self._display_published = True |
||
248 | self._chain_notify = None |
||
249 | |||
250 | def pre_run(self): |
||
251 | super(ActionChainRunner, self).pre_run() |
||
252 | |||
253 | chainspec_file = self.entry_point |
||
254 | LOG.debug('Reading action chain from %s for action %s.', chainspec_file, |
||
255 | self.action) |
||
256 | |||
257 | try: |
||
258 | chainspec = self._meta_loader.load(file_path=chainspec_file, |
||
259 | expected_type=dict) |
||
260 | except Exception as e: |
||
261 | message = ('Failed to parse action chain definition from "%s": %s' % |
||
262 | (chainspec_file, str(e))) |
||
263 | LOG.exception('Failed to load action chain definition.') |
||
264 | raise runner_exc.ActionRunnerPreRunError(message) |
||
265 | |||
266 | try: |
||
267 | self.chain_holder = ChainHolder(chainspec, self.action_name) |
||
268 | except json_schema_exc.ValidationError as e: |
||
269 | # preserve the whole nasty jsonschema message as that is better to get to the |
||
270 | # root cause |
||
271 | message = str(e) |
||
272 | LOG.exception('Failed to instantiate ActionChain.') |
||
273 | raise runner_exc.ActionRunnerPreRunError(message) |
||
274 | except Exception as e: |
||
275 | message = str(e) |
||
276 | LOG.exception('Failed to instantiate ActionChain.') |
||
277 | raise runner_exc.ActionRunnerPreRunError(message) |
||
278 | |||
279 | # Runner attributes are set lazily. So these steps |
||
280 | # should happen outside the constructor. |
||
281 | if getattr(self, 'liveaction', None): |
||
282 | self._chain_notify = getattr(self.liveaction, 'notify', None) |
||
283 | if self.runner_parameters: |
||
284 | self._skip_notify_tasks = self.runner_parameters.get('skip_notify', []) |
||
285 | self._display_published = self.runner_parameters.get('display_published', True) |
||
286 | |||
287 | # Perform some pre-run chain validation |
||
288 | try: |
||
289 | self.chain_holder.validate() |
||
290 | except Exception as e: |
||
291 | raise runner_exc.ActionRunnerPreRunError(str(e)) |
||
292 | |||
293 | def run(self, action_parameters): |
||
294 | # Run the action chain. |
||
295 | return self._run_chain(action_parameters) |
||
296 | |||
297 | def cancel(self): |
||
298 | # Identify the list of action executions that are workflows and cascade pause. |
||
299 | for child_exec_id in self.execution.children: |
||
300 | child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True) |
||
301 | if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and |
||
302 | child_exec.status in action_constants.LIVEACTION_CANCELABLE_STATES): |
||
303 | action_service.request_cancellation( |
||
304 | LiveAction.get(id=child_exec.liveaction['id']), |
||
305 | self.context.get('user', None) |
||
306 | ) |
||
307 | |||
308 | return ( |
||
309 | action_constants.LIVEACTION_STATUS_CANCELING, |
||
310 | self.liveaction.result, |
||
311 | self.liveaction.context |
||
312 | ) |
||
313 | |||
314 | def pause(self): |
||
315 | # Identify the list of action executions that are workflows and cascade pause. |
||
316 | for child_exec_id in self.execution.children: |
||
317 | child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True) |
||
318 | if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and |
||
319 | child_exec.status == action_constants.LIVEACTION_STATUS_RUNNING): |
||
320 | action_service.request_pause( |
||
321 | LiveAction.get(id=child_exec.liveaction['id']), |
||
322 | self.context.get('user', None) |
||
323 | ) |
||
324 | |||
325 | return ( |
||
326 | action_constants.LIVEACTION_STATUS_PAUSING, |
||
327 | self.liveaction.result, |
||
328 | self.liveaction.context |
||
329 | ) |
||
330 | |||
331 | def resume(self): |
||
332 | # Restore runner and action parameters since they are not provided on resume. |
||
333 | runner_parameters, action_parameters = param_utils.render_final_params( |
||
334 | self.runner_type.runner_parameters, |
||
335 | self.action.parameters, |
||
336 | self.liveaction.parameters, |
||
0 ignored issues
–
show
Bug
introduced
by
Loading history...
|
|||
337 | self.liveaction.context |
||
0 ignored issues
–
show
|
|||
338 | ) |
||
339 | |||
340 | # Assign runner parameters needed for pre-run. |
||
341 | if runner_parameters: |
||
342 | self.runner_parameters = runner_parameters |
||
343 | |||
344 | # Restore chain holder if it is not initialized. |
||
345 | if not self.chain_holder: |
||
346 | self.pre_run() |
||
347 | |||
348 | # Change the status of the liveaction from resuming to running. |
||
349 | self.liveaction = action_service.update_status( |
||
350 | self.liveaction, |
||
351 | action_constants.LIVEACTION_STATUS_RUNNING, |
||
352 | publish=False |
||
353 | ) |
||
354 | |||
355 | # Run the action chain. |
||
356 | return self._run_chain(action_parameters, resuming=True) |
||
357 | |||
358 | def _run_chain(self, action_parameters, resuming=False): |
||
359 | # Set chain status to fail unless explicitly set to succeed. |
||
360 | chain_status = action_constants.LIVEACTION_STATUS_FAILED |
||
361 | |||
362 | # Result holds the final result that the chain store in the database. |
||
363 | result = {'tasks': []} |
||
364 | |||
365 | # Save published variables into the result if specified. |
||
366 | if self._display_published: |
||
367 | result[PUBLISHED_VARS_KEY] = {} |
||
368 | |||
369 | context_result = {} # Holds result which is used for the template context purposes |
||
370 | top_level_error = None # Stores a reference to a top level error |
||
371 | action_node = None |
||
372 | last_task = None |
||
373 | |||
374 | try: |
||
375 | # Initialize vars with the action parameters. |
||
376 | # This allows action parameers to be referenced from vars. |
||
377 | self.chain_holder.init_vars(action_parameters) |
||
378 | except Exception as e: |
||
379 | chain_status = action_constants.LIVEACTION_STATUS_FAILED |
||
380 | m = 'Failed initializing ``vars`` in chain.' |
||
381 | LOG.exception(m) |
||
382 | top_level_error = self._format_error(e, m) |
||
383 | result.update(top_level_error) |
||
384 | return (chain_status, result, None) |
||
385 | |||
386 | # Restore state on resuming an existing chain execution. |
||
387 | if resuming: |
||
388 | # Restore vars is any from the liveaction. |
||
389 | ctx_vars = self.liveaction.context.pop('vars', {}) |
||
390 | self.chain_holder.restore_vars(ctx_vars) |
||
391 | |||
392 | # Restore result if any from the liveaction. |
||
393 | if self.liveaction and hasattr(self.liveaction, 'result') and self.liveaction.result: |
||
394 | result = self.liveaction.result |
||
395 | |||
396 | # Initialize or rebuild existing context_result from liveaction |
||
397 | # which holds the result used for resolving context in Jinja template. |
||
398 | for task in result.get('tasks', []): |
||
399 | context_result[task['name']] = task['result'] |
||
400 | |||
401 | # Restore or initialize the top_level_error |
||
402 | # that stores a reference to a top level error. |
||
403 | if 'error' in result or 'traceback' in result: |
||
404 | top_level_error = { |
||
405 | 'error': result.get('error'), |
||
406 | 'traceback': result.get('traceback') |
||
407 | } |
||
408 | |||
409 | # If there are no executed tasks in the chain, then get the first node. |
||
410 | if len(result['tasks']) <= 0: |
||
411 | try: |
||
412 | action_node = self.chain_holder.get_next_node() |
||
413 | except Exception as e: |
||
414 | m = 'Failed to get starting node "%s".', action_node.name |
||
415 | LOG.exception(m) |
||
416 | top_level_error = self._format_error(e, m) |
||
417 | |||
418 | # If there are no action node to run next, then mark the chain successful. |
||
419 | if not action_node: |
||
420 | chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED |
||
421 | |||
422 | # Otherwise, figure out the last task executed and |
||
423 | # its state to determine where to begin executing. |
||
424 | else: |
||
425 | last_task = result['tasks'][-1] |
||
426 | action_node = self.chain_holder.get_node(last_task['name']) |
||
427 | liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id']) |
||
428 | |||
429 | # If the liveaction of the last task has changed, update the result entry. |
||
430 | if liveaction.status != last_task['state']: |
||
431 | updated_task_result = self._get_updated_action_exec_result( |
||
432 | action_node, liveaction, last_task) |
||
433 | del result['tasks'][-1] |
||
434 | result['tasks'].append(updated_task_result) |
||
435 | |||
436 | # Also need to update context_result so the updated result |
||
437 | # is available to Jinja expressions |
||
438 | updated_task_name = updated_task_result['name'] |
||
439 | context_result[updated_task_name]['result'] = updated_task_result['result'] |
||
440 | |||
441 | # If the last task was canceled, then canceled the chain altogether. |
||
442 | if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED: |
||
443 | chain_status = action_constants.LIVEACTION_STATUS_CANCELED |
||
444 | return (chain_status, result, None) |
||
445 | |||
446 | # If the last task was paused, then stay on this action node. |
||
447 | # This is explicitly put here for clarity. |
||
448 | if liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED: |
||
449 | pass |
||
450 | |||
451 | # If the last task succeeded, then get the next on-success action node. |
||
452 | if liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED: |
||
453 | chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED |
||
454 | action_node = self.chain_holder.get_next_node( |
||
455 | last_task['name'], condition='on-success') |
||
456 | |||
457 | # If the last task failed, then get the next on-failure action node. |
||
458 | if liveaction.status in action_constants.LIVEACTION_FAILED_STATES: |
||
459 | chain_status = action_constants.LIVEACTION_STATUS_FAILED |
||
460 | action_node = self.chain_holder.get_next_node( |
||
461 | last_task['name'], condition='on-failure') |
||
462 | |||
463 | # Setup parent context. |
||
464 | parent_context = { |
||
465 | 'execution_id': self.execution_id |
||
466 | } |
||
467 | |||
468 | if getattr(self.liveaction, 'context', None): |
||
469 | parent_context.update(self.liveaction.context) |
||
470 | |||
471 | # Run the action chain until there are no more tasks. |
||
472 | while action_node: |
||
473 | error = None |
||
474 | liveaction = None |
||
475 | last_task = result['tasks'][-1] if len(result['tasks']) > 0 else None |
||
476 | created_at = date_utils.get_datetime_utc_now() |
||
477 | |||
478 | try: |
||
479 | # If last task was paused, then fetch the liveaction and resume it first. |
||
480 | if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED: |
||
481 | liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id']) |
||
482 | del result['tasks'][-1] |
||
483 | else: |
||
484 | liveaction = self._get_next_action( |
||
485 | action_node=action_node, parent_context=parent_context, |
||
486 | action_params=action_parameters, context_result=context_result) |
||
487 | except action_exc.InvalidActionReferencedException as e: |
||
488 | chain_status = action_constants.LIVEACTION_STATUS_FAILED |
||
489 | m = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' % |
||
490 | (action_node.name, action_node.ref)) |
||
491 | LOG.exception(m) |
||
492 | top_level_error = self._format_error(e, m) |
||
493 | break |
||
494 | except action_exc.ParameterRenderingFailedException as e: |
||
495 | # Rendering parameters failed before we even got to running this action, |
||
496 | # abort and fail the whole action chain |
||
497 | chain_status = action_constants.LIVEACTION_STATUS_FAILED |
||
498 | m = 'Failed to run task "%s". Parameter rendering failed.' % action_node.name |
||
499 | LOG.exception(m) |
||
500 | top_level_error = self._format_error(e, m) |
||
501 | break |
||
502 | except db_exc.StackStormDBObjectNotFoundError as e: |
||
503 | chain_status = action_constants.LIVEACTION_STATUS_FAILED |
||
504 | m = 'Failed to resume task "%s". Unable to find liveaction.' % action_node.name |
||
505 | LOG.exception(m) |
||
506 | top_level_error = self._format_error(e, m) |
||
507 | break |
||
508 | |||
509 | try: |
||
510 | # If last task was paused, then fetch the liveaction and resume it first. |
||
511 | if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED: |
||
512 | LOG.info('Resume task %s for chain %s.', action_node.name, self.liveaction.id) |
||
513 | liveaction = self._resume_action(liveaction) |
||
514 | else: |
||
515 | LOG.info('Run task %s for chain %s.', action_node.name, self.liveaction.id) |
||
516 | liveaction = self._run_action(liveaction) |
||
517 | except Exception as e: |
||
518 | # Save the traceback and error message |
||
519 | m = 'Failed running task "%s".' % action_node.name |
||
520 | LOG.exception(m) |
||
521 | error = self._format_error(e, m) |
||
522 | context_result[action_node.name] = error |
||
523 | else: |
||
524 | # Update context result |
||
525 | context_result[action_node.name] = liveaction.result |
||
526 | |||
527 | # Render and publish variables |
||
528 | rendered_publish_vars = ActionChainRunner._render_publish_vars( |
||
529 | action_node=action_node, action_parameters=action_parameters, |
||
530 | execution_result=liveaction.result, previous_execution_results=context_result, |
||
531 | chain_vars=self.chain_holder.vars) |
||
532 | |||
533 | if rendered_publish_vars: |
||
534 | self.chain_holder.vars.update(rendered_publish_vars) |
||
535 | if self._display_published: |
||
536 | result[PUBLISHED_VARS_KEY].update(rendered_publish_vars) |
||
537 | finally: |
||
538 | # Record result and resolve a next node based on the task success or failure |
||
539 | updated_at = date_utils.get_datetime_utc_now() |
||
540 | |||
541 | task_result = self._format_action_exec_result( |
||
542 | action_node, |
||
543 | liveaction, |
||
544 | created_at, |
||
545 | updated_at, |
||
546 | error=error |
||
547 | ) |
||
548 | |||
549 | result['tasks'].append(task_result) |
||
550 | |||
551 | try: |
||
552 | if not liveaction: |
||
553 | chain_status = action_constants.LIVEACTION_STATUS_FAILED |
||
554 | action_node = self.chain_holder.get_next_node( |
||
555 | action_node.name, condition='on-failure') |
||
556 | elif liveaction.status == action_constants.LIVEACTION_STATUS_TIMED_OUT: |
||
557 | chain_status = action_constants.LIVEACTION_STATUS_TIMED_OUT |
||
558 | action_node = self.chain_holder.get_next_node( |
||
559 | action_node.name, condition='on-failure') |
||
560 | elif liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED: |
||
561 | LOG.info('Chain execution (%s) canceled because task "%s" is canceled.', |
||
562 | self.liveaction_id, action_node.name) |
||
563 | chain_status = action_constants.LIVEACTION_STATUS_CANCELED |
||
564 | action_node = None |
||
565 | elif liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED: |
||
566 | LOG.info('Chain execution (%s) paused because task "%s" is paused.', |
||
567 | self.liveaction_id, action_node.name) |
||
568 | chain_status = action_constants.LIVEACTION_STATUS_PAUSED |
||
569 | self._save_vars() |
||
570 | action_node = None |
||
571 | elif liveaction.status == action_constants.LIVEACTION_STATUS_PENDING: |
||
572 | LOG.info('Chain execution (%s) paused because task "%s" is pending.', |
||
573 | self.liveaction_id, action_node.name) |
||
574 | chain_status = action_constants.LIVEACTION_STATUS_PAUSED |
||
575 | self._save_vars() |
||
576 | action_node = None |
||
577 | elif liveaction.status in action_constants.LIVEACTION_FAILED_STATES: |
||
578 | chain_status = action_constants.LIVEACTION_STATUS_FAILED |
||
579 | action_node = self.chain_holder.get_next_node( |
||
580 | action_node.name, condition='on-failure') |
||
581 | elif liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED: |
||
582 | chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED |
||
583 | action_node = self.chain_holder.get_next_node( |
||
584 | action_node.name, condition='on-success') |
||
585 | else: |
||
586 | action_node = None |
||
587 | except Exception as e: |
||
588 | chain_status = action_constants.LIVEACTION_STATUS_FAILED |
||
589 | m = 'Failed to get next node "%s".' % action_node.name |
||
590 | LOG.exception(m) |
||
591 | top_level_error = self._format_error(e, m) |
||
592 | action_node = None |
||
593 | break |
||
594 | |||
595 | if action_service.is_action_canceled_or_canceling(self.liveaction.id): |
||
596 | LOG.info('Chain execution (%s) canceled by user.', self.liveaction.id) |
||
597 | chain_status = action_constants.LIVEACTION_STATUS_CANCELED |
||
598 | return (chain_status, result, None) |
||
599 | |||
600 | if action_service.is_action_paused_or_pausing(self.liveaction.id): |
||
601 | LOG.info('Chain execution (%s) paused by user.', self.liveaction.id) |
||
602 | chain_status = action_constants.LIVEACTION_STATUS_PAUSED |
||
603 | self._save_vars() |
||
604 | return (chain_status, result, self.liveaction.context) |
||
605 | |||
606 | if top_level_error and isinstance(top_level_error, dict): |
||
607 | result.update(top_level_error) |
||
608 | |||
609 | return (chain_status, result, self.liveaction.context) |
||
610 | |||
611 | def _format_error(self, e, msg): |
||
612 | return { |
||
613 | 'error': '%s. %s' % (msg, str(e)), |
||
614 | 'traceback': traceback.format_exc(10) |
||
615 | } |
||
616 | |||
617 | def _save_vars(self): |
||
618 | # Save the context vars in the liveaction context. |
||
619 | self.liveaction.context['vars'] = self.chain_holder.vars |
||
620 | |||
621 | @staticmethod |
||
622 | def _render_publish_vars(action_node, action_parameters, execution_result, |
||
623 | previous_execution_results, chain_vars): |
||
624 | """ |
||
625 | If no output is specified on the action_node the output is the entire execution_result. |
||
626 | If any output is specified then only those variables are published as output of an |
||
627 | execution of this action_node. |
||
628 | The output variable can refer to a variable from the execution_result, |
||
629 | previous_execution_results or chain_vars. |
||
630 | """ |
||
631 | if not action_node.publish: |
||
632 | return {} |
||
633 | |||
634 | context = {} |
||
635 | context.update(action_parameters) |
||
636 | context.update({action_node.name: execution_result}) |
||
637 | context.update(previous_execution_results) |
||
638 | context.update(chain_vars) |
||
639 | context.update({RESULTS_KEY: previous_execution_results}) |
||
640 | |||
641 | context.update({ |
||
642 | kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
||
643 | scope=kv_constants.SYSTEM_SCOPE) |
||
644 | }) |
||
645 | |||
646 | context.update({ |
||
647 | kv_constants.DATASTORE_PARENT_SCOPE: { |
||
648 | kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
||
649 | scope=kv_constants.FULL_SYSTEM_SCOPE) |
||
650 | } |
||
651 | }) |
||
652 | |||
653 | try: |
||
654 | rendered_result = jinja_utils.render_values(mapping=action_node.publish, |
||
655 | context=context) |
||
656 | except Exception as e: |
||
657 | key = getattr(e, 'key', None) |
||
658 | value = getattr(e, 'value', None) |
||
659 | msg = ('Failed rendering value for publish parameter "%s" in task "%s" ' |
||
660 | '(template string=%s): %s' % (key, action_node.name, value, str(e))) |
||
661 | raise action_exc.ParameterRenderingFailedException(msg) |
||
662 | |||
663 | return rendered_result |
||
664 | |||
665 | @staticmethod |
||
666 | def _resolve_params(action_node, original_parameters, results, chain_vars, chain_context): |
||
667 | # setup context with original parameters and the intermediate results. |
||
668 | chain_parent = chain_context.get('parent', {}) |
||
669 | pack = chain_parent.get('pack') |
||
670 | user = chain_parent.get('user') |
||
671 | |||
672 | config = get_config(pack, user) |
||
673 | |||
674 | context = {} |
||
675 | context.update(original_parameters) |
||
676 | context.update(results) |
||
677 | context.update(chain_vars) |
||
678 | context.update({RESULTS_KEY: results}) |
||
679 | |||
680 | context.update({ |
||
681 | kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
||
682 | scope=kv_constants.SYSTEM_SCOPE) |
||
683 | }) |
||
684 | |||
685 | context.update({ |
||
686 | kv_constants.DATASTORE_PARENT_SCOPE: { |
||
687 | kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup( |
||
688 | scope=kv_constants.FULL_SYSTEM_SCOPE) |
||
689 | } |
||
690 | }) |
||
691 | context.update({action_constants.ACTION_CONTEXT_KV_PREFIX: chain_context}) |
||
692 | context.update({pack_constants.PACK_CONFIG_CONTEXT_KV_PREFIX: config}) |
||
693 | try: |
||
694 | rendered_params = jinja_utils.render_values(mapping=action_node.get_parameters(), |
||
695 | context=context) |
||
696 | except Exception as e: |
||
697 | LOG.exception('Jinja rendering for parameter "%s" failed.' % (e.key)) |
||
698 | |||
699 | key = getattr(e, 'key', None) |
||
700 | value = getattr(e, 'value', None) |
||
701 | msg = ('Failed rendering value for action parameter "%s" in task "%s" ' |
||
702 | '(template string=%s): %s') % (key, action_node.name, value, str(e)) |
||
703 | raise action_exc.ParameterRenderingFailedException(msg) |
||
704 | LOG.debug('Rendered params: %s: Type: %s', rendered_params, type(rendered_params)) |
||
705 | return rendered_params |
||
706 | |||
707 | def _get_next_action(self, action_node, parent_context, action_params, context_result): |
||
708 | # Verify that the referenced action exists |
||
709 | # TODO: We do another lookup in cast_param, refactor to reduce number of lookups |
||
710 | task_name = action_node.name |
||
711 | action_ref = action_node.ref |
||
712 | action_db = action_db_util.get_action_by_ref(ref=action_ref) |
||
713 | |||
714 | if not action_db: |
||
715 | error = 'Task :: %s - Action with ref %s not registered.' % (task_name, action_ref) |
||
716 | raise action_exc.InvalidActionReferencedException(error) |
||
717 | |||
718 | resolved_params = ActionChainRunner._resolve_params( |
||
719 | action_node=action_node, original_parameters=action_params, |
||
720 | results=context_result, chain_vars=self.chain_holder.vars, |
||
721 | chain_context={'parent': parent_context}) |
||
722 | |||
723 | liveaction = self._build_liveaction_object( |
||
724 | action_node=action_node, |
||
725 | resolved_params=resolved_params, |
||
726 | parent_context=parent_context) |
||
727 | |||
728 | return liveaction |
||
729 | |||
730 | def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0): |
||
731 | """ |
||
732 | :param sleep_delay: Number of seconds to wait during "is completed" polls. |
||
733 | :type sleep_delay: ``float`` |
||
734 | """ |
||
735 | try: |
||
736 | liveaction, _ = action_service.request(liveaction) |
||
737 | except Exception as e: |
||
738 | liveaction.status = action_constants.LIVEACTION_STATUS_FAILED |
||
739 | LOG.exception('Failed to schedule liveaction.') |
||
740 | raise e |
||
741 | |||
742 | while (wait_for_completion and liveaction.status not in ( |
||
743 | action_constants.LIVEACTION_COMPLETED_STATES + |
||
744 | [action_constants.LIVEACTION_STATUS_PAUSED, |
||
745 | action_constants.LIVEACTION_STATUS_PENDING])): |
||
746 | eventlet.sleep(sleep_delay) |
||
747 | liveaction = action_db_util.get_liveaction_by_id(liveaction.id) |
||
748 | |||
749 | return liveaction |
||
750 | |||
751 | def _resume_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0): |
||
752 | """ |
||
753 | :param sleep_delay: Number of seconds to wait during "is completed" polls. |
||
754 | :type sleep_delay: ``float`` |
||
755 | """ |
||
756 | try: |
||
757 | user = self.context.get('user', None) |
||
758 | liveaction, _ = action_service.request_resume(liveaction, user) |
||
759 | except Exception as e: |
||
760 | liveaction.status = action_constants.LIVEACTION_STATUS_FAILED |
||
761 | LOG.exception('Failed to schedule liveaction.') |
||
762 | raise e |
||
763 | |||
764 | while (wait_for_completion and liveaction.status not in ( |
||
765 | action_constants.LIVEACTION_COMPLETED_STATES + |
||
766 | [action_constants.LIVEACTION_STATUS_PAUSED])): |
||
767 | eventlet.sleep(sleep_delay) |
||
768 | liveaction = action_db_util.get_liveaction_by_id(liveaction.id) |
||
769 | |||
770 | return liveaction |
||
771 | |||
772 | def _build_liveaction_object(self, action_node, resolved_params, parent_context): |
||
773 | liveaction = LiveActionDB(action=action_node.ref) |
||
774 | |||
775 | # Setup notify for task in chain. |
||
776 | notify = self._get_notify(action_node) |
||
777 | if notify: |
||
778 | liveaction.notify = notify |
||
779 | LOG.debug('%s: Task notify set to: %s', action_node.name, liveaction.notify) |
||
780 | |||
781 | liveaction.context = { |
||
782 | 'parent': parent_context, |
||
783 | 'chain': vars(action_node) |
||
784 | } |
||
785 | liveaction.parameters = action_param_utils.cast_params(action_ref=action_node.ref, |
||
786 | params=resolved_params) |
||
787 | return liveaction |
||
788 | |||
789 | def _get_notify(self, action_node): |
||
790 | if action_node.name not in self._skip_notify_tasks: |
||
791 | if action_node.notify: |
||
792 | task_notify = NotificationsHelper.to_model(action_node.notify) |
||
793 | return task_notify |
||
794 | elif self._chain_notify: |
||
795 | return self._chain_notify |
||
796 | |||
797 | return None |
||
798 | |||
799 | def _get_updated_action_exec_result(self, action_node, liveaction, prev_task_result): |
||
800 | if liveaction.status in action_constants.LIVEACTION_COMPLETED_STATES: |
||
801 | created_at = isotime.parse(prev_task_result['created_at']) |
||
802 | updated_at = liveaction.end_timestamp |
||
803 | else: |
||
804 | created_at = isotime.parse(prev_task_result['created_at']) |
||
805 | updated_at = isotime.parse(prev_task_result['updated_at']) |
||
806 | |||
807 | return self._format_action_exec_result(action_node, liveaction, created_at, updated_at) |
||
808 | |||
809 | def _format_action_exec_result(self, action_node, liveaction_db, created_at, updated_at, |
||
810 | error=None): |
||
811 | """ |
||
812 | Format ActionExecution result so it can be used in the final action result output. |
||
813 | |||
814 | :rtype: ``dict`` |
||
815 | """ |
||
816 | assert isinstance(created_at, datetime.datetime) |
||
817 | assert isinstance(updated_at, datetime.datetime) |
||
818 | |||
819 | result = {} |
||
820 | |||
821 | execution_db = None |
||
822 | if liveaction_db: |
||
823 | execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id)) |
||
824 | |||
825 | result['id'] = action_node.name |
||
826 | result['name'] = action_node.name |
||
827 | result['execution_id'] = str(execution_db.id) if execution_db else None |
||
828 | result['liveaction_id'] = str(liveaction_db.id) if liveaction_db else None |
||
829 | result['workflow'] = None |
||
830 | |||
831 | result['created_at'] = isotime.format(dt=created_at) |
||
832 | result['updated_at'] = isotime.format(dt=updated_at) |
||
833 | |||
834 | if error or not liveaction_db: |
||
835 | result['state'] = action_constants.LIVEACTION_STATUS_FAILED |
||
836 | else: |
||
837 | result['state'] = liveaction_db.status |
||
838 | |||
839 | if error: |
||
840 | result['result'] = error |
||
841 | else: |
||
842 | result['result'] = liveaction_db.result |
||
843 | |||
844 | return result |
||
845 | |||
846 | |||
847 | def get_runner(): |
||
848 | return ActionChainRunner(str(uuid.uuid4())) |
||
849 | |||
850 | |||
851 | def get_metadata(): |
||
852 | return get_runner_metadata('action_chain_runner') |
||
853 |