Completed
Pull Request — master (#3125)
by W
04:29
created

MistralRunnerPolicyTest._drop_all_other_policies()   A

Complexity

Conditions 4

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 4
c 1
b 0
f 0
dl 0
loc 5
rs 9.2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import mock
20
import yaml
21
22
from mistralclient.api.v2 import executions
23
from mistralclient.api.v2 import workflows
24
from oslo_config import cfg
25
26
# XXX: actionsensor import depends on config being setup.
27
import st2tests.config as tests_config
28
tests_config.parse_args()
29
30
from mistral_v2 import MistralRunner
31
import st2common
32
from st2common.bootstrap import actionsregistrar
33
from st2common.bootstrap import policiesregistrar
34
from st2common.bootstrap import runnersregistrar
35
from st2common.constants import action as action_constants
36
from st2common.models.db.liveaction import LiveActionDB
37
from st2common.persistence.liveaction import LiveAction
38
from st2common.persistence.policy import Policy
39
from st2common.runners import base as runners
40
from st2common.services import action as action_service
41
from st2common.transport.liveaction import LiveActionPublisher
42
from st2common.transport.publishers import CUDPublisher
43
from st2common.util import loader
44
from st2tests import DbTestCase
45
from st2tests import fixturesloader
46
from st2tests.mocks.liveaction import MockLiveActionPublisher
47
48
49
TEST_PACK = 'mistral_tests'
50
TEST_PACK_PATH = fixturesloader.get_fixtures_packs_base_path() + '/' + TEST_PACK
51
52
PACKS = [
53
    TEST_PACK_PATH,
54
    fixturesloader.get_fixtures_packs_base_path() + '/core'
55
]
56
57
# Non-workbook with a single workflow
58
WF1_META_FILE_NAME = 'workflow_v2.yaml'
59
WF1_META_FILE_PATH = TEST_PACK_PATH + '/actions/' + WF1_META_FILE_NAME
60
WF1_META_CONTENT = loader.load_meta_file(WF1_META_FILE_PATH)
61
WF1_NAME = WF1_META_CONTENT['pack'] + '.' + WF1_META_CONTENT['name']
62
WF1_ENTRY_POINT = TEST_PACK_PATH + '/actions/' + WF1_META_CONTENT['entry_point']
63
WF1_ENTRY_POINT_X = WF1_ENTRY_POINT.replace(WF1_META_FILE_NAME, 'xformed_' + WF1_META_FILE_NAME)
64
WF1_SPEC = yaml.safe_load(MistralRunner.get_workflow_definition(WF1_ENTRY_POINT_X))
65
WF1_YAML = yaml.safe_dump(WF1_SPEC, default_flow_style=False)
66
WF1 = workflows.Workflow(None, {'name': WF1_NAME, 'definition': WF1_YAML})
67
MISTRAL_EXECUTION = {'id': str(uuid.uuid4()), 'state': 'RUNNING', 'workflow_name': WF1_NAME}
68
WF1_EXEC = copy.deepcopy(MISTRAL_EXECUTION)
69
70
71
@mock.patch.object(
72
    CUDPublisher,
73
    'publish_update',
74
    mock.MagicMock(return_value=None))
75
@mock.patch.object(
76
    CUDPublisher,
77
    'publish_create',
78
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_create))
79
@mock.patch.object(
80
    LiveActionPublisher,
81
    'publish_state',
82
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_state))
83
class MistralRunnerPolicyTest(DbTestCase):
84
85
    @classmethod
86
    def setUpClass(cls):
87
        super(MistralRunnerPolicyTest, cls).setUpClass()
88
89
        # Override the retry configuration here otherwise st2tests.config.parse_args
90
        # in DbTestCase.setUpClass will reset these overrides.
91
        cfg.CONF.set_override('retry_exp_msec', 100, group='mistral')
92
        cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral')
93
        cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral')
94
        cfg.CONF.set_override('api_url', 'http://0.0.0.0:9101', group='auth')
95
96
    def setUp(self):
97
        super(MistralRunnerPolicyTest, self).setUp()
98
99
        # Start with a clean database for each test.
100
        self._establish_connection_and_re_create_db()
101
102
        # Register runners.
103
        runnersregistrar.register_runners()
104
105
        actions_registrar = actionsregistrar.ActionsRegistrar(
106
            use_pack_cache=False,
107
            fail_on_failure=True
108
        )
109
110
        for pack in PACKS:
111
            actions_registrar.register_from_pack(pack)
112
113
        # Register policies required for the tests.
114
        policiesregistrar.register_policy_types(st2common)
115
116
        policies_registrar = policiesregistrar.PolicyRegistrar(
117
            use_pack_cache=False,
118
            fail_on_failure=True
119
        )
120
121
        for pack in PACKS:
122
            policies_registrar.register_from_pack(pack)
123
124
    @classmethod
125
    def get_runner_class(cls, runner_name):
126
        return runners.get_runner(runner_name).__class__
127
128
    def _drop_all_other_policies(self, test_policy):
129
        policy_dbs = [policy_db for policy_db in Policy.get_all() if policy_db.ref != test_policy]
130
131
        for policy_db in policy_dbs:
132
            Policy.delete(policy_db, publish=False)
133
134
    @mock.patch.object(
135
        workflows.WorkflowManager, 'list',
136
        mock.MagicMock(return_value=[]))
137
    @mock.patch.object(
138
        workflows.WorkflowManager, 'get',
139
        mock.MagicMock(return_value=WF1))
140
    @mock.patch.object(
141
        workflows.WorkflowManager, 'create',
142
        mock.MagicMock(return_value=[WF1]))
143
    @mock.patch.object(
144
        executions.ExecutionManager, 'create',
145
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
146
    def test_cancel_on_task_action_concurrency(self):
147
        # Delete other policies in the test pack to avoid conflicts.
148
        required_policy = 'mistral_tests.cancel_on_concurrency'
149
        self._drop_all_other_policies(required_policy)
150
151
        # Get threshold from the policy.
152
        policy = Policy.get_by_ref(required_policy)
153
        threshold = policy.parameters.get('threshold', 0)
154
        self.assertGreater(threshold, 0)
155
156
        # Launch instances of the workflow up to threshold.
157
        for i in range(0, threshold):
158
            liveaction = LiveActionDB(action=WF1_NAME, parameters={'friend': 'friend' + str(i)})
159
            liveaction, execution1 = action_service.request(liveaction)
160
            liveaction = LiveAction.get_by_id(str(liveaction.id))
161
            self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
162
163
        # Check number of running instances
164
        running = LiveAction.count(
165
            action=WF1_NAME, status=action_constants.LIVEACTION_STATUS_RUNNING)
166
167
        self.assertEqual(running, threshold)
168
169
        # Mock the mistral runner cancel method to assert cancel is called.
170
        mistral_runner_cls = self.get_runner_class('mistral_v2')
171
172
        with mock.patch.object(mistral_runner_cls, 'cancel', mock.MagicMock(return_value=None)):
173
            # Launch another instance of the workflow with mistral callback defined
174
            # to indicate that this is executed under a workflow.
175
            callback = {
176
                'source': 'mistral',
177
                'url': 'http://127.0.0.1:8989/v2/action_executions/12345'
178
            }
179
180
            params = {'friend': 'grande animalerie'}
181
182
            liveaction2 = LiveActionDB(action=WF1_NAME, parameters=params, callback=callback)
183
            liveaction2, execution2 = action_service.request(liveaction2)
184
            liveaction2 = LiveAction.get_by_id(str(liveaction2.id))
185
            self.assertEqual(liveaction2.status, action_constants.LIVEACTION_STATUS_CANCELED)
186
187
            # Assert cancel has been called.
188
            mistral_runner_cls.cancel.assert_called_once_with()
189
190
    @mock.patch.object(
191
        workflows.WorkflowManager, 'list',
192
        mock.MagicMock(return_value=[]))
193
    @mock.patch.object(
194
        workflows.WorkflowManager, 'get',
195
        mock.MagicMock(return_value=WF1))
196
    @mock.patch.object(
197
        workflows.WorkflowManager, 'create',
198
        mock.MagicMock(return_value=[WF1]))
199
    @mock.patch.object(
200
        executions.ExecutionManager, 'create',
201
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
202
    def test_cancel_on_task_action_concurrency_by_attr(self):
203
        # Delete other policies in the test pack to avoid conflicts.
204
        required_policy = 'mistral_tests.cancel_on_concurrency_by_attr'
205
        self._drop_all_other_policies(required_policy)
206
207
        # Get threshold from the policy.
208
        policy = Policy.get_by_ref(required_policy)
209
        threshold = policy.parameters.get('threshold', 0)
210
        self.assertGreater(threshold, 0)
211
212
        params = {'friend': 'grande animalerie'}
213
214
        # Launch instances of the workflow up to threshold.
215
        for i in range(0, threshold):
216
            liveaction = LiveActionDB(action=WF1_NAME, parameters=params)
217
            liveaction, execution1 = action_service.request(liveaction)
218
            liveaction = LiveAction.get_by_id(str(liveaction.id))
219
            self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
220
221
        # Check number of running instances
222
        running = LiveAction.count(
223
            action=WF1_NAME, status=action_constants.LIVEACTION_STATUS_RUNNING,
224
            parameters__friend=params['friend'])
225
226
        self.assertEqual(running, threshold)
227
228
        # Mock the mistral runner cancel method to assert cancel is called.
229
        mistral_runner_cls = self.get_runner_class('mistral_v2')
230
231
        with mock.patch.object(mistral_runner_cls, 'cancel', mock.MagicMock(return_value=None)):
232
            # Launch another instance of the workflow with mistral callback defined
233
            # to indicate that this is executed under a workflow.
234
            callback = {
235
                'source': 'mistral',
236
                'url': 'http://127.0.0.1:8989/v2/action_executions/12345'
237
            }
238
239
            liveaction2 = LiveActionDB(action=WF1_NAME, parameters=params, callback=callback)
240
            liveaction2, execution2 = action_service.request(liveaction2)
241
            liveaction2 = LiveAction.get_by_id(str(liveaction2.id))
242
            self.assertEqual(liveaction2.status, action_constants.LIVEACTION_STATUS_CANCELED)
243
244
            # Assert cancel has been called.
245
            mistral_runner_cls.cancel.assert_called_once_with()
246