1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | from __future__ import absolute_import |
||
17 | import copy |
||
18 | import uuid |
||
19 | |||
20 | import mock |
||
21 | import yaml |
||
22 | |||
23 | from mistralclient.api.v2 import action_executions |
||
24 | from mistralclient.api.v2 import executions |
||
25 | from mistralclient.api.v2 import workflows |
||
26 | from oslo_config import cfg |
||
27 | |||
28 | # XXX: actionsensor import depends on config being setup. |
||
29 | import st2tests.config as tests_config |
||
30 | from six.moves import range |
||
0 ignored issues
–
show
|
|||
31 | tests_config.parse_args() |
||
32 | |||
33 | import st2common |
||
34 | from mistral_v2.mistral_v2 import MistralRunner |
||
35 | from st2common.bootstrap import actionsregistrar |
||
36 | from st2common.bootstrap import policiesregistrar |
||
37 | from st2common.bootstrap import runnersregistrar |
||
38 | from st2common.constants import action as action_constants |
||
39 | from st2common.models.db.liveaction import LiveActionDB |
||
40 | from st2common.persistence.liveaction import LiveAction |
||
41 | from st2common.persistence.policy import Policy |
||
42 | from st2common.runners import base as runners |
||
43 | from st2common.services import action as action_service |
||
44 | from st2common.transport.liveaction import LiveActionPublisher |
||
45 | from st2common.transport.publishers import CUDPublisher |
||
46 | from st2common.util import loader |
||
47 | from st2tests import DbTestCase |
||
48 | from st2tests import fixturesloader |
||
49 | from st2tests.mocks.liveaction import MockLiveActionPublisher |
||
50 | |||
51 | |||
# Runner exercised by these tests and the fixture pack that supplies the
# workflow action and the concurrency policies.
MISTRAL_RUNNER_NAME = 'mistral_v2'
TEST_PACK = 'mistral_tests'
TEST_PACK_PATH = '/'.join([fixturesloader.get_fixtures_packs_base_path(), TEST_PACK])

# Packs whose actions and policies are registered in setUp.
PACKS = [
    TEST_PACK_PATH,
    '/'.join([fixturesloader.get_fixtures_packs_base_path(), 'core']),
]

# Non-workbook with a single workflow
WF1_META_FILE_NAME = 'workflow_v2.yaml'
WF1_META_FILE_PATH = '/'.join([TEST_PACK_PATH, 'actions', WF1_META_FILE_NAME])
WF1_META_CONTENT = loader.load_meta_file(WF1_META_FILE_PATH)

# Fully qualified action reference in the form "<pack>.<name>".
WF1_NAME = '.'.join([WF1_META_CONTENT['pack'], WF1_META_CONTENT['name']])
WF1_ENTRY_POINT = '/'.join([TEST_PACK_PATH, 'actions', WF1_META_CONTENT['entry_point']])

# Same entry point, with the meta file name swapped for its "xformed_" variant.
WF1_ENTRY_POINT_X = WF1_ENTRY_POINT.replace(
    WF1_META_FILE_NAME,
    'xformed_' + WF1_META_FILE_NAME
)

# Round-trip the workflow definition through YAML and wrap it in the client
# model object that the mocked WorkflowManager calls return.
WF1_SPEC = yaml.safe_load(MistralRunner.get_workflow_definition(WF1_ENTRY_POINT_X))
WF1_YAML = yaml.safe_dump(WF1_SPEC, default_flow_style=False)
WF1 = workflows.Workflow(None, {'name': WF1_NAME, 'definition': WF1_YAML})

# Payload for the mocked ExecutionManager.create call; WF1_EXEC is an
# independent deep copy of it.
MISTRAL_EXECUTION = {
    'id': str(uuid.uuid4()),
    'state': 'RUNNING',
    'workflow_name': WF1_NAME
}
WF1_EXEC = copy.deepcopy(MISTRAL_EXECUTION)
||
73 | |||
74 | |||
@mock.patch.object(
    CUDPublisher,
    'publish_update',
    mock.MagicMock(return_value=None))
@mock.patch.object(
    CUDPublisher,
    'publish_create',
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_create))
@mock.patch.object(
    LiveActionPublisher,
    'publish_state',
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_state))
class MistralRunnerPolicyTest(DbTestCase):
    """Check that concurrency policies cancel workflow requests made over the threshold.

    Each test launches the WF1 workflow action until the policy threshold is
    reached, then requests one more execution carrying a mistral callback and
    asserts that the extra request ends up in the canceling state and that the
    runner's ``cancel`` method was invoked.

    The publisher methods are replaced with mocks by the class decorators and
    the Mistral client managers are replaced with mocks by the decorators on
    each test method.
    """

    @classmethod
    def setUpClass(cls):
        """Run the base class setup, then apply the config overrides used by these tests."""
        super(MistralRunnerPolicyTest, cls).setUpClass()

        # Override the retry configuration here otherwise st2tests.config.parse_args
        # in DbTestCase.setUpClass will reset these overrides.
        cfg.CONF.set_override('retry_exp_msec', 100, group='mistral')
        cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral')
        cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral')
        cfg.CONF.set_override('api_url', 'http://0.0.0.0:9101', group='auth')

    def setUp(self):
        """Recreate the database and register runners, actions and policies from PACKS."""
        super(MistralRunnerPolicyTest, self).setUp()

        # Start with a clean database for each test.
        self._establish_connection_and_re_create_db()

        # Register runners.
        runnersregistrar.register_runners()

        actions_registrar = actionsregistrar.ActionsRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)

        # Register policies required for the tests.
        policiesregistrar.register_policy_types(st2common)

        policies_registrar = policiesregistrar.PolicyRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            policies_registrar.register_from_pack(pack)

    @classmethod
    def get_runner_class(cls, runner_name):
        """Return the class of the runner instance obtained for ``runner_name``."""
        return runners.get_runner(runner_name, runner_name).__class__

    def _drop_all_other_policies(self, test_policy):
        """Delete every registered policy whose ref differs from ``test_policy``."""
        policy_dbs = [policy_db for policy_db in Policy.get_all() if policy_db.ref != test_policy]

        for policy_db in policy_dbs:
            Policy.delete(policy_db, publish=False)

    def _get_policy_threshold(self, policy_ref):
        """Leave only ``policy_ref`` registered and return its ``threshold`` parameter.

        Fails the test if the threshold is missing or not greater than zero.
        """
        # Delete other policies in the test pack to avoid conflicts.
        self._drop_all_other_policies(policy_ref)

        # Get threshold from the policy.
        policy = Policy.get_by_ref(policy_ref)
        threshold = policy.parameters.get('threshold', 0)
        self.assertGreater(threshold, 0)

        return threshold

    def _request_running_workflow(self, params):
        """Request WF1 with ``params`` and assert the stored liveaction is running."""
        liveaction = LiveActionDB(action=WF1_NAME, parameters=params)
        liveaction, _ = action_service.request(liveaction)
        liveaction = LiveAction.get_by_id(str(liveaction.id))
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)

    def _assert_extra_request_is_canceled(self, params):
        """Request WF1 once more with a mistral callback and assert that it gets canceled.

        The runner's ``cancel`` method is patched for the duration of the
        request so that the call can be asserted.
        """
        # Mock the mistral runner cancel method to assert cancel is called.
        mistral_runner_cls = self.get_runner_class(MISTRAL_RUNNER_NAME)
        mock_cancel_return_value = (action_constants.LIVEACTION_STATUS_CANCELING, None, None)
        mock_cancel = mock.MagicMock(return_value=mock_cancel_return_value)

        with mock.patch.object(mistral_runner_cls, 'cancel', mock_cancel):
            # Launch another instance of the workflow with mistral callback defined
            # to indicate that this is executed under a workflow.
            callback = {
                'source': MISTRAL_RUNNER_NAME,
                'url': 'http://127.0.0.1:8989/v2/action_executions/12345'
            }

            liveaction = LiveActionDB(action=WF1_NAME, parameters=params, callback=callback)
            liveaction, _ = action_service.request(liveaction)
            liveaction = LiveAction.get_by_id(str(liveaction.id))

            # Assert cancel has been called.
            self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELING)
            mock_cancel.assert_called_once_with()

    @mock.patch.object(
        workflows.WorkflowManager, 'list',
        mock.MagicMock(return_value=[]))
    @mock.patch.object(
        workflows.WorkflowManager, 'get',
        mock.MagicMock(return_value=WF1))
    @mock.patch.object(
        workflows.WorkflowManager, 'create',
        mock.MagicMock(return_value=[WF1]))
    @mock.patch.object(
        executions.ExecutionManager, 'create',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
    @mock.patch.object(
        action_executions.ActionExecutionManager, 'update',
        mock.MagicMock(return_value=None))
    def test_cancel_on_task_action_concurrency(self):
        """An extra request is canceled once ``threshold`` instances of the action are running."""
        threshold = self._get_policy_threshold('mistral_tests.cancel_on_concurrency')

        # Launch instances of the workflow up to threshold.
        for i in range(threshold):
            self._request_running_workflow({'friend': 'friend' + str(i)})

        # Check number of running instances
        running = LiveAction.count(
            action=WF1_NAME, status=action_constants.LIVEACTION_STATUS_RUNNING)

        self.assertEqual(running, threshold)

        self._assert_extra_request_is_canceled({'friend': 'grande animalerie'})

    @mock.patch.object(
        workflows.WorkflowManager, 'list',
        mock.MagicMock(return_value=[]))
    @mock.patch.object(
        workflows.WorkflowManager, 'get',
        mock.MagicMock(return_value=WF1))
    @mock.patch.object(
        workflows.WorkflowManager, 'create',
        mock.MagicMock(return_value=[WF1]))
    @mock.patch.object(
        executions.ExecutionManager, 'create',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
    @mock.patch.object(
        action_executions.ActionExecutionManager, 'update',
        mock.MagicMock(return_value=None))
    def test_cancel_on_task_action_concurrency_by_attr(self):
        """An extra request is canceled once ``threshold`` instances share the same ``friend``."""
        threshold = self._get_policy_threshold('mistral_tests.cancel_on_concurrency_by_attr')

        params = {'friend': 'grande animalerie'}

        # Launch instances of the workflow up to threshold.
        for _ in range(threshold):
            self._request_running_workflow(params)

        # Check number of running instances
        running = LiveAction.count(
            action=WF1_NAME, status=action_constants.LIVEACTION_STATUS_RUNNING,
            parameters__friend=params['friend'])

        self.assertEqual(running, threshold)

        self._assert_extra_request_is_canceled(params)
260 |
Review note on `from six.moves import range`: this import shadows the built-in `range`. Redefining built-ins is generally discouraged because it makes code harder to read.