Passed
Pull Request — master (#3640)
by Lakshmi
06:19
created

WiringTest   D

Complexity

Total Complexity 60

Size/Duplication

Total Lines 576
Duplicated Lines 0 %

Importance

Changes 5
Bugs 0 Features 0
Metric Value
c 5
b 0
f 0
dl 0
loc 576
rs 4.2857
wmc 60

29 Methods

Rating   Name   Duplication   Size   Complexity  
A setUp() 0 6 1
A test_basic_workbook() 0 5 1
A test_complex_workbook_subflow_actions() 0 7 1
A test_with_items() 0 6 1
A tearDown() 0 6 4
A test_concurrent_load() 0 11 3
A test_execution_failure() 0 4 1
A test_basic_workflow() 0 5 1
A test_complex_workbook_with_jinja() 0 6 1
A test_complex_workbook_with_yaql() 0 6 1
A test_basic_rerun_task() 0 20 3
A test_basic_rerun() 0 20 3
A test_task_cancellation() 0 16 3
B test_rerun_subflow_and_resume_with_items_task() 0 33 3
B test_rerun_subflow_and_reset_with_items_task() 0 28 3
B test_basic_rerun_and_resume_with_items_task() 0 33 3
B test_basic_rerun_and_reset_with_items_task() 0 28 3
A test_rerun_subflow_task() 0 21 3
B test_cancellation_cascade_subworkflow_action() 0 29 3
A test_cancellation() 0 12 1
A test_resume_auto_pause() 0 13 1
B test_pause_resume_cascade_workbook_subworkflow() 0 40 3
A test_resume_auto_pause_cascade_subworkflow_action() 0 14 1
B test_pause_resume_cascade_subworkflow_action() 0 42 3
B test_pause_resume() 0 30 1
A test_resume_auto_pause_cascade_workbook_subworkflow() 0 14 1
A test_invoke_from_action_chain() 0 3 1
B test_pause_resume_cascade_to_subchain() 0 42 3
B test_pause_resume_cascade_subworkflow_from_chain() 0 44 3

How to fix   Complexity   

Complex Class

Complex classes like WiringTest often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import shutil
18
import tempfile
19
20
import eventlet
21
22
from integration.mistral import base
23
24
25
class WiringTest(base.TestWorkflowExecution):
26
27
    temp_dir_path = None
28
29
    def setUp(self):
30
        super(WiringTest, self).setUp()
31
32
        # Create temporary directory used by the tests
33
        _, self.temp_dir_path = tempfile.mkstemp()
34
        os.chmod(self.temp_dir_path, 0755)   # nosec
35
36
    def tearDown(self):
37
        if self.temp_dir_path and os.path.exists(self.temp_dir_path):
38
            if os.path.isdir(self.temp_dir_path):
39
                shutil.rmtree(self.temp_dir_path)
40
            else:
41
                os.remove(self.temp_dir_path)
42
43
    def test_basic_workflow(self):
44
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'date'})
45
        execution = self._wait_for_completion(execution)
46
        self._assert_success(execution, num_tasks=1)
47
        self.assertIn('stdout', execution.result)
48
49
    def test_basic_workbook(self):
50
        execution = self._execute_workflow('examples.mistral-workbook-basic', {'cmd': 'date'})
51
        execution = self._wait_for_completion(execution)
52
        self._assert_success(execution, num_tasks=1)
53
        self.assertIn('stdout', execution.result)
54
55
    def test_complex_workbook_with_yaql(self):
56
        execution = self._execute_workflow(
57
            'examples.mistral-workbook-complex', {'vm_name': 'demo1'})
58
        execution = self._wait_for_completion(execution)
59
        self._assert_success(execution, num_tasks=8)
60
        self.assertIn('vm_id', execution.result)
61
62
    def test_complex_workbook_with_jinja(self):
63
        execution = self._execute_workflow(
64
            'examples.mistral-jinja-workbook-complex', {'vm_name': 'demo2'})
65
        execution = self._wait_for_completion(execution)
66
        self._assert_success(execution, num_tasks=8)
67
        self.assertIn('vm_id', execution.result)
68
69
    def test_complex_workbook_subflow_actions(self):
70
        execution = self._execute_workflow(
71
            'examples.mistral-workbook-subflows', {'subject': 'st2', 'adjective': 'cool'})
72
        execution = self._wait_for_completion(execution)
73
        self._assert_success(execution, num_tasks=2)
74
        self.assertIn('tagline', execution.result)
75
        self.assertEqual(execution.result['tagline'], 'st2 is cool!')
76
77
    def test_with_items(self):
78
        params = {'cmd': 'date', 'count': 8}
79
        execution = self._execute_workflow('examples.mistral-repeat', params)
80
        execution = self._wait_for_completion(execution)
81
        self._assert_success(execution, num_tasks=1)
82
        self.assertEqual(len(execution.result['result']), params['count'])
83
84
    def test_concurrent_load(self):
85
        wf_name = 'examples.mistral-workbook-complex'
86
        wf_params = {'vm_name': 'demo1'}
87
        executions = [self._execute_workflow(wf_name, wf_params) for i in range(3)]
88
89
        eventlet.sleep(30)
90
91
        for execution in executions:
92
            e = self._wait_for_completion(execution)
93
            self._assert_success(e, num_tasks=8)
94
            self.assertIn('vm_id', e.result)
95
96
    def test_execution_failure(self):
97
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'foo'})
98
        execution = self._wait_for_completion(execution)
99
        self._assert_failure(execution)
100
101
    def test_cancellation(self):
102
        execution = self._execute_workflow('examples.mistral-test-cancel', {'sleep': 10})
103
        execution = self._wait_for_state(execution, ['running'])
104
        self.st2client.liveactions.delete(execution)
105
106
        execution = self._wait_for_completion(
107
            execution,
108
            expect_tasks=False,
109
            expect_tasks_completed=False
110
        )
111
112
        self._assert_canceled(execution, are_tasks_completed=False)
113
114
    def test_cancellation_cascade_subworkflow_action(self):
115
        execution = self._execute_workflow(
116
            'examples.mistral-test-cancel-subworkflow-action',
117
            {'sleep': 30}
118
        )
119
120
        execution = self._wait_for_state(execution, ['running'])
121
        self.st2client.liveactions.delete(execution)
122
123
        execution = self._wait_for_completion(
124
            execution,
125
            expect_tasks=False,
126
            expect_tasks_completed=False
127
        )
128
129
        self.assertEqual(execution.status, 'canceled')
130
131
        task_executions = [e for e in self.st2client.liveactions.get_all()
132
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
133
134
        subworkflow_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)
135
136
        subworkflow_execution = self._wait_for_completion(
137
            subworkflow_execution,
138
            expect_tasks=False,
139
            expect_tasks_completed=False
140
        )
141
142
        self.assertEqual(execution.status, 'canceled')
143
144
    def test_task_cancellation(self):
145
        execution = self._execute_workflow('examples.mistral-test-cancel', {'sleep': 30})
146
        execution = self._wait_for_state(execution, ['running'])
147
148
        task_executions = [e for e in self.st2client.liveactions.get_all()
149
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
150
151
        self.assertGreater(len(task_executions), 0)
152
153
        self.st2client.liveactions.delete(task_executions[0])
154
        execution = self._wait_for_completion(execution, expect_tasks_completed=True)
155
        self._assert_canceled(execution, are_tasks_completed=True)
156
157
        task_results = execution.result.get('tasks', [])
158
        self.assertGreater(len(task_results), 0)
159
        self.assertEqual(task_results[0]['state'], 'CANCELLED')
160
161
    def test_basic_rerun(self):
162
        path = self.temp_dir_path
163
164
        with open(path, 'w') as f:
165
            f.write('1')
166
167
        execution = self._execute_workflow('examples.mistral-test-rerun', {'tempfile': path})
168
        execution = self._wait_for_completion(execution)
169
        self._assert_failure(execution)
170
        orig_st2_ex_id = execution.id
171
        orig_wf_ex_id = execution.context['mistral']['execution_id']
172
173
        with open(path, 'w') as f:
174
            f.write('0')
175
176
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id)
177
        self.assertNotEqual(execution.id, orig_st2_ex_id)
178
        execution = self._wait_for_completion(execution)
179
        self._assert_success(execution, num_tasks=1)
180
        self.assertNotEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
181
182
    def test_basic_rerun_task(self):
183
        path = self.temp_dir_path
184
185
        with open(path, 'w') as f:
186
            f.write('1')
187
188
        execution = self._execute_workflow('examples.mistral-test-rerun', {'tempfile': path})
189
        execution = self._wait_for_completion(execution)
190
        self._assert_failure(execution)
191
        orig_st2_ex_id = execution.id
192
        orig_wf_ex_id = execution.context['mistral']['execution_id']
193
194
        with open(path, 'w') as f:
195
            f.write('0')
196
197
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1'])
198
        self.assertNotEqual(execution.id, orig_st2_ex_id)
199
        execution = self._wait_for_completion(execution)
200
        self._assert_success(execution, num_tasks=1)
201
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
202
203
    def test_rerun_subflow_task(self):
204
        path = self.temp_dir_path
205
206
        with open(path, 'w') as f:
207
            f.write('1')
208
209
        workflow_name = 'examples.mistral-test-rerun-subflow'
210
        execution = self._execute_workflow(workflow_name, {'tempfile': path})
211
        execution = self._wait_for_completion(execution)
212
        self._assert_failure(execution)
213
        orig_st2_ex_id = execution.id
214
        orig_wf_ex_id = execution.context['mistral']['execution_id']
215
216
        with open(path, 'w') as f:
217
            f.write('0')
218
219
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1.task1'])
220
        self.assertNotEqual(execution.id, orig_st2_ex_id)
221
        execution = self._wait_for_completion(execution)
222
        self._assert_success(execution, num_tasks=1)
223
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
224
225
    def test_basic_rerun_and_reset_with_items_task(self):
226
        path = self.temp_dir_path
227
228
        with open(path, 'w') as f:
229
            f.write('1')
230
231
        execution = self._execute_workflow(
232
            'examples.mistral-test-rerun-with-items',
233
            {'tempfile': path}
234
        )
235
236
        execution = self._wait_for_completion(execution)
237
        self._assert_failure(execution)
238
        orig_st2_ex_id = execution.id
239
        orig_wf_ex_id = execution.context['mistral']['execution_id']
240
241
        with open(path, 'w') as f:
242
            f.write('0')
243
244
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1'])
245
        self.assertNotEqual(execution.id, orig_st2_ex_id)
246
247
        execution = self._wait_for_completion(execution)
248
        self._assert_success(execution, num_tasks=1)
249
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
250
251
        children = self.st2client.liveactions.get_property(execution.id, 'children')
252
        self.assertEqual(len(children), 4)
253
254
    def test_basic_rerun_and_resume_with_items_task(self):
255
        path = self.temp_dir_path
256
257
        with open(path, 'w') as f:
258
            f.write('1')
259
260
        execution = self._execute_workflow(
261
            'examples.mistral-test-rerun-with-items',
262
            {'tempfile': path}
263
        )
264
265
        execution = self._wait_for_completion(execution)
266
        self._assert_failure(execution)
267
        orig_st2_ex_id = execution.id
268
        orig_wf_ex_id = execution.context['mistral']['execution_id']
269
270
        with open(path, 'w') as f:
271
            f.write('0')
272
273
        execution = self.st2client.liveactions.re_run(
274
            orig_st2_ex_id,
275
            tasks=['task1'],
276
            no_reset=['task1']
277
        )
278
279
        self.assertNotEqual(execution.id, orig_st2_ex_id)
280
281
        execution = self._wait_for_completion(execution)
282
        self._assert_success(execution, num_tasks=1)
283
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
284
285
        children = self.st2client.liveactions.get_property(execution.id, 'children')
286
        self.assertEqual(len(children), 2)
287
288
    def test_rerun_subflow_and_reset_with_items_task(self):
289
        path = self.temp_dir_path
290
291
        with open(path, 'w') as f:
292
            f.write('1')
293
294
        execution = self._execute_workflow(
295
            'examples.mistral-test-rerun-subflow-with-items',
296
            {'tempfile': path}
297
        )
298
299
        execution = self._wait_for_completion(execution)
300
        self._assert_failure(execution)
301
        orig_st2_ex_id = execution.id
302
        orig_wf_ex_id = execution.context['mistral']['execution_id']
303
304
        with open(path, 'w') as f:
305
            f.write('0')
306
307
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1.task1'])
308
        self.assertNotEqual(execution.id, orig_st2_ex_id)
309
310
        execution = self._wait_for_completion(execution)
311
        self._assert_success(execution, num_tasks=1)
312
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
313
314
        children = self.st2client.liveactions.get_property(execution.id, 'children')
315
        self.assertEqual(len(children), 4)
316
317
    def test_rerun_subflow_and_resume_with_items_task(self):
318
        path = self.temp_dir_path
319
320
        with open(path, 'w') as f:
321
            f.write('1')
322
323
        execution = self._execute_workflow(
324
            'examples.mistral-test-rerun-subflow-with-items',
325
            {'tempfile': path}
326
        )
327
328
        execution = self._wait_for_completion(execution)
329
        self._assert_failure(execution)
330
        orig_st2_ex_id = execution.id
331
        orig_wf_ex_id = execution.context['mistral']['execution_id']
332
333
        with open(path, 'w') as f:
334
            f.write('0')
335
336
        execution = self.st2client.liveactions.re_run(
337
            orig_st2_ex_id,
338
            tasks=['task1.task1'],
339
            no_reset=['task1.task1']
340
        )
341
342
        self.assertNotEqual(execution.id, orig_st2_ex_id)
343
344
        execution = self._wait_for_completion(execution)
345
        self._assert_success(execution, num_tasks=1)
346
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
347
348
        children = self.st2client.liveactions.get_property(execution.id, 'children')
349
        self.assertEqual(len(children), 2)
350
351
    def test_invoke_from_action_chain(self):
352
        execution = self._execute_workflow('examples.invoke-mistral-with-jinja', {'cmd': 'date'})
353
        execution = self._wait_for_state(execution, ['succeeded'])
354
355
    def test_pause_resume(self):
356
        # A temp file is created during test setup. Ensure the temp file exists.
357
        path = self.temp_dir_path
358
        self.assertTrue(os.path.exists(path))
359
360
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
361
        params = {'tempfile': path, 'message': 'foobar'}
362
        execution = self._execute_workflow('examples.mistral-test-pause-resume', params)
363
        execution = self._wait_for_task(execution, 'task1', 'RUNNING')
364
365
        # Pause the workflow before the temp file is created. The workflow will be paused
366
        # but task1 will still be running to allow for graceful exit.
367
        execution = self.st2client.liveactions.pause(execution.id)
368
369
        # Expecting the execution to be pausing, waiting for task1 to be completed.
370
        execution = self._wait_for_state(execution, ['pausing'])
371
372
        # Delete the temporary file.
373
        os.remove(path)
374
        self.assertFalse(os.path.exists(path))
375
376
        # Wait for the execution to be paused.
377
        execution = self._wait_for_state(execution, ['paused'])
378
379
        # Resume the execution.
380
        execution = self.st2client.liveactions.resume(execution.id)
381
382
        # Wait for completion.
383
        execution = self._wait_for_completion(execution)
384
        self._assert_success(execution, num_tasks=2)
385
386
    def test_resume_auto_pause(self):
387
        # Launch the workflow. The workflow will pause automatically after the first task.
388
        params = {'message': 'foobar'}
389
        execution = self._execute_workflow('examples.mistral-test-pause-before-task', params)
390
        execution = self._wait_for_task(execution, 'task1', 'SUCCESS')
391
        execution = self._wait_for_state(execution, ['paused'])
392
393
        # Resume the execution.
394
        execution = self.st2client.liveactions.resume(execution.id)
395
396
        # Wait for completion.
397
        execution = self._wait_for_completion(execution)
398
        self._assert_success(execution, num_tasks=2)
399
400
    def test_resume_auto_pause_cascade_subworkflow_action(self):
401
        # Launch the workflow. The workflow will pause automatically after the first task.
402
        workflow = 'examples.mistral-test-pause-before-task-subworkflow-action'
403
        params = {'message': 'foobar'}
404
        execution = self._execute_workflow(workflow, params)
405
        execution = self._wait_for_task(execution, 'task1', 'PAUSED')
406
        execution = self._wait_for_state(execution, ['paused'])
407
408
        # Resume the execution.
409
        execution = self.st2client.liveactions.resume(execution.id)
410
411
        # Wait for completion.
412
        execution = self._wait_for_completion(execution)
413
        self._assert_success(execution, num_tasks=2)
414
415
    def test_resume_auto_pause_cascade_workbook_subworkflow(self):
416
        # Launch the workflow. The workflow will pause automatically after the first task.
417
        workflow = 'examples.mistral-test-pause-before-task-subworkflow-workbook'
418
        params = {'message': 'foobar'}
419
        execution = self._execute_workflow(workflow, params)
420
        execution = self._wait_for_task(execution, 'task1', 'PAUSED')
421
        execution = self._wait_for_state(execution, ['paused'])
422
423
        # Resume the execution.
424
        execution = self.st2client.liveactions.resume(execution.id)
425
426
        # Wait for completion.
427
        execution = self._wait_for_completion(execution)
428
        self._assert_success(execution, num_tasks=2)
429
430
    def test_pause_resume_cascade_subworkflow_action(self):
431
        # A temp file is created during test setup. Ensure the temp file exists.
432
        path = self.temp_dir_path
433
        self.assertTrue(os.path.exists(path))
434
435
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
436
        params = {'tempfile': path, 'message': 'foobar'}
437
        action_ref = 'examples.mistral-test-pause-resume-subworkflow-action'
438
        execution = self._execute_workflow(action_ref, params)
439
        execution = self._wait_for_task(execution, 'task1', 'RUNNING')
440
441
        # Pause the workflow before the temp file is created. The workflow will be paused
442
        # but task1 will still be running to allow for graceful exit.
443
        execution = self.st2client.liveactions.pause(execution.id)
444
445
        # Expecting the execution to be pausing, waiting for task1 to be completed.
446
        execution = self._wait_for_state(execution, ['pausing'])
447
448
        # Get the subworkflow execution.
449
        task_executions = [e for e in self.st2client.liveactions.get_all()
450
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
451
452
        subworkflow_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)
453
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['pausing'])
454
455
        # Delete the temporary file.
456
        os.remove(path)
457
        self.assertFalse(os.path.exists(path))
458
459
        # Wait for the executions to be paused.
460
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['paused'])
461
        execution = self._wait_for_state(execution, ['paused'])
462
463
        # Resume the parent execution.
464
        execution = self.st2client.liveactions.resume(execution.id)
465
466
        # Wait for completion.
467
        subworkflow_execution = self._wait_for_completion(subworkflow_execution)
468
        self._assert_success(subworkflow_execution, num_tasks=2)
469
470
        execution = self._wait_for_completion(execution)
471
        self._assert_success(execution, num_tasks=2)
472
473
    def test_pause_resume_cascade_workbook_subworkflow(self):
474
        # A temp file is created during test setup. Ensure the temp file exists.
475
        path = self.temp_dir_path
476
        self.assertTrue(os.path.exists(path))
477
478
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
479
        params = {'tempfile': path, 'message': 'foobar'}
480
        action_ref = 'examples.mistral-test-pause-resume-subworkflow-workbook'
481
        execution = self._execute_workflow(action_ref, params)
482
        execution = self._wait_for_task(execution, 'task1', 'RUNNING')
483
484
        # Pause the main workflow before the temp file is created.
485
        # The subworkflow will also pause.
486
        execution = self.st2client.liveactions.pause(execution.id)
487
488
        # Expecting the execution to be pausing, waiting for task1 to be completed.
489
        execution = self._wait_for_state(execution, ['pausing'])
490
        execution = self._wait_for_task(execution, 'task1', 'PAUSED')
491
492
        # Get the task execution (since subworkflow is in a workbook, st2 has no visibility).
493
        task_executions = [e for e in self.st2client.liveactions.get_all()
494
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
495
496
        task1_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)
497
        task1_execution = self._wait_for_state(task1_execution, ['running'])
498
499
        # Delete the temporary file.
500
        os.remove(path)
501
        self.assertFalse(os.path.exists(path))
502
503
        # Wait for the main workflow to be paused.
504
        task1_execution = self._wait_for_state(task1_execution, ['succeeded'])
505
        execution = self._wait_for_state(execution, ['paused'])
506
507
        # Resume the parent execution.
508
        execution = self.st2client.liveactions.resume(execution.id)
509
510
        # Wait for completion.
511
        execution = self._wait_for_completion(execution)
512
        self._assert_success(execution, num_tasks=2)
513
514
    def test_pause_resume_cascade_to_subchain(self):
515
        # A temp file is created during test setup. Ensure the temp file exists.
516
        path = self.temp_dir_path
517
        self.assertTrue(os.path.exists(path))
518
519
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
520
        params = {'tempfile': path, 'message': 'foobar'}
521
        action_ref = 'examples.mistral-test-pause-resume-subworkflow-chain'
522
        execution = self._execute_workflow(action_ref, params)
523
        execution = self._wait_for_task(execution, 'task1', 'RUNNING')
524
525
        # Pause the workflow before the temp file is created. The workflow will be paused
526
        # but task1 will still be running to allow for graceful exit.
527
        execution = self.st2client.liveactions.pause(execution.id)
528
529
        # Expecting the execution to be pausing, waiting for task1 to be completed.
530
        execution = self._wait_for_state(execution, ['pausing'])
531
532
        # Get the subworkflow execution.
533
        task_executions = [e for e in self.st2client.liveactions.get_all()
534
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
535
536
        subworkflow_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)
537
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['pausing'])
538
539
        # Delete the temporary file.
540
        os.remove(path)
541
        self.assertFalse(os.path.exists(path))
542
543
        # Wait for the executions to be paused.
544
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['paused'])
545
        execution = self._wait_for_state(execution, ['paused'])
546
547
        # Resume the parent execution.
548
        execution = self.st2client.liveactions.resume(execution.id)
549
550
        # Wait for completion.
551
        subworkflow_execution = self._wait_for_completion(subworkflow_execution)
552
        self._assert_success(subworkflow_execution, num_tasks=2)
553
554
        execution = self._wait_for_completion(execution)
555
        self._assert_success(execution, num_tasks=2)
556
557
    def test_pause_resume_cascade_subworkflow_from_chain(self):
558
        # A temp file is created during test setup. Ensure the temp file exists.
559
        path = self.temp_dir_path
560
        self.assertTrue(os.path.exists(path))
561
562
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
563
        params = {'tempfile': path, 'message': 'foobar'}
564
        action_ref = 'examples.chain-test-pause-resume-with-subworkflow'
565
        execution = self._execute_workflow(action_ref, params)
566
567
        # Expecting the execution to be running.
568
        execution = self._wait_for_state(execution, ['running'])
569
570
        # Pause the workflow before the temp file is created. The workflow will be paused
571
        # but task1 will still be running to allow for graceful exit.
572
        execution = self.st2client.liveactions.pause(execution.id)
573
574
        # Expecting the execution to be pausing, waiting for task1 to be completed.
575
        execution = self._wait_for_state(execution, ['pausing'])
576
577
        # Get the subworkflow execution.
578
        task_executions = [e for e in self.st2client.liveactions.get_all()
579
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
580
581
        subworkflow_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)
582
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['pausing'])
583
584
        # Delete the temporary file.
585
        os.remove(path)
586
        self.assertFalse(os.path.exists(path))
587
588
        # Wait for the executions to be paused.
589
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['paused'])
590
        execution = self._wait_for_state(execution, ['paused'])
591
592
        # Resume the parent execution.
593
        execution = self.st2client.liveactions.resume(execution.id)
594
595
        # Wait for completion.
596
        subworkflow_execution = self._wait_for_completion(subworkflow_execution)
597
        self._assert_success(subworkflow_execution, num_tasks=2)
598
599
        execution = self._wait_for_completion(execution)
600
        self._assert_success(execution, num_tasks=2)
601