Passed
Pull Request — master (#3163)
by W
05:12
created

MistralRunnerPolicyTest.get_runner_class()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
c 0
b 0
f 0
dl 0
loc 3
rs 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import mock
20
import yaml
21
22
from mistralclient.api.v2 import action_executions
23
from mistralclient.api.v2 import executions
24
from mistralclient.api.v2 import workflows
25
from oslo_config import cfg
26
27
# XXX: actionsensor import depends on config being setup.
28
import st2tests.config as tests_config
29
tests_config.parse_args()
30
31
from mistral_v2 import MistralRunner
32
import st2common
33
from st2common.bootstrap import actionsregistrar
34
from st2common.bootstrap import policiesregistrar
35
from st2common.bootstrap import runnersregistrar
36
from st2common.constants import action as action_constants
37
from st2common.models.db.execution import ActionExecutionDB
38
from st2common.models.db.liveaction import LiveActionDB
39
from st2common.persistence.execution import ActionExecution
40
from st2common.persistence.liveaction import LiveAction
41
from st2common.persistence.policy import Policy
42
from st2common.runners import base as runners
43
from st2common.services import action as action_service
44
from st2common.services import trace as trace_service
45
from st2common.transport.liveaction import LiveActionPublisher
46
from st2common.transport.publishers import CUDPublisher
47
from st2common.util import loader
48
from st2tests import DbTestCase
49
from st2tests import fixturesloader
50
from st2tests.mocks.liveaction import MockLiveActionPublisher
51
52
53
MISTRAL_RUNNER_NAME = 'mistral_v2'
54
TEST_PACK = 'mistral_tests'
55
TEST_PACK_PATH = fixturesloader.get_fixtures_packs_base_path() + '/' + TEST_PACK
56
57
PACKS = [
58
    TEST_PACK_PATH,
59
    fixturesloader.get_fixtures_packs_base_path() + '/core'
60
]
61
62
# Non-workbook with a single workflow
63
WF1_META_FILE_NAME = 'workflow_v2.yaml'
64
WF1_META_FILE_PATH = TEST_PACK_PATH + '/actions/' + WF1_META_FILE_NAME
65
WF1_META_CONTENT = loader.load_meta_file(WF1_META_FILE_PATH)
66
WF1_NAME = WF1_META_CONTENT['pack'] + '.' + WF1_META_CONTENT['name']
67
WF1_ENTRY_POINT = TEST_PACK_PATH + '/actions/' + WF1_META_CONTENT['entry_point']
68
WF1_ENTRY_POINT_X = WF1_ENTRY_POINT.replace(WF1_META_FILE_NAME, 'xformed_' + WF1_META_FILE_NAME)
69
WF1_SPEC = yaml.safe_load(MistralRunner.get_workflow_definition(WF1_ENTRY_POINT_X))
70
WF1_YAML = yaml.safe_dump(WF1_SPEC, default_flow_style=False)
71
WF1 = workflows.Workflow(None, {'name': WF1_NAME, 'definition': WF1_YAML})
72
MISTRAL_EXECUTION = {'id': str(uuid.uuid4()), 'state': 'RUNNING', 'workflow_name': WF1_NAME}
73
WF1_EXEC = copy.deepcopy(MISTRAL_EXECUTION)
74
75
MOCK_ACTION_EXEC_DB = ActionExecutionDB(
76
    action={'ref': 'mock.workflow'},
77
    runner={'ref': 'mock.runner'},
78
    liveaction={'id': uuid.uuid4().hex},
79
    status=action_constants.LIVEACTION_STATUS_RUNNING,
80
    context={'mistral': {'auth_token': uuid.uuid4().hex}}
81
)
82
83
84
@mock.patch.object(
85
    trace_service, 'get_trace_db_by_live_action',
86
    mock.MagicMock(return_value=(None, None)))
87
@mock.patch.object(
88
    ActionExecution, 'get_by_id',
89
    mock.MagicMock(return_value=MOCK_ACTION_EXEC_DB))
90
@mock.patch.object(
91
    CUDPublisher,
92
    'publish_update',
93
    mock.MagicMock(return_value=None))
94
@mock.patch.object(
95
    CUDPublisher,
96
    'publish_create',
97
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_create))
98
@mock.patch.object(
99
    LiveActionPublisher,
100
    'publish_state',
101
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_state))
102
class MistralRunnerPolicyTest(DbTestCase):
103
104
    @classmethod
105
    def setUpClass(cls):
106
        super(MistralRunnerPolicyTest, cls).setUpClass()
107
108
        # Override the retry configuration here otherwise st2tests.config.parse_args
109
        # in DbTestCase.setUpClass will reset these overrides.
110
        cfg.CONF.set_override('retry_exp_msec', 100, group='mistral')
111
        cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral')
112
        cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral')
113
        cfg.CONF.set_override('api_url', 'http://0.0.0.0:9101', group='auth')
114
115
    def setUp(self):
116
        super(MistralRunnerPolicyTest, self).setUp()
117
118
        # Start with a clean database for each test.
119
        self._establish_connection_and_re_create_db()
120
121
        # Register runners.
122
        runnersregistrar.register_runners()
123
124
        actions_registrar = actionsregistrar.ActionsRegistrar(
125
            use_pack_cache=False,
126
            fail_on_failure=True
127
        )
128
129
        for pack in PACKS:
130
            actions_registrar.register_from_pack(pack)
131
132
        # Register policies required for the tests.
133
        policiesregistrar.register_policy_types(st2common)
134
135
        policies_registrar = policiesregistrar.PolicyRegistrar(
136
            use_pack_cache=False,
137
            fail_on_failure=True
138
        )
139
140
        for pack in PACKS:
141
            policies_registrar.register_from_pack(pack)
142
143
    @classmethod
144
    def get_runner_class(cls, runner_name):
145
        return runners.get_runner(runner_name).__class__
146
147
    def _drop_all_other_policies(self, test_policy):
148
        policy_dbs = [policy_db for policy_db in Policy.get_all() if policy_db.ref != test_policy]
149
150
        for policy_db in policy_dbs:
151
            Policy.delete(policy_db, publish=False)
152
153
    @mock.patch.object(
154
        workflows.WorkflowManager, 'list',
155
        mock.MagicMock(return_value=[]))
156
    @mock.patch.object(
157
        workflows.WorkflowManager, 'get',
158
        mock.MagicMock(return_value=WF1))
159
    @mock.patch.object(
160
        workflows.WorkflowManager, 'create',
161
        mock.MagicMock(return_value=[WF1]))
162
    @mock.patch.object(
163
        executions.ExecutionManager, 'create',
164
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
165
    @mock.patch.object(
166
        action_executions.ActionExecutionManager, 'update',
167
        mock.MagicMock(return_value=None))
168
    def test_cancel_on_task_action_concurrency(self):
169
        # Delete other policies in the test pack to avoid conflicts.
170
        required_policy = 'mistral_tests.cancel_on_concurrency'
171
        self._drop_all_other_policies(required_policy)
172
173
        # Get threshold from the policy.
174
        policy = Policy.get_by_ref(required_policy)
175
        threshold = policy.parameters.get('threshold', 0)
176
        self.assertGreater(threshold, 0)
177
178
        # Launch instances of the workflow up to threshold.
179
        for i in range(0, threshold):
180
            liveaction = LiveActionDB(action=WF1_NAME, parameters={'friend': 'friend' + str(i)})
181
            liveaction, execution1 = action_service.request(liveaction)
182
            liveaction = LiveAction.get_by_id(str(liveaction.id))
183
            self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
184
185
        # Check number of running instances
186
        running = LiveAction.count(
187
            action=WF1_NAME, status=action_constants.LIVEACTION_STATUS_RUNNING)
188
189
        self.assertEqual(running, threshold)
190
191
        # Mock the mistral runner cancel method to assert cancel is called.
192
        mistral_runner_cls = self.get_runner_class('mistral_v2')
193
194
        with mock.patch.object(mistral_runner_cls, 'cancel', mock.MagicMock(return_value=None)):
195
            # Launch another instance of the workflow with mistral callback defined
196
            # to indicate that this is executed under a workflow.
197
            callback = {
198
                'source': MISTRAL_RUNNER_NAME,
199
                'url': 'http://127.0.0.1:8989/v2/action_executions/12345'
200
            }
201
202
            params = {'friend': 'grande animalerie'}
203
204
            context = {
205
                'parent': {
206
                    'execution_id': uuid.uuid4().hex
207
                }
208
            }
209
210
            liveaction2 = LiveActionDB(
211
                action=WF1_NAME,
212
                parameters=params,
213
                callback=callback,
214
                context=context
215
            )
216
217
            liveaction2, execution2 = action_service.request(liveaction2)
218
219
            action_executions.ActionExecutionManager.update.assert_called_once_with(
220
                '12345',
221
                output='{"error": "Execution canceled by user."}',
222
                state='CANCELLED'
223
            )
224
225
            liveaction2 = LiveAction.get_by_id(str(liveaction2.id))
226
            self.assertEqual(liveaction2.status, action_constants.LIVEACTION_STATUS_CANCELED)
227
228
            # Assert cancel has been called.
229
            mistral_runner_cls.cancel.assert_called_once_with()
230
231
    @mock.patch.object(
232
        workflows.WorkflowManager, 'list',
233
        mock.MagicMock(return_value=[]))
234
    @mock.patch.object(
235
        workflows.WorkflowManager, 'get',
236
        mock.MagicMock(return_value=WF1))
237
    @mock.patch.object(
238
        workflows.WorkflowManager, 'create',
239
        mock.MagicMock(return_value=[WF1]))
240
    @mock.patch.object(
241
        executions.ExecutionManager, 'create',
242
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
243
    @mock.patch.object(
244
        action_executions.ActionExecutionManager, 'update',
245
        mock.MagicMock(return_value=None))
246
    def test_cancel_on_task_action_concurrency_by_attr(self):
247
        # Delete other policies in the test pack to avoid conflicts.
248
        required_policy = 'mistral_tests.cancel_on_concurrency_by_attr'
249
        self._drop_all_other_policies(required_policy)
250
251
        # Get threshold from the policy.
252
        policy = Policy.get_by_ref(required_policy)
253
        threshold = policy.parameters.get('threshold', 0)
254
        self.assertGreater(threshold, 0)
255
256
        params = {'friend': 'grande animalerie'}
257
258
        # Launch instances of the workflow up to threshold.
259
        for i in range(0, threshold):
260
            liveaction = LiveActionDB(action=WF1_NAME, parameters=params)
261
            liveaction, execution1 = action_service.request(liveaction)
262
            liveaction = LiveAction.get_by_id(str(liveaction.id))
263
            self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
264
265
        # Check number of running instances
266
        running = LiveAction.count(
267
            action=WF1_NAME, status=action_constants.LIVEACTION_STATUS_RUNNING,
268
            parameters__friend=params['friend'])
269
270
        self.assertEqual(running, threshold)
271
272
        # Mock the mistral runner cancel method to assert cancel is called.
273
        mistral_runner_cls = self.get_runner_class('mistral_v2')
274
275
        with mock.patch.object(mistral_runner_cls, 'cancel', mock.MagicMock(return_value=None)):
276
            # Launch another instance of the workflow with mistral callback defined
277
            # to indicate that this is executed under a workflow.
278
            callback = {
279
                'source': MISTRAL_RUNNER_NAME,
280
                'url': 'http://127.0.0.1:8989/v2/action_executions/12345'
281
            }
282
283
            context = {
284
                'parent': {
285
                    'execution_id': uuid.uuid4().hex
286
                }
287
            }
288
289
            liveaction2 = LiveActionDB(
290
                action=WF1_NAME,
291
                parameters=params,
292
                callback=callback,
293
                context=context
294
            )
295
296
            liveaction2, execution2 = action_service.request(liveaction2)
297
298
            action_executions.ActionExecutionManager.update.assert_called_once_with(
299
                '12345',
300
                output='{"error": "Execution canceled by user."}',
301
                state='CANCELLED'
302
            )
303
304
            liveaction2 = LiveAction.get_by_id(str(liveaction2.id))
305
            self.assertEqual(liveaction2.status, action_constants.LIVEACTION_STATUS_CANCELED)
306
307
            # Assert cancel has been called.
308
            mistral_runner_cls.cancel.assert_called_once_with()
309