Completed
Pull Request — master (#3048)
by W
04:48
created

MistralRunner   F

Complexity

Total Complexity 62

Size/Duplication

Total Lines 362
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 362
rs 3.8461
wmc 62

15 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 13 1
B resume() 0 50 6
A _save_workflow() 0 20 4
A _get_resume_options() 0 2 1
A cancel() 0 14 2
F _get_tasks() 0 29 11
A pre_run() 0 6 2
A run() 0 23 3
B _check_name() 0 13 6
B _construct_workflow_execution_options() 0 46 4
A get_workflow_definition() 0 4 2
B _build_mistral_context() 0 31 5
B start() 0 54 6
A _save_workbook() 0 20 4
B _find_default_workflow() 0 19 5

How to fix   Complexity   

Complex Class

Complex classes like MistralRunner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import retrying
20
import six
21
import yaml
22
from mistralclient.api import client as mistral
23
from oslo_config import cfg
24
25
from st2common.runners.base import AsyncActionRunner
26
from st2common.constants.action import LIVEACTION_STATUS_RUNNING
27
from st2common import log as logging
28
from st2common.models.api.notification import NotificationsHelper
29
from st2common.util.workflow import mistral as utils
30
from st2common.util.url import get_url_without_trailing_slash
31
from st2common.util.api import get_full_public_api_url
32
from st2common.util.api import get_mistral_api_url
33
34
35
LOG = logging.getLogger(__name__)
36
37
38
def get_runner():
39
    return MistralRunner(str(uuid.uuid4()))
40
41
42
class MistralRunner(AsyncActionRunner):
43
44
    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
45
46
    def __init__(self, runner_id):
47
        super(MistralRunner, self).__init__(runner_id=runner_id)
48
        self._on_behalf_user = cfg.CONF.system_user.user
49
        self._notify = None
50
        self._skip_notify_tasks = []
51
        self._client = mistral.client(
52
            mistral_url=self.url,
53
            username=cfg.CONF.mistral.keystone_username,
54
            api_key=cfg.CONF.mistral.keystone_password,
55
            project_name=cfg.CONF.mistral.keystone_project_name,
56
            auth_url=cfg.CONF.mistral.keystone_auth_url,
57
            cacert=cfg.CONF.mistral.cacert,
58
            insecure=cfg.CONF.mistral.insecure)
59
60
    @staticmethod
61
    def get_workflow_definition(entry_point):
62
        with open(entry_point, 'r') as def_file:
63
            return def_file.read()
64
65
    def pre_run(self):
66
        super(MistralRunner, self).pre_run()
67
68
        if getattr(self, 'liveaction', None):
69
            self._notify = getattr(self.liveaction, 'notify', None)
70
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
71
72
    @staticmethod
73
    def _check_name(action_ref, is_workbook, def_dict):
74
        # If workbook, change the value of the "name" key.
75
        if is_workbook:
76
            if def_dict.get('name') != action_ref:
77
                raise Exception('Name of the workbook must be the same as the '
78
                                'fully qualified action name "%s".' % action_ref)
79
        # If workflow, change the key name of the workflow.
80
        else:
81
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
82
            if workflow_name != action_ref:
83
                raise Exception('Name of the workflow must be the same as the '
84
                                'fully qualified action name "%s".' % action_ref)
85
86
    def _save_workbook(self, name, def_yaml):
87
        # If the workbook is not found, the mistral client throws a generic API exception.
88
        try:
89
            # Update existing workbook.
90
            wb = self._client.workbooks.get(name)
91
        except:
92
            # Delete if definition was previously a workflow.
93
            # If not found, an API exception is thrown.
94
            try:
95
                self._client.workflows.delete(name)
96
            except:
97
                pass
98
99
            # Create the new workbook.
100
            wb = self._client.workbooks.create(def_yaml)
101
102
        # Update the workbook definition.
103
        # pylint: disable=no-member
104
        if wb.definition != def_yaml:
105
            self._client.workbooks.update(def_yaml)
106
107
    def _save_workflow(self, name, def_yaml):
108
        # If the workflow is not found, the mistral client throws a generic API exception.
109
        try:
110
            # Update existing workbook.
111
            wf = self._client.workflows.get(name)
112
        except:
113
            # Delete if definition was previously a workbook.
114
            # If not found, an API exception is thrown.
115
            try:
116
                self._client.workbooks.delete(name)
117
            except:
118
                pass
119
120
            # Create the new workflow.
121
            wf = self._client.workflows.create(def_yaml)[0]
122
123
        # Update the workflow definition.
124
        # pylint: disable=no-member
125
        if wf.definition != def_yaml:
126
            self._client.workflows.update(def_yaml)
127
128
    def _find_default_workflow(self, def_dict):
129
        num_workflows = len(def_dict['workflows'].keys())
130
131
        if num_workflows > 1:
132
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
133
            if not fully_qualified_wf_name:
134
                raise ValueError('Workbook definition is detected. '
135
                                 'Default workflow cannot be determined.')
136
137
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
138
            if wf_name not in def_dict['workflows']:
139
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
140
                                 % fully_qualified_wf_name)
141
142
            return fully_qualified_wf_name
143
        elif num_workflows == 1:
144
            return '%s.%s' % (def_dict['name'], def_dict['workflows'].keys()[0])
145
        else:
146
            raise Exception('There are no workflows in the workbook.')
147
148
    def _construct_workflow_execution_options(self):
149
        # This URL is used by Mistral to talk back to the API
150
        api_url = get_mistral_api_url()
151
        endpoint = api_url + '/actionexecutions'
152
153
        # This URL is available in the context and can be used by the users inside a workflow,
154
        # similar to "ST2_ACTION_API_URL" environment variable available to actions
155
        public_api_url = get_full_public_api_url()
156
157
        # Build context with additional information
158
        parent_context = {
159
            'execution_id': self.execution_id
160
        }
161
        if getattr(self.liveaction, 'context', None):
162
            parent_context.update(self.liveaction.context)
163
164
        st2_execution_context = {
165
            'api_url': api_url,
166
            'endpoint': endpoint,
167
            'parent': parent_context,
168
            'notify': {},
169
            'skip_notify_tasks': self._skip_notify_tasks
170
        }
171
172
        # Include notification information
173
        if self._notify:
174
            notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
175
            st2_execution_context['notify'] = notify_dict
176
177
        if self.auth_token:
178
            st2_execution_context['auth_token'] = self.auth_token.token
179
180
        options = {
181
            'env': {
182
                'st2_execution_id': self.execution_id,
183
                'st2_liveaction_id': self.liveaction_id,
184
                'st2_action_api_url': public_api_url,
185
                '__actions': {
186
                    'st2.action': {
187
                        'st2_context': st2_execution_context
188
                    }
189
                }
190
            }
191
        }
192
193
        return options
194
195
    def _get_resume_options(self):
196
        return self.context.get('re-run', {})
197
198
    @retrying.retry(
199
        retry_on_exception=utils.retry_on_exceptions,
200
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
201
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
202
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
203
    def run(self, action_parameters):
204
        resume_options = self._get_resume_options()
205
206
        tasks_to_reset = resume_options.get('reset', [])
207
208
        task_specs = {
209
            task_name: {'reset': task_name in tasks_to_reset}
210
            for task_name in resume_options.get('tasks', [])
211
        }
212
213
        resume = self.rerun_ex_ref and task_specs
214
215
        if resume:
216
            result = self.resume(ex_ref=self.rerun_ex_ref, task_specs=task_specs)
217
        else:
218
            result = self.start(action_parameters=action_parameters)
219
220
        return result
221
222
    def start(self, action_parameters):
223
        # Test connection
224
        self._client.workflows.list()
225
226
        # Setup inputs for the workflow execution.
227
        inputs = self.runner_parameters.get('context', dict())
228
        inputs.update(action_parameters)
229
230
        # Get workbook/workflow definition from file.
231
        def_yaml = self.get_workflow_definition(self.entry_point)
232
        def_dict = yaml.safe_load(def_yaml)
233
        is_workbook = ('workflows' in def_dict)
234
235
        if not is_workbook:
236
            # Non-workbook definition containing multiple workflows is not supported.
237
            if len([k for k, _ in six.iteritems(def_dict) if k != 'version']) != 1:
238
                raise Exception('Workflow (not workbook) definition is detected. '
239
                                'Multiple workflows is not supported.')
240
241
        action_ref = '%s.%s' % (self.action.pack, self.action.name)
242
        self._check_name(action_ref, is_workbook, def_dict)
243
        def_dict_xformed = utils.transform_definition(def_dict)
244
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)
245
246
        # Construct additional options for the workflow execution
247
        options = self._construct_workflow_execution_options()
248
249
        # Save workbook/workflow definition.
250
        if is_workbook:
251
            self._save_workbook(action_ref, def_yaml_xformed)
252
            default_workflow = self._find_default_workflow(def_dict_xformed)
253
            execution = self._client.executions.create(default_workflow,
254
                                                       workflow_input=inputs,
255
                                                       **options)
256
        else:
257
            self._save_workflow(action_ref, def_yaml_xformed)
258
            execution = self._client.executions.create(action_ref,
259
                                                       workflow_input=inputs,
260
                                                       **options)
261
262
        status = LIVEACTION_STATUS_RUNNING
263
        partial_results = {'tasks': []}
264
265
        # pylint: disable=no-member
266
        current_context = {
267
            'execution_id': str(execution.id),
268
            'workflow_name': execution.workflow_name
269
        }
270
271
        exec_context = self.context
272
        exec_context = self._build_mistral_context(exec_context, current_context)
273
        LOG.info('Mistral query context is %s' % exec_context)
274
275
        return (status, partial_results, exec_context)
276
277
    def _get_tasks(self, wf_ex_id, full_task_name, task_name, executions):
278
        task_exs = self._client.tasks.list(workflow_execution_id=wf_ex_id)
279
280
        if '.' in task_name:
281
            dot_pos = task_name.index('.')
282
            parent_task_name = task_name[:dot_pos]
283
            task_name = task_name[dot_pos + 1:]
284
285
            parent_task_ids = [task.id for task in task_exs if task.name == parent_task_name]
286
287
            workflow_ex_ids = [wf_ex.id for wf_ex in executions
288
                               if (getattr(wf_ex, 'task_execution_id', None) and
289
                                   wf_ex.task_execution_id in parent_task_ids)]
290
291
            tasks = {}
292
293
            for sub_wf_ex_id in workflow_ex_ids:
294
                tasks.update(self._get_tasks(sub_wf_ex_id, full_task_name, task_name, executions))
295
296
            return tasks
297
298
        # pylint: disable=no-member
299
        tasks = {
300
            full_task_name: task.to_dict()
301
            for task in task_exs
302
            if task.name == task_name and task.state == 'ERROR'
303
        }
304
305
        return tasks
306
307
    def resume(self, ex_ref, task_specs):
308
        mistral_ctx = ex_ref.context.get('mistral', dict())
309
310
        if not mistral_ctx.get('execution_id'):
311
            raise Exception('Unable to rerun because mistral execution_id is missing.')
312
313
        execution = self._client.executions.get(mistral_ctx.get('execution_id'))
314
315
        # pylint: disable=no-member
316
        if execution.state not in ['ERROR']:
317
            raise Exception('Workflow execution is not in a rerunable state.')
318
319
        executions = self._client.executions.list()
320
321
        tasks = {}
322
323
        for task_name, task_spec in six.iteritems(task_specs):
324
            tasks.update(self._get_tasks(execution.id, task_name, task_name, executions))
325
326
        missing_tasks = list(set(task_specs.keys()) - set(tasks.keys()))
327
        if missing_tasks:
328
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
329
                            'rerunable tasks: %s. Please make sure that the task name is correct '
330
                            'and the task is in rerunable state.' % ', '.join(missing_tasks))
331
332
        # Construct additional options for the workflow execution
333
        options = self._construct_workflow_execution_options()
334
335
        for task_name, task_obj in six.iteritems(tasks):
336
            # pylint: disable=unexpected-keyword-arg
337
            self._client.tasks.rerun(
338
                task_obj['id'],
339
                reset=task_specs[task_name].get('reset', False),
340
                env=options.get('env', None)
341
            )
342
343
        status = LIVEACTION_STATUS_RUNNING
344
        partial_results = {'tasks': []}
345
346
        # pylint: disable=no-member
347
        current_context = {
348
            'execution_id': str(execution.id),
349
            'workflow_name': execution.workflow_name
350
        }
351
352
        exec_context = self.context
353
        exec_context = self._build_mistral_context(exec_context, current_context)
354
        LOG.info('Mistral query context is %s' % exec_context)
355
356
        return (status, partial_results, exec_context)
357
358
    @retrying.retry(
359
        retry_on_exception=utils.retry_on_exceptions,
360
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
361
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
362
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
363
    def cancel(self):
364
        mistral_ctx = self.context.get('mistral', dict())
365
366
        if not mistral_ctx.get('execution_id'):
367
            raise Exception('Unable to cancel because mistral execution_id is missing.')
368
369
        # There is no cancellation state in Mistral. Pause the workflow so
370
        # actions that are still executing can gracefully reach completion.
371
        self._client.executions.update(mistral_ctx.get('execution_id'), 'PAUSED')
372
373
    @staticmethod
374
    def _build_mistral_context(parent, current):
375
        """
376
        Mistral workflow might be kicked off in st2 by a parent Mistral
377
        workflow. In that case, we need to make sure that the existing
378
        mistral 'context' is moved as 'parent' and the child workflow
379
        'context' is added.
380
        """
381
        parent = copy.deepcopy(parent)
382
        context = dict()
383
384
        if not parent:
385
            context['mistral'] = current
386
        else:
387
            if 'mistral' in parent.keys():
388
                orig_parent_context = parent.get('mistral', dict())
389
                actual_parent = dict()
390
                if 'workflow_name' in orig_parent_context.keys():
391
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
392
                    del orig_parent_context['workflow_name']
393
                if 'workflow_execution_id' in orig_parent_context.keys():
394
                    actual_parent['workflow_execution_id'] = \
395
                        orig_parent_context['workflow_execution_id']
396
                    del orig_parent_context['workflow_execution_id']
397
                context['mistral'] = orig_parent_context
398
                context['mistral'].update(current)
399
                context['mistral']['parent'] = actual_parent
400
            else:
401
                context['mistral'] = current
402
403
        return context
404