Completed
Push — master ( 6b8670...b84371 )
by W
32:52 queued 32:52
created

st2actions.runners.mistral.MistralRunner.cancel()   A

Complexity

Conditions 2

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 14
rs 9.4285
cc 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import retrying
20
import six
21
import yaml
22
from mistralclient.api import client as mistral
23
from oslo_config import cfg
24
25
from st2actions.runners import AsyncActionRunner
26
from st2common.constants.action import LIVEACTION_STATUS_RUNNING
27
from st2common import log as logging
28
from st2common.models.api.notification import NotificationsHelper
29
from st2common.util.workflow import mistral as utils
30
from st2common.util.url import get_url_without_trailing_slash
31
from st2common.util.api import get_full_public_api_url
32
from st2common.util.api import get_mistral_api_url
33
34
35
# Module-level logger, named after this module, shared by the runner below.
LOG = logging.getLogger(__name__)
36
37
38
def get_runner():
    """
    Create a new Mistral runner identified by a freshly generated UUID4 string.

    :return: A ``MistralRunner`` instance.
    """
    runner_id = str(uuid.uuid4())
    return MistralRunner(runner_id)
40
41
42
class MistralRunner(AsyncActionRunner):
    """
    Action runner (built on ``AsyncActionRunner``) that executes an action as
    a Mistral workflow.

    The runner saves the workbook/workflow definition found at the action
    entry point in Mistral, starts a workflow execution (or reruns failed
    tasks of a previous one) and returns the running status together with a
    context that identifies the Mistral execution.
    """

    # Base URL of the Mistral v2 API. Read from the configuration once, when
    # the class body is evaluated.
    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)

    def __init__(self, runner_id):
        """
        Set up the runner state and the Mistral client.

        :param runner_id: Identifier handed to the base runner.
        """
        super(MistralRunner, self).__init__(runner_id=runner_id)
        self._on_behalf_user = cfg.CONF.system_user.user
        self._notify = None
        self._skip_notify_tasks = []
        self._client = mistral.client(
            mistral_url=self.url,
            username=cfg.CONF.mistral.keystone_username,
            api_key=cfg.CONF.mistral.keystone_password,
            project_name=cfg.CONF.mistral.keystone_project_name,
            auth_url=cfg.CONF.mistral.keystone_auth_url)

    def pre_run(self):
        """
        Capture the notification settings used later to build the workflow
        execution options.
        """
        if getattr(self, 'liveaction', None):
            self._notify = getattr(self.liveaction, 'notify', None)
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])

    @staticmethod
    def _check_name(action_ref, is_workbook, def_dict):
        """
        Verify that the definition is named after the action.

        :param action_ref: Fully qualified action name ("<pack>.<name>").
        :param is_workbook: True if ``def_dict`` is a workbook definition.
        :param def_dict: Parsed workbook or workflow definition.
        :raises Exception: If the workbook/workflow name differs from
                           ``action_ref``.
        """
        if is_workbook:
            # A workbook carries its name as the value of the "name" key.
            if def_dict.get('name') != action_ref:
                raise Exception('Name of the workbook must be the same as the '
                                'fully qualified action name "%s".' % action_ref)
        else:
            # A workflow definition is keyed by the workflow name; the
            # "version" key is not a workflow and is skipped.
            workflow_name = [k for k in six.iterkeys(def_dict) if k != 'version'][0]
            if workflow_name != action_ref:
                raise Exception('Name of the workflow must be the same as the '
                                'fully qualified action name "%s".' % action_ref)

    def _save_workbook(self, name, def_yaml):
        """
        Create the workbook in Mistral, or update it if its definition changed.

        :param name: Name of the workbook.
        :param def_yaml: Workbook definition as a YAML string.
        """
        try:
            wb = self._client.workbooks.get(name)
        except Exception:
            # If the workbook is not found, the mistral client throws a
            # generic API exception. Only Exception is caught so interpreter
            # level signals (KeyboardInterrupt, SystemExit) still propagate.

            # Delete if definition was previously a workflow. If not found,
            # an API exception is thrown; this is a best-effort cleanup.
            try:
                self._client.workflows.delete(name)
            except Exception:
                LOG.debug('Unable to delete workflow "%s" prior to creating '
                          'the workbook; ignoring.', name)

            # Create the new workbook.
            wb = self._client.workbooks.create(def_yaml)

        # Update the workbook definition.
        # pylint: disable=no-member
        if wb.definition != def_yaml:
            self._client.workbooks.update(def_yaml)

    def _save_workflow(self, name, def_yaml):
        """
        Create the workflow in Mistral, or update it if its definition changed.

        :param name: Name of the workflow.
        :param def_yaml: Workflow definition as a YAML string.
        """
        try:
            wf = self._client.workflows.get(name)
        except Exception:
            # If the workflow is not found, the mistral client throws a
            # generic API exception. Only Exception is caught so interpreter
            # level signals (KeyboardInterrupt, SystemExit) still propagate.

            # Delete if definition was previously a workbook. If not found,
            # an API exception is thrown; this is a best-effort cleanup.
            try:
                self._client.workbooks.delete(name)
            except Exception:
                LOG.debug('Unable to delete workbook "%s" prior to creating '
                          'the workflow; ignoring.', name)

            # Create the new workflow. The first item of the result is used.
            wf = self._client.workflows.create(def_yaml)[0]

        # Update the workflow definition.
        # pylint: disable=no-member
        if wf.definition != def_yaml:
            self._client.workflows.update(def_yaml)

    def _find_default_workflow(self, def_dict):
        """
        Determine which workflow of a workbook should be executed.

        With a single workflow that workflow is used. With several, the
        "workflow" runner parameter must name one of them.

        :param def_dict: Parsed workbook definition.
        :return: Fully qualified workflow name.
        :raises ValueError: If the workbook has multiple workflows and the
                            "workflow" runner parameter is missing or does not
                            match a workflow in the workbook.
        :raises Exception: If the workbook contains no workflows.
        """
        workflows = def_dict['workflows']
        num_workflows = len(workflows)

        if num_workflows > 1:
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
            if not fully_qualified_wf_name:
                raise ValueError('Workbook definition is detected. '
                                 'Default workflow cannot be determined.')

            # The workflow name is the part after the last dot.
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
            if wf_name not in workflows:
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
                                 % fully_qualified_wf_name)

            return fully_qualified_wf_name
        elif num_workflows == 1:
            # dict.keys() cannot be indexed on Python 3, so take the only key
            # through an iterator instead.
            return '%s.%s' % (def_dict['name'], next(iter(workflows)))
        else:
            raise Exception('There are no workflows in the workbook.')

    def _construct_workflow_execution_options(self):
        """
        Build the extra options passed to Mistral for a workflow execution.

        :return: Dict with a single "env" key holding the st2 execution and
                 liveaction ids, the public API URL and the st2 context
                 (callback endpoint, parent context, notification settings
                 and, if available, the auth token).
        """
        # This URL is used by Mistral to talk back to the API
        endpoint = get_mistral_api_url() + '/actionexecutions'

        # This URL is available in the context and can be used by the users inside a workflow,
        # similar to "ST2_ACTION_API_URL" environment variable available to actions
        public_api_url = get_full_public_api_url()

        # Build context with additional information
        parent_context = {
            'execution_id': self.execution_id
        }
        if getattr(self.liveaction, 'context', None):
            parent_context.update(self.liveaction.context)

        st2_execution_context = {
            'endpoint': endpoint,
            'parent': parent_context,
            'notify': {},
            'skip_notify_tasks': self._skip_notify_tasks
        }

        # Include notification information
        if self._notify:
            notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
            st2_execution_context['notify'] = notify_dict

        if self.auth_token:
            st2_execution_context['auth_token'] = self.auth_token.token

        return {
            'env': {
                'st2_execution_id': self.execution_id,
                'st2_liveaction_id': self.liveaction_id,
                'st2_action_api_url': public_api_url,
                '__actions': {
                    'st2.action': {
                        'st2_context': st2_execution_context
                    }
                }
            }
        }

    def _get_resume_options(self):
        """
        Return the "re-run" section of the runner context (empty dict if absent).
        """
        return self.context.get('re-run', {})

    def _build_running_result(self, execution):
        """
        Build the (status, partial result, context) tuple returned after a
        workflow execution has been started or resumed.

        :param execution: Mistral execution object; its ``id`` and
                          ``workflow_name`` are recorded in the context.
        :return: Tuple of running status, empty partial results and the
                 context carrying the Mistral execution details.
        """
        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self._build_mistral_context(self.context, current_context)
        LOG.info('Mistral query context is %s' % exec_context)

        return (LIVEACTION_STATUS_RUNNING, {'tasks': []}, exec_context)

    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def run(self, action_parameters):
        """
        Start a new workflow execution, or resume a previous one when a rerun
        reference and a list of tasks to rerun are both available.

        :param action_parameters: Parameters passed as workflow input.
        :return: Tuple of (status, partial results, execution context).
        """
        tasks = self._get_resume_options().get('tasks', [])

        if self.rerun_ex_ref and tasks:
            return self.resume(ex_ref=self.rerun_ex_ref, task_names=tasks)

        return self.start(action_parameters=action_parameters)

    def start(self, action_parameters):
        """
        Save the workbook/workflow definition and start a workflow execution.

        :param action_parameters: Parameters merged into the workflow input.
        :return: Tuple of (status, partial results, execution context).
        :raises Exception: If a non-workbook definition does not contain
                           exactly one workflow, or the definition name does
                           not match the action reference.
        """
        # Test connection
        self._client.workflows.list()

        # Setup inputs for the workflow execution. Work on a copy so the
        # runner parameters are not mutated (run() may be retried).
        inputs = dict(self.runner_parameters.get('context') or {})
        inputs.update(action_parameters)

        # Get workbook/workflow definition from file.
        with open(self.entry_point, 'r') as def_file:
            def_yaml = def_file.read()

        def_dict = yaml.safe_load(def_yaml)
        is_workbook = ('workflows' in def_dict)

        if not is_workbook:
            # Non-workbook definition containing multiple workflows is not supported.
            if len([k for k in six.iterkeys(def_dict) if k != 'version']) != 1:
                raise Exception('Workflow (not workbook) definition is detected. '
                                'Multiple workflows is not supported.')

        action_ref = '%s.%s' % (self.action.pack, self.action.name)
        self._check_name(action_ref, is_workbook, def_dict)
        def_dict_xformed = utils.transform_definition(def_dict)
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()

        # Save workbook/workflow definition and pick the workflow to execute.
        if is_workbook:
            self._save_workbook(action_ref, def_yaml_xformed)
            workflow_to_run = self._find_default_workflow(def_dict_xformed)
        else:
            self._save_workflow(action_ref, def_yaml_xformed)
            workflow_to_run = action_ref

        execution = self._client.executions.create(workflow_to_run,
                                                   workflow_input=inputs,
                                                   **options)

        return self._build_running_result(execution)

    def _get_tasks(self, wf_ex_id, full_task_name, task_name, executions):
        """
        Find the task executions in ERROR state that match a task name.

        A dotted ``task_name`` is treated as "<parent task>.<rest>": the
        workflow executions spawned by the parent task are looked up in
        ``executions`` and searched recursively for "<rest>".

        :param wf_ex_id: Id of the workflow execution to search.
        :param full_task_name: Original (possibly dotted) task name; used as
                               the key of the returned dict.
        :param task_name: Remaining part of the task name to resolve.
        :param executions: Workflow executions used to resolve sub-workflows.
        :return: Dict mapping ``full_task_name`` to the task as a dict. Since
                 the key is always ``full_task_name``, at most one entry is
                 returned; empty if no matching task is in ERROR state.
        """
        task_exs = self._client.tasks.list(workflow_execution_id=wf_ex_id)

        if '.' in task_name:
            dot_pos = task_name.index('.')
            parent_task_name = task_name[:dot_pos]
            task_name = task_name[dot_pos + 1:]

            parent_task_ids = [task.id for task in task_exs if task.name == parent_task_name]

            # Executions that were spawned by one of the parent tasks.
            workflow_ex_ids = [wf_ex.id for wf_ex in executions
                               if (getattr(wf_ex, 'task_execution_id', None) and
                                   wf_ex.task_execution_id in parent_task_ids)]

            tasks = {}

            for sub_wf_ex_id in workflow_ex_ids:
                tasks.update(self._get_tasks(sub_wf_ex_id, full_task_name, task_name, executions))

            return tasks

        # pylint: disable=no-member
        tasks = {
            full_task_name: task.to_dict()
            for task in task_exs
            if task.name == task_name and task.state == 'ERROR'
        }

        return tasks

    def resume(self, ex_ref, task_names):
        """
        Rerun the given failed tasks of a previous Mistral workflow execution.

        :param ex_ref: Reference to the execution being rerun; its context is
                       expected to hold the Mistral execution id under
                       context['mistral']['execution_id'].
        :param task_names: Names of the tasks to rerun (dotted names address
                           tasks in sub-workflows).
        :return: Tuple of (status, partial results, execution context).
        :raises Exception: If the Mistral execution id is missing, the
                           execution is not in ERROR state, or any of the
                           tasks cannot be found in ERROR state.
        """
        mistral_ctx = ex_ref.context.get('mistral', dict())

        if not mistral_ctx.get('execution_id'):
            raise Exception('Unable to rerun because mistral execution_id is missing.')

        execution = self._client.executions.get(mistral_ctx.get('execution_id'))

        # pylint: disable=no-member
        if execution.state != 'ERROR':
            raise Exception('Workflow execution is not in a rerunable state.')

        executions = self._client.executions.list()

        tasks = {}

        for task_name in task_names:
            tasks.update(self._get_tasks(execution.id, task_name, task_name, executions))

        # Sorted so that the error message is deterministic.
        missing_tasks = sorted(set(task_names) - set(tasks))
        if missing_tasks:
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
                            'rerunable tasks: %s. Please make sure that the task name is correct '
                            'and the task is in rerunable state.' % ', '.join(missing_tasks))

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()

        for task in tasks.values():
            # pylint: disable=unexpected-keyword-arg
            self._client.tasks.rerun(task['id'], env=options.get('env', None))

        return self._build_running_result(execution)

    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def cancel(self):
        """
        Cancel the workflow execution recorded in the runner context.

        :raises Exception: If the Mistral execution id is missing from the
                           context.
        """
        mistral_ctx = self.context.get('mistral', dict())

        if not mistral_ctx.get('execution_id'):
            raise Exception('Unable to cancel because mistral execution_id is missing.')

        # There is no cancellation state in Mistral. Pause the workflow so
        # actions that are still executing can gracefully reach completion.
        self._client.executions.update(mistral_ctx.get('execution_id'), 'PAUSED')

    @staticmethod
    def _build_mistral_context(parent, current):
        """
        Mistral workflow might be kicked off in st2 by a parent Mistral
        workflow. In that case, we need to make sure that the existing
        mistral 'context' is moved as 'parent' and the child workflow
        'context' is added.

        :param parent: Existing context (may be empty or None). It is deep
                       copied, so the caller's object is left untouched.
        :param current: Context of the current workflow execution.
        :return: Dict with a single 'mistral' key.
        """
        parent = copy.deepcopy(parent)

        # No parent Mistral workflow: the current context is used as is.
        if not parent or 'mistral' not in parent:
            return {'mistral': current}

        mistral_context = parent['mistral']

        # Move the identifying keys of the parent workflow under 'parent'.
        actual_parent = dict()
        for key in ('workflow_name', 'workflow_execution_id'):
            if key in mistral_context:
                actual_parent[key] = mistral_context.pop(key)

        mistral_context.update(current)
        mistral_context['parent'] = actual_parent

        return {'mistral': mistral_context}
385