Passed
Pull Request — master (#4000)
by W
05:39
created

_construct_workflow_execution_options()   C

Complexity

Conditions 7

Size

Total Lines 62

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 62
rs 6.2295
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import copy
18
import uuid
19
20
import retrying
21
import six
22
import yaml
23
from mistralclient.api import client as mistral
24
from oslo_config import cfg
25
26
from st2common.runners.base import PollingAsyncActionRunner
27
from st2common.runners.base import get_metadata as get_runner_metadata
28
from st2common.constants import action as action_constants
29
from st2common import log as logging
30
from st2common.models.api.notification import NotificationsHelper
31
from st2common.persistence.execution import ActionExecution
32
from st2common.persistence.liveaction import LiveAction
33
from st2common.services import action as action_service
34
from st2common.util import jinja
35
from st2common.util.workflow import mistral as utils
36
from st2common.util.url import get_url_without_trailing_slash
37
from st2common.util.api import get_full_public_api_url
38
from st2common.util.api import get_mistral_api_url
39
40
__all__ = [
41
    'MistralRunner',
42
43
    'get_runner',
44
    'get_metadata'
45
]
46
47
48
LOG = logging.getLogger(__name__)
49
50
51
class MistralRunner(PollingAsyncActionRunner):
52
53
    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
54
55
    def __init__(self, runner_id):
56
        super(MistralRunner, self).__init__(runner_id=runner_id)
57
        self._on_behalf_user = cfg.CONF.system_user.user
58
        self._notify = None
59
        self._skip_notify_tasks = []
60
        self._client = mistral.client(
61
            mistral_url=self.url,
62
            username=cfg.CONF.mistral.keystone_username,
63
            api_key=cfg.CONF.mistral.keystone_password,
64
            project_name=cfg.CONF.mistral.keystone_project_name,
65
            auth_url=cfg.CONF.mistral.keystone_auth_url,
66
            cacert=cfg.CONF.mistral.cacert,
67
            insecure=cfg.CONF.mistral.insecure)
68
69
    @classmethod
70
    def is_polling_enabled(cls):
71
        return cfg.CONF.mistral.enable_polling
72
73
    @staticmethod
74
    def get_workflow_definition(entry_point):
75
        with open(entry_point, 'r') as def_file:
76
            return def_file.read()
77
78
    def pre_run(self):
79
        super(MistralRunner, self).pre_run()
80
81
        if getattr(self, 'liveaction', None):
82
            self._notify = getattr(self.liveaction, 'notify', None)
83
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
84
85
    @staticmethod
86
    def _check_name(action_ref, is_workbook, def_dict):
87
        # If workbook, change the value of the "name" key.
88
        if is_workbook:
89
            if def_dict.get('name') != action_ref:
90
                raise Exception('Name of the workbook must be the same as the '
91
                                'fully qualified action name "%s".' % action_ref)
92
        # If workflow, change the key name of the workflow.
93
        else:
94
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
95
            if workflow_name != action_ref:
96
                raise Exception('Name of the workflow must be the same as the '
97
                                'fully qualified action name "%s".' % action_ref)
98
99
    def _save_workbook(self, name, def_yaml):
100
        # If the workbook is not found, the mistral client throws a generic API exception.
101
        try:
102
            # Update existing workbook.
103
            wb = self._client.workbooks.get(name)
104
        except:
105
            # Delete if definition was previously a workflow.
106
            # If not found, an API exception is thrown.
107
            try:
108
                self._client.workflows.delete(name)
109
            except:
110
                pass
111
112
            # Create the new workbook.
113
            wb = self._client.workbooks.create(def_yaml)
114
115
        # Update the workbook definition.
116
        # pylint: disable=no-member
117
        if wb.definition != def_yaml:
118
            self._client.workbooks.update(def_yaml)
119
120
    def _save_workflow(self, name, def_yaml):
121
        # If the workflow is not found, the mistral client throws a generic API exception.
122
        try:
123
            # Update existing workbook.
124
            wf = self._client.workflows.get(name)
125
        except:
126
            # Delete if definition was previously a workbook.
127
            # If not found, an API exception is thrown.
128
            try:
129
                self._client.workbooks.delete(name)
130
            except:
131
                pass
132
133
            # Create the new workflow.
134
            wf = self._client.workflows.create(def_yaml)[0]
135
136
        # Update the workflow definition.
137
        # pylint: disable=no-member
138
        if wf.definition != def_yaml:
139
            self._client.workflows.update(def_yaml)
140
141
    def _find_default_workflow(self, def_dict):
142
        num_workflows = len(list(def_dict['workflows'].keys()))
143
144
        if num_workflows > 1:
145
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
146
            if not fully_qualified_wf_name:
147
                raise ValueError('Workbook definition is detected. '
148
                                 'Default workflow cannot be determined.')
149
150
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
151
            if wf_name not in def_dict['workflows']:
152
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
153
                                 % fully_qualified_wf_name)
154
155
            return fully_qualified_wf_name
156
        elif num_workflows == 1:
157
            return '%s.%s' % (def_dict['name'], list(def_dict['workflows'].keys())[0])
158
        else:
159
            raise Exception('There are no workflows in the workbook.')
160
161
    def _construct_workflow_execution_options(self):
162
        # This URL is used by Mistral to talk back to the API
163
        api_url = get_mistral_api_url()
164
        endpoint = api_url + '/actionexecutions'
165
166
        # This URL is available in the context and can be used by the users inside a workflow,
167
        # similar to "ST2_ACTION_API_URL" environment variable available to actions
168
        public_api_url = get_full_public_api_url()
169
170
        # Build context with additional information
171
        parent_context = {
172
            'execution_id': self.execution_id
173
        }
174
175
        if getattr(self.liveaction, 'context', None):
176
            parent_context.update(self.liveaction.context)
177
178
        # Convert jinja expressions in the params of Action Chain under the parent context
179
        # into raw block. If there is any jinja expressions, Mistral will try to evaulate
180
        # the expression. If there is a local context reference, the evaluation will fail
181
        # because the local context reference is out of scope.
182
        chain_ctx = parent_context.get('chain') or {}
183
184
        for attr in ['params', 'parameters']:
185
            chain_params_ctx = chain_ctx.get(attr) or {}
186
187
            for k, v in six.iteritems(chain_params_ctx):
188
                parent_context['chain'][attr][k] = jinja.convert_jinja_to_raw_block(v)
189
190
        st2_execution_context = {
191
            'api_url': api_url,
192
            'endpoint': endpoint,
193
            'parent': parent_context,
194
            'notify': {},
195
            'skip_notify_tasks': self._skip_notify_tasks
196
        }
197
198
        # Include notification information
199
        if self._notify:
200
            notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
201
            st2_execution_context['notify'] = notify_dict
202
203
        if self.auth_token:
204
            st2_execution_context['auth_token'] = self.auth_token.token
205
206
        options = {
207
            'env': {
208
                'st2_execution_id': self.execution_id,
209
                'st2_liveaction_id': self.liveaction_id,
210
                'st2_action_api_url': public_api_url,
211
                '__actions': {
212
                    'st2.action': {
213
                        'st2_context': st2_execution_context
214
                    }
215
                }
216
            }
217
        }
218
219
        if not self.is_polling_enabled():
220
            options['notify'] = [{'type': 'st2'}]
221
222
        return options
223
224
    def _get_resume_options(self):
225
        return self.context.get('re-run', {})
226
227
    @retrying.retry(
228
        retry_on_exception=utils.retry_on_exceptions,
229
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
230
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
231
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
232
    def run(self, action_parameters):
233
        resume_options = self._get_resume_options()
234
235
        tasks_to_reset = resume_options.get('reset', [])
236
237
        task_specs = {
238
            task_name: {'reset': task_name in tasks_to_reset}
239
            for task_name in resume_options.get('tasks', [])
240
        }
241
242
        resume = self.rerun_ex_ref and task_specs
243
244
        if resume:
245
            result = self.resume_workflow(ex_ref=self.rerun_ex_ref, task_specs=task_specs)
246
        else:
247
            result = self.start_workflow(action_parameters=action_parameters)
248
249
        return result
250
251
    def start_workflow(self, action_parameters):
252
        # Test connection
253
        self._client.workflows.list()
254
255
        # Setup inputs for the workflow execution.
256
        inputs = self.runner_parameters.get('context', dict())
257
        inputs.update(action_parameters)
258
259
        # Get workbook/workflow definition from file.
260
        def_yaml = self.get_workflow_definition(self.entry_point)
261
        def_dict = yaml.safe_load(def_yaml)
262
        is_workbook = ('workflows' in def_dict)
263
264
        if not is_workbook:
265
            # Non-workbook definition containing multiple workflows is not supported.
266
            if len([k for k, _ in six.iteritems(def_dict) if k != 'version']) != 1:
267
                raise Exception('Workflow (not workbook) definition is detected. '
268
                                'Multiple workflows is not supported.')
269
270
        action_ref = '%s.%s' % (self.action.pack, self.action.name)
271
        self._check_name(action_ref, is_workbook, def_dict)
272
        def_dict_xformed = utils.transform_definition(def_dict)
273
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)
274
275
        # Construct additional options for the workflow execution
276
        options = self._construct_workflow_execution_options()
277
278
        # Save workbook/workflow definition.
279
        if is_workbook:
280
            self._save_workbook(action_ref, def_yaml_xformed)
281
            default_workflow = self._find_default_workflow(def_dict_xformed)
282
            execution = self._client.executions.create(default_workflow,
283
                                                       workflow_input=inputs,
284
                                                       **options)
285
        else:
286
            self._save_workflow(action_ref, def_yaml_xformed)
287
            execution = self._client.executions.create(action_ref,
288
                                                       workflow_input=inputs,
289
                                                       **options)
290
291
        status = action_constants.LIVEACTION_STATUS_RUNNING
292
        partial_results = {'tasks': []}
293
294
        # pylint: disable=no-member
295
        current_context = {
296
            'execution_id': str(execution.id),
297
            'workflow_name': execution.workflow_name
298
        }
299
300
        exec_context = self.context
301
        exec_context = self._build_mistral_context(exec_context, current_context)
302
        LOG.info('Mistral query context is %s' % exec_context)
303
304
        return (status, partial_results, exec_context)
305
306
    def _get_tasks(self, wf_ex_id, full_task_name, task_name, executions):
307
        task_exs = self._client.tasks.list(workflow_execution_id=wf_ex_id)
308
309
        if '.' in task_name:
310
            dot_pos = task_name.index('.')
311
            parent_task_name = task_name[:dot_pos]
312
            task_name = task_name[dot_pos + 1:]
313
314
            parent_task_ids = [task.id for task in task_exs if task.name == parent_task_name]
315
316
            workflow_ex_ids = [wf_ex.id for wf_ex in executions
317
                               if (getattr(wf_ex, 'task_execution_id', None) and
318
                                   wf_ex.task_execution_id in parent_task_ids)]
319
320
            tasks = {}
321
322
            for sub_wf_ex_id in workflow_ex_ids:
323
                tasks.update(self._get_tasks(sub_wf_ex_id, full_task_name, task_name, executions))
324
325
            return tasks
326
327
        # pylint: disable=no-member
328
        tasks = {
329
            full_task_name: task.to_dict()
330
            for task in task_exs
331
            if task.name == task_name and task.state == 'ERROR'
332
        }
333
334
        return tasks
335
336
    def _update_workflow_env(self, wf_ex_id, env):
337
        wf_ex = self._client.executions.update(str(wf_ex_id), None, env=env)
338
339
        for task_ex in self._client.tasks.list(workflow_execution_id=wf_ex.id):
340
            for subwf_ex in self._client.executions.list(task_execution_id=task_ex.id):
341
                self._update_workflow_env(subwf_ex.id, env)
342
343
    def resume_workflow(self, ex_ref, task_specs):
344
        mistral_ctx = ex_ref.context.get('mistral', dict())
345
346
        if not mistral_ctx.get('execution_id'):
347
            raise Exception('Unable to rerun because mistral execution_id is missing.')
348
349
        execution = self._client.executions.get(mistral_ctx.get('execution_id'))
350
351
        # pylint: disable=no-member
352
        if execution.state not in ['ERROR']:
353
            raise Exception('Workflow execution is not in a rerunable state.')
354
355
        executions = self._client.executions.list()
356
357
        tasks = {}
358
359
        for task_name, task_spec in six.iteritems(task_specs):
360
            tasks.update(self._get_tasks(execution.id, task_name, task_name, executions))
361
362
        missing_tasks = list(set(task_specs.keys()) - set(tasks.keys()))
363
        if missing_tasks:
364
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
365
                            'rerunable tasks: %s. Please make sure that the task name is correct '
366
                            'and the task is in rerunable state.' % ', '.join(missing_tasks))
367
368
        # Construct additional options for the workflow execution
369
        options = self._construct_workflow_execution_options()
370
371
        # Update workflow env recursively.
372
        self._update_workflow_env(execution.id, options.get('env', None))
373
374
        # Re-run task list.
375
        for task_name, task_obj in six.iteritems(tasks):
376
            # pylint: disable=unexpected-keyword-arg
377
            self._client.tasks.rerun(
378
                task_obj['id'],
379
                reset=task_specs[task_name].get('reset', False),
380
                env=options.get('env', None)
381
            )
382
383
        status = action_constants.LIVEACTION_STATUS_RUNNING
384
        partial_results = {'tasks': []}
385
386
        # pylint: disable=no-member
387
        current_context = {
388
            'execution_id': str(execution.id),
389
            'workflow_name': execution.workflow_name
390
        }
391
392
        exec_context = self.context
393
        exec_context = self._build_mistral_context(exec_context, current_context)
394
        LOG.info('Mistral query context is %s' % exec_context)
395
396
        return (status, partial_results, exec_context)
397
398
    @retrying.retry(
399
        retry_on_exception=utils.retry_on_exceptions,
400
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
401
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
402
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
403
    def pause(self):
404
        mistral_ctx = self.context.get('mistral', dict())
405
406
        if not mistral_ctx.get('execution_id'):
407
            raise Exception('Unable to pause because mistral execution_id is missing.')
408
409
        # Pause the main workflow execution. Any non-workflow tasks that are still
410
        # running will be allowed to complete gracefully.
411
        self._client.executions.update(mistral_ctx.get('execution_id'), 'PAUSED')
412
413
        # If workflow is executed under another parent workflow, pause the corresponding
414
        # action execution for the task in the parent workflow.
415
        if 'parent' in getattr(self, 'context', {}) and mistral_ctx.get('action_execution_id'):
416
            mistral_action_ex_id = mistral_ctx.get('action_execution_id')
417
            self._client.action_executions.update(mistral_action_ex_id, 'PAUSED')
418
419
        # Identify the list of action executions that are workflows and cascade pause.
420
        for child_exec_id in self.execution.children:
421
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)
422
            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
423
                    child_exec.status == action_constants.LIVEACTION_STATUS_RUNNING):
424
                action_service.request_pause(
425
                    LiveAction.get(id=child_exec.liveaction['id']),
426
                    self.context.get('user', None)
427
                )
428
429
        status = (
430
            action_constants.LIVEACTION_STATUS_PAUSING
431
            if action_service.is_children_active(self.liveaction.id)
432
            else action_constants.LIVEACTION_STATUS_PAUSED
433
        )
434
435
        return (
436
            status,
437
            self.liveaction.result,
438
            self.liveaction.context
439
        )
440
441
    @retrying.retry(
442
        retry_on_exception=utils.retry_on_exceptions,
443
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
444
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
445
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
446
    def resume(self):
447
        mistral_ctx = self.context.get('mistral', dict())
448
449
        if not mistral_ctx.get('execution_id'):
450
            raise Exception('Unable to resume because mistral execution_id is missing.')
451
452
        # If workflow is executed under another parent workflow, resume the corresponding
453
        # action execution for the task in the parent workflow.
454
        if 'parent' in getattr(self, 'context', {}) and mistral_ctx.get('action_execution_id'):
455
            mistral_action_ex_id = mistral_ctx.get('action_execution_id')
456
            self._client.action_executions.update(mistral_action_ex_id, 'RUNNING')
457
458
        # Pause the main workflow execution. Any non-workflow tasks that are still
459
        # running will be allowed to complete gracefully.
460
        self._client.executions.update(mistral_ctx.get('execution_id'), 'RUNNING')
461
462
        # Identify the list of action executions that are workflows and cascade resume.
463
        for child_exec_id in self.execution.children:
464
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)
465
            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
466
                    child_exec.status == action_constants.LIVEACTION_STATUS_PAUSED):
467
                action_service.request_resume(
468
                    LiveAction.get(id=child_exec.liveaction['id']),
469
                    self.context.get('user', None)
470
                )
471
472
        return (
473
            action_constants.LIVEACTION_STATUS_RUNNING,
474
            self.execution.result,
475
            self.execution.context
476
        )
477
478
    @retrying.retry(
479
        retry_on_exception=utils.retry_on_exceptions,
480
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
481
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
482
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
483
    def cancel(self):
484
        mistral_ctx = self.context.get('mistral', dict())
485
486
        if not mistral_ctx.get('execution_id'):
487
            raise Exception('Unable to cancel because mistral execution_id is missing.')
488
489
        # Cancels the main workflow execution. Any non-workflow tasks that are still
490
        # running will be allowed to complete gracefully.
491
        self._client.executions.update(mistral_ctx.get('execution_id'), 'CANCELLED')
492
493
        # If workflow is executed under another parent workflow, cancel the corresponding
494
        # action execution for the task in the parent workflow.
495
        if 'parent' in getattr(self, 'context', {}) and mistral_ctx.get('action_execution_id'):
496
            mistral_action_ex_id = mistral_ctx.get('action_execution_id')
497
            self._client.action_executions.update(mistral_action_ex_id, 'CANCELLED')
498
499
        # Identify the list of action executions that are workflows and still running.
500
        for child_exec_id in self.execution.children:
501
            child_exec = ActionExecution.get(id=child_exec_id)
502
            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
503
                    child_exec.status in action_constants.LIVEACTION_CANCELABLE_STATES):
504
                action_service.request_cancellation(
505
                    LiveAction.get(id=child_exec.liveaction['id']),
506
                    self.context.get('user', None)
507
                )
508
509
        status = (
510
            action_constants.LIVEACTION_STATUS_CANCELING
511
            if action_service.is_children_active(self.liveaction.id)
512
            else action_constants.LIVEACTION_STATUS_CANCELED
513
        )
514
515
        return (
516
            status,
517
            self.liveaction.result,
518
            self.liveaction.context
519
        )
520
521
    @staticmethod
522
    def _build_mistral_context(parent, current):
523
        """
524
        Mistral workflow might be kicked off in st2 by a parent Mistral
525
        workflow. In that case, we need to make sure that the existing
526
        mistral 'context' is moved as 'parent' and the child workflow
527
        'context' is added.
528
        """
529
        parent = copy.deepcopy(parent)
530
        context = dict()
531
532
        if not parent:
533
            context['mistral'] = current
534
        else:
535
            if 'mistral' in list(parent.keys()):
536
                orig_parent_context = parent.get('mistral', dict())
537
                actual_parent = dict()
538
                if 'workflow_name' in list(orig_parent_context.keys()):
539
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
540
                    del orig_parent_context['workflow_name']
541
                if 'workflow_execution_id' in list(orig_parent_context.keys()):
542
                    actual_parent['workflow_execution_id'] = \
543
                        orig_parent_context['workflow_execution_id']
544
                    del orig_parent_context['workflow_execution_id']
545
                context['mistral'] = orig_parent_context
546
                context['mistral'].update(current)
547
                context['mistral']['parent'] = actual_parent
548
            else:
549
                context['mistral'] = current
550
551
        return context
552
553
554
def get_runner():
555
    return MistralRunner(str(uuid.uuid4()))
556
557
558
def get_metadata():
559
    return get_runner_metadata('mistral_v2')
560