Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

mistral_v2/tests/unit/test_mistral_v2_cancel.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import copy
18
import uuid
19
20
import mock
21
from mock import call
22
import requests
23
import yaml
24
25
from mistralclient.api.v2 import executions
26
from mistralclient.api.v2 import workflows
27
from oslo_config import cfg
28
29
# XXX: actionsensor import depends on config being setup.
30
import st2tests.config as tests_config
31
from six.moves import range
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in range.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
32
tests_config.parse_args()
33
34
from mistral_v2.mistral_v2 import MistralRunner
35
from st2common.bootstrap import actionsregistrar
36
from st2common.bootstrap import runnersregistrar
37
from st2common.constants import action as action_constants
38
from st2common.models.db.execution import ActionExecutionDB
39
from st2common.models.db.liveaction import LiveActionDB
40
from st2common.persistence.liveaction import LiveAction
41
from st2common.runners import base as runners
42
from st2common.services import action as action_service
43
from st2common.transport.liveaction import LiveActionPublisher
44
from st2common.transport.publishers import CUDPublisher
45
from st2common.util import loader
46
from st2tests import DbTestCase
47
from st2tests import fixturesloader
48
from st2tests.mocks.liveaction import MockLiveActionPublisher
49
50
51
TEST_PACK = 'mistral_tests'
52
TEST_PACK_PATH = fixturesloader.get_fixtures_packs_base_path() + '/' + TEST_PACK
53
54
PACKS = [
55
    TEST_PACK_PATH,
56
    fixturesloader.get_fixtures_packs_base_path() + '/core'
57
]
58
59
# Action executions requirements
60
ACTION_PARAMS = {'friend': 'Rocky'}
61
NON_EMPTY_RESULT = 'non-empty'
62
63
# Non-workbook with a single workflow
64
WF1_META_FILE_NAME = 'workflow_v2.yaml'
65
WF1_META_FILE_PATH = TEST_PACK_PATH + '/actions/' + WF1_META_FILE_NAME
66
WF1_META_CONTENT = loader.load_meta_file(WF1_META_FILE_PATH)
67
WF1_NAME = WF1_META_CONTENT['pack'] + '.' + WF1_META_CONTENT['name']
68
WF1_ENTRY_POINT = TEST_PACK_PATH + '/actions/' + WF1_META_CONTENT['entry_point']
69
WF1_ENTRY_POINT_X = WF1_ENTRY_POINT.replace(WF1_META_FILE_NAME, 'xformed_' + WF1_META_FILE_NAME)
70
WF1_SPEC = yaml.safe_load(MistralRunner.get_workflow_definition(WF1_ENTRY_POINT_X))
71
WF1_YAML = yaml.safe_dump(WF1_SPEC, default_flow_style=False)
72
WF1 = workflows.Workflow(None, {'name': WF1_NAME, 'definition': WF1_YAML})
73
WF1_OLD = workflows.Workflow(None, {'name': WF1_NAME, 'definition': ''})
74
WF1_EXEC = {'id': str(uuid.uuid4()), 'state': 'RUNNING', 'workflow_name': WF1_NAME}
75
WF1_EXEC_CANCELLED = copy.deepcopy(WF1_EXEC)
76
WF1_EXEC_CANCELLED['state'] = 'CANCELLED'
77
78
# Workflow with a subworkflow action
79
WF2_META_FILE_NAME = 'workflow_v2_call_workflow_action.yaml'
80
WF2_META_FILE_PATH = TEST_PACK_PATH + '/actions/' + WF2_META_FILE_NAME
81
WF2_META_CONTENT = loader.load_meta_file(WF2_META_FILE_PATH)
82
WF2_NAME = WF2_META_CONTENT['pack'] + '.' + WF2_META_CONTENT['name']
83
WF2_ENTRY_POINT = TEST_PACK_PATH + '/actions/' + WF2_META_CONTENT['entry_point']
84
WF2_ENTRY_POINT_X = WF2_ENTRY_POINT.replace(WF2_META_FILE_NAME, 'xformed_' + WF2_META_FILE_NAME)
85
WF2_SPEC = yaml.safe_load(MistralRunner.get_workflow_definition(WF2_ENTRY_POINT_X))
86
WF2_YAML = yaml.safe_dump(WF2_SPEC, default_flow_style=False)
87
WF2 = workflows.Workflow(None, {'name': WF2_NAME, 'definition': WF2_YAML})
88
WF2_EXEC = {'id': str(uuid.uuid4()), 'state': 'RUNNING', 'workflow_name': WF2_NAME}
89
WF2_EXEC_CANCELLED = copy.deepcopy(WF2_EXEC)
90
WF2_EXEC_CANCELLED['state'] = 'CANCELLED'
91
92
93
@mock.patch.object(
94
    CUDPublisher,
95
    'publish_update',
96
    mock.MagicMock(return_value=None))
97
@mock.patch.object(
98
    CUDPublisher,
99
    'publish_create',
100
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_create))
101
@mock.patch.object(
102
    LiveActionPublisher,
103
    'publish_state',
104
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_state))
105
class MistralRunnerCancelTest(DbTestCase):
106
107
    @classmethod
108
    def setUpClass(cls):
109
        super(MistralRunnerCancelTest, cls).setUpClass()
110
111
        # Override the retry configuration here otherwise st2tests.config.parse_args
112
        # in DbTestCase.setUpClass will reset these overrides.
113
        cfg.CONF.set_override('retry_exp_msec', 100, group='mistral')
114
        cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral')
115
        cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral')
116
        cfg.CONF.set_override('api_url', 'http://0.0.0.0:9101', group='auth')
117
118
        # Register runners.
119
        runnersregistrar.register_runners()
120
121
        # Register test pack(s).
122
        actions_registrar = actionsregistrar.ActionsRegistrar(
123
            use_pack_cache=False,
124
            fail_on_failure=True
125
        )
126
127
        for pack in PACKS:
128
            actions_registrar.register_from_pack(pack)
129
130
    @classmethod
131
    def get_runner_class(cls, runner_name):
132
        return runners.get_runner(runner_name, runner_name).__class__
133
134
    @mock.patch.object(
135
        workflows.WorkflowManager, 'list',
136
        mock.MagicMock(return_value=[]))
137
    @mock.patch.object(
138
        workflows.WorkflowManager, 'get',
139
        mock.MagicMock(return_value=WF1))
140
    @mock.patch.object(
141
        workflows.WorkflowManager, 'create',
142
        mock.MagicMock(return_value=[WF1]))
143
    @mock.patch.object(
144
        executions.ExecutionManager, 'create',
145
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
146
    @mock.patch.object(
147
        executions.ExecutionManager, 'update',
148
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC_CANCELLED)))
149
    @mock.patch.object(
150
        action_service, 'is_children_active',
151
        mock.MagicMock(return_value=True))
152
    def test_cancel(self):
153
        liveaction = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS)
154
        liveaction, execution = action_service.request(liveaction)
155
        liveaction = LiveAction.get_by_id(str(liveaction.id))
156
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
157
158
        mistral_context = liveaction.context.get('mistral', None)
159
        self.assertIsNotNone(mistral_context)
160
        self.assertEqual(mistral_context['execution_id'], WF1_EXEC.get('id'))
161
        self.assertEqual(mistral_context['workflow_name'], WF1_EXEC.get('workflow_name'))
162
163
        requester = cfg.CONF.system_user.user
164
        liveaction, execution = action_service.request_cancellation(liveaction, requester)
165
        executions.ExecutionManager.update.assert_called_with(WF1_EXEC.get('id'), 'CANCELLED')
166
        liveaction = LiveAction.get_by_id(str(liveaction.id))
167
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELING)
168
169
    @mock.patch.object(
170
        workflows.WorkflowManager, 'list',
171
        mock.MagicMock(return_value=[]))
172
    @mock.patch.object(
173
        workflows.WorkflowManager, 'get',
174
        mock.MagicMock(side_effect=[WF2, WF1]))
175
    @mock.patch.object(
176
        workflows.WorkflowManager, 'create',
177
        mock.MagicMock(side_effect=[[WF2], [WF1]]))
178
    @mock.patch.object(
179
        executions.ExecutionManager, 'create',
180
        mock.MagicMock(side_effect=[
181
            executions.Execution(None, WF2_EXEC),
182
            executions.Execution(None, WF1_EXEC)]))
183
    @mock.patch.object(
184
        executions.ExecutionManager, 'update',
185
        mock.MagicMock(side_effect=[
186
            executions.Execution(None, WF2_EXEC_CANCELLED),
187
            executions.Execution(None, WF1_EXEC_CANCELLED)]))
188
    def test_cancel_subworkflow_action(self):
189
        liveaction1 = LiveActionDB(action=WF2_NAME, parameters=ACTION_PARAMS)
190
        liveaction1, execution1 = action_service.request(liveaction1)
191
        liveaction1 = LiveAction.get_by_id(str(liveaction1.id))
192
        self.assertEqual(liveaction1.status, action_constants.LIVEACTION_STATUS_RUNNING)
193
194
        liveaction2 = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS)
195
        liveaction2, execution2 = action_service.request(liveaction2)
196
        liveaction2 = LiveAction.get_by_id(str(liveaction2.id))
197
        self.assertEqual(liveaction2.status, action_constants.LIVEACTION_STATUS_RUNNING)
198
199
        # Mock the children of the parent execution to make this
200
        # test case has subworkflow execution.
201
        with mock.patch.object(
202
                ActionExecutionDB, 'children',
203
                new_callable=mock.PropertyMock) as action_ex_children_mock:
204
            action_ex_children_mock.return_value = [execution2.id]
205
206
            mistral_context = liveaction1.context.get('mistral', None)
207
            self.assertIsNotNone(mistral_context)
208
            self.assertEqual(mistral_context['execution_id'], WF2_EXEC.get('id'))
209
            self.assertEqual(mistral_context['workflow_name'], WF2_EXEC.get('workflow_name'))
210
211
            requester = cfg.CONF.system_user.user
212
            liveaction1, execution1 = action_service.request_cancellation(liveaction1, requester)
213
214
            self.assertTrue(executions.ExecutionManager.update.called)
215
            self.assertEqual(executions.ExecutionManager.update.call_count, 2)
216
217
            calls = [
218
                mock.call(WF2_EXEC.get('id'), 'CANCELLED'),
219
                mock.call(WF1_EXEC.get('id'), 'CANCELLED')
220
            ]
221
222
            executions.ExecutionManager.update.assert_has_calls(calls, any_order=False)
223
224
    @mock.patch.object(
225
        workflows.WorkflowManager, 'list',
226
        mock.MagicMock(return_value=[]))
227
    @mock.patch.object(
228
        workflows.WorkflowManager, 'get',
229
        mock.MagicMock(return_value=WF1))
230
    @mock.patch.object(
231
        workflows.WorkflowManager, 'create',
232
        mock.MagicMock(return_value=[WF1]))
233
    @mock.patch.object(
234
        executions.ExecutionManager, 'create',
235
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
236
    @mock.patch.object(
237
        executions.ExecutionManager, 'update',
238
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError(),
239
                                    executions.Execution(None, WF1_EXEC_CANCELLED)]))
240
    @mock.patch.object(
241
        action_service, 'is_children_active',
242
        mock.MagicMock(return_value=True))
243
    def test_cancel_retry(self):
244
        liveaction = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS)
245
        liveaction, execution = action_service.request(liveaction)
246
        liveaction = LiveAction.get_by_id(str(liveaction.id))
247
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
248
249
        mistral_context = liveaction.context.get('mistral', None)
250
        self.assertIsNotNone(mistral_context)
251
        self.assertEqual(mistral_context['execution_id'], WF1_EXEC.get('id'))
252
        self.assertEqual(mistral_context['workflow_name'], WF1_EXEC.get('workflow_name'))
253
254
        requester = cfg.CONF.system_user.user
255
        liveaction, execution = action_service.request_cancellation(liveaction, requester)
256
        executions.ExecutionManager.update.assert_called_with(WF1_EXEC.get('id'), 'CANCELLED')
257
        liveaction = LiveAction.get_by_id(str(liveaction.id))
258
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELING)
259
260
    @mock.patch.object(
261
        workflows.WorkflowManager, 'list',
262
        mock.MagicMock(return_value=[]))
263
    @mock.patch.object(
264
        workflows.WorkflowManager, 'get',
265
        mock.MagicMock(return_value=WF1))
266
    @mock.patch.object(
267
        workflows.WorkflowManager, 'create',
268
        mock.MagicMock(return_value=[WF1]))
269
    @mock.patch.object(
270
        executions.ExecutionManager, 'create',
271
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
272
    @mock.patch.object(
273
        executions.ExecutionManager, 'update',
274
        mock.MagicMock(side_effect=requests.exceptions.ConnectionError('Connection refused')))
275
    def test_cancel_retry_exhausted(self):
276
        liveaction = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS)
277
        liveaction, execution = action_service.request(liveaction)
278
        liveaction = LiveAction.get_by_id(str(liveaction.id))
279
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
280
281
        mistral_context = liveaction.context.get('mistral', None)
282
        self.assertIsNotNone(mistral_context)
283
        self.assertEqual(mistral_context['execution_id'], WF1_EXEC.get('id'))
284
        self.assertEqual(mistral_context['workflow_name'], WF1_EXEC.get('workflow_name'))
285
286
        requester = cfg.CONF.system_user.user
287
        liveaction, execution = action_service.request_cancellation(liveaction, requester)
288
289
        calls = [call(WF1_EXEC.get('id'), 'CANCELLED') for i in range(0, 2)]
290
        executions.ExecutionManager.update.assert_has_calls(calls)
291
292
        liveaction = LiveAction.get_by_id(str(liveaction.id))
293
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELING)
294