Test Failed
Pull Request — master (#3163)
by W
05:02
created

MistralRunner._get_tasks()   F

Complexity

Conditions 11

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 11
dl 0
loc 29
rs 3.1764
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like MistralRunner._get_tasks() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import retrying
20
import six
21
import yaml
22
from mistralclient.api import client as mistral
0 ignored issues
show
Unused Code introduced by
Unused client imported from mistralclient.api as mistral
Loading history...
23
from oslo_config import cfg
24
25
from st2common.runners.base import AsyncActionRunner
26
from st2common.constants.action import LIVEACTION_STATUS_RUNNING
27
from st2common import log as logging
28
from st2common.models.api.notification import NotificationsHelper
29
from st2common.util import api as api_utils 
0 ignored issues
show
Coding Style introduced by
Trailing whitespace
Loading history...
30
from st2common.util import url as url_utils
31
from st2common.util.workflow import mistral as utils
32
33
34
LOG = logging.getLogger(__name__)
35
36
37
def get_runner():
    """Create a MistralRunner identified by a freshly generated UUID4 string."""
    runner_id = str(uuid.uuid4())
    return MistralRunner(runner_id)
39
40
41
class MistralRunner(AsyncActionRunner):
42
43
    def __init__(self, runner_id):
        """
        Set up the runner state.

        :param runner_id: Identifier forwarded unchanged to the parent runner class.
        """
        super(MistralRunner, self).__init__(runner_id=runner_id)

        # Values taken from the oslo configuration.
        self._on_behalf_user = cfg.CONF.system_user.user
        configured_url = cfg.CONF.mistral.v2_base_url
        self._base_url = url_utils.get_url_without_trailing_slash(configured_url)

        # Filled in later: notification settings by pre_run(), the client on first use.
        self._notify = None
        self._skip_notify_tasks = []
        self._client = None
50
51
    @staticmethod
52
    def get_workflow_definition(entry_point):
53
        with open(entry_point, 'r') as def_file:
54
            return def_file.read()
55
56
    def pre_run(self):
        """
        Run the parent pre-run hook, then pick up the notification settings.

        ``_notify`` is read from the liveaction when one is attached, and
        ``_skip_notify_tasks`` from the ``skip_notify`` runner parameter.
        """
        super(MistralRunner, self).pre_run()

        liveaction = getattr(self, 'liveaction', None)
        if liveaction:
            self._notify = getattr(liveaction, 'notify', None)

        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
62
63
    def _get_client(self, ctx=None):
        """
        Build a Mistral client for the configured base URL.

        :param ctx: Optional mapping. A non-empty ``auth_token`` entry in it takes
                    precedence over the token found on ``self.auth_token``.
        :return: The result of ``utils.get_client`` for the base URL and token.
        """
        token = ctx.get('auth_token') if ctx else None

        if not token:
            # Fall back to the runner's own token; None when there is none.
            token = getattr(self.auth_token, 'token', None)

        return utils.get_client(self._base_url, auth_token=token)
71
72
    @staticmethod
73
    def _check_name(action_ref, is_workbook, def_dict):
74
        # If workbook, change the value of the "name" key.
75
        if is_workbook:
76
            if def_dict.get('name') != action_ref:
77
                raise Exception('Name of the workbook must be the same as the '
78
                                'fully qualified action name "%s".' % action_ref)
79
        # If workflow, change the key name of the workflow.
80
        else:
81
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
82
            if workflow_name != action_ref:
83
                raise Exception('Name of the workflow must be the same as the '
84
                                'fully qualified action name "%s".' % action_ref)
85
86
    def _save_workbook(self, name, def_yaml):
87
        # If the workbook is not found, the mistral client throws a generic API exception.
88
        try:
89
            # Update existing workbook.
90
            wb = self._client.workbooks.get(name)
91
        except:
92
            # Delete if definition was previously a workflow.
93
            # If not found, an API exception is thrown.
94
            try:
95
                self._client.workflows.delete(name)
96
            except:
97
                pass
98
99
            # Create the new workbook.
100
            wb = self._client.workbooks.create(def_yaml)
101
102
        # Update the workbook definition.
103
        # pylint: disable=no-member
104
        if wb.definition != def_yaml:
105
            self._client.workbooks.update(def_yaml)
106
107
    def _save_workflow(self, name, def_yaml):
108
        # If the workflow is not found, the mistral client throws a generic API exception.
109
        try:
110
            # Update existing workbook.
111
            wf = self._client.workflows.get(name)
112
        except:
113
            # Delete if definition was previously a workbook.
114
            # If not found, an API exception is thrown.
115
            try:
116
                self._client.workbooks.delete(name)
117
            except:
118
                pass
119
120
            # Create the new workflow.
121
            wf = self._client.workflows.create(def_yaml)[0]
122
123
        # Update the workflow definition.
124
        # pylint: disable=no-member
125
        if wf.definition != def_yaml:
126
            self._client.workflows.update(def_yaml)
127
128
    def _find_default_workflow(self, def_dict):
129
        num_workflows = len(def_dict['workflows'].keys())
130
131
        if num_workflows > 1:
132
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
133
            if not fully_qualified_wf_name:
134
                raise ValueError('Workbook definition is detected. '
135
                                 'Default workflow cannot be determined.')
136
137
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
138
            if wf_name not in def_dict['workflows']:
139
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
140
                                 % fully_qualified_wf_name)
141
142
            return fully_qualified_wf_name
143
        elif num_workflows == 1:
144
            return '%s.%s' % (def_dict['name'], def_dict['workflows'].keys()[0])
145
        else:
146
            raise Exception('There are no workflows in the workbook.')
147
148
    def _construct_workflow_execution_options(self):
        """
        Assemble the extra options sent along with a workflow execution.

        :return: Dict with a single ``env`` entry holding the st2 execution id,
                 the liveaction id, the public API URL and, under
                 ``__actions`` / ``st2.action``, the ``st2_context``.
        """
        # URL used by Mistral to talk back to the API, and the endpoint below it.
        mistral_api_url = api_utils.get_mistral_api_url()
        callback_endpoint = mistral_api_url + '/actionexecutions'

        # URL made available to users inside a workflow, similar to the
        # "ST2_ACTION_API_URL" environment variable available to actions.
        public_api_url = api_utils.get_full_public_api_url()

        # The parent context always carries this execution's id; whatever the
        # liveaction context holds is layered on top of it.
        parent_context = dict(execution_id=self.execution_id)
        liveaction_context = getattr(self.liveaction, 'context', None)
        if liveaction_context:
            parent_context.update(liveaction_context)

        # Notification settings are only included when some were configured.
        if self._notify:
            notify_settings = NotificationsHelper.from_model(notify_model=self._notify)
        else:
            notify_settings = {}

        st2_context = {
            'api_url': mistral_api_url,
            'endpoint': callback_endpoint,
            'parent': parent_context,
            'notify': notify_settings,
            'skip_notify_tasks': self._skip_notify_tasks
        }

        if self.auth_token:
            st2_context['auth_token'] = self.auth_token.token

        env = {
            'st2_execution_id': self.execution_id,
            'st2_liveaction_id': self.liveaction_id,
            'st2_action_api_url': public_api_url,
            '__actions': {'st2.action': {'st2_context': st2_context}}
        }

        return {'env': env}
194
195
    def _get_resume_options(self):
196
        return self.context.get('re-run', {})
197
198
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def run(self, action_parameters):
        """
        Start a new workflow execution or re-run tasks of a previous one.

        A re-run happens only when ``self.rerun_ex_ref`` is set and the
        ``re-run`` options of the context list at least one task.

        :param action_parameters: Parameters passed on to ``start``.
        :return: The value returned by ``resume`` or ``start``.
        """
        rerun_options = self._get_resume_options()
        names_to_reset = rerun_options.get('reset', [])

        # One spec per task to re-run, flagging whether the task is reset.
        task_specs = {}
        for name in rerun_options.get('tasks', []):
            task_specs[name] = {'reset': name in names_to_reset}

        if self.rerun_ex_ref and task_specs:
            return self.resume(ex_ref=self.rerun_ex_ref, task_specs=task_specs)

        return self.start(action_parameters=action_parameters)
221
222
    def start(self, action_parameters):
        """
        Register the workbook/workflow definition in Mistral and execute it.

        :param action_parameters: Action parameters merged over the ``context``
                                  runner parameter to form the workflow input.
        :return: Tuple of (status, partial results, execution context).
        :raises Exception: If a non-workbook definition does not contain exactly
                           one workflow, or the definition name does not match
                           the action reference.
        """
        if not self._client:
            self._client = self._get_client()

        # Test connection
        self._client.workflows.list()

        # Setup inputs for the workflow execution. The runner parameter is
        # copied so that merging the action parameters does not write into
        # self.runner_parameters (which would also persist across retries).
        inputs = dict(self.runner_parameters.get('context', dict()))
        inputs.update(action_parameters)

        # Get workbook/workflow definition from file.
        def_yaml = self.get_workflow_definition(self.entry_point)
        def_dict = yaml.safe_load(def_yaml)
        is_workbook = ('workflows' in def_dict)

        if not is_workbook:
            # Non-workbook definition containing multiple workflows is not supported.
            if sum(1 for key in def_dict if key != 'version') != 1:
                raise Exception('Workflow (not workbook) definition is detected. '
                                'Multiple workflows is not supported.')

        action_ref = '%s.%s' % (self.action.pack, self.action.name)
        self._check_name(action_ref, is_workbook, def_dict)
        def_dict_xformed = utils.transform_definition(def_dict)
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()

        # Save workbook/workflow definition and pick the workflow to execute.
        if is_workbook:
            self._save_workbook(action_ref, def_yaml_xformed)
            workflow_to_run = self._find_default_workflow(def_dict_xformed)
        else:
            self._save_workflow(action_ref, def_yaml_xformed)
            workflow_to_run = action_ref

        execution = self._client.executions.create(workflow_to_run,
                                                   workflow_input=inputs,
                                                   **options)

        status = LIVEACTION_STATUS_RUNNING
        partial_results = {'tasks': []}

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self.context
        exec_context = self._build_mistral_context(exec_context, current_context)
        # NOTE(review): _build_mistral_context adds 'auth_token' to this
        # context, so the token ends up in the log line below - confirm that
        # this is acceptable.
        LOG.info('Mistral query context is %s' % exec_context)

        return (status, partial_results, exec_context)
279
280
    def _get_tasks(self, wf_ex_id, full_task_name, task_name, executions):
281
        task_exs = self._client.tasks.list(workflow_execution_id=wf_ex_id)
282
283
        if '.' in task_name:
284
            dot_pos = task_name.index('.')
285
            parent_task_name = task_name[:dot_pos]
286
            task_name = task_name[dot_pos + 1:]
287
288
            parent_task_ids = [task.id for task in task_exs if task.name == parent_task_name]
289
290
            workflow_ex_ids = [wf_ex.id for wf_ex in executions
291
                               if (getattr(wf_ex, 'task_execution_id', None) and
292
                                   wf_ex.task_execution_id in parent_task_ids)]
293
294
            tasks = {}
295
296
            for sub_wf_ex_id in workflow_ex_ids:
297
                tasks.update(self._get_tasks(sub_wf_ex_id, full_task_name, task_name, executions))
298
299
            return tasks
300
301
        # pylint: disable=no-member
302
        tasks = {
303
            full_task_name: task.to_dict()
304
            for task in task_exs
305
            if task.name == task_name and task.state == 'ERROR'
306
        }
307
308
        return tasks
309
310
    def resume(self, ex_ref, task_specs):
        """
        Re-run failed tasks of an earlier Mistral workflow execution.

        :param ex_ref: Earlier execution; its ``context`` must hold the mistral
                       ``execution_id``.
        :param task_specs: Dict mapping each task name to re-run to a spec dict
                           whose ``reset`` entry is passed on to the re-run call.
        :return: Tuple of (status, partial results, execution context).
        :raises Exception: If the execution id is missing, the workflow execution
                           is not in ERROR state, or a requested task is not
                           found in ERROR state.
        """
        mistral_ctx = ex_ref.context.get('mistral', dict())
        previous_execution_id = mistral_ctx.get('execution_id')

        if not previous_execution_id:
            raise Exception('Unable to rerun because mistral execution_id is missing.')

        if not self._client:
            self._client = self._get_client(mistral_ctx)

        execution = self._client.executions.get(previous_execution_id)

        # pylint: disable=no-member
        if execution.state != 'ERROR':
            raise Exception('Workflow execution is not in a rerunable state.')

        executions = self._client.executions.list()

        # Resolve every requested task name to its failed task execution.
        tasks = {}
        for requested_name in task_specs:
            tasks.update(self._get_tasks(execution.id, requested_name, requested_name, executions))

        missing_tasks = list(set(task_specs.keys()) - set(tasks.keys()))
        if missing_tasks:
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
                            'rerunable tasks: %s. Please make sure that the task name is correct '
                            'and the task is in rerunable state.' % ', '.join(missing_tasks))

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()

        for found_name, task_obj in six.iteritems(tasks):
            reset_flag = task_specs[found_name].get('reset', False)
            # pylint: disable=unexpected-keyword-arg
            self._client.tasks.rerun(task_obj['id'], reset=reset_flag, env=options.get('env', None))

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self._build_mistral_context(self.context, current_context)
        LOG.info('Mistral query context is %s' % exec_context)

        return (LIVEACTION_STATUS_RUNNING, {'tasks': []}, exec_context)
363
364
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def cancel(self):
        """
        Request cancellation of the Mistral execution recorded in the context.

        :raises Exception: If the context holds no mistral ``execution_id``.
        """
        mistral_ctx = self.context.get('mistral', dict())
        execution_id = mistral_ctx.get('execution_id')

        if not execution_id:
            raise Exception('Unable to cancel because mistral execution_id is missing.')

        if not self._client:
            self._client = self._get_client(mistral_ctx)

        # Update the execution in Mistral with the value 'CANCELLED'.
        self._client.executions.update(execution_id, 'CANCELLED')
381
382
    def _build_mistral_context(self, parent, current):
383
        """
384
        Mistral workflow might be kicked off in st2 by a parent Mistral
385
        workflow. In that case, we need to make sure that the existing
386
        mistral 'context' is moved as 'parent' and the child workflow
387
        'context' is added.
388
        """
389
        parent = copy.deepcopy(parent)
390
        context = dict()
391
392
        if not parent:
393
            context['mistral'] = current
394
        else:
395
            if 'mistral' in parent.keys():
396
                orig_parent_context = parent.get('mistral', dict())
397
                actual_parent = dict()
398
                if 'workflow_name' in orig_parent_context.keys():
399
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
400
                    del orig_parent_context['workflow_name']
401
                if 'workflow_execution_id' in orig_parent_context.keys():
402
                    actual_parent['workflow_execution_id'] = \
403
                        orig_parent_context['workflow_execution_id']
404
                    del orig_parent_context['workflow_execution_id']
405
                context['mistral'] = orig_parent_context
406
                context['mistral'].update(current)
407
                context['mistral']['parent'] = actual_parent
408
            else:
409
                context['mistral'] = current
410
411
        context['mistral']['auth_token'] = getattr(self.auth_token, 'token', None)
412
413
        return context
414