| Total Complexity | 58 |
| Total Lines | 386 |
| Duplicated Lines | 0 % |
Complex classes like ActionChainRunner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
class ActionChainRunner(ActionRunner):
    """
    Runner which executes the tasks of an action chain one after another.

    The chain definition is loaded and validated in :meth:`pre_run`. :meth:`run` then walks the
    chain node by node, scheduling one liveaction per node and picking the next node based on
    the status of the finished liveaction (``on-success`` / ``on-failure``).
    """

    def __init__(self, runner_id):
        """
        :param runner_id: Runner id which is passed through to the parent ``ActionRunner``.
        """
        super(ActionChainRunner, self).__init__(runner_id=runner_id)
        # Populated in pre_run() once the chain definition has been loaded.
        self.chain_holder = None
        self._meta_loader = MetaLoader()
        # Set to True once a cancelation of the chain (or of one of its tasks) is detected.
        self._stopped = False
        # Names of the tasks for which no notify settings should be attached.
        self._skip_notify_tasks = []
        # When True, published variables are included in the final result.
        self._display_published = False
        # Chain level notify settings, used for tasks which don't define their own.
        self._chain_notify = None

    def pre_run(self):
        """
        Load, instantiate and validate the action chain definition.

        :raises: ``runnerexceptions.ActionRunnerPreRunError`` if the definition can't be loaded,
                 if the chain can't be instantiated or if the chain validation fails.
        """
        chainspec_file = self.entry_point
        LOG.debug('Reading action chain from %s for action %s.', chainspec_file,
                  self.action)

        try:
            chainspec = self._meta_loader.load(file_path=chainspec_file,
                                               expected_type=dict)
        except Exception as e:
            message = ('Failed to parse action chain definition from "%s": %s' %
                       (chainspec_file, str(e)))
            LOG.exception('Failed to load action chain definition.')
            raise runnerexceptions.ActionRunnerPreRunError(message)

        try:
            self.chain_holder = ChainHolder(chainspec, self.action_name)
        except json_schema_exceptions.ValidationError as e:
            # preserve the whole nasty jsonschema message as that is better to get to the
            # root cause
            message = str(e)
            LOG.exception('Failed to instantiate ActionChain.')
            raise runnerexceptions.ActionRunnerPreRunError(message)
        except Exception as e:
            # Not every exception has a "message" attribute (Python 3 exceptions don't), so
            # fall back to str(e) instead of failing inside the error handler.
            message = getattr(e, 'message', None) or str(e)
            LOG.exception('Failed to instantiate ActionChain.')
            raise runnerexceptions.ActionRunnerPreRunError(message)

        # Runner attributes are set lazily. So these steps
        # should happen outside the constructor.
        if getattr(self, 'liveaction', None):
            self._chain_notify = getattr(self.liveaction, 'notify', None)
        if self.runner_parameters:
            self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
            self._display_published = self.runner_parameters.get('display_published', False)

        # Perform some pre-run chain validation
        try:
            self.chain_holder.validate()
        except Exception as e:
            message = getattr(e, 'message', None) or str(e)
            raise runnerexceptions.ActionRunnerPreRunError(message)

    def run(self, action_parameters):
        """
        Execute the chain and collect the result of every task which has been run.

        :param action_parameters: Parameters the chain was invoked with. They are made available
                                  to chain vars, task parameters and published variables.

        :return: ``(status, result, None)`` tuple. ``result`` always contains a ``tasks`` list
                 and, when a chain level error occurred, ``error`` and ``traceback`` keys.
        :rtype: ``tuple``
        """
        # holds final result we store.
        result = {'tasks': []}
        # published variables are to be stored for display.
        if self._display_published:
            result[PUBLISHED_VARS_KEY] = {}
        context_result = {}  # holds result which is used for the template context purposes
        top_level_error = None  # stores a reference to a top level error
        # Stays True when no task is run at all, which results in a failed status.
        fail = True
        timeout = False
        action_node = None

        try:
            # initialize vars once we have the action_parameters. This allows
            # vars to refer to action_parameters.
            self.chain_holder.init_vars(action_parameters)
            action_node = self.chain_holder.get_next_node()
        except Exception as e:
            # action_node is still None when either of the calls above fails, so the name
            # needs to be looked up defensively or the handler itself would blow up and hide
            # the original error.
            node_name = getattr(action_node, 'name', None)
            LOG.exception('Failed to get starting node "%s".', node_name)

            error = ('Failed to get starting node "%s". Lookup failed: %s' %
                     (node_name, str(e)))
            top_level_error = {
                'error': error,
                'traceback': traceback.format_exc(10)
            }

        parent_context = {
            'execution_id': self.execution_id
        }
        if getattr(self.liveaction, 'context', None):
            parent_context.update(self.liveaction.context)

        while action_node:
            # Status flags are reset for every task, the final status reflects the last task.
            fail = False
            timeout = False
            error = None
            liveaction = None

            created_at = date_utils.get_datetime_utc_now()

            try:
                liveaction = self._get_next_action(
                    action_node=action_node, parent_context=parent_context,
                    action_params=action_parameters, context_result=context_result)
            except InvalidActionReferencedException:
                error = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' %
                         (action_node.name, action_node.ref))
                LOG.exception(error)

                fail = True
                top_level_error = {
                    'error': error,
                    'traceback': traceback.format_exc(10)
                }
                break
            except ParameterRenderingFailedException as e:
                # Rendering parameters failed before we even got to running this action, abort and
                # fail the whole action chain
                LOG.exception('Failed to run action "%s".', action_node.name)

                fail = True
                error = ('Failed to run task "%s". Parameter rendering failed: %s' %
                         (action_node.name, str(e)))
                top_level_error = {
                    'error': error,
                    'traceback': traceback.format_exc(10)
                }
                break

            try:
                liveaction = self._run_action(liveaction)
            except Exception as e:
                # Save the traceback and error message
                LOG.exception('Failure in running action "%s".', action_node.name)

                error = {
                    'error': 'Task "%s" failed: %s' % (action_node.name, str(e)),
                    'traceback': traceback.format_exc(10)
                }
                context_result[action_node.name] = error
            else:
                # Update context result
                context_result[action_node.name] = liveaction.result

                # Render and publish variables
                rendered_publish_vars = ActionChainRunner._render_publish_vars(
                    action_node=action_node, action_parameters=action_parameters,
                    execution_result=liveaction.result, previous_execution_results=context_result,
                    chain_vars=self.chain_holder.vars)

                if rendered_publish_vars:
                    self.chain_holder.vars.update(rendered_publish_vars)
                    if self._display_published:
                        result[PUBLISHED_VARS_KEY].update(rendered_publish_vars)
            finally:
                # Record result and resolve a next node based on the task success or failure
                updated_at = date_utils.get_datetime_utc_now()

                format_kwargs = {'action_node': action_node, 'liveaction_db': liveaction,
                                 'created_at': created_at, 'updated_at': updated_at}

                if error:
                    format_kwargs['error'] = error

                task_result = self._format_action_exec_result(**format_kwargs)
                result['tasks'].append(task_result)

                if self.liveaction_id:
                    self._stopped = action_service.is_action_canceled_or_canceling(
                        self.liveaction_id)

                if self._stopped:
                    LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id)
                    status = LIVEACTION_STATUS_CANCELED
                    # NOTE: Returning from "finally" discards any exception which is
                    # propagating out of the "else" clause above.
                    return (status, result, None)

            try:
                if not liveaction:
                    fail = True
                    action_node = self.chain_holder.get_next_node(action_node.name,
                                                                  condition='on-failure')
                elif liveaction.status in LIVEACTION_FAILED_STATES:
                    if liveaction.status == LIVEACTION_STATUS_TIMED_OUT:
                        timeout = True
                    else:
                        fail = True
                    action_node = self.chain_holder.get_next_node(action_node.name,
                                                                  condition='on-failure')
                elif liveaction.status == LIVEACTION_STATUS_CANCELED:
                    # User canceled an action (task) in the workflow - cancel the execution of
                    # rest of the workflow
                    self._stopped = True
                    LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id)
                elif liveaction.status == LIVEACTION_STATUS_SUCCEEDED:
                    action_node = self.chain_holder.get_next_node(action_node.name,
                                                                  condition='on-success')
            except Exception as e:
                # The assignment above didn't happen so action_node still refers to the task
                # which has just finished.
                LOG.exception('Failed to get next node "%s".', action_node.name)

                fail = True
                error = ('Failed to get next node "%s". Lookup failed: %s' %
                         (action_node.name, str(e)))
                top_level_error = {
                    'error': error,
                    'traceback': traceback.format_exc(10)
                }
                # reset action_node here so that chain breaks on failure.
                action_node = None
                break

            if self._stopped:
                LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id)
                status = LIVEACTION_STATUS_CANCELED
                return (status, result, None)

        if fail:
            status = LIVEACTION_STATUS_FAILED
        elif timeout:
            status = LIVEACTION_STATUS_TIMED_OUT
        else:
            status = LIVEACTION_STATUS_SUCCEEDED

        if top_level_error:
            # Include top level error information
            result['error'] = top_level_error['error']
            result['traceback'] = top_level_error['traceback']

        return (status, result, None)

    @staticmethod
    def _render_publish_vars(action_node, action_parameters, execution_result,
                             previous_execution_results, chain_vars):
        """
        If no output is specified on the action_node the output is the entire execution_result.
        If any output is specified then only those variables are published as output of an
        execution of this action_node.
        The output variable can refer to a variable from the execution_result,
        previous_execution_results or chain_vars.

        :return: Rendered publish variables, empty dict if the node doesn't publish anything.
        :rtype: ``dict``

        :raises: ``ParameterRenderingFailedException`` if rendering of a value fails.
        """
        if not action_node.publish:
            return {}

        # Later updates win on a key clash, so the order of the updates matters.
        context = {}
        context.update(action_parameters)
        context.update({action_node.name: execution_result})
        context.update(previous_execution_results)
        context.update(chain_vars)
        context.update({RESULTS_KEY: previous_execution_results})
        context.update({SYSTEM_KV_PREFIX: KeyValueLookup()})

        try:
            rendered_result = jinja_utils.render_values(mapping=action_node.publish,
                                                        context=context)
        except Exception as e:
            key = getattr(e, 'key', None)
            value = getattr(e, 'value', None)
            msg = ('Failed rendering value for publish parameter "%s" in task "%s" '
                   '(template string=%s): %s' % (key, action_node.name, value, str(e)))
            raise ParameterRenderingFailedException(msg)

        return rendered_result

    @staticmethod
    def _resolve_params(action_node, original_parameters, results, chain_vars, chain_context):
        """
        Render parameters of the provided node using the chain context.

        :return: Rendered parameters as returned by ``jinja_utils.render_values``.

        :raises: ``ParameterRenderingFailedException`` if rendering of a value fails.
        """
        # setup context with original parameters and the intermediate results.
        # Later updates win on a key clash, so the order of the updates matters.
        context = {}
        context.update(original_parameters)
        context.update(results)
        context.update(chain_vars)
        context.update({RESULTS_KEY: results})
        context.update({SYSTEM_KV_PREFIX: KeyValueLookup()})
        context.update({ACTION_CONTEXT_KV_PREFIX: chain_context})
        try:
            rendered_params = jinja_utils.render_values(mapping=action_node.get_parameters(),
                                                        context=context)
        except Exception as e:
            # "key" and "value" are only present on some exceptions, so access them defensively
            # before logging - otherwise an AttributeError would replace the original error.
            key = getattr(e, 'key', None)
            value = getattr(e, 'value', None)
            LOG.exception('Jinja rendering for parameter "%s" failed.', key)

            msg = ('Failed rendering value for action parameter "%s" in task "%s" '
                   '(template string=%s): %s') % (key, action_node.name, value, str(e))
            raise ParameterRenderingFailedException(msg)
        LOG.debug('Rendered params: %s: Type: %s', rendered_params, type(rendered_params))
        return rendered_params

    def _get_next_action(self, action_node, parent_context, action_params, context_result):
        """
        Build a liveaction object for the provided chain node.

        :raises: ``InvalidActionReferencedException`` if the action referenced by the node
                 can't be found.
        :raises: ``ParameterRenderingFailedException`` if the node parameters can't be rendered.
        """
        # Verify that the referenced action exists
        # TODO: We do another lookup in cast_param, refactor to reduce number of lookups
        task_name = action_node.name
        action_ref = action_node.ref
        action_db = action_db_util.get_action_by_ref(ref=action_ref)

        if not action_db:
            error = 'Task :: %s - Action with ref %s not registered.' % (task_name, action_ref)
            raise InvalidActionReferencedException(error)

        resolved_params = ActionChainRunner._resolve_params(
            action_node=action_node, original_parameters=action_params,
            results=context_result, chain_vars=self.chain_holder.vars,
            chain_context={'parent': parent_context})

        liveaction = self._build_liveaction_object(
            action_node=action_node,
            resolved_params=resolved_params,
            parent_context=parent_context)

        return liveaction

    def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
        """
        Request execution of the provided liveaction and optionally wait for it to complete.

        :param wait_for_completion: True to poll until the liveaction reaches one of the
                                    completed states.
        :type wait_for_completion: ``bool``

        :param sleep_delay: Number of seconds to wait during "is completed" polls.
        :type sleep_delay: ``float``

        :return: Liveaction object, re-read from the database if polling took place.
        """
        try:
            # request() returns a tuple, only the first item (liveaction) is used here
            liveaction, _ = action_service.request(liveaction)
        except Exception:
            liveaction.status = LIVEACTION_STATUS_FAILED
            LOG.exception('Failed to schedule liveaction.')
            # Bare raise preserves the original traceback.
            raise

        while (wait_for_completion and liveaction.status not in LIVEACTION_COMPLETED_STATES):
            eventlet.sleep(sleep_delay)
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)

        return liveaction

    def _build_liveaction_object(self, action_node, resolved_params, parent_context):
        """
        Create a ``LiveActionDB`` object for the provided node with notify settings, context
        and cast parameters populated.
        """
        liveaction = LiveActionDB(action=action_node.ref)

        # Setup notify for task in chain.
        notify = self._get_notify(action_node)
        if notify:
            liveaction.notify = notify
            LOG.debug('%s: Task notify set to: %s', action_node.name, liveaction.notify)

        liveaction.context = {
            'parent': parent_context,
            'chain': vars(action_node)
        }

        liveaction.parameters = action_param_utils.cast_params(action_ref=action_node.ref,
                                                               params=resolved_params)
        return liveaction

    def _get_notify(self, action_node):
        """
        Return notify settings for the provided node.

        Task level settings take precedence over the chain level ones. ``None`` is returned for
        tasks listed in ``skip_notify`` and when no settings are available.
        """
        if action_node.name not in self._skip_notify_tasks:
            if action_node.notify:
                task_notify = NotificationsHelper.to_model(action_node.notify)
                return task_notify
            elif self._chain_notify:
                return self._chain_notify

        return None

    def _format_action_exec_result(self, action_node, liveaction_db, created_at, updated_at,
                                   error=None):
        """
        Format ActionExecution result so it can be used in the final action result output.

        :param liveaction_db: Liveaction of the task, ``None`` if the task has never been
                              scheduled.

        :param error: Optional error information which is used as the task result instead of
                      the liveaction result.

        :rtype: ``dict``
        """
        assert isinstance(created_at, datetime.datetime)
        assert isinstance(updated_at, datetime.datetime)

        result = {}

        execution_db = None
        if liveaction_db:
            execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))

        result['id'] = action_node.name
        result['name'] = action_node.name
        result['execution_id'] = str(execution_db.id) if execution_db else None
        result['workflow'] = None

        result['created_at'] = isotime.format(dt=created_at)
        result['updated_at'] = isotime.format(dt=updated_at)

        if error or not liveaction_db:
            result['state'] = LIVEACTION_STATUS_FAILED
        else:
            result['state'] = liveaction_db.status

        if error:
            result['result'] = error
        else:
            # liveaction_db can be None even when no error is provided, don't dereference it
            # blindly.
            result['result'] = liveaction_db.result if liveaction_db else None

        return result
| 607 | |||
| 611 |
It is generally discouraged to redefine built-ins as this makes code very hard to read.