Passed
Pull Request — master (#3645)
by W
05:04
created

_wait_for_children()   A

Complexity

Conditions 3

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
c 1
b 0
f 0
dl 0
loc 9
rs 9.6666
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import eventlet
17
import mock
18
import os
19
import tempfile
20
21
from st2common.bootstrap import actionsregistrar
22
from st2common.bootstrap import runnersregistrar
23
24
from st2common.constants import action as action_constants
25
from st2common.models.db.liveaction import LiveActionDB
26
from st2common.persistence.execution import ActionExecution
27
from st2common.persistence.liveaction import LiveAction
28
from st2common.services import action as action_service
29
from st2common.transport.liveaction import LiveActionPublisher
30
from st2common.transport.publishers import CUDPublisher
31
32
from st2tests import DbTestCase
33
from st2tests import fixturesloader
34
from st2tests.mocks.liveaction import MockLiveActionPublisherNonBlocking
35
36
37
TEST_FIXTURES = {
38
    'chains': [
39
        'test_pause_resume.yaml',
40
        'test_pause_resume_with_published_vars.yaml',
41
        'test_pause_resume_with_error.yaml',
42
        'test_pause_resume_with_subworkflow.yaml',
43
        'test_pause_resume_with_context_access.yaml',
44
        'test_pause_resume_with_init_vars.yaml'
45
    ],
46
    'actions': [
47
        'test_pause_resume.yaml',
48
        'test_pause_resume_with_published_vars.yaml',
49
        'test_pause_resume_with_error.yaml',
50
        'test_pause_resume_with_subworkflow.yaml',
51
        'test_pause_resume_with_context_access.yaml',
52
        'test_pause_resume_with_init_vars.yaml'
53
    ]
54
}
55
56
TEST_PACK = 'action_chain_tests'
57
TEST_PACK_PATH = fixturesloader.get_fixtures_packs_base_path() + '/' + TEST_PACK
58
59
PACKS = [
60
    TEST_PACK_PATH,
61
    fixturesloader.get_fixtures_packs_base_path() + '/core'
62
]
63
64
USERNAME = 'stanley'
65
66
67
@mock.patch.object(
68
    CUDPublisher,
69
    'publish_update',
70
    mock.MagicMock(return_value=None))
71
@mock.patch.object(
72
    CUDPublisher,
73
    'publish_create',
74
    mock.MagicMock(return_value=None))
75
@mock.patch.object(
76
    LiveActionPublisher,
77
    'publish_state',
78
    mock.MagicMock(side_effect=MockLiveActionPublisherNonBlocking.publish_state))
79
class ActionChainRunnerPauseResumeTest(DbTestCase):
80
81
    temp_file_path = None
82
83
    @classmethod
84
    def setUpClass(cls):
85
        super(ActionChainRunnerPauseResumeTest, cls).setUpClass()
86
87
        # Register runners.
88
        runnersregistrar.register_runners()
89
90
        # Register test pack(s).
91
        actions_registrar = actionsregistrar.ActionsRegistrar(
92
            use_pack_cache=False,
93
            fail_on_failure=True
94
        )
95
96
        for pack in PACKS:
97
            actions_registrar.register_from_pack(pack)
98
99
    def setUp(self):
100
        super(ActionChainRunnerPauseResumeTest, self).setUp()
101
102
        # Create temporary directory used by the tests
103
        _, self.temp_file_path = tempfile.mkstemp()
104
        os.chmod(self.temp_file_path, 0755)   # nosec
105
106
    def tearDown(self):
107
        if self.temp_file_path and os.path.exists(self.temp_file_path):
108
            os.remove(self.temp_file_path)
109
110
        super(ActionChainRunnerPauseResumeTest, self).tearDown()
111
112
    def _wait_for_status(self, liveaction, status, interval=0.1, retries=100):
113
        # Wait until the liveaction reaches status.
114
        for i in range(0, retries):
115
            liveaction = LiveAction.get_by_id(str(liveaction.id))
116
            if liveaction.status != status:
117
                eventlet.sleep(interval)
118
                continue
119
120
        return liveaction
121
122
    def _wait_for_children(self, execution, interval=0.1, retries=100):
123
        # Wait until the execution has children.
124
        for i in range(0, retries):
125
            execution = ActionExecution.get_by_id(str(execution.id))
126
            if len(getattr(execution, 'children', [])) <= 0:
127
                eventlet.sleep(interval)
128
                continue
129
130
        return execution
131
132
    def test_chain_pause_resume(self):
133
        # A temp file is created during test setup. Ensure the temp file exists.
134
        # The test action chain will stall until this file is deleted. This gives
135
        # the unit test a moment to run any test related logic.
136
        path = self.temp_file_path
137
        self.assertTrue(os.path.exists(path))
138
139
        action = TEST_PACK + '.' + 'test_pause_resume'
140
        params = {'tempfile': path, 'message': 'foobar'}
141
        liveaction = LiveActionDB(action=action, parameters=params)
142
        liveaction, execution = action_service.request(liveaction)
143
        liveaction = LiveAction.get_by_id(str(liveaction.id))
144
145
        # Wait until the liveaction is running.
146
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
147
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
148
149
        # Request action chain to pause.
150
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
151
152
        # Wait until the liveaction is pausing.
153
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
154
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
155
156
        # Delete the temporary file that the action chain is waiting on.
157
        os.remove(path)
158
        self.assertFalse(os.path.exists(path))
159
160
        # Wait until the liveaction is paused.
161
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
162
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
163
164
        # Wait for non-blocking threads to complete. Ensure runner is not running.
165
        MockLiveActionPublisherNonBlocking.wait_all()
166
167
        # Request action chain to resume.
168
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
169
170
        # Wait until the liveaction is completed.
171
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
172
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
173
174
        # Wait for non-blocking threads to complete.
175
        MockLiveActionPublisherNonBlocking.wait_all()
176
177
        # Check liveaction result.
178
        self.assertIn('tasks', liveaction.result)
179
        self.assertEqual(len(liveaction.result['tasks']), 2)
180
181
    def test_chain_pause_resume_with_published_vars(self):
182
        # A temp file is created during test setup. Ensure the temp file exists.
183
        # The test action chain will stall until this file is deleted. This gives
184
        # the unit test a moment to run any test related logic.
185
        path = self.temp_file_path
186
        self.assertTrue(os.path.exists(path))
187
188
        action = TEST_PACK + '.' + 'test_pause_resume_with_published_vars'
189
        params = {'tempfile': path, 'message': 'foobar'}
190
        liveaction = LiveActionDB(action=action, parameters=params)
191
        liveaction, execution = action_service.request(liveaction)
192
        liveaction = LiveAction.get_by_id(str(liveaction.id))
193
194
        # Wait until the liveaction is running.
195
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
196
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
197
198
        # Request action chain to pause.
199
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
200
201
        # Wait until the liveaction is pausing.
202
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
203
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
204
205
        # Delete the temporary file that the action chain is waiting on.
206
        os.remove(path)
207
        self.assertFalse(os.path.exists(path))
208
209
        # Wait until the liveaction is paused.
210
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
211
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
212
213
        # Wait for non-blocking threads to complete. Ensure runner is not running.
214
        MockLiveActionPublisherNonBlocking.wait_all()
215
216
        # Request action chain to resume.
217
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
218
219
        # Wait until the liveaction is completed.
220
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
221
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
222
223
        # Wait for non-blocking threads to complete.
224
        MockLiveActionPublisherNonBlocking.wait_all()
225
226
        # Check liveaction result.
227
        self.assertIn('tasks', liveaction.result)
228
        self.assertEqual(len(liveaction.result['tasks']), 2)
229
        self.assertIn('published', liveaction.result)
230
        self.assertDictEqual({'var1': 'foobar', 'var2': 'fubar'}, liveaction.result['published'])
231
232
    def test_chain_pause_resume_with_published_vars_display_false(self):
233
        # A temp file is created during test setup. Ensure the temp file exists.
234
        # The test action chain will stall until this file is deleted. This gives
235
        # the unit test a moment to run any test related logic.
236
        path = self.temp_file_path
237
        self.assertTrue(os.path.exists(path))
238
239
        action = TEST_PACK + '.' + 'test_pause_resume_with_published_vars'
240
        params = {'tempfile': path, 'message': 'foobar', 'display_published': False}
241
        liveaction = LiveActionDB(action=action, parameters=params)
242
        liveaction, execution = action_service.request(liveaction)
243
        liveaction = LiveAction.get_by_id(str(liveaction.id))
244
245
        # Wait until the liveaction is running.
246
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
247
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
248
249
        # Request action chain to pause.
250
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
251
252
        # Wait until the liveaction is pausing.
253
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
254
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
255
256
        # Delete the temporary file that the action chain is waiting on.
257
        os.remove(path)
258
        self.assertFalse(os.path.exists(path))
259
260
        # Wait until the liveaction is paused.
261
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
262
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
263
264
        # Wait for non-blocking threads to complete. Ensure runner is not running.
265
        MockLiveActionPublisherNonBlocking.wait_all()
266
267
        # Request action chain to resume.
268
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
269
270
        # Wait until the liveaction is completed.
271
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
272
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
273
274
        # Wait for non-blocking threads to complete.
275
        MockLiveActionPublisherNonBlocking.wait_all()
276
277
        # Check liveaction result.
278
        self.assertIn('tasks', liveaction.result)
279
        self.assertEqual(len(liveaction.result['tasks']), 2)
280
        self.assertNotIn('published', liveaction.result)
281
282
    def test_chain_pause_resume_with_error(self):
283
        # A temp file is created during test setup. Ensure the temp file exists.
284
        # The test action chain will stall until this file is deleted. This gives
285
        # the unit test a moment to run any test related logic.
286
        path = self.temp_file_path
287
        self.assertTrue(os.path.exists(path))
288
289
        action = TEST_PACK + '.' + 'test_pause_resume_with_error'
290
        params = {'tempfile': path, 'message': 'foobar'}
291
        liveaction = LiveActionDB(action=action, parameters=params)
292
        liveaction, execution = action_service.request(liveaction)
293
        liveaction = LiveAction.get_by_id(str(liveaction.id))
294
295
        # Wait until the liveaction is running.
296
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
297
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
298
299
        # Request action chain to pause.
300
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
301
302
        # Wait until the liveaction is pausing.
303
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
304
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
305
306
        # Delete the temporary file that the action chain is waiting on.
307
        os.remove(path)
308
        self.assertFalse(os.path.exists(path))
309
310
        # Wait until the liveaction is paused.
311
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
312
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
313
314
        # Wait for non-blocking threads to complete. Ensure runner is not running.
315
        MockLiveActionPublisherNonBlocking.wait_all()
316
317
        # Request action chain to resume.
318
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
319
320
        # Wait until the liveaction is completed.
321
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
322
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
323
324
        # Wait for non-blocking threads to complete.
325
        MockLiveActionPublisherNonBlocking.wait_all()
326
327
        # Check liveaction result.
328
        self.assertIn('tasks', liveaction.result)
329
        self.assertEqual(len(liveaction.result['tasks']), 2)
330
        self.assertTrue(liveaction.result['tasks'][0]['result']['failed'])
331
        self.assertEqual(1, liveaction.result['tasks'][0]['result']['return_code'])
332
        self.assertTrue(liveaction.result['tasks'][1]['result']['succeeded'])
333
        self.assertEqual(0, liveaction.result['tasks'][1]['result']['return_code'])
334
335
    def test_chain_pause_resume_cascade_to_subworkflow(self):
336
        # A temp file is created during test setup. Ensure the temp file exists.
337
        # The test action chain will stall until this file is deleted. This gives
338
        # the unit test a moment to run any test related logic.
339
        path = self.temp_file_path
340
        self.assertTrue(os.path.exists(path))
341
342
        action = TEST_PACK + '.' + 'test_pause_resume_with_subworkflow'
343
        params = {'tempfile': path, 'message': 'foobar'}
344
        liveaction = LiveActionDB(action=action, parameters=params)
345
        liveaction, execution = action_service.request(liveaction)
346
        liveaction = LiveAction.get_by_id(str(liveaction.id))
347
348
        # Wait until the liveaction is running.
349
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
350
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
351
352
        # Wait for subworkflow to register.
353
        execution = self._wait_for_children(execution)
354
        self.assertEqual(len(execution.children), 1)
355
356
        # Wait until the subworkflow is running.
357
        task1_exec = ActionExecution.get_by_id(execution.children[0])
358
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
359
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_RUNNING)
360
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_RUNNING)
361
362
        # Request action chain to pause.
363
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
364
365
        # Wait until the liveaction is pausing.
366
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
367
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
368
        self.assertEqual(len(execution.children), 1)
369
370
        # Wait until the subworkflow is pausing.
371
        task1_exec = ActionExecution.get_by_id(execution.children[0])
372
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
373
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_PAUSING)
374
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_PAUSING)
375
376
        # Delete the temporary file that the action chain is waiting on.
377
        os.remove(path)
378
        self.assertFalse(os.path.exists(path))
379
380
        # Wait until the liveaction is paused.
381
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
382
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
383
        self.assertEqual(len(execution.children), 1)
384
385
        # Wait until the subworkflow is paused.
386
        task1_exec = ActionExecution.get_by_id(execution.children[0])
387
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
388
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_PAUSED)
389
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_PAUSED)
390
391
        # Wait for non-blocking threads to complete. Ensure runner is not running.
392
        MockLiveActionPublisherNonBlocking.wait_all()
393
394
        # Check liveaction result.
395
        self.assertIn('tasks', liveaction.result)
396
        self.assertEqual(len(liveaction.result['tasks']), 1)
397
398
        subworkflow = liveaction.result['tasks'][0]
399
        self.assertEqual(len(subworkflow['result']['tasks']), 1)
400
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_PAUSED)
401
402
        # Request action chain to resume.
403
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
404
405
        # Wait until the liveaction is completed.
406
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
407
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
408
409
        # Wait for non-blocking threads to complete.
410
        MockLiveActionPublisherNonBlocking.wait_all()
411
412
        # Check liveaction result.
413
        self.assertIn('tasks', liveaction.result)
414
        self.assertEqual(len(liveaction.result['tasks']), 2)
415
416
        subworkflow = liveaction.result['tasks'][0]
417
        self.assertEqual(len(subworkflow['result']['tasks']), 2)
418
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_SUCCEEDED)
419
420
    def test_chain_pause_resume_cascade_to_parent_workflow(self):
421
        # A temp file is created during test setup. Ensure the temp file exists.
422
        # The test action chain will stall until this file is deleted. This gives
423
        # the unit test a moment to run any test related logic.
424
        path = self.temp_file_path
425
        self.assertTrue(os.path.exists(path))
426
427
        action = TEST_PACK + '.' + 'test_pause_resume_with_subworkflow'
428
        params = {'tempfile': path, 'message': 'foobar'}
429
        liveaction = LiveActionDB(action=action, parameters=params)
430
        liveaction, execution = action_service.request(liveaction)
431
        liveaction = LiveAction.get_by_id(str(liveaction.id))
432
433
        # Wait until the liveaction is running.
434
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
435
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
436
437
        # Wait for subworkflow to register.
438
        execution = self._wait_for_children(execution)
439
        self.assertEqual(len(execution.children), 1)
440
441
        # Wait until the subworkflow is running.
442
        task1_exec = ActionExecution.get_by_id(execution.children[0])
443
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
444
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_RUNNING)
445
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_RUNNING)
446
447
        # Request subworkflow to pause.
448
        task1_live, task1_exec = action_service.request_pause(task1_live, USERNAME)
449
450
        # Wait until the subworkflow is pausing.
451
        task1_exec = ActionExecution.get_by_id(execution.children[0])
452
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
453
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_PAUSING)
454
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_PAUSING)
455
456
        # Delete the temporary file that the action chain is waiting on.
457
        os.remove(path)
458
        self.assertFalse(os.path.exists(path))
459
460
        # Wait until the subworkflow is paused.
461
        task1_exec = ActionExecution.get_by_id(execution.children[0])
462
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
463
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_PAUSED)
464
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_PAUSED)
465
466
        # Wait until the parent liveaction is paused.
467
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
468
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
469
        self.assertEqual(len(execution.children), 1)
470
471
        # Wait for non-blocking threads to complete. Ensure runner is not running.
472
        MockLiveActionPublisherNonBlocking.wait_all()
473
474
        # Check liveaction result.
475
        self.assertIn('tasks', liveaction.result)
476
        self.assertEqual(len(liveaction.result['tasks']), 1)
477
478
        subworkflow = liveaction.result['tasks'][0]
479
        self.assertEqual(len(subworkflow['result']['tasks']), 1)
480
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_PAUSED)
481
482
        # Request subworkflow to resume.
483
        task1_live, task1_exec = action_service.request_resume(task1_live, USERNAME)
484
485
        # Wait until the subworkflow is paused.
486
        task1_exec = ActionExecution.get_by_id(execution.children[0])
487
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
488
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_SUCCEEDED)
489
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
490
491
        # The parent workflow will stay paused.
492
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
493
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
494
495
        # Wait for non-blocking threads to complete.
496
        MockLiveActionPublisherNonBlocking.wait_all()
497
498
        # Check liveaction result of the parent, which should stay the same
499
        # because only the subworkflow was resumed.
500
        self.assertIn('tasks', liveaction.result)
501
        self.assertEqual(len(liveaction.result['tasks']), 1)
502
503
        subworkflow = liveaction.result['tasks'][0]
504
        self.assertEqual(len(subworkflow['result']['tasks']), 1)
505
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_PAUSED)
506
507
        # Request parent workflow to resume.
508
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
509
510
        # Wait until the liveaction is completed.
511
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
512
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
513
514
        # Wait for non-blocking threads to complete.
515
        MockLiveActionPublisherNonBlocking.wait_all()
516
517
        # Check liveaction result.
518
        self.assertIn('tasks', liveaction.result)
519
        self.assertEqual(len(liveaction.result['tasks']), 2)
520
521
        subworkflow = liveaction.result['tasks'][0]
522
        self.assertEqual(len(subworkflow['result']['tasks']), 2)
523
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_SUCCEEDED)
524
525
    def test_chain_pause_resume_with_context_access(self):
526
        # A temp file is created during test setup. Ensure the temp file exists.
527
        # The test action chain will stall until this file is deleted. This gives
528
        # the unit test a moment to run any test related logic.
529
        path = self.temp_file_path
530
        self.assertTrue(os.path.exists(path))
531
532
        action = TEST_PACK + '.' + 'test_pause_resume_with_context_access'
533
        params = {'tempfile': path, 'message': 'foobar'}
534
        liveaction = LiveActionDB(action=action, parameters=params)
535
        liveaction, execution = action_service.request(liveaction)
536
        liveaction = LiveAction.get_by_id(str(liveaction.id))
537
538
        # Wait until the liveaction is running.
539
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
540
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
541
542
        # Request action chain to pause.
543
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
544
545
        # Wait until the liveaction is pausing.
546
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
547
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
548
549
        # Delete the temporary file that the action chain is waiting on.
550
        os.remove(path)
551
        self.assertFalse(os.path.exists(path))
552
553
        # Wait until the liveaction is paused.
554
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
555
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
556
557
        # Wait for non-blocking threads to complete. Ensure runner is not running.
558
        MockLiveActionPublisherNonBlocking.wait_all()
559
560
        # Request action chain to resume.
561
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
562
563
        # Wait until the liveaction is completed.
564
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
565
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
566
567
        # Wait for non-blocking threads to complete.
568
        MockLiveActionPublisherNonBlocking.wait_all()
569
570
        # Check liveaction result.
571
        self.assertIn('tasks', liveaction.result)
572
        self.assertEqual(len(liveaction.result['tasks']), 3)
573
        self.assertEqual(liveaction.result['tasks'][2]['result']['stdout'], 'foobar')
574
575
    def test_chain_pause_resume_with_init_vars(self):
576
        # A temp file is created during test setup. Ensure the temp file exists.
577
        # The test action chain will stall until this file is deleted. This gives
578
        # the unit test a moment to run any test related logic.
579
        path = self.temp_file_path
580
        self.assertTrue(os.path.exists(path))
581
582
        action = TEST_PACK + '.' + 'test_pause_resume_with_init_vars'
583
        params = {'tempfile': path, 'message': 'foobar'}
584
        liveaction = LiveActionDB(action=action, parameters=params)
585
        liveaction, execution = action_service.request(liveaction)
586
        liveaction = LiveAction.get_by_id(str(liveaction.id))
587
588
        # Wait until the liveaction is running.
589
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
590
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
591
592
        # Request action chain to pause.
593
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
594
595
        # Wait until the liveaction is pausing.
596
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
597
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING)
598
599
        # Delete the temporary file that the action chain is waiting on.
600
        os.remove(path)
601
        self.assertFalse(os.path.exists(path))
602
603
        # Wait until the liveaction is paused.
604
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
605
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
606
607
        # Wait for non-blocking threads to complete. Ensure runner is not running.
608
        MockLiveActionPublisherNonBlocking.wait_all()
609
610
        # Request action chain to resume.
611
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
612
613
        # Wait until the liveaction is completed.
614
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
615
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
616
617
        # Wait for non-blocking threads to complete.
618
        MockLiveActionPublisherNonBlocking.wait_all()
619
620
        # Check liveaction result.
621
        self.assertIn('tasks', liveaction.result)
622
        self.assertEqual(len(liveaction.result['tasks']), 2)
623
        self.assertEqual(liveaction.result['tasks'][1]['result']['stdout'], 'FOOBAR')
624