Passed
Pull Request — master (#3163)
by W
04:43
created

test_cancel_on_task_action_concurrency()   B

Complexity

Conditions 3

Size

Total Lines 77

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 3
dl 0
loc 77
rs 8.9342
c 2
b 0
f 0

How to fix: Long Method

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the method's body into a new, well-named method.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import mock
20
import yaml
21
22
from mistralclient.api.v2 import action_executions
23
from mistralclient.api.v2 import executions
24
from mistralclient.api.v2 import workflows
25
from oslo_config import cfg
26
27
# XXX: actionsensor import depends on config being setup.
28
import st2tests.config as tests_config
29
tests_config.parse_args()
30
31
from mistral_v2 import MistralRunner
32
import st2common
33
from st2common.bootstrap import actionsregistrar
34
from st2common.bootstrap import policiesregistrar
35
from st2common.bootstrap import runnersregistrar
36
from st2common.constants import action as action_constants
37
from st2common.models.db.execution import ActionExecutionDB
38
from st2common.models.db.liveaction import LiveActionDB
39
from st2common.persistence.execution import ActionExecution
40
from st2common.persistence.liveaction import LiveAction
41
from st2common.persistence.policy import Policy
42
from st2common.runners import base as runners
43
from st2common.services import action as action_service
44
from st2common.services import trace as trace_service
45
from st2common.transport.liveaction import LiveActionPublisher
46
from st2common.transport.publishers import CUDPublisher
47
from st2common.util import loader
48
from st2tests import DbTestCase
49
from st2tests import fixturesloader
50
from st2tests.mocks.liveaction import MockLiveActionPublisher
51
52
53
# Name and location of the fixture pack exercised by this module.
TEST_PACK = 'mistral_tests'
TEST_PACK_PATH = '/'.join([fixturesloader.get_fixtures_packs_base_path(), TEST_PACK])

# Packs whose actions and policies are registered in the test case's setUp.
PACKS = [TEST_PACK_PATH]
PACKS.append(fixturesloader.get_fixtures_packs_base_path() + '/core')

# Non-workbook with a single workflow
WF1_META_FILE_NAME = 'workflow_v2.yaml'
WF1_META_FILE_PATH = ''.join([TEST_PACK_PATH, '/actions/', WF1_META_FILE_NAME])
WF1_META_CONTENT = loader.load_meta_file(WF1_META_FILE_PATH)
WF1_NAME = '.'.join([WF1_META_CONTENT['pack'], WF1_META_CONTENT['name']])
WF1_ENTRY_POINT = ''.join([TEST_PACK_PATH, '/actions/', WF1_META_CONTENT['entry_point']])

# Same entry point, but pointing at the file whose name carries the 'xformed_' prefix.
WF1_ENTRY_POINT_X = WF1_ENTRY_POINT.replace(
    WF1_META_FILE_NAME,
    'xformed_' + WF1_META_FILE_NAME
)

# Round-trip the workflow definition through YAML and wrap it in a client object
# that the mocked WorkflowManager calls hand back.
WF1_SPEC = yaml.safe_load(
    MistralRunner.get_workflow_definition(WF1_ENTRY_POINT_X)
)
WF1_YAML = yaml.safe_dump(WF1_SPEC, default_flow_style=False)
WF1 = workflows.Workflow(
    None,
    {'name': WF1_NAME, 'definition': WF1_YAML}
)

# Payload for the execution object returned by the mocked ExecutionManager.create.
MISTRAL_EXECUTION = {
    'id': str(uuid.uuid4()),
    'state': 'RUNNING',
    'workflow_name': WF1_NAME
}
WF1_EXEC = copy.deepcopy(MISTRAL_EXECUTION)

# Execution record returned by the mocked ActionExecution.get_by_id.
MOCK_ACTION_EXEC_DB = ActionExecutionDB(
    action={'ref': 'mock.workflow'},
    runner={'ref': 'mock.runner'},
    liveaction={'id': uuid.uuid4().hex},
    status=action_constants.LIVEACTION_STATUS_RUNNING,
    context={'mistral': {'auth_token': uuid.uuid4().hex}}
)
81
82
83
@mock.patch.object(
    trace_service, 'get_trace_db_by_live_action',
    mock.MagicMock(return_value=(None, None)))
@mock.patch.object(
    ActionExecution, 'get_by_id',
    mock.MagicMock(return_value=MOCK_ACTION_EXEC_DB))
@mock.patch.object(
    CUDPublisher,
    'publish_update',
    mock.MagicMock(return_value=None))
@mock.patch.object(
    CUDPublisher,
    'publish_create',
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_create))
@mock.patch.object(
    LiveActionPublisher,
    'publish_state',
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_state))
class MistralRunnerPolicyTest(DbTestCase):
    """Tests that concurrency policies cancel Mistral workflow actions over threshold.

    The class decorators replace the trace lookup, ``ActionExecution.get_by_id``
    and the CUD / live action publishers with mocks for every test method.
    """

    @classmethod
    def setUpClass(cls):
        super(MistralRunnerPolicyTest, cls).setUpClass()

        # Override the retry configuration here otherwise st2tests.config.parse_args
        # in DbTestCase.setUpClass will reset these overrides.
        cfg.CONF.set_override('retry_exp_msec', 100, group='mistral')
        cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral')
        cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral')
        cfg.CONF.set_override('api_url', 'http://0.0.0.0:9101', group='auth')

    def setUp(self):
        super(MistralRunnerPolicyTest, self).setUp()

        # Start with a clean database for each test.
        self._establish_connection_and_re_create_db()

        # Register runners.
        runnersregistrar.register_runners()

        # Register the actions from every pack used by the tests.
        actions_registrar = actionsregistrar.ActionsRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)

        # Register policies required for the tests.
        policiesregistrar.register_policy_types(st2common)

        policies_registrar = policiesregistrar.PolicyRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            policies_registrar.register_from_pack(pack)

    @classmethod
    def get_runner_class(cls, runner_name):
        """Return the class of the runner object obtained for ``runner_name``."""
        return runners.get_runner(runner_name).__class__

    def _drop_all_other_policies(self, test_policy):
        """Delete every policy whose ref differs from ``test_policy``.

        The deletes are issued with ``publish=False``.
        """
        # Materialize the list first so deletion does not run while iterating
        # over the result of Policy.get_all().
        policy_dbs = [policy_db for policy_db in Policy.get_all() if policy_db.ref != test_policy]

        for policy_db in policy_dbs:
            Policy.delete(policy_db, publish=False)

    def _assert_cancel_on_concurrency(self, required_policy, running_params, blocked_params,
                                      count_filters=None):
        """Run the shared "cancel when over threshold" scenario for one policy.

        Must be called from a test method that patches
        ``action_executions.ActionExecutionManager.update``, because the mock
        installed there is asserted on.

        :param required_policy: Ref of the policy under test; all other
                                policies are dropped first.
        :param running_params: Callable taking the instance index and returning
                               the parameters dict for that running instance.
        :param blocked_params: Parameters dict for the extra instance that is
                               expected to be canceled.
        :param count_filters: Optional extra keyword filters passed to
                              ``LiveAction.count`` when counting running instances.
        """
        # Delete other policies in the test pack to avoid conflicts.
        self._drop_all_other_policies(required_policy)

        # Get threshold from the policy.
        policy = Policy.get_by_ref(required_policy)
        threshold = policy.parameters.get('threshold', 0)
        self.assertGreater(threshold, 0)

        # Launch instances of the workflow up to threshold.
        for i in range(threshold):
            liveaction = LiveActionDB(action=WF1_NAME, parameters=running_params(i))
            liveaction, _ = action_service.request(liveaction)
            liveaction = LiveAction.get_by_id(str(liveaction.id))
            self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)

        # Check number of running instances
        running = LiveAction.count(
            action=WF1_NAME, status=action_constants.LIVEACTION_STATUS_RUNNING,
            **(count_filters or {}))

        self.assertEqual(running, threshold)

        # Mock the mistral runner cancel method to assert cancel is called.
        mistral_runner_cls = self.get_runner_class('mistral_v2')

        with mock.patch.object(mistral_runner_cls, 'cancel', mock.MagicMock(return_value=None)):
            # Launch another instance of the workflow with mistral callback defined
            # to indicate that this is executed under a workflow.
            callback = {
                'source': 'mistral',
                'url': 'http://127.0.0.1:8989/v2/action_executions/12345'
            }

            context = {
                'parent': {
                    'execution_id': uuid.uuid4().hex
                }
            }

            liveaction2 = LiveActionDB(
                action=WF1_NAME,
                parameters=blocked_params,
                callback=callback,
                context=context
            )

            liveaction2, _ = action_service.request(liveaction2)

            # The id asserted here is the last path segment of the callback url.
            action_executions.ActionExecutionManager.update.assert_called_once_with(
                '12345',
                output='{"error": "Execution canceled by user."}',
                state='ERROR'
            )

            liveaction2 = LiveAction.get_by_id(str(liveaction2.id))
            self.assertEqual(liveaction2.status, action_constants.LIVEACTION_STATUS_CANCELED)

            # Assert cancel has been called.
            mistral_runner_cls.cancel.assert_called_once_with()

    # NOTE: The patches below are intentionally repeated on each test rather than
    # shared, so every test owns a separate MagicMock and the
    # assert_called_once_with checks are not affected by calls from the other test.
    @mock.patch.object(
        workflows.WorkflowManager, 'list',
        mock.MagicMock(return_value=[]))
    @mock.patch.object(
        workflows.WorkflowManager, 'get',
        mock.MagicMock(return_value=WF1))
    @mock.patch.object(
        workflows.WorkflowManager, 'create',
        mock.MagicMock(return_value=[WF1]))
    @mock.patch.object(
        executions.ExecutionManager, 'create',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
    @mock.patch.object(
        action_executions.ActionExecutionManager, 'update',
        mock.MagicMock(return_value=None))
    def test_cancel_on_task_action_concurrency(self):
        """An instance beyond the policy threshold is canceled (distinct parameters)."""
        self._assert_cancel_on_concurrency(
            'mistral_tests.cancel_on_concurrency',
            running_params=lambda i: {'friend': 'friend' + str(i)},
            blocked_params={'friend': 'grande animalerie'}
        )

    @mock.patch.object(
        workflows.WorkflowManager, 'list',
        mock.MagicMock(return_value=[]))
    @mock.patch.object(
        workflows.WorkflowManager, 'get',
        mock.MagicMock(return_value=WF1))
    @mock.patch.object(
        workflows.WorkflowManager, 'create',
        mock.MagicMock(return_value=[WF1]))
    @mock.patch.object(
        executions.ExecutionManager, 'create',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
    @mock.patch.object(
        action_executions.ActionExecutionManager, 'update',
        mock.MagicMock(return_value=None))
    def test_cancel_on_task_action_concurrency_by_attr(self):
        """An instance beyond the policy threshold is canceled (same ``friend`` value)."""
        params = {'friend': 'grande animalerie'}

        self._assert_cancel_on_concurrency(
            'mistral_tests.cancel_on_concurrency_by_attr',
            running_params=lambda i: params,
            blocked_params=params,
            count_filters={'parameters__friend': params['friend']}
        )
308