Passed
Pull Request — master (#3507)
by W
07:37 queued 01:55
created

MistralRunner.start_workflow()   B

Complexity

Conditions 6

Size

Total Lines 54

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
dl 0
loc 54
rs 7.8519
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import retrying
20
import six
21
import yaml
22
from mistralclient.api import client as mistral
23
from oslo_config import cfg
24
25
from st2common.runners.base import AsyncActionRunner
26
from st2common.constants import action as action_constants
27
from st2common import log as logging
28
from st2common.models.api.notification import NotificationsHelper
29
from st2common.persistence.execution import ActionExecution
30
from st2common.persistence.liveaction import LiveAction
31
from st2common.services import action as action_service
32
from st2common.util.workflow import mistral as utils
33
from st2common.util.url import get_url_without_trailing_slash
34
from st2common.util.api import get_full_public_api_url
35
from st2common.util.api import get_mistral_api_url
36
37
38
LOG = logging.getLogger(__name__)
39
40
41
def get_runner():
42
    return MistralRunner(str(uuid.uuid4()))
43
44
45
class MistralRunner(AsyncActionRunner):
46
47
    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
48
49
    def __init__(self, runner_id):
50
        super(MistralRunner, self).__init__(runner_id=runner_id)
51
        self._on_behalf_user = cfg.CONF.system_user.user
52
        self._notify = None
53
        self._skip_notify_tasks = []
54
        self._client = mistral.client(
55
            mistral_url=self.url,
56
            username=cfg.CONF.mistral.keystone_username,
57
            api_key=cfg.CONF.mistral.keystone_password,
58
            project_name=cfg.CONF.mistral.keystone_project_name,
59
            auth_url=cfg.CONF.mistral.keystone_auth_url,
60
            cacert=cfg.CONF.mistral.cacert,
61
            insecure=cfg.CONF.mistral.insecure)
62
63
    @staticmethod
64
    def get_workflow_definition(entry_point):
65
        with open(entry_point, 'r') as def_file:
66
            return def_file.read()
67
68
    def pre_run(self):
69
        super(MistralRunner, self).pre_run()
70
71
        if getattr(self, 'liveaction', None):
72
            self._notify = getattr(self.liveaction, 'notify', None)
73
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
74
75
    @staticmethod
76
    def _check_name(action_ref, is_workbook, def_dict):
77
        # If workbook, change the value of the "name" key.
78
        if is_workbook:
79
            if def_dict.get('name') != action_ref:
80
                raise Exception('Name of the workbook must be the same as the '
81
                                'fully qualified action name "%s".' % action_ref)
82
        # If workflow, change the key name of the workflow.
83
        else:
84
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
85
            if workflow_name != action_ref:
86
                raise Exception('Name of the workflow must be the same as the '
87
                                'fully qualified action name "%s".' % action_ref)
88
89
    def _save_workbook(self, name, def_yaml):
90
        # If the workbook is not found, the mistral client throws a generic API exception.
91
        try:
92
            # Update existing workbook.
93
            wb = self._client.workbooks.get(name)
94
        except:
95
            # Delete if definition was previously a workflow.
96
            # If not found, an API exception is thrown.
97
            try:
98
                self._client.workflows.delete(name)
99
            except:
100
                pass
101
102
            # Create the new workbook.
103
            wb = self._client.workbooks.create(def_yaml)
104
105
        # Update the workbook definition.
106
        # pylint: disable=no-member
107
        if wb.definition != def_yaml:
108
            self._client.workbooks.update(def_yaml)
109
110
    def _save_workflow(self, name, def_yaml):
111
        # If the workflow is not found, the mistral client throws a generic API exception.
112
        try:
113
            # Update existing workbook.
114
            wf = self._client.workflows.get(name)
115
        except:
116
            # Delete if definition was previously a workbook.
117
            # If not found, an API exception is thrown.
118
            try:
119
                self._client.workbooks.delete(name)
120
            except:
121
                pass
122
123
            # Create the new workflow.
124
            wf = self._client.workflows.create(def_yaml)[0]
125
126
        # Update the workflow definition.
127
        # pylint: disable=no-member
128
        if wf.definition != def_yaml:
129
            self._client.workflows.update(def_yaml)
130
131
    def _find_default_workflow(self, def_dict):
132
        num_workflows = len(def_dict['workflows'].keys())
133
134
        if num_workflows > 1:
135
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
136
            if not fully_qualified_wf_name:
137
                raise ValueError('Workbook definition is detected. '
138
                                 'Default workflow cannot be determined.')
139
140
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
141
            if wf_name not in def_dict['workflows']:
142
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
143
                                 % fully_qualified_wf_name)
144
145
            return fully_qualified_wf_name
146
        elif num_workflows == 1:
147
            return '%s.%s' % (def_dict['name'], def_dict['workflows'].keys()[0])
148
        else:
149
            raise Exception('There are no workflows in the workbook.')
150
151
    def _construct_workflow_execution_options(self):
152
        # This URL is used by Mistral to talk back to the API
153
        api_url = get_mistral_api_url()
154
        endpoint = api_url + '/actionexecutions'
155
156
        # This URL is available in the context and can be used by the users inside a workflow,
157
        # similar to "ST2_ACTION_API_URL" environment variable available to actions
158
        public_api_url = get_full_public_api_url()
159
160
        # Build context with additional information
161
        parent_context = {
162
            'execution_id': self.execution_id
163
        }
164
        if getattr(self.liveaction, 'context', None):
165
            parent_context.update(self.liveaction.context)
166
167
        st2_execution_context = {
168
            'api_url': api_url,
169
            'endpoint': endpoint,
170
            'parent': parent_context,
171
            'notify': {},
172
            'skip_notify_tasks': self._skip_notify_tasks
173
        }
174
175
        # Include notification information
176
        if self._notify:
177
            notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
178
            st2_execution_context['notify'] = notify_dict
179
180
        if self.auth_token:
181
            st2_execution_context['auth_token'] = self.auth_token.token
182
183
        options = {
184
            'env': {
185
                'st2_execution_id': self.execution_id,
186
                'st2_liveaction_id': self.liveaction_id,
187
                'st2_action_api_url': public_api_url,
188
                '__actions': {
189
                    'st2.action': {
190
                        'st2_context': st2_execution_context
191
                    }
192
                }
193
            }
194
        }
195
196
        return options
197
198
    def _get_resume_options(self):
199
        return self.context.get('re-run', {})
200
201
    @retrying.retry(
202
        retry_on_exception=utils.retry_on_exceptions,
203
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
204
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
205
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
206
    def run(self, action_parameters):
207
        resume_options = self._get_resume_options()
208
209
        tasks_to_reset = resume_options.get('reset', [])
210
211
        task_specs = {
212
            task_name: {'reset': task_name in tasks_to_reset}
213
            for task_name in resume_options.get('tasks', [])
214
        }
215
216
        resume = self.rerun_ex_ref and task_specs
217
218
        if resume:
219
            result = self.resume_workflow(ex_ref=self.rerun_ex_ref, task_specs=task_specs)
220
        else:
221
            result = self.start_workflow(action_parameters=action_parameters)
222
223
        return result
224
225
    def start_workflow(self, action_parameters):
226
        # Test connection
227
        self._client.workflows.list()
228
229
        # Setup inputs for the workflow execution.
230
        inputs = self.runner_parameters.get('context', dict())
231
        inputs.update(action_parameters)
232
233
        # Get workbook/workflow definition from file.
234
        def_yaml = self.get_workflow_definition(self.entry_point)
235
        def_dict = yaml.safe_load(def_yaml)
236
        is_workbook = ('workflows' in def_dict)
237
238
        if not is_workbook:
239
            # Non-workbook definition containing multiple workflows is not supported.
240
            if len([k for k, _ in six.iteritems(def_dict) if k != 'version']) != 1:
241
                raise Exception('Workflow (not workbook) definition is detected. '
242
                                'Multiple workflows is not supported.')
243
244
        action_ref = '%s.%s' % (self.action.pack, self.action.name)
245
        self._check_name(action_ref, is_workbook, def_dict)
246
        def_dict_xformed = utils.transform_definition(def_dict)
247
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)
248
249
        # Construct additional options for the workflow execution
250
        options = self._construct_workflow_execution_options()
251
252
        # Save workbook/workflow definition.
253
        if is_workbook:
254
            self._save_workbook(action_ref, def_yaml_xformed)
255
            default_workflow = self._find_default_workflow(def_dict_xformed)
256
            execution = self._client.executions.create(default_workflow,
257
                                                       workflow_input=inputs,
258
                                                       **options)
259
        else:
260
            self._save_workflow(action_ref, def_yaml_xformed)
261
            execution = self._client.executions.create(action_ref,
262
                                                       workflow_input=inputs,
263
                                                       **options)
264
265
        status = action_constants.LIVEACTION_STATUS_RUNNING
266
        partial_results = {'tasks': []}
267
268
        # pylint: disable=no-member
269
        current_context = {
270
            'execution_id': str(execution.id),
271
            'workflow_name': execution.workflow_name
272
        }
273
274
        exec_context = self.context
275
        exec_context = self._build_mistral_context(exec_context, current_context)
276
        LOG.info('Mistral query context is %s' % exec_context)
277
278
        return (status, partial_results, exec_context)
279
280
    def _get_tasks(self, wf_ex_id, full_task_name, task_name, executions):
281
        task_exs = self._client.tasks.list(workflow_execution_id=wf_ex_id)
282
283
        if '.' in task_name:
284
            dot_pos = task_name.index('.')
285
            parent_task_name = task_name[:dot_pos]
286
            task_name = task_name[dot_pos + 1:]
287
288
            parent_task_ids = [task.id for task in task_exs if task.name == parent_task_name]
289
290
            workflow_ex_ids = [wf_ex.id for wf_ex in executions
291
                               if (getattr(wf_ex, 'task_execution_id', None) and
292
                                   wf_ex.task_execution_id in parent_task_ids)]
293
294
            tasks = {}
295
296
            for sub_wf_ex_id in workflow_ex_ids:
297
                tasks.update(self._get_tasks(sub_wf_ex_id, full_task_name, task_name, executions))
298
299
            return tasks
300
301
        # pylint: disable=no-member
302
        tasks = {
303
            full_task_name: task.to_dict()
304
            for task in task_exs
305
            if task.name == task_name and task.state == 'ERROR'
306
        }
307
308
        return tasks
309
310
    def resume_workflow(self, ex_ref, task_specs):
311
        mistral_ctx = ex_ref.context.get('mistral', dict())
312
313
        if not mistral_ctx.get('execution_id'):
314
            raise Exception('Unable to rerun because mistral execution_id is missing.')
315
316
        execution = self._client.executions.get(mistral_ctx.get('execution_id'))
317
318
        # pylint: disable=no-member
319
        if execution.state not in ['ERROR']:
320
            raise Exception('Workflow execution is not in a rerunable state.')
321
322
        executions = self._client.executions.list()
323
324
        tasks = {}
325
326
        for task_name, task_spec in six.iteritems(task_specs):
327
            tasks.update(self._get_tasks(execution.id, task_name, task_name, executions))
328
329
        missing_tasks = list(set(task_specs.keys()) - set(tasks.keys()))
330
        if missing_tasks:
331
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
332
                            'rerunable tasks: %s. Please make sure that the task name is correct '
333
                            'and the task is in rerunable state.' % ', '.join(missing_tasks))
334
335
        # Construct additional options for the workflow execution
336
        options = self._construct_workflow_execution_options()
337
338
        for task_name, task_obj in six.iteritems(tasks):
339
            # pylint: disable=unexpected-keyword-arg
340
            self._client.tasks.rerun(
341
                task_obj['id'],
342
                reset=task_specs[task_name].get('reset', False),
343
                env=options.get('env', None)
344
            )
345
346
        status = action_constants.LIVEACTION_STATUS_RUNNING
347
        partial_results = {'tasks': []}
348
349
        # pylint: disable=no-member
350
        current_context = {
351
            'execution_id': str(execution.id),
352
            'workflow_name': execution.workflow_name
353
        }
354
355
        exec_context = self.context
356
        exec_context = self._build_mistral_context(exec_context, current_context)
357
        LOG.info('Mistral query context is %s' % exec_context)
358
359
        return (status, partial_results, exec_context)
360
361
    @retrying.retry(
362
        retry_on_exception=utils.retry_on_exceptions,
363
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
364
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
365
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
366
    def pause(self):
367
        mistral_ctx = self.context.get('mistral', dict())
368
369
        if not mistral_ctx.get('execution_id'):
370
            raise Exception('Unable to pause because mistral execution_id is missing.')
371
372
        # Pause the main workflow execution. Any non-workflow tasks that are still
373
        # running will be allowed to complete gracefully.
374
        self._client.executions.update(mistral_ctx.get('execution_id'), 'PAUSED')
375
376
        # If workflow is executed under another parent workflow, pause the corresponding
377
        # action execution for the task in the parent workflow.
378
        if 'parent' in getattr(self, 'context', {}):
379
            mistral_action_ex_id = mistral_ctx.get('action_execution_id')
380
            self._client.action_executions.update(mistral_action_ex_id, 'PAUSED')
381
382
        # Identify the list of action executions that are workflows and cascade pause.
383
        for child_exec_id in self.execution.children:
384
            child_exec = ActionExecution.get(id=child_exec_id)
385
            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
386
                    child_exec.status == action_constants.LIVEACTION_STATUS_RUNNING):
387
                action_service.request_pause(
388
                    LiveAction.get(id=child_exec.liveaction['id']),
389
                    self.context.get('user', None)
390
                )
391
392
    @retrying.retry(
393
        retry_on_exception=utils.retry_on_exceptions,
394
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
395
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
396
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
397
    def resume(self):
398
        mistral_ctx = self.context.get('mistral', dict())
399
400
        if not mistral_ctx.get('execution_id'):
401
            raise Exception('Unable to resume because mistral execution_id is missing.')
402
403
        # If workflow is executed under another parent workflow, resume the corresponding
404
        # action execution for the task in the parent workflow.
405
        if 'parent' in getattr(self, 'context', {}):
406
            mistral_action_ex_id = mistral_ctx.get('action_execution_id')
407
            self._client.action_executions.update(mistral_action_ex_id, 'RUNNING')
408
409
        # Pause the main workflow execution. Any non-workflow tasks that are still
410
        # running will be allowed to complete gracefully.
411
        self._client.executions.update(mistral_ctx.get('execution_id'), 'RUNNING')
412
413
        # Identify the list of action executions that are workflows and cascade resume.
414
        for child_exec_id in self.execution.children:
415
            child_exec = ActionExecution.get(id=child_exec_id)
416
            if (child_exec.runner['name'] in action_constants.WORKFLOW_RUNNER_TYPES and
417
                    child_exec.status == action_constants.LIVEACTION_STATUS_PAUSED):
418
                action_service.request_resume(
419
                    LiveAction.get(id=child_exec.liveaction['id']),
420
                    self.context.get('user', None)
421
                )
422
423
        return (
424
            action_constants.LIVEACTION_STATUS_RUNNING,
425
            self.execution.result,
426
            self.execution.context
427
        )
428
429
    @retrying.retry(
430
        retry_on_exception=utils.retry_on_exceptions,
431
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
432
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
433
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
434
    def cancel(self):
435
        mistral_ctx = self.context.get('mistral', dict())
436
437
        if not mistral_ctx.get('execution_id'):
438
            raise Exception('Unable to cancel because mistral execution_id is missing.')
439
440
        # Cancels the main workflow execution. Any non-workflow tasks that are still
441
        # running will be allowed to complete gracefully.
442
        self._client.executions.update(mistral_ctx.get('execution_id'), 'CANCELLED')
443
444
        # Identify the list of action executions that are workflows and still running.
445
        for child_exec_id in self.execution.children:
446
            child_exec = ActionExecution.get(id=child_exec_id)
447
            if (child_exec.runner['name'] == self.runner_type_db.name and
448
                    child_exec.status in action_constants.LIVEACTION_CANCELABLE_STATES):
449
                action_service.request_cancellation(
450
                    LiveAction.get(id=child_exec.liveaction['id']),
451
                    self.context.get('user', None)
452
                )
453
454
    @staticmethod
455
    def _build_mistral_context(parent, current):
456
        """
457
        Mistral workflow might be kicked off in st2 by a parent Mistral
458
        workflow. In that case, we need to make sure that the existing
459
        mistral 'context' is moved as 'parent' and the child workflow
460
        'context' is added.
461
        """
462
        parent = copy.deepcopy(parent)
463
        context = dict()
464
465
        if not parent:
466
            context['mistral'] = current
467
        else:
468
            if 'mistral' in parent.keys():
469
                orig_parent_context = parent.get('mistral', dict())
470
                actual_parent = dict()
471
                if 'workflow_name' in orig_parent_context.keys():
472
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
473
                    del orig_parent_context['workflow_name']
474
                if 'workflow_execution_id' in orig_parent_context.keys():
475
                    actual_parent['workflow_execution_id'] = \
476
                        orig_parent_context['workflow_execution_id']
477
                    del orig_parent_context['workflow_execution_id']
478
                context['mistral'] = orig_parent_context
479
                context['mistral'].update(current)
480
                context['mistral']['parent'] = actual_parent
481
            else:
482
                context['mistral'] = current
483
484
        return context
485