Completed
Pull Request — master (#2380)
by W
06:39
created

st2actions.runners.mistral.MistralRunner.start()   C

Complexity

Conditions 7

Size

Total Lines 56

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 56
rs 6.7295
cc 7

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import retrying
20
import six
21
import yaml
22
from mistralclient.api import client as mistral
23
from oslo_config import cfg
24
25
from st2actions.runners import AsyncActionRunner
26
from st2common.constants.action import LIVEACTION_STATUS_RUNNING
27
from st2common import log as logging
28
from st2common.models.api.notification import NotificationsHelper
29
from st2common.util.workflow import mistral as utils
30
from st2common.util.url import get_url_without_trailing_slash
31
from st2common.util.api import get_full_public_api_url
32
from st2common.util.api import get_mistral_api_url
33
34
35
LOG = logging.getLogger(__name__)
36
37
38
def get_runner():
39
    return MistralRunner(str(uuid.uuid4()))
40
41
42
class MistralRunner(AsyncActionRunner):
43
44
    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
45
46
    def __init__(self, runner_id):
47
        super(MistralRunner, self).__init__(runner_id=runner_id)
48
        self._on_behalf_user = cfg.CONF.system_user.user
49
        self._notify = None
50
        self._skip_notify_tasks = []
51
        self._client = mistral.client(
52
            mistral_url=self.url,
53
            username=cfg.CONF.mistral.keystone_username,
54
            api_key=cfg.CONF.mistral.keystone_password,
55
            project_name=cfg.CONF.mistral.keystone_project_name,
56
            auth_url=cfg.CONF.mistral.keystone_auth_url)
57
58
    def pre_run(self):
59
        if getattr(self, 'liveaction', None):
60
            self._notify = getattr(self.liveaction, 'notify', None)
61
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
62
63
    @staticmethod
64
    def _check_name(action_ref, is_workbook, def_dict):
65
        # If workbook, change the value of the "name" key.
66
        if is_workbook:
67
            if def_dict.get('name') != action_ref:
68
                raise Exception('Name of the workbook must be the same as the '
69
                                'fully qualified action name "%s".' % action_ref)
70
        # If workflow, change the key name of the workflow.
71
        else:
72
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
73
            if workflow_name != action_ref:
74
                raise Exception('Name of the workflow must be the same as the '
75
                                'fully qualified action name "%s".' % action_ref)
76
77
    def _save_workbook(self, name, def_yaml):
78
        # If the workbook is not found, the mistral client throws a generic API exception.
79
        try:
80
            # Update existing workbook.
81
            wb = self._client.workbooks.get(name)
82
        except:
83
            # Delete if definition was previously a workflow.
84
            # If not found, an API exception is thrown.
85
            try:
86
                self._client.workflows.delete(name)
87
            except:
88
                pass
89
90
            # Create the new workbook.
91
            wb = self._client.workbooks.create(def_yaml)
92
93
        # Update the workbook definition.
94
        # pylint: disable=no-member
95
        if wb.definition != def_yaml:
96
            self._client.workbooks.update(def_yaml)
97
98
    def _save_workflow(self, name, def_yaml):
99
        # If the workflow is not found, the mistral client throws a generic API exception.
100
        try:
101
            # Update existing workbook.
102
            wf = self._client.workflows.get(name)
103
        except:
104
            # Delete if definition was previously a workbook.
105
            # If not found, an API exception is thrown.
106
            try:
107
                self._client.workbooks.delete(name)
108
            except:
109
                pass
110
111
            # Create the new workflow.
112
            wf = self._client.workflows.create(def_yaml)[0]
113
114
        # Update the workflow definition.
115
        # pylint: disable=no-member
116
        if wf.definition != def_yaml:
117
            self._client.workflows.update(def_yaml)
118
119
    def _find_default_workflow(self, def_dict):
120
        num_workflows = len(def_dict['workflows'].keys())
121
122
        if num_workflows > 1:
123
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
124
            if not fully_qualified_wf_name:
125
                raise ValueError('Workbook definition is detected. '
126
                                 'Default workflow cannot be determined.')
127
128
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
129
            if wf_name not in def_dict['workflows']:
130
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
131
                                 % fully_qualified_wf_name)
132
133
            return fully_qualified_wf_name
134
        elif num_workflows == 1:
135
            return '%s.%s' % (def_dict['name'], def_dict['workflows'].keys()[0])
136
        else:
137
            raise Exception('There are no workflows in the workbook.')
138
139
    def _construct_workflow_execution_options(self):
140
        # This URL is used by Mistral to talk back to the API
141
        api_url = get_mistral_api_url()
142
        endpoint = api_url + '/actionexecutions'
143
144
        # This URL is available in the context and can be used by the users inside a workflow,
145
        # similar to "ST2_ACTION_API_URL" environment variable available to actions
146
        public_api_url = get_full_public_api_url()
147
148
        # Build context with additional information
149
        parent_context = {
150
            'execution_id': self.execution_id
151
        }
152
        if getattr(self.liveaction, 'context', None):
153
            parent_context.update(self.liveaction.context)
154
155
        st2_execution_context = {
156
            'endpoint': endpoint,
157
            'parent': parent_context,
158
            'notify': {},
159
            'skip_notify_tasks': self._skip_notify_tasks
160
        }
161
162
        # Include notification information
163
        if self._notify:
164
            notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
165
            st2_execution_context['notify'] = notify_dict
166
167
        if self.auth_token:
168
            st2_execution_context['auth_token'] = self.auth_token.token
169
170
        options = {
171
            'env': {
172
                'st2_execution_id': self.execution_id,
173
                'st2_liveaction_id': self.liveaction_id,
174
                'st2_action_api_url': public_api_url,
175
                '__actions': {
176
                    'st2.action': {
177
                        'st2_context': st2_execution_context
178
                    }
179
                }
180
            }
181
        }
182
183
        return options
184
185
    def _get_resume_options(self):
186
        return self.context.get('re-run', {})
187
188
    @retrying.retry(
189
        retry_on_exception=utils.retry_on_exceptions,
190
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
191
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
192
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
193
    def run(self, action_parameters):
194
        resume_options = self._get_resume_options()
195
        tasks = resume_options.get('tasks', [])
196
        resume = self.rerun_ex_ref and tasks
197
        return self.resume(self.rerun_ex_ref, tasks) if resume else self.start(action_parameters)
198
199
    def start(self, action_parameters):
200
        # Test connection
201
        self._client.workflows.list()
202
203
        # Setup inputs for the workflow execution.
204
        inputs = self.runner_parameters.get('context', dict())
205
        inputs.update(action_parameters)
206
207
        # Get workbook/workflow definition from file.
208
        with open(self.entry_point, 'r') as def_file:
209
            def_yaml = def_file.read()
210
211
        def_dict = yaml.safe_load(def_yaml)
212
        is_workbook = ('workflows' in def_dict)
213
214
        if not is_workbook:
215
            # Non-workbook definition containing multiple workflows is not supported.
216
            if len([k for k, _ in six.iteritems(def_dict) if k != 'version']) != 1:
217
                raise Exception('Workflow (not workbook) definition is detected. '
218
                                'Multiple workflows is not supported.')
219
220
        action_ref = '%s.%s' % (self.action.pack, self.action.name)
221
        self._check_name(action_ref, is_workbook, def_dict)
222
        def_dict_xformed = utils.transform_definition(def_dict)
223
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)
224
225
        # Construct additional options for the workflow execution
226
        options = self._construct_workflow_execution_options()
227
228
        # Save workbook/workflow definition.
229
        if is_workbook:
230
            self._save_workbook(action_ref, def_yaml_xformed)
231
            default_workflow = self._find_default_workflow(def_dict_xformed)
232
            execution = self._client.executions.create(default_workflow,
233
                                                       workflow_input=inputs,
234
                                                       **options)
235
        else:
236
            self._save_workflow(action_ref, def_yaml_xformed)
237
            execution = self._client.executions.create(action_ref,
238
                                                       workflow_input=inputs,
239
                                                       **options)
240
241
        status = LIVEACTION_STATUS_RUNNING
242
        partial_results = {'tasks': []}
243
244
        # pylint: disable=no-member
245
        current_context = {
246
            'execution_id': str(execution.id),
247
            'workflow_name': execution.workflow_name
248
        }
249
250
        exec_context = self.context
251
        exec_context = self._build_mistral_context(exec_context, current_context)
252
        LOG.info('Mistral query context is %s' % exec_context)
253
254
        return (status, partial_results, exec_context)
255
256
    def resume(self, ex_ref, task_names):
257
        mistral_ctx = ex_ref.context.get('mistral', dict())
258
259
        if not mistral_ctx.get('execution_id'):
260
            raise Exception('Unable to rerun because mistral execution_id is missing.')
261
262
        execution = self._client.executions.get(mistral_ctx.get('execution_id'))
263
264
        # pylint: disable=no-member
265
        if execution.state not in ['ERROR']:
266
            raise Exception('Workflow execution is not in a rerunable state.')
267
268
        # pylint: disable=no-member
269
        tasks = {task.name: task.to_dict()
270
                 for task in self._client.tasks.list(workflow_execution_id=execution.id)
271
                 if task.name in task_names and task.state == 'ERROR'}
272
273
        missing_tasks = list(set(task_names) - set(tasks.keys()))
274
        if missing_tasks:
275
            raise Exception('Only tasks in error state can be rerun. Unable to identify '
276
                            'rerunable tasks: %s. Please make sure that the task name is correct '
277
                            'and the task is in rerunable state.' % ', '.join(missing_tasks))
278
279
        # Construct additional options for the workflow execution
280
        options = self._construct_workflow_execution_options()
281
282
        for task in tasks.values():
283
            # pylint: disable=unexpected-keyword-arg
284
            self._client.tasks.rerun(task['id'], env=options.get('env', None))
285
286
        status = LIVEACTION_STATUS_RUNNING
287
        partial_results = {'tasks': []}
288
289
        # pylint: disable=no-member
290
        current_context = {
291
            'execution_id': str(execution.id),
292
            'workflow_name': execution.workflow_name
293
        }
294
295
        exec_context = self.context
296
        exec_context = self._build_mistral_context(exec_context, current_context)
297
        LOG.info('Mistral query context is %s' % exec_context)
298
299
        return (status, partial_results, exec_context)
300
301
    @retrying.retry(
302
        retry_on_exception=utils.retry_on_exceptions,
303
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
304
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
305
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
306
    def cancel(self):
307
        mistral_ctx = self.context.get('mistral', dict())
308
309
        if not mistral_ctx.get('execution_id'):
310
            raise Exception('Unable to cancel because mistral execution_id is missing.')
311
312
        # There is no cancellation state in Mistral. Pause the workflow so
313
        # actions that are still executing can gracefully reach completion.
314
        self._client.executions.update(mistral_ctx.get('execution_id'), 'PAUSED')
315
316
    @staticmethod
317
    def _build_mistral_context(parent, current):
318
        """
319
        Mistral workflow might be kicked off in st2 by a parent Mistral
320
        workflow. In that case, we need to make sure that the existing
321
        mistral 'context' is moved as 'parent' and the child workflow
322
        'context' is added.
323
        """
324
        parent = copy.deepcopy(parent)
325
        context = dict()
326
327
        if not parent:
328
            context['mistral'] = current
329
        else:
330
            if 'mistral' in parent.keys():
331
                orig_parent_context = parent.get('mistral', dict())
332
                actual_parent = dict()
333
                if 'workflow_name' in orig_parent_context.keys():
334
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
335
                    del orig_parent_context['workflow_name']
336
                if 'workflow_execution_id' in orig_parent_context.keys():
337
                    actual_parent['workflow_execution_id'] = \
338
                        orig_parent_context['workflow_execution_id']
339
                    del orig_parent_context['workflow_execution_id']
340
                context['mistral'] = orig_parent_context
341
                context['mistral'].update(current)
342
                context['mistral']['parent'] = actual_parent
343
            else:
344
                context['mistral'] = current
345
346
        return context
347