Passed
Pull Request — master (#3645)
by W
05:13
created

ActionChainRunner._build_liveaction_object()   A

Complexity

Conditions 2

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 16
rs 9.4285
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import eventlet
17
import traceback
18
import uuid
19
import datetime
20
21
from jsonschema import exceptions as json_schema_exc
22
23
from st2common.runners.base import ActionRunner
24
from st2common import log as logging
25
from st2common.constants import action as action_constants
26
from st2common.constants import pack as pack_constants
27
from st2common.constants import keyvalue as kv_constants
28
from st2common.content.loader import MetaLoader
29
from st2common.exceptions import action as action_exc
30
from st2common.exceptions import actionrunner as runner_exc
31
from st2common.exceptions import db as db_exc
32
from st2common.models.api.notification import NotificationsHelper
33
from st2common.models.db.liveaction import LiveActionDB
34
from st2common.models.system import actionchain
35
from st2common.models.utils import action_param_utils
36
from st2common.persistence.execution import ActionExecution
37
from st2common.persistence.liveaction import LiveAction
38
from st2common.services import action as action_service
39
from st2common.services import keyvalues as kv_service
40
from st2common.util import action_db as action_db_util
41
from st2common.util import isotime
42
from st2common.util import date as date_utils
43
from st2common.util import jinja as jinja_utils
44
from st2common.util import param as param_utils
45
from st2common.util.config_loader import get_config
46
47
48
LOG = logging.getLogger(__name__)
49
RESULTS_KEY = '__results'
50
JINJA_START_MARKERS = [
51
    '{{',
52
    '{%'
53
]
54
PUBLISHED_VARS_KEY = 'published'
55
56
57
class ChainHolder(object):
58
59
    def __init__(self, chainspec, chainname):
60
        self.actionchain = actionchain.ActionChain(**chainspec)
61
        self.chainname = chainname
62
63
        if not self.actionchain.default:
64
            default = self._get_default(self.actionchain)
65
            self.actionchain.default = default
66
67
        LOG.debug('Using %s as default for %s.', self.actionchain.default, self.chainname)
68
        if not self.actionchain.default:
69
            raise Exception('Failed to find default node in %s.' % (self.chainname))
70
71
        self.vars = {}
72
73
    def init_vars(self, action_parameters):
74
        if self.actionchain.vars:
75
            self.vars = self._get_rendered_vars(self.actionchain.vars,
76
                                                action_parameters=action_parameters)
77
78
    def validate(self):
79
        """
80
        Function which performs a simple compile time validation.
81
82
        Keep in mind that some variables are only resolved during run time which means we can
83
        perform only simple validation during compile / create time.
84
        """
85
        all_nodes = self._get_all_nodes(action_chain=self.actionchain)
86
87
        for node in self.actionchain.chain:
88
            on_success_node_name = node.on_success
89
            on_failure_node_name = node.on_failure
90
91
            # Check "on-success" path
92
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
93
                                                  node_name=on_success_node_name)
94
            if not valid_name:
95
                msg = ('Unable to find node with name "%s" referenced in "on-success" in '
96
                       'task "%s".' % (on_success_node_name, node.name))
97
                raise ValueError(msg)
98
99
            # Check "on-failure" path
100
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
101
                                                  node_name=on_failure_node_name)
102
            if not valid_name:
103
                msg = ('Unable to find node with name "%s" referenced in "on-failure" in '
104
                       'task "%s".' % (on_failure_node_name, node.name))
105
                raise ValueError(msg)
106
107
        # check if node specified in default is valid.
108
        if self.actionchain.default:
109
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
110
                                                  node_name=self.actionchain.default)
111
            if not valid_name:
112
                msg = ('Unable to find node with name "%s" referenced in "default".' %
113
                       self.actionchain.default)
114
                raise ValueError(msg)
115
        return True
116
117
    @staticmethod
118
    def _get_default(action_chain):
119
        # default is defined
120
        if action_chain.default:
121
            return action_chain.default
122
        # no nodes in chain
123
        if not action_chain.chain:
124
            return None
125
        # The first node with no references is the default node. Assumptions
126
        # that support this are :
127
        # 1. There are no loops in the chain. Even if there are loops there is
128
        #    at least 1 node which does not end up in this loop.
129
        # 2. There are no fragments in the chain.
130
        all_nodes = ChainHolder._get_all_nodes(action_chain=action_chain)
131
        node_names = set(all_nodes)
132
        on_success_nodes = ChainHolder._get_all_on_success_nodes(action_chain=action_chain)
133
        on_failure_nodes = ChainHolder._get_all_on_failure_nodes(action_chain=action_chain)
134
        referenced_nodes = on_success_nodes | on_failure_nodes
135
        possible_default_nodes = node_names - referenced_nodes
136
        if possible_default_nodes:
137
            # This is to preserve order. set([..]) does not preserve the order so iterate
138
            # over original array.
139
            for node in all_nodes:
140
                if node in possible_default_nodes:
141
                    return node
142
        # If no node is found assume the first node in the chain list to be default.
143
        return action_chain.chain[0].name
144
145
    @staticmethod
146
    def _get_all_nodes(action_chain):
147
        """
148
        Return names for all the nodes in the chain.
149
        """
150
        all_nodes = [node.name for node in action_chain.chain]
151
        return all_nodes
152
153
    @staticmethod
154
    def _get_all_on_success_nodes(action_chain):
155
        """
156
        Return names for all the tasks referenced in "on-success".
157
        """
158
        on_success_nodes = set([node.on_success for node in action_chain.chain])
159
        return on_success_nodes
160
161
    @staticmethod
162
    def _get_all_on_failure_nodes(action_chain):
163
        """
164
        Return names for all the tasks referenced in "on-failure".
165
        """
166
        on_failure_nodes = set([node.on_failure for node in action_chain.chain])
167
        return on_failure_nodes
168
169
    def _is_valid_node_name(self, all_node_names, node_name):
170
        """
171
        Function which validates that the provided node name is defined in the workflow definition
172
        and it's valid.
173
174
        Keep in mind that we can only perform validation for task names which don't include jinja
175
        expressions since those are rendered at run time.
176
        """
177
        if not node_name:
178
            # This task name needs to be resolved during run time so we cant validate the name now
179
            return True
180
181
        is_jinja_expression = jinja_utils.is_jinja_expression(value=node_name)
182
        if is_jinja_expression:
183
            # This task name needs to be resolved during run time so we cant validate the name
184
            # now
185
            return True
186
187
        return node_name in all_node_names
188
189
    @staticmethod
190
    def _get_rendered_vars(vars, action_parameters):
191
        if not vars:
192
            return {}
193
        context = {}
194
        context.update({
195
            kv_constants.DATASTORE_PARENT_SCOPE: {
196
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
197
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
198
            }
199
        })
200
        context.update(action_parameters)
201
        LOG.info('Rendering action chain vars. Mapping = %s; Context = %s', vars, context)
202
        return jinja_utils.render_values(mapping=vars, context=context)
203
204
    def get_node(self, node_name=None, raise_on_failure=False):
205
        if not node_name:
206
            return None
207
        for node in self.actionchain.chain:
208
            if node.name == node_name:
209
                return node
210
        if raise_on_failure:
211
            raise runner_exc.ActionRunnerException(
212
                'Unable to find node with name "%s".' % (node_name))
213
        return None
214
215
    def get_next_node(self, curr_node_name=None, condition='on-success'):
216
        if not curr_node_name:
217
            return self.get_node(self.actionchain.default)
218
        current_node = self.get_node(curr_node_name)
219
        if condition == 'on-success':
220
            return self.get_node(current_node.on_success, raise_on_failure=True)
221
        elif condition == 'on-failure':
222
            return self.get_node(current_node.on_failure, raise_on_failure=True)
223
        raise runner_exc.ActionRunnerException('Unknown condition %s.' % condition)
224
225
226
class ActionChainRunner(ActionRunner):
227
228
    def __init__(self, runner_id):
229
        super(ActionChainRunner, self).__init__(runner_id=runner_id)
230
        self.chain_holder = None
231
        self._meta_loader = MetaLoader()
232
        self._skip_notify_tasks = []
233
        self._display_published = True
234
        self._chain_notify = None
235
236
    def pre_run(self):
237
        super(ActionChainRunner, self).pre_run()
238
239
        chainspec_file = self.entry_point
240
        LOG.debug('Reading action chain from %s for action %s.', chainspec_file,
241
                  self.action)
242
243
        try:
244
            chainspec = self._meta_loader.load(file_path=chainspec_file,
245
                                               expected_type=dict)
246
        except Exception as e:
247
            message = ('Failed to parse action chain definition from "%s": %s' %
248
                       (chainspec_file, str(e)))
249
            LOG.exception('Failed to load action chain definition.')
250
            raise runner_exc.ActionRunnerPreRunError(message)
251
252
        try:
253
            self.chain_holder = ChainHolder(chainspec, self.action_name)
254
        except json_schema_exc.ValidationError as e:
255
            # preserve the whole nasty jsonschema message as that is better to get to the
256
            # root cause
257
            message = str(e)
258
            LOG.exception('Failed to instantiate ActionChain.')
259
            raise runner_exc.ActionRunnerPreRunError(message)
260
        except Exception as e:
261
            message = e.message or str(e)
262
            LOG.exception('Failed to instantiate ActionChain.')
263
            raise runner_exc.ActionRunnerPreRunError(message)
264
265
        # Runner attributes are set lazily. So these steps
266
        # should happen outside the constructor.
267
        if getattr(self, 'liveaction', None):
268
            self._chain_notify = getattr(self.liveaction, 'notify', None)
269
        if self.runner_parameters:
270
            self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
271
            self._display_published = self.runner_parameters.get('display_published', True)
272
273
        # Perform some pre-run chain validation
274
        try:
275
            self.chain_holder.validate()
276
        except Exception as e:
277
            raise runner_exc.ActionRunnerPreRunError(e.message)
278
279
    def run(self, action_parameters):
280
        # Run the action chain.
281
        return self._run_chain(action_parameters)
282
283
    def pause(self):
284
        # Identify the list of action executions that are workflows and cascade pause.
285
        for child_exec_id in self.execution.children:
286
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)
287
            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
288
                    child_exec.status == action_constants.LIVEACTION_STATUS_RUNNING):
289
                action_service.request_pause(
290
                    LiveAction.get(id=child_exec.liveaction['id']),
291
                    self.context.get('user', None)
292
                )
293
294
        return (
295
            action_constants.LIVEACTION_STATUS_PAUSING,
296
            self.liveaction.result,
297
            self.liveaction.context
298
        )
299
300
    def resume(self):
301
        # Restore runner and action parameters since they are not provided on resume.
302
        runner_parameters, action_parameters = param_utils.render_final_params(
303
            self.runner_type_db.runner_parameters,
304
            self.action.parameters,
305
            self.liveaction.parameters,
306
            self.liveaction.context
307
        )
308
309
        # Assign runner parameters needed for pre-run.
310
        if runner_parameters:
311
            self.runner_parameters = runner_parameters
312
313
        # Restore chain holder if it is not initialized.
314
        if not self.chain_holder:
315
            self.pre_run()
316
317
        # Change the status of the liveaction from resuming to running.
318
        self.liveaction = action_service.update_status(
319
            self.liveaction,
320
            action_constants.LIVEACTION_STATUS_RUNNING,
321
            publish=False
322
        )
323
324
        # Run the action chain.
325
        return self._run_chain(action_parameters, resuming=True)
326
327
    def _run_chain(self, action_parameters, resuming=False):
328
        # Set chain status initially to success until chain cancels, fails, or times out below.
329
        chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
330
331
        # Result holds the final result that the chain store in the database.
332
        result = {'tasks': []}
333
334
        # Save published variables into the result if specified.
335
        if self._display_published:
336
            result[PUBLISHED_VARS_KEY] = {}
337
338
        context_result = {}  # Holds result which is used for the template context purposes
339
        top_level_error = None  # Stores a reference to a top level error
340
        action_node = None
341
        last_task = None
342
343
        # Restore state on resuming an existing chain execution.
344
        if resuming:
345
            # Restore result if any from the liveaction.
346
            if self.liveaction and hasattr(self.liveaction, 'result') and self.liveaction.result:
347
                result = self.liveaction.result
348
349
            # Initialize or rebuild existing context_result from liveaction
350
            # which holds the result used for resolving context in Jinja template.
351
            for task in result.get('tasks', []):
352
                context_result[task['name']] = task['result']
353
354
            # Restore or initialize the top_level_error
355
            # thata stores a reference to a top level error.
356
            if 'error' in result or 'traceback' in result:
357
                top_level_error = {
358
                    'error': result.get('error'),
359
                    'traceback': result.get('traceback')
360
                }
361
362
        try:
363
            # Initialize vars with the action parameters.
364
            # This allows action parameers to be referenced from vars.
365
            self.chain_holder.init_vars(action_parameters)
366
        except Exception as e:
367
            chain_status = action_constants.LIVEACTION_STATUS_FAILED
368
            m = 'Failed initializing ``vars`` in chain.'
369
            LOG.exception(m)
370
            top_level_error = self._format_error(e, m)
371
            result.update(top_level_error)
372
            return (chain_status, result, None)
373
374
        # If there are no executed tasks in the chain, then get the first node.
375
        if len(result['tasks']) <= 0:
376
            try:
377
                action_node = self.chain_holder.get_next_node()
378
            except Exception as e:
379
                m = 'Failed to get starting node "%s".', action_node.name
380
                LOG.exception(m)
381
                top_level_error = self._format_error(e, m)
382
383
        # Otherwise, figure out the last task executed and
384
        # its state to determine where to begin executing.
385
        else:
386
            last_task = result['tasks'][-1]
387
            action_node = self.chain_holder.get_node(last_task['name'])
388
            liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id'])
389
390
            # If the liveaction of the last task has changed, update the result entry.
391
            if liveaction.status != last_task['state']:
392
                updated_task_result = self._get_updated_action_exec_result(
393
                    action_node, liveaction, last_task)
394
                del result['tasks'][-1]
395
                result['tasks'].append(updated_task_result)
396
397
            # If the last task was canceled, then canceled the chain altogether.
398
            if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
399
                chain_status = action_constants.LIVEACTION_STATUS_CANCELED
400
                return (chain_status, result, None)
401
402
            # If the last task was paused, then stay on this action node.
403
            # This is explicitly put here for clarity.
404
            if liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
405
                pass
406
407
            # If the last task was completed, then get the next action node.
408
            if liveaction.status in action_constants.LIVEACTION_COMPLETED_STATES:
409
                action_node = self.chain_holder.get_next_node(
410
                    last_task['name'],
411
                    condition=(
412
                        'on-failure'
413
                        if liveaction.status in action_constants.LIVEACTION_FAILED_STATES
414
                        else 'on-success'
415
                    )
416
                )
417
418
        # Setup parent context.
419
        parent_context = {
420
            'execution_id': self.execution_id
421
        }
422
423
        if getattr(self.liveaction, 'context', None):
424
            parent_context.update(self.liveaction.context)
425
426
        # Run the action chain until there are no more tasks.
427
        while action_node:
428
            error = None
429
            liveaction = None
430
            last_task = result['tasks'][-1] if len(result['tasks']) > 0 else None
431
            created_at = date_utils.get_datetime_utc_now()
432
433
            try:
434
                # If last task was paused, then fetch the liveaction and resume it first.
435
                if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED:
436
                    liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id'])
437
                    del result['tasks'][-1]
438
                else:
439
                    liveaction = self._get_next_action(
440
                        action_node=action_node, parent_context=parent_context,
441
                        action_params=action_parameters, context_result=context_result)
442
            except action_exc.InvalidActionReferencedException as e:
443
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
444
                m = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' %
445
                     (action_node.name, action_node.ref))
446
                LOG.exception(m)
447
                top_level_error = self._format_error(e, m)
448
                break
449
            except action_exc.ParameterRenderingFailedException as e:
450
                # Rendering parameters failed before we even got to running this action,
451
                # abort and fail the whole action chain
452
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
453
                m = 'Failed to run task "%s". Parameter rendering failed.' % action_node.name
454
                LOG.exception(m)
455
                top_level_error = self._format_error(e, m)
456
                break
457
            except db_exc.StackStormDBObjectNotFoundError as e:
458
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
459
                m = 'Failed to resume task "%s". Unable to find liveaction.' % action_node.name
460
                LOG.exception(m)
461
                top_level_error = self._format_error(e, m)
462
                break
463
464
            try:
465
                # If last task was paused, then fetch the liveaction and resume it first.
466
                if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED:
467
                    LOG.info('Resume task %s for chain %s.', action_node.name, self.liveaction.id)
468
                    liveaction = self._resume_action(liveaction)
469
                else:
470
                    LOG.info('Run task %s for chain %s.', action_node.name, self.liveaction.id)
471
                    liveaction = self._run_action(liveaction)
472
            except Exception as e:
473
                # Save the traceback and error message
474
                m = 'Failed running task "%s".' % action_node.name
475
                LOG.exception(m)
476
                error = self._format_error(e, m)
477
                context_result[action_node.name] = error
478
            else:
479
                # Update context result
480
                context_result[action_node.name] = liveaction.result
481
482
                # Render and publish variables
483
                rendered_publish_vars = ActionChainRunner._render_publish_vars(
484
                    action_node=action_node, action_parameters=action_parameters,
485
                    execution_result=liveaction.result, previous_execution_results=context_result,
486
                    chain_vars=self.chain_holder.vars)
487
488
                if rendered_publish_vars:
489
                    self.chain_holder.vars.update(rendered_publish_vars)
490
                    if self._display_published:
491
                        result[PUBLISHED_VARS_KEY].update(rendered_publish_vars)
492
            finally:
493
                # Record result and resolve a next node based on the task success or failure
494
                updated_at = date_utils.get_datetime_utc_now()
495
496
                task_result = self._format_action_exec_result(
497
                    action_node,
498
                    liveaction,
499
                    created_at,
500
                    updated_at,
501
                    error=error
502
                )
503
504
                result['tasks'].append(task_result)
505
506
                try:
507
                    if not liveaction:
508
                        chain_status = action_constants.LIVEACTION_STATUS_FAILED
509
                        action_node = self.chain_holder.get_next_node(
510
                            action_node.name, condition='on-failure')
511
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_TIMED_OUT:
512
                        chain_status = action_constants.LIVEACTION_STATUS_TIMED_OUT
513
                        action_node = self.chain_holder.get_next_node(
514
                            action_node.name, condition='on-failure')
515
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
516
                        LOG.info('Chain execution (%s) canceled because task "%s" is canceled.',
517
                                 self.liveaction_id, action_node.name)
518
                        chain_status = action_constants.LIVEACTION_STATUS_CANCELED
519
                        action_node = None
520
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
521
                        LOG.info('Chain execution (%s) paused because task "%s" is paused.',
522
                                 self.liveaction_id, action_node.name)
523
                        chain_status = action_constants.LIVEACTION_STATUS_PAUSED
524
                        action_node = None
525
                    elif liveaction.status in action_constants.LIVEACTION_FAILED_STATES:
526
                        chain_status = action_constants.LIVEACTION_STATUS_FAILED
527
                        action_node = self.chain_holder.get_next_node(
528
                            action_node.name, condition='on-failure')
529
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED:
530
                        chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
531
                        action_node = self.chain_holder.get_next_node(
532
                            action_node.name, condition='on-success')
533
                    else:
534
                        action_node = None
535
                except Exception as e:
536
                    chain_status = action_constants.LIVEACTION_STATUS_FAILED
537
                    m = 'Failed to get next node "%s".' % action_node.name
538
                    LOG.exception(m)
539
                    top_level_error = self._format_error(e, m)
540
                    action_node = None
541
                    break
542
543
            if action_service.is_action_canceled_or_canceling(self.liveaction.id):
544
                LOG.info('Chain execution (%s) canceled by user.', self.liveaction.id)
545
                chain_status = action_constants.LIVEACTION_STATUS_CANCELED
546
                return (chain_status, result, None)
547
548
            if action_service.is_action_paused_or_pausing(self.liveaction.id):
549
                LOG.info('Chain execution (%s) paused by user.', self.liveaction.id)
550
                chain_status = action_constants.LIVEACTION_STATUS_PAUSED
551
                return (chain_status, result, None)
552
553
        if top_level_error and isinstance(top_level_error, dict):
554
            result.update(top_level_error)
555
556
        return (chain_status, result, None)
557
558
    def _format_error(self, e, msg):
559
        return {
560
            'error': '%s. %s' % (msg, str(e)),
561
            'traceback': traceback.format_exc(10)
562
        }
563
564
    @staticmethod
565
    def _render_publish_vars(action_node, action_parameters, execution_result,
566
                             previous_execution_results, chain_vars):
567
        """
568
        If no output is specified on the action_node the output is the entire execution_result.
569
        If any output is specified then only those variables are published as output of an
570
        execution of this action_node.
571
        The output variable can refer to a variable from the execution_result,
572
        previous_execution_results or chain_vars.
573
        """
574
        if not action_node.publish:
575
            return {}
576
577
        context = {}
578
        context.update(action_parameters)
579
        context.update({action_node.name: execution_result})
580
        context.update(previous_execution_results)
581
        context.update(chain_vars)
582
        context.update({RESULTS_KEY: previous_execution_results})
583
584
        context.update({
585
            kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
586
                scope=kv_constants.SYSTEM_SCOPE)
587
        })
588
589
        context.update({
590
            kv_constants.DATASTORE_PARENT_SCOPE: {
591
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
592
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
593
            }
594
        })
595
596
        try:
597
            rendered_result = jinja_utils.render_values(mapping=action_node.publish,
598
                                                        context=context)
599
        except Exception as e:
600
            key = getattr(e, 'key', None)
601
            value = getattr(e, 'value', None)
602
            msg = ('Failed rendering value for publish parameter "%s" in task "%s" '
603
                   '(template string=%s): %s' % (key, action_node.name, value, str(e)))
604
            raise action_exc.ParameterRenderingFailedException(msg)
605
606
        return rendered_result
607
608
    @staticmethod
609
    def _resolve_params(action_node, original_parameters, results, chain_vars, chain_context):
610
        # setup context with original parameters and the intermediate results.
611
        chain_parent = chain_context.get('parent', {})
612
        pack = chain_parent.get('pack')
613
        user = chain_parent.get('user')
614
615
        config = get_config(pack, user)
616
617
        context = {}
618
        context.update(original_parameters)
619
        context.update(results)
620
        context.update(chain_vars)
621
        context.update({RESULTS_KEY: results})
622
623
        context.update({
624
            kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
625
                scope=kv_constants.SYSTEM_SCOPE)
626
        })
627
628
        context.update({
629
            kv_constants.DATASTORE_PARENT_SCOPE: {
630
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
631
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
632
            }
633
        })
634
        context.update({action_constants.ACTION_CONTEXT_KV_PREFIX: chain_context})
635
        context.update({pack_constants.PACK_CONFIG_CONTEXT_KV_PREFIX: config})
636
        try:
637
            rendered_params = jinja_utils.render_values(mapping=action_node.get_parameters(),
638
                                                        context=context)
639
        except Exception as e:
640
            LOG.exception('Jinja rendering for parameter "%s" failed.' % (e.key))
641
642
            key = getattr(e, 'key', None)
643
            value = getattr(e, 'value', None)
644
            msg = ('Failed rendering value for action parameter "%s" in task "%s" '
645
                   '(template string=%s): %s') % (key, action_node.name, value, str(e))
646
            raise action_exc.ParameterRenderingFailedException(msg)
647
        LOG.debug('Rendered params: %s: Type: %s', rendered_params, type(rendered_params))
648
        return rendered_params
649
650
    def _get_next_action(self, action_node, parent_context, action_params, context_result):
651
        # Verify that the referenced action exists
652
        # TODO: We do another lookup in cast_param, refactor to reduce number of lookups
653
        task_name = action_node.name
654
        action_ref = action_node.ref
655
        action_db = action_db_util.get_action_by_ref(ref=action_ref)
656
657
        if not action_db:
658
            error = 'Task :: %s - Action with ref %s not registered.' % (task_name, action_ref)
659
            raise action_exc.InvalidActionReferencedException(error)
660
661
        resolved_params = ActionChainRunner._resolve_params(
662
            action_node=action_node, original_parameters=action_params,
663
            results=context_result, chain_vars=self.chain_holder.vars,
664
            chain_context={'parent': parent_context})
665
666
        liveaction = self._build_liveaction_object(
667
            action_node=action_node,
668
            resolved_params=resolved_params,
669
            parent_context=parent_context)
670
671
        return liveaction
672
673
    def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
674
        """
675
        :param sleep_delay: Number of seconds to wait during "is completed" polls.
676
        :type sleep_delay: ``float``
677
        """
678
        try:
679
            liveaction, _ = action_service.request(liveaction)
680
        except Exception as e:
681
            liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
682
            LOG.exception('Failed to schedule liveaction.')
683
            raise e
684
685
        while (wait_for_completion and liveaction.status not in (
686
                action_constants.LIVEACTION_COMPLETED_STATES +
687
                [action_constants.LIVEACTION_STATUS_PAUSED])):
688
            eventlet.sleep(sleep_delay)
689
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)
690
691
        return liveaction
692
693
    def _resume_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
694
        """
695
        :param sleep_delay: Number of seconds to wait during "is completed" polls.
696
        :type sleep_delay: ``float``
697
        """
698
        try:
699
            user = self.context.get('user', None)
700
            liveaction, _ = action_service.request_resume(liveaction, user)
701
        except Exception as e:
702
            liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
703
            LOG.exception('Failed to schedule liveaction.')
704
            raise e
705
706
        while (wait_for_completion and liveaction.status not in (
707
                action_constants.LIVEACTION_COMPLETED_STATES +
708
                [action_constants.LIVEACTION_STATUS_PAUSED])):
709
            eventlet.sleep(sleep_delay)
710
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)
711
712
        return liveaction
713
714
    def _build_liveaction_object(self, action_node, resolved_params, parent_context):
715
        liveaction = LiveActionDB(action=action_node.ref)
716
717
        # Setup notify for task in chain.
718
        notify = self._get_notify(action_node)
719
        if notify:
720
            liveaction.notify = notify
721
            LOG.debug('%s: Task notify set to: %s', action_node.name, liveaction.notify)
722
723
        liveaction.context = {
724
            'parent': parent_context,
725
            'chain': vars(action_node)
726
        }
727
        liveaction.parameters = action_param_utils.cast_params(action_ref=action_node.ref,
728
                                                               params=resolved_params)
729
        return liveaction
730
731
    def _get_notify(self, action_node):
732
        if action_node.name not in self._skip_notify_tasks:
733
            if action_node.notify:
734
                task_notify = NotificationsHelper.to_model(action_node.notify)
735
                return task_notify
736
            elif self._chain_notify:
737
                return self._chain_notify
738
739
        return None
740
741
    def _get_updated_action_exec_result(self, action_node, liveaction, prev_task_result):
742
        if liveaction.status in action_constants.LIVEACTION_COMPLETED_STATES:
743
            created_at = isotime.parse(prev_task_result['created_at'])
744
            updated_at = liveaction.end_timestamp
745
        else:
746
            created_at = isotime.parse(prev_task_result['created_at'])
747
            updated_at = isotime.parse(prev_task_result['updated_at'])
748
749
        return self._format_action_exec_result(action_node, liveaction, created_at, updated_at)
750
751
    def _format_action_exec_result(self, action_node, liveaction_db, created_at, updated_at,
752
                                   error=None):
753
        """
754
        Format ActionExecution result so it can be used in the final action result output.
755
756
        :rtype: ``dict``
757
        """
758
        assert isinstance(created_at, datetime.datetime)
759
        assert isinstance(updated_at, datetime.datetime)
760
761
        result = {}
762
763
        execution_db = None
764
        if liveaction_db:
765
            execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
766
767
        result['id'] = action_node.name
768
        result['name'] = action_node.name
769
        result['execution_id'] = str(execution_db.id) if execution_db else None
770
        result['liveaction_id'] = str(liveaction_db.id) if liveaction_db else None
771
        result['workflow'] = None
772
773
        result['created_at'] = isotime.format(dt=created_at)
774
        result['updated_at'] = isotime.format(dt=updated_at)
775
776
        if error or not liveaction_db:
777
            result['state'] = action_constants.LIVEACTION_STATUS_FAILED
778
        else:
779
            result['state'] = liveaction_db.status
780
781
        if error:
782
            result['result'] = error
783
        else:
784
            result['result'] = liveaction_db.result
785
786
        return result
787
788
789
def get_runner():
790
    return ActionChainRunner(str(uuid.uuid4()))
791