Passed
Push — master ( c48c6c...de488a )
by
unknown
03:01
created

test_cancel_subworkflow_action()   A

Complexity

Conditions 1

Size

Total Lines 51

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 51
rs 9.4109
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import mock
20
from mock import call
21
import requests
22
import yaml
23
24
from mistralclient.api.v2 import executions
25
from mistralclient.api.v2 import workflows
26
from oslo_config import cfg
27
28
# XXX: actionsensor import depends on config being set up.
29
import st2tests.config as tests_config
30
tests_config.parse_args()
31
32
from mistral_v2 import MistralRunner
33
from st2common.bootstrap import actionsregistrar
34
from st2common.bootstrap import runnersregistrar
35
from st2common.constants import action as action_constants
36
from st2common.models.db.execution import ActionExecutionDB
37
from st2common.models.db.liveaction import LiveActionDB
38
from st2common.persistence.liveaction import LiveAction
39
from st2common.runners import base as runners
40
from st2common.services import action as action_service
41
from st2common.transport.liveaction import LiveActionPublisher
42
from st2common.transport.publishers import CUDPublisher
43
from st2common.util import loader
44
from st2tests import DbTestCase
45
from st2tests import fixturesloader
46
from st2tests.mocks.liveaction import MockLiveActionPublisher
47
48
49
# Fixture pack exercised by these tests, plus the packs registered in setUpClass.
TEST_PACK = 'mistral_tests'
TEST_PACK_PATH = '/'.join([fixturesloader.get_fixtures_packs_base_path(), TEST_PACK])

PACKS = [
    TEST_PACK_PATH,
    '/'.join([fixturesloader.get_fixtures_packs_base_path(), 'core']),
]

# Parameters passed to every requested action execution.
ACTION_PARAMS = {'friend': 'Rocky'}
NON_EMPTY_RESULT = 'non-empty'

# WF1: a non-workbook action containing a single workflow.
# The "xformed_" entry point is the sibling file whose name carries that prefix.
WF1_META_FILE_NAME = 'workflow_v2.yaml'
WF1_META_FILE_PATH = '/'.join([TEST_PACK_PATH, 'actions', WF1_META_FILE_NAME])
WF1_META_CONTENT = loader.load_meta_file(WF1_META_FILE_PATH)
WF1_NAME = '%s.%s' % (WF1_META_CONTENT['pack'], WF1_META_CONTENT['name'])
WF1_ENTRY_POINT = '/'.join([TEST_PACK_PATH, 'actions', WF1_META_CONTENT['entry_point']])
WF1_ENTRY_POINT_X = WF1_ENTRY_POINT.replace(WF1_META_FILE_NAME, 'xformed_' + WF1_META_FILE_NAME)
WF1_SPEC = yaml.safe_load(MistralRunner.get_workflow_definition(WF1_ENTRY_POINT_X))
WF1_YAML = yaml.safe_dump(WF1_SPEC, default_flow_style=False)
WF1 = workflows.Workflow(None, {'name': WF1_NAME, 'definition': WF1_YAML})
WF1_OLD = workflows.Workflow(None, {'name': WF1_NAME, 'definition': ''})
WF1_EXEC = {'id': str(uuid.uuid4()), 'state': 'RUNNING', 'workflow_name': WF1_NAME}
# Independent copy of the running execution payload with only the state changed.
WF1_EXEC_CANCELLED = dict(WF1_EXEC, state='CANCELLED')

# WF2: a workflow that invokes another workflow as a subworkflow action.
WF2_META_FILE_NAME = 'workflow_v2_call_workflow_action.yaml'
WF2_META_FILE_PATH = '/'.join([TEST_PACK_PATH, 'actions', WF2_META_FILE_NAME])
WF2_META_CONTENT = loader.load_meta_file(WF2_META_FILE_PATH)
WF2_NAME = '%s.%s' % (WF2_META_CONTENT['pack'], WF2_META_CONTENT['name'])
WF2_ENTRY_POINT = '/'.join([TEST_PACK_PATH, 'actions', WF2_META_CONTENT['entry_point']])
WF2_ENTRY_POINT_X = WF2_ENTRY_POINT.replace(WF2_META_FILE_NAME, 'xformed_' + WF2_META_FILE_NAME)
WF2_SPEC = yaml.safe_load(MistralRunner.get_workflow_definition(WF2_ENTRY_POINT_X))
WF2_YAML = yaml.safe_dump(WF2_SPEC, default_flow_style=False)
WF2 = workflows.Workflow(None, {'name': WF2_NAME, 'definition': WF2_YAML})
WF2_EXEC = {'id': str(uuid.uuid4()), 'state': 'RUNNING', 'workflow_name': WF2_NAME}
WF2_EXEC_CANCELLED = dict(WF2_EXEC, state='CANCELLED')
89
90
91
@mock.patch.object(
92
    CUDPublisher,
93
    'publish_update',
94
    mock.MagicMock(return_value=None))
95
@mock.patch.object(
96
    CUDPublisher,
97
    'publish_create',
98
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_create))
99
@mock.patch.object(
100
    LiveActionPublisher,
101
    'publish_state',
102
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_state))
103
class MistralRunnerCancelTest(DbTestCase):
104
105
    @classmethod
    def setUpClass(cls):
        """Prepare config overrides, runners and fixture packs for the whole test class.

        The overrides are applied after the parent ``setUpClass`` because
        ``st2tests.config.parse_args`` in ``DbTestCase.setUpClass`` would
        otherwise reset them.
        """
        super(MistralRunnerCancelTest, cls).setUpClass()

        # (option, value, group) triples, applied in this order.
        config_overrides = (
            ('retry_exp_msec', 100, 'mistral'),
            ('retry_exp_max_msec', 200, 'mistral'),
            ('retry_stop_max_msec', 200, 'mistral'),
            ('api_url', 'http://0.0.0.0:9101', 'auth'),
        )

        for option, value, group in config_overrides:
            cfg.CONF.set_override(option, value, group=group)

        # Runners must be registered before the actions that reference them.
        runnersregistrar.register_runners()

        # Register every test pack; any registration failure aborts the setup.
        registrar = actionsregistrar.ActionsRegistrar(use_pack_cache=False, fail_on_failure=True)

        for pack_path in PACKS:
            registrar.register_from_pack(pack_path)
127
128
    @classmethod
    def get_runner_class(cls, runner_name):
        """Return the class of the runner instance that ``runners.get_runner`` yields.

        :param runner_name: Name passed through to ``runners.get_runner``.
        """
        runner_instance = runners.get_runner(runner_name)
        return runner_instance.__class__
131
132
    @mock.patch.object(
        workflows.WorkflowManager, 'list', mock.MagicMock(return_value=[]))
    @mock.patch.object(
        workflows.WorkflowManager, 'get', mock.MagicMock(return_value=WF1))
    @mock.patch.object(
        workflows.WorkflowManager, 'create', mock.MagicMock(return_value=[WF1]))
    @mock.patch.object(
        executions.ExecutionManager, 'create',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
    @mock.patch.object(
        executions.ExecutionManager, 'update',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC_CANCELLED)))
    def test_cancel(self):
        """Cancel a running WF1 execution and expect the liveaction to end up canceled.

        The Mistral client managers are mocked: ``create`` returns a RUNNING
        execution and ``update`` returns the CANCELLED one.
        """
        # Request the workflow and reload it to observe the persisted state.
        requested, _ = action_service.request(
            LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS))
        running = LiveAction.get_by_id(str(requested.id))
        self.assertEqual(running.status, action_constants.LIVEACTION_STATUS_RUNNING)

        # The liveaction context must record the mocked Mistral execution.
        mistral_ctx = running.context.get('mistral')
        self.assertIsNotNone(mistral_ctx)
        self.assertEqual(mistral_ctx['execution_id'], WF1_EXEC['id'])
        self.assertEqual(mistral_ctx['workflow_name'], WF1_EXEC['workflow_name'])

        # Cancel on behalf of the system user and verify the Mistral update call.
        canceled, _ = action_service.request_cancellation(running, cfg.CONF.system_user.user)
        executions.ExecutionManager.update.assert_called_with(WF1_EXEC['id'], 'CANCELLED')

        reloaded = LiveAction.get_by_id(str(canceled.id))
        self.assertEqual(reloaded.status, action_constants.LIVEACTION_STATUS_CANCELED)
163
164
    @mock.patch.object(
        workflows.WorkflowManager, 'list',
        mock.MagicMock(return_value=[]))
    @mock.patch.object(
        workflows.WorkflowManager, 'get',
        mock.MagicMock(side_effect=[WF2, WF1]))
    @mock.patch.object(
        workflows.WorkflowManager, 'create',
        mock.MagicMock(side_effect=[[WF2], [WF1]]))
    @mock.patch.object(
        executions.ExecutionManager, 'create',
        mock.MagicMock(side_effect=[
            executions.Execution(None, WF2_EXEC),
            executions.Execution(None, WF1_EXEC)]))
    @mock.patch.object(
        executions.ExecutionManager, 'update',
        mock.MagicMock(side_effect=[
            executions.Execution(None, WF2_EXEC_CANCELLED),
            executions.Execution(None, WF1_EXEC_CANCELLED)]))
    def test_cancel_subworkflow_action(self):
        """Cancel a parent workflow and expect its subworkflow to be cancelled too.

        Two executions are requested (WF2 as the parent, WF1 as the child) and
        the parent's ``children`` are mocked to point at the child execution.
        Requesting cancellation of the parent must update the Mistral
        executions of WF2 and then WF1 to CANCELLED, in that order.
        """
        liveaction1 = LiveActionDB(action=WF2_NAME, parameters=ACTION_PARAMS)
        liveaction1, execution1 = action_service.request(liveaction1)
        liveaction1 = LiveAction.get_by_id(str(liveaction1.id))
        self.assertEqual(liveaction1.status, action_constants.LIVEACTION_STATUS_RUNNING)

        liveaction2 = LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS)
        liveaction2, execution2 = action_service.request(liveaction2)
        liveaction2 = LiveAction.get_by_id(str(liveaction2.id))
        self.assertEqual(liveaction2.status, action_constants.LIVEACTION_STATUS_RUNNING)

        mistral_context = liveaction1.context.get('mistral', None)
        self.assertIsNotNone(mistral_context)
        self.assertEqual(mistral_context['execution_id'], WF2_EXEC.get('id'))
        self.assertEqual(mistral_context['workflow_name'], WF2_EXEC.get('workflow_name'))

        # Mock the children of the parent execution so that this test case
        # has a subworkflow execution. The patch is scoped to this block so
        # ActionExecutionDB.children is restored afterwards instead of
        # leaking the mock into other tests.
        with mock.patch.object(
                ActionExecutionDB, 'children',
                new_callable=mock.PropertyMock) as children_mock:
            children_mock.return_value = [execution2.id]

            requester = cfg.CONF.system_user.user
            liveaction1, execution1 = action_service.request_cancellation(liveaction1, requester)

            # Exactly two updates: the parent first, then the subworkflow.
            self.assertEqual(executions.ExecutionManager.update.call_count, 2)

            calls = [
                mock.call(WF2_EXEC.get('id'), 'CANCELLED'),
                mock.call(WF1_EXEC.get('id'), 'CANCELLED')
            ]

            executions.ExecutionManager.update.assert_has_calls(calls, any_order=False)
215
216
    @mock.patch.object(
        workflows.WorkflowManager, 'list', mock.MagicMock(return_value=[]))
    @mock.patch.object(
        workflows.WorkflowManager, 'get', mock.MagicMock(return_value=WF1))
    @mock.patch.object(
        workflows.WorkflowManager, 'create', mock.MagicMock(return_value=[WF1]))
    @mock.patch.object(
        executions.ExecutionManager, 'create',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
    @mock.patch.object(
        executions.ExecutionManager, 'update',
        mock.MagicMock(side_effect=[
            requests.exceptions.ConnectionError(),
            executions.Execution(None, WF1_EXEC_CANCELLED)]))
    def test_cancel_retry(self):
        """Cancel succeeds when the first Mistral update fails with a connection error.

        The mocked ``update`` raises ``ConnectionError`` on its first call and
        returns the CANCELLED execution on the second; the liveaction is
        expected to end up canceled.
        """
        # Request the workflow and reload it to observe the persisted state.
        requested, _ = action_service.request(
            LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS))
        running = LiveAction.get_by_id(str(requested.id))
        self.assertEqual(running.status, action_constants.LIVEACTION_STATUS_RUNNING)

        # The liveaction context must record the mocked Mistral execution.
        mistral_ctx = running.context.get('mistral')
        self.assertIsNotNone(mistral_ctx)
        self.assertEqual(mistral_ctx['execution_id'], WF1_EXEC['id'])
        self.assertEqual(mistral_ctx['workflow_name'], WF1_EXEC['workflow_name'])

        # Cancel on behalf of the system user and verify the last update call.
        canceled, _ = action_service.request_cancellation(running, cfg.CONF.system_user.user)
        executions.ExecutionManager.update.assert_called_with(WF1_EXEC['id'], 'CANCELLED')

        reloaded = LiveAction.get_by_id(str(canceled.id))
        self.assertEqual(reloaded.status, action_constants.LIVEACTION_STATUS_CANCELED)
248
249
    @mock.patch.object(
        workflows.WorkflowManager, 'list', mock.MagicMock(return_value=[]))
    @mock.patch.object(
        workflows.WorkflowManager, 'get', mock.MagicMock(return_value=WF1))
    @mock.patch.object(
        workflows.WorkflowManager, 'create', mock.MagicMock(return_value=[WF1]))
    @mock.patch.object(
        executions.ExecutionManager, 'create',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
    @mock.patch.object(
        executions.ExecutionManager, 'update',
        mock.MagicMock(side_effect=requests.exceptions.ConnectionError('Connection refused')))
    def test_cancel_retry_exhausted(self):
        """Liveaction stays in canceling state when every Mistral update call fails.

        The mocked ``update`` always raises ``ConnectionError``; the test
        expects two consecutive update attempts and a final status of
        CANCELING.
        """
        # Request the workflow and reload it to observe the persisted state.
        requested, _ = action_service.request(
            LiveActionDB(action=WF1_NAME, parameters=ACTION_PARAMS))
        running = LiveAction.get_by_id(str(requested.id))
        self.assertEqual(running.status, action_constants.LIVEACTION_STATUS_RUNNING)

        # The liveaction context must record the mocked Mistral execution.
        mistral_ctx = running.context.get('mistral')
        self.assertIsNotNone(mistral_ctx)
        self.assertEqual(mistral_ctx['execution_id'], WF1_EXEC['id'])
        self.assertEqual(mistral_ctx['workflow_name'], WF1_EXEC['workflow_name'])

        canceling, _ = action_service.request_cancellation(running, cfg.CONF.system_user.user)

        # Two back-to-back attempts with identical arguments must be recorded.
        expected_calls = [call(WF1_EXEC['id'], 'CANCELLED')] * 2
        executions.ExecutionManager.update.assert_has_calls(expected_calls)

        reloaded = LiveAction.get_by_id(str(canceling.id))
        self.assertEqual(reloaded.status, action_constants.LIVEACTION_STATUS_CANCELING)
283