Passed
Pull Request — master (#3640)
by Lakshmi
06:19
created

ActionChainRunnerPauseResumeTest   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 659
Duplicated Lines 0 %

Importance

Changes 6
Bugs 0 Features 0
Metric Value
c 6
b 0
f 0
dl 0
loc 659
rs 9.8373
wmc 22

15 Methods

Rating   Name   Duplication   Size   Complexity  
A test_chain_pause_resume_with_published_vars_display_false() 0 49 1
B test_chain_pause_resume_cascade_to_subworkflow() 0 84 1
A setUp() 0 6 1
A test_chain_pause_resume_last_task_failed_with_no_next_task() 0 52 1
A test_chain_pause_resume() 0 48 1
A setUpClass() 0 15 2
A tearDown() 0 5 3
A test_chain_pause_resume_with_init_vars() 0 49 1
A _wait_for_status() 0 9 3
A test_chain_pause_resume_with_context_access() 0 49 1
A test_chain_pause_resume_with_published_vars() 0 50 1
A test_chain_pause_resume_with_no_more_task() 0 48 1
A test_chain_pause_resume_with_error() 0 52 1
B test_chain_pause_resume_cascade_to_parent_workflow() 0 104 1
A _wait_for_children() 0 9 3
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import eventlet
17
import mock
18
import os
19
import tempfile
20
21
from st2common.bootstrap import actionsregistrar
22
from st2common.bootstrap import runnersregistrar
23
24
from st2common.constants import action as action_constants
25
from st2common.models.db.liveaction import LiveActionDB
26
from st2common.persistence.execution import ActionExecution
27
from st2common.persistence.liveaction import LiveAction
28
from st2common.services import action as action_service
29
from st2common.transport.liveaction import LiveActionPublisher
30
from st2common.transport.publishers import CUDPublisher
31
32
from st2tests import DbTestCase
33
from st2tests import fixturesloader
34
from st2tests.mocks.liveaction import MockLiveActionPublisherNonBlocking
35
36
37
TEST_FIXTURES = {
38
    'chains': [
39
        'test_pause_resume.yaml',
40
        'test_pause_resume_with_published_vars.yaml',
41
        'test_pause_resume_with_error.yaml',
42
        'test_pause_resume_with_subworkflow.yaml',
43
        'test_pause_resume_with_context_access.yaml',
44
        'test_pause_resume_with_init_vars.yaml',
45
        'test_pause_resume_with_no_more_task.yaml',
46
        'test_pause_resume_last_task_failed_with_no_next_task.yaml'
47
    ],
48
    'actions': [
49
        'test_pause_resume.yaml',
50
        'test_pause_resume_with_published_vars.yaml',
51
        'test_pause_resume_with_error.yaml',
52
        'test_pause_resume_with_subworkflow.yaml',
53
        'test_pause_resume_with_context_access.yaml',
54
        'test_pause_resume_with_init_vars.yaml',
55
        'test_pause_resume_with_no_more_task.yaml',
56
        'test_pause_resume_last_task_failed_with_no_next_task.yaml'
57
    ]
58
}
59
60
TEST_PACK = 'action_chain_tests'
61
TEST_PACK_PATH = fixturesloader.get_fixtures_packs_base_path() + '/' + TEST_PACK
62
63
PACKS = [
64
    TEST_PACK_PATH,
65
    fixturesloader.get_fixtures_packs_base_path() + '/core'
66
]
67
68
USERNAME = 'stanley'
69
70
71
@mock.patch.object(
72
    CUDPublisher,
73
    'publish_update',
74
    mock.MagicMock(return_value=None))
75
@mock.patch.object(
76
    CUDPublisher,
77
    'publish_create',
78
    mock.MagicMock(return_value=None))
79
@mock.patch.object(
80
    LiveActionPublisher,
81
    'publish_state',
82
    mock.MagicMock(side_effect=MockLiveActionPublisherNonBlocking.publish_state))
83
class ActionChainRunnerPauseResumeTest(DbTestCase):
84
85
    temp_file_path = None
86
87
    @classmethod
88
    def setUpClass(cls):
89
        super(ActionChainRunnerPauseResumeTest, cls).setUpClass()
90
91
        # Register runners.
92
        runnersregistrar.register_runners()
93
94
        # Register test pack(s).
95
        actions_registrar = actionsregistrar.ActionsRegistrar(
96
            use_pack_cache=False,
97
            fail_on_failure=True
98
        )
99
100
        for pack in PACKS:
101
            actions_registrar.register_from_pack(pack)
102
103
    def setUp(self):
104
        super(ActionChainRunnerPauseResumeTest, self).setUp()
105
106
        # Create temporary directory used by the tests
107
        _, self.temp_file_path = tempfile.mkstemp()
108
        os.chmod(self.temp_file_path, 0755)   # nosec
109
110
    def tearDown(self):
111
        if self.temp_file_path and os.path.exists(self.temp_file_path):
112
            os.remove(self.temp_file_path)
113
114
        super(ActionChainRunnerPauseResumeTest, self).tearDown()
115
116
    def _wait_for_status(self, liveaction, status, interval=0.1, retries=100):
117
        # Wait until the liveaction reaches status.
118
        for i in range(0, retries):
119
            liveaction = LiveAction.get_by_id(str(liveaction.id))
120
            if liveaction.status != status:
121
                eventlet.sleep(interval)
122
                continue
123
124
        return liveaction
125
126
    def _wait_for_children(self, execution, interval=0.1, retries=100):
127
        # Wait until the execution has children.
128
        for i in range(0, retries):
129
            execution = ActionExecution.get_by_id(str(execution.id))
130
            if len(getattr(execution, 'children', [])) <= 0:
131
                eventlet.sleep(interval)
132
                continue
133
134
        return execution
135
136
    def test_chain_pause_resume(self):
137
        # A temp file is created during test setup. Ensure the temp file exists.
138
        # The test action chain will stall until this file is deleted. This gives
139
        # the unit test a moment to run any test related logic.
140
        path = self.temp_file_path
141
        self.assertTrue(os.path.exists(path))
142
143
        action = TEST_PACK + '.' + 'test_pause_resume'
144
        params = {'tempfile': path, 'message': 'foobar'}
145
        liveaction = LiveActionDB(action=action, parameters=params)
146
        liveaction, execution = action_service.request(liveaction)
147
        liveaction = LiveAction.get_by_id(str(liveaction.id))
148
149
        # Wait until the liveaction is running.
150
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
151
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
152
153
        # Request action chain to pause.
154
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
155
156
        # Wait until the liveaction is pausing.
157
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
158
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
159
160
        # Delete the temporary file that the action chain is waiting on.
161
        os.remove(path)
162
        self.assertFalse(os.path.exists(path))
163
164
        # Wait until the liveaction is paused.
165
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
166
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
167
168
        # Wait for non-blocking threads to complete. Ensure runner is not running.
169
        MockLiveActionPublisherNonBlocking.wait_all()
170
171
        # Request action chain to resume.
172
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
173
174
        # Wait until the liveaction is completed.
175
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
176
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
177
178
        # Wait for non-blocking threads to complete.
179
        MockLiveActionPublisherNonBlocking.wait_all()
180
181
        # Check liveaction result.
182
        self.assertIn('tasks', liveaction.result)
183
        self.assertEqual(len(liveaction.result['tasks']), 2)
184
185
    def test_chain_pause_resume_with_published_vars(self):
186
        # A temp file is created during test setup. Ensure the temp file exists.
187
        # The test action chain will stall until this file is deleted. This gives
188
        # the unit test a moment to run any test related logic.
189
        path = self.temp_file_path
190
        self.assertTrue(os.path.exists(path))
191
192
        action = TEST_PACK + '.' + 'test_pause_resume_with_published_vars'
193
        params = {'tempfile': path, 'message': 'foobar'}
194
        liveaction = LiveActionDB(action=action, parameters=params)
195
        liveaction, execution = action_service.request(liveaction)
196
        liveaction = LiveAction.get_by_id(str(liveaction.id))
197
198
        # Wait until the liveaction is running.
199
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
200
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
201
202
        # Request action chain to pause.
203
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
204
205
        # Wait until the liveaction is pausing.
206
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
207
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
208
209
        # Delete the temporary file that the action chain is waiting on.
210
        os.remove(path)
211
        self.assertFalse(os.path.exists(path))
212
213
        # Wait until the liveaction is paused.
214
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
215
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
216
217
        # Wait for non-blocking threads to complete. Ensure runner is not running.
218
        MockLiveActionPublisherNonBlocking.wait_all()
219
220
        # Request action chain to resume.
221
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
222
223
        # Wait until the liveaction is completed.
224
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
225
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
226
227
        # Wait for non-blocking threads to complete.
228
        MockLiveActionPublisherNonBlocking.wait_all()
229
230
        # Check liveaction result.
231
        self.assertIn('tasks', liveaction.result)
232
        self.assertEqual(len(liveaction.result['tasks']), 2)
233
        self.assertIn('published', liveaction.result)
234
        self.assertDictEqual({'var1': 'foobar', 'var2': 'fubar'}, liveaction.result['published'])
235
236
    def test_chain_pause_resume_with_published_vars_display_false(self):
237
        # A temp file is created during test setup. Ensure the temp file exists.
238
        # The test action chain will stall until this file is deleted. This gives
239
        # the unit test a moment to run any test related logic.
240
        path = self.temp_file_path
241
        self.assertTrue(os.path.exists(path))
242
243
        action = TEST_PACK + '.' + 'test_pause_resume_with_published_vars'
244
        params = {'tempfile': path, 'message': 'foobar', 'display_published': False}
245
        liveaction = LiveActionDB(action=action, parameters=params)
246
        liveaction, execution = action_service.request(liveaction)
247
        liveaction = LiveAction.get_by_id(str(liveaction.id))
248
249
        # Wait until the liveaction is running.
250
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
251
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
252
253
        # Request action chain to pause.
254
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
255
256
        # Wait until the liveaction is pausing.
257
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
258
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
259
260
        # Delete the temporary file that the action chain is waiting on.
261
        os.remove(path)
262
        self.assertFalse(os.path.exists(path))
263
264
        # Wait until the liveaction is paused.
265
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
266
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
267
268
        # Wait for non-blocking threads to complete. Ensure runner is not running.
269
        MockLiveActionPublisherNonBlocking.wait_all()
270
271
        # Request action chain to resume.
272
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
273
274
        # Wait until the liveaction is completed.
275
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
276
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
277
278
        # Wait for non-blocking threads to complete.
279
        MockLiveActionPublisherNonBlocking.wait_all()
280
281
        # Check liveaction result.
282
        self.assertIn('tasks', liveaction.result)
283
        self.assertEqual(len(liveaction.result['tasks']), 2)
284
        self.assertNotIn('published', liveaction.result)
285
286
    def test_chain_pause_resume_with_error(self):
287
        # A temp file is created during test setup. Ensure the temp file exists.
288
        # The test action chain will stall until this file is deleted. This gives
289
        # the unit test a moment to run any test related logic.
290
        path = self.temp_file_path
291
        self.assertTrue(os.path.exists(path))
292
293
        action = TEST_PACK + '.' + 'test_pause_resume_with_error'
294
        params = {'tempfile': path, 'message': 'foobar'}
295
        liveaction = LiveActionDB(action=action, parameters=params)
296
        liveaction, execution = action_service.request(liveaction)
297
        liveaction = LiveAction.get_by_id(str(liveaction.id))
298
299
        # Wait until the liveaction is running.
300
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
301
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
302
303
        # Request action chain to pause.
304
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
305
306
        # Wait until the liveaction is pausing.
307
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
308
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
309
310
        # Delete the temporary file that the action chain is waiting on.
311
        os.remove(path)
312
        self.assertFalse(os.path.exists(path))
313
314
        # Wait until the liveaction is paused.
315
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
316
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
317
318
        # Wait for non-blocking threads to complete. Ensure runner is not running.
319
        MockLiveActionPublisherNonBlocking.wait_all()
320
321
        # Request action chain to resume.
322
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
323
324
        # Wait until the liveaction is completed.
325
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
326
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
327
328
        # Wait for non-blocking threads to complete.
329
        MockLiveActionPublisherNonBlocking.wait_all()
330
331
        # Check liveaction result.
332
        self.assertIn('tasks', liveaction.result)
333
        self.assertEqual(len(liveaction.result['tasks']), 2)
334
        self.assertTrue(liveaction.result['tasks'][0]['result']['failed'])
335
        self.assertEqual(1, liveaction.result['tasks'][0]['result']['return_code'])
336
        self.assertTrue(liveaction.result['tasks'][1]['result']['succeeded'])
337
        self.assertEqual(0, liveaction.result['tasks'][1]['result']['return_code'])
338
339
    def test_chain_pause_resume_cascade_to_subworkflow(self):
340
        # A temp file is created during test setup. Ensure the temp file exists.
341
        # The test action chain will stall until this file is deleted. This gives
342
        # the unit test a moment to run any test related logic.
343
        path = self.temp_file_path
344
        self.assertTrue(os.path.exists(path))
345
346
        action = TEST_PACK + '.' + 'test_pause_resume_with_subworkflow'
347
        params = {'tempfile': path, 'message': 'foobar'}
348
        liveaction = LiveActionDB(action=action, parameters=params)
349
        liveaction, execution = action_service.request(liveaction)
350
        liveaction = LiveAction.get_by_id(str(liveaction.id))
351
352
        # Wait until the liveaction is running.
353
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
354
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
355
356
        # Wait for subworkflow to register.
357
        execution = self._wait_for_children(execution)
358
        self.assertEqual(len(execution.children), 1)
359
360
        # Wait until the subworkflow is running.
361
        task1_exec = ActionExecution.get_by_id(execution.children[0])
362
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
363
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_RUNNING)
364
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_RUNNING)
365
366
        # Request action chain to pause.
367
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
368
369
        # Wait until the liveaction is pausing.
370
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
371
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
372
        self.assertEqual(len(execution.children), 1)
373
374
        # Wait until the subworkflow is pausing.
375
        task1_exec = ActionExecution.get_by_id(execution.children[0])
376
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
377
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_PAUSING)
378
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_PAUSING)
379
380
        # Delete the temporary file that the action chain is waiting on.
381
        os.remove(path)
382
        self.assertFalse(os.path.exists(path))
383
384
        # Wait until the liveaction is paused.
385
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
386
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
387
        self.assertEqual(len(execution.children), 1)
388
389
        # Wait until the subworkflow is paused.
390
        task1_exec = ActionExecution.get_by_id(execution.children[0])
391
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
392
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_PAUSED)
393
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_PAUSED)
394
395
        # Wait for non-blocking threads to complete. Ensure runner is not running.
396
        MockLiveActionPublisherNonBlocking.wait_all()
397
398
        # Check liveaction result.
399
        self.assertIn('tasks', liveaction.result)
400
        self.assertEqual(len(liveaction.result['tasks']), 1)
401
402
        subworkflow = liveaction.result['tasks'][0]
403
        self.assertEqual(len(subworkflow['result']['tasks']), 1)
404
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_PAUSED)
405
406
        # Request action chain to resume.
407
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
408
409
        # Wait until the liveaction is completed.
410
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
411
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
412
413
        # Wait for non-blocking threads to complete.
414
        MockLiveActionPublisherNonBlocking.wait_all()
415
416
        # Check liveaction result.
417
        self.assertIn('tasks', liveaction.result)
418
        self.assertEqual(len(liveaction.result['tasks']), 2)
419
420
        subworkflow = liveaction.result['tasks'][0]
421
        self.assertEqual(len(subworkflow['result']['tasks']), 2)
422
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_SUCCEEDED)
423
424
    def test_chain_pause_resume_cascade_to_parent_workflow(self):
425
        # A temp file is created during test setup. Ensure the temp file exists.
426
        # The test action chain will stall until this file is deleted. This gives
427
        # the unit test a moment to run any test related logic.
428
        path = self.temp_file_path
429
        self.assertTrue(os.path.exists(path))
430
431
        action = TEST_PACK + '.' + 'test_pause_resume_with_subworkflow'
432
        params = {'tempfile': path, 'message': 'foobar'}
433
        liveaction = LiveActionDB(action=action, parameters=params)
434
        liveaction, execution = action_service.request(liveaction)
435
        liveaction = LiveAction.get_by_id(str(liveaction.id))
436
437
        # Wait until the liveaction is running.
438
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
439
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
440
441
        # Wait for subworkflow to register.
442
        execution = self._wait_for_children(execution)
443
        self.assertEqual(len(execution.children), 1)
444
445
        # Wait until the subworkflow is running.
446
        task1_exec = ActionExecution.get_by_id(execution.children[0])
447
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
448
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_RUNNING)
449
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_RUNNING)
450
451
        # Request subworkflow to pause.
452
        task1_live, task1_exec = action_service.request_pause(task1_live, USERNAME)
453
454
        # Wait until the subworkflow is pausing.
455
        task1_exec = ActionExecution.get_by_id(execution.children[0])
456
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
457
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_PAUSING)
458
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_PAUSING)
459
460
        # Delete the temporary file that the action chain is waiting on.
461
        os.remove(path)
462
        self.assertFalse(os.path.exists(path))
463
464
        # Wait until the subworkflow is paused.
465
        task1_exec = ActionExecution.get_by_id(execution.children[0])
466
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
467
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_PAUSED)
468
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_PAUSED)
469
470
        # Wait until the parent liveaction is paused.
471
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
472
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
473
        self.assertEqual(len(execution.children), 1)
474
475
        # Wait for non-blocking threads to complete. Ensure runner is not running.
476
        MockLiveActionPublisherNonBlocking.wait_all()
477
478
        # Check liveaction result.
479
        self.assertIn('tasks', liveaction.result)
480
        self.assertEqual(len(liveaction.result['tasks']), 1)
481
482
        subworkflow = liveaction.result['tasks'][0]
483
        self.assertEqual(len(subworkflow['result']['tasks']), 1)
484
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_PAUSED)
485
486
        # Request subworkflow to resume.
487
        task1_live, task1_exec = action_service.request_resume(task1_live, USERNAME)
488
489
        # Wait until the subworkflow is paused.
490
        task1_exec = ActionExecution.get_by_id(execution.children[0])
491
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
492
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_SUCCEEDED)
493
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
494
495
        # The parent workflow will stay paused.
496
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
497
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
498
499
        # Wait for non-blocking threads to complete.
500
        MockLiveActionPublisherNonBlocking.wait_all()
501
502
        # Check liveaction result of the parent, which should stay the same
503
        # because only the subworkflow was resumed.
504
        self.assertIn('tasks', liveaction.result)
505
        self.assertEqual(len(liveaction.result['tasks']), 1)
506
507
        subworkflow = liveaction.result['tasks'][0]
508
        self.assertEqual(len(subworkflow['result']['tasks']), 1)
509
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_PAUSED)
510
511
        # Request parent workflow to resume.
512
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
513
514
        # Wait until the liveaction is completed.
515
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
516
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
517
518
        # Wait for non-blocking threads to complete.
519
        MockLiveActionPublisherNonBlocking.wait_all()
520
521
        # Check liveaction result.
522
        self.assertIn('tasks', liveaction.result)
523
        self.assertEqual(len(liveaction.result['tasks']), 2)
524
525
        subworkflow = liveaction.result['tasks'][0]
526
        self.assertEqual(len(subworkflow['result']['tasks']), 2)
527
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_SUCCEEDED)
528
529
    def test_chain_pause_resume_with_context_access(self):
530
        # A temp file is created during test setup. Ensure the temp file exists.
531
        # The test action chain will stall until this file is deleted. This gives
532
        # the unit test a moment to run any test related logic.
533
        path = self.temp_file_path
534
        self.assertTrue(os.path.exists(path))
535
536
        action = TEST_PACK + '.' + 'test_pause_resume_with_context_access'
537
        params = {'tempfile': path, 'message': 'foobar'}
538
        liveaction = LiveActionDB(action=action, parameters=params)
539
        liveaction, execution = action_service.request(liveaction)
540
        liveaction = LiveAction.get_by_id(str(liveaction.id))
541
542
        # Wait until the liveaction is running.
543
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
544
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
545
546
        # Request action chain to pause.
547
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
548
549
        # Wait until the liveaction is pausing.
550
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
551
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
552
553
        # Delete the temporary file that the action chain is waiting on.
554
        os.remove(path)
555
        self.assertFalse(os.path.exists(path))
556
557
        # Wait until the liveaction is paused.
558
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
559
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
560
561
        # Wait for non-blocking threads to complete. Ensure runner is not running.
562
        MockLiveActionPublisherNonBlocking.wait_all()
563
564
        # Request action chain to resume.
565
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
566
567
        # Wait until the liveaction is completed.
568
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
569
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
570
571
        # Wait for non-blocking threads to complete.
572
        MockLiveActionPublisherNonBlocking.wait_all()
573
574
        # Check liveaction result.
575
        self.assertIn('tasks', liveaction.result)
576
        self.assertEqual(len(liveaction.result['tasks']), 3)
577
        self.assertEqual(liveaction.result['tasks'][2]['result']['stdout'], 'foobar')
578
579
    def test_chain_pause_resume_with_init_vars(self):
580
        # A temp file is created during test setup. Ensure the temp file exists.
581
        # The test action chain will stall until this file is deleted. This gives
582
        # the unit test a moment to run any test related logic.
583
        path = self.temp_file_path
584
        self.assertTrue(os.path.exists(path))
585
586
        action = TEST_PACK + '.' + 'test_pause_resume_with_init_vars'
587
        params = {'tempfile': path, 'message': 'foobar'}
588
        liveaction = LiveActionDB(action=action, parameters=params)
589
        liveaction, execution = action_service.request(liveaction)
590
        liveaction = LiveAction.get_by_id(str(liveaction.id))
591
592
        # Wait until the liveaction is running.
593
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
594
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
595
596
        # Request action chain to pause.
597
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
598
599
        # Wait until the liveaction is pausing.
600
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
601
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
602
603
        # Delete the temporary file that the action chain is waiting on.
604
        os.remove(path)
605
        self.assertFalse(os.path.exists(path))
606
607
        # Wait until the liveaction is paused.
608
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
609
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
610
611
        # Wait for non-blocking threads to complete. Ensure runner is not running.
612
        MockLiveActionPublisherNonBlocking.wait_all()
613
614
        # Request action chain to resume.
615
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
616
617
        # Wait until the liveaction is completed.
618
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
619
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
620
621
        # Wait for non-blocking threads to complete.
622
        MockLiveActionPublisherNonBlocking.wait_all()
623
624
        # Check liveaction result.
625
        self.assertIn('tasks', liveaction.result)
626
        self.assertEqual(len(liveaction.result['tasks']), 2)
627
        self.assertEqual(liveaction.result['tasks'][1]['result']['stdout'], 'FOOBAR')
628
629
    def test_chain_pause_resume_with_no_more_task(self):
630
        # A temp file is created during test setup. Ensure the temp file exists.
631
        # The test action chain will stall until this file is deleted. This gives
632
        # the unit test a moment to run any test related logic.
633
        path = self.temp_file_path
634
        self.assertTrue(os.path.exists(path))
635
636
        action = TEST_PACK + '.' + 'test_pause_resume_with_no_more_task'
637
        params = {'tempfile': path, 'message': 'foobar'}
638
        liveaction = LiveActionDB(action=action, parameters=params)
639
        liveaction, execution = action_service.request(liveaction)
640
        liveaction = LiveAction.get_by_id(str(liveaction.id))
641
642
        # Wait until the liveaction is running.
643
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
644
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
645
646
        # Request action chain to pause.
647
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
648
649
        # Wait until the liveaction is pausing.
650
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
651
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
652
653
        # Delete the temporary file that the action chain is waiting on.
654
        os.remove(path)
655
        self.assertFalse(os.path.exists(path))
656
657
        # Wait until the liveaction is paused.
658
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
659
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
660
661
        # Wait for non-blocking threads to complete. Ensure runner is not running.
662
        MockLiveActionPublisherNonBlocking.wait_all()
663
664
        # Request action chain to resume.
665
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
666
667
        # Wait until the liveaction is completed.
668
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
669
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
670
671
        # Wait for non-blocking threads to complete.
672
        MockLiveActionPublisherNonBlocking.wait_all()
673
674
        # Check liveaction result.
675
        self.assertIn('tasks', liveaction.result)
676
        self.assertEqual(len(liveaction.result['tasks']), 1)
677
678
    def test_chain_pause_resume_last_task_failed_with_no_next_task(self):
679
        # A temp file is created during test setup. Ensure the temp file exists.
680
        # The test action chain will stall until this file is deleted. This gives
681
        # the unit test a moment to run any test related logic.
682
        path = self.temp_file_path
683
        self.assertTrue(os.path.exists(path))
684
685
        action = TEST_PACK + '.' + 'test_pause_resume_last_task_failed_with_no_next_task'
686
        params = {'tempfile': path, 'message': 'foobar'}
687
        liveaction = LiveActionDB(action=action, parameters=params)
688
        liveaction, execution = action_service.request(liveaction)
689
        liveaction = LiveAction.get_by_id(str(liveaction.id))
690
691
        # Wait until the liveaction is running.
692
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
693
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
694
695
        # Request action chain to pause.
696
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
697
698
        # Wait until the liveaction is pausing.
699
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
700
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
701
702
        # Delete the temporary file that the action chain is waiting on.
703
        os.remove(path)
704
        self.assertFalse(os.path.exists(path))
705
706
        # Wait until the liveaction is paused.
707
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
708
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
709
710
        # Wait for non-blocking threads to complete. Ensure runner is not running.
711
        MockLiveActionPublisherNonBlocking.wait_all()
712
713
        # Request action chain to resume.
714
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
715
716
        # Wait until the liveaction is completed.
717
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_FAILED)
718
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_FAILED)
719
720
        # Wait for non-blocking threads to complete.
721
        MockLiveActionPublisherNonBlocking.wait_all()
722
723
        # Check liveaction result.
724
        self.assertIn('tasks', liveaction.result)
725
        self.assertEqual(len(liveaction.result['tasks']), 1)
726
727
        self.assertEqual(
728
            liveaction.result['tasks'][0]['state'],
729
            action_constants.LIVEACTION_STATUS_FAILED
730
        )
731