Passed
Pull Request — master (#3640)
by Lakshmi
06:19
created

ActionChainRunner.run()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 3
rs 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import eventlet
18
import traceback
19
import uuid
20
import datetime
21
22
from jsonschema import exceptions as json_schema_exc
23
24
from st2common.runners.base import ActionRunner
25
from st2common import log as logging
26
from st2common.constants import action as action_constants
27
from st2common.constants import pack as pack_constants
28
from st2common.constants import keyvalue as kv_constants
29
from st2common.content.loader import MetaLoader
30
from st2common.exceptions import action as action_exc
31
from st2common.exceptions import actionrunner as runner_exc
32
from st2common.exceptions import db as db_exc
33
from st2common.models.api.notification import NotificationsHelper
34
from st2common.models.db.liveaction import LiveActionDB
35
from st2common.models.system import actionchain
36
from st2common.models.utils import action_param_utils
37
from st2common.persistence.execution import ActionExecution
38
from st2common.persistence.liveaction import LiveAction
39
from st2common.services import action as action_service
40
from st2common.services import keyvalues as kv_service
41
from st2common.util import action_db as action_db_util
42
from st2common.util import isotime
43
from st2common.util import date as date_utils
44
from st2common.util import jinja as jinja_utils
45
from st2common.util import param as param_utils
46
from st2common.util.config_loader import get_config
47
48
49
LOG = logging.getLogger(__name__)
50
RESULTS_KEY = '__results'
51
JINJA_START_MARKERS = [
52
    '{{',
53
    '{%'
54
]
55
PUBLISHED_VARS_KEY = 'published'
56
57
58
class ChainHolder(object):
59
60
    def __init__(self, chainspec, chainname):
61
        self.actionchain = actionchain.ActionChain(**chainspec)
62
        self.chainname = chainname
63
64
        if not self.actionchain.default:
65
            default = self._get_default(self.actionchain)
66
            self.actionchain.default = default
67
68
        LOG.debug('Using %s as default for %s.', self.actionchain.default, self.chainname)
69
        if not self.actionchain.default:
70
            raise Exception('Failed to find default node in %s.' % (self.chainname))
71
72
        self.vars = {}
73
74
    def init_vars(self, action_parameters):
75
        if self.actionchain.vars:
76
            self.vars = self._get_rendered_vars(self.actionchain.vars,
77
                                                action_parameters=action_parameters)
78
79
    def restore_vars(self, ctx_vars):
80
        self.vars.update(copy.deepcopy(ctx_vars))
81
82
    def validate(self):
83
        """
84
        Function which performs a simple compile time validation.
85
86
        Keep in mind that some variables are only resolved during run time which means we can
87
        perform only simple validation during compile / create time.
88
        """
89
        all_nodes = self._get_all_nodes(action_chain=self.actionchain)
90
91
        for node in self.actionchain.chain:
92
            on_success_node_name = node.on_success
93
            on_failure_node_name = node.on_failure
94
95
            # Check "on-success" path
96
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
97
                                                  node_name=on_success_node_name)
98
            if not valid_name:
99
                msg = ('Unable to find node with name "%s" referenced in "on-success" in '
100
                       'task "%s".' % (on_success_node_name, node.name))
101
                raise ValueError(msg)
102
103
            # Check "on-failure" path
104
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
105
                                                  node_name=on_failure_node_name)
106
            if not valid_name:
107
                msg = ('Unable to find node with name "%s" referenced in "on-failure" in '
108
                       'task "%s".' % (on_failure_node_name, node.name))
109
                raise ValueError(msg)
110
111
        # check if node specified in default is valid.
112
        if self.actionchain.default:
113
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
114
                                                  node_name=self.actionchain.default)
115
            if not valid_name:
116
                msg = ('Unable to find node with name "%s" referenced in "default".' %
117
                       self.actionchain.default)
118
                raise ValueError(msg)
119
        return True
120
121
    @staticmethod
122
    def _get_default(action_chain):
123
        # default is defined
124
        if action_chain.default:
125
            return action_chain.default
126
        # no nodes in chain
127
        if not action_chain.chain:
128
            return None
129
        # The first node with no references is the default node. Assumptions
130
        # that support this are :
131
        # 1. There are no loops in the chain. Even if there are loops there is
132
        #    at least 1 node which does not end up in this loop.
133
        # 2. There are no fragments in the chain.
134
        all_nodes = ChainHolder._get_all_nodes(action_chain=action_chain)
135
        node_names = set(all_nodes)
136
        on_success_nodes = ChainHolder._get_all_on_success_nodes(action_chain=action_chain)
137
        on_failure_nodes = ChainHolder._get_all_on_failure_nodes(action_chain=action_chain)
138
        referenced_nodes = on_success_nodes | on_failure_nodes
139
        possible_default_nodes = node_names - referenced_nodes
140
        if possible_default_nodes:
141
            # This is to preserve order. set([..]) does not preserve the order so iterate
142
            # over original array.
143
            for node in all_nodes:
144
                if node in possible_default_nodes:
145
                    return node
146
        # If no node is found assume the first node in the chain list to be default.
147
        return action_chain.chain[0].name
148
149
    @staticmethod
150
    def _get_all_nodes(action_chain):
151
        """
152
        Return names for all the nodes in the chain.
153
        """
154
        all_nodes = [node.name for node in action_chain.chain]
155
        return all_nodes
156
157
    @staticmethod
158
    def _get_all_on_success_nodes(action_chain):
159
        """
160
        Return names for all the tasks referenced in "on-success".
161
        """
162
        on_success_nodes = set([node.on_success for node in action_chain.chain])
163
        return on_success_nodes
164
165
    @staticmethod
166
    def _get_all_on_failure_nodes(action_chain):
167
        """
168
        Return names for all the tasks referenced in "on-failure".
169
        """
170
        on_failure_nodes = set([node.on_failure for node in action_chain.chain])
171
        return on_failure_nodes
172
173
    def _is_valid_node_name(self, all_node_names, node_name):
174
        """
175
        Function which validates that the provided node name is defined in the workflow definition
176
        and it's valid.
177
178
        Keep in mind that we can only perform validation for task names which don't include jinja
179
        expressions since those are rendered at run time.
180
        """
181
        if not node_name:
182
            # This task name needs to be resolved during run time so we cant validate the name now
183
            return True
184
185
        is_jinja_expression = jinja_utils.is_jinja_expression(value=node_name)
186
        if is_jinja_expression:
187
            # This task name needs to be resolved during run time so we cant validate the name
188
            # now
189
            return True
190
191
        return node_name in all_node_names
192
193
    @staticmethod
194
    def _get_rendered_vars(vars, action_parameters):
195
        if not vars:
196
            return {}
197
        context = {}
198
        context.update({
199
            kv_constants.DATASTORE_PARENT_SCOPE: {
200
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
201
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
202
            }
203
        })
204
        context.update(action_parameters)
205
        LOG.info('Rendering action chain vars. Mapping = %s; Context = %s', vars, context)
206
        return jinja_utils.render_values(mapping=vars, context=context)
207
208
    def get_node(self, node_name=None, raise_on_failure=False):
209
        if not node_name:
210
            return None
211
        for node in self.actionchain.chain:
212
            if node.name == node_name:
213
                return node
214
        if raise_on_failure:
215
            raise runner_exc.ActionRunnerException(
216
                'Unable to find node with name "%s".' % (node_name))
217
        return None
218
219
    def get_next_node(self, curr_node_name=None, condition='on-success'):
220
        if not curr_node_name:
221
            return self.get_node(self.actionchain.default)
222
        current_node = self.get_node(curr_node_name)
223
        if condition == 'on-success':
224
            return self.get_node(current_node.on_success, raise_on_failure=True)
225
        elif condition == 'on-failure':
226
            return self.get_node(current_node.on_failure, raise_on_failure=True)
227
        raise runner_exc.ActionRunnerException('Unknown condition %s.' % condition)
228
229
230
class ActionChainRunner(ActionRunner):
231
232
    def __init__(self, runner_id):
233
        super(ActionChainRunner, self).__init__(runner_id=runner_id)
234
        self.chain_holder = None
235
        self._meta_loader = MetaLoader()
236
        self._skip_notify_tasks = []
237
        self._display_published = True
238
        self._chain_notify = None
239
240
    def pre_run(self):
241
        super(ActionChainRunner, self).pre_run()
242
243
        chainspec_file = self.entry_point
244
        LOG.debug('Reading action chain from %s for action %s.', chainspec_file,
245
                  self.action)
246
247
        try:
248
            chainspec = self._meta_loader.load(file_path=chainspec_file,
249
                                               expected_type=dict)
250
        except Exception as e:
251
            message = ('Failed to parse action chain definition from "%s": %s' %
252
                       (chainspec_file, str(e)))
253
            LOG.exception('Failed to load action chain definition.')
254
            raise runner_exc.ActionRunnerPreRunError(message)
255
256
        try:
257
            self.chain_holder = ChainHolder(chainspec, self.action_name)
258
        except json_schema_exc.ValidationError as e:
259
            # preserve the whole nasty jsonschema message as that is better to get to the
260
            # root cause
261
            message = str(e)
262
            LOG.exception('Failed to instantiate ActionChain.')
263
            raise runner_exc.ActionRunnerPreRunError(message)
264
        except Exception as e:
265
            message = e.message or str(e)
266
            LOG.exception('Failed to instantiate ActionChain.')
267
            raise runner_exc.ActionRunnerPreRunError(message)
268
269
        # Runner attributes are set lazily. So these steps
270
        # should happen outside the constructor.
271
        if getattr(self, 'liveaction', None):
272
            self._chain_notify = getattr(self.liveaction, 'notify', None)
273
        if self.runner_parameters:
274
            self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
275
            self._display_published = self.runner_parameters.get('display_published', True)
276
277
        # Perform some pre-run chain validation
278
        try:
279
            self.chain_holder.validate()
280
        except Exception as e:
281
            raise runner_exc.ActionRunnerPreRunError(e.message)
282
283
    def run(self, action_parameters):
284
        # Run the action chain.
285
        return self._run_chain(action_parameters)
286
287
    def pause(self):
288
        # Identify the list of action executions that are workflows and cascade pause.
289
        for child_exec_id in self.execution.children:
290
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)
291
            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
292
                    child_exec.status == action_constants.LIVEACTION_STATUS_RUNNING):
293
                action_service.request_pause(
294
                    LiveAction.get(id=child_exec.liveaction['id']),
295
                    self.context.get('user', None)
296
                )
297
298
        return (
299
            action_constants.LIVEACTION_STATUS_PAUSING,
300
            self.liveaction.result,
301
            self.liveaction.context
302
        )
303
304
    def resume(self):
305
        # Restore runner and action parameters since they are not provided on resume.
306
        runner_parameters, action_parameters = param_utils.render_final_params(
307
            self.runner_type_db.runner_parameters,
308
            self.action.parameters,
309
            self.liveaction.parameters,
310
            self.liveaction.context
311
        )
312
313
        # Assign runner parameters needed for pre-run.
314
        if runner_parameters:
315
            self.runner_parameters = runner_parameters
316
317
        # Restore chain holder if it is not initialized.
318
        if not self.chain_holder:
319
            self.pre_run()
320
321
        # Change the status of the liveaction from resuming to running.
322
        self.liveaction = action_service.update_status(
323
            self.liveaction,
324
            action_constants.LIVEACTION_STATUS_RUNNING,
325
            publish=False
326
        )
327
328
        # Run the action chain.
329
        return self._run_chain(action_parameters, resuming=True)
330
331
    def _run_chain(self, action_parameters, resuming=False):
332
        # Set chain status to fail unless explicitly set to succeed.
333
        chain_status = action_constants.LIVEACTION_STATUS_FAILED
334
335
        # Result holds the final result that the chain store in the database.
336
        result = {'tasks': []}
337
338
        # Save published variables into the result if specified.
339
        if self._display_published:
340
            result[PUBLISHED_VARS_KEY] = {}
341
342
        context_result = {}  # Holds result which is used for the template context purposes
343
        top_level_error = None  # Stores a reference to a top level error
344
        action_node = None
345
        last_task = None
346
347
        try:
348
            # Initialize vars with the action parameters.
349
            # This allows action parameers to be referenced from vars.
350
            self.chain_holder.init_vars(action_parameters)
351
        except Exception as e:
352
            chain_status = action_constants.LIVEACTION_STATUS_FAILED
353
            m = 'Failed initializing ``vars`` in chain.'
354
            LOG.exception(m)
355
            top_level_error = self._format_error(e, m)
356
            result.update(top_level_error)
357
            return (chain_status, result, None)
358
359
        # Restore state on resuming an existing chain execution.
360
        if resuming:
361
            # Restore vars is any from the liveaction.
362
            ctx_vars = self.liveaction.context.pop('vars', {})
363
            self.chain_holder.restore_vars(ctx_vars)
364
365
            # Restore result if any from the liveaction.
366
            if self.liveaction and hasattr(self.liveaction, 'result') and self.liveaction.result:
367
                result = self.liveaction.result
368
369
            # Initialize or rebuild existing context_result from liveaction
370
            # which holds the result used for resolving context in Jinja template.
371
            for task in result.get('tasks', []):
372
                context_result[task['name']] = task['result']
373
374
            # Restore or initialize the top_level_error
375
            # that stores a reference to a top level error.
376
            if 'error' in result or 'traceback' in result:
377
                top_level_error = {
378
                    'error': result.get('error'),
379
                    'traceback': result.get('traceback')
380
                }
381
382
        # If there are no executed tasks in the chain, then get the first node.
383
        if len(result['tasks']) <= 0:
384
            try:
385
                action_node = self.chain_holder.get_next_node()
386
            except Exception as e:
387
                m = 'Failed to get starting node "%s".', action_node.name
388
                LOG.exception(m)
389
                top_level_error = self._format_error(e, m)
390
391
            # If there are no action node to run next, then mark the chain successful.
392
            if not action_node:
393
                chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
394
395
        # Otherwise, figure out the last task executed and
396
        # its state to determine where to begin executing.
397
        else:
398
            last_task = result['tasks'][-1]
399
            action_node = self.chain_holder.get_node(last_task['name'])
400
            liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id'])
401
402
            # If the liveaction of the last task has changed, update the result entry.
403
            if liveaction.status != last_task['state']:
404
                updated_task_result = self._get_updated_action_exec_result(
405
                    action_node, liveaction, last_task)
406
                del result['tasks'][-1]
407
                result['tasks'].append(updated_task_result)
408
409
            # If the last task was canceled, then canceled the chain altogether.
410
            if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
411
                chain_status = action_constants.LIVEACTION_STATUS_CANCELED
412
                return (chain_status, result, None)
413
414
            # If the last task was paused, then stay on this action node.
415
            # This is explicitly put here for clarity.
416
            if liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
417
                pass
418
419
            # If the last task succeeded, then get the next on-success action node.
420
            if liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED:
421
                chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
422
                action_node = self.chain_holder.get_next_node(
423
                    last_task['name'], condition='on-success')
424
425
            # If the last task failed, then get the next on-failure action node.
426
            if liveaction.status in action_constants.LIVEACTION_FAILED_STATES:
427
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
428
                action_node = self.chain_holder.get_next_node(
429
                    last_task['name'], condition='on-failure')
430
431
        # Setup parent context.
432
        parent_context = {
433
            'execution_id': self.execution_id
434
        }
435
436
        if getattr(self.liveaction, 'context', None):
437
            parent_context.update(self.liveaction.context)
438
439
        # Run the action chain until there are no more tasks.
440
        while action_node:
441
            error = None
442
            liveaction = None
443
            last_task = result['tasks'][-1] if len(result['tasks']) > 0 else None
444
            created_at = date_utils.get_datetime_utc_now()
445
446
            try:
447
                # If last task was paused, then fetch the liveaction and resume it first.
448
                if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED:
449
                    liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id'])
450
                    del result['tasks'][-1]
451
                else:
452
                    liveaction = self._get_next_action(
453
                        action_node=action_node, parent_context=parent_context,
454
                        action_params=action_parameters, context_result=context_result)
455
            except action_exc.InvalidActionReferencedException as e:
456
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
457
                m = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' %
458
                     (action_node.name, action_node.ref))
459
                LOG.exception(m)
460
                top_level_error = self._format_error(e, m)
461
                break
462
            except action_exc.ParameterRenderingFailedException as e:
463
                # Rendering parameters failed before we even got to running this action,
464
                # abort and fail the whole action chain
465
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
466
                m = 'Failed to run task "%s". Parameter rendering failed.' % action_node.name
467
                LOG.exception(m)
468
                top_level_error = self._format_error(e, m)
469
                break
470
            except db_exc.StackStormDBObjectNotFoundError as e:
471
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
472
                m = 'Failed to resume task "%s". Unable to find liveaction.' % action_node.name
473
                LOG.exception(m)
474
                top_level_error = self._format_error(e, m)
475
                break
476
477
            try:
478
                # If last task was paused, then fetch the liveaction and resume it first.
479
                if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED:
480
                    LOG.info('Resume task %s for chain %s.', action_node.name, self.liveaction.id)
481
                    liveaction = self._resume_action(liveaction)
482
                else:
483
                    LOG.info('Run task %s for chain %s.', action_node.name, self.liveaction.id)
484
                    liveaction = self._run_action(liveaction)
485
            except Exception as e:
486
                # Save the traceback and error message
487
                m = 'Failed running task "%s".' % action_node.name
488
                LOG.exception(m)
489
                error = self._format_error(e, m)
490
                context_result[action_node.name] = error
491
            else:
492
                # Update context result
493
                context_result[action_node.name] = liveaction.result
494
495
                # Render and publish variables
496
                rendered_publish_vars = ActionChainRunner._render_publish_vars(
497
                    action_node=action_node, action_parameters=action_parameters,
498
                    execution_result=liveaction.result, previous_execution_results=context_result,
499
                    chain_vars=self.chain_holder.vars)
500
501
                if rendered_publish_vars:
502
                    self.chain_holder.vars.update(rendered_publish_vars)
503
                    if self._display_published:
504
                        result[PUBLISHED_VARS_KEY].update(rendered_publish_vars)
505
            finally:
506
                # Record result and resolve a next node based on the task success or failure
507
                updated_at = date_utils.get_datetime_utc_now()
508
509
                task_result = self._format_action_exec_result(
510
                    action_node,
511
                    liveaction,
512
                    created_at,
513
                    updated_at,
514
                    error=error
515
                )
516
517
                result['tasks'].append(task_result)
518
519
                try:
520
                    if not liveaction:
521
                        chain_status = action_constants.LIVEACTION_STATUS_FAILED
522
                        action_node = self.chain_holder.get_next_node(
523
                            action_node.name, condition='on-failure')
524
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_TIMED_OUT:
525
                        chain_status = action_constants.LIVEACTION_STATUS_TIMED_OUT
526
                        action_node = self.chain_holder.get_next_node(
527
                            action_node.name, condition='on-failure')
528
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
529
                        LOG.info('Chain execution (%s) canceled because task "%s" is canceled.',
530
                                 self.liveaction_id, action_node.name)
531
                        chain_status = action_constants.LIVEACTION_STATUS_CANCELED
532
                        action_node = None
533
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
534
                        LOG.info('Chain execution (%s) paused because task "%s" is paused.',
535
                                 self.liveaction_id, action_node.name)
536
                        chain_status = action_constants.LIVEACTION_STATUS_PAUSED
537
                        self._save_vars()
538
                        action_node = None
539
                    elif liveaction.status in action_constants.LIVEACTION_FAILED_STATES:
540
                        chain_status = action_constants.LIVEACTION_STATUS_FAILED
541
                        action_node = self.chain_holder.get_next_node(
542
                            action_node.name, condition='on-failure')
543
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED:
544
                        chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
545
                        action_node = self.chain_holder.get_next_node(
546
                            action_node.name, condition='on-success')
547
                    else:
548
                        action_node = None
549
                except Exception as e:
550
                    chain_status = action_constants.LIVEACTION_STATUS_FAILED
551
                    m = 'Failed to get next node "%s".' % action_node.name
552
                    LOG.exception(m)
553
                    top_level_error = self._format_error(e, m)
554
                    action_node = None
555
                    break
556
557
            if action_service.is_action_canceled_or_canceling(self.liveaction.id):
558
                LOG.info('Chain execution (%s) canceled by user.', self.liveaction.id)
559
                chain_status = action_constants.LIVEACTION_STATUS_CANCELED
560
                return (chain_status, result, None)
561
562
            if action_service.is_action_paused_or_pausing(self.liveaction.id):
563
                LOG.info('Chain execution (%s) paused by user.', self.liveaction.id)
564
                chain_status = action_constants.LIVEACTION_STATUS_PAUSED
565
                self._save_vars()
566
                return (chain_status, result, self.liveaction.context)
567
568
        if top_level_error and isinstance(top_level_error, dict):
569
            result.update(top_level_error)
570
571
        return (chain_status, result, self.liveaction.context)
572
573
    def _format_error(self, e, msg):
574
        return {
575
            'error': '%s. %s' % (msg, str(e)),
576
            'traceback': traceback.format_exc(10)
577
        }
578
579
    def _save_vars(self):
580
        # Save the context vars in the liveaction context.
581
        self.liveaction.context['vars'] = self.chain_holder.vars
582
583
    @staticmethod
584
    def _render_publish_vars(action_node, action_parameters, execution_result,
585
                             previous_execution_results, chain_vars):
586
        """
587
        If no output is specified on the action_node the output is the entire execution_result.
588
        If any output is specified then only those variables are published as output of an
589
        execution of this action_node.
590
        The output variable can refer to a variable from the execution_result,
591
        previous_execution_results or chain_vars.
592
        """
593
        if not action_node.publish:
594
            return {}
595
596
        context = {}
597
        context.update(action_parameters)
598
        context.update({action_node.name: execution_result})
599
        context.update(previous_execution_results)
600
        context.update(chain_vars)
601
        context.update({RESULTS_KEY: previous_execution_results})
602
603
        context.update({
604
            kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
605
                scope=kv_constants.SYSTEM_SCOPE)
606
        })
607
608
        context.update({
609
            kv_constants.DATASTORE_PARENT_SCOPE: {
610
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
611
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
612
            }
613
        })
614
615
        try:
616
            rendered_result = jinja_utils.render_values(mapping=action_node.publish,
617
                                                        context=context)
618
        except Exception as e:
619
            key = getattr(e, 'key', None)
620
            value = getattr(e, 'value', None)
621
            msg = ('Failed rendering value for publish parameter "%s" in task "%s" '
622
                   '(template string=%s): %s' % (key, action_node.name, value, str(e)))
623
            raise action_exc.ParameterRenderingFailedException(msg)
624
625
        return rendered_result
626
627
    @staticmethod
628
    def _resolve_params(action_node, original_parameters, results, chain_vars, chain_context):
629
        # setup context with original parameters and the intermediate results.
630
        chain_parent = chain_context.get('parent', {})
631
        pack = chain_parent.get('pack')
632
        user = chain_parent.get('user')
633
634
        config = get_config(pack, user)
635
636
        context = {}
637
        context.update(original_parameters)
638
        context.update(results)
639
        context.update(chain_vars)
640
        context.update({RESULTS_KEY: results})
641
642
        context.update({
643
            kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
644
                scope=kv_constants.SYSTEM_SCOPE)
645
        })
646
647
        context.update({
648
            kv_constants.DATASTORE_PARENT_SCOPE: {
649
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
650
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
651
            }
652
        })
653
        context.update({action_constants.ACTION_CONTEXT_KV_PREFIX: chain_context})
654
        context.update({pack_constants.PACK_CONFIG_CONTEXT_KV_PREFIX: config})
655
        try:
656
            rendered_params = jinja_utils.render_values(mapping=action_node.get_parameters(),
657
                                                        context=context)
658
        except Exception as e:
659
            LOG.exception('Jinja rendering for parameter "%s" failed.' % (e.key))
660
661
            key = getattr(e, 'key', None)
662
            value = getattr(e, 'value', None)
663
            msg = ('Failed rendering value for action parameter "%s" in task "%s" '
664
                   '(template string=%s): %s') % (key, action_node.name, value, str(e))
665
            raise action_exc.ParameterRenderingFailedException(msg)
666
        LOG.debug('Rendered params: %s: Type: %s', rendered_params, type(rendered_params))
667
        return rendered_params
668
669
    def _get_next_action(self, action_node, parent_context, action_params, context_result):
670
        # Verify that the referenced action exists
671
        # TODO: We do another lookup in cast_param, refactor to reduce number of lookups
672
        task_name = action_node.name
673
        action_ref = action_node.ref
674
        action_db = action_db_util.get_action_by_ref(ref=action_ref)
675
676
        if not action_db:
677
            error = 'Task :: %s - Action with ref %s not registered.' % (task_name, action_ref)
678
            raise action_exc.InvalidActionReferencedException(error)
679
680
        resolved_params = ActionChainRunner._resolve_params(
681
            action_node=action_node, original_parameters=action_params,
682
            results=context_result, chain_vars=self.chain_holder.vars,
683
            chain_context={'parent': parent_context})
684
685
        liveaction = self._build_liveaction_object(
686
            action_node=action_node,
687
            resolved_params=resolved_params,
688
            parent_context=parent_context)
689
690
        return liveaction
691
692
    def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
693
        """
694
        :param sleep_delay: Number of seconds to wait during "is completed" polls.
695
        :type sleep_delay: ``float``
696
        """
697
        try:
698
            liveaction, _ = action_service.request(liveaction)
699
        except Exception as e:
700
            liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
701
            LOG.exception('Failed to schedule liveaction.')
702
            raise e
703
704
        while (wait_for_completion and liveaction.status not in (
705
                action_constants.LIVEACTION_COMPLETED_STATES +
706
                [action_constants.LIVEACTION_STATUS_PAUSED])):
707
            eventlet.sleep(sleep_delay)
708
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)
709
710
        return liveaction
711
712
    def _resume_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
713
        """
714
        :param sleep_delay: Number of seconds to wait during "is completed" polls.
715
        :type sleep_delay: ``float``
716
        """
717
        try:
718
            user = self.context.get('user', None)
719
            liveaction, _ = action_service.request_resume(liveaction, user)
720
        except Exception as e:
721
            liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
722
            LOG.exception('Failed to schedule liveaction.')
723
            raise e
724
725
        while (wait_for_completion and liveaction.status not in (
726
                action_constants.LIVEACTION_COMPLETED_STATES +
727
                [action_constants.LIVEACTION_STATUS_PAUSED])):
728
            eventlet.sleep(sleep_delay)
729
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)
730
731
        return liveaction
732
733
    def _build_liveaction_object(self, action_node, resolved_params, parent_context):
734
        liveaction = LiveActionDB(action=action_node.ref)
735
736
        # Setup notify for task in chain.
737
        notify = self._get_notify(action_node)
738
        if notify:
739
            liveaction.notify = notify
740
            LOG.debug('%s: Task notify set to: %s', action_node.name, liveaction.notify)
741
742
        liveaction.context = {
743
            'parent': parent_context,
744
            'chain': vars(action_node)
745
        }
746
        liveaction.parameters = action_param_utils.cast_params(action_ref=action_node.ref,
747
                                                               params=resolved_params)
748
        return liveaction
749
750
    def _get_notify(self, action_node):
751
        if action_node.name not in self._skip_notify_tasks:
752
            if action_node.notify:
753
                task_notify = NotificationsHelper.to_model(action_node.notify)
754
                return task_notify
755
            elif self._chain_notify:
756
                return self._chain_notify
757
758
        return None
759
760
    def _get_updated_action_exec_result(self, action_node, liveaction, prev_task_result):
761
        if liveaction.status in action_constants.LIVEACTION_COMPLETED_STATES:
762
            created_at = isotime.parse(prev_task_result['created_at'])
763
            updated_at = liveaction.end_timestamp
764
        else:
765
            created_at = isotime.parse(prev_task_result['created_at'])
766
            updated_at = isotime.parse(prev_task_result['updated_at'])
767
768
        return self._format_action_exec_result(action_node, liveaction, created_at, updated_at)
769
770
    def _format_action_exec_result(self, action_node, liveaction_db, created_at, updated_at,
771
                                   error=None):
772
        """
773
        Format ActionExecution result so it can be used in the final action result output.
774
775
        :rtype: ``dict``
776
        """
777
        assert isinstance(created_at, datetime.datetime)
778
        assert isinstance(updated_at, datetime.datetime)
779
780
        result = {}
781
782
        execution_db = None
783
        if liveaction_db:
784
            execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
785
786
        result['id'] = action_node.name
787
        result['name'] = action_node.name
788
        result['execution_id'] = str(execution_db.id) if execution_db else None
789
        result['liveaction_id'] = str(liveaction_db.id) if liveaction_db else None
790
        result['workflow'] = None
791
792
        result['created_at'] = isotime.format(dt=created_at)
793
        result['updated_at'] = isotime.format(dt=updated_at)
794
795
        if error or not liveaction_db:
796
            result['state'] = action_constants.LIVEACTION_STATUS_FAILED
797
        else:
798
            result['state'] = liveaction_db.status
799
800
        if error:
801
            result['result'] = error
802
        else:
803
            result['result'] = liveaction_db.result
804
805
        return result
806
807
808
def get_runner():
809
    return ActionChainRunner(str(uuid.uuid4()))
810