Passed
Push — master ( c48c6c...de488a )
by
unknown
03:01
created

MistralRunner._save_workflow()   A

Complexity

Conditions 4

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
dl 0
loc 20
rs 9.2
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import retrying
20
import six
21
import yaml
22
from mistralclient.api import client as mistral
23
from oslo_config import cfg
24
25
from st2common.runners.base import AsyncActionRunner
26
from st2common.constants import action as action_constants
27
from st2common import log as logging
28
from st2common.models.api.notification import NotificationsHelper
29
from st2common.persistence.execution import ActionExecution
30
from st2common.persistence.liveaction import LiveAction
31
from st2common.services import action as action_service
32
from st2common.util.workflow import mistral as utils
33
from st2common.util.url import get_url_without_trailing_slash
34
from st2common.util.api import get_full_public_api_url
35
from st2common.util.api import get_mistral_api_url
36
37
38
LOG = logging.getLogger(__name__)
39
40
41
def get_runner():
42
    return MistralRunner(str(uuid.uuid4()))
43
44
45
class MistralRunner(AsyncActionRunner):
46
47
    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
48
49
    def __init__(self, runner_id):
50
        super(MistralRunner, self).__init__(runner_id=runner_id)
51
        self._on_behalf_user = cfg.CONF.system_user.user
52
        self._notify = None
53
        self._skip_notify_tasks = []
54
        self._client = mistral.client(
55
            mistral_url=self.url,
56
            username=cfg.CONF.mistral.keystone_username,
57
            api_key=cfg.CONF.mistral.keystone_password,
58
            project_name=cfg.CONF.mistral.keystone_project_name,
59
            auth_url=cfg.CONF.mistral.keystone_auth_url,
60
            cacert=cfg.CONF.mistral.cacert,
61
            insecure=cfg.CONF.mistral.insecure)
62
63
    @staticmethod
64
    def get_workflow_definition(entry_point):
65
        with open(entry_point, 'r') as def_file:
66
            return def_file.read()
67
68
    def pre_run(self):
69
        super(MistralRunner, self).pre_run()
70
71
        if getattr(self, 'liveaction', None):
72
            self._notify = getattr(self.liveaction, 'notify', None)
73
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
74
75
    @staticmethod
76
    def _check_name(action_ref, is_workbook, def_dict):
77
        # If workbook, change the value of the "name" key.
78
        if is_workbook:
79
            if def_dict.get('name') != action_ref:
80
                raise Exception('Name of the workbook must be the same as the '
81
                                'fully qualified action name "%s".' % action_ref)
82
        # If workflow, change the key name of the workflow.
83
        else:
84
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
85
            if workflow_name != action_ref:
86
                raise Exception('Name of the workflow must be the same as the '
87
                                'fully qualified action name "%s".' % action_ref)
88
89
    def _save_workbook(self, name, def_yaml):
90
        # If the workbook is not found, the mistral client throws a generic API exception.
91
        try:
92
            # Update existing workbook.
93
            wb = self._client.workbooks.get(name)
94
        except:
95
            # Delete if definition was previously a workflow.
96
            # If not found, an API exception is thrown.
97
            try:
98
                self._client.workflows.delete(name)
99
            except:
100
                pass
101
102
            # Create the new workbook.
103
            wb = self._client.workbooks.create(def_yaml)
104
105
        # Update the workbook definition.
106
        # pylint: disable=no-member
107
        if wb.definition != def_yaml:
108
            self._client.workbooks.update(def_yaml)
109
110
    def _save_workflow(self, name, def_yaml):
111
        # If the workflow is not found, the mistral client throws a generic API exception.
112
        try:
113
            # Update existing workbook.
114
            wf = self._client.workflows.get(name)
115
        except:
116
            # Delete if definition was previously a workbook.
117
            # If not found, an API exception is thrown.
118
            try:
119
                self._client.workbooks.delete(name)
120
            except:
121
                pass
122
123
            # Create the new workflow.
124
            wf = self._client.workflows.create(def_yaml)[0]
125
126
        # Update the workflow definition.
127
        # pylint: disable=no-member
128
        if wf.definition != def_yaml:
129
            self._client.workflows.update(def_yaml)
130
131
    def _find_default_workflow(self, def_dict):
132
        num_workflows = len(def_dict['workflows'].keys())
133
134
        if num_workflows > 1:
135
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
136
            if not fully_qualified_wf_name:
137
                raise ValueError('Workbook definition is detected. '
138
                                 'Default workflow cannot be determined.')
139
140
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
141
            if wf_name not in def_dict['workflows']:
142
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
143
                                 % fully_qualified_wf_name)
144
145
            return fully_qualified_wf_name
146
        elif num_workflows == 1:
147
            return '%s.%s' % (def_dict['name'], def_dict['workflows'].keys()[0])
148
        else:
149
            raise Exception('There are no workflows in the workbook.')
150
151
    def _construct_workflow_execution_options(self):
152
        # This URL is used by Mistral to talk back to the API
153
        api_url = get_mistral_api_url()
154
        endpoint = api_url + '/actionexecutions'
155
156
        # This URL is available in the context and can be used by the users inside a workflow,
157
        # similar to "ST2_ACTION_API_URL" environment variable available to actions
158
        public_api_url = get_full_public_api_url()
159
160
        # Build context with additional information
161
        parent_context = {
162
            'execution_id': self.execution_id
163
        }
164
        if getattr(self.liveaction, 'context', None):
165
            parent_context.update(self.liveaction.context)
166
167
        st2_execution_context = {
168
            'api_url': api_url,
169
            'endpoint': endpoint,
170
            'parent': parent_context,
171
            'notify': {},
172
            'skip_notify_tasks': self._skip_notify_tasks
173
        }
174
175
        # Include notification information
176
        if self._notify:
177
            notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
178
            st2_execution_context['notify'] = notify_dict
179
180
        if self.auth_token:
181
            st2_execution_context['auth_token'] = self.auth_token.token
182
183
        options = {
184
            'env': {
185
                'st2_execution_id': self.execution_id,
186
                'st2_liveaction_id': self.liveaction_id,
187
                'st2_action_api_url': public_api_url,
188
                '__actions': {
189
                    'st2.action': {
190
                        'st2_context': st2_execution_context
191
                    }
192
                }
193
            }
194
        }
195
196
        return options
197
198
    def _get_resume_options(self):
199
        return self.context.get('re-run', {})
200
201
    @retrying.retry(
202
        retry_on_exception=utils.retry_on_exceptions,
203
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
204
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
205
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
206
    def run(self, action_parameters):
207
        resume_options = self._get_resume_options()
208
209
        tasks_to_reset = resume_options.get('reset', [])
210
211
        task_specs = {
212
            task_name: {'reset': task_name in tasks_to_reset}
213
            for task_name in resume_options.get('tasks', [])
214
        }
215
216
        resume = self.rerun_ex_ref and task_specs
217
218
        if resume:
219
            result = self.resume(ex_ref=self.rerun_ex_ref, task_specs=task_specs)
220
        else:
221
            result = self.start(action_parameters=action_parameters)
222
223
        return result
224
225
    def start(self, action_parameters):
226
        # Test connection
227
        self._client.workflows.list()
228
229
        # Setup inputs for the workflow execution.
230
        inputs = self.runner_parameters.get('context', dict())
231
        inputs.update(action_parameters)
232
233
        # Get workbook/workflow definition from file.
234
        def_yaml = self.get_workflow_definition(self.entry_point)
235
        def_dict = yaml.safe_load(def_yaml)
236
        is_workbook = ('workflows' in def_dict)
237
238
        if not is_workbook:
239
            # Non-workbook definition containing multiple workflows is not supported.
240
            if len([k for k, _ in six.iteritems(def_dict) if k != 'version']) != 1:
241
                raise Exception('Workflow (not workbook) definition is detected. '
242
                                'Multiple workflows is not supported.')
243
244
        action_ref = '%s.%s' % (self.action.pack, self.action.name)
245
        self._check_name(action_ref, is_workbook, def_dict)
246
        def_dict_xformed = utils.transform_definition(def_dict)
247
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)
248
249
        # Construct additional options for the workflow execution
250
        options = self._construct_workflow_execution_options()
251
252
        # Save workbook/workflow definition.
253
        if is_workbook:
254
            self._save_workbook(action_ref, def_yaml_xformed)
255
            default_workflow = self._find_default_workflow(def_dict_xformed)
256
            execution = self._client.executions.create(default_workflow,
257
                                                       workflow_input=inputs,
258
                                                       **options)
259
        else:
260
            self._save_workflow(action_ref, def_yaml_xformed)
261
            execution = self._client.executions.create(action_ref,
262
                                                       workflow_input=inputs,
263
                                                       **options)
264
265
        status = action_constants.LIVEACTION_STATUS_RUNNING
266
        partial_results = {'tasks': []}
267
268
        # pylint: disable=no-member
269
        current_context = {
270
            'execution_id': str(execution.id),
271
            'workflow_name': execution.workflow_name
272
        }
273
274
        exec_context = self.context
275
        exec_context = self._build_mistral_context(exec_context, current_context)
276
        LOG.info('Mistral query context is %s' % exec_context)
277
278
        return (status, partial_results, exec_context)
279
280
    def _get_tasks(self, wf_ex_id, full_task_name, task_name, executions):
281
        task_exs = self._client.tasks.list(workflow_execution_id=wf_ex_id)
282
283
        if '.' in task_name:
284
            dot_pos = task_name.index('.')
285
            parent_task_name = task_name[:dot_pos]
286
            task_name = task_name[dot_pos + 1:]
287
288
            parent_task_ids = [task.id for task in task_exs if task.name == parent_task_name]
289
290
            workflow_ex_ids = [wf_ex.id for wf_ex in executions
291
                               if (getattr(wf_ex, 'task_execution_id', None) and
292
                                   wf_ex.task_execution_id in parent_task_ids)]
293
294
            tasks = {}
295
296
            for sub_wf_ex_id in workflow_ex_ids:
297
                tasks.update(self._get_tasks(sub_wf_ex_id, full_task_name, task_name, executions))
298
299
            return tasks
300
301
        # pylint: disable=no-member
302
        tasks = {
303
            full_task_name: task.to_dict()
304
            for task in task_exs
305
            if task.name == task_name and task.state == 'ERROR'
306
        }
307
308
        return tasks
309
310
    def resume(self, ex_ref, task_specs):
311
        mistral_ctx = ex_ref.context.get('mistral', dict())
312
313
        if not mistral_ctx.get('execution_id'):
314
            raise Exception('Unable to rerun because mistral execution_id is missing.')
315
316
        execution = self._client.executions.get(mistral_ctx.get('execution_id'))
317
318
        # pylint: disable=no-member
319
        if execution.state not in ['ERROR']:
320
            raise Exception('Workflow execution is not in a rerunable state.')
321
322
        executions = self._client.executions.list()
323
324
        tasks = {}
325
326
        for task_name, task_spec in six.iteritems(task_specs):
327
            tasks.update(self._get_tasks(execution.id, task_name, task_name, executions))
328
329
        missing_tasks = list(set(task_specs.keys()) - set(tasks.keys()))
330
        if missing_tasks:
331
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
332
                            'rerunable tasks: %s. Please make sure that the task name is correct '
333
                            'and the task is in rerunable state.' % ', '.join(missing_tasks))
334
335
        # Construct additional options for the workflow execution
336
        options = self._construct_workflow_execution_options()
337
338
        for task_name, task_obj in six.iteritems(tasks):
339
            # pylint: disable=unexpected-keyword-arg
340
            self._client.tasks.rerun(
341
                task_obj['id'],
342
                reset=task_specs[task_name].get('reset', False),
343
                env=options.get('env', None)
344
            )
345
346
        status = action_constants.LIVEACTION_STATUS_RUNNING
347
        partial_results = {'tasks': []}
348
349
        # pylint: disable=no-member
350
        current_context = {
351
            'execution_id': str(execution.id),
352
            'workflow_name': execution.workflow_name
353
        }
354
355
        exec_context = self.context
356
        exec_context = self._build_mistral_context(exec_context, current_context)
357
        LOG.info('Mistral query context is %s' % exec_context)
358
359
        return (status, partial_results, exec_context)
360
361
    @retrying.retry(
362
        retry_on_exception=utils.retry_on_exceptions,
363
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
364
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
365
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
366
    def cancel(self):
367
        mistral_ctx = self.context.get('mistral', dict())
368
369
        if not mistral_ctx.get('execution_id'):
370
            raise Exception('Unable to cancel because mistral execution_id is missing.')
371
372
        # Cancels the main workflow execution. Any non-workflow tasks that are still
373
        # running will be allowed to complete gracefully.
374
        self._client.executions.update(mistral_ctx.get('execution_id'), 'CANCELLED')
375
376
        # Identify the list of action executions that are workflows and still running.
377
        for child_exec_id in self.execution.children:
378
            child_exec = ActionExecution.get(id=child_exec_id)
379
            if (child_exec.runner['name'] == self.runner_type_db.name and
380
                    child_exec.status in action_constants.LIVEACTION_CANCELABLE_STATES):
381
                action_service.request_cancellation(
382
                    LiveAction.get(id=child_exec.liveaction['id']),
383
                    self.context.get('user', None)
384
                )
385
386
    @staticmethod
387
    def _build_mistral_context(parent, current):
388
        """
389
        Mistral workflow might be kicked off in st2 by a parent Mistral
390
        workflow. In that case, we need to make sure that the existing
391
        mistral 'context' is moved as 'parent' and the child workflow
392
        'context' is added.
393
        """
394
        parent = copy.deepcopy(parent)
395
        context = dict()
396
397
        if not parent:
398
            context['mistral'] = current
399
        else:
400
            if 'mistral' in parent.keys():
401
                orig_parent_context = parent.get('mistral', dict())
402
                actual_parent = dict()
403
                if 'workflow_name' in orig_parent_context.keys():
404
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
405
                    del orig_parent_context['workflow_name']
406
                if 'workflow_execution_id' in orig_parent_context.keys():
407
                    actual_parent['workflow_execution_id'] = \
408
                        orig_parent_context['workflow_execution_id']
409
                    del orig_parent_context['workflow_execution_id']
410
                context['mistral'] = orig_parent_context
411
                context['mistral'].update(current)
412
                context['mistral']['parent'] = actual_parent
413
            else:
414
                context['mistral'] = current
415
416
        return context
417