Passed
Pull Request — master (#3753)
by W
09:44
created

MistralRunner.run()   A

Complexity

Conditions 3

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 23
rs 9.0856
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import retrying
20
import six
21
import yaml
22
from mistralclient.api import client as mistral
23
from oslo_config import cfg
24
25
from st2common.runners.base import AsyncActionRunner
26
from st2common.constants import action as action_constants
27
from st2common import log as logging
28
from st2common.models.api.notification import NotificationsHelper
29
from st2common.persistence.execution import ActionExecution
30
from st2common.persistence.liveaction import LiveAction
31
from st2common.services import action as action_service
32
from st2common.util import jinja
33
from st2common.util.workflow import mistral as utils
34
from st2common.util.url import get_url_without_trailing_slash
35
from st2common.util.api import get_full_public_api_url
36
from st2common.util.api import get_mistral_api_url
37
38
39
LOG = logging.getLogger(__name__)
40
41
42
def get_runner():
    """Create a MistralRunner whose runner id is a freshly generated UUID4 string."""
    runner_id = str(uuid.uuid4())
    return MistralRunner(runner_id)
44
45
46
class MistralRunner(AsyncActionRunner):
47
48
    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
49
50
    def __init__(self, runner_id):
        """
        Initialise runner state and build the Mistral API client.

        The client targets the class-level ``url`` and takes its keystone and
        TLS related options from ``cfg.CONF.mistral``.

        :param runner_id: Identifier forwarded to the base runner constructor.
        """
        super(MistralRunner, self).__init__(runner_id=runner_id)

        self._on_behalf_user = cfg.CONF.system_user.user

        # Placeholders; pre_run() assigns the effective notification settings.
        self._notify = None
        self._skip_notify_tasks = []

        mistral_conf = cfg.CONF.mistral
        client_kwargs = {
            'mistral_url': self.url,
            'username': mistral_conf.keystone_username,
            'api_key': mistral_conf.keystone_password,
            'project_name': mistral_conf.keystone_project_name,
            'auth_url': mistral_conf.keystone_auth_url,
            'cacert': mistral_conf.cacert,
            'insecure': mistral_conf.insecure,
        }
        self._client = mistral.client(**client_kwargs)
63
64
    @staticmethod
65
    def get_workflow_definition(entry_point):
66
        with open(entry_point, 'r') as def_file:
67
            return def_file.read()
68
69
    def pre_run(self):
        """
        Run the base class pre_run and pick up notification settings.

        ``_notify`` is taken from the liveaction's ``notify`` attribute when a
        liveaction is set on the runner; ``_skip_notify_tasks`` comes from the
        ``skip_notify`` runner parameter (empty list when absent).
        """
        super(MistralRunner, self).pre_run()

        liveaction = getattr(self, 'liveaction', None)
        if liveaction:
            self._notify = getattr(liveaction, 'notify', None)

        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
75
76
    @staticmethod
77
    def _check_name(action_ref, is_workbook, def_dict):
78
        # If workbook, change the value of the "name" key.
79
        if is_workbook:
80
            if def_dict.get('name') != action_ref:
81
                raise Exception('Name of the workbook must be the same as the '
82
                                'fully qualified action name "%s".' % action_ref)
83
        # If workflow, change the key name of the workflow.
84
        else:
85
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
86
            if workflow_name != action_ref:
87
                raise Exception('Name of the workflow must be the same as the '
88
                                'fully qualified action name "%s".' % action_ref)
89
90
    def _save_workbook(self, name, def_yaml):
91
        # If the workbook is not found, the mistral client throws a generic API exception.
92
        try:
93
            # Update existing workbook.
94
            wb = self._client.workbooks.get(name)
95
        except:
96
            # Delete if definition was previously a workflow.
97
            # If not found, an API exception is thrown.
98
            try:
99
                self._client.workflows.delete(name)
100
            except:
101
                pass
102
103
            # Create the new workbook.
104
            wb = self._client.workbooks.create(def_yaml)
105
106
        # Update the workbook definition.
107
        # pylint: disable=no-member
108
        if wb.definition != def_yaml:
109
            self._client.workbooks.update(def_yaml)
110
111
    def _save_workflow(self, name, def_yaml):
112
        # If the workflow is not found, the mistral client throws a generic API exception.
113
        try:
114
            # Update existing workbook.
115
            wf = self._client.workflows.get(name)
116
        except:
117
            # Delete if definition was previously a workbook.
118
            # If not found, an API exception is thrown.
119
            try:
120
                self._client.workbooks.delete(name)
121
            except:
122
                pass
123
124
            # Create the new workflow.
125
            wf = self._client.workflows.create(def_yaml)[0]
126
127
        # Update the workflow definition.
128
        # pylint: disable=no-member
129
        if wf.definition != def_yaml:
130
            self._client.workflows.update(def_yaml)
131
132
    def _find_default_workflow(self, def_dict):
133
        num_workflows = len(def_dict['workflows'].keys())
134
135
        if num_workflows > 1:
136
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
137
            if not fully_qualified_wf_name:
138
                raise ValueError('Workbook definition is detected. '
139
                                 'Default workflow cannot be determined.')
140
141
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
142
            if wf_name not in def_dict['workflows']:
143
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
144
                                 % fully_qualified_wf_name)
145
146
            return fully_qualified_wf_name
147
        elif num_workflows == 1:
148
            return '%s.%s' % (def_dict['name'], def_dict['workflows'].keys()[0])
149
        else:
150
            raise Exception('There are no workflows in the workbook.')
151
152
    def _construct_workflow_execution_options(self):
        """
        Build the additional options passed to Mistral for a workflow execution.

        The liveaction context is deep-copied before the action chain parameters
        are rewritten, so the liveaction's own context is never modified. This
        matters because the method is invoked again whenever the calling
        operation is retried.

        :return: Dict with a single "env" key holding the st2 execution id,
                 liveaction id, public API URL and the st2 context exposed to
                 the "st2.action" Mistral action.
        :rtype: ``dict``
        """
        # This URL is used by Mistral to talk back to the API
        api_url = get_mistral_api_url()
        endpoint = api_url + '/actionexecutions'

        # This URL is available in the context and can be used by the users inside a workflow,
        # similar to "ST2_ACTION_API_URL" environment variable available to actions
        public_api_url = get_full_public_api_url()

        # Build context with additional information
        parent_context = {
            'execution_id': self.execution_id
        }

        liveaction_context = getattr(self.liveaction, 'context', None)
        if liveaction_context:
            # Deep copy: the nested chain parameters are rewritten below and a
            # shallow update() would alter the liveaction's context in place.
            parent_context.update(copy.deepcopy(liveaction_context))

        # Convert jinja expressions in the params of Action Chain under the parent context
        # into raw block. If there is any jinja expressions, Mistral will try to evaluate
        # the expression. If there is a local context reference, the evaluation will fail
        # because the local context reference is out of scope.
        chain_ctx = parent_context.get('chain') or {}

        for attr in ['params', 'parameters']:
            chain_params_ctx = chain_ctx.get(attr) or {}

            # Iterate over a snapshot because values are replaced while looping.
            for k, v in list(chain_params_ctx.items()):
                chain_params_ctx[k] = jinja.convert_jinja_to_raw_block(v)

        st2_execution_context = {
            'api_url': api_url,
            'endpoint': endpoint,
            'parent': parent_context,
            'notify': {},
            'skip_notify_tasks': self._skip_notify_tasks
        }

        # Include notification information
        if self._notify:
            notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
            st2_execution_context['notify'] = notify_dict

        if self.auth_token:
            st2_execution_context['auth_token'] = self.auth_token.token

        options = {
            'env': {
                'st2_execution_id': self.execution_id,
                'st2_liveaction_id': self.liveaction_id,
                'st2_action_api_url': public_api_url,
                '__actions': {
                    'st2.action': {
                        'st2_context': st2_execution_context
                    }
                }
            }
        }

        return options
211
212
    def _get_resume_options(self):
213
        return self.context.get('re-run', {})
214
215
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def run(self, action_parameters):
        """
        Start a new workflow execution, or rerun tasks of a previous one.

        A rerun happens only when the runner has a ``rerun_ex_ref`` and the
        "re-run" options list at least one task; otherwise a new workflow
        execution is started with the given parameters.

        :param action_parameters: Parameters used as input when starting a workflow.
        :return: Whatever resume_workflow() or start_workflow() returns.
        """
        resume_options = self._get_resume_options()
        reset_names = resume_options.get('reset', [])

        # Each requested task is flagged with whether it must be reset on rerun.
        task_specs = {}
        for task_name in resume_options.get('tasks', []):
            task_specs[task_name] = {'reset': task_name in reset_names}

        if self.rerun_ex_ref and task_specs:
            return self.resume_workflow(ex_ref=self.rerun_ex_ref, task_specs=task_specs)

        return self.start_workflow(action_parameters=action_parameters)
238
239
    def start_workflow(self, action_parameters):
        """
        Register the workflow/workbook definition in Mistral and start an execution.

        :param action_parameters: Action parameters merged over the ``context``
                                  runner parameter to form the workflow input.
        :return: Tuple of (running status, partial results, execution context).
        :raises Exception: If the definition is empty or not a mapping, if a
                           non-workbook definition does not contain exactly one
                           workflow, or if the definition is not named after
                           the action.
        """
        # Test connection
        self._client.workflows.list()

        # Setup inputs for the workflow execution. Work on a copy so that the
        # runner's own "context" parameter is not modified.
        inputs = dict(self.runner_parameters.get('context') or {})
        inputs.update(action_parameters)

        # Get workbook/workflow definition from file.
        def_yaml = self.get_workflow_definition(self.entry_point)
        def_dict = yaml.safe_load(def_yaml)

        # An empty file parses to None and other documents may parse to a
        # scalar or list; fail with a clear message instead of a TypeError.
        if not isinstance(def_dict, dict):
            raise Exception('Workflow definition "%s" is empty or is not a mapping.'
                            % self.entry_point)

        is_workbook = ('workflows' in def_dict)

        if not is_workbook:
            # Non-workbook definition containing multiple workflows is not supported.
            if len([k for k in def_dict if k != 'version']) != 1:
                raise Exception('Workflow (not workbook) definition is detected. '
                                'Multiple workflows is not supported.')

        action_ref = '%s.%s' % (self.action.pack, self.action.name)
        self._check_name(action_ref, is_workbook, def_dict)
        def_dict_xformed = utils.transform_definition(def_dict)
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()

        # Save workbook/workflow definition and work out which workflow to execute.
        if is_workbook:
            self._save_workbook(action_ref, def_yaml_xformed)
            workflow_to_run = self._find_default_workflow(def_dict_xformed)
        else:
            self._save_workflow(action_ref, def_yaml_xformed)
            workflow_to_run = action_ref

        execution = self._client.executions.create(workflow_to_run,
                                                   workflow_input=inputs,
                                                   **options)

        status = action_constants.LIVEACTION_STATUS_RUNNING
        partial_results = {'tasks': []}

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self._build_mistral_context(self.context, current_context)
        LOG.info('Mistral query context is %s', exec_context)

        return (status, partial_results, exec_context)
293
294
    def _get_tasks(self, wf_ex_id, full_task_name, task_name, executions):
295
        task_exs = self._client.tasks.list(workflow_execution_id=wf_ex_id)
296
297
        if '.' in task_name:
298
            dot_pos = task_name.index('.')
299
            parent_task_name = task_name[:dot_pos]
300
            task_name = task_name[dot_pos + 1:]
301
302
            parent_task_ids = [task.id for task in task_exs if task.name == parent_task_name]
303
304
            workflow_ex_ids = [wf_ex.id for wf_ex in executions
305
                               if (getattr(wf_ex, 'task_execution_id', None) and
306
                                   wf_ex.task_execution_id in parent_task_ids)]
307
308
            tasks = {}
309
310
            for sub_wf_ex_id in workflow_ex_ids:
311
                tasks.update(self._get_tasks(sub_wf_ex_id, full_task_name, task_name, executions))
312
313
            return tasks
314
315
        # pylint: disable=no-member
316
        tasks = {
317
            full_task_name: task.to_dict()
318
            for task in task_exs
319
            if task.name == task_name and task.state == 'ERROR'
320
        }
321
322
        return tasks
323
324
    def resume_workflow(self, ex_ref, task_specs):
325
        mistral_ctx = ex_ref.context.get('mistral', dict())
326
327
        if not mistral_ctx.get('execution_id'):
328
            raise Exception('Unable to rerun because mistral execution_id is missing.')
329
330
        execution = self._client.executions.get(mistral_ctx.get('execution_id'))
331
332
        # pylint: disable=no-member
333
        if execution.state not in ['ERROR']:
334
            raise Exception('Workflow execution is not in a rerunable state.')
335
336
        executions = self._client.executions.list()
337
338
        tasks = {}
339
340
        for task_name, task_spec in six.iteritems(task_specs):
341
            tasks.update(self._get_tasks(execution.id, task_name, task_name, executions))
342
343
        missing_tasks = list(set(task_specs.keys()) - set(tasks.keys()))
344
        if missing_tasks:
345
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
346
                            'rerunable tasks: %s. Please make sure that the task name is correct '
347
                            'and the task is in rerunable state.' % ', '.join(missing_tasks))
348
349
        # Construct additional options for the workflow execution
350
        options = self._construct_workflow_execution_options()
351
352
        for task_name, task_obj in six.iteritems(tasks):
353
            # pylint: disable=unexpected-keyword-arg
354
            self._client.tasks.rerun(
355
                task_obj['id'],
356
                reset=task_specs[task_name].get('reset', False),
357
                env=options.get('env', None)
358
            )
359
360
        status = action_constants.LIVEACTION_STATUS_RUNNING
361
        partial_results = {'tasks': []}
362
363
        # pylint: disable=no-member
364
        current_context = {
365
            'execution_id': str(execution.id),
366
            'workflow_name': execution.workflow_name
367
        }
368
369
        exec_context = self.context
370
        exec_context = self._build_mistral_context(exec_context, current_context)
371
        LOG.info('Mistral query context is %s' % exec_context)
372
373
        return (status, partial_results, exec_context)
374
375
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def pause(self):
        """
        Pause the Mistral workflow execution and cascade to running child workflows.

        :return: Tuple of (pausing status, liveaction result, liveaction context).
        :raises Exception: If the mistral execution id is missing from the context.
        """
        mistral_ctx = self.context.get('mistral', dict())
        wf_ex_id = mistral_ctx.get('execution_id')

        if not wf_ex_id:
            raise Exception('Unable to pause because mistral execution_id is missing.')

        # Pause the main workflow execution. Any non-workflow tasks that are still
        # running will be allowed to complete gracefully.
        self._client.executions.update(wf_ex_id, 'PAUSED')

        # If workflow is executed under another parent workflow, pause the corresponding
        # action execution for the task in the parent workflow.
        if 'parent' in getattr(self, 'context', {}):
            mistral_action_ex_id = mistral_ctx.get('action_execution_id')
            if mistral_action_ex_id:
                self._client.action_executions.update(mistral_action_ex_id, 'PAUSED')

        # Identify the list of action executions that are workflows and cascade pause.
        for child_exec_id in self.execution.children:
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)

            if child_exec.runner['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
                continue

            if child_exec.status != action_constants.LIVEACTION_STATUS_RUNNING:
                continue

            child_liveaction = LiveAction.get(id=child_exec.liveaction['id'])
            action_service.request_pause(child_liveaction, self.context.get('user', None))

        status = action_constants.LIVEACTION_STATUS_PAUSING
        return (status, self.liveaction.result, self.liveaction.context)
411
412
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def resume(self):
        """
        Resume the Mistral workflow execution and cascade to paused child workflows.

        :return: Tuple of (running status, execution result, execution context).
        :raises Exception: If the mistral execution id is missing from the context.
        """
        mistral_ctx = self.context.get('mistral', dict())
        wf_ex_id = mistral_ctx.get('execution_id')

        if not wf_ex_id:
            raise Exception('Unable to resume because mistral execution_id is missing.')

        # If workflow is executed under another parent workflow, resume the corresponding
        # action execution for the task in the parent workflow. This is done before
        # the main workflow execution itself is set back to running.
        if 'parent' in getattr(self, 'context', {}):
            mistral_action_ex_id = mistral_ctx.get('action_execution_id')
            if mistral_action_ex_id:
                self._client.action_executions.update(mistral_action_ex_id, 'RUNNING')

        # Resume the main workflow execution.
        self._client.executions.update(wf_ex_id, 'RUNNING')

        # Identify the list of action executions that are workflows and cascade resume.
        for child_exec_id in self.execution.children:
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)

            if child_exec.runner['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
                continue

            if child_exec.status != action_constants.LIVEACTION_STATUS_PAUSED:
                continue

            child_liveaction = LiveAction.get(id=child_exec.liveaction['id'])
            action_service.request_resume(child_liveaction, self.context.get('user', None))

        status = action_constants.LIVEACTION_STATUS_RUNNING
        return (status, self.execution.result, self.execution.context)
448
449
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def cancel(self):
        """
        Cancel the Mistral workflow execution and cascade to cancelable child workflows.

        :return: Tuple of (canceling status, liveaction result, liveaction context).
        :raises Exception: If the mistral execution id is missing from the context.
        """
        mistral_ctx = self.context.get('mistral', dict())

        if not mistral_ctx.get('execution_id'):
            raise Exception('Unable to cancel because mistral execution_id is missing.')

        # Cancels the main workflow execution. Any non-workflow tasks that are still
        # running will be allowed to complete gracefully.
        self._client.executions.update(mistral_ctx.get('execution_id'), 'CANCELLED')

        # If workflow is executed under another parent workflow, cancel the corresponding
        # action execution for the task in the parent workflow.
        if 'parent' in getattr(self, 'context', {}) and mistral_ctx.get('action_execution_id'):
            mistral_action_ex_id = mistral_ctx.get('action_execution_id')
            self._client.action_executions.update(mistral_action_ex_id, 'CANCELLED')

        # Identify the list of action executions that are workflows and still running.
        for child_exec_id in self.execution.children:
            child_exec = ActionExecution.get(id=child_exec_id)

            # NOTE(review): unlike pause()/resume(), the lookup is made without
            # raise_exception=True, so it presumably can return None for a
            # missing child - confirm against ActionExecution.get. Skip such
            # children so the remaining ones are still cancelled.
            if child_exec is None:
                continue

            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
                    child_exec.status in action_constants.LIVEACTION_CANCELABLE_STATES):
                action_service.request_cancellation(
                    LiveAction.get(id=child_exec.liveaction['id']),
                    self.context.get('user', None)
                )

        return (
            action_constants.LIVEACTION_STATUS_CANCELING,
            self.liveaction.result,
            self.liveaction.context
        )
485
486
    @staticmethod
487
    def _build_mistral_context(parent, current):
488
        """
489
        Mistral workflow might be kicked off in st2 by a parent Mistral
490
        workflow. In that case, we need to make sure that the existing
491
        mistral 'context' is moved as 'parent' and the child workflow
492
        'context' is added.
493
        """
494
        parent = copy.deepcopy(parent)
495
        context = dict()
496
497
        if not parent:
498
            context['mistral'] = current
499
        else:
500
            if 'mistral' in parent.keys():
501
                orig_parent_context = parent.get('mistral', dict())
502
                actual_parent = dict()
503
                if 'workflow_name' in orig_parent_context.keys():
504
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
505
                    del orig_parent_context['workflow_name']
506
                if 'workflow_execution_id' in orig_parent_context.keys():
507
                    actual_parent['workflow_execution_id'] = \
508
                        orig_parent_context['workflow_execution_id']
509
                    del orig_parent_context['workflow_execution_id']
510
                context['mistral'] = orig_parent_context
511
                context['mistral'].update(current)
512
                context['mistral']['parent'] = actual_parent
513
            else:
514
                context['mistral'] = current
515
516
        return context
517