Passed
Pull Request — master (#3645)
by W
09:52
created

ActionChainRunner._get_notify()   A

Complexity

Conditions 4

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
dl 0
loc 9
rs 9.2
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import eventlet
17
import traceback
18
import uuid
19
import datetime
20
21
from jsonschema import exceptions as json_schema_exc
22
23
from st2common.runners.base import ActionRunner
24
from st2common import log as logging
25
from st2common.constants import action as action_constants
26
from st2common.constants import pack as pack_constants
27
from st2common.constants import keyvalue as kv_constants
28
from st2common.content.loader import MetaLoader
29
from st2common.exceptions import action as action_exc
30
from st2common.exceptions import actionrunner as runner_exc
31
from st2common.models.api.notification import NotificationsHelper
32
from st2common.models.db.liveaction import LiveActionDB
33
from st2common.models.system import actionchain
34
from st2common.models.utils import action_param_utils
35
from st2common.persistence.execution import ActionExecution
36
from st2common.persistence.liveaction import LiveAction
37
from st2common.services import action as action_service
38
from st2common.services import keyvalues as kv_service
39
from st2common.util import action_db as action_db_util
40
from st2common.util import isotime
41
from st2common.util import date as date_utils
42
from st2common.util import jinja as jinja_utils
43
from st2common.util import param as param_utils
44
from st2common.util.config_loader import get_config
45
46
47
# Module-level logger shared by the chain holder and the runner.
LOG = logging.getLogger(__name__)

# NOTE(review): not referenced in the visible part of this module - presumably the context key
# under which task results are exposed; confirm against the rest of the file.
RESULTS_KEY = '__results'

# Opening delimiters of Jinja expressions ("{{") and Jinja statements ("{%").
JINJA_START_MARKERS = [
    '{{',
    '{%'
]

# Key in the chain result dictionary under which published variables are stored.
PUBLISHED_VARS_KEY = 'published'
54
55
56
class ChainHolder(object):
    """
    Wrapper around a parsed action chain definition.

    It resolves the default (starting) node, holds the rendered chain ``vars`` and provides
    helpers for validating the chain and for walking from one node to the next.
    """

    def __init__(self, chainspec, chainname):
        """
        :param chainspec: Chain definition; expanded as keyword arguments into
                          ``actionchain.ActionChain``.
        :param chainname: Name of the chain, used in log and error messages.

        :raises Exception: If no default node can be determined for the chain.
        """
        self.actionchain = actionchain.ActionChain(**chainspec)
        self.chainname = chainname

        if not self.actionchain.default:
            default = self._get_default(self.actionchain)
            self.actionchain.default = default

        LOG.debug('Using %s as default for %s.', self.actionchain.default, self.chainname)
        if not self.actionchain.default:
            raise Exception('Failed to find default node in %s.' % (self.chainname))

        # Rendered chain variables; populated by init_vars().
        self.vars = {}

    def init_vars(self, action_parameters):
        """
        Render the chain level ``vars`` using the provided action parameters as context.

        Nothing happens when the chain does not define any ``vars``.
        """
        if self.actionchain.vars:
            self.vars = self._get_rendered_vars(self.actionchain.vars,
                                                action_parameters=action_parameters)

    def validate(self):
        """
        Function which performs a simple compile time validation.

        Keep in mind that some variables are only resolved during run time which means we can
        perform only simple validation during compile / create time.

        :return: ``True`` when all the referenced node names are valid.
        :raises ValueError: If "on-success", "on-failure" or "default" references a node which
                            is not defined in the chain.
        """
        all_nodes = self._get_all_nodes(action_chain=self.actionchain)

        for node in self.actionchain.chain:
            on_success_node_name = node.on_success
            on_failure_node_name = node.on_failure

            # Check "on-success" path
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
                                                  node_name=on_success_node_name)
            if not valid_name:
                msg = ('Unable to find node with name "%s" referenced in "on-success" in '
                       'task "%s".' % (on_success_node_name, node.name))
                raise ValueError(msg)

            # Check "on-failure" path
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
                                                  node_name=on_failure_node_name)
            if not valid_name:
                msg = ('Unable to find node with name "%s" referenced in "on-failure" in '
                       'task "%s".' % (on_failure_node_name, node.name))
                raise ValueError(msg)

        # check if node specified in default is valid.
        if self.actionchain.default:
            valid_name = self._is_valid_node_name(all_node_names=all_nodes,
                                                  node_name=self.actionchain.default)
            if not valid_name:
                msg = ('Unable to find node with name "%s" referenced in "default".' %
                       self.actionchain.default)
                raise ValueError(msg)
        return True

    @staticmethod
    def _get_default(action_chain):
        """
        Return the name of the node the chain should start with (``None`` for an empty chain).
        """
        # default is defined
        if action_chain.default:
            return action_chain.default
        # no nodes in chain
        if not action_chain.chain:
            return None
        # The first node with no references is the default node. Assumptions
        # that support this are :
        # 1. There are no loops in the chain. Even if there are loops there is
        #    at least 1 node which does not end up in this loop.
        # 2. There are no fragments in the chain.
        all_nodes = ChainHolder._get_all_nodes(action_chain=action_chain)
        node_names = set(all_nodes)
        on_success_nodes = ChainHolder._get_all_on_success_nodes(action_chain=action_chain)
        on_failure_nodes = ChainHolder._get_all_on_failure_nodes(action_chain=action_chain)
        referenced_nodes = on_success_nodes | on_failure_nodes
        possible_default_nodes = node_names - referenced_nodes
        if possible_default_nodes:
            # This is to preserve order. A set does not preserve the order so iterate
            # over original array.
            for node in all_nodes:
                if node in possible_default_nodes:
                    return node
        # If no node is found assume the first node in the chain list to be default.
        return action_chain.chain[0].name

    @staticmethod
    def _get_all_nodes(action_chain):
        """
        Return names for all the nodes in the chain.
        """
        return [node.name for node in action_chain.chain]

    @staticmethod
    def _get_all_on_success_nodes(action_chain):
        """
        Return names for all the tasks referenced in "on-success".
        """
        return {node.on_success for node in action_chain.chain}

    @staticmethod
    def _get_all_on_failure_nodes(action_chain):
        """
        Return names for all the tasks referenced in "on-failure".
        """
        return {node.on_failure for node in action_chain.chain}

    def _is_valid_node_name(self, all_node_names, node_name):
        """
        Function which validates that the provided node name is defined in the workflow definition
        and it's valid.

        Keep in mind that we can only perform validation for task names which don't include jinja
        expressions since those are rendered at run time.
        """
        if not node_name:
            # This task name needs to be resolved during run time so we cant validate the name now
            return True

        is_jinja_expression = jinja_utils.is_jinja_expression(value=node_name)
        if is_jinja_expression:
            # This task name needs to be resolved during run time so we cant validate the name
            # now
            return True

        return node_name in all_node_names

    @staticmethod
    def _get_rendered_vars(vars, action_parameters):
        """
        Render the chain ``vars`` mapping with jinja.

        The rendering context consists of a system scoped datastore lookup and the action
        parameters (action parameters take precedence on a key clash).

        :return: Rendered values, or an empty dict when ``vars`` is empty.
        """
        if not vars:
            return {}
        context = {}
        context.update({
            kv_constants.DATASTORE_PARENT_SCOPE: {
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
            }
        })
        # Tolerate missing action parameters instead of failing with a TypeError.
        context.update(action_parameters or {})
        LOG.info('Rendering action chain vars. Mapping = %s; Context = %s', vars, context)
        return jinja_utils.render_values(mapping=vars, context=context)

    def get_node(self, node_name=None, raise_on_failure=False):
        """
        Return the node with the provided name.

        :param node_name: Name of the node to look up. An empty name always yields ``None``.
        :param raise_on_failure: When true, raise instead of returning ``None`` if a non-empty
                                 name doesn't match any node.

        :raises runner_exc.ActionRunnerException: Node not found and ``raise_on_failure`` is set.
        """
        if not node_name:
            return None
        for node in self.actionchain.chain:
            if node.name == node_name:
                return node
        if raise_on_failure:
            raise runner_exc.ActionRunnerException(
                'Unable to find node with name "%s".' % (node_name))
        return None

    def get_next_node(self, curr_node_name=None, condition='on-success'):
        """
        Return the node which follows ``curr_node_name`` for the provided condition.

        Without a current node name the default (starting) node is returned. ``None`` is
        returned when the current node doesn't define a transition for the condition.

        :param curr_node_name: Name of the node which has just finished.
        :param condition: Either ``on-success`` or ``on-failure``.

        :raises runner_exc.ActionRunnerException: Unknown current node, unknown transition
                                                  target or unknown condition.
        """
        if not curr_node_name:
            return self.get_node(self.actionchain.default)
        # Fail with a descriptive error instead of an AttributeError on None when the current
        # node is not part of the chain.
        current_node = self.get_node(curr_node_name, raise_on_failure=True)
        if condition == 'on-success':
            return self.get_node(current_node.on_success, raise_on_failure=True)
        elif condition == 'on-failure':
            return self.get_node(current_node.on_failure, raise_on_failure=True)
        raise runner_exc.ActionRunnerException('Unknown condition %s.' % condition)
223
224
225
class ActionChainRunner(ActionRunner):
226
227
    def __init__(self, runner_id):
        """
        Create a runner with empty chain state. The chain itself is loaded in ``pre_run``.

        :param runner_id: Identifier passed through to the base runner constructor.
        """
        super(ActionChainRunner, self).__init__(runner_id=runner_id)
        # Holder for the parsed chain; set in pre_run().
        self.chain_holder = None
        # Loader used in pre_run() to read the chain definition file.
        self._meta_loader = MetaLoader()
        # Set to True once the chain execution is canceled or paused.
        self._stopped = False
        # Replaced in pre_run() with the "skip_notify" runner parameter (when provided).
        self._skip_notify_tasks = []
        # Replaced in pre_run() with the "display_published" runner parameter (when provided).
        self._display_published = True
        # Replaced in pre_run() with the "notify" attribute of the liveaction (when available).
        self._chain_notify = None
235
236
    def pre_run(self):
        """
        Load, instantiate and validate the action chain before it is run.

        The chain definition is read from ``self.entry_point``. Notification and display
        settings are picked up from the liveaction and from the runner parameters.

        :raises runner_exc.ActionRunnerPreRunError: If the definition can't be loaded, the chain
                                                    can't be instantiated or validation fails.
        """
        super(ActionChainRunner, self).pre_run()

        chainspec_file = self.entry_point
        LOG.debug('Reading action chain from %s for action %s.', chainspec_file,
                  self.action)

        try:
            chainspec = self._meta_loader.load(file_path=chainspec_file,
                                               expected_type=dict)
        except Exception as e:
            message = ('Failed to parse action chain definition from "%s": %s' %
                       (chainspec_file, str(e)))
            LOG.exception('Failed to load action chain definition.')
            raise runner_exc.ActionRunnerPreRunError(message)

        try:
            self.chain_holder = ChainHolder(chainspec, self.action_name)
        except json_schema_exc.ValidationError as e:
            # preserve the whole nasty jsonschema message as that is better to get to the
            # root cause
            message = str(e)
            LOG.exception('Failed to instantiate ActionChain.')
            raise runner_exc.ActionRunnerPreRunError(message)
        except Exception as e:
            # "message" is not available on every exception (it is gone in Python 3), so fall
            # back to the string representation.
            message = getattr(e, 'message', None) or str(e)
            LOG.exception('Failed to instantiate ActionChain.')
            raise runner_exc.ActionRunnerPreRunError(message)

        # Runner attributes are set lazily. So these steps
        # should happen outside the constructor.
        if getattr(self, 'liveaction', None):
            self._chain_notify = getattr(self.liveaction, 'notify', None)
        if self.runner_parameters:
            self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
            self._display_published = self.runner_parameters.get('display_published', True)

        # Perform some pre-run chain validation
        try:
            self.chain_holder.validate()
        except Exception as e:
            # Same fallback as above so the error is never reported with an empty message.
            raise runner_exc.ActionRunnerPreRunError(getattr(e, 'message', None) or str(e))
278
279
    def run(self, action_parameters):
        """
        Execute the action chain starting with its default node.

        Tasks are run one after another. After each task the next node is resolved through the
        "on-success" / "on-failure" transition of the finished task. The chain stops when there
        is no next node, when a task can't be prepared, or when the chain or one of its tasks is
        canceled or paused.

        :param action_parameters: Parameters of the chain action. Used as context when rendering
                                  chain ``vars``, task parameters and published variables.

        :return: Tuple of ``(status, result, None)``. ``result`` contains the list of per task
                 results under ``tasks``, the published variables (when enabled) and top level
                 ``error`` / ``traceback`` entries when the chain itself failed.
        """
        # holds final result we store.
        result = {'tasks': []}
        # published variables are to be stored for display.
        if self._display_published:
            result[PUBLISHED_VARS_KEY] = {}
        context_result = {}  # holds result which is used for the template context purposes
        top_level_error = None  # stores a reference to a top level error
        # Stays True when no task is ever run (e.g. the starting node can't be resolved).
        fail = True
        action_node = None

        try:
            # initialize vars once we have the action_parameters. This allows
            # vars to refer to action_parameters.
            self.chain_holder.init_vars(action_parameters)
        except Exception:
            error = 'Failed initializing ``vars`` in chain.'

            LOG.exception(error)

            trace = traceback.format_exc(10)
            top_level_error = {
                'error': error,
                'traceback': trace
            }
            result['error'] = top_level_error['error']
            result['traceback'] = top_level_error['traceback']
            return (action_constants.LIVEACTION_STATUS_FAILED, result, None)

        try:
            action_node = self.chain_holder.get_next_node()
        except Exception as e:
            # action_node is still None here, so report the configured default node name
            # instead of dereferencing it (which would mask the original error).
            start_node_name = getattr(getattr(self.chain_holder, 'actionchain', None),
                                      'default', None)
            LOG.exception('Failed to get starting node "%s".', start_node_name)

            error = ('Failed to get starting node "%s". Lookup failed: %s' %
                     (start_node_name, str(e)))
            trace = traceback.format_exc(10)
            top_level_error = {
                'error': error,
                'traceback': trace
            }

        parent_context = {
            'execution_id': self.execution_id
        }
        if getattr(self.liveaction, 'context', None):
            parent_context.update(self.liveaction.context)

        while action_node:
            fail = False
            timeout = False
            error = None
            liveaction = None
            # Status reported when a task of the chain stops the chain (canceled or paused).
            stop_status = action_constants.LIVEACTION_STATUS_CANCELED

            created_at = date_utils.get_datetime_utc_now()

            try:
                liveaction = self._get_next_action(
                    action_node=action_node, parent_context=parent_context,
                    action_params=action_parameters, context_result=context_result)
            except action_exc.InvalidActionReferencedException:
                error = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' %
                         (action_node.name, action_node.ref))
                LOG.exception(error)

                fail = True
                top_level_error = {
                    'error': error,
                    'traceback': traceback.format_exc(10)
                }
                break
            except action_exc.ParameterRenderingFailedException as e:
                # Rendering parameters failed before we even got to running this action, abort and
                # fail the whole action chain
                LOG.exception('Failed to run action "%s".', action_node.name)

                fail = True
                error = ('Failed to run task "%s". Parameter rendering failed: %s' %
                         (action_node.name, str(e)))
                trace = traceback.format_exc(10)
                top_level_error = {
                    'error': error,
                    'traceback': trace
                }
                break

            try:
                liveaction = self._run_action(liveaction)
            except Exception as e:
                # Save the traceback and error message
                LOG.exception('Failure in running action "%s".', action_node.name)

                error = {
                    'error': 'Task "%s" failed: %s' % (action_node.name, str(e)),
                    'traceback': traceback.format_exc(10)
                }
                context_result[action_node.name] = error
            else:
                # Update context result
                context_result[action_node.name] = liveaction.result

                # Render and publish variables
                rendered_publish_vars = ActionChainRunner._render_publish_vars(
                    action_node=action_node, action_parameters=action_parameters,
                    execution_result=liveaction.result, previous_execution_results=context_result,
                    chain_vars=self.chain_holder.vars)

                if rendered_publish_vars:
                    self.chain_holder.vars.update(rendered_publish_vars)
                    if self._display_published:
                        result[PUBLISHED_VARS_KEY].update(rendered_publish_vars)
            finally:
                # Record result and resolve a next node based on the task success or failure
                updated_at = date_utils.get_datetime_utc_now()

                format_kwargs = {'action_node': action_node, 'liveaction_db': liveaction,
                                 'created_at': created_at, 'updated_at': updated_at}

                if error:
                    format_kwargs['error'] = error

                task_result = self._format_action_exec_result(**format_kwargs)
                result['tasks'].append(task_result)

                # Stop if the chain itself has been canceled while the task was running.
                if self.liveaction_id:
                    self._stopped = action_service.is_action_canceled_or_canceling(
                        self.liveaction_id)

                if self._stopped:
                    LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id)
                    status = action_constants.LIVEACTION_STATUS_CANCELED
                    return (status, result, None)

                # Stop if the chain itself has been paused while the task was running.
                if self.liveaction_id:
                    self._stopped = action_service.is_action_paused_or_pausing(
                        self.liveaction_id)

                if self._stopped:
                    LOG.info('Chain execution (%s) paused by user.', self.liveaction_id)
                    status = action_constants.LIVEACTION_STATUS_PAUSED
                    return (status, result, None)

                try:
                    if not liveaction:
                        fail = True
                        action_node = self.chain_holder.get_next_node(action_node.name,
                                                                      condition='on-failure')
                    elif liveaction.status in action_constants.LIVEACTION_FAILED_STATES:
                        if liveaction.status == action_constants.LIVEACTION_STATUS_TIMED_OUT:
                            timeout = True
                        else:
                            fail = True
                        action_node = self.chain_holder.get_next_node(action_node.name,
                                                                      condition='on-failure')
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
                        # User canceled an action (task) in the workflow - cancel the execution of
                        # rest of the workflow
                        self._stopped = True
                        stop_status = action_constants.LIVEACTION_STATUS_CANCELED
                        LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id)
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
                        # User paused an action (task) in the workflow - pause the execution of
                        # rest of the workflow
                        self._stopped = True
                        stop_status = action_constants.LIVEACTION_STATUS_PAUSED
                        LOG.info('Chain execution (%s) paused by user.', self.liveaction_id)
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED:
                        action_node = self.chain_holder.get_next_node(action_node.name,
                                                                      condition='on-success')
                except Exception as e:
                    LOG.exception('Failed to get next node "%s".', action_node.name)

                    fail = True
                    error = ('Failed to get next node "%s". Lookup failed: %s' %
                             (action_node.name, str(e)))
                    trace = traceback.format_exc(10)
                    top_level_error = {
                        'error': error,
                        'traceback': trace
                    }
                    # reset action_node here so that chain breaks on failure.
                    action_node = None
                    break

                if self._stopped:
                    # A task has been canceled or paused; report the matching status so a
                    # paused task doesn't turn the whole chain into a canceled one.
                    return (stop_status, result, None)

        if fail:
            status = action_constants.LIVEACTION_STATUS_FAILED
        elif timeout:
            status = action_constants.LIVEACTION_STATUS_TIMED_OUT
        else:
            status = action_constants.LIVEACTION_STATUS_SUCCEEDED

        if top_level_error:
            # Include top level error information
            result['error'] = top_level_error['error']
            result['traceback'] = top_level_error['traceback']

        return (status, result, None)
479
480
    def pause(self):
        """
        Cascade a pause request to the running child workflows of this chain.

        :return: Tuple of ``(LIVEACTION_STATUS_PAUSING, result, context)`` where result and
                 context are taken from the liveaction of the chain.
        """
        # Identify the list of action executions that are workflows and cascade pause.
        for child_exec_id in self.execution.children:
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)

            is_workflow = child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES
            # Only children which are workflows and are still running are asked to pause.
            if not (is_workflow and
                    child_exec.status == action_constants.LIVEACTION_STATUS_RUNNING):
                continue

            action_service.request_pause(
                LiveAction.get(id=child_exec.liveaction['id']),
                self.context.get('user', None)
            )

        return (
            action_constants.LIVEACTION_STATUS_PAUSING,
            self.liveaction.result,
            self.liveaction.context
        )
496
497
    def resume(self):
        """
        Resume a previously paused action chain.

        The chain holder, action parameters, result, context results and top level error are
        rebuilt from the liveaction. Execution continues either by resuming the last task (when
        it is paused) or with the node which follows the last completed task.

        :return: Tuple of ``(status, result, None)`` with the same result layout as ``run``.
        """
        # Restore chain holder
        if not self.chain_holder:
            self.pre_run()

        # Restore action parameters
        _, action_parameters = param_utils.render_final_params(
            self.runner_type_db.runner_parameters,
            self.action.parameters,
            self.liveaction.parameters,
            self.liveaction.context
        )

        # Restore result and published vars from the liveaction.
        result = self.liveaction.result or {'tasks': []}

        if self._display_published and PUBLISHED_VARS_KEY not in result:
            result[PUBLISHED_VARS_KEY] = {}

        # Rebuild context_result
        context_result = {}
        for task in result['tasks']:
            context_result[task['name']] = task['result']

        # Rebuild top_level_error
        top_level_error = None
        if 'error' in result or 'traceback' in result:
            top_level_error = {
                'error': result.get('error'),
                'traceback': result.get('traceback')
            }

        # Identify the last task executed.
        action_node = None
        liveaction = None
        resume_action_node = False

        if result['tasks']:
            last_task = result['tasks'][-1]

            if last_task['state'] == action_constants.LIVEACTION_STATUS_CANCELED:
                return (action_constants.LIVEACTION_STATUS_CANCELED, result, None)

            if last_task['state'] == action_constants.LIVEACTION_STATUS_PAUSED:
                # The last task is paused - resume it instead of moving to the next node.
                action_node = self.chain_holder.get_node(last_task['name'])
                liveaction = action_db_util.get_liveaction_by_id(last_task['liveaction_id'])
                resume_action_node = True

            if last_task['state'] in action_constants.LIVEACTION_COMPLETED_STATES:
                last_task_condition = (
                    'on-failure'
                    if last_task['state'] in action_constants.LIVEACTION_FAILED_STATES
                    else 'on-success'
                )

                action_node = self.chain_holder.get_next_node(
                    last_task['name'],
                    condition=last_task_condition
                )

        # Stays True when there is no node to continue with.
        fail = True

        # TODO: Restore chain holder vars

        # Setup parent context.
        parent_context = {
            'execution_id': self.execution_id
        }

        if getattr(self.liveaction, 'context', None):
            parent_context.update(self.liveaction.context)

        while action_node:
            fail = False
            timeout = False
            error = None
            # Status reported when a task of the chain stops the chain (canceled or paused).
            stop_status = action_constants.LIVEACTION_STATUS_CANCELED

            created_at = date_utils.get_datetime_utc_now()

            # A new liveaction is only needed when we are not resuming an existing one.
            if not resume_action_node or not liveaction:
                try:
                    liveaction = self._get_next_action(
                        action_node=action_node, parent_context=parent_context,
                        action_params=action_parameters, context_result=context_result)
                except action_exc.InvalidActionReferencedException:
                    error = ('Failed to run task "%s". Action with reference "%s" doesn\'t exist.' %
                             (action_node.name, action_node.ref))
                    LOG.exception(error)

                    fail = True
                    top_level_error = {
                        'error': error,
                        'traceback': traceback.format_exc(10)
                    }
                    break
                except action_exc.ParameterRenderingFailedException as e:
                    # Rendering parameters failed before we even got to running this action,
                    # abort and fail the whole action chain
                    LOG.exception('Failed to run action "%s".', action_node.name)

                    fail = True
                    error = ('Failed to run task "%s". Parameter rendering failed: %s' %
                             (action_node.name, str(e)))
                    trace = traceback.format_exc(10)
                    top_level_error = {
                        'error': error,
                        'traceback': trace
                    }
                    break

            try:
                liveaction = (
                    self._resume_action(liveaction)
                    if resume_action_node
                    else self._run_action(liveaction)
                )
            except Exception as e:
                # Save the traceback and error message
                LOG.exception('Failure in running action "%s".', action_node.name)

                error = {
                    'error': 'Task "%s" failed: %s' % (action_node.name, str(e)),
                    'traceback': traceback.format_exc(10)
                }
                context_result[action_node.name] = error
            else:
                # Update context result
                context_result[action_node.name] = liveaction.result

                # Render and publish variables
                rendered_publish_vars = ActionChainRunner._render_publish_vars(
                    action_node=action_node, action_parameters=action_parameters,
                    execution_result=liveaction.result, previous_execution_results=context_result,
                    chain_vars=self.chain_holder.vars)

                if rendered_publish_vars:
                    self.chain_holder.vars.update(rendered_publish_vars)
                    if self._display_published:
                        result[PUBLISHED_VARS_KEY].update(rendered_publish_vars)
            finally:
                # Record result and resolve a next node based on the task success or failure
                updated_at = date_utils.get_datetime_utc_now()

                format_kwargs = {'action_node': action_node, 'liveaction_db': liveaction,
                                 'created_at': created_at, 'updated_at': updated_at}

                if error:
                    format_kwargs['error'] = error

                task_result = self._format_action_exec_result(**format_kwargs)
                result['tasks'].append(task_result)

                # Stop if the chain itself has been canceled while the task was running.
                if self.liveaction_id:
                    self._stopped = action_service.is_action_canceled_or_canceling(
                        self.liveaction_id)

                if self._stopped:
                    LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id)
                    status = action_constants.LIVEACTION_STATUS_CANCELED
                    return (status, result, None)

                # Stop if the chain has been paused again while the task was running. This
                # mirrors the handling in run().
                # NOTE(review): assumes the chain liveaction is no longer in a paused / pausing
                # state once resume() is executing - confirm against the action service.
                if self.liveaction_id:
                    self._stopped = action_service.is_action_paused_or_pausing(
                        self.liveaction_id)

                if self._stopped:
                    LOG.info('Chain execution (%s) paused by user.', self.liveaction_id)
                    status = action_constants.LIVEACTION_STATUS_PAUSED
                    return (status, result, None)

                try:
                    if not liveaction:
                        fail = True
                        action_node = self.chain_holder.get_next_node(action_node.name,
                                                                      condition='on-failure')
                    elif liveaction.status in action_constants.LIVEACTION_FAILED_STATES:
                        if liveaction.status == action_constants.LIVEACTION_STATUS_TIMED_OUT:
                            timeout = True
                        else:
                            fail = True
                        action_node = self.chain_holder.get_next_node(action_node.name,
                                                                      condition='on-failure')
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_CANCELED:
                        # User canceled an action (task) in the workflow - cancel the execution of
                        # rest of the workflow
                        self._stopped = True
                        stop_status = action_constants.LIVEACTION_STATUS_CANCELED
                        LOG.info('Chain execution (%s) canceled by user.', self.liveaction_id)
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_PAUSED:
                        # User paused an action (task) in the workflow - pause the execution of
                        # rest of the workflow. Without this branch the same task would be
                        # started again on the next loop iteration.
                        self._stopped = True
                        stop_status = action_constants.LIVEACTION_STATUS_PAUSED
                        LOG.info('Chain execution (%s) paused by user.', self.liveaction_id)
                    elif liveaction.status == action_constants.LIVEACTION_STATUS_SUCCEEDED:
                        action_node = self.chain_holder.get_next_node(action_node.name,
                                                                      condition='on-success')
                except Exception as e:
                    LOG.exception('Failed to get next node "%s".', action_node.name)

                    fail = True
                    error = ('Failed to get next node "%s". Lookup failed: %s' %
                             (action_node.name, str(e)))
                    trace = traceback.format_exc(10)
                    top_level_error = {
                        'error': error,
                        'traceback': trace
                    }
                    # reset action_node here so that chain breaks on failure.
                    action_node = None
                    break

                if self._stopped:
                    # A task has been canceled or paused; report the matching status.
                    return (stop_status, result, None)

                # Only the first iteration can resume an existing liveaction; every following
                # task is started from scratch.
                resume_action_node = False
                liveaction = None

        if fail:
            status = action_constants.LIVEACTION_STATUS_FAILED
        elif timeout:
            status = action_constants.LIVEACTION_STATUS_TIMED_OUT
        else:
            status = action_constants.LIVEACTION_STATUS_SUCCEEDED

        if top_level_error:
            # Include top level error information
            result['error'] = top_level_error['error']
            result['traceback'] = top_level_error['traceback']

        return (status, result, None)
714
715
    @staticmethod
    def _render_publish_vars(action_node, action_parameters, execution_result,
                             previous_execution_results, chain_vars):
        """
        Render the variables which a task publishes after it has been executed.

        If the task defines no ``publish`` mapping nothing is published and an empty dict is
        returned. Otherwise the mapping is rendered via ``jinja_utils.render_values`` against a
        context which contains the action parameters, the result of this task (stored under
        the task name), the results of the previous tasks, the chain vars and the
        system-scoped ``KeyValueLookup`` objects.

        :raises: ``ParameterRenderingFailedException`` if rendering of any value fails.
        """
        publish_spec = action_node.publish
        if not publish_spec:
            return {}

        # The layers are applied in this order, so on a name clash the later layer wins.
        context = dict(action_parameters)
        context[action_node.name] = execution_result
        context.update(previous_execution_results)
        context.update(chain_vars)
        context[RESULTS_KEY] = previous_execution_results

        system_lookup = kv_service.KeyValueLookup(scope=kv_constants.SYSTEM_SCOPE)
        context[kv_constants.SYSTEM_SCOPE] = system_lookup

        full_system_lookup = kv_service.KeyValueLookup(scope=kv_constants.FULL_SYSTEM_SCOPE)
        context[kv_constants.DATASTORE_PARENT_SCOPE] = {
            kv_constants.SYSTEM_SCOPE: full_system_lookup
        }

        try:
            return jinja_utils.render_values(mapping=publish_spec, context=context)
        except Exception as e:
            # The rendering exception may carry the offending key / template string with it.
            failed_key = getattr(e, 'key', None)
            failed_value = getattr(e, 'value', None)
            details = (failed_key, action_node.name, failed_value, str(e))
            msg = ('Failed rendering value for publish parameter "%s" in task "%s" '
                   '(template string=%s): %s' % details)
            raise action_exc.ParameterRenderingFailedException(msg)
758
759
    @staticmethod
    def _resolve_params(action_node, original_parameters, results, chain_vars, chain_context):
        """
        Render the parameters of a task against the current state of the chain.

        The rendering context is built from the original chain parameters, the results of the
        already executed tasks, the chain vars, the system-scoped ``KeyValueLookup`` objects,
        the chain context and the config returned by ``get_config`` for the pack / user found
        under the ``parent`` key of ``chain_context``.

        :param action_node: Task whose parameters (``get_parameters()``) are rendered.
        :param original_parameters: Parameters the chain itself has been invoked with.
        :param results: Results of the previously executed tasks.
        :param chain_vars: Variables defined on the chain.
        :param chain_context: Chain context, made available to the templates.
        :type chain_context: ``dict``

        :raises: ``ParameterRenderingFailedException`` if rendering of any parameter fails.
        """
        # setup context with original parameters and the intermediate results.
        chain_parent = chain_context.get('parent', {})
        pack = chain_parent.get('pack')
        user = chain_parent.get('user')

        config = get_config(pack, user)

        # Layers are applied in order, so on a name clash the later layer wins.
        context = {}
        context.update(original_parameters)
        context.update(results)
        context.update(chain_vars)
        context.update({RESULTS_KEY: results})

        context.update({
            kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
                scope=kv_constants.SYSTEM_SCOPE)
        })

        context.update({
            kv_constants.DATASTORE_PARENT_SCOPE: {
                kv_constants.SYSTEM_SCOPE: kv_service.KeyValueLookup(
                    scope=kv_constants.FULL_SYSTEM_SCOPE)
            }
        })
        context.update({action_constants.ACTION_CONTEXT_KV_PREFIX: chain_context})
        context.update({pack_constants.PACK_CONFIG_CONTEXT_KV_PREFIX: config})

        try:
            rendered_params = jinja_utils.render_values(mapping=action_node.get_parameters(),
                                                        context=context)
        except Exception as e:
            # Any exception type can end up here and not all of them carry "key" / "value"
            # attributes. Read them defensively *before* logging - accessing e.key directly
            # would raise AttributeError inside this handler and mask the real failure.
            key = getattr(e, 'key', None)
            value = getattr(e, 'value', None)

            LOG.exception('Jinja rendering for parameter "%s" failed.', key)

            msg = ('Failed rendering value for action parameter "%s" in task "%s" '
                   '(template string=%s): %s') % (key, action_node.name, value, str(e))
            raise action_exc.ParameterRenderingFailedException(msg)

        LOG.debug('Rendered params: %s: Type: %s', rendered_params, type(rendered_params))
        return rendered_params
800
801
    def _get_next_action(self, action_node, parent_context, action_params, context_result):
        """
        Build the liveaction object for the provided chain task.

        The action referenced by the task is looked up first, then the task parameters are
        resolved and finally the liveaction object is assembled. Nothing is scheduled here.

        :raises: ``InvalidActionReferencedException`` if the referenced action is not
                 registered.
        """
        # Verify that the referenced action exists
        # TODO: We do another lookup in cast_param, refactor to reduce number of lookups
        name = action_node.name
        ref = action_node.ref

        if not action_db_util.get_action_by_ref(ref=ref):
            raise action_exc.InvalidActionReferencedException(
                'Task :: %s - Action with ref %s not registered.' % (name, ref))

        task_params = ActionChainRunner._resolve_params(
            action_node=action_node,
            original_parameters=action_params,
            results=context_result,
            chain_vars=self.chain_holder.vars,
            chain_context={'parent': parent_context})

        return self._build_liveaction_object(action_node=action_node,
                                             resolved_params=task_params,
                                             parent_context=parent_context)
823
824
    def _run_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
        """
        Request execution of the provided liveaction and optionally wait for it to finish.

        :param liveaction: Liveaction object to request.

        :param wait_for_completion: If true, poll the liveaction until it reaches one of the
                                    completed states or gets paused.

        :param sleep_delay: Number of seconds to wait during "is completed" polls.
        :type sleep_delay: ``float``

        :return: Liveaction as returned by the request call or, when polling, by the last
                 database lookup.
        """
        try:
            # request() returns a pair, only the liveaction part of it is used here
            liveaction, _unused = action_service.request(liveaction)
        except Exception as e:
            liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
            LOG.exception('Failed to schedule liveaction.')
            raise e

        if not wait_for_completion:
            return liveaction

        # Polling stops as soon as the liveaction is completed or paused.
        final_states = (action_constants.LIVEACTION_COMPLETED_STATES +
                        [action_constants.LIVEACTION_STATUS_PAUSED])

        while liveaction.status not in final_states:
            eventlet.sleep(sleep_delay)
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)

        return liveaction
844
845
    def _resume_action(self, liveaction, wait_for_completion=True, sleep_delay=1.0):
        """
        Request resume of the provided liveaction and optionally wait for it to finish.

        The resume is requested on behalf of the ``user`` stored in the runner context (if
        any) and the liveaction is then re-read from the database.

        :param liveaction: Liveaction object to resume.

        :param wait_for_completion: If true, poll the liveaction until it reaches one of the
                                    completed states or gets paused again.

        :param sleep_delay: Number of seconds to wait during "is completed" polls.
        :type sleep_delay: ``float``

        :return: Liveaction as returned by the last database lookup.
        """
        try:
            liveaction, _ = action_service.request_resume(
                liveaction,
                self.context.get('user', None)
            )

            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)
        except Exception:
            liveaction.status = action_constants.LIVEACTION_STATUS_FAILED
            # This is the resume path, so don't report it as a scheduling failure.
            LOG.exception('Failed to resume liveaction.')
            # Bare raise re-raises the active exception with its original traceback.
            raise

        while (wait_for_completion and liveaction.status not in (
                action_constants.LIVEACTION_COMPLETED_STATES +
                [action_constants.LIVEACTION_STATUS_PAUSED])):
            eventlet.sleep(sleep_delay)
            liveaction = action_db_util.get_liveaction_by_id(liveaction.id)

        return liveaction
869
870
    def _build_liveaction_object(self, action_node, resolved_params, parent_context):
        """
        Assemble the ``LiveActionDB`` object for the provided chain task.

        The object gets the notify settings applicable to the task (if any), a context which
        links it to the parent and carries the task attributes under the ``chain`` key, and
        the resolved parameters processed by ``action_param_utils.cast_params``.
        """
        task_ref = action_node.ref
        task_liveaction = LiveActionDB(action=task_ref)

        # Setup notify for task in chain.
        task_notify = self._get_notify(action_node)
        if task_notify:
            task_liveaction.notify = task_notify
            LOG.debug('%s: Task notify set to: %s', action_node.name, task_liveaction.notify)

        task_context = {'parent': parent_context, 'chain': vars(action_node)}
        task_liveaction.context = task_context

        casted_params = action_param_utils.cast_params(action_ref=task_ref,
                                                       params=resolved_params)
        task_liveaction.parameters = casted_params
        return task_liveaction
886
887
    def _get_notify(self, action_node):
888
        if action_node.name not in self._skip_notify_tasks:
889
            if action_node.notify:
890
                task_notify = NotificationsHelper.to_model(action_node.notify)
891
                return task_notify
892
            elif self._chain_notify:
893
                return self._chain_notify
894
895
        return None
896
897
    def _format_action_exec_result(self, action_node, liveaction_db, created_at, updated_at,
                                   error=None):
        """
        Format ActionExecution result so it can be used in the final action result output.

        :param action_node: Task the result belongs to. Its name is used as the result id
                            and name.

        :param liveaction_db: Liveaction of the task or None if there isn't one.

        :param created_at: Creation timestamp of the task result.
        :type created_at: ``datetime.datetime``

        :param updated_at: Last update timestamp of the task result.
        :type updated_at: ``datetime.datetime``

        :param error: Optional error details. When provided, the task is reported as failed
                      and the details are used as the task result.

        :rtype: ``dict``
        """
        assert isinstance(created_at, datetime.datetime)
        assert isinstance(updated_at, datetime.datetime)

        result = {}

        execution_db = None
        if liveaction_db:
            execution_db = ActionExecution.get(liveaction__id=str(liveaction_db.id))

        result['id'] = action_node.name
        result['name'] = action_node.name
        result['execution_id'] = str(execution_db.id) if execution_db else None
        result['liveaction_id'] = str(liveaction_db.id) if liveaction_db else None
        result['workflow'] = None

        result['created_at'] = isotime.format(dt=created_at)
        result['updated_at'] = isotime.format(dt=updated_at)

        # A task without a liveaction is reported as failed even if no error was provided.
        if error or not liveaction_db:
            result['state'] = action_constants.LIVEACTION_STATUS_FAILED
        else:
            result['state'] = liveaction_db.status

        if error:
            result['result'] = error
        elif liveaction_db:
            result['result'] = liveaction_db.result
        else:
            # Neither a liveaction nor error details are available. There is nothing to
            # report and dereferencing liveaction_db here would raise AttributeError.
            result['result'] = None

        return result
933
934
935
def get_runner():
    """
    Return a new ``ActionChainRunner`` instance which uses a random UUID4 string as its id.
    """
    runner_id = str(uuid.uuid4())
    return ActionChainRunner(runner_id)
937