Test Failed
Pull Request — master (#4197)
by W
03:53
created

ActionChainRunner   F

Complexity

Total Complexity 97

Size/Duplication

Total Lines 598
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 598
rs 2
c 0
b 0
f 0
wmc 97

18 Methods

Rating   Name   Duplication   Size   Complexity  
A _format_error() 0 4 1
A _save_vars() 0 3 1
A run() 0 3 1
B _render_publish_vars() 0 43 3
A _get_notify() 0 9 4
C _format_action_exec_result() 0 36 9
B _resolve_params() 0 41 2
A cancel() 0 15 4
A _get_updated_action_exec_result() 0 9 2
A _resume_action() 0 20 4
A resume() 0 26 3
F _run_chain() 0 246 43
A _build_liveaction_object() 0 16 2
A _get_next_action() 0 22 2
A _run_action() 0 19 4
B pre_run() 0 42 7
A pause() 0 15 4
A __init__() 0 7 1

How to fix   Complexity   

Complex Class

Complex classes like ActionChainRunner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import copy
18
import eventlet
19
import traceback
20
import uuid
21
import datetime
22
23
from jsonschema import exceptions as json_schema_exc
24
25
from st2common.runners.base import ActionRunner
26
from st2common.runners.base import get_metadata as get_runner_metadata
27
from st2common import log as logging
28
from st2common.constants import action as action_constants
29
from st2common.constants import pack as pack_constants
30
from st2common.constants import keyvalue as kv_constants
31
from st2common.content.loader import MetaLoader
32
from st2common.exceptions import action as action_exc
33
from st2common.exceptions import actionrunner as runner_exc
34
from st2common.exceptions import db as db_exc
35
from st2common.models.api.notification import NotificationsHelper
36
from st2common.models.db.liveaction import LiveActionDB
37
from st2common.models.system import actionchain
38
from st2common.models.utils import action_param_utils
39
from st2common.persistence.execution import ActionExecution
40
from st2common.persistence.liveaction import LiveAction
41
from st2common.services import action as action_service
42
from st2common.services import keyvalues as kv_service
43
from st2common.util import action_db as action_db_util
44
from st2common.util import isotime
45
from st2common.util import date as date_utils
46
from st2common.util import jinja as jinja_utils
47
from st2common.util import param as param_utils
48
from st2common.util.config_loader import get_config
49
50
__all__ = [
51
    'ActionChainRunner',
52
    'ChainHolder',
53
54
    'get_runner',
55
    'get_metadata'
56
]
57
58
LOG = logging.getLogger(__name__)
59
60
RESULTS_KEY = '__results'
61
JINJA_START_MARKERS = [
62
    '{{',
63
    '{%'
64
]
65
PUBLISHED_VARS_KEY = 'published'
66
67
68
class ChainHolder(object):
69
70
    def __init__(self, chainspec, chainname):
71
        self.actionchain = actionchain.ActionChain(**chainspec)
72
        self.chainname = chainname
73
74
        if not self.actionchain.default:
75
            default = self._get_default(self.actionchain)
76
            self.actionchain.default = default
77
78
        LOG.debug('Using %s as default for %s.', self.actionchain.default, self.chainname)
79
        if not self.actionchain.default:
80
            raise Exception('Failed to find default node in %s.' % (self.chainname))
81
82
        self.vars = {}
83
84
    def init_vars(self, action_parameters):
85
        if self.actionchain.vars:
86
            self.vars = self._get_rendered_vars(self.actionchain.vars,
87
                                                action_parameters=action_parameters)
88
89
    def restore_vars(self, ctx_vars):
90
        self.vars.update(copy.deepcopy(ctx_vars))
91
92
    def validate(self):
93
        """
94
        Function which performs a simple compile time validation.
95
96
        Keep in mind that some variables are only resolved during run time which means we can
97
        perform only simple validation during compile / create time.
98
        """
99
        all_nodes = self._get_all_nodes(action_chain=self.actionchain)
100
101
        for node in self.actionchain.chain:
102
            on_success_node_name = node.on_success
103
            on_failure_node_name = node.on_failure
104
105
            # Check "on-success" path
106
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
107
                                                  node_name=on_success_node_name)
108
            if not valid_name:
109
                msg = ('Unable to find node with name "%s" referenced in "on-success" in '
110
                       'task "%s".' % (on_success_node_name, node.name))
111
                raise ValueError(msg)
112
113
            # Check "on-failure" path
114
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
115
                                                  node_name=on_failure_node_name)
116
            if not valid_name:
117
                msg = ('Unable to find node with name "%s" referenced in "on-failure" in '
118
                       'task "%s".' % (on_failure_node_name, node.name))
119
                raise ValueError(msg)
120
121
        # check if node specified in default is valid.
122
        if self.actionchain.default:
123
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
124
                                                  node_name=self.actionchain.default)
125
            if not valid_name:
126
                msg = ('Unable to find node with name "%s" referenced in "default".' %
127
                       self.actionchain.default)
128
                raise ValueError(msg)
129
        return True
130
131
    @staticmethod
132
    def _get_default(action_chain):
133
        # default is defined
134
        if action_chain.default:
135
            return action_chain.default
136
        # no nodes in chain
137
        if not action_chain.chain:
138
            return None
139
        # The first node with no references is the default node. Assumptions
140
        # that support this are :
141
        # 1. There are no loops in the chain. Even if there are loops there is
142
        #    at least 1 node which does not end up in this loop.
143
        # 2. There are no fragments in the chain.
144
        all_nodes = ChainHolder._get_all_nodes(action_chain=action_chain)
145
        node_names = set(all_nodes)
146
        on_success_nodes = ChainHolder._get_all_on_success_nodes(action_chain=action_chain)
147
        on_failure_nodes = ChainHolder._get_all_on_failure_nodes(action_chain=action_chain)
148
        referenced_nodes = on_success_nodes | on_failure_nodes
149
        possible_default_nodes = node_names - referenced_nodes
150
        if possible_default_nodes:
151
            # This is to preserve order. set([..]) does not preserve the order so iterate
152
            # over original array.
153
            for node in all_nodes:
154
                if node in possible_default_nodes:
155
                    return node
156
        # If no node is found assume the first node in the chain list to be default.
157
        return action_chain.chain[0].name
158
159
    @staticmethod
160
    def _get_all_nodes(action_chain):
161
        """
162
        Return names for all the nodes in the chain.
163
        """
164
        all_nodes = [node.name for node in action_chain.chain]
165
        return all_nodes
166
167
    @staticmethod
168
    def _get_all_on_success_nodes(action_chain):
169
        """
170
        Return names for all the tasks referenced in "on-success".
171
        """
172
        on_success_nodes = set([node.on_success for node in action_chain.chain])
173
        return on_success_nodes
174
175
    @staticmethod
176
    def _get_all_on_failure_nodes(action_chain):
177
        """
178
        Return names for all the tasks referenced in "on-failure".
179
        """
180
        on_failure_nodes = set([node.on_failure for node in action_chain.chain])
181
        return on_failure_nodes
182
183
    def _is_valid_node_name(self, all_node_names, node_name):
184
        """
185
        Function which validates that the provided node name is defined in the workflow definition
186
        and it's valid.
187
188
        Keep in mind that we can only perform validation for task names which don't include jinja
189
        expressions since those are rendered at run time.
190
        """
191
        if not node_name:
192
            # This task name needs to be resolved during run time so we cant validate the name now
193
            return True
194
195
        is_jinja_expression = jinja_utils.is_jinja_expression(value=node_name)
196
        if is_jinja_expression:
197
            # This task name needs to be resolved during run time so we cant validate the name
198
            # now
199
            return True
200
201
        return node_name in all_node_names
202
203
    @staticmethod
204
    def _get_rendered_vars(vars, action_parameters):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in vars.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
205
        if not vars:
206
            return {}
207
        context = {}
208
        context.update({
209
            kv_constants.DATASTORE_PARENT_SCOPE: {
210
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
211
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
212
            }
213
        })
214
        context.update(action_parameters)
215
        LOG.info('Rendering action chain vars. Mapping = %s; Context = %s', vars, context)
216
        return jinja_utils.render_values(mapping=vars, context=context)
217
218
    def get_node(self, node_name=None, raise_on_failure=False):
219
        if not node_name:
220
            return None
221
        for node in self.actionchain.chain:
222
            if node.name == node_name:
223
                return node
224
        if raise_on_failure:
225
            raise runner_exc.ActionRunnerException(
226
                'Unable to find node with name "%s".' % (node_name))
227
        return None
228
229
    def get_next_node(self, curr_node_name=None, condition='on-success'):
230
        if not curr_node_name:
231
            return self.get_node(self.actionchain.default)
232
        current_node = self.get_node(curr_node_name)
233
        if condition == 'on-success':
234
            return self.get_node(current_node.on_success, raise_on_failure=True)
235
        elif condition == 'on-failure':
236
            return self.get_node(current_node.on_failure, raise_on_failure=True)
237
        raise runner_exc.ActionRunnerException('Unknown condition %s.' % condition)
238
239
240
class ActionChainRunner(ActionRunner):
241
242
    def __init__(self, runner_id):
243
        super(ActionChainRunner, self).__init__(runner_id=runner_id)
244
        self.chain_holder = None
245
        self._meta_loader = MetaLoader()
246
        self._skip_notify_tasks = []
247
        self._display_published = True
248
        self._chain_notify = None
249
250
    def pre_run(self):
251
        super(ActionChainRunner, self).pre_run()
252
253
        chainspec_file = self.entry_point
254
        LOG.debug('Reading action chain from %s for action %s.', chainspec_file,
255
                  self.action)
256
257
        try:
258
            chainspec = self._meta_loader.load(file_path=chainspec_file,
259
                                               expected_type=dict)
260
        except Exception as e:
261
            message = ('Failed to parse action chain definition from "%s": %s' %
262
                       (chainspec_file, str(e)))
263
            LOG.exception('Failed to load action chain definition.')
264
            raise runner_exc.ActionRunnerPreRunError(message)
265
266
        try:
267
            self.chain_holder = ChainHolder(chainspec, self.action_name)
268
        except json_schema_exc.ValidationError as e:
269
            # preserve the whole nasty jsonschema message as that is better to get to the
270
            # root cause
271
            message = str(e)
272
            LOG.exception('Failed to instantiate ActionChain.')
273
            raise runner_exc.ActionRunnerPreRunError(message)
274
        except Exception as e:
275
            message = str(e)
276
            LOG.exception('Failed to instantiate ActionChain.')
277
            raise runner_exc.ActionRunnerPreRunError(message)
278
279
        # Runner attributes are set lazily. So these steps
280
        # should happen outside the constructor.
281
        if getattr(self, 'liveaction', None):
282
            self._chain_notify = getattr(self.liveaction, 'notify', None)
283
        if self.runner_parameters:
284
            self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
285
            self._display_published = self.runner_parameters.get('display_published', True)
286
287
        # Perform some pre-run chain validation
288
        try:
289
            self.chain_holder.validate()
290
        except Exception as e:
291
            raise runner_exc.ActionRunnerPreRunError(str(e))
292
293
    def run(self, action_parameters):
294
        # Run the action chain.
295
        return self._run_chain(action_parameters)
296
297
    def cancel(self):
298
        # Identify the list of action executions that are workflows and cascade pause.
299
        for child_exec_id in self.execution.children:
300
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)
301
            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
302
                    child_exec.status in action_constants.LIVEACTION_CANCELABLE_STATES):
303
                action_service.request_cancellation(
304
                    LiveAction.get(id=child_exec.liveaction['id']),
305
                    self.context.get('user', None)
306
                )
307
308
        return (
309
            action_constants.LIVEACTION_STATUS_CANCELING,
310
            self.liveaction.result,
311
            self.liveaction.context
312
        )
313
314
    def pause(self):
315
        # Identify the list of action executions that are workflows and cascade pause.
316
        for child_exec_id in self.execution.children:
317
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)
318
            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
319
                    child_exec.status == action_constants.LIVEACTION_STATUS_RUNNING):
320
                action_service.request_pause(
321
                    LiveAction.get(id=child_exec.liveaction['id']),
322
                    self.context.get('user', None)
323
                )
324
325
        return (
326
            action_constants.LIVEACTION_STATUS_PAUSING,
327
            self.liveaction.result,
328
            self.liveaction.context
329
        )
330
331
    def resume(self):
332
        # Restore runner and action parameters since they are not provided on resume.
333
        runner_parameters, action_parameters = param_utils.render_final_params(
334
            self.runner_type.runner_parameters,
335
            self.action.parameters,
336
            self.liveaction.parameters,
0 ignored issues
show
Bug introduced by
The member liveaction does not seem to be defined here; its definition is on line 349.
Loading history...
337
            self.liveaction.context
0 ignored issues
show
Bug introduced by
The member liveaction does not seem to be defined here; its definition is on line 349.
Loading history...
338
        )
339
340
        # Assign runner parameters needed for pre-run.
341
        if runner_parameters:
342
            self.runner_parameters = runner_parameters
343
344
        # Restore chain holder if it is not initialized.
345
        if not self.chain_holder:
346
            self.pre_run()
347
348
        # Change the status of the liveaction from resuming to running.
349
        self.liveaction = action_service.update_status(
350
            self.liveaction,
351
            action_constants.LIVEACTION_STATUS_RUNNING,
352
            publish=False
353
        )
354
355
        # Run the action chain.
356
        return self._run_chain(action_parameters, resuming=True)
357
358
    def _run_chain(self, action_parameters, resuming=False):
359
        # Set chain status to fail unless explicitly set to succeed.
360
        chain_status = action_constants.LIVEACTION_STATUS_FAILED
361
362
        # Result holds the final result that the chain store in the database.
363
        result = {'tasks': []}
364
365
        # Save published variables into the result if specified.
366
        if self._display_published:
367
            result[PUBLISHED_VARS_KEY] = {}
368
369
        context_result = {}  # Holds result which is used for the template context purposes
370
        top_level_error = None  # Stores a reference to a top level error
371
        action_node = None
372
        last_task = None
373
374
        try:
375
            # Initialize vars with the action parameters.
376
            # This allows action parameers to be referenced from vars.
377
            self.chain_holder.init_vars(action_parameters)
378
        except Exception as e:
379
            chain_status = action_constants.LIVEACTION_STATUS_FAILED
380
            m = 'Failed initializing ``vars`` in chain.'
381
            LOG.exception(m)
382
            top_level_error = self._format_error(e, m)
383
            result.update(top_level_error)
384
            return (chain_status, result, None)
385
386
        # Restore state on resuming an existing chain execution.
387
        if resuming:
388
            # Restore vars is any from the liveaction.
389
            ctx_vars = self.liveaction.context.pop('vars', {})
390
            self.chain_holder.restore_vars(ctx_vars)
391
392
            # Restore result if any from the liveaction.
393
            if self.liveaction and hasattr(self.liveaction, 'result') and self.liveaction.result:
394
                result = self.liveaction.result
395
396
            # Initialize or rebuild existing context_result from liveaction
397
            # which holds the result used for resolving context in Jinja template.
398
            for task in result.get('tasks', []):
399
                context_result[task['name']] = task['result']
400
401
            # Restore or initialize the top_level_error
402
            # that stores a reference to a top level error.
403
            if 'error' in result or 'traceback' in result:
404
                top_level_error = {
405
                    'error': result.get('error'),
406
                    'traceback': result.get('traceback')
407
                }
408
409
        # If there are no executed tasks in the chain, then get the first node.
410
        if len(result['tasks']) <= 0:
411
            try:
412
                action_node = self.chain_holder.get_next_node()
413
            except Exception as e:
414
                m = 'Failed to get starting node "%s".', action_node.name
415
                LOG.exception(m)
416
                top_level_error = self._format_error(e, m)
417
418
            # If there are no action node to run next, then mark the chain successful.
419
            if not action_node:
420
                chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
421
422
        # Otherwise, figure out the last task executed and
423
        # its state to determine where to begin executing.
424
        else:
425
            last_task = result['tasks'][-1]
426
            action_node = self.chain_holder.get_node(last_task['name'])
427
            liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id'])
428
429
            # If the liveaction of the last task has changed, update the result entry.
430
            if liveaction.status != last_task['state']:
431
                updated_task_result = self._get_updated_action_exec_result(
432
                    action_node, liveaction, last_task)
433
                del result['tasks'][-1]
434
                result['tasks'].append(updated_task_result)
435
436
                # Also need to update context_result so the updated result
437
                # is available to Jinja expressions
438
                updated_task_name = updated_task_result['name']
439
                context_result[updated_task_name]['result'] = updated_task_result['result']
440
441
            # If the last task was canceled, then canceled the chain altogether.
442
            if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
443
                chain_status = action_constants.LIVEACTION_STATUS_CANCELED
444
                return (chain_status, result, None)
445
446
            # If the last task was paused, then stay on this action node.
447
            # This is explicitly put here for clarity.
448
            if liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
449
                pass
450
451
            # If the last task succeeded, then get the next on-success action node.
452
            if liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED:
453
                chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
454
                action_node = self.chain_holder.get_next_node(
455
                    last_task['name'], condition='on-success')
456
457
            # If the last task failed, then get the next on-failure action node.
458
            if liveaction.status in action_constants.LIVEACTION_FAILED_STATES:
459
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
460
                action_node = self.chain_holder.get_next_node(
461
                    last_task['name'], condition='on-failure')
462
463
        # Setup parent context.
464
        parent_context = {
465
            'execution_id': self.execution_id
466
        }
467
468
        if getattr(self.liveaction, 'context', None):
469
            parent_context.update(self.liveaction.context)
470
471
        # Run the action chain until there are no more tasks.
472
        while action_node:
473
            error = None
474
            liveaction = None
475
            last_task = result['tasks'][-1] if len(result['tasks']) > 0 else None
476
            created_at = date_utils.get_datetime_utc_now()
477
478
            try:
479
                # If last task was paused, then fetch the liveaction and resume it first.
480
                if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED:
481
                    liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id'])
482
                    del result['tasks'][-1]
483
                else:
484
                    liveaction = self._get_next_action(
485
                        action_node=action_node, parent_context=parent_context,
486
                        action_params=action_parameters, context_result=context_result)
487
            except action_exc.InvalidActionReferencedException as e:
488
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
489
                m = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' %
490
                     (action_node.name, action_node.ref))
491
                LOG.exception(m)
492
                top_level_error = self._format_error(e, m)
493
                break
494
            except action_exc.ParameterRenderingFailedException as e:
495
                # Rendering parameters failed before we even got to running this action,
496
                # abort and fail the whole action chain
497
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
498
                m = 'Failed to run task "%s". Parameter rendering failed.' % action_node.name
499
                LOG.exception(m)
500
                top_level_error = self._format_error(e, m)
501
                break
502
            except db_exc.StackStormDBObjectNotFoundError as e:
503
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
504
                m = 'Failed to resume task "%s". Unable to find liveaction.' % action_node.name
505
                LOG.exception(m)
506
                top_level_error = self._format_error(e, m)
507
                break
508
509
            try:
510
                # If last task was paused, then fetch the liveaction and resume it first.
511
                if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED:
512
                    LOG.info('Resume task %s for chain %s.', action_node.name, self.liveaction.id)
513
                    liveaction = self._resume_action(liveaction)
514
                else:
515
                    LOG.info('Run task %s for chain %s.', action_node.name, self.liveaction.id)
516
                    liveaction = self._run_action(liveaction)
517
            except Exception as e:
518
                # Save the traceback and error message
519
                m = 'Failed running task "%s".' % action_node.name
520
                LOG.exception(m)
521
                error = self._format_error(e, m)
522
                context_result[action_node.name] = error
523
            else:
524
                # Update context result
525
                context_result[action_node.name] = liveaction.result
526
527
                # Render and publish variables
528
                rendered_publish_vars = ActionChainRunner._render_publish_vars(
529
                    action_node=action_node, action_parameters=action_parameters,
530
                    execution_result=liveaction.result, previous_execution_results=context_result,
531
                    chain_vars=self.chain_holder.vars)
532
533
                if rendered_publish_vars:
534
                    self.chain_holder.vars.update(rendered_publish_vars)
535
                    if self._display_published:
536
                        result[PUBLISHED_VARS_KEY].update(rendered_publish_vars)
537
            finally:
538
                # Record result and resolve a next node based on the task success or failure
539
                updated_at = date_utils.get_datetime_utc_now()
540
541
                task_result = self._format_action_exec_result(
542
                    action_node,
543
                    liveaction,
544
                    created_at,
545
                    updated_at,
546
                    error=error
547
                )
548
549
                result['tasks'].append(task_result)
550
551
                try:
552
                    if not liveaction:
553
                        chain_status = action_constants.LIVEACTION_STATUS_FAILED
554
                        action_node = self.chain_holder.get_next_node(
555
                            action_node.name, condition='on-failure')
556
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_TIMED_OUT:
557
                        chain_status = action_constants.LIVEACTION_STATUS_TIMED_OUT
558
                        action_node = self.chain_holder.get_next_node(
559
                            action_node.name, condition='on-failure')
560
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
561
                        LOG.info('Chain execution (%s) canceled because task "%s" is canceled.',
562
                                 self.liveaction_id, action_node.name)
563
                        chain_status = action_constants.LIVEACTION_STATUS_CANCELED
564
                        action_node = None
565
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
566
                        LOG.info('Chain execution (%s) paused because task "%s" is paused.',
567
                                 self.liveaction_id, action_node.name)
568
                        chain_status = action_constants.LIVEACTION_STATUS_PAUSED
569
                        self._save_vars()
570
                        action_node = None
571
                    elif liveaction.status in action_constants.LIVEACTION_FAILED_STATES:
572
                        chain_status = action_constants.LIVEACTION_STATUS_FAILED
573
                        action_node = self.chain_holder.get_next_node(
574
                            action_node.name, condition='on-failure')
575
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED:
576
                        chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
577
                        action_node = self.chain_holder.get_next_node(
578
                            action_node.name, condition='on-success')
579
                    else:
580
                        action_node = None
581
                except Exception as e:
582
                    chain_status = action_constants.LIVEACTION_STATUS_FAILED
583
                    m = 'Failed to get next node "%s".' % action_node.name
584
                    LOG.exception(m)
585
                    top_level_error = self._format_error(e, m)
586
                    action_node = None
587
                    break
0 ignored issues
show
Bug Best Practice introduced by
break statement in finally block may swallow exception

Placing a return statement inside finally will swallow all exceptions that may have been thrown in the try block.

Loading history...
588
589
            if action_service.is_action_canceled_or_canceling(self.liveaction.id):
590
                LOG.info('Chain execution (%s) canceled by user.', self.liveaction.id)
591
                chain_status = action_constants.LIVEACTION_STATUS_CANCELED
592
                return (chain_status, result, None)
593
594
            if action_service.is_action_paused_or_pausing(self.liveaction.id):
595
                LOG.info('Chain execution (%s) paused by user.', self.liveaction.id)
596
                chain_status = action_constants.LIVEACTION_STATUS_PAUSED
597
                self._save_vars()
598
                return (chain_status, result, self.liveaction.context)
599
600
        if top_level_error and isinstance(top_level_error, dict):
601
            result.update(top_level_error)
602
603
        return (chain_status, result, self.liveaction.context)
604
605
    def _format_error(self, e, msg):
606
        return {
607
            'error': '%s. %s' % (msg, str(e)),
608
            'traceback': traceback.format_exc(10)
609
        }
610
611
    def _save_vars(self):
612
        # Save the context vars in the liveaction context.
613
        self.liveaction.context['vars'] = self.chain_holder.vars
614
615
    @staticmethod
616
    def _render_publish_vars(action_node, action_parameters, execution_result,
617
                             previous_execution_results, chain_vars):
618
        """
619
        If no output is specified on the action_node the output is the entire execution_result.
620
        If any output is specified then only those variables are published as output of an
621
        execution of this action_node.
622
        The output variable can refer to a variable from the execution_result,
623
        previous_execution_results or chain_vars.
624
        """
625
        if not action_node.publish:
626
            return {}
627
628
        context = {}
629
        context.update(action_parameters)
630
        context.update({action_node.name: execution_result})
631
        context.update(previous_execution_results)
632
        context.update(chain_vars)
633
        context.update({RESULTS_KEY: previous_execution_results})
634
635
        context.update({
636
            kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
637
                scope=kv_constants.SYSTEM_SCOPE)
638
        })
639
640
        context.update({
641
            kv_constants.DATASTORE_PARENT_SCOPE: {
642
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
643
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
644
            }
645
        })
646
647
        try:
648
            rendered_result = jinja_utils.render_values(mapping=action_node.publish,
649
                                                        context=context)
650
        except Exception as e:
651
            key = getattr(e, 'key', None)
652
            value = getattr(e, 'value', None)
653
            msg = ('Failed rendering value for publish parameter "%s" in task "%s" '
654
                   '(template string=%s): %s' % (key, action_node.name, value, str(e)))
655
            raise action_exc.ParameterRenderingFailedException(msg)
656
657
        return rendered_result
658
659
    @staticmethod
660
    def _resolve_params(action_node, original_parameters, results, chain_vars, chain_context):
661
        # setup context with original parameters and the intermediate results.
662
        chain_parent = chain_context.get('parent', {})
663
        pack = chain_parent.get('pack')
664
        user = chain_parent.get('user')
665
666
        config = get_config(pack, user)
667
668
        context = {}
669
        context.update(original_parameters)
670
        context.update(results)
671
        context.update(chain_vars)
672
        context.update({RESULTS_KEY: results})
673
674
        context.update({
675
            kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
676
                scope=kv_constants.SYSTEM_SCOPE)
677
        })
678
679
        context.update({
680
            kv_constants.DATASTORE_PARENT_SCOPE: {
681
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
682
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
683
            }
684
        })
685
        context.update({action_constants.ACTION_CONTEXT_KV_PREFIX: chain_context})
686
        context.update({pack_constants.PACK_CONFIG_CONTEXT_KV_PREFIX: config})
687
        try:
688
            rendered_params = jinja_utils.render_values(mapping=action_node.get_parameters(),
689
                                                        context=context)
690
        except Exception as e:
691
            LOG.exception('Jinja rendering for parameter "%s" failed.' % (e.key))
692
693
            key = getattr(e, 'key', None)
694
            value = getattr(e, 'value', None)
695
            msg = ('Failed rendering value for action parameter "%s" in task "%s" '
696
                   '(template string=%s): %s') % (key, action_node.name, value, str(e))
697
            raise action_exc.ParameterRenderingFailedException(msg)
698
        LOG.debug('Rendered params: %s: Type: %s', rendered_params, type(rendered_params))
699
        return rendered_params
700
701
    def _get_next_action(self, action_node, parent_context, action_params, context_result):
702
        # Verify that the referenced action exists
703
        # TODO: We do another lookup in cast_param, refactor to reduce number of lookups
704
        task_name = action_node.name
705
        action_ref = action_node.ref
706
        action_db = action_db_util.get_action_by_ref(ref=action_ref)
707
708
        if not action_db:
709
            error = 'Task :: %s - Action with ref %s not registered.' % (task_name, action_ref)
710
            raise action_exc.InvalidActionReferencedException(error)
711
712
        resolved_params = ActionChainRunner._resolve_params(
713
            action_node=action_node, original_parameters=action_params,
714
            results=context_result, chain_vars=self.chain_holder.vars,
715
            chain_context={'parent': parent_context})
716
717
        liveaction = self._build_liveaction_object(
718
            action_node=action_node,
719
            resolved_params=resolved_params,
720
            parent_context=parent_context)
721
722
        return liveaction
723
724
    def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
725
        """
726
        :param sleep_delay: Number of seconds to wait during "is completed" polls.
727
        :type sleep_delay: ``float``
728
        """
729
        try:
730
            liveaction, _ = action_service.request(liveaction)
731
        except Exception as e:
732
            liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
733
            LOG.exception('Failed to schedule liveaction.')
734
            raise e
735
736
        while (wait_for_completion and liveaction.status not in (
737
                action_constants.LIVEACTION_COMPLETED_STATES +
738
                [action_constants.LIVEACTION_STATUS_PAUSED])):
739
            eventlet.sleep(sleep_delay)
740
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)
741
742
        return liveaction
743
744
    def _resume_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
745
        """
746
        :param sleep_delay: Number of seconds to wait during "is completed" polls.
747
        :type sleep_delay: ``float``
748
        """
749
        try:
750
            user = self.context.get('user', None)
751
            liveaction, _ = action_service.request_resume(liveaction, user)
752
        except Exception as e:
753
            liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
754
            LOG.exception('Failed to schedule liveaction.')
755
            raise e
756
757
        while (wait_for_completion and liveaction.status not in (
758
                action_constants.LIVEACTION_COMPLETED_STATES +
759
                [action_constants.LIVEACTION_STATUS_PAUSED])):
760
            eventlet.sleep(sleep_delay)
761
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)
762
763
        return liveaction
764
765
    def _build_liveaction_object(self, action_node, resolved_params, parent_context):
766
        liveaction = LiveActionDB(action=action_node.ref)
767
768
        # Setup notify for task in chain.
769
        notify = self._get_notify(action_node)
770
        if notify:
771
            liveaction.notify = notify
772
            LOG.debug('%s: Task notify set to: %s', action_node.name, liveaction.notify)
773
774
        liveaction.context = {
775
            'parent': parent_context,
776
            'chain': vars(action_node)
777
        }
778
        liveaction.parameters = action_param_utils.cast_params(action_ref=action_node.ref,
779
                                                               params=resolved_params)
780
        return liveaction
781
782
    def _get_notify(self, action_node):
783
        if action_node.name not in self._skip_notify_tasks:
784
            if action_node.notify:
785
                task_notify = NotificationsHelper.to_model(action_node.notify)
786
                return task_notify
787
            elif self._chain_notify:
788
                return self._chain_notify
789
790
        return None
791
792
    def _get_updated_action_exec_result(self, action_node, liveaction, prev_task_result):
793
        if liveaction.status in action_constants.LIVEACTION_COMPLETED_STATES:
794
            created_at = isotime.parse(prev_task_result['created_at'])
795
            updated_at = liveaction.end_timestamp
796
        else:
797
            created_at = isotime.parse(prev_task_result['created_at'])
798
            updated_at = isotime.parse(prev_task_result['updated_at'])
799
800
        return self._format_action_exec_result(action_node, liveaction, created_at, updated_at)
801
802
    def _format_action_exec_result(self, action_node, liveaction_db, created_at, updated_at,
803
                                   error=None):
804
        """
805
        Format ActionExecution result so it can be used in the final action result output.
806
807
        :rtype: ``dict``
808
        """
809
        assert isinstance(created_at, datetime.datetime)
810
        assert isinstance(updated_at, datetime.datetime)
811
812
        result = {}
813
814
        execution_db = None
815
        if liveaction_db:
816
            execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
817
818
        result['id'] = action_node.name
819
        result['name'] = action_node.name
820
        result['execution_id'] = str(execution_db.id) if execution_db else None
821
        result['liveaction_id'] = str(liveaction_db.id) if liveaction_db else None
822
        result['workflow'] = None
823
824
        result['created_at'] = isotime.format(dt=created_at)
825
        result['updated_at'] = isotime.format(dt=updated_at)
826
827
        if error or not liveaction_db:
828
            result['state'] = action_constants.LIVEACTION_STATUS_FAILED
829
        else:
830
            result['state'] = liveaction_db.status
831
832
        if error:
833
            result['result'] = error
834
        else:
835
            result['result'] = liveaction_db.result
836
837
        return result
838
839
840
def get_runner():
841
    return ActionChainRunner(str(uuid.uuid4()))
842
843
844
def get_metadata():
845
    return get_runner_metadata('action_chain_runner')
846