Passed
Pull Request — master (#3640)
by Lakshmi
06:19
created

MistralRunner.resume()   C

Complexity

Conditions 7

Size

Total Lines 35

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
c 0
b 0
f 0
dl 0
loc 35
rs 5.5
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import retrying
20
import six
21
import yaml
22
from mistralclient.api import client as mistral
23
from oslo_config import cfg
24
25
from st2common.runners.base import AsyncActionRunner
26
from st2common.constants import action as action_constants
27
from st2common import log as logging
28
from st2common.models.api.notification import NotificationsHelper
29
from st2common.persistence.execution import ActionExecution
30
from st2common.persistence.liveaction import LiveAction
31
from st2common.services import action as action_service
32
from st2common.util import jinja
33
from st2common.util.workflow import mistral as utils
34
from st2common.util.url import get_url_without_trailing_slash
35
from st2common.util.api import get_full_public_api_url
36
from st2common.util.api import get_mistral_api_url
37
38
39
LOG = logging.getLogger(__name__)
40
41
42
def get_runner():
    """
    Build a new Mistral runner identified by a freshly generated UUID4 string.
    """
    runner_id = str(uuid.uuid4())

    return MistralRunner(runner_id)
44
45
46
class MistralRunner(AsyncActionRunner):
47
48
    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
49
50
    def __init__(self, runner_id):
        """
        Initialize runner state and build the Mistral API client from the
        "system_user" and "mistral" configuration sections.

        :param runner_id: Identifier passed through to the base runner.
        """
        super(MistralRunner, self).__init__(runner_id=runner_id)

        self._on_behalf_user = cfg.CONF.system_user.user
        self._notify = None
        self._skip_notify_tasks = []

        # All connection settings for the client come from the mistral section.
        mistral_conf = cfg.CONF.mistral

        self._client = mistral.client(
            mistral_url=self.url,
            username=mistral_conf.keystone_username,
            api_key=mistral_conf.keystone_password,
            project_name=mistral_conf.keystone_project_name,
            auth_url=mistral_conf.keystone_auth_url,
            cacert=mistral_conf.cacert,
            insecure=mistral_conf.insecure)
63
64
    @staticmethod
65
    def get_workflow_definition(entry_point):
66
        with open(entry_point, 'r') as def_file:
67
            return def_file.read()
68
69
    def pre_run(self):
        """
        Run the base class pre-run step, then cache the notification settings
        of the liveaction (when one is set) and the "skip_notify" runner
        parameter on the runner.
        """
        super(MistralRunner, self).pre_run()

        liveaction = getattr(self, 'liveaction', None)

        if liveaction:
            self._notify = getattr(liveaction, 'notify', None)

        # Defaults to an empty list when the parameter is not supplied.
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
75
76
    @staticmethod
77
    def _check_name(action_ref, is_workbook, def_dict):
78
        # If workbook, change the value of the "name" key.
79
        if is_workbook:
80
            if def_dict.get('name') != action_ref:
81
                raise Exception('Name of the workbook must be the same as the '
82
                                'fully qualified action name "%s".' % action_ref)
83
        # If workflow, change the key name of the workflow.
84
        else:
85
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
86
            if workflow_name != action_ref:
87
                raise Exception('Name of the workflow must be the same as the '
88
                                'fully qualified action name "%s".' % action_ref)
89
90
    def _save_workbook(self, name, def_yaml):
        """
        Make sure the workbook registered in Mistral matches the definition.

        The workbook is looked up by name. If the lookup fails, any workflow
        registered under the same name is deleted (best effort) and the
        workbook is created. Finally, the workbook is updated if its stored
        definition differs from def_yaml.

        :param name: Name of the workbook.
        :param def_yaml: Workbook definition as a YAML string.
        """
        try:
            wb = self._client.workbooks.get(name)
        except Exception:
            # If the workbook is not found, the mistral client throws a generic
            # API exception. Only Exception is caught so that interpreter
            # level signals (KeyboardInterrupt, SystemExit, etc.) propagate.

            # Delete if definition was previously a workflow.
            # If not found, an API exception is thrown.
            try:
                self._client.workflows.delete(name)
            except Exception:
                LOG.debug('Unable to delete workflow "%s" before creating '
                          'the workbook.', name, exc_info=True)

            # Create the new workbook.
            wb = self._client.workbooks.create(def_yaml)

        # Update the workbook definition.
        # pylint: disable=no-member
        if wb.definition != def_yaml:
            self._client.workbooks.update(def_yaml)
110
111
    def _save_workflow(self, name, def_yaml):
        """
        Make sure the workflow registered in Mistral matches the definition.

        The workflow is looked up by name. If the lookup fails, any workbook
        registered under the same name is deleted (best effort) and the
        workflow is created. Finally, the workflow is updated if its stored
        definition differs from def_yaml.

        :param name: Name of the workflow.
        :param def_yaml: Workflow definition as a YAML string.
        """
        try:
            wf = self._client.workflows.get(name)
        except Exception:
            # If the workflow is not found, the mistral client throws a generic
            # API exception. Only Exception is caught so that interpreter
            # level signals (KeyboardInterrupt, SystemExit, etc.) propagate.

            # Delete if definition was previously a workbook.
            # If not found, an API exception is thrown.
            try:
                self._client.workbooks.delete(name)
            except Exception:
                LOG.debug('Unable to delete workbook "%s" before creating '
                          'the workflow.', name, exc_info=True)

            # Create the new workflow. The first item of the result of the
            # create call is used as the workflow.
            wf = self._client.workflows.create(def_yaml)[0]

        # Update the workflow definition.
        # pylint: disable=no-member
        if wf.definition != def_yaml:
            self._client.workflows.update(def_yaml)
131
132
    def _find_default_workflow(self, def_dict):
133
        num_workflows = len(def_dict['workflows'].keys())
134
135
        if num_workflows > 1:
136
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
137
            if not fully_qualified_wf_name:
138
                raise ValueError('Workbook definition is detected. '
139
                                 'Default workflow cannot be determined.')
140
141
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
142
            if wf_name not in def_dict['workflows']:
143
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
144
                                 % fully_qualified_wf_name)
145
146
            return fully_qualified_wf_name
147
        elif num_workflows == 1:
148
            return '%s.%s' % (def_dict['name'], def_dict['workflows'].keys()[0])
149
        else:
150
            raise Exception('There are no workflows in the workbook.')
151
152
    def _construct_workflow_execution_options(self):
        """
        Build the additional options passed to Mistral when a workflow
        execution is created or a task is rerun.

        :return: Dict with a single "env" key. The environment carries the
            execution and liveaction ids, the public API URL and the st2
            context (API URL, endpoint, parent context, notification settings,
            skipped notify tasks and, when available, the auth token).
        """
        # URL Mistral uses to talk back to the API.
        api_url = get_mistral_api_url()
        endpoint = api_url + '/actionexecutions'

        # URL exposed in the context for use by the users inside a workflow,
        # similar to the "ST2_ACTION_API_URL" environment variable available
        # to actions.
        public_api_url = get_full_public_api_url()

        # The parent context starts with the execution id and is extended with
        # whatever is in the context of the liveaction.
        parent_context = {'execution_id': self.execution_id}
        liveaction_context = getattr(self.liveaction, 'context', None)

        if liveaction_context:
            parent_context.update(liveaction_context)

        # Convert jinja expressions in the params of Action Chain under the
        # parent context into raw block. If there are any jinja expressions,
        # Mistral will try to evaluate them. If there is a local context
        # reference, the evaluation will fail because the local context
        # reference is out of scope.
        chain_ctx = parent_context.get('chain') or {}
        chain_params_ctx = chain_ctx.get('params') or {}

        # When not empty, chain_params_ctx is the very same dict object as
        # parent_context['chain']['params'], so the values are replaced in place.
        for param_name, param_value in six.iteritems(chain_params_ctx):
            chain_params_ctx[param_name] = jinja.convert_jinja_to_raw_block(param_value)

        st2_execution_context = {
            'api_url': api_url,
            'endpoint': endpoint,
            'parent': parent_context,
            'notify': {},
            'skip_notify_tasks': self._skip_notify_tasks
        }

        # Include notification information
        if self._notify:
            st2_execution_context['notify'] = NotificationsHelper.from_model(
                notify_model=self._notify)

        if self.auth_token:
            st2_execution_context['auth_token'] = self.auth_token.token

        env = {
            'st2_execution_id': self.execution_id,
            'st2_liveaction_id': self.liveaction_id,
            'st2_action_api_url': public_api_url,
            '__actions': {
                'st2.action': {
                    'st2_context': st2_execution_context
                }
            }
        }

        return {'env': env}
209
210
    def _get_resume_options(self):
211
        return self.context.get('re-run', {})
212
213
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def run(self, action_parameters):
        """
        Start a new workflow execution, or rerun tasks of a previous one.

        Tasks are rerun when a rerun execution reference is set on the runner
        and the "re-run" options list at least one task. Otherwise a new
        workflow execution is started.

        :param action_parameters: Action parameters for a new execution.
        :return: Result of resume_workflow() or start_workflow().
        """
        resume_options = self._get_resume_options()
        tasks_to_reset = resume_options.get('reset', [])

        # Map each task to rerun to its spec. A task is reset only if it is
        # also listed under "reset".
        task_specs = dict(
            (task_name, {'reset': task_name in tasks_to_reset})
            for task_name in resume_options.get('tasks', [])
        )

        if self.rerun_ex_ref and task_specs:
            return self.resume_workflow(ex_ref=self.rerun_ex_ref, task_specs=task_specs)

        return self.start_workflow(action_parameters=action_parameters)
236
237
    def start_workflow(self, action_parameters):
        """
        Register the workbook/workflow definition of the action with Mistral
        and start a new workflow execution.

        :param action_parameters: Action parameters. They are merged over the
            "context" runner parameter to form the workflow input.
        :return: Tuple of (running status, partial results, execution context).
        :raises Exception: If a non-workbook definition does not contain
            exactly one workflow, or if the definition is not named after the
            action.
        """
        # Test connection
        self._client.workflows.list()

        # Setup inputs for the workflow execution. A copy is used so that the
        # dict stored under the "context" runner parameter is left untouched.
        inputs = dict(self.runner_parameters.get('context') or {})
        inputs.update(action_parameters)

        # Get workbook/workflow definition from file.
        def_yaml = self.get_workflow_definition(self.entry_point)
        def_dict = yaml.safe_load(def_yaml)
        is_workbook = ('workflows' in def_dict)

        if not is_workbook:
            # Non-workbook definition containing multiple workflows is not supported.
            if len([k for k in def_dict if k != 'version']) != 1:
                raise Exception('Workflow (not workbook) definition is detected. '
                                'Multiple workflows is not supported.')

        action_ref = '%s.%s' % (self.action.pack, self.action.name)
        self._check_name(action_ref, is_workbook, def_dict)
        def_dict_xformed = utils.transform_definition(def_dict)
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()

        # Save workbook/workflow definition and identify the workflow to run.
        if is_workbook:
            self._save_workbook(action_ref, def_yaml_xformed)
            workflow_name = self._find_default_workflow(def_dict_xformed)
        else:
            self._save_workflow(action_ref, def_yaml_xformed)
            workflow_name = action_ref

        execution = self._client.executions.create(workflow_name,
                                                   workflow_input=inputs,
                                                   **options)

        status = action_constants.LIVEACTION_STATUS_RUNNING
        partial_results = {'tasks': []}

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self._build_mistral_context(self.context, current_context)
        LOG.info('Mistral query context is %s', exec_context)

        return (status, partial_results, exec_context)
291
292
    def _get_tasks(self, wf_ex_id, full_task_name, task_name, executions):
        """
        Find the task execution in ERROR state that a task name refers to.

        A dot in the task name separates the name of a task in this workflow
        execution from the name of a task in the workflow execution(s) spawned
        by it. Such names are resolved recursively.

        :param wf_ex_id: ID of the workflow execution whose tasks are listed.
        :param full_task_name: Complete task name, used as key of the result.
        :param task_name: Part of the task name that is still to be resolved.
        :param executions: Workflow executions that are searched for the ones
            spawned by a parent task.
        :return: Dict mapping full_task_name to the matching task (as dict).
            Empty if no matching task in ERROR state is found.
        """
        task_exs = self._client.tasks.list(workflow_execution_id=wf_ex_id)

        if '.' not in task_name:
            # pylint: disable=no-member
            return dict(
                (full_task_name, task_ex.to_dict())
                for task_ex in task_exs
                if task_ex.name == task_name and task_ex.state == 'ERROR'
            )

        # Split on the first dot: the head is a task of this workflow
        # execution, the rest is resolved in the workflow executions under it.
        parent_task_name, _, child_task_name = task_name.partition('.')

        parent_task_ids = [task_ex.id for task_ex in task_exs
                           if task_ex.name == parent_task_name]

        sub_wf_ex_ids = [
            wf_ex.id for wf_ex in executions
            if (getattr(wf_ex, 'task_execution_id', None) and
                wf_ex.task_execution_id in parent_task_ids)
        ]

        found = {}

        for sub_wf_ex_id in sub_wf_ex_ids:
            found.update(
                self._get_tasks(sub_wf_ex_id, full_task_name, child_task_name, executions))

        return found
321
322
    def resume_workflow(self, ex_ref, task_specs):
        """
        Rerun the given tasks of a previous workflow execution.

        :param ex_ref: Execution being rerun. The ID of the workflow execution
            is read from the "mistral" section of its context.
        :param task_specs: Dict mapping the name of each task to rerun to its
            spec, e.g. {'task1': {'reset': False}}.
        :return: Tuple of (running status, partial results, execution context).
        :raises Exception: If the workflow execution ID is missing, if the
            workflow execution is not in ERROR state, or if any of the tasks
            cannot be found in ERROR state.
        """
        mistral_ctx = ex_ref.context.get('mistral', dict())

        if not mistral_ctx.get('execution_id'):
            raise Exception('Unable to rerun because mistral execution_id is missing.')

        execution = self._client.executions.get(mistral_ctx.get('execution_id'))

        # pylint: disable=no-member
        if execution.state != 'ERROR':
            raise Exception('Workflow execution is not in a rerunable state.')

        executions = self._client.executions.list()

        tasks = {}

        for task_name in task_specs:
            tasks.update(self._get_tasks(execution.id, task_name, task_name, executions))

        # Sorted so that the error message is the same from run to run.
        missing_tasks = sorted(set(task_specs.keys()) - set(tasks.keys()))
        if missing_tasks:
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
                            'rerunable tasks: %s. Please make sure that the task name is correct '
                            'and the task is in rerunable state.' % ', '.join(missing_tasks))

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()

        for task_name, task_obj in six.iteritems(tasks):
            # pylint: disable=unexpected-keyword-arg
            self._client.tasks.rerun(
                task_obj['id'],
                reset=task_specs[task_name].get('reset', False),
                env=options.get('env', None)
            )

        status = action_constants.LIVEACTION_STATUS_RUNNING
        partial_results = {'tasks': []}

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self._build_mistral_context(self.context, current_context)
        LOG.info('Mistral query context is %s', exec_context)

        return (status, partial_results, exec_context)
372
373
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def pause(self):
        """
        Pause the workflow execution and cascade the pause to child workflows.

        :return: Tuple of (pausing status, liveaction result, liveaction context).
        :raises Exception: If the mistral execution_id is not in the context.
        """
        mistral_ctx = self.context.get('mistral', dict())
        execution_id = mistral_ctx.get('execution_id')

        if not execution_id:
            raise Exception('Unable to pause because mistral execution_id is missing.')

        # Pause the main workflow execution. Any non-workflow tasks that are still
        # running will be allowed to complete gracefully.
        self._client.executions.update(execution_id, 'PAUSED')

        # If workflow is executed under another parent workflow, pause the corresponding
        # action execution for the task in the parent workflow.
        mistral_action_ex_id = mistral_ctx.get('action_execution_id')

        if 'parent' in getattr(self, 'context', {}) and mistral_action_ex_id:
            self._client.action_executions.update(mistral_action_ex_id, 'PAUSED')

        # Identify the list of action executions that are workflows and cascade pause.
        for child_exec_id in self.execution.children:
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)

            if child_exec.runner['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
                continue

            if child_exec.status != action_constants.LIVEACTION_STATUS_RUNNING:
                continue

            child_liveaction = LiveAction.get(id=child_exec.liveaction['id'])
            action_service.request_pause(child_liveaction, self.context.get('user', None))

        return (
            action_constants.LIVEACTION_STATUS_PAUSING,
            self.liveaction.result,
            self.liveaction.context
        )
409
410
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def resume(self):
        """
        Resume the workflow execution and cascade the resume to child workflows.

        :return: Tuple of (running status, execution result, execution context).
        :raises Exception: If the mistral execution_id is not in the context.
        """
        mistral_ctx = self.context.get('mistral', dict())
        execution_id = mistral_ctx.get('execution_id')

        if not execution_id:
            raise Exception('Unable to resume because mistral execution_id is missing.')

        # If workflow is executed under another parent workflow, resume the corresponding
        # action execution for the task in the parent workflow.
        mistral_action_ex_id = mistral_ctx.get('action_execution_id')

        if 'parent' in getattr(self, 'context', {}) and mistral_action_ex_id:
            self._client.action_executions.update(mistral_action_ex_id, 'RUNNING')

        # Resume the main workflow execution by setting it back to RUNNING.
        self._client.executions.update(execution_id, 'RUNNING')

        # Identify the list of action executions that are workflows and cascade resume.
        for child_exec_id in self.execution.children:
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)

            if child_exec.runner['name'] not in action_constants.WORKFLOW_RUNNER_TYPES:
                continue

            if child_exec.status != action_constants.LIVEACTION_STATUS_PAUSED:
                continue

            child_liveaction = LiveAction.get(id=child_exec.liveaction['id'])
            action_service.request_resume(child_liveaction, self.context.get('user', None))

        return (
            action_constants.LIVEACTION_STATUS_RUNNING,
            self.execution.result,
            self.execution.context
        )
446
447
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def cancel(self):
        """
        Cancel the workflow execution and request cancellation of the child
        executions that are run by the same runner type and still cancelable.

        :return: Tuple of (canceling status, liveaction result, liveaction context).
        :raises Exception: If the mistral execution_id is not in the context.
        """
        mistral_ctx = self.context.get('mistral', dict())

        if not mistral_ctx.get('execution_id'):
            raise Exception('Unable to cancel because mistral execution_id is missing.')

        # Cancels the main workflow execution. Any non-workflow tasks that are still
        # running will be allowed to complete gracefully.
        self._client.executions.update(mistral_ctx.get('execution_id'), 'CANCELLED')

        # Identify the list of action executions that are workflows and still running.
        for child_exec_id in self.execution.children:
            # Same lookup as in pause() and resume(): a missing child execution
            # raises from the lookup itself rather than failing later with an
            # attribute error on the result.
            child_exec = ActionExecution.get(id=child_exec_id, raise_exception=True)
            if (child_exec.runner['name'] == self.runner_type_db.name and
                    child_exec.status in action_constants.LIVEACTION_CANCELABLE_STATES):
                action_service.request_cancellation(
                    LiveAction.get(id=child_exec.liveaction['id']),
                    self.context.get('user', None)
                )

        return (
            action_constants.LIVEACTION_STATUS_CANCELING,
            self.liveaction.result,
            self.liveaction.context
        )
477
478
    @staticmethod
479
    def _build_mistral_context(parent, current):
480
        """
481
        Mistral workflow might be kicked off in st2 by a parent Mistral
482
        workflow. In that case, we need to make sure that the existing
483
        mistral 'context' is moved as 'parent' and the child workflow
484
        'context' is added.
485
        """
486
        parent = copy.deepcopy(parent)
487
        context = dict()
488
489
        if not parent:
490
            context['mistral'] = current
491
        else:
492
            if 'mistral' in parent.keys():
493
                orig_parent_context = parent.get('mistral', dict())
494
                actual_parent = dict()
495
                if 'workflow_name' in orig_parent_context.keys():
496
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
497
                    del orig_parent_context['workflow_name']
498
                if 'workflow_execution_id' in orig_parent_context.keys():
499
                    actual_parent['workflow_execution_id'] = \
500
                        orig_parent_context['workflow_execution_id']
501
                    del orig_parent_context['workflow_execution_id']
502
                context['mistral'] = orig_parent_context
503
                context['mistral'].update(current)
504
                context['mistral']['parent'] = actual_parent
505
            else:
506
                context['mistral'] = current
507
508
        return context
509