Passed
Pull Request — master (#3753)
by W
04:57
created

ActionChainRunnerPauseResumeTest.test_chain_cancel()   B

Complexity

Conditions 1

Size

Total Lines 39

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 39
rs 8.8571
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import eventlet
17
import mock
18
import os
19
import tempfile
20
21
from st2common.bootstrap import actionsregistrar
22
from st2common.bootstrap import runnersregistrar
23
24
from st2common.constants import action as action_constants
25
from st2common.models.db.liveaction import LiveActionDB
26
from st2common.persistence.execution import ActionExecution
27
from st2common.persistence.liveaction import LiveAction
28
from st2common.services import action as action_service
29
from st2common.transport.liveaction import LiveActionPublisher
30
from st2common.transport.publishers import CUDPublisher
31
32
from st2tests import DbTestCase
33
from st2tests import fixturesloader
34
from st2tests.mocks.liveaction import MockLiveActionPublisherNonBlocking
35
36
37
TEST_FIXTURES = {
38
    'chains': [
39
        'test_cancel.yaml',
40
        'test_cancel_with_subworkflow.yaml'
41
    ],
42
    'actions': [
43
        'test_cancel.yaml',
44
        'test_cancel_with_subworkflow.yaml'
45
    ]
46
}
47
48
TEST_PACK = 'action_chain_tests'
49
TEST_PACK_PATH = fixturesloader.get_fixtures_packs_base_path() + '/' + TEST_PACK
50
51
PACKS = [
52
    TEST_PACK_PATH,
53
    fixturesloader.get_fixtures_packs_base_path() + '/core'
54
]
55
56
USERNAME = 'stanley'
57
58
59
@mock.patch.object(
60
    CUDPublisher,
61
    'publish_update',
62
    mock.MagicMock(return_value=None))
63
@mock.patch.object(
64
    CUDPublisher,
65
    'publish_create',
66
    mock.MagicMock(return_value=None))
67
@mock.patch.object(
68
    LiveActionPublisher,
69
    'publish_state',
70
    mock.MagicMock(side_effect=MockLiveActionPublisherNonBlocking.publish_state))
71
class ActionChainRunnerPauseResumeTest(DbTestCase):
72
73
    temp_file_path = None
74
75
    @classmethod
76
    def setUpClass(cls):
77
        super(ActionChainRunnerPauseResumeTest, cls).setUpClass()
78
79
        # Register runners.
80
        runnersregistrar.register_runners()
81
82
        # Register test pack(s).
83
        actions_registrar = actionsregistrar.ActionsRegistrar(
84
            use_pack_cache=False,
85
            fail_on_failure=True
86
        )
87
88
        for pack in PACKS:
89
            actions_registrar.register_from_pack(pack)
90
91
    def setUp(self):
92
        super(ActionChainRunnerPauseResumeTest, self).setUp()
93
94
        # Create temporary directory used by the tests
95
        _, self.temp_file_path = tempfile.mkstemp()
96
        os.chmod(self.temp_file_path, 0755)   # nosec
97
98
    def tearDown(self):
99
        if self.temp_file_path and os.path.exists(self.temp_file_path):
100
            os.remove(self.temp_file_path)
101
102
        super(ActionChainRunnerPauseResumeTest, self).tearDown()
103
104
    def _wait_for_status(self, liveaction, status, interval=0.1, retries=100):
105
        # Wait until the liveaction reaches status.
106
        for i in range(0, retries):
107
            liveaction = LiveAction.get_by_id(str(liveaction.id))
108
            if liveaction.status != status:
109
                eventlet.sleep(interval)
110
                continue
111
112
        return liveaction
113
114
    def _wait_for_children(self, execution, interval=0.1, retries=100):
115
        # Wait until the execution has children.
116
        for i in range(0, retries):
117
            execution = ActionExecution.get_by_id(str(execution.id))
118
            if len(getattr(execution, 'children', [])) <= 0:
119
                eventlet.sleep(interval)
120
                continue
121
122
        return execution
123
124
    def test_chain_cancel(self):
125
        # A temp file is created during test setup. Ensure the temp file exists.
126
        # The test action chain will stall until this file is deleted. This gives
127
        # the unit test a moment to run any test related logic.
128
        path = self.temp_file_path
129
        self.assertTrue(os.path.exists(path))
130
131
        action = TEST_PACK + '.' + 'test_cancel'
132
        params = {'tempfile': path, 'message': 'foobar'}
133
        liveaction = LiveActionDB(action=action, parameters=params)
134
        liveaction, execution = action_service.request(liveaction)
135
        liveaction = LiveAction.get_by_id(str(liveaction.id))
136
137
        # Wait until the liveaction is running.
138
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
139
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
140
141
        # Request action chain to cancel.
142
        liveaction, execution = action_service.request_cancellation(liveaction, USERNAME)
143
144
        # Wait until the liveaction is canceling.
145
        liveaction = self._wait_for_status(
146
            liveaction, action_constants.LIVEACTION_STATUS_CANCELING)
147
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELING)
148
149
        # Delete the temporary file that the action chain is waiting on.
150
        os.remove(path)
151
        self.assertFalse(os.path.exists(path))
152
153
        # Wait until the liveaction is canceled.
154
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_CANCELED)
155
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELED)
156
157
        # Wait for non-blocking threads to complete. Ensure runner is not running.
158
        MockLiveActionPublisherNonBlocking.wait_all()
159
160
        # Check liveaction result.
161
        self.assertIn('tasks', liveaction.result)
162
        self.assertEqual(len(liveaction.result['tasks']), 1)
163
164
    def test_chain_cancel_cascade_to_subworkflow(self):
165
        # A temp file is created during test setup. Ensure the temp file exists.
166
        # The test action chain will stall until this file is deleted. This gives
167
        # the unit test a moment to run any test related logic.
168
        path = self.temp_file_path
169
        self.assertTrue(os.path.exists(path))
170
171
        action = TEST_PACK + '.' + 'test_cancel_with_subworkflow'
172
        params = {'tempfile': path, 'message': 'foobar'}
173
        liveaction = LiveActionDB(action=action, parameters=params)
174
        liveaction, execution = action_service.request(liveaction)
175
        liveaction = LiveAction.get_by_id(str(liveaction.id))
176
177
        # Wait until the liveaction is running.
178
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
179
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
180
181
        # Wait for subworkflow to register.
182
        execution = self._wait_for_children(execution)
183
        self.assertEqual(len(execution.children), 1)
184
185
        # Wait until the subworkflow is running.
186
        task1_exec = ActionExecution.get_by_id(execution.children[0])
187
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
188
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_RUNNING)
189
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_RUNNING)
190
191
        # Request action chain to cancel.
192
        liveaction, execution = action_service.request_cancellation(liveaction, USERNAME)
193
194
        # Wait until the liveaction is canceling.
195
        liveaction = self._wait_for_status(
196
            liveaction, action_constants.LIVEACTION_STATUS_CANCELING)
197
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELING)
198
        self.assertEqual(len(execution.children), 1)
199
200
        # Wait until the subworkflow is canceling.
201
        task1_exec = ActionExecution.get_by_id(execution.children[0])
202
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
203
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_CANCELING)
204
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_CANCELING)
205
206
        # Delete the temporary file that the action chain is waiting on.
207
        os.remove(path)
208
        self.assertFalse(os.path.exists(path))
209
210
        # Wait until the liveaction is canceled.
211
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_CANCELED)
212
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELED)
213
        self.assertEqual(len(execution.children), 1)
214
215
        # Wait until the subworkflow is canceled.
216
        task1_exec = ActionExecution.get_by_id(execution.children[0])
217
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
218
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_CANCELED)
219
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_CANCELED)
220
221
        # Wait for non-blocking threads to complete. Ensure runner is not running.
222
        MockLiveActionPublisherNonBlocking.wait_all()
223
224
        # Check liveaction result.
225
        self.assertIn('tasks', liveaction.result)
226
        self.assertEqual(len(liveaction.result['tasks']), 1)
227
228
        subworkflow = liveaction.result['tasks'][0]
229
        self.assertEqual(len(subworkflow['result']['tasks']), 1)
230
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_CANCELED)
231
232
    def test_chain_cancel_cascade_to_parent_workflow(self):
233
        # A temp file is created during test setup. Ensure the temp file exists.
234
        # The test action chain will stall until this file is deleted. This gives
235
        # the unit test a moment to run any test related logic.
236
        path = self.temp_file_path
237
        self.assertTrue(os.path.exists(path))
238
239
        action = TEST_PACK + '.' + 'test_cancel_with_subworkflow'
240
        params = {'tempfile': path, 'message': 'foobar'}
241
        liveaction = LiveActionDB(action=action, parameters=params)
242
        liveaction, execution = action_service.request(liveaction)
243
        liveaction = LiveAction.get_by_id(str(liveaction.id))
244
245
        # Wait until the liveaction is running.
246
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
247
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
248
249
        # Wait for subworkflow to register.
250
        execution = self._wait_for_children(execution)
251
        self.assertEqual(len(execution.children), 1)
252
253
        # Wait until the subworkflow is running.
254
        task1_exec = ActionExecution.get_by_id(execution.children[0])
255
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
256
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_RUNNING)
257
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_RUNNING)
258
259
        # Request subworkflow to cancel.
260
        task1_live, task1_exec = action_service.request_cancellation(task1_live, USERNAME)
261
262
        # Wait until the subworkflow is canceling.
263
        task1_exec = ActionExecution.get_by_id(execution.children[0])
264
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
265
        task1_live = self._wait_for_status(
266
            task1_live, action_constants.LIVEACTION_STATUS_CANCELING)
267
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_CANCELING)
268
269
        # Delete the temporary file that the action chain is waiting on.
270
        os.remove(path)
271
        self.assertFalse(os.path.exists(path))
272
273
        # Wait until the subworkflow is canceled.
274
        task1_exec = ActionExecution.get_by_id(execution.children[0])
275
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
276
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_CANCELED)
277
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_CANCELED)
278
279
        # Wait until the parent liveaction is canceled.
280
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_CANCELED)
281
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELED)
282
        self.assertEqual(len(execution.children), 1)
283
284
        # Wait for non-blocking threads to complete. Ensure runner is not running.
285
        MockLiveActionPublisherNonBlocking.wait_all()
286
287
        # Check liveaction result.
288
        self.assertIn('tasks', liveaction.result)
289
        self.assertEqual(len(liveaction.result['tasks']), 1)
290
291
        subworkflow = liveaction.result['tasks'][0]
292
        self.assertEqual(len(subworkflow['result']['tasks']), 1)
293
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_CANCELED)
294