Passed
Pull Request — master (#3163)
by W
05:12
created

MistralRunner   F

Complexity

Total Complexity 67

Size/Duplication

Total Lines 373
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 373
rs 3.0612
c 0
b 0
f 0
wmc 67

16 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 7 1
C resume() 0 53 7
A _save_workflow() 0 20 4
A _get_resume_options() 0 2 1
A cancel() 0 17 3
F _get_tasks() 0 29 11
A pre_run() 0 6 2
A run() 0 23 3
B _check_name() 0 13 6
B _construct_workflow_execution_options() 0 46 4
A get_workflow_definition() 0 4 2
B _build_mistral_context() 0 32 5
C start() 0 57 7
A _save_workbook() 0 20 4
A _get_client() 0 8 2
B _find_default_workflow() 0 19 5

How to fix   Complexity   

Complex Class

Complex classes like MistralRunner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import retrying
20
import six
21
import yaml
22
from oslo_config import cfg
23
24
from st2common.runners.base import AsyncActionRunner
25
from st2common.constants.action import LIVEACTION_STATUS_RUNNING
26
from st2common import log as logging
27
from st2common.models.api.notification import NotificationsHelper
28
from st2common.util import api as api_utils
29
from st2common.util import url as url_utils
30
from st2common.util.workflow import mistral as utils
31
32
33
LOG = logging.getLogger(__name__)
34
35
36
def get_runner():
37
    return MistralRunner(str(uuid.uuid4()))
38
39
40
class MistralRunner(AsyncActionRunner):
41
42
    def __init__(self, runner_id):
43
        super(MistralRunner, self).__init__(runner_id=runner_id)
44
        self._on_behalf_user = cfg.CONF.system_user.user
45
        self._notify = None
46
        self._skip_notify_tasks = []
47
        self._base_url = url_utils.get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
48
        self._client = None
49
50
    @staticmethod
51
    def get_workflow_definition(entry_point):
52
        with open(entry_point, 'r') as def_file:
53
            return def_file.read()
54
55
    def pre_run(self):
56
        super(MistralRunner, self).pre_run()
57
58
        if getattr(self, 'liveaction', None):
59
            self._notify = getattr(self.liveaction, 'notify', None)
60
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
61
62
    def _get_client(self, ctx=None):
63
        auth_token = (
64
            ctx['auth_token']
65
            if ctx and 'auth_token' in ctx and ctx.get('auth_token')
66
            else getattr(self.auth_token, 'token', None)
67
        )
68
69
        return utils.get_client(self._base_url, auth_token=auth_token)
70
71
    @staticmethod
72
    def _check_name(action_ref, is_workbook, def_dict):
73
        # If workbook, change the value of the "name" key.
74
        if is_workbook:
75
            if def_dict.get('name') != action_ref:
76
                raise Exception('Name of the workbook must be the same as the '
77
                                'fully qualified action name "%s".' % action_ref)
78
        # If workflow, change the key name of the workflow.
79
        else:
80
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
81
            if workflow_name != action_ref:
82
                raise Exception('Name of the workflow must be the same as the '
83
                                'fully qualified action name "%s".' % action_ref)
84
85
    def _save_workbook(self, name, def_yaml):
86
        # If the workbook is not found, the mistral client throws a generic API exception.
87
        try:
88
            # Update existing workbook.
89
            wb = self._client.workbooks.get(name)
90
        except:
91
            # Delete if definition was previously a workflow.
92
            # If not found, an API exception is thrown.
93
            try:
94
                self._client.workflows.delete(name)
95
            except:
96
                pass
97
98
            # Create the new workbook.
99
            wb = self._client.workbooks.create(def_yaml)
100
101
        # Update the workbook definition.
102
        # pylint: disable=no-member
103
        if wb.definition != def_yaml:
104
            self._client.workbooks.update(def_yaml)
105
106
    def _save_workflow(self, name, def_yaml):
107
        # If the workflow is not found, the mistral client throws a generic API exception.
108
        try:
109
            # Update existing workbook.
110
            wf = self._client.workflows.get(name)
111
        except:
112
            # Delete if definition was previously a workbook.
113
            # If not found, an API exception is thrown.
114
            try:
115
                self._client.workbooks.delete(name)
116
            except:
117
                pass
118
119
            # Create the new workflow.
120
            wf = self._client.workflows.create(def_yaml)[0]
121
122
        # Update the workflow definition.
123
        # pylint: disable=no-member
124
        if wf.definition != def_yaml:
125
            self._client.workflows.update(def_yaml)
126
127
    def _find_default_workflow(self, def_dict):
128
        num_workflows = len(def_dict['workflows'].keys())
129
130
        if num_workflows > 1:
131
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
132
            if not fully_qualified_wf_name:
133
                raise ValueError('Workbook definition is detected. '
134
                                 'Default workflow cannot be determined.')
135
136
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
137
            if wf_name not in def_dict['workflows']:
138
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
139
                                 % fully_qualified_wf_name)
140
141
            return fully_qualified_wf_name
142
        elif num_workflows == 1:
143
            return '%s.%s' % (def_dict['name'], def_dict['workflows'].keys()[0])
144
        else:
145
            raise Exception('There are no workflows in the workbook.')
146
147
    def _construct_workflow_execution_options(self):
148
        # This URL is used by Mistral to talk back to the API
149
        api_url = api_utils.get_mistral_api_url()
150
        endpoint = api_url + '/actionexecutions'
151
152
        # This URL is available in the context and can be used by the users inside a workflow,
153
        # similar to "ST2_ACTION_API_URL" environment variable available to actions
154
        public_api_url = api_utils.get_full_public_api_url()
155
156
        # Build context with additional information
157
        parent_context = {
158
            'execution_id': self.execution_id
159
        }
160
        if getattr(self.liveaction, 'context', None):
161
            parent_context.update(self.liveaction.context)
162
163
        st2_execution_context = {
164
            'api_url': api_url,
165
            'endpoint': endpoint,
166
            'parent': parent_context,
167
            'notify': {},
168
            'skip_notify_tasks': self._skip_notify_tasks
169
        }
170
171
        # Include notification information
172
        if self._notify:
173
            notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
174
            st2_execution_context['notify'] = notify_dict
175
176
        if self.auth_token:
177
            st2_execution_context['auth_token'] = self.auth_token.token
178
179
        options = {
180
            'env': {
181
                'st2_execution_id': self.execution_id,
182
                'st2_liveaction_id': self.liveaction_id,
183
                'st2_action_api_url': public_api_url,
184
                '__actions': {
185
                    'st2.action': {
186
                        'st2_context': st2_execution_context
187
                    }
188
                }
189
            }
190
        }
191
192
        return options
193
194
    def _get_resume_options(self):
195
        return self.context.get('re-run', {})
196
197
    @retrying.retry(
198
        retry_on_exception=utils.retry_on_exceptions,
199
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
200
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
201
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
202
    def run(self, action_parameters):
203
        resume_options = self._get_resume_options()
204
205
        tasks_to_reset = resume_options.get('reset', [])
206
207
        task_specs = {
208
            task_name: {'reset': task_name in tasks_to_reset}
209
            for task_name in resume_options.get('tasks', [])
210
        }
211
212
        resume = self.rerun_ex_ref and task_specs
213
214
        if resume:
215
            result = self.resume(ex_ref=self.rerun_ex_ref, task_specs=task_specs)
216
        else:
217
            result = self.start(action_parameters=action_parameters)
218
219
        return result
220
221
    def start(self, action_parameters):
222
        if not self._client:
223
            self._client = self._get_client()
224
225
        # Test connection
226
        self._client.workflows.list()
227
228
        # Setup inputs for the workflow execution.
229
        inputs = self.runner_parameters.get('context', dict())
230
        inputs.update(action_parameters)
231
232
        # Get workbook/workflow definition from file.
233
        def_yaml = self.get_workflow_definition(self.entry_point)
234
        def_dict = yaml.safe_load(def_yaml)
235
        is_workbook = ('workflows' in def_dict)
236
237
        if not is_workbook:
238
            # Non-workbook definition containing multiple workflows is not supported.
239
            if len([k for k, _ in six.iteritems(def_dict) if k != 'version']) != 1:
240
                raise Exception('Workflow (not workbook) definition is detected. '
241
                                'Multiple workflows is not supported.')
242
243
        action_ref = '%s.%s' % (self.action.pack, self.action.name)
244
        self._check_name(action_ref, is_workbook, def_dict)
245
        def_dict_xformed = utils.transform_definition(def_dict)
246
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)
247
248
        # Construct additional options for the workflow execution
249
        options = self._construct_workflow_execution_options()
250
251
        # Save workbook/workflow definition.
252
        if is_workbook:
253
            self._save_workbook(action_ref, def_yaml_xformed)
254
            default_workflow = self._find_default_workflow(def_dict_xformed)
255
            execution = self._client.executions.create(default_workflow,
256
                                                       workflow_input=inputs,
257
                                                       **options)
258
        else:
259
            self._save_workflow(action_ref, def_yaml_xformed)
260
            execution = self._client.executions.create(action_ref,
261
                                                       workflow_input=inputs,
262
                                                       **options)
263
264
        status = LIVEACTION_STATUS_RUNNING
265
        partial_results = {'tasks': []}
266
267
        # pylint: disable=no-member
268
        current_context = {
269
            'execution_id': str(execution.id),
270
            'workflow_name': execution.workflow_name
271
        }
272
273
        exec_context = self.context
274
        exec_context = self._build_mistral_context(exec_context, current_context)
275
        LOG.info('Mistral query context is %s' % exec_context)
276
277
        return (status, partial_results, exec_context)
278
279
    def _get_tasks(self, wf_ex_id, full_task_name, task_name, executions):
280
        task_exs = self._client.tasks.list(workflow_execution_id=wf_ex_id)
281
282
        if '.' in task_name:
283
            dot_pos = task_name.index('.')
284
            parent_task_name = task_name[:dot_pos]
285
            task_name = task_name[dot_pos + 1:]
286
287
            parent_task_ids = [task.id for task in task_exs if task.name == parent_task_name]
288
289
            workflow_ex_ids = [wf_ex.id for wf_ex in executions
290
                               if (getattr(wf_ex, 'task_execution_id', None) and
291
                                   wf_ex.task_execution_id in parent_task_ids)]
292
293
            tasks = {}
294
295
            for sub_wf_ex_id in workflow_ex_ids:
296
                tasks.update(self._get_tasks(sub_wf_ex_id, full_task_name, task_name, executions))
297
298
            return tasks
299
300
        # pylint: disable=no-member
301
        tasks = {
302
            full_task_name: task.to_dict()
303
            for task in task_exs
304
            if task.name == task_name and task.state == 'ERROR'
305
        }
306
307
        return tasks
308
309
    def resume(self, ex_ref, task_specs):
310
        mistral_ctx = ex_ref.context.get('mistral', dict())
311
312
        if not mistral_ctx.get('execution_id'):
313
            raise Exception('Unable to rerun because mistral execution_id is missing.')
314
315
        if not self._client:
316
            self._client = self._get_client(mistral_ctx)
317
318
        execution = self._client.executions.get(mistral_ctx.get('execution_id'))
319
320
        # pylint: disable=no-member
321
        if execution.state not in ['ERROR']:
322
            raise Exception('Workflow execution is not in a rerunable state.')
323
324
        executions = self._client.executions.list()
325
326
        tasks = {}
327
328
        for task_name, task_spec in six.iteritems(task_specs):
329
            tasks.update(self._get_tasks(execution.id, task_name, task_name, executions))
330
331
        missing_tasks = list(set(task_specs.keys()) - set(tasks.keys()))
332
        if missing_tasks:
333
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
334
                            'rerunable tasks: %s. Please make sure that the task name is correct '
335
                            'and the task is in rerunable state.' % ', '.join(missing_tasks))
336
337
        # Construct additional options for the workflow execution
338
        options = self._construct_workflow_execution_options()
339
340
        for task_name, task_obj in six.iteritems(tasks):
341
            # pylint: disable=unexpected-keyword-arg
342
            self._client.tasks.rerun(
343
                task_obj['id'],
344
                reset=task_specs[task_name].get('reset', False),
345
                env=options.get('env', None)
346
            )
347
348
        status = LIVEACTION_STATUS_RUNNING
349
        partial_results = {'tasks': []}
350
351
        # pylint: disable=no-member
352
        current_context = {
353
            'execution_id': str(execution.id),
354
            'workflow_name': execution.workflow_name
355
        }
356
357
        exec_context = self.context
358
        exec_context = self._build_mistral_context(exec_context, current_context)
359
        LOG.info('Mistral query context is %s' % exec_context)
360
361
        return (status, partial_results, exec_context)
362
363
    @retrying.retry(
364
        retry_on_exception=utils.retry_on_exceptions,
365
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
366
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
367
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
368
    def cancel(self):
369
        mistral_ctx = self.context.get('mistral', dict())
370
371
        if not mistral_ctx.get('execution_id'):
372
            raise Exception('Unable to cancel because mistral execution_id is missing.')
373
374
        if not self._client:
375
            self._client = self._get_client(mistral_ctx)
376
377
        # There is no cancellation state in Mistral. Pause the workflow so
378
        # actions that are still executing can gracefully reach completion.
379
        self._client.executions.update(mistral_ctx.get('execution_id'), 'CANCELLED')
380
381
    def _build_mistral_context(self, parent, current):
382
        """
383
        Mistral workflow might be kicked off in st2 by a parent Mistral
384
        workflow. In that case, we need to make sure that the existing
385
        mistral 'context' is moved as 'parent' and the child workflow
386
        'context' is added.
387
        """
388
        parent = copy.deepcopy(parent)
389
        context = dict()
390
391
        if not parent:
392
            context['mistral'] = current
393
        else:
394
            if 'mistral' in parent.keys():
395
                orig_parent_context = parent.get('mistral', dict())
396
                actual_parent = dict()
397
                if 'workflow_name' in orig_parent_context.keys():
398
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
399
                    del orig_parent_context['workflow_name']
400
                if 'workflow_execution_id' in orig_parent_context.keys():
401
                    actual_parent['workflow_execution_id'] = \
402
                        orig_parent_context['workflow_execution_id']
403
                    del orig_parent_context['workflow_execution_id']
404
                context['mistral'] = orig_parent_context
405
                context['mistral'].update(current)
406
                context['mistral']['parent'] = actual_parent
407
            else:
408
                context['mistral'] = current
409
410
        context['mistral']['auth_token'] = getattr(self.auth_token, 'token', None)
411
412
        return context
413