Test Failed
Push — master ( 21460f...e380d0 )
by Tomaz
01:48
created

action_chain_runner/action_chain_runner.py (4 issues)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import copy
18
import eventlet
19
import traceback
20
import uuid
21
import datetime
22
23
from jsonschema import exceptions as json_schema_exc
24
25
from st2common.runners.base import ActionRunner
26
from st2common.runners.base import get_metadata as get_runner_metadata
27
from st2common import log as logging
28
from st2common.constants import action as action_constants
29
from st2common.constants import pack as pack_constants
30
from st2common.constants import keyvalue as kv_constants
31
from st2common.content.loader import MetaLoader
32
from st2common.exceptions import action as action_exc
33
from st2common.exceptions import actionrunner as runner_exc
34
from st2common.exceptions import db as db_exc
35
from st2common.models.api.notification import NotificationsHelper
36
from st2common.models.db.liveaction import LiveActionDB
37
from st2common.models.system import actionchain
38
from st2common.models.utils import action_param_utils
39
from st2common.persistence.execution import ActionExecution
40
from st2common.persistence.liveaction import LiveAction
41
from st2common.services import action as action_service
42
from st2common.services import keyvalues as kv_service
43
from st2common.util import action_db as action_db_util
44
from st2common.util import isotime
45
from st2common.util import date as date_utils
46
from st2common.util import jinja as jinja_utils
47
from st2common.util import param as param_utils
48
from st2common.util.config_loader import get_config
49
50
__all__ = [
51
    'ActionChainRunner',
52
    'ChainHolder',
53
54
    'get_runner',
55
    'get_metadata'
56
]
57
58
LOG = logging.getLogger(__name__)
59
60
RESULTS_KEY = '__results'
61
JINJA_START_MARKERS = [
62
    '{{',
63
    '{%'
64
]
65
PUBLISHED_VARS_KEY = 'published'
66
67
68
class ChainHolder(object):
69
70
    def __init__(self, chainspec, chainname):
71
        self.actionchain = actionchain.ActionChain(**chainspec)
72
        self.chainname = chainname
73
74
        if not self.actionchain.default:
75
            default = self._get_default(self.actionchain)
76
            self.actionchain.default = default
77
78
        LOG.debug('Using %s as default for %s.', self.actionchain.default, self.chainname)
79
        if not self.actionchain.default:
80
            raise Exception('Failed to find default node in %s.' % (self.chainname))
81
82
        self.vars = {}
83
84
    def init_vars(self, action_parameters):
85
        if self.actionchain.vars:
86
            self.vars = self._get_rendered_vars(self.actionchain.vars,
87
                                                action_parameters=action_parameters)
88
89
    def restore_vars(self, ctx_vars):
90
        self.vars.update(copy.deepcopy(ctx_vars))
91
92
    def validate(self):
93
        """
94
        Function which performs a simple compile time validation.
95
96
        Keep in mind that some variables are only resolved during run time which means we can
97
        perform only simple validation during compile / create time.
98
        """
99
        all_nodes = self._get_all_nodes(action_chain=self.actionchain)
100
101
        for node in self.actionchain.chain:
102
            on_success_node_name = node.on_success
103
            on_failure_node_name = node.on_failure
104
105
            # Check "on-success" path
106
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
107
                                                  node_name=on_success_node_name)
108
            if not valid_name:
109
                msg = ('Unable to find node with name "%s" referenced in "on-success" in '
110
                       'task "%s".' % (on_success_node_name, node.name))
111
                raise ValueError(msg)
112
113
            # Check "on-failure" path
114
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
115
                                                  node_name=on_failure_node_name)
116
            if not valid_name:
117
                msg = ('Unable to find node with name "%s" referenced in "on-failure" in '
118
                       'task "%s".' % (on_failure_node_name, node.name))
119
                raise ValueError(msg)
120
121
        # check if node specified in default is valid.
122
        if self.actionchain.default:
123
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
124
                                                  node_name=self.actionchain.default)
125
            if not valid_name:
126
                msg = ('Unable to find node with name "%s" referenced in "default".' %
127
                       self.actionchain.default)
128
                raise ValueError(msg)
129
        return True
130
131
    @staticmethod
132
    def _get_default(action_chain):
133
        # default is defined
134
        if action_chain.default:
135
            return action_chain.default
136
        # no nodes in chain
137
        if not action_chain.chain:
138
            return None
139
        # The first node with no references is the default node. Assumptions
140
        # that support this are :
141
        # 1. There are no loops in the chain. Even if there are loops there is
142
        #    at least 1 node which does not end up in this loop.
143
        # 2. There are no fragments in the chain.
144
        all_nodes = ChainHolder._get_all_nodes(action_chain=action_chain)
145
        node_names = set(all_nodes)
146
        on_success_nodes = ChainHolder._get_all_on_success_nodes(action_chain=action_chain)
147
        on_failure_nodes = ChainHolder._get_all_on_failure_nodes(action_chain=action_chain)
148
        referenced_nodes = on_success_nodes | on_failure_nodes
149
        possible_default_nodes = node_names - referenced_nodes
150
        if possible_default_nodes:
151
            # This is to preserve order. set([..]) does not preserve the order so iterate
152
            # over original array.
153
            for node in all_nodes:
154
                if node in possible_default_nodes:
155
                    return node
156
        # If no node is found assume the first node in the chain list to be default.
157
        return action_chain.chain[0].name
158
159
    @staticmethod
160
    def _get_all_nodes(action_chain):
161
        """
162
        Return names for all the nodes in the chain.
163
        """
164
        all_nodes = [node.name for node in action_chain.chain]
165
        return all_nodes
166
167
    @staticmethod
168
    def _get_all_on_success_nodes(action_chain):
169
        """
170
        Return names for all the tasks referenced in "on-success".
171
        """
172
        on_success_nodes = set([node.on_success for node in action_chain.chain])
173
        return on_success_nodes
174
175
    @staticmethod
176
    def _get_all_on_failure_nodes(action_chain):
177
        """
178
        Return names for all the tasks referenced in "on-failure".
179
        """
180
        on_failure_nodes = set([node.on_failure for node in action_chain.chain])
181
        return on_failure_nodes
182
183
    def _is_valid_node_name(self, all_node_names, node_name):
184
        """
185
        Function which validates that the provided node name is defined in the workflow definition
186
        and it's valid.
187
188
        Keep in mind that we can only perform validation for task names which don't include jinja
189
        expressions since those are rendered at run time.
190
        """
191
        if not node_name:
192
            # This task name needs to be resolved during run time so we cant validate the name now
193
            return True
194
195
        is_jinja_expression = jinja_utils.is_jinja_expression(value=node_name)
196
        if is_jinja_expression:
197
            # This task name needs to be resolved during run time so we cant validate the name
198
            # now
199
            return True
200
201
        return node_name in all_node_names
202
203
    @staticmethod
204
    def _get_rendered_vars(vars, action_parameters):
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in vars.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
205
        if not vars:
206
            return {}
207
        context = {}
208
        context.update({
209
            kv_constants.DATASTORE_PARENT_SCOPE: {
210
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
211
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
212
            }
213
        })
214
        context.update(action_parameters)
215
        LOG.info('Rendering action chain vars. Mapping = %s; Context = %s', vars, context)
216
        return jinja_utils.render_values(mapping=vars, context=context)
217
218
    def get_node(self, node_name=None, raise_on_failure=False):
219
        if not node_name:
220
            return None
221
        for node in self.actionchain.chain:
222
            if node.name == node_name:
223
                return node
224
        if raise_on_failure:
225
            raise runner_exc.ActionRunnerException(
226
                'Unable to find node with name "%s".' % (node_name))
227
        return None
228
229
    def get_next_node(self, curr_node_name=None, condition='on-success'):
230
        if not curr_node_name:
231
            return self.get_node(self.actionchain.default)
232
        current_node = self.get_node(curr_node_name)
233
        if condition == 'on-success':
234
            return self.get_node(current_node.on_success, raise_on_failure=True)
235
        elif condition == 'on-failure':
236
            return self.get_node(current_node.on_failure, raise_on_failure=True)
237
        raise runner_exc.ActionRunnerException('Unknown condition %s.' % condition)
238
239
240
class ActionChainRunner(ActionRunner):
241
242
    def __init__(self, runner_id):
243
        super(ActionChainRunner, self).__init__(runner_id=runner_id)
244
        self.chain_holder = None
245
        self._meta_loader = MetaLoader()
246
        self._skip_notify_tasks = []
247
        self._display_published = True
248
        self._chain_notify = None
249
250
    def pre_run(self):
251
        super(ActionChainRunner, self).pre_run()
252
253
        chainspec_file = self.entry_point
254
        LOG.debug('Reading action chain from %s for action %s.', chainspec_file,
255
                  self.action)
256
257
        try:
258
            chainspec = self._meta_loader.load(file_path=chainspec_file,
259
                                               expected_type=dict)
260
        except Exception as e:
261
            message = ('Failed to parse action chain definition from "%s": %s' %
262
                       (chainspec_file, str(e)))
263
            LOG.exception('Failed to load action chain definition.')
264
            raise runner_exc.ActionRunnerPreRunError(message)
265
266
        try:
267
            self.chain_holder = ChainHolder(chainspec, self.action_name)
268
        except json_schema_exc.ValidationError as e:
269
            # preserve the whole nasty jsonschema message as that is better to get to the
270
            # root cause
271
            message = str(e)
272
            LOG.exception('Failed to instantiate ActionChain.')
273
            raise runner_exc.ActionRunnerPreRunError(message)
274
        except Exception as e:
275
            message = str(e)
276
            LOG.exception('Failed to instantiate ActionChain.')
277
            raise runner_exc.ActionRunnerPreRunError(message)
278
279
        # Runner attributes are set lazily. So these steps
280
        # should happen outside the constructor.
281
        if getattr(self, 'liveaction', None):
282
            self._chain_notify = getattr(self.liveaction, 'notify', None)
283
        if self.runner_parameters:
284
            self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
285
            self._display_published = self.runner_parameters.get('display_published', True)
286
287
        # Perform some pre-run chain validation
288
        try:
289
            self.chain_holder.validate()
290
        except Exception as e:
291
            raise runner_exc.ActionRunnerPreRunError(str(e))
292
293
    def run(self, action_parameters):
294
        # Run the action chain.
295
        return self._run_chain(action_parameters)
296
297
    def cancel(self):
298
        # Identify the list of action executions that are workflows and cascade pause.
299
        for child_exec_id in self.execution.children:
300
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)
301
            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
302
                    child_exec.status in action_constants.LIVEACTION_CANCELABLE_STATES):
303
                action_service.request_cancellation(
304
                    LiveAction.get(id=child_exec.liveaction['id']),
305
                    self.context.get('user', None)
306
                )
307
308
        return (
309
            action_constants.LIVEACTION_STATUS_CANCELING,
310
            self.liveaction.result,
311
            self.liveaction.context
312
        )
313
314
    def pause(self):
315
        # Identify the list of action executions that are workflows and cascade pause.
316
        for child_exec_id in self.execution.children:
317
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)
318
            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
319
                    child_exec.status == action_constants.LIVEACTION_STATUS_RUNNING):
320
                action_service.request_pause(
321
                    LiveAction.get(id=child_exec.liveaction['id']),
322
                    self.context.get('user', None)
323
                )
324
325
        return (
326
            action_constants.LIVEACTION_STATUS_PAUSING,
327
            self.liveaction.result,
328
            self.liveaction.context
329
        )
330
331
    def resume(self):
332
        # Restore runner and action parameters since they are not provided on resume.
333
        runner_parameters, action_parameters = param_utils.render_final_params(
334
            self.runner_type.runner_parameters,
335
            self.action.parameters,
336
            self.liveaction.parameters,
0 ignored issues
show
The member liveaction does not seem to be defined here; its definition is on line 349.
Loading history...
337
            self.liveaction.context
0 ignored issues
show
The member liveaction does not seem to be defined here; its definition is on line 349.
Loading history...
338
        )
339
340
        # Assign runner parameters needed for pre-run.
341
        if runner_parameters:
342
            self.runner_parameters = runner_parameters
343
344
        # Restore chain holder if it is not initialized.
345
        if not self.chain_holder:
346
            self.pre_run()
347
348
        # Change the status of the liveaction from resuming to running.
349
        self.liveaction = action_service.update_status(
350
            self.liveaction,
351
            action_constants.LIVEACTION_STATUS_RUNNING,
352
            publish=False
353
        )
354
355
        # Run the action chain.
356
        return self._run_chain(action_parameters, resuming=True)
357
358
    def _run_chain(self, action_parameters, resuming=False):
359
        # Set chain status to fail unless explicitly set to succeed.
360
        chain_status = action_constants.LIVEACTION_STATUS_FAILED
361
362
        # Result holds the final result that the chain store in the database.
363
        result = {'tasks': []}
364
365
        # Save published variables into the result if specified.
366
        if self._display_published:
367
            result[PUBLISHED_VARS_KEY] = {}
368
369
        context_result = {}  # Holds result which is used for the template context purposes
370
        top_level_error = None  # Stores a reference to a top level error
371
        action_node = None
372
        last_task = None
373
374
        try:
375
            # Initialize vars with the action parameters.
376
            # This allows action parameers to be referenced from vars.
377
            self.chain_holder.init_vars(action_parameters)
378
        except Exception as e:
379
            chain_status = action_constants.LIVEACTION_STATUS_FAILED
380
            m = 'Failed initializing ``vars`` in chain.'
381
            LOG.exception(m)
382
            top_level_error = self._format_error(e, m)
383
            result.update(top_level_error)
384
            return (chain_status, result, None)
385
386
        # Restore state on resuming an existing chain execution.
387
        if resuming:
388
            # Restore vars is any from the liveaction.
389
            ctx_vars = self.liveaction.context.pop('vars', {})
390
            self.chain_holder.restore_vars(ctx_vars)
391
392
            # Restore result if any from the liveaction.
393
            if self.liveaction and hasattr(self.liveaction, 'result') and self.liveaction.result:
394
                result = self.liveaction.result
395
396
            # Initialize or rebuild existing context_result from liveaction
397
            # which holds the result used for resolving context in Jinja template.
398
            for task in result.get('tasks', []):
399
                context_result[task['name']] = task['result']
400
401
            # Restore or initialize the top_level_error
402
            # that stores a reference to a top level error.
403
            if 'error' in result or 'traceback' in result:
404
                top_level_error = {
405
                    'error': result.get('error'),
406
                    'traceback': result.get('traceback')
407
                }
408
409
        # If there are no executed tasks in the chain, then get the first node.
410
        if len(result['tasks']) <= 0:
411
            try:
412
                action_node = self.chain_holder.get_next_node()
413
            except Exception as e:
414
                m = 'Failed to get starting node "%s".', action_node.name
415
                LOG.exception(m)
416
                top_level_error = self._format_error(e, m)
417
418
            # If there are no action node to run next, then mark the chain successful.
419
            if not action_node:
420
                chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
421
422
        # Otherwise, figure out the last task executed and
423
        # its state to determine where to begin executing.
424
        else:
425
            last_task = result['tasks'][-1]
426
            action_node = self.chain_holder.get_node(last_task['name'])
427
            liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id'])
428
429
            # If the liveaction of the last task has changed, update the result entry.
430
            if liveaction.status != last_task['state']:
431
                updated_task_result = self._get_updated_action_exec_result(
432
                    action_node, liveaction, last_task)
433
                del result['tasks'][-1]
434
                result['tasks'].append(updated_task_result)
435
436
                # Also need to update context_result so the updated result
437
                # is available to Jinja expressions
438
                updated_task_name = updated_task_result['name']
439
                context_result[updated_task_name]['result'] = updated_task_result['result']
440
441
            # If the last task was canceled, then canceled the chain altogether.
442
            if liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
443
                chain_status = action_constants.LIVEACTION_STATUS_CANCELED
444
                return (chain_status, result, None)
445
446
            # If the last task was paused, then stay on this action node.
447
            # This is explicitly put here for clarity.
448
            if liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
449
                pass
450
451
            # If the last task succeeded, then get the next on-success action node.
452
            if liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED:
453
                chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
454
                action_node = self.chain_holder.get_next_node(
455
                    last_task['name'], condition='on-success')
456
457
            # If the last task failed, then get the next on-failure action node.
458
            if liveaction.status in action_constants.LIVEACTION_FAILED_STATES:
459
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
460
                action_node = self.chain_holder.get_next_node(
461
                    last_task['name'], condition='on-failure')
462
463
        # Setup parent context.
464
        parent_context = {
465
            'execution_id': self.execution_id
466
        }
467
468
        if getattr(self.liveaction, 'context', None):
469
            parent_context.update(self.liveaction.context)
470
471
        # Run the action chain until there are no more tasks.
472
        while action_node:
473
            error = None
474
            liveaction = None
475
            last_task = result['tasks'][-1] if len(result['tasks']) > 0 else None
476
            created_at = date_utils.get_datetime_utc_now()
477
478
            try:
479
                # If last task was paused, then fetch the liveaction and resume it first.
480
                if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED:
481
                    liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id'])
482
                    del result['tasks'][-1]
483
                else:
484
                    liveaction = self._get_next_action(
485
                        action_node=action_node, parent_context=parent_context,
486
                        action_params=action_parameters, context_result=context_result)
487
            except action_exc.InvalidActionReferencedException as e:
488
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
489
                m = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' %
490
                     (action_node.name, action_node.ref))
491
                LOG.exception(m)
492
                top_level_error = self._format_error(e, m)
493
                break
494
            except action_exc.ParameterRenderingFailedException as e:
495
                # Rendering parameters failed before we even got to running this action,
496
                # abort and fail the whole action chain
497
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
498
                m = 'Failed to run task "%s". Parameter rendering failed.' % action_node.name
499
                LOG.exception(m)
500
                top_level_error = self._format_error(e, m)
501
                break
502
            except db_exc.StackStormDBObjectNotFoundError as e:
503
                chain_status = action_constants.LIVEACTION_STATUS_FAILED
504
                m = 'Failed to resume task "%s". Unable to find liveaction.' % action_node.name
505
                LOG.exception(m)
506
                top_level_error = self._format_error(e, m)
507
                break
508
509
            try:
510
                # If last task was paused, then fetch the liveaction and resume it first.
511
                if last_task and last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED:
512
                    LOG.info('Resume task %s for chain %s.', action_node.name, self.liveaction.id)
513
                    liveaction = self._resume_action(liveaction)
514
                else:
515
                    LOG.info('Run task %s for chain %s.', action_node.name, self.liveaction.id)
516
                    liveaction = self._run_action(liveaction)
517
            except Exception as e:
518
                # Save the traceback and error message
519
                m = 'Failed running task "%s".' % action_node.name
520
                LOG.exception(m)
521
                error = self._format_error(e, m)
522
                context_result[action_node.name] = error
523
            else:
524
                # Update context result
525
                context_result[action_node.name] = liveaction.result
526
527
                # Render and publish variables
528
                rendered_publish_vars = ActionChainRunner._render_publish_vars(
529
                    action_node=action_node, action_parameters=action_parameters,
530
                    execution_result=liveaction.result, previous_execution_results=context_result,
531
                    chain_vars=self.chain_holder.vars)
532
533
                if rendered_publish_vars:
534
                    self.chain_holder.vars.update(rendered_publish_vars)
535
                    if self._display_published:
536
                        result[PUBLISHED_VARS_KEY].update(rendered_publish_vars)
537
            finally:
538
                # Record result and resolve a next node based on the task success or failure
539
                updated_at = date_utils.get_datetime_utc_now()
540
541
                task_result = self._format_action_exec_result(
542
                    action_node,
543
                    liveaction,
544
                    created_at,
545
                    updated_at,
546
                    error=error
547
                )
548
549
                result['tasks'].append(task_result)
550
551
                try:
552
                    if not liveaction:
553
                        chain_status = action_constants.LIVEACTION_STATUS_FAILED
554
                        action_node = self.chain_holder.get_next_node(
555
                            action_node.name, condition='on-failure')
556
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_TIMED_OUT:
557
                        chain_status = action_constants.LIVEACTION_STATUS_TIMED_OUT
558
                        action_node = self.chain_holder.get_next_node(
559
                            action_node.name, condition='on-failure')
560
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
561
                        LOG.info('Chain execution (%s) canceled because task "%s" is canceled.',
562
                                 self.liveaction_id, action_node.name)
563
                        chain_status = action_constants.LIVEACTION_STATUS_CANCELED
564
                        action_node = None
565
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
566
                        LOG.info('Chain execution (%s) paused because task "%s" is paused.',
567
                                 self.liveaction_id, action_node.name)
568
                        chain_status = action_constants.LIVEACTION_STATUS_PAUSED
569
                        self._save_vars()
570
                        action_node = None
571
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_PENDING:
572
                        LOG.info('Chain execution (%s) paused because task "%s" is pending.',
573
                                 self.liveaction_id, action_node.name)
574
                        chain_status = action_constants.LIVEACTION_STATUS_PAUSED
575
                        self._save_vars()
576
                        action_node = None
577
                    elif liveaction.status in action_constants.LIVEACTION_FAILED_STATES:
578
                        chain_status = action_constants.LIVEACTION_STATUS_FAILED
579
                        action_node = self.chain_holder.get_next_node(
580
                            action_node.name, condition='on-failure')
581
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED:
582
                        chain_status = action_constants.LIVEACTION_STATUS_SUCCEEDED
583
                        action_node = self.chain_holder.get_next_node(
584
                            action_node.name, condition='on-success')
585
                    else:
586
                        action_node = None
587
                except Exception as e:
588
                    chain_status = action_constants.LIVEACTION_STATUS_FAILED
589
                    m = 'Failed to get next node "%s".' % action_node.name
590
                    LOG.exception(m)
591
                    top_level_error = self._format_error(e, m)
592
                    action_node = None
593
                    break
0 ignored issues
show
Bug Best Practice introduced by
break statement in finally block may swallow exception

Placing a return statement inside finally will swallow all exceptions that may have been thrown in the try block.

Loading history...
594
595
            if action_service.is_action_canceled_or_canceling(self.liveaction.id):
596
                LOG.info('Chain execution (%s) canceled by user.', self.liveaction.id)
597
                chain_status = action_constants.LIVEACTION_STATUS_CANCELED
598
                return (chain_status, result, None)
599
600
            if action_service.is_action_paused_or_pausing(self.liveaction.id):
601
                LOG.info('Chain execution (%s) paused by user.', self.liveaction.id)
602
                chain_status = action_constants.LIVEACTION_STATUS_PAUSED
603
                self._save_vars()
604
                return (chain_status, result, self.liveaction.context)
605
606
        if top_level_error and isinstance(top_level_error, dict):
607
            result.update(top_level_error)
608
609
        return (chain_status, result, self.liveaction.context)
610
611
    def _format_error(self, e, msg):
612
        return {
613
            'error': '%s. %s' % (msg, str(e)),
614
            'traceback': traceback.format_exc(10)
615
        }
616
617
    def _save_vars(self):
618
        # Save the context vars in the liveaction context.
619
        self.liveaction.context['vars'] = self.chain_holder.vars
620
621
    @staticmethod
622
    def _render_publish_vars(action_node, action_parameters, execution_result,
623
                             previous_execution_results, chain_vars):
624
        """
625
        If no output is specified on the action_node the output is the entire execution_result.
626
        If any output is specified then only those variables are published as output of an
627
        execution of this action_node.
628
        The output variable can refer to a variable from the execution_result,
629
        previous_execution_results or chain_vars.
630
        """
631
        if not action_node.publish:
632
            return {}
633
634
        context = {}
635
        context.update(action_parameters)
636
        context.update({action_node.name: execution_result})
637
        context.update(previous_execution_results)
638
        context.update(chain_vars)
639
        context.update({RESULTS_KEY: previous_execution_results})
640
641
        context.update({
642
            kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
643
                scope=kv_constants.SYSTEM_SCOPE)
644
        })
645
646
        context.update({
647
            kv_constants.DATASTORE_PARENT_SCOPE: {
648
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
649
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
650
            }
651
        })
652
653
        try:
654
            rendered_result = jinja_utils.render_values(mapping=action_node.publish,
655
                                                        context=context)
656
        except Exception as e:
657
            key = getattr(e, 'key', None)
658
            value = getattr(e, 'value', None)
659
            msg = ('Failed rendering value for publish parameter "%s" in task "%s" '
660
                   '(template string=%s): %s' % (key, action_node.name, value, str(e)))
661
            raise action_exc.ParameterRenderingFailedException(msg)
662
663
        return rendered_result
664
665
    @staticmethod
666
    def _resolve_params(action_node, original_parameters, results, chain_vars, chain_context):
667
        # setup context with original parameters and the intermediate results.
668
        chain_parent = chain_context.get('parent', {})
669
        pack = chain_parent.get('pack')
670
        user = chain_parent.get('user')
671
672
        config = get_config(pack, user)
673
674
        context = {}
675
        context.update(original_parameters)
676
        context.update(results)
677
        context.update(chain_vars)
678
        context.update({RESULTS_KEY: results})
679
680
        context.update({
681
            kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
682
                scope=kv_constants.SYSTEM_SCOPE)
683
        })
684
685
        context.update({
686
            kv_constants.DATASTORE_PARENT_SCOPE: {
687
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
688
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
689
            }
690
        })
691
        context.update({action_constants.ACTION_CONTEXT_KV_PREFIX: chain_context})
692
        context.update({pack_constants.PACK_CONFIG_CONTEXT_KV_PREFIX: config})
693
        try:
694
            rendered_params = jinja_utils.render_values(mapping=action_node.get_parameters(),
695
                                                        context=context)
696
        except Exception as e:
697
            LOG.exception('Jinja rendering for parameter "%s" failed.' % (e.key))
698
699
            key = getattr(e, 'key', None)
700
            value = getattr(e, 'value', None)
701
            msg = ('Failed rendering value for action parameter "%s" in task "%s" '
702
                   '(template string=%s): %s') % (key, action_node.name, value, str(e))
703
            raise action_exc.ParameterRenderingFailedException(msg)
704
        LOG.debug('Rendered params: %s: Type: %s', rendered_params, type(rendered_params))
705
        return rendered_params
706
707
    def _get_next_action(self, action_node, parent_context, action_params, context_result):
708
        # Verify that the referenced action exists
709
        # TODO: We do another lookup in cast_param, refactor to reduce number of lookups
710
        task_name = action_node.name
711
        action_ref = action_node.ref
712
        action_db = action_db_util.get_action_by_ref(ref=action_ref)
713
714
        if not action_db:
715
            error = 'Task :: %s - Action with ref %s not registered.' % (task_name, action_ref)
716
            raise action_exc.InvalidActionReferencedException(error)
717
718
        resolved_params = ActionChainRunner._resolve_params(
719
            action_node=action_node, original_parameters=action_params,
720
            results=context_result, chain_vars=self.chain_holder.vars,
721
            chain_context={'parent': parent_context})
722
723
        liveaction = self._build_liveaction_object(
724
            action_node=action_node,
725
            resolved_params=resolved_params,
726
            parent_context=parent_context)
727
728
        return liveaction
729
730
    def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
731
        """
732
        :param sleep_delay: Number of seconds to wait during "is completed" polls.
733
        :type sleep_delay: ``float``
734
        """
735
        try:
736
            liveaction, _ = action_service.request(liveaction)
737
        except Exception as e:
738
            liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
739
            LOG.exception('Failed to schedule liveaction.')
740
            raise e
741
742
        while (wait_for_completion and liveaction.status not in (
743
                action_constants.LIVEACTION_COMPLETED_STATES +
744
                [action_constants.LIVEACTION_STATUS_PAUSED,
745
                 action_constants.LIVEACTION_STATUS_PENDING])):
746
            eventlet.sleep(sleep_delay)
747
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)
748
749
        return liveaction
750
751
    def _resume_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
752
        """
753
        :param sleep_delay: Number of seconds to wait during "is completed" polls.
754
        :type sleep_delay: ``float``
755
        """
756
        try:
757
            user = self.context.get('user', None)
758
            liveaction, _ = action_service.request_resume(liveaction, user)
759
        except Exception as e:
760
            liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
761
            LOG.exception('Failed to schedule liveaction.')
762
            raise e
763
764
        while (wait_for_completion and liveaction.status not in (
765
                action_constants.LIVEACTION_COMPLETED_STATES +
766
                [action_constants.LIVEACTION_STATUS_PAUSED])):
767
            eventlet.sleep(sleep_delay)
768
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)
769
770
        return liveaction
771
772
    def _build_liveaction_object(self, action_node, resolved_params, parent_context):
773
        liveaction = LiveActionDB(action=action_node.ref)
774
775
        # Setup notify for task in chain.
776
        notify = self._get_notify(action_node)
777
        if notify:
778
            liveaction.notify = notify
779
            LOG.debug('%s: Task notify set to: %s', action_node.name, liveaction.notify)
780
781
        liveaction.context = {
782
            'parent': parent_context,
783
            'chain': vars(action_node)
784
        }
785
        liveaction.parameters = action_param_utils.cast_params(action_ref=action_node.ref,
786
                                                               params=resolved_params)
787
        return liveaction
788
789
    def _get_notify(self, action_node):
790
        if action_node.name not in self._skip_notify_tasks:
791
            if action_node.notify:
792
                task_notify = NotificationsHelper.to_model(action_node.notify)
793
                return task_notify
794
            elif self._chain_notify:
795
                return self._chain_notify
796
797
        return None
798
799
    def _get_updated_action_exec_result(self, action_node, liveaction, prev_task_result):
800
        if liveaction.status in action_constants.LIVEACTION_COMPLETED_STATES:
801
            created_at = isotime.parse(prev_task_result['created_at'])
802
            updated_at = liveaction.end_timestamp
803
        else:
804
            created_at = isotime.parse(prev_task_result['created_at'])
805
            updated_at = isotime.parse(prev_task_result['updated_at'])
806
807
        return self._format_action_exec_result(action_node, liveaction, created_at, updated_at)
808
809
    def _format_action_exec_result(self, action_node, liveaction_db, created_at, updated_at,
810
                                   error=None):
811
        """
812
        Format ActionExecution result so it can be used in the final action result output.
813
814
        :rtype: ``dict``
815
        """
816
        assert isinstance(created_at, datetime.datetime)
817
        assert isinstance(updated_at, datetime.datetime)
818
819
        result = {}
820
821
        execution_db = None
822
        if liveaction_db:
823
            execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))
824
825
        result['id'] = action_node.name
826
        result['name'] = action_node.name
827
        result['execution_id'] = str(execution_db.id) if execution_db else None
828
        result['liveaction_id'] = str(liveaction_db.id) if liveaction_db else None
829
        result['workflow'] = None
830
831
        result['created_at'] = isotime.format(dt=created_at)
832
        result['updated_at'] = isotime.format(dt=updated_at)
833
834
        if error or not liveaction_db:
835
            result['state'] = action_constants.LIVEACTION_STATUS_FAILED
836
        else:
837
            result['state'] = liveaction_db.status
838
839
        if error:
840
            result['result'] = error
841
        else:
842
            result['result'] = liveaction_db.result
843
844
        return result
845
846
847
def get_runner():
848
    return ActionChainRunner(str(uuid.uuid4()))
849
850
851
def get_metadata():
852
    return get_runner_metadata('action_chain_runner')
853