GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — kale/action-datastore (#6)
by Manas
05:59
created

_check_name()   B

Complexity

Conditions 6

Size

Total Lines 13

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 13
rs 8
cc 6
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import retrying
20
import six
21
import yaml
22
from mistralclient.api import client as mistral
23
from oslo_config import cfg
24
25
from st2actions.runners import AsyncActionRunner
26
from st2common.constants.action import LIVEACTION_STATUS_RUNNING
27
from st2common import log as logging
28
from st2common.models.api.notification import NotificationsHelper
29
from st2common.util.workflow import mistral as utils
30
from st2common.util.url import get_url_without_trailing_slash
31
from st2common.util.api import get_full_public_api_url
32
from st2common.util.api import get_mistral_api_url
33
34
35
LOG = logging.getLogger(__name__)
36
37
38
def get_runner():
39
    return MistralRunner(str(uuid.uuid4()))
40
41
42
class MistralRunner(AsyncActionRunner):
43
44
    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
45
46
    def __init__(self, runner_id):
47
        super(MistralRunner, self).__init__(runner_id=runner_id)
48
        self._on_behalf_user = cfg.CONF.system_user.user
49
        self._notify = None
50
        self._skip_notify_tasks = []
51
        self._client = mistral.client(
52
            mistral_url=self.url,
53
            username=cfg.CONF.mistral.keystone_username,
54
            api_key=cfg.CONF.mistral.keystone_password,
55
            project_name=cfg.CONF.mistral.keystone_project_name,
56
            auth_url=cfg.CONF.mistral.keystone_auth_url)
57
58
    def pre_run(self):
59
        if getattr(self, 'liveaction', None):
60
            self._notify = getattr(self.liveaction, 'notify', None)
61
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
62
63
    @staticmethod
64
    def _check_name(action_ref, is_workbook, def_dict):
65
        # If workbook, change the value of the "name" key.
66
        if is_workbook:
67
            if def_dict.get('name') != action_ref:
68
                raise Exception('Name of the workbook must be the same as the '
69
                                'fully qualified action name "%s".' % action_ref)
70
        # If workflow, change the key name of the workflow.
71
        else:
72
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
73
            if workflow_name != action_ref:
74
                raise Exception('Name of the workflow must be the same as the '
75
                                'fully qualified action name "%s".' % action_ref)
76
77
    def _save_workbook(self, name, def_yaml):
78
        # If the workbook is not found, the mistral client throws a generic API exception.
79
        try:
80
            # Update existing workbook.
81
            wb = self._client.workbooks.get(name)
82
        except:
83
            # Delete if definition was previously a workflow.
84
            # If not found, an API exception is thrown.
85
            try:
86
                self._client.workflows.delete(name)
87
            except:
88
                pass
89
90
            # Create the new workbook.
91
            wb = self._client.workbooks.create(def_yaml)
92
93
        # Update the workbook definition.
94
        # pylint: disable=no-member
95
        if wb.definition != def_yaml:
96
            self._client.workbooks.update(def_yaml)
97
98
    def _save_workflow(self, name, def_yaml):
99
        # If the workflow is not found, the mistral client throws a generic API exception.
100
        try:
101
            # Update existing workbook.
102
            wf = self._client.workflows.get(name)
103
        except:
104
            # Delete if definition was previously a workbook.
105
            # If not found, an API exception is thrown.
106
            try:
107
                self._client.workbooks.delete(name)
108
            except:
109
                pass
110
111
            # Create the new workflow.
112
            wf = self._client.workflows.create(def_yaml)[0]
113
114
        # Update the workflow definition.
115
        # pylint: disable=no-member
116
        if wf.definition != def_yaml:
117
            self._client.workflows.update(def_yaml)
118
119
    def _find_default_workflow(self, def_dict):
120
        num_workflows = len(def_dict['workflows'].keys())
121
122
        if num_workflows > 1:
123
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
124
            if not fully_qualified_wf_name:
125
                raise ValueError('Workbook definition is detected. '
126
                                 'Default workflow cannot be determined.')
127
128
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
129
            if wf_name not in def_dict['workflows']:
130
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
131
                                 % fully_qualified_wf_name)
132
133
            return fully_qualified_wf_name
134
        elif num_workflows == 1:
135
            return '%s.%s' % (def_dict['name'], def_dict['workflows'].keys()[0])
136
        else:
137
            raise Exception('There are no workflows in the workbook.')
138
139
    def _construct_workflow_execution_options(self):
140
        # This URL is used by Mistral to talk back to the API
141
        api_url = get_mistral_api_url()
142
        endpoint = api_url + '/actionexecutions'
143
144
        # This URL is available in the context and can be used by the users inside a workflow,
145
        # similar to "ST2_ACTION_API_URL" environment variable available to actions
146
        public_api_url = get_full_public_api_url()
147
148
        # Build context with additional information
149
        parent_context = {
150
            'execution_id': self.execution_id
151
        }
152
        if getattr(self.liveaction, 'context', None):
153
            parent_context.update(self.liveaction.context)
154
155
        st2_execution_context = {
156
            'endpoint': endpoint,
157
            'parent': parent_context,
158
            'notify': {},
159
            'skip_notify_tasks': self._skip_notify_tasks
160
        }
161
162
        # Include notification information
163
        if self._notify:
164
            notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
165
            st2_execution_context['notify'] = notify_dict
166
167
        if self.auth_token:
168
            st2_execution_context['auth_token'] = self.auth_token.token
169
170
        options = {
171
            'env': {
172
                'st2_execution_id': self.execution_id,
173
                'st2_liveaction_id': self.liveaction_id,
174
                'st2_action_api_url': public_api_url,
175
                '__actions': {
176
                    'st2.action': {
177
                        'st2_context': st2_execution_context
178
                    }
179
                }
180
            }
181
        }
182
183
        return options
184
185
    def _get_resume_options(self):
186
        return self.context.get('re-run', {})
187
188
    @retrying.retry(
189
        retry_on_exception=utils.retry_on_exceptions,
190
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
191
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
192
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
193
    def run(self, action_parameters):
194
        resume_options = self._get_resume_options()
195
        tasks = resume_options.get('tasks', [])
196
        resume = self.rerun_ex_ref and tasks
197
198
        if resume:
199
            result = self.resume(ex_ref=self.rerun_ex_ref, task_names=tasks)
200
        else:
201
            result = self.start(action_parameters=action_parameters)
202
203
        return result
204
205
    def start(self, action_parameters):
206
        # Test connection
207
        self._client.workflows.list()
208
209
        # Setup inputs for the workflow execution.
210
        inputs = self.runner_parameters.get('context', dict())
211
        inputs.update(action_parameters)
212
213
        # Get workbook/workflow definition from file.
214
        with open(self.entry_point, 'r') as def_file:
215
            def_yaml = def_file.read()
216
217
        def_dict = yaml.safe_load(def_yaml)
218
        is_workbook = ('workflows' in def_dict)
219
220
        if not is_workbook:
221
            # Non-workbook definition containing multiple workflows is not supported.
222
            if len([k for k, _ in six.iteritems(def_dict) if k != 'version']) != 1:
223
                raise Exception('Workflow (not workbook) definition is detected. '
224
                                'Multiple workflows is not supported.')
225
226
        action_ref = '%s.%s' % (self.action.pack, self.action.name)
227
        self._check_name(action_ref, is_workbook, def_dict)
228
        def_dict_xformed = utils.transform_definition(def_dict)
229
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)
230
231
        # Construct additional options for the workflow execution
232
        options = self._construct_workflow_execution_options()
233
234
        # Save workbook/workflow definition.
235
        if is_workbook:
236
            self._save_workbook(action_ref, def_yaml_xformed)
237
            default_workflow = self._find_default_workflow(def_dict_xformed)
238
            execution = self._client.executions.create(default_workflow,
239
                                                       workflow_input=inputs,
240
                                                       **options)
241
        else:
242
            self._save_workflow(action_ref, def_yaml_xformed)
243
            execution = self._client.executions.create(action_ref,
244
                                                       workflow_input=inputs,
245
                                                       **options)
246
247
        status = LIVEACTION_STATUS_RUNNING
248
        partial_results = {'tasks': []}
249
250
        # pylint: disable=no-member
251
        current_context = {
252
            'execution_id': str(execution.id),
253
            'workflow_name': execution.workflow_name
254
        }
255
256
        exec_context = self.context
257
        exec_context = self._build_mistral_context(exec_context, current_context)
258
        LOG.info('Mistral query context is %s' % exec_context)
259
260
        return (status, partial_results, exec_context)
261
262
    def resume(self, ex_ref, task_names):
263
        mistral_ctx = ex_ref.context.get('mistral', dict())
264
265
        if not mistral_ctx.get('execution_id'):
266
            raise Exception('Unable to rerun because mistral execution_id is missing.')
267
268
        execution = self._client.executions.get(mistral_ctx.get('execution_id'))
269
270
        # pylint: disable=no-member
271
        if execution.state not in ['ERROR']:
272
            raise Exception('Workflow execution is not in a rerunable state.')
273
274
        # pylint: disable=no-member
275
        tasks = {task.name: task.to_dict()
276
                 for task in self._client.tasks.list(workflow_execution_id=execution.id)
277
                 if task.name in task_names and task.state == 'ERROR'}
278
279
        missing_tasks = list(set(task_names) - set(tasks.keys()))
280
        if missing_tasks:
281
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
282
                            'rerunable tasks: %s. Please make sure that the task name is correct '
283
                            'and the task is in rerunable state.' % ', '.join(missing_tasks))
284
285
        # Construct additional options for the workflow execution
286
        options = self._construct_workflow_execution_options()
287
288
        for task in tasks.values():
289
            # pylint: disable=unexpected-keyword-arg
290
            self._client.tasks.rerun(task['id'], env=options.get('env', None))
291
292
        status = LIVEACTION_STATUS_RUNNING
293
        partial_results = {'tasks': []}
294
295
        # pylint: disable=no-member
296
        current_context = {
297
            'execution_id': str(execution.id),
298
            'workflow_name': execution.workflow_name
299
        }
300
301
        exec_context = self.context
302
        exec_context = self._build_mistral_context(exec_context, current_context)
303
        LOG.info('Mistral query context is %s' % exec_context)
304
305
        return (status, partial_results, exec_context)
306
307
    @retrying.retry(
308
        retry_on_exception=utils.retry_on_exceptions,
309
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
310
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
311
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
312
    def cancel(self):
313
        mistral_ctx = self.context.get('mistral', dict())
314
315
        if not mistral_ctx.get('execution_id'):
316
            raise Exception('Unable to cancel because mistral execution_id is missing.')
317
318
        # There is no cancellation state in Mistral. Pause the workflow so
319
        # actions that are still executing can gracefully reach completion.
320
        self._client.executions.update(mistral_ctx.get('execution_id'), 'PAUSED')
321
322
    @staticmethod
323
    def _build_mistral_context(parent, current):
324
        """
325
        Mistral workflow might be kicked off in st2 by a parent Mistral
326
        workflow. In that case, we need to make sure that the existing
327
        mistral 'context' is moved as 'parent' and the child workflow
328
        'context' is added.
329
        """
330
        parent = copy.deepcopy(parent)
331
        context = dict()
332
333
        if not parent:
334
            context['mistral'] = current
335
        else:
336
            if 'mistral' in parent.keys():
337
                orig_parent_context = parent.get('mistral', dict())
338
                actual_parent = dict()
339
                if 'workflow_name' in orig_parent_context.keys():
340
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
341
                    del orig_parent_context['workflow_name']
342
                if 'workflow_execution_id' in orig_parent_context.keys():
343
                    actual_parent['workflow_execution_id'] = \
344
                        orig_parent_context['workflow_execution_id']
345
                    del orig_parent_context['workflow_execution_id']
346
                context['mistral'] = orig_parent_context
347
                context['mistral'].update(current)
348
                context['mistral']['parent'] = actual_parent
349
            else:
350
                context['mistral'] = current
351
352
        return context
353