Passed
Pull Request — master (#3645)
by W
09:06 queued 03:31
created

ActionChainRunner   F

Complexity

Total Complexity 92

Size/Duplication

Total Lines 571
Duplicated Lines 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
c 2
b 0
f 0
dl 0
loc 571
rs 1.5789
wmc 92

17 Methods

Rating   Name   Duplication   Size   Complexity  
C pre_run() 0 42 7
B _render_publish_vars() 0 43 3
A _get_notify() 0 9 4
F _format_action_exec_result() 0 36 9
A _format_error() 0 4 1
B _resolve_params() 0 41 2
A _save_vars() 0 3 1
A _get_updated_action_exec_result() 0 9 2
A _resume_action() 0 20 4
B resume() 0 26 3
A run() 0 3 1
A _build_liveaction_object() 0 16 2
A _get_next_action() 0 22 2
A _run_action() 0 19 4
A pause() 0 15 4
A __init__() 0 7 1
F _run_chain() 0 236 42

How to fix   Complexity   

Complex Class

Complex classes like ActionChainRunner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import eventlet
18
import traceback
19
import uuid
20
import datetime
21
22
from jsonschema import exceptions as json_schema_exc
23
24
from st2common.runners.base import ActionRunner
25
from st2common import log as logging
26
from st2common.constants import action as action_constants
27
from st2common.constants import pack as pack_constants
28
from st2common.constants import keyvalue as kv_constants
29
from st2common.content.loader import MetaLoader
30
from st2common.exceptions import action as action_exc
31
from st2common.exceptions import actionrunner as runner_exc
32
from st2common.exceptions import db as db_exc
33
from st2common.models.api.notification import NotificationsHelper
34
from st2common.models.db.liveaction import LiveActionDB
35
from st2common.models.system import actionchain
36
from st2common.models.utils import action_param_utils
37
from st2common.persistence.execution import ActionExecution
38
from st2common.persistence.liveaction import LiveAction
39
from st2common.services import action as action_service
40
from st2common.services import keyvalues as kv_service
41
from st2common.util import action_db as action_db_util
42
from st2common.util import isotime
43
from st2common.util import date as date_utils
44
from st2common.util import jinja as jinja_utils
45
from st2common.util import param as param_utils
46
from st2common.util.config_loader import get_config
47
48
49
LOG = logging.getLogger(__name__)
50
RESULTS_KEY = '__results'
51
JINJA_START_MARKERS = [
52
    '{{',
53
    '{%'
54
]
55
PUBLISHED_VARS_KEY = 'published'
56
57
58
class ChainHolder(object):
59
60
    def __init__(self, chainspec, chainname):
61
        self.actionchain = actionchain.ActionChain(**chainspec)
62
        self.chainname = chainname
63
64
        if not self.actionchain.default:
65
            default = self._get_default(self.actionchain)
66
            self.actionchain.default = default
67
68
        LOG.debug('Using %s as default for %s.', self.actionchain.default, self.chainname)
69
        if not self.actionchain.default:
70
            raise Exception('Failed to find default node in %s.' % (self.chainname))
71
72
        self.vars = {}
73
74
    def init_vars(self, action_parameters):
75
        if self.actionchain.vars:
76
            self.vars = self._get_rendered_vars(self.actionchain.vars,
77
                                                action_parameters=action_parameters)
78
79
    def restore_vars(self, ctx_vars):
80
        self.vars.update(copy.deepcopy(ctx_vars))
81
82
    def validate(self):
83
        """
84
        Function which performs a simple compile time validation.
85
86
        Keep in mind that some variables are only resolved during run time which means we can
87
        perform only simple validation during compile / create time.
88
        """
89
        all_nodes = self._get_all_nodes(action_chain=self.actionchain)
90
91
        for node in self.actionchain.chain:
92
            on_success_node_name = node.on_success
93
            on_failure_node_name = node.on_failure
94
95
            # Check "on-success" path
96
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
97
                                                  node_name=on_success_node_name)
98
            if not valid_name:
99
                msg = ('Unable to find node with name "%s" referenced in "on-success" in '
100
                       'task "%s".' % (on_success_node_name, node.name))
101
                raise ValueError(msg)
102
103
            # Check "on-failure" path
104
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
105
                                                  node_name=on_failure_node_name)
106
            if not valid_name:
107
                msg = ('Unable to find node with name "%s" referenced in "on-failure" in '
108
                       'task "%s".' % (on_failure_node_name, node.name))
109
                raise ValueError(msg)
110
111
        # check if node specified in default is valid.
112
        if self.actionchain.default:
113
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
114
                                                  node_name=self.actionchain.default)
115
            if not valid_name:
116
                msg = ('Unable to find node with name "%s" referenced in "default".' %
117
                       self.actionchain.default)
118
                raise ValueError(msg)
119
        return True
120
121
    @staticmethod
122
    def _get_default(action_chain):
123
        # default is defined
124
        if action_chain.default:
125
            return action_chain.default
126
        # no nodes in chain
127
        if not action_chain.chain:
128
            return None
129
        # The first node with no references is the default node. Assumptions
130
        # that support this are :
131
        # 1. There are no loops in the chain. Even if there are loops there is
132
        #    at least 1 node which does not end up in this loop.
133
        # 2. There are no fragments in the chain.
134
        all_nodes = ChainHolder._get_all_nodes(action_chain=action_chain)
135
        node_names = set(all_nodes)
136
        on_success_nodes = ChainHolder._get_all_on_success_nodes(action_chain=action_chain)
137
        on_failure_nodes = ChainHolder._get_all_on_failure_nodes(action_chain=action_chain)
138
        referenced_nodes = on_success_nodes | on_failure_nodes
139
        possible_default_nodes = node_names - referenced_nodes
140
        if possible_default_nodes:
141
            # This is to preserve order. set([..]) does not preserve the order so iterate
142
            # over original array.
143
            for node in all_nodes:
144
                if node in possible_default_nodes:
145
                    return node
146
        # If no node is found assume the first node in the chain list to be default.
147
        return action_chain.chain[0].name
148
149
    @staticmethod
150
    def _get_all_nodes(action_chain):
151
        """
152
        Return names for all the nodes in the chain.
153
        """
154
        all_nodes = [node.name for node in action_chain.chain]
155
        return all_nodes
156
157
    @staticmethod
158
    def _get_all_on_success_nodes(action_chain):
159
        """
160
        Return names for all the tasks referenced in "on-success".
161
        """
162
        on_success_nodes = set([node.on_success for node in action_chain.chain])
163
        return on_success_nodes
164
165
    @staticmethod
166
    def _get_all_on_failure_nodes(action_chain):
167
        """
168
        Return names for all the tasks referenced in "on-failure".
169
        """
170
        on_failure_nodes = set([node.on_failure for node in action_chain.chain])
171
        return on_failure_nodes
172
173
    def _is_valid_node_name(self, all_node_names, node_name):
174
        """
175
        Function which validates that the provided node name is defined in the workflow definition
176
        and it's valid.
177
178
        Keep in mind that we can only perform validation for task names which don't include jinja
179
        expressions since those are rendered at run time.
180
        """
181
        if not node_name:
182
            # This task name needs to be resolved during run time so we cant validate the name now
183
            return True
184
185
        is_jinja_expression = jinja_utils.is_jinja_expression(value=node_name)
186
        if is_jinja_expression:
187
            # This task name needs to be resolved during run time so we cant validate the name
188
            # now
189
            return True
190
191
        return node_name in all_node_names
192
193
    @staticmethod
194
    def _get_rendered_vars(vars, action_parameters):
195
        if not vars:
196
            return {}
197
        context = {}
198
        context.update({
199
            kv_constants.DATASTORE_PARENT_SCOPE: {
200
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
201
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
202
            }
203
        })
204
        context.update(action_parameters)
205
        LOG.info('Rendering action chain vars. Mapping = %s; Context = %s', vars, context)
206
        return jinja_utils.render_values(mapping=vars, context=context)
207
208
    def get_node(self, node_name=None, raise_on_failure=False):
209
        if not node_name:
210
            return None
211
        for node in self.actionchain.chain:
212
            if node.name == node_name:
213
                return node
214
        if raise_on_failure:
215
            raise runner_exc.ActionRunnerException(
216
                'Unable to find node with name "%s".' % (node_name))
217
        return None
218
219
    def get_next_node(self, curr_node_name=None, condition='on-success'):
220
        if not curr_node_name:
221
            return self.get_node(self.actionchain.default)
222
        current_node = self.get_node(curr_node_name)
223
        if condition == 'on-success':
224
            return self.get_node(current_node.on_success, raise_on_failure=True)
225
        elif condition == 'on-failure':
226
            return self.get_node(current_node.on_failure, raise_on_failure=True)
227
        raise runner_exc.ActionRunnerException('Unknown condition %s.' % condition)
228
229
230
class ActionChainRunner(ActionRunner):
231
232
    def __init__(self, runner_id):
233
        super(ActionChainRunner, self).__init__(runner_id=runner_id)
234
        self.chain_holder = None
235
        self._meta_loader = MetaLoader()
236
        self._skip_notify_tasks = []
237
        self._display_published = True
238
        self._chain_notify = None
239
240
    def pre_run(self):
241
        super(ActionChainRunner, self).pre_run()
242
243
        chainspec_file = self.entry_point
244
        LOG.debug('Reading action chain from %s for action %s.', chainspec_file,
245
                  self.action)
246
247
        try:
248
            chainspec = self._meta_loader.load(file_path=chainspec_file,
249
                                               expected_type=dict)
250
        except Exception as e:
251
            message = ('Failed to parse action chain definition from "%s": %s' %
252
                       (chainspec_file, str(e)))
253
            LOG.exception('Failed to load action chain definition.')
254
            raise runner_exc.ActionRunnerPreRunError(message)
255
256
        try:
257
            self.chain_holder = ChainHolder(chainspec, self.action_name)
258
        except json_schema_exc.ValidationError as e:
259
            # preserve the whole nasty jsonschema message as that is better to get to the
260
            # root cause
261
            message = str(e)
262
            LOG.exception('Failed to instantiate ActionChain.')
263
            raise runner_exc.ActionRunnerPreRunError(message)
264
        except Exception as e:
265
            message = e.message or str(e)
266
            LOG.exception('Failed to instantiate ActionChain.')
267
            raise runner_exc.ActionRunnerPreRunError(message)
268
269
        # Runner attributes are set lazily. So these steps
270
        # should happen outside the constructor.
271
        if getattr(self, 'liveaction', None):
272
            self._chain_notify = getattr(self.liveaction, 'notify', None)
273
        if self.runner_parameters:
274
            self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
275
            self._display_published = self.runner_parameters.get('display_published', True)
276
277
        # Perform some pre-run chain validation
278
        try:
279
            self.chain_holder.validate()
280
        except Exception as e:
281
            raise runner_exc.ActionRunnerPreRunError(e.message)
282
283
    def run(self, action_parameters):
284
        # Run the action chain.
285
        return self._run_chain(action_parameters)
286
287
    def pause(self):
288
        # Identify the list of action executions that are workflows and cascade pause.
289
        for child_exec_id in self.execution.children:
290
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)
291
            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
292
                    child_exec.status == action_constants.LIVEACTION_STATUS_RUNNING):
293
                action_service.request_pause(
294
                    LiveAction.get(id=child_exec.liveaction['id']),
295
                    self.context.get('user', None)
296
                )
297
298
        return (
299
            action_constants.LIVEACTION_STATUS_PAUSING,
300
            self.liveaction.result,
301
            self.liveaction.context
302
        )
303
304
    def resume(self):
305
        # Restore runner and action parameters since they are not provided on resume.
306
        runner_parameters, action_parameters = param_utils.render_final_params(
307
            self.runner_type_db.runner_parameters,
308
            self.action.parameters,
309
            self.liveaction.parameters,
310
            self.liveaction.context
311
        )
312
313
        # Assign runner parameters needed for pre-run.
314
        if runner_parameters:
315
            self.runner_parameters = runner_parameters
316
317
        # Restore chain holder if it is not initialized.
318
        if not self.chain_holder:
319
            self.pre_run()
320
321
        # Change the status of the liveaction from resuming to running.
322
        self.liveaction = action_service.update_status(
323
            self.liveaction,
324
            action_constants.LIVEACTION_STATUS_RUNNING,
325
            publish=False
326
        )
327
328
        # Run the action chain.
329
        return self._run_chain(action_parameters, resuming=True)
330
331
    def _run_chain(self, action_parameters, resuming=False):
332
        # Set chain status to fail unless explicitly set to succeed.
333
        chain_status = action_constants.LIVEACTION_STATUS_FAILED
334
335
        # Result holds the final result that the chain store in the database.
336
        result = {'tasks': []}
337
338
        # Save published variables into the result if specified.
339
        if self._display_published:
340
            result[PUBLISHED_VARS_KEY] = {}
341
342
        context_result = {}  # Holds result which is used for the template context purposes
343
        top_level_error = None  # Stores a reference to a top level error
344
        action_node = None
345
        last_task = None
346
347
        try:
348
            # Initialize vars with the action parameters.
349
            # This allows action parameers to be referenced from vars.
350
            self.chain_holder.init_vars(action_parameters)
351
        except Exception as e:
352
            chain_status = action_constants.LIVEACTION_STATUS_FAILED
353
            m = 'Failed initializing ``vars`` in chain.'
354
            LOG.exception(m)
355
            top_level_error = self._format_error(e, m)
356
            result.update(top_level_error)
357
            return (chain_status, result, None)
358
359
        # Restore state on resuming an existing chain execution.
360
        if resuming:
361
            # Restore vars is any from the liveaction.
362
            ctx_vars = self.liveaction.context.pop('vars', {})
363
            self.chain_holder.restore_vars(ctx_vars)
364
365
            # Restore result if any from the liveaction.
366
            if self.liveaction and hasattr(self.liveaction, 'result') and self.liveaction.result:
367
                result = self.liveaction.result
368
369
            # Initialize or rebuild existing context_result from liveaction
370
            # which holds the result used for resolving context in Jinja template.
371
            for task in result.get('tasks', []):
372
                context_result[task['name']] = task['result']
373
374
            # Restore or initialize the top_level_error
375
            # that stores a reference to a top level error.
376
            if 'error' in result or 'traceback' in result:
377
                top_level_error = {
378
                    'error': result.get('error'),
379
                    'traceback': result.get('traceback')
380
                }
381
382
        # If there are no executed tasks in the chain, then get the first node.
383
        if len(result['tasks']) <= 0:
384
            try:
385
                action_node = self.chain_holder.get_next_node()
386
            except Exception as e:
387
                m = 'Failed to get starting node "%s".', action_node.name
388
                LOG.exception(m)
389
                top_level_error = self._format_error(e, m)
390
391
        # Otherwise, figure out the last task executed and
392
        # its state to determine where to begin executing.
393
        else:
394
            last_task = result['tasks'][-1]
395
            action_node = self.chain_holder.get_node(last_task['name'])
396
            liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id'])
397
398
            # If the liveaction of the last task has changed, update the result entry.
399
            if liveaction.status != last_task['state']:
400
                updated_task_result = self._get_updated_action_exec_result(
401
                    action_node, liveaction, last_task)
402
                del result['tasks'][-1]
403
                result['tasks'].append(updated_task_result)
404
405
            # If the last task was canceled, then canceled the chain altogether.
406
            if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
407
                chain_status = action_constants.LIVEACTION_STATUS_CANCELED
408
                return (chain_status, result, None)
409
410
            # If the last task was paused, then stay on this action node.
411
            # This is explicitly put here for clarity.
412
            if liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
413
                pass
414
415
            # If the last task was completed, then get the next action node.
416
            if liveaction.status in action_constants.LIVEACTION_COMPLETED_STATES:
417
                action_node = self.chain_holder.get_next_node(
418
                    last_task['name'],
419
                    condition=(
420
                        'on-failure'
421
                        if liveaction.status in action_constants.LIVEACTION_FAILED_STATES
422
                        else 'on-success'
423
                    )
424
                )
425
426
        # Setup parent context.
427
        parent_context = {
428
            'execution_id': self.execution_id
429
        }
430
431
        if getattr(self.liveaction, 'context', None):
432
            parent_context.update(self.liveaction.context)
433
434
        # Run the action chain until there are no more tasks.
435
        while action_node:
436
            error = None
437
            liveaction = None
438
            last_task = result['tasks'][-1] if len(result['tasks']) > 0 else None
439
            created_at = date_utils.get_datetime_utc_now()
440
441
            try:
442
                # If last task was paused, then fetch the liveaction and resume it first.
443
                if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED:
444
                    liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id'])
445
                    del result['tasks'][-1]
446
                else:
447
                    liveaction = self._get_next_action(
448
                        action_node=action_node, parent_context=parent_context,
449
                        action_params=action_parameters, context_result=context_result)
450
            except action_exc.InvalidActionReferencedException as e:
451
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
452
                m = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' %
453
                     (action_node.name, action_node.ref))
454
                LOG.exception(m)
455
                top_level_error = self._format_error(e, m)
456
                break
457
            except action_exc.ParameterRenderingFailedException as e:
458
                # Rendering parameters failed before we even got to running this action,
459
                # abort and fail the whole action chain
460
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
461
                m = 'Failed to run task "%s". Parameter rendering failed.' % action_node.name
462
                LOG.exception(m)
463
                top_level_error = self._format_error(e, m)
464
                break
465
            except db_exc.StackStormDBObjectNotFoundError as e:
466
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
467
                m = 'Failed to resume task "%s". Unable to find liveaction.' % action_node.name
468
                LOG.exception(m)
469
                top_level_error = self._format_error(e, m)
470
                break
471
472
            try:
473
                # If last task was paused, then fetch the liveaction and resume it first.
474
                if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED:
475
                    LOG.info('Resume task %s for chain %s.', action_node.name, self.liveaction.id)
476
                    liveaction = self._resume_action(liveaction)
477
                else:
478
                    LOG.info('Run task %s for chain %s.', action_node.name, self.liveaction.id)
479
                    liveaction = self._run_action(liveaction)
480
            except Exception as e:
481
                # Save the traceback and error message
482
                m = 'Failed running task "%s".' % action_node.name
483
                LOG.exception(m)
484
                error = self._format_error(e, m)
485
                context_result[action_node.name] = error
486
            else:
487
                # Update context result
488
                context_result[action_node.name] = liveaction.result
489
490
                # Render and publish variables
491
                rendered_publish_vars = ActionChainRunner._render_publish_vars(
492
                    action_node=action_node, action_parameters=action_parameters,
493
                    execution_result=liveaction.result, previous_execution_results=context_result,
494
                    chain_vars=self.chain_holder.vars)
495
496
                if rendered_publish_vars:
497
                    self.chain_holder.vars.update(rendered_publish_vars)
498
                    if self._display_published:
499
                        result[PUBLISHED_VARS_KEY].update(rendered_publish_vars)
500
            finally:
501
                # Record result and resolve a next node based on the task success or failure
502
                updated_at = date_utils.get_datetime_utc_now()
503
504
                task_result = self._format_action_exec_result(
505
                    action_node,
506
                    liveaction,
507
                    created_at,
508
                    updated_at,
509
                    error=error
510
                )
511
512
                result['tasks'].append(task_result)
513
514
                try:
515
                    if not liveaction:
516
                        chain_status = action_constants.LIVEACTION_STATUS_FAILED
517
                        action_node = self.chain_holder.get_next_node(
518
                            action_node.name, condition='on-failure')
519
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_TIMED_OUT:
520
                        chain_status = action_constants.LIVEACTION_STATUS_TIMED_OUT
521
                        action_node = self.chain_holder.get_next_node(
522
                            action_node.name, condition='on-failure')
523
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
524
                        LOG.info('Chain execution (%s) canceled because task "%s" is canceled.',
525
                                 self.liveaction_id, action_node.name)
526
                        chain_status = action_constants.LIVEACTION_STATUS_CANCELED
527
                        action_node = None
528
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
529
                        LOG.info('Chain execution (%s) paused because task "%s" is paused.',
530
                                 self.liveaction_id, action_node.name)
531
                        chain_status = action_constants.LIVEACTION_STATUS_PAUSED
532
                        self._save_vars()
533
                        action_node = None
534
                    elif liveaction.status in action_constants.LIVEACTION_FAILED_STATES:
535
                        chain_status = action_constants.LIVEACTION_STATUS_FAILED
536
                        action_node = self.chain_holder.get_next_node(
537
                            action_node.name, condition='on-failure')
538
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED:
539
                        chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
540
                        action_node = self.chain_holder.get_next_node(
541
                            action_node.name, condition='on-success')
542
                    else:
543
                        action_node = None
544
                except Exception as e:
545
                    chain_status = action_constants.LIVEACTION_STATUS_FAILED
546
                    m = 'Failed to get next node "%s".' % action_node.name
547
                    LOG.exception(m)
548
                    top_level_error = self._format_error(e, m)
549
                    action_node = None
550
                    break
551
552
            if action_service.is_action_canceled_or_canceling(self.liveaction.id):
553
                LOG.info('Chain execution (%s) canceled by user.', self.liveaction.id)
554
                chain_status = action_constants.LIVEACTION_STATUS_CANCELED
555
                return (chain_status, result, None)
556
557
            if action_service.is_action_paused_or_pausing(self.liveaction.id):
558
                LOG.info('Chain execution (%s) paused by user.', self.liveaction.id)
559
                chain_status = action_constants.LIVEACTION_STATUS_PAUSED
560
                self._save_vars()
561
                return (chain_status, result, self.liveaction.context)
562
563
        if top_level_error and isinstance(top_level_error, dict):
564
            result.update(top_level_error)
565
566
        return (chain_status, result, self.liveaction.context)
567
568
    def _format_error(self, e, msg):
569
        return {
570
            'error': '%s. %s' % (msg, str(e)),
571
            'traceback': traceback.format_exc(10)
572
        }
573
574
    def _save_vars(self):
575
        # Save the context vars in the liveaction context.
576
        self.liveaction.context['vars'] = self.chain_holder.vars
577
578
    @staticmethod
579
    def _render_publish_vars(action_node, action_parameters, execution_result,
580
                             previous_execution_results, chain_vars):
581
        """
582
        If no output is specified on the action_node the output is the entire execution_result.
583
        If any output is specified then only those variables are published as output of an
584
        execution of this action_node.
585
        The output variable can refer to a variable from the execution_result,
586
        previous_execution_results or chain_vars.
587
        """
588
        if not action_node.publish:
589
            return {}
590
591
        context = {}
592
        context.update(action_parameters)
593
        context.update({action_node.name: execution_result})
594
        context.update(previous_execution_results)
595
        context.update(chain_vars)
596
        context.update({RESULTS_KEY: previous_execution_results})
597
598
        context.update({
599
            kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
600
                scope=kv_constants.SYSTEM_SCOPE)
601
        })
602
603
        context.update({
604
            kv_constants.DATASTORE_PARENT_SCOPE: {
605
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
606
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
607
            }
608
        })
609
610
        try:
611
            rendered_result = jinja_utils.render_values(mapping=action_node.publish,
612
                                                        context=context)
613
        except Exception as e:
614
            key = getattr(e, 'key', None)
615
            value = getattr(e, 'value', None)
616
            msg = ('Failed rendering value for publish parameter "%s" in task "%s" '
617
                   '(template string=%s): %s' % (key, action_node.name, value, str(e)))
618
            raise action_exc.ParameterRenderingFailedException(msg)
619
620
        return rendered_result
621
622
    @staticmethod
623
    def _resolve_params(action_node, original_parameters, results, chain_vars, chain_context):
624
        # setup context with original parameters and the intermediate results.
625
        chain_parent = chain_context.get('parent', {})
626
        pack = chain_parent.get('pack')
627
        user = chain_parent.get('user')
628
629
        config = get_config(pack, user)
630
631
        context = {}
632
        context.update(original_parameters)
633
        context.update(results)
634
        context.update(chain_vars)
635
        context.update({RESULTS_KEY: results})
636
637
        context.update({
638
            kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
639
                scope=kv_constants.SYSTEM_SCOPE)
640
        })
641
642
        context.update({
643
            kv_constants.DATASTORE_PARENT_SCOPE: {
644
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
645
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
646
            }
647
        })
648
        context.update({action_constants.ACTION_CONTEXT_KV_PREFIX: chain_context})
649
        context.update({pack_constants.PACK_CONFIG_CONTEXT_KV_PREFIX: config})
650
        try:
651
            rendered_params = jinja_utils.render_values(mapping=action_node.get_parameters(),
652
                                                        context=context)
653
        except Exception as e:
654
            LOG.exception('Jinja rendering for parameter "%s" failed.' % (e.key))
655
656
            key = getattr(e, 'key', None)
657
            value = getattr(e, 'value', None)
658
            msg = ('Failed rendering value for action parameter "%s" in task "%s" '
659
                   '(template string=%s): %s') % (key, action_node.name, value, str(e))
660
            raise action_exc.ParameterRenderingFailedException(msg)
661
        LOG.debug('Rendered params: %s: Type: %s', rendered_params, type(rendered_params))
662
        return rendered_params
663
664
    def _get_next_action(self, action_node, parent_context, action_params, context_result):
665
        # Verify that the referenced action exists
666
        # TODO: We do another lookup in cast_param, refactor to reduce number of lookups
667
        task_name = action_node.name
668
        action_ref = action_node.ref
669
        action_db = action_db_util.get_action_by_ref(ref=action_ref)
670
671
        if not action_db:
672
            error = 'Task :: %s - Action with ref %s not registered.' % (task_name, action_ref)
673
            raise action_exc.InvalidActionReferencedException(error)
674
675
        resolved_params = ActionChainRunner._resolve_params(
676
            action_node=action_node, original_parameters=action_params,
677
            results=context_result, chain_vars=self.chain_holder.vars,
678
            chain_context={'parent': parent_context})
679
680
        liveaction = self._build_liveaction_object(
681
            action_node=action_node,
682
            resolved_params=resolved_params,
683
            parent_context=parent_context)
684
685
        return liveaction
686
687
    def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
688
        """
689
        :param sleep_delay: Number of seconds to wait during "is completed" polls.
690
        :type sleep_delay: ``float``
691
        """
692
        try:
693
            liveaction, _ = action_service.request(liveaction)
694
        except Exception as e:
695
            liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
696
            LOG.exception('Failed to schedule liveaction.')
697
            raise e
698
699
        while (wait_for_completion and liveaction.status not in (
700
                action_constants.LIVEACTION_COMPLETED_STATES +
701
                [action_constants.LIVEACTION_STATUS_PAUSED])):
702
            eventlet.sleep(sleep_delay)
703
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)
704
705
        return liveaction
706
707
    def _resume_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
708
        """
709
        :param sleep_delay: Number of seconds to wait during "is completed" polls.
710
        :type sleep_delay: ``float``
711
        """
712
        try:
713
            user = self.context.get('user', None)
714
            liveaction, _ = action_service.request_resume(liveaction, user)
715
        except Exception as e:
716
            liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
717
            LOG.exception('Failed to schedule liveaction.')
718
            raise e
719
720
        while (wait_for_completion and liveaction.status not in (
721
                action_constants.LIVEACTION_COMPLETED_STATES +
722
                [action_constants.LIVEACTION_STATUS_PAUSED])):
723
            eventlet.sleep(sleep_delay)
724
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)
725
726
        return liveaction
727
728
    def _build_liveaction_object(self, action_node, resolved_params, parent_context):
729
        liveaction = LiveActionDB(action=action_node.ref)
730
731
        # Setup notify for task in chain.
732
        notify = self._get_notify(action_node)
733
        if notify:
734
            liveaction.notify = notify
735
            LOG.debug('%s: Task notify set to: %s', action_node.name, liveaction.notify)
736
737
        liveaction.context = {
738
            'parent': parent_context,
739
            'chain': vars(action_node)
740
        }
741
        liveaction.parameters = action_param_utils.cast_params(action_ref=action_node.ref,
742
                                                               params=resolved_params)
743
        return liveaction
744
745
    def _get_notify(self, action_node):
746
        if action_node.name not in self._skip_notify_tasks:
747
            if action_node.notify:
748
                task_notify = NotificationsHelper.to_model(action_node.notify)
749
                return task_notify
750
            elif self._chain_notify:
751
                return self._chain_notify
752
753
        return None
754
755
    def _get_updated_action_exec_result(self, action_node, liveaction, prev_task_result):
756
        if liveaction.status in action_constants.LIVEACTION_COMPLETED_STATES:
757
            created_at = isotime.parse(prev_task_result['created_at'])
758
            updated_at = liveaction.end_timestamp
759
        else:
760
            created_at = isotime.parse(prev_task_result['created_at'])
761
            updated_at = isotime.parse(prev_task_result['updated_at'])
762
763
        return self._format_action_exec_result(action_node, liveaction, created_at, updated_at)
764
765
    def _format_action_exec_result(self, action_node, liveaction_db, created_at, updated_at,
766
                                   error=None):
767
        """
768
        Format ActionExecution result so it can be used in the final action result output.
769
770
        :rtype: ``dict``
771
        """
772
        assert isinstance(created_at, datetime.datetime)
773
        assert isinstance(updated_at, datetime.datetime)
774
775
        result = {}
776
777
        execution_db = None
778
        if liveaction_db:
779
            execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
780
781
        result['id'] = action_node.name
782
        result['name'] = action_node.name
783
        result['execution_id'] = str(execution_db.id) if execution_db else None
784
        result['liveaction_id'] = str(liveaction_db.id) if liveaction_db else None
785
        result['workflow'] = None
786
787
        result['created_at'] = isotime.format(dt=created_at)
788
        result['updated_at'] = isotime.format(dt=updated_at)
789
790
        if error or not liveaction_db:
791
            result['state'] = action_constants.LIVEACTION_STATUS_FAILED
792
        else:
793
            result['state'] = liveaction_db.status
794
795
        if error:
796
            result['result'] = error
797
        else:
798
            result['result'] = liveaction_db.result
799
800
        return result
801
802
803
def get_runner():
804
    return ActionChainRunner(str(uuid.uuid4()))
805