GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — kale/action-datastore ( 9f47ff...fd1ef0 )
by
unknown
12:31 queued 06:24
created

_get_tasks()   F

Complexity

Conditions 11

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 29
rs 3.1764
cc 11

How to fix   Complexity   

Complexity

Complex classes like st2actions.runners.mistral.MistralRunner._get_tasks() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import retrying
20
import six
21
import yaml
22
from mistralclient.api import client as mistral
23
from oslo_config import cfg
24
25
from st2actions.runners import AsyncActionRunner
26
from st2common.constants.action import LIVEACTION_STATUS_RUNNING
27
from st2common import log as logging
28
from st2common.models.api.notification import NotificationsHelper
29
from st2common.util.workflow import mistral as utils
30
from st2common.util.url import get_url_without_trailing_slash
31
from st2common.util.api import get_full_public_api_url
32
from st2common.util.api import get_mistral_api_url
33
34
35
LOG = logging.getLogger(__name__)
36
37
38
def get_runner():
    """
    Build a MistralRunner whose runner id is a freshly generated
    random UUID (version 4) rendered as a string.
    """
    runner_id = str(uuid.uuid4())
    return MistralRunner(runner_id)
40
41
42
class MistralRunner(AsyncActionRunner):
43
44
    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
45
46
    def __init__(self, runner_id):
        """
        Initialise the runner state and create the Mistral API client.

        The client is pointed at the class-level ``url`` and is given the
        keystone settings read from the ``mistral`` configuration group.

        :param runner_id: Identifier passed through to the parent runner.
        """
        super(MistralRunner, self).__init__(runner_id=runner_id)

        self._on_behalf_user = cfg.CONF.system_user.user

        # Notification settings are filled in later by pre_run().
        self._notify = None
        self._skip_notify_tasks = []

        mistral_conf = cfg.CONF.mistral
        self._client = mistral.client(
            mistral_url=self.url,
            username=mistral_conf.keystone_username,
            api_key=mistral_conf.keystone_password,
            project_name=mistral_conf.keystone_project_name,
            auth_url=mistral_conf.keystone_auth_url)
57
58
    def pre_run(self):
59
        if getattr(self, 'liveaction', None):
60
            self._notify = getattr(self.liveaction, 'notify', None)
61
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
62
63
    @staticmethod
64
    def _check_name(action_ref, is_workbook, def_dict):
65
        # If workbook, change the value of the "name" key.
66
        if is_workbook:
67
            if def_dict.get('name') != action_ref:
68
                raise Exception('Name of the workbook must be the same as the '
69
                                'fully qualified action name "%s".' % action_ref)
70
        # If workflow, change the key name of the workflow.
71
        else:
72
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
73
            if workflow_name != action_ref:
74
                raise Exception('Name of the workflow must be the same as the '
75
                                'fully qualified action name "%s".' % action_ref)
76
77
    def _save_workbook(self, name, def_yaml):
78
        # If the workbook is not found, the mistral client throws a generic API exception.
79
        try:
80
            # Update existing workbook.
81
            wb = self._client.workbooks.get(name)
82
        except:
83
            # Delete if definition was previously a workflow.
84
            # If not found, an API exception is thrown.
85
            try:
86
                self._client.workflows.delete(name)
87
            except:
88
                pass
89
90
            # Create the new workbook.
91
            wb = self._client.workbooks.create(def_yaml)
92
93
        # Update the workbook definition.
94
        # pylint: disable=no-member
95
        if wb.definition != def_yaml:
96
            self._client.workbooks.update(def_yaml)
97
98
    def _save_workflow(self, name, def_yaml):
99
        # If the workflow is not found, the mistral client throws a generic API exception.
100
        try:
101
            # Update existing workbook.
102
            wf = self._client.workflows.get(name)
103
        except:
104
            # Delete if definition was previously a workbook.
105
            # If not found, an API exception is thrown.
106
            try:
107
                self._client.workbooks.delete(name)
108
            except:
109
                pass
110
111
            # Create the new workflow.
112
            wf = self._client.workflows.create(def_yaml)[0]
113
114
        # Update the workflow definition.
115
        # pylint: disable=no-member
116
        if wf.definition != def_yaml:
117
            self._client.workflows.update(def_yaml)
118
119
    def _find_default_workflow(self, def_dict):
120
        num_workflows = len(def_dict['workflows'].keys())
121
122
        if num_workflows > 1:
123
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
124
            if not fully_qualified_wf_name:
125
                raise ValueError('Workbook definition is detected. '
126
                                 'Default workflow cannot be determined.')
127
128
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
129
            if wf_name not in def_dict['workflows']:
130
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
131
                                 % fully_qualified_wf_name)
132
133
            return fully_qualified_wf_name
134
        elif num_workflows == 1:
135
            return '%s.%s' % (def_dict['name'], def_dict['workflows'].keys()[0])
136
        else:
137
            raise Exception('There are no workflows in the workbook.')
138
139
    def _construct_workflow_execution_options(self):
        """
        Assemble the keyword options passed along with a workflow execution.

        :return: A dict with a single "env" key. It holds the st2 execution
                 id, the live action id, the public API URL and, under
                 "__actions" -> "st2.action" -> "st2_context", the context
                 (endpoint, parent context, notify settings, tasks to skip
                 notifications for and, when set, the auth token).
        """
        # This URL is used by Mistral to talk back to the API
        callback_endpoint = get_mistral_api_url() + '/actionexecutions'

        # This URL is available in the context and can be used by the users inside a workflow,
        # similar to "ST2_ACTION_API_URL" environment variable available to actions
        public_api_url = get_full_public_api_url()

        # The parent context starts with the execution id and is then
        # extended with the live action context, when there is one.
        parent_context = {'execution_id': self.execution_id}
        liveaction_context = getattr(self.liveaction, 'context', None)
        if liveaction_context:
            parent_context.update(liveaction_context)

        # Notification information stays an empty dict unless configured.
        if self._notify:
            notify = NotificationsHelper.from_model(notify_model=self._notify)
        else:
            notify = {}

        st2_context = {
            'endpoint': callback_endpoint,
            'parent': parent_context,
            'notify': notify,
            'skip_notify_tasks': self._skip_notify_tasks
        }

        if self.auth_token:
            st2_context['auth_token'] = self.auth_token.token

        env = {
            'st2_execution_id': self.execution_id,
            'st2_liveaction_id': self.liveaction_id,
            'st2_action_api_url': public_api_url,
            '__actions': {
                'st2.action': {
                    'st2_context': st2_context
                }
            }
        }

        return {'env': env}
184
185
    def _get_resume_options(self):
186
        return self.context.get('re-run', {})
187
188
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def run(self, action_parameters):
        """
        Start a new workflow execution or resume a previous one.

        A previous execution is resumed only when the runner has a
        ``rerun_ex_ref`` and the re-run options list at least one task;
        otherwise a new execution is started with action_parameters.

        :param action_parameters: Parameters passed on to start().
        :return: Whatever resume() or start() returns.
        """
        rerun_options = self._get_resume_options()
        names_to_reset = rerun_options.get('reset', [])

        # One spec per requested task, flagging whether it is to be reset.
        task_specs = {}
        for name in rerun_options.get('tasks', []):
            task_specs[name] = {'reset': name in names_to_reset}

        if self.rerun_ex_ref and task_specs:
            return self.resume(ex_ref=self.rerun_ex_ref, task_specs=task_specs)

        return self.start(action_parameters=action_parameters)
211
212
    def start(self, action_parameters):
        """
        Register the workbook/workflow definition and start an execution.

        The definition is read from ``self.entry_point``, validated,
        transformed, saved to Mistral and then executed with the "context"
        runner parameter merged with action_parameters as workflow input.

        :param action_parameters: Action parameters; they take precedence
                                  over the "context" runner parameter.
        :return: Tuple of (LIVEACTION_STATUS_RUNNING, {'tasks': []},
                 context dict built by _build_mistral_context).
        :raises Exception: If the definition is empty or malformed, if a
                           non-workbook definition holds more than one
                           workflow, or if the name check fails.
        """
        # Test connection
        self._client.workflows.list()

        # Setup inputs for the workflow execution. Work on a copy so that the
        # dict stored in runner_parameters is not modified as a side effect.
        inputs = dict(self.runner_parameters.get('context') or {})
        inputs.update(action_parameters)

        # Get workbook/workflow definition from file.
        with open(self.entry_point, 'r') as def_file:
            def_yaml = def_file.read()

        def_dict = yaml.safe_load(def_yaml)

        # An empty file parses to None and a scalar/list is not a definition;
        # fail with a clear message instead of an unrelated TypeError below.
        if not isinstance(def_dict, dict):
            raise Exception('Workbook/workflow definition in "%s" is empty or malformed.'
                            % self.entry_point)

        is_workbook = ('workflows' in def_dict)

        if not is_workbook:
            # Non-workbook definition containing multiple workflows is not supported.
            if len([k for k in def_dict if k != 'version']) != 1:
                raise Exception('Workflow (not workbook) definition is detected. '
                                'Multiple workflows is not supported.')

        action_ref = '%s.%s' % (self.action.pack, self.action.name)
        self._check_name(action_ref, is_workbook, def_dict)
        def_dict_xformed = utils.transform_definition(def_dict)
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()

        # Save workbook/workflow definition and pick the workflow to execute.
        if is_workbook:
            self._save_workbook(action_ref, def_yaml_xformed)
            workflow_to_run = self._find_default_workflow(def_dict_xformed)
        else:
            self._save_workflow(action_ref, def_yaml_xformed)
            workflow_to_run = action_ref

        execution = self._client.executions.create(workflow_to_run,
                                                   workflow_input=inputs,
                                                   **options)

        status = LIVEACTION_STATUS_RUNNING
        partial_results = {'tasks': []}

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self._build_mistral_context(self.context, current_context)
        LOG.info('Mistral query context is %s', exec_context)

        return (status, partial_results, exec_context)
268
269
    def _get_tasks(self, wf_ex_id, full_task_name, task_name, executions):
270
        task_exs = self._client.tasks.list(workflow_execution_id=wf_ex_id)
271
272
        if '.' in task_name:
273
            dot_pos = task_name.index('.')
274
            parent_task_name = task_name[:dot_pos]
275
            task_name = task_name[dot_pos + 1:]
276
277
            parent_task_ids = [task.id for task in task_exs if task.name == parent_task_name]
278
279
            workflow_ex_ids = [wf_ex.id for wf_ex in executions
280
                               if (getattr(wf_ex, 'task_execution_id', None) and
281
                                   wf_ex.task_execution_id in parent_task_ids)]
282
283
            tasks = {}
284
285
            for sub_wf_ex_id in workflow_ex_ids:
286
                tasks.update(self._get_tasks(sub_wf_ex_id, full_task_name, task_name, executions))
287
288
            return tasks
289
290
        # pylint: disable=no-member
291
        tasks = {
292
            full_task_name: task.to_dict()
293
            for task in task_exs
294
            if task.name == task_name and task.state == 'ERROR'
295
        }
296
297
        return tasks
298
299
    def resume(self, ex_ref, task_specs):
        """
        Rerun the requested failed tasks of a previous workflow execution.

        :param ex_ref: Reference to the previous execution; its context must
                       hold the mistral "execution_id".
        :param task_specs: Dict mapping task names to their spec, where the
                           "reset" entry of a spec is passed to the rerun.
        :return: Tuple of (LIVEACTION_STATUS_RUNNING, {'tasks': []},
                 context dict built by _build_mistral_context).
        :raises Exception: If the execution id is missing, the execution is
                           not in ERROR state, or any requested task cannot
                           be matched to a task in ERROR state.
        """
        mistral_ctx = ex_ref.context.get('mistral', dict())
        previous_execution_id = mistral_ctx.get('execution_id')

        if not previous_execution_id:
            raise Exception('Unable to rerun because mistral execution_id is missing.')

        execution = self._client.executions.get(previous_execution_id)

        # pylint: disable=no-member
        if execution.state != 'ERROR':
            raise Exception('Workflow execution is not in a rerunable state.')

        all_executions = self._client.executions.list()

        # Resolve every requested task name to its failed task execution.
        rerunable_tasks = {}
        for requested_name in task_specs:
            rerunable_tasks.update(
                self._get_tasks(execution.id, requested_name, requested_name, all_executions))

        unresolved = list(set(task_specs.keys()) - set(rerunable_tasks.keys()))
        if unresolved:
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
                            'rerunable tasks: %s. Please make sure that the task name is correct '
                            'and the task is in rerunable state.' % ', '.join(unresolved))

        # Construct additional options for the workflow execution
        options = self._construct_workflow_execution_options()
        env = options.get('env', None)

        for name, task in rerunable_tasks.items():
            # pylint: disable=unexpected-keyword-arg
            self._client.tasks.rerun(
                task['id'],
                reset=task_specs[name].get('reset', False),
                env=env
            )

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self._build_mistral_context(self.context, current_context)
        LOG.info('Mistral query context is %s' % exec_context)

        return (LIVEACTION_STATUS_RUNNING, {'tasks': []}, exec_context)
349
350
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def cancel(self):
        """
        Cancel the workflow execution recorded in the runner context by
        setting it to PAUSED in Mistral.

        :raises Exception: If the context holds no mistral execution id.
        """
        execution_id = self.context.get('mistral', dict()).get('execution_id')

        if not execution_id:
            raise Exception('Unable to cancel because mistral execution_id is missing.')

        # There is no cancellation state in Mistral. Pause the workflow so
        # actions that are still executing can gracefully reach completion.
        self._client.executions.update(execution_id, 'PAUSED')
364
365
    @staticmethod
366
    def _build_mistral_context(parent, current):
367
        """
368
        Mistral workflow might be kicked off in st2 by a parent Mistral
369
        workflow. In that case, we need to make sure that the existing
370
        mistral 'context' is moved as 'parent' and the child workflow
371
        'context' is added.
372
        """
373
        parent = copy.deepcopy(parent)
374
        context = dict()
375
376
        if not parent:
377
            context['mistral'] = current
378
        else:
379
            if 'mistral' in parent.keys():
380
                orig_parent_context = parent.get('mistral', dict())
381
                actual_parent = dict()
382
                if 'workflow_name' in orig_parent_context.keys():
383
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
384
                    del orig_parent_context['workflow_name']
385
                if 'workflow_execution_id' in orig_parent_context.keys():
386
                    actual_parent['workflow_execution_id'] = \
387
                        orig_parent_context['workflow_execution_id']
388
                    del orig_parent_context['workflow_execution_id']
389
                context['mistral'] = orig_parent_context
390
                context['mistral'].update(current)
391
                context['mistral']['parent'] = actual_parent
392
            else:
393
                context['mistral'] = current
394
395
        return context
396