Test Failed
Pull Request — master (#4197)
by W
03:53
created

test_pause_resume_cascade_to_subchain()   A

Complexity

Conditions 1

Size

Total Lines 35

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 35
rs 9.0399
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import os
18
import shutil
19
import tempfile
20
21
from integration.mistral import base
22
23
from st2common.constants import action as action_constants
24
25
26
class PauseResumeWiringTest(base.TestWorkflowExecution):
27
28
    temp_dir_path = None
29
30
    def setUp(self):
31
        super(PauseResumeWiringTest, self).setUp()
32
33
        # Create temporary directory used by the tests
34
        _, self.temp_dir_path = tempfile.mkstemp()
35
        os.chmod(self.temp_dir_path, 0o755)   # nosec
36
37
    def tearDown(self):
38
        if self.temp_dir_path and os.path.exists(self.temp_dir_path):
39
            if os.path.isdir(self.temp_dir_path):
40
                shutil.rmtree(self.temp_dir_path)
41
            else:
42
                os.remove(self.temp_dir_path)
43
44
    def test_pause_resume(self):
45
        # A temp file is created during test setup. Ensure the temp file exists.
46
        path = self.temp_dir_path
47
        self.assertTrue(os.path.exists(path))
48
49
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
50
        params = {'tempfile': path, 'message': 'foobar'}
51
        ex = self._execute_workflow('examples.mistral-test-pause-resume', params)
52
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_RUNNING)
53
        self._wait_for_task(ex, 'task1', action_constants.LIVEACTION_STATUS_RUNNING)
54
55
        # Pause the workflow before the temp file is created. The workflow will be paused
56
        # but task1 will still be running to allow for graceful exit.
57
        ex = self.st2client.liveactions.pause(ex.id)
58
59
        # Expecting the ex to be pausing, waiting for task1 to be completed.
60
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_PAUSING)
61
62
        # Delete the temporary file.
63
        os.remove(path)
64
        self.assertFalse(os.path.exists(path))
65
66
        # Wait for the ex to be paused.
67
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_PAUSED)
68
69
        # Resume the ex.
70
        ex = self.st2client.liveactions.resume(ex.id)
71
72
        # Wait for completion.
73
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_SUCCEEDED)
74
        self.assertEqual(len(ex.result.get('tasks', [])), 2)
75
76
    def test_resume_auto_pause(self):
77
        # Launch the workflow. The workflow will pause automatically after the first task.
78
        params = {'message': 'foobar'}
79
        ex = self._execute_workflow('examples.mistral-test-pause-before-task', params)
80
        self._wait_for_task(ex, 'task1', action_constants.LIVEACTION_STATUS_SUCCEEDED)
81
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_PAUSED)
82
83
        # Resume the ex.
84
        ex = self.st2client.liveactions.resume(ex.id)
85
86
        # Wait for completion.
87
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_SUCCEEDED)
88
        self.assertEqual(len(ex.result.get('tasks', [])), 2)
89
90
    def test_resume_auto_pause_cascade_subworkflow_action(self):
91
        # Launch the workflow. The workflow will pause automatically after the first task.
92
        workflow = 'examples.mistral-test-pause-before-task-subworkflow-action'
93
        params = {'message': 'foobar'}
94
        ex = self._execute_workflow(workflow, params)
95
        self._wait_for_task(ex, 'task1', action_constants.LIVEACTION_STATUS_PAUSED)
96
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_PAUSED)
97
98
        # Resume the ex.
99
        ex = self.st2client.liveactions.resume(ex.id)
100
101
        # Wait for completion.
102
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_SUCCEEDED)
103
        self.assertEqual(len(ex.result.get('tasks', [])), 2)
104
105
    def test_resume_auto_pause_cascade_workbook_subworkflow(self):
106
        # Launch the workflow. The workflow will pause automatically after the first task.
107
        workflow = 'examples.mistral-test-pause-before-task-subworkflow-workbook'
108
        params = {'message': 'foobar'}
109
        ex = self._execute_workflow(workflow, params)
110
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_PAUSED)
111
112
        # Ensure only task1 is executed and task2 is not present.
113
        self._wait_for_task(ex, 'task1', action_constants.LIVEACTION_STATUS_SUCCEEDED)
114
        task_exs = self._get_children(ex)
115
        self.assertEqual(len(task_exs), 1)
116
117
        # Resume the ex.
118
        ex = self.st2client.liveactions.resume(ex.id)
119
120
        # Wait for completion.
121
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_SUCCEEDED)
122
        self.assertEqual(len(ex.result.get('tasks', [])), 2)
123
        task_exs = self._get_children(ex)
124
        self.assertEqual(len(task_exs), 3)
125
126
    def test_pause_resume_cascade_subworkflow_action(self):
127
        # A temp file is created during test setup. Ensure the temp file exists.
128
        path = self.temp_dir_path
129
        self.assertTrue(os.path.exists(path))
130
131
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
132
        params = {'tempfile': path, 'message': 'foobar'}
133
        action_ref = 'examples.mistral-test-pause-resume-subworkflow-action'
134
        ex = self._execute_workflow(action_ref, params)
135
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_RUNNING)
136
        task_exs = self._wait_for_task(ex, 'task1', action_constants.LIVEACTION_STATUS_RUNNING)
137
138
        # Pause the workflow before the temp file is created. The workflow will be paused
139
        # but task1 will still be running to allow for graceful exit.
140
        ex = self.st2client.liveactions.pause(ex.id)
141
142
        # Expecting the ex to be pausing, waiting for task1 to be completed.
143
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_PAUSING)
144
        subwf_ex = self._wait_for_state(task_exs[0], action_constants.LIVEACTION_STATUS_PAUSING)
145
146
        # Delete the temporary file.
147
        os.remove(path)
148
        self.assertFalse(os.path.exists(path))
149
150
        # Wait for the exs to be paused.
151
        subwf_ex = self._wait_for_state(subwf_ex, action_constants.LIVEACTION_STATUS_PAUSED)
152
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_PAUSED)
153
154
        # Resume the parent ex.
155
        ex = self.st2client.liveactions.resume(ex.id)
156
157
        # Wait for completion.
158
        subwf_ex = self._wait_for_state(subwf_ex, action_constants.LIVEACTION_STATUS_SUCCEEDED)
159
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_SUCCEEDED)
160
        self.assertEqual(len(ex.result.get('tasks', [])), 2)
161
162
    def test_pause_resume_cascade_workbook_subworkflow(self):
163
        # A temp file is created during test setup. Ensure the temp file exists.
164
        path = self.temp_dir_path
165
        self.assertTrue(os.path.exists(path))
166
167
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
168
        params = {'tempfile': path, 'message': 'foobar'}
169
        action_ref = 'examples.mistral-test-pause-resume-subworkflow-workbook'
170
        ex = self._execute_workflow(action_ref, params)
171
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_RUNNING)
172
173
        # Get the task execution for the workbook subworkflow.
174
        task_exs = self._get_children(ex)
175
        self.assertEqual(len(task_exs), 1)
176
        self._wait_for_state(task_exs[0], action_constants.LIVEACTION_STATUS_RUNNING)
177
178
        # Pause the main workflow before the temp file is created.
179
        # The subworkflow will also pause.
180
        ex = self.st2client.liveactions.pause(ex.id)
181
182
        # Expecting the ex to be pausing, waiting for task1 to be completed.
183
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_PAUSING)
184
185
        # Delete the temporary file.
186
        os.remove(path)
187
        self.assertFalse(os.path.exists(path))
188
189
        # Wait for the main workflow to be paused.
190
        self._wait_for_state(task_exs[0], action_constants.LIVEACTION_STATUS_SUCCEEDED)
191
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_PAUSED)
192
193
        # Resume the parent ex.
194
        ex = self.st2client.liveactions.resume(ex.id)
195
196
        # Wait for completion.
197
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_SUCCEEDED)
198
        self.assertEqual(len(ex.result.get('tasks', [])), 2)
199
200
    def test_pause_resume_cascade_to_subchain(self):
201
        # A temp file is created during test setup. Ensure the temp file exists.
202
        path = self.temp_dir_path
203
        self.assertTrue(os.path.exists(path))
204
205
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
206
        params = {'tempfile': path, 'message': 'foobar'}
207
        action_ref = 'examples.mistral-test-pause-resume-subworkflow-chain'
208
        ex = self._execute_workflow(action_ref, params)
209
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_RUNNING)
210
        task_exs = self._wait_for_task(ex, 'task1', action_constants.LIVEACTION_STATUS_RUNNING)
211
212
        # Pause the workflow before the temp file is created. The workflow will be paused
213
        # but task1 will still be running to allow for graceful exit.
214
        ex = self.st2client.liveactions.pause(ex.id)
215
216
        # Expecting the ex to be pausing, waiting for task1 to be completed.
217
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_PAUSING)
218
        subwf_ex = self._wait_for_state(task_exs[0], action_constants.LIVEACTION_STATUS_PAUSING)
219
220
        # Delete the temporary file.
221
        os.remove(path)
222
        self.assertFalse(os.path.exists(path))
223
224
        # Wait for the exs to be paused.
225
        subwf_ex = self._wait_for_state(subwf_ex, action_constants.LIVEACTION_STATUS_PAUSED)
226
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_PAUSED)
227
228
        # Resume the parent ex.
229
        ex = self.st2client.liveactions.resume(ex.id)
230
231
        # Wait for completion.
232
        subwf_ex = self._wait_for_state(subwf_ex, action_constants.LIVEACTION_STATUS_SUCCEEDED)
233
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_SUCCEEDED)
234
        self.assertEqual(len(ex.result.get('tasks', [])), 2)
235
236
    def test_pause_resume_cascade_subworkflow_from_chain(self):
237
        # A temp file is created during test setup. Ensure the temp file exists.
238
        path = self.temp_dir_path
239
        self.assertTrue(os.path.exists(path))
240
241
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
242
        params = {'tempfile': path, 'message': 'foobar'}
243
        action_ref = 'examples.chain-test-pause-resume-with-subworkflow'
244
        ex = self._execute_workflow(action_ref, params)
245
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_RUNNING)
246
247
        # Get the execution for the subworkflow in the action chain.
248
        task_exs = self._get_children(ex)
249
        self.assertEqual(len(task_exs), 1)
250
        subwf_ex = self._wait_for_state(task_exs[0], action_constants.LIVEACTION_STATUS_RUNNING)
251
252
        # Pause the workflow before the temp file is created. The workflow will be paused
253
        # but task1 will still be running to allow for graceful exit.
254
        ex = self.st2client.liveactions.pause(ex.id)
255
256
        # Expecting the ex to be pausing, waiting for task1 to be completed.
257
        subwf_ex = self._wait_for_state(subwf_ex, action_constants.LIVEACTION_STATUS_PAUSING)
258
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_PAUSING)
259
260
        # Delete the temporary file.
261
        os.remove(path)
262
        self.assertFalse(os.path.exists(path))
263
264
        # Wait for the exs to be paused.
265
        subwf_ex = self._wait_for_state(subwf_ex, action_constants.LIVEACTION_STATUS_PAUSED)
266
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_PAUSED)
267
268
        # Resume the parent ex.
269
        ex = self.st2client.liveactions.resume(ex.id)
270
271
        # Wait for completion.
272
        subwf_ex = self._wait_for_state(subwf_ex, action_constants.LIVEACTION_STATUS_SUCCEEDED)
273
        ex = self._wait_for_state(ex, action_constants.LIVEACTION_STATUS_SUCCEEDED)
274
        self.assertEqual(len(ex.result.get('tasks', [])), 2)
275