Test Failed
Push — develop-v1.6.0 ( 9d5181...7efb31 )
by
unknown
04:49
created

MistralRunner   D

Complexity

Total Complexity 61

Size/Duplication

Total Lines 357
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 357
rs 4.054
c 0
b 0
f 0
wmc 61

14 Methods

Rating  Name                                      Duplication  Size  Complexity
A       __init__()                                0            13    1
A       _save_workflow()                          0            20    4
A       pre_run()                                 0            4     2
B       _check_name()                             0            13    6
A       _save_workbook()                          0            20    4
B       _find_default_workflow()                  0            19    5
B       resume()                                  0            50    6
A       _get_resume_options()                     0            2     1
A       cancel()                                  0            14    2
F       _get_tasks()                              0            29    11
A       run()                                     0            23    3
B       _construct_workflow_execution_options()   0            46    4
B       _build_mistral_context()                  0            31    5
C       start()                                   0            56    7

How to fix   Complexity   

Complex Class

Complex classes like MistralRunner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
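
As a rough illustration of the prefix heuristic, the sketch below groups the method names from the table above by their first underscore-separated word. The helper name `group_by_prefix` is hypothetical, and the result is only a list of candidates: a shared prefix such as `get` does not guarantee that the methods actually belong together.

```python
from collections import defaultdict

# Method names copied from the MistralRunner methods table.
METHODS = [
    '__init__', '_save_workflow', 'pre_run', '_check_name', '_save_workbook',
    '_find_default_workflow', 'resume', '_get_resume_options', 'cancel',
    '_get_tasks', 'run', '_construct_workflow_execution_options',
    '_build_mistral_context', 'start',
]


def group_by_prefix(names):
    """Group names by their first underscore-separated word (hypothetical helper)."""
    groups = defaultdict(list)
    for name in names:
        words = [word for word in name.split('_') if word]
        # Single-word names such as 'run' or '__init__' carry no prefix.
        if len(words) > 1:
            groups[words[0]].append(name)
    # Keep only prefixes shared by at least two names.
    return {prefix: members for prefix, members in groups.items() if len(members) > 1}


candidates = group_by_prefix(METHODS)
```

Here the heuristic surfaces `_save_workflow` and `_save_workbook` as one group, which matches the near-identical bodies of those two methods in the listing.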

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
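
A minimal sketch of what Extract Class could look like for the two save methods, assuming they are the component chosen for extraction. The `DefinitionStore` class and the in-memory manager are hypothetical and exist only for this illustration; the sketch also simplifies the original logic by not re-comparing the definition right after creating it.

```python
class DefinitionStore(object):
    """Hypothetical extracted class that owns the 'save definition' logic."""

    def __init__(self, primary, other):
        # Both arguments are assumed to expose get/create/update/delete,
        # in the way the listing uses client.workbooks and client.workflows.
        self._primary = primary
        self._other = other

    def save(self, name, def_yaml):
        try:
            existing = self._primary.get(name)
        except Exception:
            # Not found: drop a same-named definition of the other kind, then create.
            try:
                self._other.delete(name)
            except Exception:
                pass
            self._primary.create(def_yaml)
            return 'created'
        if existing.definition != def_yaml:
            self._primary.update(def_yaml)
            return 'updated'
        return 'unchanged'


class Definition(object):
    def __init__(self, definition):
        self.definition = definition


class InMemoryManager(object):
    """Toy stand-in for a client manager, used only to exercise the sketch."""

    def __init__(self):
        self._items = {}

    @staticmethod
    def _name_of(def_yaml):
        # Toy convention for this sketch: the first line is "name: <name>".
        return def_yaml.splitlines()[0].split(':', 1)[1].strip()

    def get(self, name):
        return self._items[name]  # raises KeyError when missing

    def create(self, def_yaml):
        item = Definition(def_yaml)
        self._items[self._name_of(def_yaml)] = item
        return item

    def update(self, def_yaml):
        self._items[self._name_of(def_yaml)].definition = def_yaml

    def delete(self, name):
        del self._items[name]


workbooks, workflows = InMemoryManager(), InMemoryManager()
workbook_store = DefinitionStore(primary=workbooks, other=workflows)
first = workbook_store.save('examples.demo', 'name: examples.demo\nversion: 1')
second = workbook_store.save('examples.demo', 'name: examples.demo\nversion: 1')
third = workbook_store.save('examples.demo', 'name: examples.demo\nversion: 2')
```

With this shape, a runner would hold one store per definition kind (primary and other swapped), and the duplicated try/except blocks would live in a single place.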

# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import copy
import uuid

import retrying
import six
import yaml
from mistralclient.api import client as mistral
from oslo_config import cfg

from st2actions.runners import AsyncActionRunner
from st2common.constants.action import LIVEACTION_STATUS_RUNNING
from st2common import log as logging
from st2common.models.api.notification import NotificationsHelper
from st2common.util.workflow import mistral as utils
from st2common.util.url import get_url_without_trailing_slash
from st2common.util.api import get_full_public_api_url
from st2common.util.api import get_mistral_api_url


LOG = logging.getLogger(__name__)


def get_runner():
    return MistralRunner(str(uuid.uuid4()))


class MistralRunner(AsyncActionRunner):

    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)

    def __init__(self, runner_id):
        super(MistralRunner, self).__init__(runner_id=runner_id)
        self._on_behalf_user = cfg.CONF.system_user.user
        self._notify = None
        self._skip_notify_tasks = []
        self._client = mistral.client(
            mistral_url=self.url,
            username=cfg.CONF.mistral.keystone_username,
            api_key=cfg.CONF.mistral.keystone_password,
            project_name=cfg.CONF.mistral.keystone_project_name,
            auth_url=cfg.CONF.mistral.keystone_auth_url,
            cacert=cfg.CONF.mistral.cacert,
            insecure=cfg.CONF.mistral.insecure)

    def pre_run(self):
        if getattr(self, 'liveaction', None):
            self._notify = getattr(self.liveaction, 'notify', None)
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])

    @staticmethod
    def _check_name(action_ref, is_workbook, def_dict):
        # If workbook, the value of the "name" key must match the action reference.
        if is_workbook:
            if def_dict.get('name') != action_ref:
                raise Exception('Name of the workbook must be the same as the '
                                'fully qualified action name "%s".' % action_ref)
        # If workflow, the key name of the workflow must match the action reference.
        else:
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
            if workflow_name != action_ref:
                raise Exception('Name of the workflow must be the same as the '
                                'fully qualified action name "%s".' % action_ref)

    def _save_workbook(self, name, def_yaml):
        # If the workbook is not found, the mistral client throws a generic API exception.
        try:
            # Look up the existing workbook.
            wb = self._client.workbooks.get(name)
        except Exception:
            # Delete if definition was previously a workflow.
            # If not found, an API exception is thrown.
            try:
                self._client.workflows.delete(name)
            except Exception:
                pass

            # Create the new workbook.
            wb = self._client.workbooks.create(def_yaml)

        # Update the workbook definition.
        # pylint: disable=no-member
        if wb.definition != def_yaml:
            self._client.workbooks.update(def_yaml)

    def _save_workflow(self, name, def_yaml):
        # If the workflow is not found, the mistral client throws a generic API exception.
        try:
            # Look up the existing workflow.
            wf = self._client.workflows.get(name)
        except Exception:
            # Delete if definition was previously a workbook.
            # If not found, an API exception is thrown.
            try:
                self._client.workbooks.delete(name)
            except Exception:
                pass

            # Create the new workflow.
            wf = self._client.workflows.create(def_yaml)[0]

        # Update the workflow definition.
        # pylint: disable=no-member
        if wf.definition != def_yaml:
            self._client.workflows.update(def_yaml)

    def _find_default_workflow(self, def_dict):
        num_workflows = len(def_dict['workflows'].keys())

        if num_workflows > 1:
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
            if not fully_qualified_wf_name:
                raise ValueError('Workbook definition is detected. '
                                 'Default workflow cannot be determined.')

            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
            if wf_name not in def_dict['workflows']:
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
                                 % fully_qualified_wf_name)

            return fully_qualified_wf_name
        elif num_workflows == 1:
            # list() keeps this working where keys() returns a view instead of a list.
            return '%s.%s' % (def_dict['name'], list(def_dict['workflows'].keys())[0])
        else:
            raise Exception('There are no workflows in the workbook.')

    def _construct_workflow_execution_options(self):
        # This URL is used by Mistral to talk back to the API
        api_url = get_mistral_api_url()
        endpoint = api_url + '/actionexecutions'

        # This URL is available in the context and can be used by the users inside a workflow,
        # similar to "ST2_ACTION_API_URL" environment variable available to actions
        public_api_url = get_full_public_api_url()

        # Build context with additional information
        parent_context = {
            'execution_id': self.execution_id
        }
        if getattr(self.liveaction, 'context', None):
            parent_context.update(self.liveaction.context)

        st2_execution_context = {
            'api_url': api_url,
            'endpoint': endpoint,
            'parent': parent_context,
            'notify': {},
            'skip_notify_tasks': self._skip_notify_tasks
        }

        # Include notification information
        if self._notify:
            notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
            st2_execution_context['notify'] = notify_dict

        if self.auth_token:
            st2_execution_context['auth_token'] = self.auth_token.token

        options = {
            'env': {
                'st2_execution_id': self.execution_id,
                'st2_liveaction_id': self.liveaction_id,
                'st2_action_api_url': public_api_url,
                '__actions': {
                    'st2.action': {
                        'st2_context': st2_execution_context
                    }
                }
            }
        }

        return options

    def _get_resume_options(self):
        return self.context.get('re-run', {})

    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def run(self, action_parameters):
        resume_options = self._get_resume_options()

        tasks_to_reset = resume_options.get('reset', [])

        task_specs = {
            task_name: {'reset': task_name in tasks_to_reset}
            for task_name in resume_options.get('tasks', [])
        }

        resume = self.rerun_ex_ref and task_specs

        if resume:
            result = self.resume(ex_ref=self.rerun_ex_ref, task_specs=task_specs)
        else:
            result = self.start(action_parameters=action_parameters)

        return result

    def start(self, action_parameters):
        # Test connection
        self._client.workflows.list()

        # Setup inputs for the workflow execution.
        inputs = self.runner_parameters.get('context', dict())
        inputs.update(action_parameters)

        # Get workbook/workflow definition from file.
        with open(self.entry_point, 'r') as def_file:
            def_yaml = def_file.read()

        def_dict = yaml.safe_load(def_yaml)
        is_workbook = ('workflows' in def_dict)

        if not is_workbook:
            # Non-workbook definition containing multiple workflows is not supported.
            if len([k for k, _ in six.iteritems(def_dict) if k != 'version']) != 1:
                raise Exception('Workflow (not workbook) definition is detected. '
                                'Multiple workflows are not supported.')

        action_ref = '%s.%s' % (self.action.pack, self.action.name)
        self._check_name(action_ref, is_workbook, def_dict)
        def_dict_xformed = utils.transform_definition(def_dict)
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()

        # Save workbook/workflow definition.
        if is_workbook:
            self._save_workbook(action_ref, def_yaml_xformed)
            default_workflow = self._find_default_workflow(def_dict_xformed)
            execution = self._client.executions.create(default_workflow,
                                                       workflow_input=inputs,
                                                       **options)
        else:
            self._save_workflow(action_ref, def_yaml_xformed)
            execution = self._client.executions.create(action_ref,
                                                       workflow_input=inputs,
                                                       **options)

        status = LIVEACTION_STATUS_RUNNING
        partial_results = {'tasks': []}

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self.context
        exec_context = self._build_mistral_context(exec_context, current_context)
        LOG.info('Mistral query context is %s' % exec_context)

        return (status, partial_results, exec_context)

    def _get_tasks(self, wf_ex_id, full_task_name, task_name, executions):
        task_exs = self._client.tasks.list(workflow_execution_id=wf_ex_id)

        if '.' in task_name:
            dot_pos = task_name.index('.')
            parent_task_name = task_name[:dot_pos]
            task_name = task_name[dot_pos + 1:]

            parent_task_ids = [task.id for task in task_exs if task.name == parent_task_name]

            workflow_ex_ids = [wf_ex.id for wf_ex in executions
                               if (getattr(wf_ex, 'task_execution_id', None) and
                                   wf_ex.task_execution_id in parent_task_ids)]

            tasks = {}

            for sub_wf_ex_id in workflow_ex_ids:
                tasks.update(self._get_tasks(sub_wf_ex_id, full_task_name, task_name, executions))

            return tasks

        # pylint: disable=no-member
        tasks = {
            full_task_name: task.to_dict()
            for task in task_exs
            if task.name == task_name and task.state == 'ERROR'
        }

        return tasks

    def resume(self, ex_ref, task_specs):
        mistral_ctx = ex_ref.context.get('mistral', dict())

        if not mistral_ctx.get('execution_id'):
            raise Exception('Unable to rerun because mistral execution_id is missing.')

        execution = self._client.executions.get(mistral_ctx.get('execution_id'))

        # pylint: disable=no-member
        if execution.state not in ['ERROR']:
            raise Exception('Workflow execution is not in a rerunnable state.')

        executions = self._client.executions.list()

        tasks = {}

        for task_name, task_spec in six.iteritems(task_specs):
            tasks.update(self._get_tasks(execution.id, task_name, task_name, executions))

        missing_tasks = list(set(task_specs.keys()) - set(tasks.keys()))
        if missing_tasks:
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
                            'rerunnable tasks: %s. Please make sure that the task name is correct '
                            'and the task is in rerunnable state.' % ', '.join(missing_tasks))

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()

        for task_name, task_obj in six.iteritems(tasks):
            # pylint: disable=unexpected-keyword-arg
            self._client.tasks.rerun(
                task_obj['id'],
                reset=task_specs[task_name].get('reset', False),
                env=options.get('env', None)
            )

        status = LIVEACTION_STATUS_RUNNING
        partial_results = {'tasks': []}

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self.context
        exec_context = self._build_mistral_context(exec_context, current_context)
        LOG.info('Mistral query context is %s' % exec_context)

        return (status, partial_results, exec_context)

    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def cancel(self):
        mistral_ctx = self.context.get('mistral', dict())

        if not mistral_ctx.get('execution_id'):
            raise Exception('Unable to cancel because mistral execution_id is missing.')

        # There is no cancellation state in Mistral. Pause the workflow so
        # actions that are still executing can gracefully reach completion.
        self._client.executions.update(mistral_ctx.get('execution_id'), 'PAUSED')

    @staticmethod
    def _build_mistral_context(parent, current):
        """
        Mistral workflow might be kicked off in st2 by a parent Mistral
        workflow. In that case, we need to make sure that the existing
        mistral 'context' is moved as 'parent' and the child workflow
        'context' is added.
        """
        parent = copy.deepcopy(parent)
        context = dict()

        if not parent:
            context['mistral'] = current
        else:
            if 'mistral' in parent.keys():
                orig_parent_context = parent.get('mistral', dict())
                actual_parent = dict()
                if 'workflow_name' in orig_parent_context.keys():
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
                    del orig_parent_context['workflow_name']
                if 'workflow_execution_id' in orig_parent_context.keys():
                    actual_parent['workflow_execution_id'] = \
                        orig_parent_context['workflow_execution_id']
                    del orig_parent_context['workflow_execution_id']
                context['mistral'] = orig_parent_context
                context['mistral'].update(current)
                context['mistral']['parent'] = actual_parent
            else:
                context['mistral'] = current

        return context
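
The nesting described in the `_build_mistral_context` docstring can be tried outside the runner with a standalone sketch. The function below is a condensed rewrite for illustration, not the code from the listing, and the sample IDs and workflow names are made up. Note that only `workflow_name` and `workflow_execution_id` are moved under `parent`; a parent `execution_id` is overwritten by the child's value.

```python
import copy


def build_mistral_context(parent, current):
    """Condensed, standalone variant of the static method in the listing."""
    # Work on a copy so the caller's context is not modified.
    parent = copy.deepcopy(parent)
    if not parent or 'mistral' not in parent:
        return {'mistral': current}

    merged = parent['mistral']
    actual_parent = {}
    # Move the identifying keys of the existing context under 'parent'.
    for key in ('workflow_name', 'workflow_execution_id'):
        if key in merged:
            actual_parent[key] = merged.pop(key)

    # The child's values win for any remaining overlapping keys.
    merged.update(current)
    merged['parent'] = actual_parent
    return {'mistral': merged}


# Made-up sample data for illustration.
parent_context = {'mistral': {'execution_id': 'p1', 'workflow_name': 'wf.parent'}}
child_context = {'execution_id': 'c1', 'workflow_name': 'wf.child'}

nested = build_mistral_context(parent_context, child_context)
top_level = build_mistral_context(None, child_context)
```

Calling it with no parent context simply wraps the current context under the `mistral` key, which is the case for a workflow started directly rather than from another Mistral workflow.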