Total Complexity | 60 |
Total Lines | 386 |
Duplicated Lines | 0 % |
Complex classes like st2actions.runners.ActionChainRunner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
221 | class ActionChainRunner(ActionRunner): |
||
222 | |||
223 | def __init__(self, runner_id): |
||
224 | super(ActionChainRunner, self).__init__(runner_id=runner_id) |
||
225 | self.chain_holder = None |
||
226 | self._meta_loader = MetaLoader() |
||
227 | self._stopped = False |
||
228 | self._skip_notify_tasks = [] |
||
229 | self._display_published = False |
||
230 | self._chain_notify = None |
||
231 | |||
232 | def pre_run(self): |
||
233 | chainspec_file = self.entry_point |
||
234 | LOG.debug('Reading action chain from %s for action %s.', chainspec_file, |
||
235 | self.action) |
||
236 | |||
237 | try: |
||
238 | chainspec = self._meta_loader.load(file_path=chainspec_file, |
||
239 | expected_type=dict) |
||
240 | except Exception as e: |
||
241 | message = ('Failed to parse action chain definition from "%s": %s' % |
||
242 | (chainspec_file, str(e))) |
||
243 | LOG.exception('Failed to load action chain definition.') |
||
244 | raise runnerexceptions.ActionRunnerPreRunError(message) |
||
245 | |||
246 | try: |
||
247 | self.chain_holder = ChainHolder(chainspec, self.action_name) |
||
248 | except json_schema_exceptions.ValidationError as e: |
||
249 | # preserve the whole nasty jsonschema message as that is better to get to the |
||
250 | # root cause |
||
251 | message = str(e) |
||
252 | LOG.exception('Failed to instantiate ActionChain.') |
||
253 | raise runnerexceptions.ActionRunnerPreRunError(message) |
||
254 | except Exception as e: |
||
255 | message = e.message or str(e) |
||
256 | LOG.exception('Failed to instantiate ActionChain.') |
||
257 | raise runnerexceptions.ActionRunnerPreRunError(message) |
||
258 | |||
259 | # Runner attributes are set lazily. So these steps |
||
260 | # should happen outside the constructor. |
||
261 | if getattr(self, 'liveaction', None): |
||
262 | self._chain_notify = getattr(self.liveaction, 'notify', None) |
||
263 | if self.runner_parameters: |
||
264 | self._skip_notify_tasks = self.runner_parameters.get('skip_notify', []) |
||
265 | self._display_published = self.runner_parameters.get('display_published', False) |
||
266 | |||
267 | # Perform some pre-run chain validation |
||
268 | try: |
||
269 | self.chain_holder.validate() |
||
270 | except Exception as e: |
||
271 | raise runnerexceptions.ActionRunnerPreRunError(e.message) |
||
272 | |||
273 | def run(self, action_parameters): |
||
274 | # holds final result we store. |
||
275 | result = {'tasks': []} |
||
276 | # published variables are to be stored for display. |
||
277 | if self._display_published: |
||
278 | result[PUBLISHED_VARS_KEY] = {} |
||
279 | context_result = {} # holds result which is used for the template context purposes |
||
280 | top_level_error = None # stores a reference to a top level error |
||
281 | fail = True |
||
282 | action_node = None |
||
283 | |||
284 | try: |
||
285 | # initialize vars once we have the action_parameters. This allows |
||
286 | # vars to refer to action_parameters. |
||
287 | self.chain_holder.init_vars(action_parameters) |
||
288 | action_node = self.chain_holder.get_next_node() |
||
289 | except Exception as e: |
||
290 | LOG.exception('Failed to get starting node "%s".', action_node.name) |
||
291 | |||
292 | error = ('Failed to get starting node "%s". Lookup failed: %s' % |
||
293 | (action_node.name, str(e))) |
||
294 | trace = traceback.format_exc(10) |
||
295 | top_level_error = { |
||
296 | 'error': error, |
||
297 | 'traceback': trace |
||
298 | } |
||
299 | |||
300 | parent_context = { |
||
301 | 'execution_id': self.execution_id |
||
302 | } |
||
303 | if getattr(self.liveaction, 'context', None): |
||
304 | parent_context.update(self.liveaction.context) |
||
305 | |||
306 | while action_node: |
||
307 | fail = False |
||
308 | timeout = False |
||
309 | error = None |
||
310 | liveaction = None |
||
311 | |||
312 | created_at = date_utils.get_datetime_utc_now() |
||
313 | |||
314 | try: |
||
315 | liveaction = self._get_next_action( |
||
316 | action_node=action_node, parent_context=parent_context, |
||
317 | action_params=action_parameters, context_result=context_result) |
||
318 | except InvalidActionReferencedException as e: |
||
319 | error = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' % |
||
320 | (action_node.name, action_node.ref)) |
||
321 | LOG.exception(error) |
||
322 | |||
323 | fail = True |
||
324 | top_level_error = { |
||
325 | 'error': error, |
||
326 | 'traceback': traceback.format_exc(10) |
||
327 | } |
||
328 | break |
||
329 | except ParameterRenderingFailedException as e: |
||
330 | # Rendering parameters failed before we even got to running this action, abort and |
||
331 | # fail the whole action chain |
||
332 | LOG.exception('Failed to run action "%s".', action_node.name) |
||
333 | |||
334 | fail = True |
||
335 | error = ('Failed to run task "%s". Parameter rendering failed: %s' % |
||
336 | (action_node.name, str(e))) |
||
337 | trace = traceback.format_exc(10) |
||
338 | top_level_error = { |
||
339 | 'error': error, |
||
340 | 'traceback': trace |
||
341 | } |
||
342 | break |
||
343 | |||
344 | try: |
||
345 | liveaction = self._run_action(liveaction) |
||
346 | except Exception as e: |
||
347 | # Save the traceback and error message |
||
348 | LOG.exception('Failure in running action "%s".', action_node.name) |
||
349 | |||
350 | error = { |
||
351 | 'error': 'Task "%s" failed: %s' % (action_node.name, str(e)), |
||
352 | 'traceback': traceback.format_exc(10) |
||
353 | } |
||
354 | context_result[action_node.name] = error |
||
355 | else: |
||
356 | # Update context result |
||
357 | context_result[action_node.name] = liveaction.result |
||
358 | |||
359 | # Render and publish variables |
||
360 | rendered_publish_vars = ActionChainRunner._render_publish_vars( |
||
361 | action_node=action_node, action_parameters=action_parameters, |
||
362 | execution_result=liveaction.result, previous_execution_results=context_result, |
||
363 | chain_vars=self.chain_holder.vars) |
||
364 | |||
365 | if rendered_publish_vars: |
||
366 | self.chain_holder.vars.update(rendered_publish_vars) |
||
367 | if self._display_published: |
||
368 | result[PUBLISHED_VARS_KEY].update(rendered_publish_vars) |
||
369 | finally: |
||
370 | # Record result and resolve a next node based on the task success or failure |
||
371 | updated_at = date_utils.get_datetime_utc_now() |
||
372 | |||
373 | format_kwargs = {'action_node': action_node, 'liveaction_db': liveaction, |
||
374 | 'created_at': created_at, 'updated_at': updated_at} |
||
375 | |||
376 | if error: |
||
377 | format_kwargs['error'] = error |
||
378 | |||
379 | task_result = self._format_action_exec_result(**format_kwargs) |
||
380 | result['tasks'].append(task_result) |
||
381 | |||
382 | if self.liveaction_id: |
||
383 | self._stopped = action_service.is_action_canceled_or_canceling( |
||
384 | self.liveaction_id) |
||
385 | |||
386 | if self._stopped: |
||
387 | LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id) |
||
388 | status = LIVEACTION_STATUS_CANCELED |
||
389 | return (status, result, None) |
||
390 | |||
391 | try: |
||
392 | if not liveaction: |
||
393 | fail = True |
||
394 | action_node = self.chain_holder.get_next_node(action_node.name, |
||
395 | condition='on-failure') |
||
396 | elif liveaction.status in LIVEACTION_FAILED_STATES: |
||
397 | if liveaction and liveaction.status == LIVEACTION_STATUS_TIMED_OUT: |
||
398 | timeout = True |
||
399 | else: |
||
400 | fail = True |
||
401 | action_node = self.chain_holder.get_next_node(action_node.name, |
||
402 | condition='on-failure') |
||
403 | elif liveaction.status == LIVEACTION_STATUS_CANCELED: |
||
404 | # User canceled an action (task) in the workflow - cancel the execution of |
||
405 | # rest of the workflow |
||
406 | self._stopped = True |
||
407 | LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id) |
||
408 | elif liveaction.status == LIVEACTION_STATUS_SUCCEEDED: |
||
409 | action_node = self.chain_holder.get_next_node(action_node.name, |
||
410 | condition='on-success') |
||
411 | except Exception as e: |
||
412 | LOG.exception('Failed to get next node "%s".', action_node.name) |
||
413 | |||
414 | fail = True |
||
415 | error = ('Failed to get next node "%s". Lookup failed: %s' % |
||
416 | (action_node.name, str(e))) |
||
417 | trace = traceback.format_exc(10) |
||
418 | top_level_error = { |
||
419 | 'error': error, |
||
420 | 'traceback': trace |
||
421 | } |
||
422 | # reset action_node here so that chain breaks on failure. |
||
423 | action_node = None |
||
424 | break |
||
425 | |||
426 | if self._stopped: |
||
427 | LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id) |
||
428 | status = LIVEACTION_STATUS_CANCELED |
||
429 | return (status, result, None) |
||
430 | |||
431 | if fail: |
||
432 | status = LIVEACTION_STATUS_FAILED |
||
433 | elif timeout: |
||
434 | status = LIVEACTION_STATUS_TIMED_OUT |
||
435 | else: |
||
436 | status = LIVEACTION_STATUS_SUCCEEDED |
||
437 | |||
438 | if top_level_error: |
||
439 | # Include top level error information |
||
440 | result['error'] = top_level_error['error'] |
||
441 | result['traceback'] = top_level_error['traceback'] |
||
442 | |||
443 | return (status, result, None) |
||
444 | |||
445 | @staticmethod |
||
446 | def _render_publish_vars(action_node, action_parameters, execution_result, |
||
447 | previous_execution_results, chain_vars): |
||
448 | """ |
||
449 | If no output is specified on the action_node the output is the entire execution_result. |
||
450 | If any output is specified then only those variables are published as output of an |
||
451 | execution of this action_node. |
||
452 | The output variable can refer to a variable from the execution_result, |
||
453 | previous_execution_results or chain_vars. |
||
454 | """ |
||
455 | if not action_node.publish: |
||
456 | return {} |
||
457 | |||
458 | context = {} |
||
459 | context.update(action_parameters) |
||
460 | context.update({action_node.name: execution_result}) |
||
461 | context.update(previous_execution_results) |
||
462 | context.update(chain_vars) |
||
463 | context.update({RESULTS_KEY: previous_execution_results}) |
||
464 | context.update({SYSTEM_KV_PREFIX: KeyValueLookup()}) |
||
465 | |||
466 | try: |
||
467 | rendered_result = jinja_utils.render_values(mapping=action_node.publish, |
||
468 | context=context) |
||
469 | except Exception as e: |
||
470 | key = getattr(e, 'key', None) |
||
471 | value = getattr(e, 'value', None) |
||
472 | msg = ('Failed rendering value for publish parameter "%s" in task "%s" ' |
||
473 | '(template string=%s): %s' % (key, action_node.name, value, str(e))) |
||
474 | raise ParameterRenderingFailedException(msg) |
||
475 | |||
476 | return rendered_result |
||
477 | |||
478 | @staticmethod |
||
479 | def _resolve_params(action_node, original_parameters, results, chain_vars, chain_context): |
||
480 | # setup context with original parameters and the intermediate results. |
||
481 | context = {} |
||
482 | context.update(original_parameters) |
||
483 | context.update(results) |
||
484 | context.update(chain_vars) |
||
485 | context.update({RESULTS_KEY: results}) |
||
486 | context.update({SYSTEM_KV_PREFIX: KeyValueLookup()}) |
||
487 | context.update({ACTION_CONTEXT_KV_PREFIX: chain_context}) |
||
488 | try: |
||
489 | rendered_params = jinja_utils.render_values(mapping=action_node.get_parameters(), |
||
490 | context=context) |
||
491 | except Exception as e: |
||
492 | LOG.exception('Jinja rendering for parameter "%s" failed.' % (e.key)) |
||
493 | |||
494 | key = getattr(e, 'key', None) |
||
495 | value = getattr(e, 'value', None) |
||
496 | msg = ('Failed rendering value for action parameter "%s" in task "%s" ' |
||
497 | '(template string=%s): %s') % (key, action_node.name, value, str(e)) |
||
498 | raise ParameterRenderingFailedException(msg) |
||
499 | LOG.debug('Rendered params: %s: Type: %s', rendered_params, type(rendered_params)) |
||
500 | return rendered_params |
||
501 | |||
502 | def _get_next_action(self, action_node, parent_context, action_params, context_result): |
||
503 | # Verify that the referenced action exists |
||
504 | # TODO: We do another lookup in cast_param, refactor to reduce number of lookups |
||
505 | task_name = action_node.name |
||
506 | action_ref = action_node.ref |
||
507 | action_db = action_db_util.get_action_by_ref(ref=action_ref) |
||
508 | |||
509 | if not action_db: |
||
510 | error = 'Task :: %s - Action with ref %s not registered.' % (task_name, action_ref) |
||
511 | raise InvalidActionReferencedException(error) |
||
512 | |||
513 | resolved_params = ActionChainRunner._resolve_params( |
||
514 | action_node=action_node, original_parameters=action_params, |
||
515 | results=context_result, chain_vars=self.chain_holder.vars, |
||
516 | chain_context={'parent': parent_context}) |
||
517 | |||
518 | liveaction = self._build_liveaction_object( |
||
519 | action_node=action_node, |
||
520 | resolved_params=resolved_params, |
||
521 | parent_context=parent_context) |
||
522 | |||
523 | return liveaction |
||
524 | |||
525 | def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0): |
||
526 | """ |
||
527 | :param sleep_delay: Number of seconds to wait during "is completed" polls. |
||
528 | :type sleep_delay: ``float`` |
||
529 | """ |
||
530 | try: |
||
531 | # request return canceled |
||
532 | liveaction, _ = action_service.request(liveaction) |
||
533 | except Exception as e: |
||
534 | liveaction.status = LIVEACTION_STATUS_FAILED |
||
535 | LOG.exception('Failed to schedule liveaction.') |
||
536 | raise e |
||
537 | |||
538 | while (wait_for_completion and liveaction.status not in LIVEACTION_COMPLETED_STATES): |
||
539 | eventlet.sleep(sleep_delay) |
||
540 | liveaction = action_db_util.get_liveaction_by_id(liveaction.id) |
||
541 | |||
542 | return liveaction |
||
543 | |||
544 | def _build_liveaction_object(self, action_node, resolved_params, parent_context): |
||
545 | liveaction = LiveActionDB(action=action_node.ref) |
||
546 | |||
547 | # Setup notify for task in chain. |
||
548 | notify = self._get_notify(action_node) |
||
549 | if notify: |
||
550 | liveaction.notify = notify |
||
551 | LOG.debug('%s: Task notify set to: %s', action_node.name, liveaction.notify) |
||
552 | |||
553 | liveaction.context = { |
||
554 | 'parent': parent_context, |
||
555 | 'chain': vars(action_node) |
||
556 | } |
||
557 | |||
558 | liveaction.parameters = action_param_utils.cast_params(action_ref=action_node.ref, |
||
559 | params=resolved_params) |
||
560 | return liveaction |
||
561 | |||
562 | def _get_notify(self, action_node): |
||
563 | if action_node.name not in self._skip_notify_tasks: |
||
564 | if action_node.notify: |
||
565 | task_notify = NotificationsHelper.to_model(action_node.notify) |
||
566 | return task_notify |
||
567 | elif self._chain_notify: |
||
568 | return self._chain_notify |
||
569 | |||
570 | return None |
||
571 | |||
572 | def _format_action_exec_result(self, action_node, liveaction_db, created_at, updated_at, |
||
573 | error=None): |
||
574 | """ |
||
575 | Format ActionExecution result so it can be used in the final action result output. |
||
576 | |||
577 | :rtype: ``dict`` |
||
578 | """ |
||
579 | assert isinstance(created_at, datetime.datetime) |
||
580 | assert isinstance(updated_at, datetime.datetime) |
||
581 | |||
582 | result = {} |
||
583 | |||
584 | execution_db = None |
||
585 | if liveaction_db: |
||
586 | execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id)) |
||
587 | |||
588 | result['id'] = action_node.name |
||
589 | result['name'] = action_node.name |
||
590 | result['execution_id'] = str(execution_db.id) if execution_db else None |
||
591 | result['workflow'] = None |
||
592 | |||
593 | result['created_at'] = isotime.format(dt=created_at) |
||
594 | result['updated_at'] = isotime.format(dt=updated_at) |
||
595 | |||
596 | if error or not liveaction_db: |
||
597 | result['state'] = LIVEACTION_STATUS_FAILED |
||
598 | else: |
||
599 | result['state'] = liveaction_db.status |
||
600 | |||
601 | if error: |
||
602 | result['result'] = error |
||
603 | else: |
||
604 | result['result'] = liveaction_db.result |
||
605 | |||
606 | return result |
||
607 | |||
611 |
It is generally discouraged to redefine built-ins as this makes code very hard to read.