Passed
Pull Request — master (#4000)
by W
05:55
created

MistralRunner._save_workflow()   A

Complexity

Conditions 4

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
dl 0
loc 20
rs 9.2
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import copy
18
import uuid
19
20
import retrying
21
import six
22
import yaml
23
from mistralclient.api import client as mistral
24
from oslo_config import cfg
25
26
from st2common.runners.base import AsyncActionRunner
27
from st2common.runners.base import get_metadata as get_runner_metadata
28
from st2common.constants import action as action_constants
29
from st2common import log as logging
30
from st2common.models.api.notification import NotificationsHelper
31
from st2common.persistence.execution import ActionExecution
32
from st2common.persistence.liveaction import LiveAction
33
from st2common.services import action as action_service
34
from st2common.util import jinja
35
from st2common.util.workflow import mistral as utils
36
from st2common.util.url import get_url_without_trailing_slash
37
from st2common.util.api import get_full_public_api_url
38
from st2common.util.api import get_mistral_api_url
39
40
# Public names exported by this module.
__all__ = [
    'MistralRunner',
    'get_runner',
    'get_metadata',
]


# Module-level logger shared by the runner below.
LOG = logging.getLogger(__name__)
49
50
51
class MistralRunner(AsyncActionRunner):
52
53
    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
54
55
    def __init__(self, runner_id):
        """Set up runner defaults and the Mistral API client.

        The client settings are read from the ``mistral`` section of the
        configuration. Notification attributes start out empty and are
        populated by ``pre_run``.

        :param runner_id: Identifier passed through to the base runner.
        """
        super(MistralRunner, self).__init__(runner_id=runner_id)

        self._on_behalf_user = cfg.CONF.system_user.user
        self._notify = None
        self._skip_notify_tasks = []

        mistral_cfg = cfg.CONF.mistral
        client_kwargs = {
            'mistral_url': self.url,
            'username': mistral_cfg.keystone_username,
            'api_key': mistral_cfg.keystone_password,
            'project_name': mistral_cfg.keystone_project_name,
            'auth_url': mistral_cfg.keystone_auth_url,
            'cacert': mistral_cfg.cacert,
            'insecure': mistral_cfg.insecure,
        }
        self._client = mistral.client(**client_kwargs)
68
69
    @staticmethod
70
    def get_workflow_definition(entry_point):
71
        with open(entry_point, 'r') as def_file:
72
            return def_file.read()
73
74
    def pre_run(self):
        """Run the base class hook, then cache the notification settings.

        ``_notify`` is read from the live action when one is attached, and
        ``_skip_notify_tasks`` from the ``skip_notify`` runner parameter
        (empty list when the parameter is not set).
        """
        super(MistralRunner, self).pre_run()

        liveaction = getattr(self, 'liveaction', None)
        if liveaction:
            self._notify = getattr(liveaction, 'notify', None)

        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
80
81
    @staticmethod
82
    def _check_name(action_ref, is_workbook, def_dict):
83
        # If workbook, change the value of the "name" key.
84
        if is_workbook:
85
            if def_dict.get('name') != action_ref:
86
                raise Exception('Name of the workbook must be the same as the '
87
                                'fully qualified action name "%s".' % action_ref)
88
        # If workflow, change the key name of the workflow.
89
        else:
90
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
91
            if workflow_name != action_ref:
92
                raise Exception('Name of the workflow must be the same as the '
93
                                'fully qualified action name "%s".' % action_ref)
94
95
    def _save_workbook(self, name, def_yaml):
96
        # If the workbook is not found, the mistral client throws a generic API exception.
97
        try:
98
            # Update existing workbook.
99
            wb = self._client.workbooks.get(name)
100
        except:
101
            # Delete if definition was previously a workflow.
102
            # If not found, an API exception is thrown.
103
            try:
104
                self._client.workflows.delete(name)
105
            except:
106
                pass
107
108
            # Create the new workbook.
109
            wb = self._client.workbooks.create(def_yaml)
110
111
        # Update the workbook definition.
112
        # pylint: disable=no-member
113
        if wb.definition != def_yaml:
114
            self._client.workbooks.update(def_yaml)
115
116
    def _save_workflow(self, name, def_yaml):
117
        # If the workflow is not found, the mistral client throws a generic API exception.
118
        try:
119
            # Update existing workbook.
120
            wf = self._client.workflows.get(name)
121
        except:
122
            # Delete if definition was previously a workbook.
123
            # If not found, an API exception is thrown.
124
            try:
125
                self._client.workbooks.delete(name)
126
            except:
127
                pass
128
129
            # Create the new workflow.
130
            wf = self._client.workflows.create(def_yaml)[0]
131
132
        # Update the workflow definition.
133
        # pylint: disable=no-member
134
        if wf.definition != def_yaml:
135
            self._client.workflows.update(def_yaml)
136
137
    def _find_default_workflow(self, def_dict):
138
        num_workflows = len(list(def_dict['workflows'].keys()))
139
140
        if num_workflows > 1:
141
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
142
            if not fully_qualified_wf_name:
143
                raise ValueError('Workbook definition is detected. '
144
                                 'Default workflow cannot be determined.')
145
146
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
147
            if wf_name not in def_dict['workflows']:
148
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
149
                                 % fully_qualified_wf_name)
150
151
            return fully_qualified_wf_name
152
        elif num_workflows == 1:
153
            return '%s.%s' % (def_dict['name'], list(def_dict['workflows'].keys())[0])
154
        else:
155
            raise Exception('There are no workflows in the workbook.')
156
157
    def _construct_workflow_execution_options(self):
        """Build the extra options passed to Mistral with a workflow execution.

        :returns: Dict with an ``env`` entry (execution and live action ids, the
            public API URL and the ``st2_context`` stored under
            ``__actions`` -> ``st2.action``) and a ``notify`` entry of type ``st2``.
        """
        # This URL is used by Mistral to talk back to the API.
        api_url = get_mistral_api_url()
        endpoint = api_url + '/actionexecutions'

        # This URL is available in the context and can be used by the users inside a workflow,
        # similar to "ST2_ACTION_API_URL" environment variable available to actions.
        public_api_url = get_full_public_api_url()

        # Parent context: this execution's id plus whatever the live action context carries.
        parent_context = {'execution_id': self.execution_id}
        liveaction_context = getattr(self.liveaction, 'context', None)
        if liveaction_context:
            parent_context.update(liveaction_context)

        # Jinja expressions in the Action Chain params are converted into raw blocks so that
        # Mistral does not try to evaluate them; an expression referencing the chain's local
        # context would fail because that context is out of scope in Mistral.
        # NOTE: update() above is a shallow copy, so the nested "chain" dict is shared with
        # the live action context and the values below are replaced in place.
        chain_ctx = parent_context.get('chain') or {}
        for attr in ('params', 'parameters'):
            chain_params_ctx = chain_ctx.get(attr) or {}
            for key, value in six.iteritems(chain_params_ctx):
                chain_params_ctx[key] = jinja.convert_jinja_to_raw_block(value)

        # Include notification information when it is configured.
        if self._notify:
            notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
        else:
            notify_dict = {}

        st2_execution_context = {
            'api_url': api_url,
            'endpoint': endpoint,
            'parent': parent_context,
            'notify': notify_dict,
            'skip_notify_tasks': self._skip_notify_tasks
        }

        if self.auth_token:
            st2_execution_context['auth_token'] = self.auth_token.token

        env = {
            'st2_execution_id': self.execution_id,
            'st2_liveaction_id': self.liveaction_id,
            'st2_action_api_url': public_api_url,
            '__actions': {
                'st2.action': {
                    'st2_context': st2_execution_context
                }
            }
        }

        return {
            'env': env,
            'notify': [
                {
                    'type': 'st2',
                }
            ]
        }
221
222
    def _get_resume_options(self):
223
        return self.context.get('re-run', {})
224
225
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def run(self, action_parameters):
        """Start a new workflow execution, or re-run tasks of a previous one.

        A re-run happens only when ``rerun_ex_ref`` is set and the re-run
        options list at least one task; otherwise a new workflow is started.

        :param action_parameters: Parameters forwarded to ``start_workflow``.
        :returns: Whatever ``resume_workflow`` or ``start_workflow`` returns.
        """
        rerun_options = self._get_resume_options()
        reset_names = rerun_options.get('reset', [])

        # One spec per task to re-run, flagging whether that task should be reset.
        task_specs = {}
        for name in rerun_options.get('tasks', []):
            task_specs[name] = {'reset': name in reset_names}

        if self.rerun_ex_ref and task_specs:
            return self.resume_workflow(ex_ref=self.rerun_ex_ref, task_specs=task_specs)

        return self.start_workflow(action_parameters=action_parameters)
248
249
    def start_workflow(self, action_parameters):
        """Register the workbook/workflow definition in Mistral and start an execution.

        :param action_parameters: Action parameters; they are merged over the
            ``context`` runner parameter to form the workflow input.
        :returns: Tuple of (running status, partial results with an empty task
            list, execution context produced by ``_build_mistral_context``).
        :raises Exception: If the definition is empty or not a mapping, if a
            non-workbook definition does not contain exactly one workflow, or
            if the definition is not named after the action.
        """
        # Test connection
        self._client.workflows.list()

        # Setup inputs for the workflow execution. A new dict is built so that the "context"
        # runner parameter is not mutated in place, and an explicit null value is tolerated.
        inputs = dict(self.runner_parameters.get('context') or {})
        inputs.update(action_parameters)

        # Get workbook/workflow definition from file.
        def_yaml = self.get_workflow_definition(self.entry_point)
        def_dict = yaml.safe_load(def_yaml)

        # An empty file parses to None, and a scalar or list document is not a definition;
        # fail with a clear message instead of an opaque TypeError/AttributeError below.
        if not isinstance(def_dict, dict):
            raise Exception('Workflow definition is empty or is not a valid YAML mapping.')

        is_workbook = ('workflows' in def_dict)

        if not is_workbook:
            # Non-workbook definition containing multiple workflows is not supported.
            if len([k for k in def_dict if k != 'version']) != 1:
                raise Exception('Workflow (not workbook) definition is detected. '
                                'Multiple workflows is not supported.')

        action_ref = '%s.%s' % (self.action.pack, self.action.name)
        self._check_name(action_ref, is_workbook, def_dict)
        def_dict_xformed = utils.transform_definition(def_dict)
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()

        # Save workbook/workflow definition, then start the execution.
        if is_workbook:
            self._save_workbook(action_ref, def_yaml_xformed)
            default_workflow = self._find_default_workflow(def_dict_xformed)
            execution = self._client.executions.create(default_workflow,
                                                       workflow_input=inputs,
                                                       **options)
        else:
            self._save_workflow(action_ref, def_yaml_xformed)
            execution = self._client.executions.create(action_ref,
                                                       workflow_input=inputs,
                                                       **options)

        status = action_constants.LIVEACTION_STATUS_RUNNING
        partial_results = {'tasks': []}

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self.context
        exec_context = self._build_mistral_context(exec_context, current_context)
        LOG.info('Mistral query context is %s' % exec_context)

        return (status, partial_results, exec_context)
303
304
    def _get_tasks(self, wf_ex_id, full_task_name, task_name, executions):
        """Find errored task executions matching a (possibly dotted) task name.

        A dotted ``task_name`` is split at its first dot: the head names a task
        in this workflow execution and the remainder is looked up, recursively,
        in the workflow executions whose ``task_execution_id`` is one of that
        task's ids.

        :param wf_ex_id: Id of the workflow execution whose tasks are listed.
        :param full_task_name: Original task name, used as the result key.
        :param task_name: Portion of the name still to be resolved.
        :param executions: Workflow executions searched for subworkflows.
        :returns: Dict mapping ``full_task_name`` to the ``to_dict()`` of a
            matching task in ERROR state; empty when nothing matches.
        """
        task_exs = self._client.tasks.list(workflow_execution_id=wf_ex_id)

        if '.' not in task_name:
            # pylint: disable=no-member
            return {
                full_task_name: task.to_dict()
                for task in task_exs
                if task.name == task_name and task.state == 'ERROR'
            }

        parent_task_name, _, child_task_name = task_name.partition('.')

        parent_task_ids = [task.id for task in task_exs if task.name == parent_task_name]

        # Workflow executions that were spawned by one of the parent tasks.
        sub_wf_ex_ids = []
        for wf_ex in executions:
            if getattr(wf_ex, 'task_execution_id', None) and \
                    wf_ex.task_execution_id in parent_task_ids:
                sub_wf_ex_ids.append(wf_ex.id)

        tasks = {}

        for sub_wf_ex_id in sub_wf_ex_ids:
            found = self._get_tasks(sub_wf_ex_id, full_task_name, child_task_name, executions)
            tasks.update(found)

        return tasks
333
334
    def _update_workflow_env(self, wf_ex_id, env):
        """Apply ``env`` to a workflow execution and, recursively, to its subworkflows.

        :param wf_ex_id: Id of the workflow execution to update.
        :param env: Environment passed to the executions update call.
        """
        updated_ex = self._client.executions.update(str(wf_ex_id), None, env=env)

        task_exs = self._client.tasks.list(workflow_execution_id=updated_ex.id)
        for task_ex in task_exs:
            # Executions listed for a task are its subworkflow executions.
            subwf_exs = self._client.executions.list(task_execution_id=task_ex.id)
            for subwf_ex in subwf_exs:
                self._update_workflow_env(subwf_ex.id, env)
340
341
    def resume_workflow(self, ex_ref, task_specs):
        """Re-run errored tasks of a previous Mistral workflow execution.

        :param ex_ref: Execution being re-run; the Mistral execution id is read
            from the ``mistral`` entry of its context.
        :param task_specs: Dict of task name to options (``reset``) for the
            tasks to re-run.
        :returns: Tuple of (running status, partial results with an empty task
            list, execution context produced by ``_build_mistral_context``).
        :raises Exception: If the Mistral execution id is missing, the workflow
            execution is not in ERROR state, or a requested task cannot be found
            in ERROR state.
        """
        mistral_ctx = ex_ref.context.get('mistral', dict())
        wf_ex_id = mistral_ctx.get('execution_id')

        if not wf_ex_id:
            raise Exception('Unable to rerun because mistral execution_id is missing.')

        execution = self._client.executions.get(wf_ex_id)

        # pylint: disable=no-member
        if execution.state not in ('ERROR',):
            raise Exception('Workflow execution is not in a rerunable state.')

        executions = self._client.executions.list()

        # Resolve every requested task name to an errored task execution.
        tasks = {}
        for requested_name in task_specs:
            tasks.update(
                self._get_tasks(execution.id, requested_name, requested_name, executions))

        missing_tasks = list(set(task_specs.keys()) - set(tasks.keys()))
        if missing_tasks:
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
                            'rerunable tasks: %s. Please make sure that the task name is correct '
                            'and the task is in rerunable state.' % ', '.join(missing_tasks))

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()
        env = options.get('env', None)

        # Update workflow env recursively.
        self._update_workflow_env(execution.id, env)

        # Re-run task list.
        for task_name, task_obj in six.iteritems(tasks):
            should_reset = task_specs[task_name].get('reset', False)
            # pylint: disable=unexpected-keyword-arg
            self._client.tasks.rerun(task_obj['id'], reset=should_reset, env=env)

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self._build_mistral_context(self.context, current_context)
        LOG.info('Mistral query context is %s' % exec_context)

        return (action_constants.LIVEACTION_STATUS_RUNNING, {'tasks': []}, exec_context)
395
396
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def pause(self):
        """Pause the Mistral workflow execution and cascade to running child workflows.

        :returns: Tuple of (status, live action result, live action context);
            the status is "pausing" while children are still active and
            "paused" otherwise.
        :raises Exception: If the Mistral execution id is missing from the context.
        """
        mistral_ctx = self.context.get('mistral', dict())
        wf_ex_id = mistral_ctx.get('execution_id')

        if not wf_ex_id:
            raise Exception('Unable to pause because mistral execution_id is missing.')

        # Pause the main workflow execution. Any non-workflow tasks that are still
        # running will be allowed to complete gracefully.
        self._client.executions.update(wf_ex_id, 'PAUSED')

        # If workflow is executed under another parent workflow, pause the corresponding
        # action execution for the task in the parent workflow.
        mistral_action_ex_id = mistral_ctx.get('action_execution_id')
        if 'parent' in getattr(self, 'context', {}) and mistral_action_ex_id:
            self._client.action_executions.update(mistral_action_ex_id, 'PAUSED')

        # Identify the list of action executions that are workflows and cascade pause.
        for child_exec_id in self.execution.children:
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)

            is_workflow = child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES
            if not is_workflow or \
                    child_exec.status != action_constants.LIVEACTION_STATUS_RUNNING:
                continue

            child_liveaction = LiveAction.get(id=child_exec.liveaction['id'])
            action_service.request_pause(child_liveaction, self.context.get('user', None))

        if action_service.is_children_active(self.liveaction.id):
            status = action_constants.LIVEACTION_STATUS_PAUSING
        else:
            status = action_constants.LIVEACTION_STATUS_PAUSED

        return (status, self.liveaction.result, self.liveaction.context)
438
439
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def resume(self):
        """Resume the Mistral workflow execution and cascade to paused child workflows.

        :returns: Tuple of (running status, execution result, execution context).
        :raises Exception: If the Mistral execution id is missing from the context.
        """
        mistral_ctx = self.context.get('mistral', dict())
        wf_ex_id = mistral_ctx.get('execution_id')

        if not wf_ex_id:
            raise Exception('Unable to resume because mistral execution_id is missing.')

        # If workflow is executed under another parent workflow, resume the corresponding
        # action execution for the task in the parent workflow. This happens before the
        # workflow execution itself is set back to running.
        mistral_action_ex_id = mistral_ctx.get('action_execution_id')
        if 'parent' in getattr(self, 'context', {}) and mistral_action_ex_id:
            self._client.action_executions.update(mistral_action_ex_id, 'RUNNING')

        # Resume the main workflow execution.
        self._client.executions.update(wf_ex_id, 'RUNNING')

        # Identify the list of action executions that are workflows and cascade resume.
        for child_exec_id in self.execution.children:
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)

            is_workflow = child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES
            if not is_workflow or \
                    child_exec.status != action_constants.LIVEACTION_STATUS_PAUSED:
                continue

            child_liveaction = LiveAction.get(id=child_exec.liveaction['id'])
            action_service.request_resume(child_liveaction, self.context.get('user', None))

        return (
            action_constants.LIVEACTION_STATUS_RUNNING,
            self.execution.result,
            self.execution.context
        )
475
476
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def cancel(self):
        """Cancel the Mistral workflow execution and cascade to cancelable child workflows.

        :returns: Tuple of (status, live action result, live action context);
            the status is "canceling" while children are still active and
            "canceled" otherwise.
        :raises Exception: If the Mistral execution id is missing from the context.
        """
        mistral_ctx = self.context.get('mistral', dict())

        if not mistral_ctx.get('execution_id'):
            raise Exception('Unable to cancel because mistral execution_id is missing.')

        # Cancels the main workflow execution. Any non-workflow tasks that are still
        # running will be allowed to complete gracefully.
        self._client.executions.update(mistral_ctx.get('execution_id'), 'CANCELLED')

        # If workflow is executed under another parent workflow, cancel the corresponding
        # action execution for the task in the parent workflow.
        if 'parent' in getattr(self, 'context', {}) and mistral_ctx.get('action_execution_id'):
            mistral_action_ex_id = mistral_ctx.get('action_execution_id')
            self._client.action_executions.update(mistral_action_ex_id, 'CANCELLED')

        # Identify the list of action executions that are workflows and still running.
        for child_exec_id in self.execution.children:
            # raise_exception=True matches pause() and resume(): a missing child execution
            # is reported by the lookup itself rather than as an attribute error on the
            # lookup result in the condition below.
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)
            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
                    child_exec.status in action_constants.LIVEACTION_CANCELABLE_STATES):
                action_service.request_cancellation(
                    LiveAction.get(id=child_exec.liveaction['id']),
                    self.context.get('user', None)
                )

        status = (
            action_constants.LIVEACTION_STATUS_CANCELING
            if action_service.is_children_active(self.liveaction.id)
            else action_constants.LIVEACTION_STATUS_CANCELED
        )

        return (
            status,
            self.liveaction.result,
            self.liveaction.context
        )
518
519
    @staticmethod
520
    def _build_mistral_context(parent, current):
521
        """
522
        Mistral workflow might be kicked off in st2 by a parent Mistral
523
        workflow. In that case, we need to make sure that the existing
524
        mistral 'context' is moved as 'parent' and the child workflow
525
        'context' is added.
526
        """
527
        parent = copy.deepcopy(parent)
528
        context = dict()
529
530
        if not parent:
531
            context['mistral'] = current
532
        else:
533
            if 'mistral' in list(parent.keys()):
534
                orig_parent_context = parent.get('mistral', dict())
535
                actual_parent = dict()
536
                if 'workflow_name' in list(orig_parent_context.keys()):
537
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
538
                    del orig_parent_context['workflow_name']
539
                if 'workflow_execution_id' in list(orig_parent_context.keys()):
540
                    actual_parent['workflow_execution_id'] = \
541
                        orig_parent_context['workflow_execution_id']
542
                    del orig_parent_context['workflow_execution_id']
543
                context['mistral'] = orig_parent_context
544
                context['mistral'].update(current)
545
                context['mistral']['parent'] = actual_parent
546
            else:
547
                context['mistral'] = current
548
549
        return context
550
551
552
def get_runner():
    """Create a ``MistralRunner`` whose runner id is a freshly generated UUID4 string."""
    runner_id = str(uuid.uuid4())

    return MistralRunner(runner_id)
554
555
556
def get_metadata():
    """Return what ``get_runner_metadata`` provides for the ``mistral_v2`` runner."""
    metadata = get_runner_metadata('mistral_v2')

    return metadata
558