Passed
Pull Request — master (#3753)
by W
09:44
created

PauseResumeWiringTest   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 264
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 264
rs 10
wmc 21

10 Methods

Rating   Name   Duplication   Size   Complexity  
B test_pause_resume_cascade_subworkflow_from_chain() 0 44 3
A test_resume_auto_pause_cascade_subworkflow_action() 0 14 1
B test_pause_resume() 0 30 1
A test_resume_auto_pause_cascade_workbook_subworkflow() 0 14 1
A setUp() 0 6 1
B test_pause_resume_cascade_subworkflow_action() 0 42 3
B test_pause_resume_cascade_workbook_subworkflow() 0 40 3
A tearDown() 0 6 4
A test_resume_auto_pause() 0 13 1
B test_pause_resume_cascade_to_subchain() 0 42 3
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import shutil
18
import tempfile
19
20
from integration.mistral import base
21
22
23
class PauseResumeWiringTest(base.TestWorkflowExecution):
    """Integration tests for pausing and resuming workflow executions.

    Each test launches an example action through the helpers inherited from
    ``base.TestWorkflowExecution``, pauses and/or resumes it through
    ``self.st2client.liveactions`` and then waits for the expected states.
    Several tests hand the launched action the path of a scratch file
    (``tempfile`` parameter) and delete that file part-way through the test.
    """

    # Path of the scratch file created in setUp(). Despite the attribute name
    # this is a regular file (tempfile.mkstemp), not a directory; the name is
    # kept unchanged for backward compatibility.
    temp_dir_path = None

    def setUp(self):
        super(PauseResumeWiringTest, self).setUp()

        # Create the temporary file used by the tests. mkstemp() returns an
        # already-open OS-level descriptor; close it right away so that it is
        # not leaked, since only the path is needed here.
        fd, self.temp_dir_path = tempfile.mkstemp()
        os.close(fd)

        # 0o755 is the octal form accepted by both Python 2.6+ and Python 3.
        os.chmod(self.temp_dir_path, 0o755)   # nosec

    def tearDown(self):
        # Remove the scratch path if a test left it behind. Both the file and
        # the directory case are handled.
        if self.temp_dir_path and os.path.exists(self.temp_dir_path):
            if os.path.isdir(self.temp_dir_path):
                shutil.rmtree(self.temp_dir_path)
            else:
                os.remove(self.temp_dir_path)

    def _get_first_child_execution(self, execution):
        """Return the first live action whose parent is the given execution.

        Fails the test with an explicit message, instead of raising an
        IndexError, when no child execution is found.
        """
        children = [e for e in self.st2client.liveactions.get_all()
                    if e.context.get('parent', {}).get('execution_id') == execution.id]

        self.assertTrue(children, 'No child execution found for %s.' % execution.id)

        return self.st2client.liveactions.get_by_id(children[0].id)

    def _delete_temp_file(self, path):
        """Delete the scratch file and assert that it is gone."""
        os.remove(path)
        self.assertFalse(os.path.exists(path))

    def _check_resume_after_auto_pause(self, workflow, task1_state):
        """Launch a workflow that pauses on its own, then resume it.

        :param workflow: Reference of the action to launch.
        :param task1_state: State that task1 is expected to reach before the
                            execution is reported as paused.
        """
        # Launch the workflow. The workflow will pause automatically after the first task.
        params = {'message': 'foobar'}
        execution = self._execute_workflow(workflow, params)
        execution = self._wait_for_task(execution, 'task1', task1_state)
        execution = self._wait_for_state(execution, ['paused'])

        # Resume the execution.
        execution = self.st2client.liveactions.resume(execution.id)

        # Wait for completion.
        execution = self._wait_for_completion(execution)
        self._assert_success(execution, num_tasks=2)

    def _check_cascading_pause_resume(self, action_ref, wait_for_task1=True):
        """Pause and resume a parent execution and check that its child follows.

        :param action_ref: Reference of the action to launch.
        :param wait_for_task1: When True, wait for task1 to be RUNNING before
                               pausing; otherwise wait for the execution
                               itself to be running.
        """
        # A temp file is created during test setup. Ensure the temp file exists.
        path = self.temp_dir_path
        self.assertTrue(os.path.exists(path))

        # Launch the workflow. The workflow will wait for the temp file to be deleted.
        params = {'tempfile': path, 'message': 'foobar'}
        execution = self._execute_workflow(action_ref, params)

        if wait_for_task1:
            execution = self._wait_for_task(execution, 'task1', 'RUNNING')
        else:
            # Expecting the execution to be running.
            execution = self._wait_for_state(execution, ['running'])

        # Pause the workflow before the temp file is deleted. The workflow will be paused
        # but task1 will still be running to allow for graceful exit.
        execution = self.st2client.liveactions.pause(execution.id)

        # Expecting the execution to be pausing, waiting for task1 to be completed.
        execution = self._wait_for_state(execution, ['pausing'])

        # Get the subworkflow execution.
        subworkflow_execution = self._get_first_child_execution(execution)
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['pausing'])

        # Delete the temporary file.
        self._delete_temp_file(path)

        # Wait for the executions to be paused.
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['paused'])
        execution = self._wait_for_state(execution, ['paused'])

        # Resume the parent execution.
        execution = self.st2client.liveactions.resume(execution.id)

        # Wait for completion.
        subworkflow_execution = self._wait_for_completion(subworkflow_execution)
        self._assert_success(subworkflow_execution, num_tasks=2)

        execution = self._wait_for_completion(execution)
        self._assert_success(execution, num_tasks=2)

    def test_pause_resume(self):
        # A temp file is created during test setup. Ensure the temp file exists.
        path = self.temp_dir_path
        self.assertTrue(os.path.exists(path))

        # Launch the workflow. The workflow will wait for the temp file to be deleted.
        params = {'tempfile': path, 'message': 'foobar'}
        execution = self._execute_workflow('examples.mistral-test-pause-resume', params)
        execution = self._wait_for_task(execution, 'task1', 'RUNNING')

        # Pause the workflow before the temp file is deleted. The workflow will be paused
        # but task1 will still be running to allow for graceful exit.
        execution = self.st2client.liveactions.pause(execution.id)

        # Expecting the execution to be pausing, waiting for task1 to be completed.
        execution = self._wait_for_state(execution, ['pausing'])

        # Delete the temporary file.
        self._delete_temp_file(path)

        # Wait for the execution to be paused.
        execution = self._wait_for_state(execution, ['paused'])

        # Resume the execution.
        execution = self.st2client.liveactions.resume(execution.id)

        # Wait for completion.
        execution = self._wait_for_completion(execution)
        self._assert_success(execution, num_tasks=2)

    def test_resume_auto_pause(self):
        self._check_resume_after_auto_pause(
            'examples.mistral-test-pause-before-task', 'SUCCESS')

    def test_resume_auto_pause_cascade_subworkflow_action(self):
        self._check_resume_after_auto_pause(
            'examples.mistral-test-pause-before-task-subworkflow-action', 'PAUSED')

    def test_resume_auto_pause_cascade_workbook_subworkflow(self):
        self._check_resume_after_auto_pause(
            'examples.mistral-test-pause-before-task-subworkflow-workbook', 'PAUSED')

    def test_pause_resume_cascade_subworkflow_action(self):
        self._check_cascading_pause_resume(
            'examples.mistral-test-pause-resume-subworkflow-action')

    def test_pause_resume_cascade_workbook_subworkflow(self):
        # A temp file is created during test setup. Ensure the temp file exists.
        path = self.temp_dir_path
        self.assertTrue(os.path.exists(path))

        # Launch the workflow. The workflow will wait for the temp file to be deleted.
        params = {'tempfile': path, 'message': 'foobar'}
        action_ref = 'examples.mistral-test-pause-resume-subworkflow-workbook'
        execution = self._execute_workflow(action_ref, params)
        execution = self._wait_for_task(execution, 'task1', 'RUNNING')

        # Pause the main workflow before the temp file is deleted.
        # The subworkflow will also pause.
        execution = self.st2client.liveactions.pause(execution.id)

        # Expecting the execution to be pausing, waiting for task1 to be completed.
        execution = self._wait_for_state(execution, ['pausing'])
        execution = self._wait_for_task(execution, 'task1', 'PAUSED')

        # Get the task execution (since subworkflow is in a workbook, st2 has no visibility).
        task1_execution = self._get_first_child_execution(execution)
        task1_execution = self._wait_for_state(task1_execution, ['running'])

        # Delete the temporary file.
        self._delete_temp_file(path)

        # Wait for task1 to succeed and for the main workflow to be paused.
        task1_execution = self._wait_for_state(task1_execution, ['succeeded'])
        execution = self._wait_for_state(execution, ['paused'])

        # Resume the parent execution.
        execution = self.st2client.liveactions.resume(execution.id)

        # Wait for completion.
        execution = self._wait_for_completion(execution)
        self._assert_success(execution, num_tasks=2)

    def test_pause_resume_cascade_to_subchain(self):
        self._check_cascading_pause_resume(
            'examples.mistral-test-pause-resume-subworkflow-chain')

    def test_pause_resume_cascade_subworkflow_from_chain(self):
        # The parent here is an action chain, so wait on the state of the
        # execution itself rather than on a task named task1.
        self._check_cascading_pause_resume(
            'examples.chain-test-pause-resume-with-subworkflow',
            wait_for_task1=False)
287