Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

tests/unit/test_actionchain_cancel.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import eventlet
18
import mock
19
import os
20
import tempfile
21
22
from st2common.bootstrap import actionsregistrar
23
from st2common.bootstrap import runnersregistrar
24
25
from st2common.constants import action as action_constants
26
from st2common.models.db.liveaction import LiveActionDB
27
from st2common.persistence.execution import ActionExecution
28
from st2common.persistence.liveaction import LiveAction
29
from st2common.services import action as action_service
30
from st2common.transport.liveaction import LiveActionPublisher
31
from st2common.transport.publishers import CUDPublisher
32
33
from st2tests import DbTestCase
34
from st2tests import fixturesloader
35
from st2tests.mocks.liveaction import MockLiveActionPublisherNonBlocking
36
from six.moves import range
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in range.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
37
38
39
TEST_FIXTURES = {
40
    'chains': [
41
        'test_cancel.yaml',
42
        'test_cancel_with_subworkflow.yaml'
43
    ],
44
    'actions': [
45
        'test_cancel.yaml',
46
        'test_cancel_with_subworkflow.yaml'
47
    ]
48
}
49
50
TEST_PACK = 'action_chain_tests'
51
TEST_PACK_PATH = fixturesloader.get_fixtures_packs_base_path() + '/' + TEST_PACK
52
53
PACKS = [
54
    TEST_PACK_PATH,
55
    fixturesloader.get_fixtures_packs_base_path() + '/core'
56
]
57
58
USERNAME = 'stanley'
59
60
61
@mock.patch.object(
62
    CUDPublisher,
63
    'publish_update',
64
    mock.MagicMock(return_value=None))
65
@mock.patch.object(
66
    CUDPublisher,
67
    'publish_create',
68
    mock.MagicMock(return_value=None))
69
@mock.patch.object(
70
    LiveActionPublisher,
71
    'publish_state',
72
    mock.MagicMock(side_effect=MockLiveActionPublisherNonBlocking.publish_state))
73
class ActionChainRunnerPauseResumeTest(DbTestCase):
74
75
    temp_file_path = None
76
77
    @classmethod
78
    def setUpClass(cls):
79
        super(ActionChainRunnerPauseResumeTest, cls).setUpClass()
80
81
        # Register runners.
82
        runnersregistrar.register_runners()
83
84
        # Register test pack(s).
85
        actions_registrar = actionsregistrar.ActionsRegistrar(
86
            use_pack_cache=False,
87
            fail_on_failure=True
88
        )
89
90
        for pack in PACKS:
91
            actions_registrar.register_from_pack(pack)
92
93
    def setUp(self):
94
        super(ActionChainRunnerPauseResumeTest, self).setUp()
95
96
        # Create temporary directory used by the tests
97
        _, self.temp_file_path = tempfile.mkstemp()
98
        os.chmod(self.temp_file_path, 0o755)   # nosec
99
100
    def tearDown(self):
101
        if self.temp_file_path and os.path.exists(self.temp_file_path):
102
            os.remove(self.temp_file_path)
103
104
        super(ActionChainRunnerPauseResumeTest, self).tearDown()
105
106
    def _wait_for_status(self, liveaction, status, interval=0.1, retries=100):
107
        # Wait until the liveaction reaches status.
108
        for i in range(0, retries):
109
            liveaction = LiveAction.get_by_id(str(liveaction.id))
110
            if liveaction.status != status:
111
                eventlet.sleep(interval)
112
                continue
113
114
        return liveaction
115
116
    def _wait_for_children(self, execution, interval=0.1, retries=100):
117
        # Wait until the execution has children.
118
        for i in range(0, retries):
119
            execution = ActionExecution.get_by_id(str(execution.id))
120
            if len(getattr(execution, 'children', [])) <= 0:
121
                eventlet.sleep(interval)
122
                continue
123
124
        return execution
125
126
    def test_chain_cancel(self):
127
        # A temp file is created during test setup. Ensure the temp file exists.
128
        # The test action chain will stall until this file is deleted. This gives
129
        # the unit test a moment to run any test related logic.
130
        path = self.temp_file_path
131
        self.assertTrue(os.path.exists(path))
132
133
        action = TEST_PACK + '.' + 'test_cancel'
134
        params = {'tempfile': path, 'message': 'foobar'}
135
        liveaction = LiveActionDB(action=action, parameters=params)
136
        liveaction, execution = action_service.request(liveaction)
137
        liveaction = LiveAction.get_by_id(str(liveaction.id))
138
139
        # Wait until the liveaction is running.
140
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
141
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
142
143
        # Request action chain to cancel.
144
        liveaction, execution = action_service.request_cancellation(liveaction, USERNAME)
145
146
        # Wait until the liveaction is canceling.
147
        liveaction = self._wait_for_status(
148
            liveaction, action_constants.LIVEACTION_STATUS_CANCELING)
149
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELING)
150
151
        # Delete the temporary file that the action chain is waiting on.
152
        os.remove(path)
153
        self.assertFalse(os.path.exists(path))
154
155
        # Wait until the liveaction is canceled.
156
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_CANCELED)
157
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELED)
158
159
        # Wait for non-blocking threads to complete. Ensure runner is not running.
160
        MockLiveActionPublisherNonBlocking.wait_all()
161
162
        # Check liveaction result.
163
        self.assertIn('tasks', liveaction.result)
164
        self.assertEqual(len(liveaction.result['tasks']), 1)
165
166
    def test_chain_cancel_cascade_to_subworkflow(self):
167
        # A temp file is created during test setup. Ensure the temp file exists.
168
        # The test action chain will stall until this file is deleted. This gives
169
        # the unit test a moment to run any test related logic.
170
        path = self.temp_file_path
171
        self.assertTrue(os.path.exists(path))
172
173
        action = TEST_PACK + '.' + 'test_cancel_with_subworkflow'
174
        params = {'tempfile': path, 'message': 'foobar'}
175
        liveaction = LiveActionDB(action=action, parameters=params)
176
        liveaction, execution = action_service.request(liveaction)
177
        liveaction = LiveAction.get_by_id(str(liveaction.id))
178
179
        # Wait until the liveaction is running.
180
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
181
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
182
183
        # Wait for subworkflow to register.
184
        execution = self._wait_for_children(execution)
185
        self.assertEqual(len(execution.children), 1)
186
187
        # Wait until the subworkflow is running.
188
        task1_exec = ActionExecution.get_by_id(execution.children[0])
189
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
190
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_RUNNING)
191
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_RUNNING)
192
193
        # Request action chain to cancel.
194
        liveaction, execution = action_service.request_cancellation(liveaction, USERNAME)
195
196
        # Wait until the liveaction is canceling.
197
        liveaction = self._wait_for_status(
198
            liveaction, action_constants.LIVEACTION_STATUS_CANCELING)
199
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELING)
200
        self.assertEqual(len(execution.children), 1)
201
202
        # Wait until the subworkflow is canceling.
203
        task1_exec = ActionExecution.get_by_id(execution.children[0])
204
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
205
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_CANCELING)
206
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_CANCELING)
207
208
        # Delete the temporary file that the action chain is waiting on.
209
        os.remove(path)
210
        self.assertFalse(os.path.exists(path))
211
212
        # Wait until the liveaction is canceled.
213
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_CANCELED)
214
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELED)
215
        self.assertEqual(len(execution.children), 1)
216
217
        # Wait until the subworkflow is canceled.
218
        task1_exec = ActionExecution.get_by_id(execution.children[0])
219
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
220
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_CANCELED)
221
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_CANCELED)
222
223
        # Wait for non-blocking threads to complete. Ensure runner is not running.
224
        MockLiveActionPublisherNonBlocking.wait_all()
225
226
        # Check liveaction result.
227
        self.assertIn('tasks', liveaction.result)
228
        self.assertEqual(len(liveaction.result['tasks']), 1)
229
230
        subworkflow = liveaction.result['tasks'][0]
231
        self.assertEqual(len(subworkflow['result']['tasks']), 1)
232
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_CANCELED)
233
234
    def test_chain_cancel_cascade_to_parent_workflow(self):
235
        # A temp file is created during test setup. Ensure the temp file exists.
236
        # The test action chain will stall until this file is deleted. This gives
237
        # the unit test a moment to run any test related logic.
238
        path = self.temp_file_path
239
        self.assertTrue(os.path.exists(path))
240
241
        action = TEST_PACK + '.' + 'test_cancel_with_subworkflow'
242
        params = {'tempfile': path, 'message': 'foobar'}
243
        liveaction = LiveActionDB(action=action, parameters=params)
244
        liveaction, execution = action_service.request(liveaction)
245
        liveaction = LiveAction.get_by_id(str(liveaction.id))
246
247
        # Wait until the liveaction is running.
248
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
249
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
250
251
        # Wait for subworkflow to register.
252
        execution = self._wait_for_children(execution)
253
        self.assertEqual(len(execution.children), 1)
254
255
        # Wait until the subworkflow is running.
256
        task1_exec = ActionExecution.get_by_id(execution.children[0])
257
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
258
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_RUNNING)
259
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_RUNNING)
260
261
        # Request subworkflow to cancel.
262
        task1_live, task1_exec = action_service.request_cancellation(task1_live, USERNAME)
263
264
        # Wait until the subworkflow is canceling.
265
        task1_exec = ActionExecution.get_by_id(execution.children[0])
266
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
267
        task1_live = self._wait_for_status(
268
            task1_live, action_constants.LIVEACTION_STATUS_CANCELING)
269
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_CANCELING)
270
271
        # Delete the temporary file that the action chain is waiting on.
272
        os.remove(path)
273
        self.assertFalse(os.path.exists(path))
274
275
        # Wait until the subworkflow is canceled.
276
        task1_exec = ActionExecution.get_by_id(execution.children[0])
277
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
278
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_CANCELED)
279
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_CANCELED)
280
281
        # Wait until the parent liveaction is canceled.
282
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_CANCELED)
283
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_CANCELED)
284
        self.assertEqual(len(execution.children), 1)
285
286
        # Wait for non-blocking threads to complete. Ensure runner is not running.
287
        MockLiveActionPublisherNonBlocking.wait_all()
288
289
        # Check liveaction result.
290
        self.assertIn('tasks', liveaction.result)
291
        self.assertEqual(len(liveaction.result['tasks']), 1)
292
293
        subworkflow = liveaction.result['tasks'][0]
294
        self.assertEqual(len(subworkflow['result']['tasks']), 1)
295
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_CANCELED)
296