Passed
Pull Request — master (#3507)
by W
07:37 queued 01:55
created

test_resume_subworkflow_action()   B

Complexity

Conditions 2

Size

Total Lines 96

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
dl 0
loc 96
rs 8.3859
c 1
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import uuid
18
19
import mock
20
import yaml
21
22
from mistralclient.api.v2 import executions
23
from mistralclient.api.v2 import workflows
24
from oslo_config import cfg
25
26
# XXX: actionsensor import depends on config being setup.
27
import st2tests.config as tests_config
28
tests_config.parse_args()
29
30
from mistral_v2 import MistralRunner
31
from st2common.bootstrap import actionsregistrar
32
from st2common.bootstrap import runnersregistrar
33
from st2common.constants import action as action_constants
34
from st2common.models.db.execution import ActionExecutionDB
35
from st2common.models.db.liveaction import LiveActionDB
36
from st2common.persistence.liveaction import LiveAction
37
from st2common.runners import base as runners
38
from st2common.services import action as action_service
39
from st2common.transport.liveaction import LiveActionPublisher
40
from st2common.transport.publishers import CUDPublisher
41
from st2common.util import loader
42
from st2tests import DbTestCase
43
from st2tests import fixturesloader
44
from st2tests.mocks.liveaction import MockLiveActionPublisher
45
46
47
# Fixture pack exercised by this module and the list of packs to register.
TEST_PACK = 'mistral_tests'
TEST_PACK_PATH = '/'.join([fixturesloader.get_fixtures_packs_base_path(), TEST_PACK])

PACKS = [
    TEST_PACK_PATH,
    '/'.join([fixturesloader.get_fixtures_packs_base_path(), 'core'])
]

# Parameters passed to every requested action execution.
ACTION_PARAMS = {'friend': 'Rocky'}
NON_EMPTY_RESULT = 'non-empty'

# Non-workbook with a single workflow. The workflow spec is read from the
# "xformed_" variant of the entry point file and wrapped in a mistralclient
# Workflow object; WF1_EXEC / WF1_EXEC_PAUSED are the raw execution payloads
# in the RUNNING and PAUSED state respectively.
WF1_META_FILE_NAME = 'workflow_v2.yaml'
WF1_META_FILE_PATH = '/'.join([TEST_PACK_PATH, 'actions', WF1_META_FILE_NAME])
WF1_META_CONTENT = loader.load_meta_file(WF1_META_FILE_PATH)
WF1_NAME = '.'.join([WF1_META_CONTENT['pack'], WF1_META_CONTENT['name']])
WF1_ENTRY_POINT = '/'.join([TEST_PACK_PATH, 'actions', WF1_META_CONTENT['entry_point']])
WF1_ENTRY_POINT_X = WF1_ENTRY_POINT.replace(WF1_META_FILE_NAME, 'xformed_' + WF1_META_FILE_NAME)
WF1_SPEC = yaml.safe_load(MistralRunner.get_workflow_definition(WF1_ENTRY_POINT_X))
WF1_YAML = yaml.safe_dump(WF1_SPEC, default_flow_style=False)
WF1 = workflows.Workflow(None, {'name': WF1_NAME, 'definition': WF1_YAML})
WF1_OLD = workflows.Workflow(None, {'name': WF1_NAME, 'definition': ''})
WF1_EXEC = {'id': str(uuid.uuid4()), 'state': 'RUNNING', 'workflow_name': WF1_NAME}
WF1_EXEC_PAUSED = copy.deepcopy(WF1_EXEC)
WF1_EXEC_PAUSED.update({'state': 'PAUSED'})

# Workflow with a subworkflow action. Built the same way as WF1 above.
WF2_META_FILE_NAME = 'workflow_v2_call_workflow_action.yaml'
WF2_META_FILE_PATH = '/'.join([TEST_PACK_PATH, 'actions', WF2_META_FILE_NAME])
WF2_META_CONTENT = loader.load_meta_file(WF2_META_FILE_PATH)
WF2_NAME = '.'.join([WF2_META_CONTENT['pack'], WF2_META_CONTENT['name']])
WF2_ENTRY_POINT = '/'.join([TEST_PACK_PATH, 'actions', WF2_META_CONTENT['entry_point']])
WF2_ENTRY_POINT_X = WF2_ENTRY_POINT.replace(WF2_META_FILE_NAME, 'xformed_' + WF2_META_FILE_NAME)
WF2_SPEC = yaml.safe_load(MistralRunner.get_workflow_definition(WF2_ENTRY_POINT_X))
WF2_YAML = yaml.safe_dump(WF2_SPEC, default_flow_style=False)
WF2 = workflows.Workflow(None, {'name': WF2_NAME, 'definition': WF2_YAML})
WF2_EXEC = {'id': str(uuid.uuid4()), 'state': 'RUNNING', 'workflow_name': WF2_NAME}
WF2_EXEC_PAUSED = copy.deepcopy(WF2_EXEC)
WF2_EXEC_PAUSED.update({'state': 'PAUSED'})
87
88
89
@mock.patch.object(
    CUDPublisher,
    'publish_update',
    mock.MagicMock(return_value=None))
@mock.patch.object(
    CUDPublisher,
    'publish_create',
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_create))
@mock.patch.object(
    LiveActionPublisher,
    'publish_state',
    mock.MagicMock(side_effect=MockLiveActionPublisher.publish_state))
class MistralRunnerPauseResumeTest(DbTestCase):
    """Pause and resume tests for workflow executions of the mistral runner.

    The mistralclient workflow and execution managers are replaced with
    mocks in every test, so the tests only verify the calls made to
    ``ExecutionManager.update`` and the resulting liveaction status stored
    in the database. The publishers are patched at class level and routed
    to ``MockLiveActionPublisher``.
    """

    @classmethod
    def setUpClass(cls):
        """Apply config overrides, then register the runners and test packs."""
        super(MistralRunnerPauseResumeTest, cls).setUpClass()

        # Override the retry configuration here otherwise st2tests.config.parse_args
        # in DbTestCase.setUpClass will reset these overrides.
        cfg.CONF.set_override('retry_exp_msec', 100, group='mistral')
        cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral')
        cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral')
        cfg.CONF.set_override('api_url', 'http://0.0.0.0:9101', group='auth')

        # Register runners.
        runnersregistrar.register_runners()

        # Register test pack(s).
        actions_registrar = actionsregistrar.ActionsRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)

    @classmethod
    def get_runner_class(cls, runner_name):
        """Return the class of the runner registered under ``runner_name``."""
        return runners.get_runner(runner_name).__class__

    def _reload_and_assert_status(self, liveaction, expected_status):
        """Re-read the liveaction from the database and check its status.

        :param liveaction: Liveaction whose ``id`` is used for the lookup.
        :param expected_status: Status the stored liveaction must have.
        :return: The freshly loaded liveaction.
        """
        liveaction = LiveAction.get_by_id(str(liveaction.id))
        self.assertEqual(liveaction.status, expected_status)
        return liveaction

    def _request_workflow(self, action_name):
        """Request an execution of ``action_name`` and check it is running.

        :param action_name: Reference of the action to execute.
        :return: Tuple of (reloaded liveaction, execution).
        """
        liveaction = LiveActionDB(action=action_name, parameters=ACTION_PARAMS)
        liveaction, execution = action_service.request(liveaction)
        liveaction = self._reload_and_assert_status(
            liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
        return liveaction, execution

    def _assert_mistral_context(self, liveaction, wf_exec):
        """Check the mistral context of the liveaction against ``wf_exec``.

        :param liveaction: Liveaction whose context is inspected.
        :param wf_exec: Dict with the expected ``id`` and ``workflow_name``.
        """
        mistral_context = liveaction.context.get('mistral', None)
        self.assertIsNotNone(mistral_context)
        self.assertEqual(mistral_context['execution_id'], wf_exec.get('id'))
        self.assertEqual(mistral_context['workflow_name'], wf_exec.get('workflow_name'))

    def _assert_update_calls(self, expected_calls):
        """Check that ``ExecutionManager.update`` saw exactly ``expected_calls``.

        The mock is looked up at call time, so this must be invoked from
        within a test where ``ExecutionManager.update`` is patched.

        :param expected_calls: Ordered list of ``mock.call`` objects covering
            every call made so far.
        """
        update_mock = executions.ExecutionManager.update
        self.assertTrue(update_mock.called)
        self.assertEqual(update_mock.call_count, len(expected_calls))
        update_mock.assert_has_calls(expected_calls, any_order=False)

    @mock.patch.object(
        workflows.WorkflowManager, 'list',
        mock.MagicMock(return_value=[]))
    @mock.patch.object(
        workflows.WorkflowManager, 'get',
        mock.MagicMock(return_value=WF1))
    @mock.patch.object(
        workflows.WorkflowManager, 'create',
        mock.MagicMock(return_value=[WF1]))
    @mock.patch.object(
        executions.ExecutionManager, 'create',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
    @mock.patch.object(
        executions.ExecutionManager, 'update',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC_PAUSED)))
    def test_pause(self):
        """Pausing a running workflow updates mistral and sets PAUSING."""
        # Launch the workflow execution.
        liveaction, _ = self._request_workflow(WF1_NAME)
        self._assert_mistral_context(liveaction, WF1_EXEC)

        # Pause the workflow execution.
        requester = cfg.CONF.system_user.user
        liveaction, _ = action_service.request_pause(liveaction, requester)
        executions.ExecutionManager.update.assert_called_with(WF1_EXEC.get('id'), 'PAUSED')
        self._reload_and_assert_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)

    @mock.patch.object(
        workflows.WorkflowManager, 'list',
        mock.MagicMock(return_value=[]))
    @mock.patch.object(
        workflows.WorkflowManager, 'get',
        mock.MagicMock(return_value=WF1))
    @mock.patch.object(
        workflows.WorkflowManager, 'create',
        mock.MagicMock(return_value=[WF1]))
    @mock.patch.object(
        executions.ExecutionManager, 'create',
        mock.MagicMock(return_value=executions.Execution(None, WF1_EXEC)))
    @mock.patch.object(
        executions.ExecutionManager, 'update',
        mock.MagicMock(side_effect=[
            executions.Execution(None, WF1_EXEC_PAUSED),
            executions.Execution(None, WF1_EXEC)]))
    def test_resume(self):
        """Resuming a paused workflow updates mistral and sets RUNNING."""
        # Launch the workflow execution.
        liveaction, _ = self._request_workflow(WF1_NAME)
        self._assert_mistral_context(liveaction, WF1_EXEC)

        # Pause the workflow execution.
        requester = cfg.CONF.system_user.user
        liveaction, _ = action_service.request_pause(liveaction, requester)
        executions.ExecutionManager.update.assert_called_with(WF1_EXEC.get('id'), 'PAUSED')
        liveaction = self._reload_and_assert_status(
            liveaction, action_constants.LIVEACTION_STATUS_PAUSING)

        # Manually update the liveaction from pausing to paused. The paused state
        # is usually updated by the mistral querier.
        action_service.update_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
        liveaction = self._reload_and_assert_status(
            liveaction, action_constants.LIVEACTION_STATUS_PAUSED)

        # Resume the workflow execution.
        liveaction, _ = action_service.request_resume(liveaction, requester)
        executions.ExecutionManager.update.assert_called_with(WF1_EXEC.get('id'), 'RUNNING')
        self._reload_and_assert_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)

    @mock.patch.object(
        workflows.WorkflowManager, 'list',
        mock.MagicMock(return_value=[]))
    @mock.patch.object(
        workflows.WorkflowManager, 'get',
        mock.MagicMock(side_effect=[WF2, WF1]))
    @mock.patch.object(
        workflows.WorkflowManager, 'create',
        mock.MagicMock(side_effect=[[WF2], [WF1]]))
    @mock.patch.object(
        executions.ExecutionManager, 'create',
        mock.MagicMock(side_effect=[
            executions.Execution(None, WF2_EXEC),
            executions.Execution(None, WF1_EXEC)]))
    @mock.patch.object(
        executions.ExecutionManager, 'update',
        mock.MagicMock(side_effect=[
            executions.Execution(None, WF2_EXEC_PAUSED),
            executions.Execution(None, WF1_EXEC_PAUSED),
            executions.Execution(None, WF2_EXEC),
            executions.Execution(None, WF1_EXEC)]))
    def test_resume_subworkflow_action(self):
        """Pause and resume requests on a parent cascade to its child."""
        requester = cfg.CONF.system_user.user

        # Launch the parent (WF2) first and then the workflow used as its child (WF1).
        liveaction1, _ = self._request_workflow(WF2_NAME)
        liveaction2, execution2 = self._request_workflow(WF1_NAME)

        # Mock the children of the parent execution so that this
        # test case has a subworkflow execution.
        with mock.patch.object(
                ActionExecutionDB, 'children',
                new_callable=mock.PropertyMock) as action_ex_children_mock:
            action_ex_children_mock.return_value = [execution2.id]

            self._assert_mistral_context(liveaction1, WF2_EXEC)

            # Pause the parent liveaction and check that the request is cascaded down.
            liveaction1, _ = action_service.request_pause(liveaction1, requester)

            pause_calls = [
                mock.call(WF2_EXEC.get('id'), 'PAUSED'),
                mock.call(WF1_EXEC.get('id'), 'PAUSED')
            ]

            self._assert_update_calls(pause_calls)

            liveaction1 = self._reload_and_assert_status(
                liveaction1, action_constants.LIVEACTION_STATUS_PAUSING)
            liveaction2 = self._reload_and_assert_status(
                liveaction2, action_constants.LIVEACTION_STATUS_PAUSING)

            # Manually set the liveaction status to PAUSED.
            action_service.update_status(liveaction2, action_constants.LIVEACTION_STATUS_PAUSED)
            action_service.update_status(liveaction1, action_constants.LIVEACTION_STATUS_PAUSED)

            liveaction1 = self._reload_and_assert_status(
                liveaction1, action_constants.LIVEACTION_STATUS_PAUSED)
            liveaction2 = self._reload_and_assert_status(
                liveaction2, action_constants.LIVEACTION_STATUS_PAUSED)

            # Resume the parent liveaction and check that the request is cascaded down.
            liveaction1, _ = action_service.request_resume(liveaction1, requester)

            resume_calls = [
                mock.call(WF2_EXEC.get('id'), 'RUNNING'),
                mock.call(WF1_EXEC.get('id'), 'RUNNING')
            ]

            # The mock is not reset in between, so the expectation includes
            # the previous pause calls.
            self._assert_update_calls(pause_calls + resume_calls)

            self._reload_and_assert_status(
                liveaction1, action_constants.LIVEACTION_STATUS_RUNNING)
            self._reload_and_assert_status(
                liveaction2, action_constants.LIVEACTION_STATUS_RUNNING)
308