Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

tests/unit/test_actionchain_pause_resume.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import eventlet
18
import mock
19
import os
20
import tempfile
21
22
from st2common.bootstrap import actionsregistrar
23
from st2common.bootstrap import runnersregistrar
24
25
from st2common.constants import action as action_constants
26
from st2common.models.db.liveaction import LiveActionDB
27
from st2common.persistence.execution import ActionExecution
28
from st2common.persistence.liveaction import LiveAction
29
from st2common.services import action as action_service
30
from st2common.transport.liveaction import LiveActionPublisher
31
from st2common.transport.publishers import CUDPublisher
32
from st2common.util import action_db as action_utils
33
from st2common.util import date as date_utils
34
35
from st2tests import DbTestCase
36
from st2tests import fixturesloader
37
from st2tests.mocks.liveaction import MockLiveActionPublisherNonBlocking
38
from six.moves import range
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in range.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
39
40
41
TEST_FIXTURES = {
42
    'chains': [
43
        'test_pause_resume.yaml',
44
        'test_pause_resume_context_result',
45
        'test_pause_resume_with_published_vars.yaml',
46
        'test_pause_resume_with_error.yaml',
47
        'test_pause_resume_with_subworkflow.yaml',
48
        'test_pause_resume_with_context_access.yaml',
49
        'test_pause_resume_with_init_vars.yaml',
50
        'test_pause_resume_with_no_more_task.yaml',
51
        'test_pause_resume_last_task_failed_with_no_next_task.yaml'
52
    ],
53
    'actions': [
54
        'test_pause_resume.yaml',
55
        'test_pause_resume_context_result',
56
        'test_pause_resume_with_published_vars.yaml',
57
        'test_pause_resume_with_error.yaml',
58
        'test_pause_resume_with_subworkflow.yaml',
59
        'test_pause_resume_with_context_access.yaml',
60
        'test_pause_resume_with_init_vars.yaml',
61
        'test_pause_resume_with_no_more_task.yaml',
62
        'test_pause_resume_last_task_failed_with_no_next_task.yaml'
63
    ]
64
}
65
66
TEST_PACK = 'action_chain_tests'
67
TEST_PACK_PATH = fixturesloader.get_fixtures_packs_base_path() + '/' + TEST_PACK
68
69
PACKS = [
70
    TEST_PACK_PATH,
71
    fixturesloader.get_fixtures_packs_base_path() + '/core'
72
]
73
74
USERNAME = 'stanley'
75
76
77
@mock.patch.object(
78
    CUDPublisher,
79
    'publish_update',
80
    mock.MagicMock(return_value=None))
81
@mock.patch.object(
82
    CUDPublisher,
83
    'publish_create',
84
    mock.MagicMock(return_value=None))
85
@mock.patch.object(
86
    LiveActionPublisher,
87
    'publish_state',
88
    mock.MagicMock(side_effect=MockLiveActionPublisherNonBlocking.publish_state))
89
class ActionChainRunnerPauseResumeTest(DbTestCase):
90
91
    temp_file_path = None
92
93
    @classmethod
94
    def setUpClass(cls):
95
        super(ActionChainRunnerPauseResumeTest, cls).setUpClass()
96
97
        # Register runners.
98
        runnersregistrar.register_runners()
99
100
        # Register test pack(s).
101
        actions_registrar = actionsregistrar.ActionsRegistrar(
102
            use_pack_cache=False,
103
            fail_on_failure=True
104
        )
105
106
        for pack in PACKS:
107
            actions_registrar.register_from_pack(pack)
108
109
    def setUp(self):
110
        super(ActionChainRunnerPauseResumeTest, self).setUp()
111
112
        # Create temporary directory used by the tests
113
        _, self.temp_file_path = tempfile.mkstemp()
114
        os.chmod(self.temp_file_path, 0o755)   # nosec
115
116
    def tearDown(self):
117
        if self.temp_file_path and os.path.exists(self.temp_file_path):
118
            os.remove(self.temp_file_path)
119
120
        super(ActionChainRunnerPauseResumeTest, self).tearDown()
121
122
    def _wait_for_status(self, liveaction, status, interval=0.1, retries=100):
123
        # Wait until the liveaction reaches status.
124
        for i in range(0, retries):
125
            liveaction = LiveAction.get_by_id(str(liveaction.id))
126
            if liveaction.status != status:
127
                eventlet.sleep(interval)
128
                continue
129
            else:
130
                break
131
132
        return liveaction
133
134
    def _wait_for_children(self, execution, interval=0.1, retries=100):
135
        # Wait until the execution has children.
136
        for i in range(0, retries):
137
            execution = ActionExecution.get_by_id(str(execution.id))
138
            if len(getattr(execution, 'children', [])) <= 0:
139
                eventlet.sleep(interval)
140
                continue
141
142
        return execution
143
144
    def test_chain_pause_resume(self):
145
        # A temp file is created during test setup. Ensure the temp file exists.
146
        # The test action chain will stall until this file is deleted. This gives
147
        # the unit test a moment to run any test related logic.
148
        path = self.temp_file_path
149
        self.assertTrue(os.path.exists(path))
150
151
        action = TEST_PACK + '.' + 'test_pause_resume'
152
        params = {'tempfile': path, 'message': 'foobar'}
153
        liveaction = LiveActionDB(action=action, parameters=params)
154
        liveaction, execution = action_service.request(liveaction)
155
        liveaction = LiveAction.get_by_id(str(liveaction.id))
156
157
        # Wait until the liveaction is running.
158
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
159
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
160
161
        # Request action chain to pause.
162
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
163
164
        # Wait until the liveaction is pausing.
165
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
166
        extra_info = str(liveaction)
167
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING, extra_info)
168
169
        # Delete the temporary file that the action chain is waiting on.
170
        os.remove(path)
171
        self.assertFalse(os.path.exists(path))
172
173
        # Wait until the liveaction is paused.
174
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
175
        extra_info = str(liveaction)
176
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED, extra_info)
177
178
        # Wait for non-blocking threads to complete. Ensure runner is not running.
179
        MockLiveActionPublisherNonBlocking.wait_all()
180
181
        # Request action chain to resume.
182
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
183
184
        # Wait until the liveaction is completed.
185
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
186
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
187
188
        # Wait for non-blocking threads to complete.
189
        MockLiveActionPublisherNonBlocking.wait_all()
190
191
        # Check liveaction result.
192
        self.assertIn('tasks', liveaction.result)
193
        self.assertEqual(len(liveaction.result['tasks']), 2)
194
195
    def test_chain_pause_resume_with_published_vars(self):
196
        # A temp file is created during test setup. Ensure the temp file exists.
197
        # The test action chain will stall until this file is deleted. This gives
198
        # the unit test a moment to run any test related logic.
199
        path = self.temp_file_path
200
        self.assertTrue(os.path.exists(path))
201
202
        action = TEST_PACK + '.' + 'test_pause_resume_with_published_vars'
203
        params = {'tempfile': path, 'message': 'foobar'}
204
        liveaction = LiveActionDB(action=action, parameters=params)
205
        liveaction, execution = action_service.request(liveaction)
206
        liveaction = LiveAction.get_by_id(str(liveaction.id))
207
208
        # Wait until the liveaction is running.
209
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
210
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
211
212
        # Request action chain to pause.
213
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
214
215
        # Wait until the liveaction is pausing.
216
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
217
        extra_info = str(liveaction)
218
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING, extra_info)
219
220
        # Delete the temporary file that the action chain is waiting on.
221
        os.remove(path)
222
        self.assertFalse(os.path.exists(path))
223
224
        # Wait until the liveaction is paused.
225
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
226
        extra_info = str(liveaction)
227
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED, extra_info)
228
229
        # Wait for non-blocking threads to complete. Ensure runner is not running.
230
        MockLiveActionPublisherNonBlocking.wait_all()
231
232
        # Request action chain to resume.
233
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
234
235
        # Wait until the liveaction is completed.
236
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
237
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
238
239
        # Wait for non-blocking threads to complete.
240
        MockLiveActionPublisherNonBlocking.wait_all()
241
242
        # Check liveaction result.
243
        self.assertIn('tasks', liveaction.result)
244
        self.assertEqual(len(liveaction.result['tasks']), 2)
245
        self.assertIn('published', liveaction.result)
246
        self.assertDictEqual({'var1': 'foobar', 'var2': 'fubar'}, liveaction.result['published'])
247
248
    def test_chain_pause_resume_with_published_vars_display_false(self):
249
        # A temp file is created during test setup. Ensure the temp file exists.
250
        # The test action chain will stall until this file is deleted. This gives
251
        # the unit test a moment to run any test related logic.
252
        path = self.temp_file_path
253
        self.assertTrue(os.path.exists(path))
254
255
        action = TEST_PACK + '.' + 'test_pause_resume_with_published_vars'
256
        params = {'tempfile': path, 'message': 'foobar', 'display_published': False}
257
        liveaction = LiveActionDB(action=action, parameters=params)
258
        liveaction, execution = action_service.request(liveaction)
259
        liveaction = LiveAction.get_by_id(str(liveaction.id))
260
261
        # Wait until the liveaction is running.
262
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
263
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
264
265
        # Request action chain to pause.
266
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
267
268
        # Wait until the liveaction is pausing.
269
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
270
        extra_info = str(liveaction)
271
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING, extra_info)
272
273
        # Delete the temporary file that the action chain is waiting on.
274
        os.remove(path)
275
        self.assertFalse(os.path.exists(path))
276
277
        # Wait until the liveaction is paused.
278
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
279
        extra_info = str(liveaction)
280
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED, extra_info)
281
282
        # Wait for non-blocking threads to complete. Ensure runner is not running.
283
        MockLiveActionPublisherNonBlocking.wait_all()
284
285
        # Request action chain to resume.
286
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
287
288
        # Wait until the liveaction is completed.
289
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
290
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
291
292
        # Wait for non-blocking threads to complete.
293
        MockLiveActionPublisherNonBlocking.wait_all()
294
295
        # Check liveaction result.
296
        self.assertIn('tasks', liveaction.result)
297
        self.assertEqual(len(liveaction.result['tasks']), 2)
298
        self.assertNotIn('published', liveaction.result)
299
300
    def test_chain_pause_resume_with_error(self):
301
        # A temp file is created during test setup. Ensure the temp file exists.
302
        # The test action chain will stall until this file is deleted. This gives
303
        # the unit test a moment to run any test related logic.
304
        path = self.temp_file_path
305
        self.assertTrue(os.path.exists(path))
306
307
        action = TEST_PACK + '.' + 'test_pause_resume_with_error'
308
        params = {'tempfile': path, 'message': 'foobar'}
309
        liveaction = LiveActionDB(action=action, parameters=params)
310
        liveaction, execution = action_service.request(liveaction)
311
        liveaction = LiveAction.get_by_id(str(liveaction.id))
312
313
        # Wait until the liveaction is running.
314
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
315
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
316
317
        # Request action chain to pause.
318
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
319
320
        # Wait until the liveaction is pausing.
321
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
322
        extra_info = str(liveaction)
323
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING, extra_info)
324
325
        # Delete the temporary file that the action chain is waiting on.
326
        os.remove(path)
327
        self.assertFalse(os.path.exists(path))
328
329
        # Wait until the liveaction is paused.
330
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
331
        extra_info = str(liveaction)
332
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED, extra_info)
333
334
        # Wait for non-blocking threads to complete. Ensure runner is not running.
335
        MockLiveActionPublisherNonBlocking.wait_all()
336
337
        # Request action chain to resume.
338
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
339
340
        # Wait until the liveaction is completed.
341
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
342
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
343
344
        # Wait for non-blocking threads to complete.
345
        MockLiveActionPublisherNonBlocking.wait_all()
346
347
        # Check liveaction result.
348
        self.assertIn('tasks', liveaction.result)
349
        self.assertEqual(len(liveaction.result['tasks']), 2)
350
        self.assertTrue(liveaction.result['tasks'][0]['result']['failed'])
351
        self.assertEqual(1, liveaction.result['tasks'][0]['result']['return_code'])
352
        self.assertTrue(liveaction.result['tasks'][1]['result']['succeeded'])
353
        self.assertEqual(0, liveaction.result['tasks'][1]['result']['return_code'])
354
355
    def test_chain_pause_resume_cascade_to_subworkflow(self):
356
        # A temp file is created during test setup. Ensure the temp file exists.
357
        # The test action chain will stall until this file is deleted. This gives
358
        # the unit test a moment to run any test related logic.
359
        path = self.temp_file_path
360
        self.assertTrue(os.path.exists(path))
361
362
        action = TEST_PACK + '.' + 'test_pause_resume_with_subworkflow'
363
        params = {'tempfile': path, 'message': 'foobar'}
364
        liveaction = LiveActionDB(action=action, parameters=params)
365
        liveaction, execution = action_service.request(liveaction)
366
        liveaction = LiveAction.get_by_id(str(liveaction.id))
367
368
        # Wait until the liveaction is running.
369
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
370
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
371
372
        # Wait for subworkflow to register.
373
        execution = self._wait_for_children(execution)
374
        self.assertEqual(len(execution.children), 1)
375
376
        # Wait until the subworkflow is running.
377
        task1_exec = ActionExecution.get_by_id(execution.children[0])
378
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
379
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_RUNNING)
380
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_RUNNING)
381
382
        # Request action chain to pause.
383
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
384
385
        # Wait until the liveaction is pausing.
386
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
387
        extra_info = str(liveaction)
388
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING, extra_info)
389
        self.assertEqual(len(execution.children), 1)
390
391
        # Wait until the subworkflow is pausing.
392
        task1_exec = ActionExecution.get_by_id(execution.children[0])
393
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
394
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_PAUSING)
395
        extra_info = str(task1_live)
396
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_PAUSING, extra_info)
397
398
        # Delete the temporary file that the action chain is waiting on.
399
        os.remove(path)
400
        self.assertFalse(os.path.exists(path))
401
402
        # Wait until the liveaction is paused.
403
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
404
        extra_info = str(liveaction)
405
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED, extra_info)
406
        self.assertEqual(len(execution.children), 1)
407
408
        # Wait until the subworkflow is paused.
409
        task1_exec = ActionExecution.get_by_id(execution.children[0])
410
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
411
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_PAUSED)
412
        extra_info = str(task1_live)
413
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_PAUSED, extra_info)
414
415
        # Wait for non-blocking threads to complete. Ensure runner is not running.
416
        MockLiveActionPublisherNonBlocking.wait_all()
417
418
        # Check liveaction result.
419
        self.assertIn('tasks', liveaction.result)
420
        self.assertEqual(len(liveaction.result['tasks']), 1)
421
422
        subworkflow = liveaction.result['tasks'][0]
423
        self.assertEqual(len(subworkflow['result']['tasks']), 1)
424
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_PAUSED)
425
426
        # Request action chain to resume.
427
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
428
429
        # Wait until the liveaction is completed.
430
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
431
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
432
433
        # Wait for non-blocking threads to complete.
434
        MockLiveActionPublisherNonBlocking.wait_all()
435
436
        # Check liveaction result.
437
        self.assertIn('tasks', liveaction.result)
438
        self.assertEqual(len(liveaction.result['tasks']), 2)
439
440
        subworkflow = liveaction.result['tasks'][0]
441
        self.assertEqual(len(subworkflow['result']['tasks']), 2)
442
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_SUCCEEDED)
443
444
    def test_chain_pause_resume_cascade_to_parent_workflow(self):
445
        # A temp file is created during test setup. Ensure the temp file exists.
446
        # The test action chain will stall until this file is deleted. This gives
447
        # the unit test a moment to run any test related logic.
448
        path = self.temp_file_path
449
        self.assertTrue(os.path.exists(path))
450
451
        action = TEST_PACK + '.' + 'test_pause_resume_with_subworkflow'
452
        params = {'tempfile': path, 'message': 'foobar'}
453
        liveaction = LiveActionDB(action=action, parameters=params)
454
        liveaction, execution = action_service.request(liveaction)
455
        liveaction = LiveAction.get_by_id(str(liveaction.id))
456
457
        # Wait until the liveaction is running.
458
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
459
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
460
461
        # Wait for subworkflow to register.
462
        execution = self._wait_for_children(execution)
463
        self.assertEqual(len(execution.children), 1)
464
465
        # Wait until the subworkflow is running.
466
        task1_exec = ActionExecution.get_by_id(execution.children[0])
467
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
468
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_RUNNING)
469
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_RUNNING)
470
471
        # Request subworkflow to pause.
472
        task1_live, task1_exec = action_service.request_pause(task1_live, USERNAME)
473
474
        # Wait until the subworkflow is pausing.
475
        task1_exec = ActionExecution.get_by_id(execution.children[0])
476
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
477
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_PAUSING)
478
        extra_info = str(task1_live)
479
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_PAUSING, extra_info)
480
481
        # Delete the temporary file that the action chain is waiting on.
482
        os.remove(path)
483
        self.assertFalse(os.path.exists(path))
484
485
        # Wait until the subworkflow is paused.
486
        task1_exec = ActionExecution.get_by_id(execution.children[0])
487
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
488
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_PAUSED)
489
        extra_info = str(task1_live)
490
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_PAUSED, extra_info)
491
492
        # Wait until the parent liveaction is paused.
493
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
494
        extra_info = str(liveaction)
495
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED, extra_info)
496
        self.assertEqual(len(execution.children), 1)
497
498
        # Wait for non-blocking threads to complete. Ensure runner is not running.
499
        MockLiveActionPublisherNonBlocking.wait_all()
500
501
        # Check liveaction result.
502
        self.assertIn('tasks', liveaction.result)
503
        self.assertEqual(len(liveaction.result['tasks']), 1)
504
505
        subworkflow = liveaction.result['tasks'][0]
506
        self.assertEqual(len(subworkflow['result']['tasks']), 1)
507
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_PAUSED)
508
509
        # Request subworkflow to resume.
510
        task1_live, task1_exec = action_service.request_resume(task1_live, USERNAME)
511
512
        # Wait until the subworkflow is paused.
513
        task1_exec = ActionExecution.get_by_id(execution.children[0])
514
        task1_live = LiveAction.get_by_id(task1_exec.liveaction['id'])
515
        task1_live = self._wait_for_status(task1_live, action_constants.LIVEACTION_STATUS_SUCCEEDED)
516
        self.assertEqual(task1_live.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
517
518
        # The parent workflow will stay paused.
519
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
520
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED)
521
522
        # Wait for non-blocking threads to complete.
523
        MockLiveActionPublisherNonBlocking.wait_all()
524
525
        # Check liveaction result of the parent, which should stay the same
526
        # because only the subworkflow was resumed.
527
        self.assertIn('tasks', liveaction.result)
528
        self.assertEqual(len(liveaction.result['tasks']), 1)
529
530
        subworkflow = liveaction.result['tasks'][0]
531
        self.assertEqual(len(subworkflow['result']['tasks']), 1)
532
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_PAUSED)
533
534
        # Request parent workflow to resume.
535
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
536
537
        # Wait until the liveaction is completed.
538
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
539
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
540
541
        # Wait for non-blocking threads to complete.
542
        MockLiveActionPublisherNonBlocking.wait_all()
543
544
        # Check liveaction result.
545
        self.assertIn('tasks', liveaction.result)
546
        self.assertEqual(len(liveaction.result['tasks']), 2)
547
548
        subworkflow = liveaction.result['tasks'][0]
549
        self.assertEqual(len(subworkflow['result']['tasks']), 2)
550
        self.assertEqual(subworkflow['state'], action_constants.LIVEACTION_STATUS_SUCCEEDED)
551
552
    def test_chain_pause_resume_with_context_access(self):
553
        # A temp file is created during test setup. Ensure the temp file exists.
554
        # The test action chain will stall until this file is deleted. This gives
555
        # the unit test a moment to run any test related logic.
556
        path = self.temp_file_path
557
        self.assertTrue(os.path.exists(path))
558
559
        action = TEST_PACK + '.' + 'test_pause_resume_with_context_access'
560
        params = {'tempfile': path, 'message': 'foobar'}
561
        liveaction = LiveActionDB(action=action, parameters=params)
562
        liveaction, execution = action_service.request(liveaction)
563
        liveaction = LiveAction.get_by_id(str(liveaction.id))
564
565
        # Wait until the liveaction is running.
566
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
567
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
568
569
        # Request action chain to pause.
570
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
571
572
        # Wait until the liveaction is pausing.
573
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
574
        extra_info = str(liveaction)
575
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING, extra_info)
576
577
        # Delete the temporary file that the action chain is waiting on.
578
        os.remove(path)
579
        self.assertFalse(os.path.exists(path))
580
581
        # Wait until the liveaction is paused.
582
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
583
        extra_info = str(liveaction)
584
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED, extra_info)
585
586
        # Wait for non-blocking threads to complete. Ensure runner is not running.
587
        MockLiveActionPublisherNonBlocking.wait_all()
588
589
        # Request action chain to resume.
590
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
591
592
        # Wait until the liveaction is completed.
593
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
594
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
595
596
        # Wait for non-blocking threads to complete.
597
        MockLiveActionPublisherNonBlocking.wait_all()
598
599
        # Check liveaction result.
600
        self.assertIn('tasks', liveaction.result)
601
        self.assertEqual(len(liveaction.result['tasks']), 3)
602
        self.assertEqual(liveaction.result['tasks'][2]['result']['stdout'], 'foobar')
603
604
    def test_chain_pause_resume_with_init_vars(self):
605
        # A temp file is created during test setup. Ensure the temp file exists.
606
        # The test action chain will stall until this file is deleted. This gives
607
        # the unit test a moment to run any test related logic.
608
        path = self.temp_file_path
609
        self.assertTrue(os.path.exists(path))
610
611
        action = TEST_PACK + '.' + 'test_pause_resume_with_init_vars'
612
        params = {'tempfile': path, 'message': 'foobar'}
613
        liveaction = LiveActionDB(action=action, parameters=params)
614
        liveaction, execution = action_service.request(liveaction)
615
        liveaction = LiveAction.get_by_id(str(liveaction.id))
616
617
        # Wait until the liveaction is running.
618
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
619
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
620
621
        # Request action chain to pause.
622
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
623
624
        # Wait until the liveaction is pausing.
625
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
626
        extra_info = str(liveaction)
627
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING, extra_info)
628
629
        # Delete the temporary file that the action chain is waiting on.
630
        os.remove(path)
631
        self.assertFalse(os.path.exists(path))
632
633
        # Wait until the liveaction is paused.
634
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
635
        extra_info = str(liveaction)
636
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED, extra_info)
637
638
        # Wait for non-blocking threads to complete. Ensure runner is not running.
639
        MockLiveActionPublisherNonBlocking.wait_all()
640
641
        # Request action chain to resume.
642
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
643
644
        # Wait until the liveaction is completed.
645
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
646
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
647
648
        # Wait for non-blocking threads to complete.
649
        MockLiveActionPublisherNonBlocking.wait_all()
650
651
        # Check liveaction result.
652
        self.assertIn('tasks', liveaction.result)
653
        self.assertEqual(len(liveaction.result['tasks']), 2)
654
        self.assertEqual(liveaction.result['tasks'][1]['result']['stdout'], 'FOOBAR')
655
656
    def test_chain_pause_resume_with_no_more_task(self):
657
        # A temp file is created during test setup. Ensure the temp file exists.
658
        # The test action chain will stall until this file is deleted. This gives
659
        # the unit test a moment to run any test related logic.
660
        path = self.temp_file_path
661
        self.assertTrue(os.path.exists(path))
662
663
        action = TEST_PACK + '.' + 'test_pause_resume_with_no_more_task'
664
        params = {'tempfile': path, 'message': 'foobar'}
665
        liveaction = LiveActionDB(action=action, parameters=params)
666
        liveaction, execution = action_service.request(liveaction)
667
        liveaction = LiveAction.get_by_id(str(liveaction.id))
668
669
        # Wait until the liveaction is running.
670
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
671
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
672
673
        # Request action chain to pause.
674
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
675
676
        # Wait until the liveaction is pausing.
677
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
678
        extra_info = str(liveaction)
679
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING, extra_info)
680
681
        # Delete the temporary file that the action chain is waiting on.
682
        os.remove(path)
683
        self.assertFalse(os.path.exists(path))
684
685
        # Wait until the liveaction is paused.
686
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
687
        extra_info = str(liveaction)
688
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED, extra_info)
689
690
        # Wait for non-blocking threads to complete. Ensure runner is not running.
691
        MockLiveActionPublisherNonBlocking.wait_all()
692
693
        # Request action chain to resume.
694
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
695
696
        # Wait until the liveaction is completed.
697
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
698
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
699
700
        # Wait for non-blocking threads to complete.
701
        MockLiveActionPublisherNonBlocking.wait_all()
702
703
        # Check liveaction result.
704
        self.assertIn('tasks', liveaction.result)
705
        self.assertEqual(len(liveaction.result['tasks']), 1)
706
707
    def test_chain_pause_resume_last_task_failed_with_no_next_task(self):
708
        # A temp file is created during test setup. Ensure the temp file exists.
709
        # The test action chain will stall until this file is deleted. This gives
710
        # the unit test a moment to run any test related logic.
711
        path = self.temp_file_path
712
        self.assertTrue(os.path.exists(path))
713
714
        action = TEST_PACK + '.' + 'test_pause_resume_last_task_failed_with_no_next_task'
715
        params = {'tempfile': path, 'message': 'foobar'}
716
        liveaction = LiveActionDB(action=action, parameters=params)
717
        liveaction, execution = action_service.request(liveaction)
718
        liveaction = LiveAction.get_by_id(str(liveaction.id))
719
720
        # Wait until the liveaction is running.
721
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_RUNNING)
722
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_RUNNING)
723
724
        # Request action chain to pause.
725
        liveaction, execution = action_service.request_pause(liveaction, USERNAME)
726
727
        # Wait until the liveaction is pausing.
728
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSING)
729
        extra_info = str(liveaction)
730
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSING, extra_info)
731
732
        # Delete the temporary file that the action chain is waiting on.
733
        os.remove(path)
734
        self.assertFalse(os.path.exists(path))
735
736
        # Wait until the liveaction is paused.
737
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
738
        extra_info = str(liveaction)
739
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED, extra_info)
740
741
        # Wait for non-blocking threads to complete. Ensure runner is not running.
742
        MockLiveActionPublisherNonBlocking.wait_all()
743
744
        # Request action chain to resume.
745
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
746
747
        # Wait until the liveaction is completed.
748
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_FAILED)
749
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_FAILED)
750
751
        # Wait for non-blocking threads to complete.
752
        MockLiveActionPublisherNonBlocking.wait_all()
753
754
        # Check liveaction result.
755
        self.assertIn('tasks', liveaction.result)
756
        self.assertEqual(len(liveaction.result['tasks']), 1)
757
758
        self.assertEqual(
759
            liveaction.result['tasks'][0]['state'],
760
            action_constants.LIVEACTION_STATUS_FAILED
761
        )
762
763
    def test_chain_pause_resume_status_change(self):
764
        # Tests context_result is updated when last task's status changes between pause and resume
765
766
        action = TEST_PACK + '.' + 'test_pause_resume_context_result'
767
        liveaction = LiveActionDB(action=action)
768
        liveaction, execution = action_service.request(liveaction)
769
        liveaction = LiveAction.get_by_id(str(liveaction.id))
770
771
        # Wait until the liveaction is paused.
772
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_PAUSED)
773
        extra_info = str(liveaction)
774
        self.assertEqual(liveaction.status, action_constants.LIVEACTION_STATUS_PAUSED, extra_info)
775
776
        # Wait for non-blocking threads to complete. Ensure runner is not running.
777
        MockLiveActionPublisherNonBlocking.wait_all()
778
779
        last_task_liveaction_id = liveaction.result['tasks'][-1]['liveaction_id']
780
781
        action_utils.update_liveaction_status(
782
            status=action_constants.LIVEACTION_STATUS_SUCCEEDED,
783
            end_timestamp=date_utils.get_datetime_utc_now(),
784
            result={'foo': 'bar'},
785
            liveaction_id=last_task_liveaction_id
786
        )
787
788
        # Request action chain to resume.
789
        liveaction, execution = action_service.request_resume(liveaction, USERNAME)
790
791
        # Wait until the liveaction is completed.
792
        liveaction = self._wait_for_status(liveaction, action_constants.LIVEACTION_STATUS_SUCCEEDED)
793
794
        self.assertEqual(
795
            liveaction.status,
796
            action_constants.LIVEACTION_STATUS_SUCCEEDED,
797
            str(liveaction)
798
        )
799
800
        # Wait for non-blocking threads to complete.
801
        MockLiveActionPublisherNonBlocking.wait_all()
802
803
        # Check liveaction result.
804
        self.assertIn('tasks', liveaction.result)
805
        self.assertEqual(len(liveaction.result['tasks']), 2)
806
        self.assertEqual(liveaction.result['tasks'][0]['result']['foo'], 'bar')
807