Passed
Pull Request — master (#3507)
by W
05:28
created

MistralRunner._save_workbook()   A

Complexity

Conditions 4

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
dl 0
loc 20
rs 9.2
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import retrying
20
import six
21
import yaml
22
from mistralclient.api import client as mistral
23
from oslo_config import cfg
24
25
from st2common.runners.base import AsyncActionRunner
26
from st2common.constants import action as action_constants
27
from st2common import log as logging
28
from st2common.models.api.notification import NotificationsHelper
29
from st2common.persistence.execution import ActionExecution
30
from st2common.persistence.liveaction import LiveAction
31
from st2common.services import action as action_service
32
from st2common.util.workflow import mistral as utils
33
from st2common.util.url import get_url_without_trailing_slash
34
from st2common.util.api import get_full_public_api_url
35
from st2common.util.api import get_mistral_api_url
36
37
38
LOG = logging.getLogger(__name__)
39
40
41
def get_runner():
    """
    Create a MistralRunner whose runner id is a newly generated UUID4 string.
    """
    runner_id = str(uuid.uuid4())

    return MistralRunner(runner_id)
43
44
45
class MistralRunner(AsyncActionRunner):
    """
    Asynchronous action runner that runs Mistral workflows and workbooks via the Mistral client.
    """

    # Base URL of the Mistral v2 API taken from configuration, without a trailing slash.
    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
48
49
    def __init__(self, runner_id):
        """
        Initialize runner state and build the Mistral API client.

        The client targets ``self.url`` and is configured with the keystone credentials
        and TLS related options read from the ``mistral`` configuration group.

        :param runner_id: Identifier passed on to the base runner.
        """
        super(MistralRunner, self).__init__(runner_id=runner_id)

        # Notification settings start empty and are populated in pre_run().
        self._notify = None
        self._skip_notify_tasks = []
        self._on_behalf_user = cfg.CONF.system_user.user

        mistral_conf = cfg.CONF.mistral
        self._client = mistral.client(
            mistral_url=self.url,
            username=mistral_conf.keystone_username,
            api_key=mistral_conf.keystone_password,
            project_name=mistral_conf.keystone_project_name,
            auth_url=mistral_conf.keystone_auth_url,
            cacert=mistral_conf.cacert,
            insecure=mistral_conf.insecure)
62
63
    @staticmethod
64
    def get_workflow_definition(entry_point):
65
        with open(entry_point, 'r') as def_file:
66
            return def_file.read()
67
68
    def pre_run(self):
        """
        Run the base class preparation and then cache the notification settings.

        The "notify" attribute of the liveaction (when a liveaction is set) and the
        "skip_notify" runner parameter are stored on the runner for later use.
        """
        super(MistralRunner, self).pre_run()

        liveaction = getattr(self, 'liveaction', None)
        if liveaction:
            self._notify = getattr(liveaction, 'notify', None)

        # Names of tasks to skip notifications for; empty list when not provided.
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
74
75
    @staticmethod
76
    def _check_name(action_ref, is_workbook, def_dict):
77
        # If workbook, change the value of the "name" key.
78
        if is_workbook:
79
            if def_dict.get('name') != action_ref:
80
                raise Exception('Name of the workbook must be the same as the '
81
                                'fully qualified action name "%s".' % action_ref)
82
        # If workflow, change the key name of the workflow.
83
        else:
84
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
85
            if workflow_name != action_ref:
86
                raise Exception('Name of the workflow must be the same as the '
87
                                'fully qualified action name "%s".' % action_ref)
88
89
    def _save_workbook(self, name, def_yaml):
        """
        Make sure the workbook registered in Mistral matches the given definition.

        The workbook is looked up by name. If the lookup fails, a workflow registered
        under the same name is deleted (best effort) and the workbook is created. The
        workbook is then updated if its stored definition differs from def_yaml.

        :param name: Name of the workbook.
        :param def_yaml: Workbook definition as a YAML string.
        """
        try:
            wb = self._client.workbooks.get(name)
        except Exception:
            # If the workbook is not found, the mistral client throws a generic API
            # exception. Only Exception is caught here so that KeyboardInterrupt and
            # SystemExit are never swallowed.
            try:
                # Delete if definition was previously a workflow.
                self._client.workflows.delete(name)
            except Exception:
                # If not found, an API exception is thrown. This cleanup is best effort.
                LOG.debug('Workflow "%s" was not deleted prior to creating the workbook.', name)

            # Create the new workbook.
            wb = self._client.workbooks.create(def_yaml)

        # Update the workbook definition.
        # pylint: disable=no-member
        if wb.definition != def_yaml:
            self._client.workbooks.update(def_yaml)
109
110
    def _save_workflow(self, name, def_yaml):
        """
        Make sure the workflow registered in Mistral matches the given definition.

        The workflow is looked up by name. If the lookup fails, a workbook registered
        under the same name is deleted (best effort) and the workflow is created. The
        workflow is then updated if its stored definition differs from def_yaml.

        :param name: Name of the workflow.
        :param def_yaml: Workflow definition as a YAML string.
        """
        try:
            wf = self._client.workflows.get(name)
        except Exception:
            # If the workflow is not found, the mistral client throws a generic API
            # exception. Only Exception is caught here so that KeyboardInterrupt and
            # SystemExit are never swallowed.
            try:
                # Delete if definition was previously a workbook.
                self._client.workbooks.delete(name)
            except Exception:
                # If not found, an API exception is thrown. This cleanup is best effort.
                LOG.debug('Workbook "%s" was not deleted prior to creating the workflow.', name)

            # Create the new workflow. The first item of the create result is used.
            wf = self._client.workflows.create(def_yaml)[0]

        # Update the workflow definition.
        # pylint: disable=no-member
        if wf.definition != def_yaml:
            self._client.workflows.update(def_yaml)
130
131
    def _find_default_workflow(self, def_dict):
132
        num_workflows = len(def_dict['workflows'].keys())
133
134
        if num_workflows > 1:
135
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
136
            if not fully_qualified_wf_name:
137
                raise ValueError('Workbook definition is detected. '
138
                                 'Default workflow cannot be determined.')
139
140
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
141
            if wf_name not in def_dict['workflows']:
142
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
143
                                 % fully_qualified_wf_name)
144
145
            return fully_qualified_wf_name
146
        elif num_workflows == 1:
147
            return '%s.%s' % (def_dict['name'], def_dict['workflows'].keys()[0])
148
        else:
149
            raise Exception('There are no workflows in the workbook.')
150
151
    def _construct_workflow_execution_options(self):
        """
        Build the additional options passed to Mistral for a workflow execution.

        :return: Dict with a single "env" key. The env carries the st2 execution id,
                 the liveaction id, the public API URL and, under "__actions", the
                 st2 context made available to the "st2.action" action.
        """
        # This URL is used by Mistral to talk back to the API.
        api_url = get_mistral_api_url()

        # This URL is available in the context and can be used by the users inside a
        # workflow, similar to the "ST2_ACTION_API_URL" environment variable of actions.
        public_api_url = get_full_public_api_url()

        # Parent context starts with the execution id and takes on the liveaction
        # context, if there is one.
        parent_context = {'execution_id': self.execution_id}
        liveaction_context = getattr(self.liveaction, 'context', None)
        if liveaction_context:
            parent_context.update(liveaction_context)

        # Notification information is included only when notify is set.
        if self._notify:
            notify = NotificationsHelper.from_model(notify_model=self._notify)
        else:
            notify = {}

        st2_execution_context = {
            'api_url': api_url,
            'endpoint': api_url + '/actionexecutions',
            'parent': parent_context,
            'notify': notify,
            'skip_notify_tasks': self._skip_notify_tasks
        }

        if self.auth_token:
            st2_execution_context['auth_token'] = self.auth_token.token

        env = {
            'st2_execution_id': self.execution_id,
            'st2_liveaction_id': self.liveaction_id,
            'st2_action_api_url': public_api_url,
            '__actions': {
                'st2.action': {
                    'st2_context': st2_execution_context
                }
            }
        }

        return {'env': env}
197
198
    def _get_resume_options(self):
199
        return self.context.get('re-run', {})
200
201
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def run(self, action_parameters):
        """
        Start a new workflow execution or resume (re-run) an existing one.

        The workflow is resumed only when a re-run execution reference is set and the
        "re-run" options list at least one task. Otherwise a new workflow is started.

        :param action_parameters: Parameters passed to the workflow when it is started.
        :return: Result of resume_workflow() or start_workflow().
        """
        rerun_options = self._get_resume_options()
        names_to_reset = rerun_options.get('reset', [])

        # Map each task to re-run to its spec; "reset" is set for tasks listed to reset.
        task_specs = dict(
            (name, {'reset': name in names_to_reset})
            for name in rerun_options.get('tasks', [])
        )

        if self.rerun_ex_ref and task_specs:
            return self.resume_workflow(ex_ref=self.rerun_ex_ref, task_specs=task_specs)

        return self.start_workflow(action_parameters=action_parameters)
224
225
    def start_workflow(self, action_parameters):
        """
        Register the workbook/workflow definition in Mistral and start a new execution.

        :param action_parameters: Action parameters. They are merged over the "context"
                                  runner parameter to form the workflow input.
        :return: Tuple of (running status, partial results, execution context).
        :raises ValueError: If the definition file is empty or is not a YAML mapping.
        :raises Exception: If a non-workbook definition does not contain exactly one
                           workflow, or the definition is not named after the action.
        """
        # Test connection
        self._client.workflows.list()

        # Setup inputs for the workflow execution. A copy is used so that the "context"
        # runner parameter is not modified in place by the update below.
        inputs = dict(self.runner_parameters.get('context') or {})
        inputs.update(action_parameters)

        # Get workbook/workflow definition from file.
        def_yaml = self.get_workflow_definition(self.entry_point)
        def_dict = yaml.safe_load(def_yaml)

        # An empty file is loaded as None. Fail with a clear message instead of an
        # unrelated TypeError further down.
        if not isinstance(def_dict, dict):
            raise ValueError('Definition in "%s" is empty or is not a YAML mapping.'
                             % self.entry_point)

        is_workbook = ('workflows' in def_dict)

        if not is_workbook:
            # Non-workbook definition containing multiple workflows is not supported.
            if len([k for k in def_dict if k != 'version']) != 1:
                raise Exception('Workflow (not workbook) definition is detected. '
                                'Multiple workflows is not supported.')

        action_ref = '%s.%s' % (self.action.pack, self.action.name)
        self._check_name(action_ref, is_workbook, def_dict)
        def_dict_xformed = utils.transform_definition(def_dict)
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()

        # Save workbook/workflow definition and identify the workflow to execute.
        if is_workbook:
            self._save_workbook(action_ref, def_yaml_xformed)
            workflow_to_execute = self._find_default_workflow(def_dict_xformed)
        else:
            self._save_workflow(action_ref, def_yaml_xformed)
            workflow_to_execute = action_ref

        execution = self._client.executions.create(workflow_to_execute,
                                                   workflow_input=inputs,
                                                   **options)

        status = action_constants.LIVEACTION_STATUS_RUNNING
        partial_results = {'tasks': []}

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self._build_mistral_context(self.context, current_context)
        LOG.info('Mistral query context is %s', exec_context)

        return (status, partial_results, exec_context)
279
280
    def _get_tasks(self, wf_ex_id, full_task_name, task_name, executions):
        """
        Look up the task executions in ERROR state that match the given task name.

        A dot in task_name separates the name of a parent task from the name of a task
        in the workflow executions started by that parent task. The lookup recurses
        into those workflow executions with the remainder of the name.

        :param wf_ex_id: ID of the workflow execution whose tasks are listed.
        :param full_task_name: Complete task name; used as the key in the result.
        :param task_name: Task name (or remainder of it) relative to wf_ex_id.
        :param executions: Workflow executions searched for sub workflow executions.
        :return: Dict mapping full_task_name to the dict form of the matching task.
        """
        task_exs = self._client.tasks.list(workflow_execution_id=wf_ex_id)

        parent_task_name, separator, child_task_name = task_name.partition('.')

        if not separator:
            # Plain task name: collect the matching tasks of this workflow execution.
            # pylint: disable=no-member
            matches = {}

            for task_ex in task_exs:
                if task_ex.name == task_name and task_ex.state == 'ERROR':
                    matches[full_task_name] = task_ex.to_dict()

            return matches

        parent_task_ids = [task_ex.id for task_ex in task_exs
                           if task_ex.name == parent_task_name]

        # Workflow executions that were started by one of the parent tasks.
        sub_wf_ex_ids = []

        for wf_ex in executions:
            task_ex_id = getattr(wf_ex, 'task_execution_id', None)
            if task_ex_id and task_ex_id in parent_task_ids:
                sub_wf_ex_ids.append(wf_ex.id)

        matches = {}

        for sub_wf_ex_id in sub_wf_ex_ids:
            sub_matches = self._get_tasks(sub_wf_ex_id, full_task_name,
                                          child_task_name, executions)
            matches.update(sub_matches)

        return matches
309
310
    def resume_workflow(self, ex_ref, task_specs):
        """
        Re-run the given tasks of a workflow execution that is in ERROR state.

        :param ex_ref: Execution being re-run; its context holds the mistral execution_id.
        :param task_specs: Dict mapping the name of each task to re-run to its options
                           (the "reset" option is read).
        :return: Tuple of (running status, partial results, execution context).
        :raises Exception: If the mistral execution_id is missing, the workflow execution
                           is not in ERROR state, or a task cannot be identified as a
                           task in ERROR state.
        """
        mistral_ctx = ex_ref.context.get('mistral', dict())
        mistral_ex_id = mistral_ctx.get('execution_id')

        if not mistral_ex_id:
            raise Exception('Unable to rerun because mistral execution_id is missing.')

        execution = self._client.executions.get(mistral_ex_id)

        # pylint: disable=no-member
        if execution.state != 'ERROR':
            raise Exception('Workflow execution is not in a rerunable state.')

        executions = self._client.executions.list()

        # Collect the tasks in error state for every requested task name.
        tasks = {}

        for name in task_specs:
            tasks.update(self._get_tasks(execution.id, name, name, executions))

        missing_tasks = list(set(task_specs) - set(tasks))
        if missing_tasks:
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
                            'rerunable tasks: %s. Please make sure that the task name is correct '
                            'and the task is in rerunable state.' % ', '.join(missing_tasks))

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()
        env = options.get('env', None)

        for name, task_obj in six.iteritems(tasks):
            # pylint: disable=unexpected-keyword-arg
            self._client.tasks.rerun(
                task_obj['id'],
                reset=task_specs[name].get('reset', False),
                env=env
            )

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self._build_mistral_context(self.context, current_context)
        LOG.info('Mistral query context is %s' % exec_context)

        return (action_constants.LIVEACTION_STATUS_RUNNING, {'tasks': []}, exec_context)
360
361
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def pause(self):
        """
        Pause the workflow execution in Mistral and cascade the pause to child workflows.

        :return: Tuple of (pausing status, liveaction result, liveaction context).
        :raises Exception: If the mistral execution_id is missing from the context.
        """
        mistral_ctx = self.context.get('mistral', dict())
        mistral_ex_id = mistral_ctx.get('execution_id')

        if not mistral_ex_id:
            raise Exception('Unable to pause because mistral execution_id is missing.')

        # Pause the main workflow execution. Any non-workflow tasks that are still
        # running will be allowed to complete gracefully.
        self._client.executions.update(mistral_ex_id, 'PAUSED')

        # If workflow is executed under another parent workflow, pause the corresponding
        # action execution for the task in the parent workflow.
        if 'parent' in getattr(self, 'context', {}):
            self._client.action_executions.update(
                mistral_ctx.get('action_execution_id'), 'PAUSED')

        # Cascade pause to the child executions that are running workflows.
        for child_exec_id in self.execution.children:
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)
            is_workflow = child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES

            if not is_workflow or child_exec.status != action_constants.LIVEACTION_STATUS_RUNNING:
                continue

            child_liveaction = LiveAction.get(id=child_exec.liveaction['id'])
            action_service.request_pause(child_liveaction, self.context.get('user', None))

        return (
            action_constants.LIVEACTION_STATUS_PAUSING,
            self.liveaction.result,
            self.liveaction.context
        )
397
398
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def resume(self):
        """
        Resume the workflow execution in Mistral and cascade the resume to child workflows.

        :return: Tuple of (running status, execution result, execution context).
        :raises Exception: If the mistral execution_id is missing from the context.
        """
        mistral_ctx = self.context.get('mistral', dict())
        mistral_ex_id = mistral_ctx.get('execution_id')

        if not mistral_ex_id:
            raise Exception('Unable to resume because mistral execution_id is missing.')

        # If workflow is executed under another parent workflow, resume the corresponding
        # action execution for the task in the parent workflow.
        if 'parent' in getattr(self, 'context', {}):
            self._client.action_executions.update(
                mistral_ctx.get('action_execution_id'), 'RUNNING')

        # Resume the main workflow execution by setting it back to RUNNING.
        self._client.executions.update(mistral_ex_id, 'RUNNING')

        # Cascade resume to the child executions that are paused workflows.
        for child_exec_id in self.execution.children:
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)
            is_workflow = child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES

            if not is_workflow or child_exec.status != action_constants.LIVEACTION_STATUS_PAUSED:
                continue

            child_liveaction = LiveAction.get(id=child_exec.liveaction['id'])
            action_service.request_resume(child_liveaction, self.context.get('user', None))

        return (
            action_constants.LIVEACTION_STATUS_RUNNING,
            self.execution.result,
            self.execution.context
        )
434
435
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def cancel(self):
        """
        Cancel the workflow execution in Mistral and cascade the cancellation to children.

        :return: Tuple of (canceling status, liveaction result, liveaction context).
        :raises Exception: If the mistral execution_id is missing from the context.
        """
        mistral_ctx = self.context.get('mistral', dict())
        mistral_ex_id = mistral_ctx.get('execution_id')

        if not mistral_ex_id:
            raise Exception('Unable to cancel because mistral execution_id is missing.')

        # Cancels the main workflow execution. Any non-workflow tasks that are still
        # running will be allowed to complete gracefully.
        self._client.executions.update(mistral_ex_id, 'CANCELLED')

        # Cascade cancellation to child executions of this runner type that are cancelable.
        # NOTE(review): unlike pause() and resume(), the lookup below does not pass
        # raise_exception=True and matches on this runner's own type name instead of
        # WORKFLOW_RUNNER_TYPES - confirm whether the difference is intended.
        for child_exec_id in self.execution.children:
            child_exec = ActionExecution.get(id=child_exec_id)
            same_runner = child_exec.runner['name'] == self.runner_type_db.name

            if not same_runner:
                continue

            if child_exec.status not in action_constants.LIVEACTION_CANCELABLE_STATES:
                continue

            child_liveaction = LiveAction.get(id=child_exec.liveaction['id'])
            action_service.request_cancellation(child_liveaction, self.context.get('user', None))

        return (
            action_constants.LIVEACTION_STATUS_CANCELING,
            self.liveaction.result,
            self.liveaction.context
        )
465
466
    @staticmethod
467
    def _build_mistral_context(parent, current):
468
        """
469
        Mistral workflow might be kicked off in st2 by a parent Mistral
470
        workflow. In that case, we need to make sure that the existing
471
        mistral 'context' is moved as 'parent' and the child workflow
472
        'context' is added.
473
        """
474
        parent = copy.deepcopy(parent)
475
        context = dict()
476
477
        if not parent:
478
            context['mistral'] = current
479
        else:
480
            if 'mistral' in parent.keys():
481
                orig_parent_context = parent.get('mistral', dict())
482
                actual_parent = dict()
483
                if 'workflow_name' in orig_parent_context.keys():
484
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
485
                    del orig_parent_context['workflow_name']
486
                if 'workflow_execution_id' in orig_parent_context.keys():
487
                    actual_parent['workflow_execution_id'] = \
488
                        orig_parent_context['workflow_execution_id']
489
                    del orig_parent_context['workflow_execution_id']
490
                context['mistral'] = orig_parent_context
491
                context['mistral'].update(current)
492
                context['mistral']['parent'] = actual_parent
493
            else:
494
                context['mistral'] = current
495
496
        return context
497