1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | from __future__ import absolute_import |
||
17 | import copy |
||
18 | import uuid |
||
19 | |||
20 | import mock |
||
21 | from mock import call |
||
22 | import requests |
||
23 | import yaml |
||
24 | |||
25 | from mistralclient.api.v2 import executions |
||
26 | from mistralclient.api.v2 import workflows |
||
27 | from oslo_config import cfg |
||
28 | |||
29 | # XXX: actionsensor import depends on config being setup. |
||
30 | import st2tests.config as tests_config |
||
31 | from six.moves import range |
||
0 ignored issues
–
show
|
|||
32 | tests_config.parse_args() |
||
33 | |||
34 | from mistral_v2.mistral_v2 import MistralRunner |
||
35 | from st2common.bootstrap import actionsregistrar |
||
36 | from st2common.bootstrap import runnersregistrar |
||
37 | from st2common.constants import action as action_constants |
||
38 | from st2common.models.db.execution import ActionExecutionDB |
||
39 | from st2common.models.db.liveaction import LiveActionDB |
||
40 | from st2common.persistence.liveaction import LiveAction |
||
41 | from st2common.runners import base as runners |
||
42 | from st2common.services import action as action_service |
||
43 | from st2common.transport.liveaction import LiveActionPublisher |
||
44 | from st2common.transport.publishers import CUDPublisher |
||
45 | from st2common.util import loader |
||
46 | from st2tests import DbTestCase |
||
47 | from st2tests import fixturesloader |
||
48 | from st2tests.mocks.liveaction import MockLiveActionPublisher |
||
49 | |||
50 | |||
51 | TEST_PACK = 'mistral_tests' |
||
52 | TEST_PACK_PATH = fixturesloader.get_fixtures_packs_base_path() + '/' + TEST_PACK |
||
53 | |||
54 | PACKS = [ |
||
55 | TEST_PACK_PATH, |
||
56 | fixturesloader.get_fixtures_packs_base_path() + '/core' |
||
57 | ] |
||
58 | |||
59 | # Action executions requirements |
||
60 | ACTION_PARAMS = {'friend': 'Rocky'} |
||
61 | NON_EMPTY_RESULT = 'non-empty' |
||
62 | |||
63 | # Non-workbook with a single workflow |
||
64 | WF1_META_FILE_NAME = 'workflow_v2.yaml' |
||
65 | WF1_META_FILE_PATH = TEST_PACK_PATH + '/actions/' + WF1_META_FILE_NAME |
||
66 | WF1_META_CONTENT = loader.load_meta_file(WF1_META_FILE_PATH) |
||
67 | WF1_NAME = WF1_META_CONTENT['pack'] + '.' + WF1_META_CONTENT['name'] |
||
68 | WF1_ENTRY_POINT = TEST_PACK_PATH + '/actions/' + WF1_META_CONTENT['entry_point'] |
||
69 | WF1_ENTRY_POINT_X = WF1_ENTRY_POINT.replace(WF1_META_FILE_NAME, 'xformed_' + WF1_META_FILE_NAME) |
||
70 | WF1_SPEC = yaml.safe_load(MistralRunner.get_workflow_definition(WF1_ENTRY_POINT_X)) |
||
71 | WF1_YAML = yaml.safe_dump(WF1_SPEC, default_flow_style=False) |
||
72 | WF1 = workflows.Workflow(None, {'name': WF1_NAME, 'definition': WF1_YAML}) |
||
73 | WF1_OLD = workflows.Workflow(None, {'name': WF1_NAME, 'definition': ''}) |
||
74 | WF1_EXEC = {'id': str(uuid.uuid4()), 'state': 'RUNNING', 'workflow_name': WF1_NAME} |
||
75 | WF1_EXEC_CANCELLED = copy.deepcopy(WF1_EXEC) |
||
76 | WF1_EXEC_CANCELLED['state'] = 'CANCELLED' |
||
77 | |||
78 | # Workflow with a subworkflow action |
||
79 | WF2_META_FILE_NAME = 'workflow_v2_call_workflow_action.yaml' |
||
80 | WF2_META_FILE_PATH = TEST_PACK_PATH + '/actions/' + WF2_META_FILE_NAME |
||
81 | WF2_META_CONTENT = loader.load_meta_file(WF2_META_FILE_PATH) |
||
82 | WF2_NAME = WF2_META_CONTENT['pack'] + '.' + WF2_META_CONTENT['name'] |
||
83 | WF2_ENTRY_POINT = TEST_PACK_PATH + '/actions/' + WF2_META_CONTENT['entry_point'] |
||
84 | WF2_ENTRY_POINT_X = WF2_ENTRY_POINT.replace(WF2_META_FILE_NAME, 'xformed_' + WF2_META_FILE_NAME) |
||
85 | WF2_SPEC = yaml.safe_load(MistralRunner.get_workflow_definition(WF2_ENTRY_POINT_X)) |
||
86 | WF2_YAML = yaml.safe_dump(WF2_SPEC, default_flow_style=False) |
||
87 | WF2 = workflows.Workflow(None, {'name': WF2_NAME, 'definition': WF2_YAML}) |
||
88 | WF2_EXEC = {'id': str(uuid.uuid4()), 'state': 'RUNNING', 'workflow_name': WF2_NAME} |
||
89 | WF2_EXEC_CANCELLED = copy.deepcopy(WF2_EXEC) |
||
90 | WF2_EXEC_CANCELLED['state'] = 'CANCELLED' |
||
91 | |||
92 | |||
93 | @mock.patch.object( |
||
94 | CUDPublisher, |
||
95 | 'publish_update', |
||
96 | mock.MagicMock(return_value=None)) |
||
97 | @mock.patch.object( |
||
98 | CUDPublisher, |
||
99 | 'publish_create', |
||
100 | mock.MagicMock(side_effect=MockLiveActionPublisher.publish_create)) |
||
101 | @mock.patch.object( |
||
102 | LiveActionPublisher, |
||
103 | 'publish_state', |
||
104 | mock.MagicMock(side_effect=MockLiveActionPublisher.publish_state)) |
||
105 | class MistralRunnerCancelTest(DbTestCase): |
||
106 | |||
107 | @classmethod |
||
108 | def setUpClass(cls): |
||
109 | super(MistralRunnerCancelTest, cls).setUpClass() |
||
110 | |||
111 | # Override the retry configuration here otherwise st2tests.config.parse_args |
||
112 | # in DbTestCase.setUpClass will reset these overrides. |
||
113 | cfg.CONF.set_override('retry_exp_msec', 100, group='mistral') |
||
114 | cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral') |
||
115 | cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral') |
||
116 | cfg.CONF.set_override('api_url', 'http://0.0.0.0:9101', group='auth') |
||
117 | |||
118 | # Register runners. |
||
119 | runnersregistrar.register_runners() |
||
120 | |||
121 | # Register test pack(s). |
||
122 | actions_registrar = actionsregistrar.ActionsRegistrar( |
||
123 | use_pack_cache=False, |
||
124 | fail_on_failure=True |
||
125 | ) |
||
126 | |||
127 | for pack in PACKS: |
||
128 | actions_registrar.register_from_pack(pack) |
||
129 | |||
130 | @classmethod |
||
131 | def get_runner_class(cls, runner_name): |
||
132 | return runners.get_runner(runner_name, runner_name).__class__ |
||
133 | |||
134 | @mock.patch.object( |
||
135 | workflows.WorkflowManager, 'list', |
||
136 | mock.MagicMock(return_value=[])) |
||
137 | @mock.patch.object( |
||
138 | workflows.WorkflowManager, 'get', |
||
139 | mock.MagicMock(return_value=WF1)) |
||
140 | @mock.patch.object( |
||
141 | workflows.WorkflowManager, 'create', |
||
142 | mock.MagicMock(return_value=[WF1])) |
||
143 | @mock.patch.object( |
||
144 | executions.ExecutionManager, 'create', |
||
145 | mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC))) |
||
146 | @mock.patch.object( |
||
147 | executions.ExecutionManager, 'update', |
||
148 | mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC_CANCELLED))) |
||
149 | @mock.patch.object( |
||
150 | action_service, 'is_children_active', |
||
151 | mock.MagicMock(return_value=True)) |
||
152 | def test_cancel(self): |
||
153 | liveaction = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS) |
||
154 | liveaction, execution = action_service.request(liveaction) |
||
155 | liveaction = LiveAction.get_by_id(str(liveaction.id)) |
||
156 | self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING) |
||
157 | |||
158 | mistral_context = liveaction.context.get('mistral', None) |
||
159 | self.assertIsNotNone(mistral_context) |
||
160 | self.assertEqual(mistral_context['execution_id'], WF1_EXEC.get('id')) |
||
161 | self.assertEqual(mistral_context['workflow_name'], WF1_EXEC.get('workflow_name')) |
||
162 | |||
163 | requester = cfg.CONF.system_user.user |
||
164 | liveaction, execution = action_service.request_cancellation(liveaction, requester) |
||
165 | executions.ExecutionManager.update.assert_called_with(WF1_EXEC.get('id'), 'CANCELLED') |
||
166 | liveaction = LiveAction.get_by_id(str(liveaction.id)) |
||
167 | self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELING) |
||
168 | |||
169 | @mock.patch.object( |
||
170 | workflows.WorkflowManager, 'list', |
||
171 | mock.MagicMock(return_value=[])) |
||
172 | @mock.patch.object( |
||
173 | workflows.WorkflowManager, 'get', |
||
174 | mock.MagicMock(side_effect=[WF2, WF1])) |
||
175 | @mock.patch.object( |
||
176 | workflows.WorkflowManager, 'create', |
||
177 | mock.MagicMock(side_effect=[[WF2], [WF1]])) |
||
178 | @mock.patch.object( |
||
179 | executions.ExecutionManager, 'create', |
||
180 | mock.MagicMock(side_effect=[ |
||
181 | executions.Execution(None, WF2_EXEC), |
||
182 | executions.Execution(None, WF1_EXEC)])) |
||
183 | @mock.patch.object( |
||
184 | executions.ExecutionManager, 'update', |
||
185 | mock.MagicMock(side_effect=[ |
||
186 | executions.Execution(None, WF2_EXEC_CANCELLED), |
||
187 | executions.Execution(None, WF1_EXEC_CANCELLED)])) |
||
188 | def test_cancel_subworkflow_action(self): |
||
189 | liveaction1 = LiveActionDB(action=WF2_NAME, parameters=ACTION_PARAMS) |
||
190 | liveaction1, execution1 = action_service.request(liveaction1) |
||
191 | liveaction1 = LiveAction.get_by_id(str(liveaction1.id)) |
||
192 | self.assertEqual(liveaction1.status, action_constants.LIVEACTION_STATUS_RUNNING) |
||
193 | |||
194 | liveaction2 = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS) |
||
195 | liveaction2, execution2 = action_service.request(liveaction2) |
||
196 | liveaction2 = LiveAction.get_by_id(str(liveaction2.id)) |
||
197 | self.assertEqual(liveaction2.status, action_constants.LIVEACTION_STATUS_RUNNING) |
||
198 | |||
199 | # Mock the children of the parent execution to make this |
||
200 | # test case has subworkflow execution. |
||
201 | with mock.patch.object( |
||
202 | ActionExecutionDB, 'children', |
||
203 | new_callable=mock.PropertyMock) as action_ex_children_mock: |
||
204 | action_ex_children_mock.return_value = [execution2.id] |
||
205 | |||
206 | mistral_context = liveaction1.context.get('mistral', None) |
||
207 | self.assertIsNotNone(mistral_context) |
||
208 | self.assertEqual(mistral_context['execution_id'], WF2_EXEC.get('id')) |
||
209 | self.assertEqual(mistral_context['workflow_name'], WF2_EXEC.get('workflow_name')) |
||
210 | |||
211 | requester = cfg.CONF.system_user.user |
||
212 | liveaction1, execution1 = action_service.request_cancellation(liveaction1, requester) |
||
213 | |||
214 | self.assertTrue(executions.ExecutionManager.update.called) |
||
215 | self.assertEqual(executions.ExecutionManager.update.call_count, 2) |
||
216 | |||
217 | calls = [ |
||
218 | mock.call(WF2_EXEC.get('id'), 'CANCELLED'), |
||
219 | mock.call(WF1_EXEC.get('id'), 'CANCELLED') |
||
220 | ] |
||
221 | |||
222 | executions.ExecutionManager.update.assert_has_calls(calls, any_order=False) |
||
223 | |||
224 | @mock.patch.object( |
||
225 | workflows.WorkflowManager, 'list', |
||
226 | mock.MagicMock(return_value=[])) |
||
227 | @mock.patch.object( |
||
228 | workflows.WorkflowManager, 'get', |
||
229 | mock.MagicMock(return_value=WF1)) |
||
230 | @mock.patch.object( |
||
231 | workflows.WorkflowManager, 'create', |
||
232 | mock.MagicMock(return_value=[WF1])) |
||
233 | @mock.patch.object( |
||
234 | executions.ExecutionManager, 'create', |
||
235 | mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC))) |
||
236 | @mock.patch.object( |
||
237 | executions.ExecutionManager, 'update', |
||
238 | mock.MagicMock(side_effect=[requests.exceptions.ConnectionError(), |
||
239 | executions.Execution(None, WF1_EXEC_CANCELLED)])) |
||
240 | @mock.patch.object( |
||
241 | action_service, 'is_children_active', |
||
242 | mock.MagicMock(return_value=True)) |
||
243 | def test_cancel_retry(self): |
||
244 | liveaction = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS) |
||
245 | liveaction, execution = action_service.request(liveaction) |
||
246 | liveaction = LiveAction.get_by_id(str(liveaction.id)) |
||
247 | self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING) |
||
248 | |||
249 | mistral_context = liveaction.context.get('mistral', None) |
||
250 | self.assertIsNotNone(mistral_context) |
||
251 | self.assertEqual(mistral_context['execution_id'], WF1_EXEC.get('id')) |
||
252 | self.assertEqual(mistral_context['workflow_name'], WF1_EXEC.get('workflow_name')) |
||
253 | |||
254 | requester = cfg.CONF.system_user.user |
||
255 | liveaction, execution = action_service.request_cancellation(liveaction, requester) |
||
256 | executions.ExecutionManager.update.assert_called_with(WF1_EXEC.get('id'), 'CANCELLED') |
||
257 | liveaction = LiveAction.get_by_id(str(liveaction.id)) |
||
258 | self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELING) |
||
259 | |||
260 | @mock.patch.object( |
||
261 | workflows.WorkflowManager, 'list', |
||
262 | mock.MagicMock(return_value=[])) |
||
263 | @mock.patch.object( |
||
264 | workflows.WorkflowManager, 'get', |
||
265 | mock.MagicMock(return_value=WF1)) |
||
266 | @mock.patch.object( |
||
267 | workflows.WorkflowManager, 'create', |
||
268 | mock.MagicMock(return_value=[WF1])) |
||
269 | @mock.patch.object( |
||
270 | executions.ExecutionManager, 'create', |
||
271 | mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC))) |
||
272 | @mock.patch.object( |
||
273 | executions.ExecutionManager, 'update', |
||
274 | mock.MagicMock(side_effect=requests.exceptions.ConnectionError('Connection refused'))) |
||
275 | def test_cancel_retry_exhausted(self): |
||
276 | liveaction = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS) |
||
277 | liveaction, execution = action_service.request(liveaction) |
||
278 | liveaction = LiveAction.get_by_id(str(liveaction.id)) |
||
279 | self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING) |
||
280 | |||
281 | mistral_context = liveaction.context.get('mistral', None) |
||
282 | self.assertIsNotNone(mistral_context) |
||
283 | self.assertEqual(mistral_context['execution_id'], WF1_EXEC.get('id')) |
||
284 | self.assertEqual(mistral_context['workflow_name'], WF1_EXEC.get('workflow_name')) |
||
285 | |||
286 | requester = cfg.CONF.system_user.user |
||
287 | liveaction, execution = action_service.request_cancellation(liveaction, requester) |
||
288 | |||
289 | calls = [call(WF1_EXEC.get('id'), 'CANCELLED') for i in range(0, 2)] |
||
290 | executions.ExecutionManager.update.assert_has_calls(calls) |
||
291 | |||
292 | liveaction = LiveAction.get_by_id(str(liveaction.id)) |
||
293 | self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELING) |
||
294 |
It is generally discouraged to redefine built-ins as this makes code very hard to read.