Completed
Pull Request — master (#2334)
by Edward
06:02
created

st2actions.runners.mistral.MistralRunner   B

Complexity

Total Complexity 39

Size/Duplication

Total Lines 245
Duplicated Lines 0 %
Metric Value
wmc 39
dl 0
loc 245
rs 8.2857

9 Methods

Rating   Name   Duplication   Size   Complexity  
A _save_workbook() 0 20 4
A pre_run() 0 4 2
A _save_workflow() 0 20 4
B _find_default_workflow() 0 19 5
B _check_name() 0 13 6
A cancel() 0 14 2
B _build_mistral_context() 0 31 5
A __init__() 0 11 1
F run() 0 101 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import retrying
20
import six
21
import yaml
22
from mistralclient.api import client as mistral
23
from oslo_config import cfg
24
25
from st2actions.runners import AsyncActionRunner
26
from st2common.constants.action import LIVEACTION_STATUS_RUNNING
27
from st2common import log as logging
28
from st2common.models.api.notification import NotificationsHelper
29
from st2common.util.workflow import mistral as utils
30
from st2common.util.url import get_url_without_trailing_slash
31
from st2common.util.api import get_full_public_api_url
32
from st2common.util.api import get_mistral_api_url
33
34
35
LOG = logging.getLogger(__name__)
36
37
38
def get_runner():
    """
    Build a new MistralRunner instance.

    The runner id is a freshly generated random UUID rendered as a string.
    """
    runner_id = str(uuid.uuid4())
    return MistralRunner(runner_id)
40
41
42
class MistralRunner(AsyncActionRunner):
43
44
    url = get_url_without_trailing_slash(cfg.CONF.mistral.v2_base_url)
45
46
    def __init__(self, runner_id):
        """
        Initialize the runner state and create the Mistral API client.

        :param runner_id: Identifier forwarded to the parent runner class.
        """
        super(MistralRunner, self).__init__(runner_id=runner_id)

        # Notification settings start empty; pre_run() fills them in.
        self._notify = None
        self._skip_notify_tasks = []
        self._on_behalf_user = cfg.CONF.system_user.user

        # The client targets the class-level Mistral URL and is given the
        # keystone settings from the "mistral" config section.
        mistral_conf = cfg.CONF.mistral
        self._client = mistral.client(
            mistral_url=self.url,
            username=mistral_conf.keystone_username,
            api_key=mistral_conf.keystone_password,
            project_name=mistral_conf.keystone_project_name,
            auth_url=mistral_conf.keystone_auth_url)
57
58
    def pre_run(self):
59
        if getattr(self, 'liveaction', None):
60
            self._notify = getattr(self.liveaction, 'notify', None)
61
        self._skip_notify_tasks = self.runner_parameters.get('skip_notify', [])
62
63
    @staticmethod
64
    def _check_name(action_ref, is_workbook, def_dict):
65
        # If workbook, change the value of the "name" key.
66
        if is_workbook:
67
            if def_dict.get('name') != action_ref:
68
                raise Exception('Name of the workbook must be the same as the '
69
                                'fully qualified action name "%s".' % action_ref)
70
        # If workflow, change the key name of the workflow.
71
        else:
72
            workflow_name = [k for k, v in six.iteritems(def_dict) if k != 'version'][0]
73
            if workflow_name != action_ref:
74
                raise Exception('Name of the workflow must be the same as the '
75
                                'fully qualified action name "%s".' % action_ref)
76
77
    def _save_workbook(self, name, def_yaml):
78
        # If the workbook is not found, the mistral client throws a generic API exception.
79
        try:
80
            # Update existing workbook.
81
            wb = self._client.workbooks.get(name)
82
        except:
83
            # Delete if definition was previously a workflow.
84
            # If not found, an API exception is thrown.
85
            try:
86
                self._client.workflows.delete(name)
87
            except:
88
                pass
89
90
            # Create the new workbook.
91
            wb = self._client.workbooks.create(def_yaml)
92
93
        # Update the workbook definition.
94
        # pylint: disable=no-member
95
        if wb.definition != def_yaml:
96
            self._client.workbooks.update(def_yaml)
97
98
    def _save_workflow(self, name, def_yaml):
99
        # If the workflow is not found, the mistral client throws a generic API exception.
100
        try:
101
            # Update existing workbook.
102
            wf = self._client.workflows.get(name)
103
        except:
104
            # Delete if definition was previously a workbook.
105
            # If not found, an API exception is thrown.
106
            try:
107
                self._client.workbooks.delete(name)
108
            except:
109
                pass
110
111
            # Create the new workflow.
112
            wf = self._client.workflows.create(def_yaml)[0]
113
114
        # Update the workflow definition.
115
        # pylint: disable=no-member
116
        if wf.definition != def_yaml:
117
            self._client.workflows.update(def_yaml)
118
119
    def _find_default_workflow(self, def_dict):
120
        num_workflows = len(def_dict['workflows'].keys())
121
122
        if num_workflows > 1:
123
            fully_qualified_wf_name = self.runner_parameters.get('workflow')
124
            if not fully_qualified_wf_name:
125
                raise ValueError('Workbook definition is detected. '
126
                                 'Default workflow cannot be determined.')
127
128
            wf_name = fully_qualified_wf_name[fully_qualified_wf_name.rindex('.') + 1:]
129
            if wf_name not in def_dict['workflows']:
130
                raise ValueError('Unable to find the workflow "%s" in the workbook.'
131
                                 % fully_qualified_wf_name)
132
133
            return fully_qualified_wf_name
134
        elif num_workflows == 1:
135
            return '%s.%s' % (def_dict['name'], def_dict['workflows'].keys()[0])
136
        else:
137
            raise Exception('There are no workflows in the workbook.')
138
139
    def _build_execution_options(self):
        """Build the keyword arguments ("env") passed when creating the Mistral execution."""
        # This URL is used by Mistral to talk back to the API
        endpoint = get_mistral_api_url() + '/actionexecutions'

        # This URL is available in the context and can be used by the users inside a workflow,
        # similar to "ST2_ACTION_API_URL" environment variable available to actions
        public_api_url = get_full_public_api_url()

        # Build context with additional information
        parent_context = {
            'execution_id': self.execution_id
        }
        if getattr(self.liveaction, 'context', None):
            parent_context.update(self.liveaction.context)

        st2_execution_context = {
            'endpoint': endpoint,
            'parent': parent_context,
            'notify': {},
            'skip_notify_tasks': self._skip_notify_tasks
        }

        # Include notification information
        if self._notify:
            notify_dict = NotificationsHelper.from_model(notify_model=self._notify)
            st2_execution_context['notify'] = notify_dict

        if self.auth_token:
            st2_execution_context['auth_token'] = self.auth_token.token

        return {
            'env': {
                'st2_execution_id': self.execution_id,
                'st2_liveaction_id': self.liveaction_id,
                'st2_action_api_url': public_api_url,
                '__actions': {
                    'st2.action': {
                        'st2_context': st2_execution_context
                    }
                }
            }
        }

    def _load_definition(self):
        """Read the entry point file and return (definition dict, is_workbook)."""
        # Get workbook/workflow definition from file.
        with open(self.entry_point, 'r') as def_file:
            def_dict = yaml.safe_load(def_file)

        # An empty or non-mapping file would otherwise fail below with an
        # unrelated TypeError.
        if not isinstance(def_dict, dict):
            raise Exception('Workbook/workflow definition in "%s" is empty or '
                            'is not a mapping.' % self.entry_point)

        is_workbook = ('workflows' in def_dict)

        if not is_workbook:
            # Non-workbook definition containing multiple workflows is not supported.
            if len([key for key in def_dict if key != 'version']) != 1:
                raise Exception('Workflow (not workbook) definition is detected. '
                                'Multiple workflows is not supported.')

        return def_dict, is_workbook

    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def run(self, action_parameters):
        """
        Register the workbook/workflow definition with Mistral and start it.

        The whole method is retried according to the "mistral" retry settings
        when utils.retry_on_exceptions accepts the raised exception.

        :param action_parameters: Action parameters merged into the workflow
                                  input on top of the "context" runner
                                  parameter.
        :return: Tuple of (LIVEACTION_STATUS_RUNNING, partial result with an
                 empty "tasks" list, context holding the Mistral execution id
                 and workflow name).
        :raises Exception: If the definition is invalid or not named after the
                           action.
        """
        # Test connection
        self._client.workflows.list()

        # Setup inputs for the workflow execution. Work on a copy so that the
        # runner's own "context" parameter is not mutated as a side effect.
        inputs = dict(self.runner_parameters.get('context') or {})
        inputs.update(action_parameters or {})

        options = self._build_execution_options()
        def_dict, is_workbook = self._load_definition()

        action_ref = '%s.%s' % (self.action.pack, self.action.name)
        self._check_name(action_ref, is_workbook, def_dict)
        def_dict_xformed = utils.transform_definition(def_dict)
        def_yaml_xformed = yaml.safe_dump(def_dict_xformed, default_flow_style=False)

        # Save workbook/workflow definition and pick the workflow to execute.
        if is_workbook:
            self._save_workbook(action_ref, def_yaml_xformed)
            workflow_name = self._find_default_workflow(def_dict_xformed)
        else:
            self._save_workflow(action_ref, def_yaml_xformed)
            workflow_name = action_ref

        execution = self._client.executions.create(workflow_name,
                                                   workflow_input=inputs,
                                                   **options)

        status = LIVEACTION_STATUS_RUNNING
        partial_results = {'tasks': []}

        # pylint: disable=no-member
        current_context = {
            'execution_id': str(execution.id),
            'workflow_name': execution.workflow_name
        }

        exec_context = self._build_mistral_context(self.context, current_context)
        LOG.info('Mistral query context is %s', exec_context)

        return (status, partial_results, exec_context)
240
241
    @retrying.retry(
        retry_on_exception=utils.retry_on_exceptions,
        wait_exponential_multiplier=cfg.CONF.mistral.retry_exp_msec,
        wait_exponential_max=cfg.CONF.mistral.retry_exp_max_msec,
        stop_max_delay=cfg.CONF.mistral.retry_stop_max_msec)
    def cancel(self):
        """
        Cancel the workflow execution recorded in this runner's context.

        :raises Exception: If the context holds no mistral execution_id.
        """
        execution_id = self.context.get('mistral', dict()).get('execution_id')

        if not execution_id:
            raise Exception('Unable to cancel because mistral execution_id is missing.')

        # Mistral has no cancellation state, so the workflow is paused instead;
        # actions that are still executing can then gracefully reach completion.
        self._client.executions.update(execution_id, 'PAUSED')
255
256
    @staticmethod
257
    def _build_mistral_context(parent, current):
258
        """
259
        Mistral workflow might be kicked off in st2 by a parent Mistral
260
        workflow. In that case, we need to make sure that the existing
261
        mistral 'context' is moved as 'parent' and the child workflow
262
        'context' is added.
263
        """
264
        parent = copy.deepcopy(parent)
265
        context = dict()
266
267
        if not parent:
268
            context['mistral'] = current
269
        else:
270
            if 'mistral' in parent.keys():
271
                orig_parent_context = parent.get('mistral', dict())
272
                actual_parent = dict()
273
                if 'workflow_name' in orig_parent_context.keys():
274
                    actual_parent['workflow_name'] = orig_parent_context['workflow_name']
275
                    del orig_parent_context['workflow_name']
276
                if 'workflow_execution_id' in orig_parent_context.keys():
277
                    actual_parent['workflow_execution_id'] = \
278
                        orig_parent_context['workflow_execution_id']
279
                    del orig_parent_context['workflow_execution_id']
280
                context['mistral'] = orig_parent_context
281
                context['mistral'].update(current)
282
                context['mistral']['parent'] = actual_parent
283
            else:
284
                context['mistral'] = current
285
286
        return context
287