Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

mistral_v2/tests/unit/test_mistral_v2_policy.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import copy
18
import uuid
19
20
import mock
21
import yaml
22
23
from mistralclient.api.v2 import action_executions
24
from mistralclient.api.v2 import executions
25
from mistralclient.api.v2 import workflows
26
from oslo_config import cfg
27
28
# XXX: actionsensor import depends on config being setup.
29
import st2tests.config as tests_config
30
from six.moves import range
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in range.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
31
tests_config.parse_args()
32
33
import st2common
34
from mistral_v2.mistral_v2 import MistralRunner
35
from st2common.bootstrap import actionsregistrar
36
from st2common.bootstrap import policiesregistrar
37
from st2common.bootstrap import runnersregistrar
38
from st2common.constants import action as action_constants
39
from st2common.models.db.liveaction import LiveActionDB
40
from st2common.persistence.liveaction import LiveAction
41
from st2common.persistence.policy import Policy
42
from st2common.runners import base as runners
43
from st2common.services import action as action_service
44
from st2common.transport.liveaction import LiveActionPublisher
45
from st2common.transport.publishers import CUDPublisher
46
from st2common.util import loader
47
from st2tests import DbTestCase
48
from st2tests import fixturesloader
49
from st2tests.mocks.liveaction import MockLiveActionPublisher
50
51
52
MISTRAL_RUNNER_NAME = 'mistral_v2'
53
TEST_PACK = 'mistral_tests'
54
TEST_PACK_PATH = fixturesloader.get_fixtures_packs_base_path() + '/' + TEST_PACK
55
56
PACKS = [
57
    TEST_PACK_PATH,
58
    fixturesloader.get_fixtures_packs_base_path() + '/core'
59
]
60
61
# Non-workbook with a single workflow
62
WF1_META_FILE_NAME = 'workflow_v2.yaml'
63
WF1_META_FILE_PATH = TEST_PACK_PATH + '/actions/' + WF1_META_FILE_NAME
64
WF1_META_CONTENT = loader.load_meta_file(WF1_META_FILE_PATH)
65
WF1_NAME = WF1_META_CONTENT['pack'] + '.' + WF1_META_CONTENT['name']
66
WF1_ENTRY_POINT = TEST_PACK_PATH + '/actions/' + WF1_META_CONTENT['entry_point']
67
WF1_ENTRY_POINT_X = WF1_ENTRY_POINT.replace(WF1_META_FILE_NAME, 'xformed_' + WF1_META_FILE_NAME)
68
WF1_SPEC = yaml.safe_load(MistralRunner.get_workflow_definition(WF1_ENTRY_POINT_X))
69
WF1_YAML = yaml.safe_dump(WF1_SPEC, default_flow_style=False)
70
WF1 = workflows.Workflow(None, {'name': WF1_NAME, 'definition': WF1_YAML})
71
MISTRAL_EXECUTION = {'id': str(uuid.uuid4()), 'state': 'RUNNING', 'workflow_name': WF1_NAME}
72
WF1_EXEC = copy.deepcopy(MISTRAL_EXECUTION)
73
74
75
@mock.patch.object(
76
    CUDPublisher,
77
    'publish_update',
78
    mock.MagicMock(return_value=None))
79
@mock.patch.object(
80
    CUDPublisher,
81
    'publish_create',
82
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_create))
83
@mock.patch.object(
84
    LiveActionPublisher,
85
    'publish_state',
86
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_state))
87
class MistralRunnerPolicyTest(DbTestCase):
88
89
    @classmethod
90
    def setUpClass(cls):
91
        super(MistralRunnerPolicyTest, cls).setUpClass()
92
93
        # Override the retry configuration here otherwise st2tests.config.parse_args
94
        # in DbTestCase.setUpClass will reset these overrides.
95
        cfg.CONF.set_override('retry_exp_msec', 100, group='mistral')
96
        cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral')
97
        cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral')
98
        cfg.CONF.set_override('api_url', 'http://0.0.0.0:9101', group='auth')
99
100
    def setUp(self):
101
        super(MistralRunnerPolicyTest, self).setUp()
102
103
        # Start with a clean database for each test.
104
        self._establish_connection_and_re_create_db()
105
106
        # Register runners.
107
        runnersregistrar.register_runners()
108
109
        actions_registrar = actionsregistrar.ActionsRegistrar(
110
            use_pack_cache=False,
111
            fail_on_failure=True
112
        )
113
114
        for pack in PACKS:
115
            actions_registrar.register_from_pack(pack)
116
117
        # Register policies required for the tests.
118
        policiesregistrar.register_policy_types(st2common)
119
120
        policies_registrar = policiesregistrar.PolicyRegistrar(
121
            use_pack_cache=False,
122
            fail_on_failure=True
123
        )
124
125
        for pack in PACKS:
126
            policies_registrar.register_from_pack(pack)
127
128
    @classmethod
129
    def get_runner_class(cls, runner_name):
130
        return runners.get_runner(runner_name, runner_name).__class__
131
132
    def _drop_all_other_policies(self, test_policy):
133
        policy_dbs = [policy_db for policy_db in Policy.get_all() if policy_db.ref != test_policy]
134
135
        for policy_db in policy_dbs:
136
            Policy.delete(policy_db, publish=False)
137
138
    @mock.patch.object(
139
        workflows.WorkflowManager, 'list',
140
        mock.MagicMock(return_value=[]))
141
    @mock.patch.object(
142
        workflows.WorkflowManager, 'get',
143
        mock.MagicMock(return_value=WF1))
144
    @mock.patch.object(
145
        workflows.WorkflowManager, 'create',
146
        mock.MagicMock(return_value=[WF1]))
147
    @mock.patch.object(
148
        executions.ExecutionManager, 'create',
149
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
150
    @mock.patch.object(
151
        action_executions.ActionExecutionManager, 'update',
152
        mock.MagicMock(return_value=None))
153
    def test_cancel_on_task_action_concurrency(self):
154
        # Delete other policies in the test pack to avoid conflicts.
155
        required_policy = 'mistral_tests.cancel_on_concurrency'
156
        self._drop_all_other_policies(required_policy)
157
158
        # Get threshold from the policy.
159
        policy = Policy.get_by_ref(required_policy)
160
        threshold = policy.parameters.get('threshold', 0)
161
        self.assertGreater(threshold, 0)
162
163
        # Launch instances of the workflow up to threshold.
164
        for i in range(0, threshold):
165
            liveaction = LiveActionDB(action=WF1_NAME, parameters={'friend': 'friend' + str(i)})
166
            liveaction, execution1 = action_service.request(liveaction)
167
            liveaction = LiveAction.get_by_id(str(liveaction.id))
168
            self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
169
170
        # Check number of running instances
171
        running = LiveAction.count(
172
            action=WF1_NAME, status=action_constants.LIVEACTION_STATUS_RUNNING)
173
174
        self.assertEqual(running, threshold)
175
176
        # Mock the mistral runner cancel method to assert cancel is called.
177
        mistral_runner_cls = self.get_runner_class('mistral_v2')
178
        mock_cancel_return_value = (action_constants.LIVEACTION_STATUS_CANCELING, None, None)
179
        mock_cancel = mock.MagicMock(return_value=mock_cancel_return_value)
180
181
        with mock.patch.object(mistral_runner_cls, 'cancel', mock_cancel):
182
            # Launch another instance of the workflow with mistral callback defined
183
            # to indicate that this is executed under a workflow.
184
            callback = {
185
                'source': MISTRAL_RUNNER_NAME,
186
                'url': 'http://127.0.0.1:8989/v2/action_executions/12345'
187
            }
188
189
            params = {'friend': 'grande animalerie'}
190
191
            liveaction2 = LiveActionDB(action=WF1_NAME, parameters=params, callback=callback)
192
            liveaction2, execution2 = action_service.request(liveaction2)
193
            liveaction2 = LiveAction.get_by_id(str(liveaction2.id))
194
195
            # Assert cancel has been called.
196
            self.assertEqual(liveaction2.status, action_constants.LIVEACTION_STATUS_CANCELING)
197
            mistral_runner_cls.cancel.assert_called_once_with()
198
199
    @mock.patch.object(
200
        workflows.WorkflowManager, 'list',
201
        mock.MagicMock(return_value=[]))
202
    @mock.patch.object(
203
        workflows.WorkflowManager, 'get',
204
        mock.MagicMock(return_value=WF1))
205
    @mock.patch.object(
206
        workflows.WorkflowManager, 'create',
207
        mock.MagicMock(return_value=[WF1]))
208
    @mock.patch.object(
209
        executions.ExecutionManager, 'create',
210
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
211
    @mock.patch.object(
212
        action_executions.ActionExecutionManager, 'update',
213
        mock.MagicMock(return_value=None))
214
    def test_cancel_on_task_action_concurrency_by_attr(self):
215
        # Delete other policies in the test pack to avoid conflicts.
216
        required_policy = 'mistral_tests.cancel_on_concurrency_by_attr'
217
        self._drop_all_other_policies(required_policy)
218
219
        # Get threshold from the policy.
220
        policy = Policy.get_by_ref(required_policy)
221
        threshold = policy.parameters.get('threshold', 0)
222
        self.assertGreater(threshold, 0)
223
224
        params = {'friend': 'grande animalerie'}
225
226
        # Launch instances of the workflow up to threshold.
227
        for i in range(0, threshold):
228
            liveaction = LiveActionDB(action=WF1_NAME, parameters=params)
229
            liveaction, execution1 = action_service.request(liveaction)
230
            liveaction = LiveAction.get_by_id(str(liveaction.id))
231
            self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
232
233
        # Check number of running instances
234
        running = LiveAction.count(
235
            action=WF1_NAME, status=action_constants.LIVEACTION_STATUS_RUNNING,
236
            parameters__friend=params['friend'])
237
238
        self.assertEqual(running, threshold)
239
240
        # Mock the mistral runner cancel method to assert cancel is called.
241
        mistral_runner_cls = self.get_runner_class('mistral_v2')
242
        mock_cancel_return_value = (action_constants.LIVEACTION_STATUS_CANCELING, None, None)
243
        mock_cancel = mock.MagicMock(return_value=mock_cancel_return_value)
244
245
        with mock.patch.object(mistral_runner_cls, 'cancel', mock_cancel):
246
            # Launch another instance of the workflow with mistral callback defined
247
            # to indicate that this is executed under a workflow.
248
            callback = {
249
                'source': MISTRAL_RUNNER_NAME,
250
                'url': 'http://127.0.0.1:8989/v2/action_executions/12345'
251
            }
252
253
            liveaction2 = LiveActionDB(action=WF1_NAME, parameters=params, callback=callback)
254
            liveaction2, execution2 = action_service.request(liveaction2)
255
            liveaction2 = LiveAction.get_by_id(str(liveaction2.id))
256
257
            # Assert cancel has been called.
258
            self.assertEqual(liveaction2.status, action_constants.LIVEACTION_STATUS_CANCELING)
259
            mistral_runner_cls.cancel.assert_called_once_with()
260