Passed
Pull Request — master (#3753)
by W
17:07 queued 11:07
created

CancellationWiringTest.test_cancellation()   B

Complexity

Conditions 1

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 27
rs 8.8571
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import shutil
18
import tempfile
19
20
from integration.mistral import base
21
22
23
class CancellationWiringTest(base.TestWorkflowExecution):
24
25
    temp_dir_path = None
26
27
    def setUp(self):
28
        super(CancellationWiringTest, self).setUp()
29
30
        # Create temporary directory used by the tests
31
        _, self.temp_dir_path = tempfile.mkstemp()
32
        os.chmod(self.temp_dir_path, 0755)   # nosec
33
34
    def tearDown(self):
35
        if self.temp_dir_path and os.path.exists(self.temp_dir_path):
36
            if os.path.isdir(self.temp_dir_path):
37
                shutil.rmtree(self.temp_dir_path)
38
            else:
39
                os.remove(self.temp_dir_path)
40
41
    def test_cancellation(self):
42
        # A temp file is created during test setup. Ensure the temp file exists.
43
        path = self.temp_dir_path
44
        self.assertTrue(os.path.exists(path))
45
46
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
47
        params = {'tempfile': path, 'message': 'foobar'}
48
        execution = self._execute_workflow('examples.mistral-test-cancel', params)
49
        execution = self._wait_for_task(execution, 'task1', 'RUNNING')
50
51
        # Cancel the workflow before the temp file is created. The workflow will be paused
52
        # but task1 will still be running to allow for graceful exit.
53
        self.st2client.liveactions.delete(execution)
54
55
        # Expecting the execution to be canceling, waiting for task1 to be completed.
56
        execution = self._wait_for_state(execution, ['canceling'])
57
58
        # Delete the temporary file.
59
        os.remove(path)
60
        self.assertFalse(os.path.exists(path))
61
62
        # Wait for the execution to be canceled.
63
        execution = self._wait_for_state(execution, ['canceled'])
64
65
        # Task is completed successfully for graceful exit.
66
        self.assertEqual(len(execution.result.get('tasks', [])), 1)
67
        self.assertEqual(execution.result['tasks'][0]['state'], 'SUCCESS')
68
69
    def test_task_cancellation(self):
70
        # A temp file is created during test setup. Ensure the temp file exists.
71
        path = self.temp_dir_path
72
        self.assertTrue(os.path.exists(path))
73
74
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
75
        params = {'tempfile': path, 'message': 'foobar'}
76
        execution = self._execute_workflow('examples.mistral-test-cancel', params)
77
        execution = self._wait_for_task(execution, 'task1', 'RUNNING')
78
79
        # Identify and cancel the task execution.
80
        task_executions = [e for e in self.st2client.liveactions.get_all()
81
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
82
83
        self.assertGreater(len(task_executions), 0)
84
85
        self.st2client.liveactions.delete(task_executions[0])
86
87
        # Wait for the execution and task to be canceled.
88
        execution = self._wait_for_state(execution, ['canceled'])
89
        self.assertEqual(len(execution.result.get('tasks', [])), 1)
90
        self.assertEqual(execution.result['tasks'][0]['state'], 'CANCELLED')
91
92
    def test_cancellation_cascade_to_subworkflow_action(self):
93
        # A temp file is created during test setup. Ensure the temp file exists.
94
        path = self.temp_dir_path
95
        self.assertTrue(os.path.exists(path))
96
97
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
98
        params = {'tempfile': path, 'message': 'foobar'}
99
        action_ref = 'examples.mistral-test-cancel-subworkflow-action'
100
        execution = self._execute_workflow(action_ref, params)
101
        execution = self._wait_for_task(execution, 'task1', 'RUNNING')
102
103
        # Cancel the workflow before the temp file is created. The workflow will be canceled
104
        # but task1 will still be running to allow for graceful exit.
105
        self.st2client.liveactions.delete(execution)
106
107
        # Expecting the execution to be canceling, waiting for task1 to be completed.
108
        execution = self._wait_for_state(execution, ['canceling'])
109
110
        # Get the subworkflow execution.
111
        task_executions = [e for e in self.st2client.liveactions.get_all()
112
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
113
114
        subworkflow_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)
115
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['canceling'])
116
117
        # Delete the temporary file.
118
        os.remove(path)
119
        self.assertFalse(os.path.exists(path))
120
121
        # Wait for the executions to be canceled.
122
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['canceled'])
123
        execution = self._wait_for_state(execution, ['canceled'])
124
125
        # Task is canceled in the execution result.
126
        self.assertEqual(len(execution.result.get('tasks', [])), 1)
127
        self.assertEqual(execution.result['tasks'][0]['state'], 'CANCELLED')
128
129
    def test_cancellation_cascade_to_subchain(self):
130
        # A temp file is created during test setup. Ensure the temp file exists.
131
        path = self.temp_dir_path
132
        self.assertTrue(os.path.exists(path))
133
134
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
135
        params = {'tempfile': path, 'message': 'foobar'}
136
        action_ref = 'examples.mistral-test-cancel-subworkflow-chain'
137
        execution = self._execute_workflow(action_ref, params)
138
        execution = self._wait_for_task(execution, 'task1', 'RUNNING')
139
140
        # Cancel the workflow before the temp file is created. The workflow will be canceled
141
        # but task1 will still be running to allow for graceful exit.
142
        self.st2client.liveactions.delete(execution)
143
144
        # Expecting the execution to be canceling, waiting for task1 to be completed.
145
        execution = self._wait_for_state(execution, ['canceling'])
146
147
        # Get the subworkflow execution.
148
        task_executions = [e for e in self.st2client.liveactions.get_all()
149
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
150
151
        subworkflow_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)
152
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['canceling'])
153
154
        # Delete the temporary file.
155
        os.remove(path)
156
        self.assertFalse(os.path.exists(path))
157
158
        # Wait for the executions to be canceled.
159
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['canceled'])
160
        execution = self._wait_for_state(execution, ['canceled'])
161
162
        # Task is canceled in the execution result.
163
        self.assertEqual(len(execution.result.get('tasks', [])), 1)
164
        self.assertEqual(execution.result['tasks'][0]['state'], 'CANCELLED')
165
166
    def test_cancellation_cascade_from_subworkflow_action(self):
167
        # A temp file is created during test setup. Ensure the temp file exists.
168
        path = self.temp_dir_path
169
        self.assertTrue(os.path.exists(path))
170
171
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
172
        params = {'tempfile': path, 'message': 'foobar'}
173
        action_ref = 'examples.mistral-test-cancel-subworkflow-action'
174
        execution = self._execute_workflow(action_ref, params)
175
        execution = self._wait_for_task(execution, 'task1', 'RUNNING')
176
177
        # Identify and cancel the task execution.
178
        task_executions = [e for e in self.st2client.liveactions.get_all()
179
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
180
181
        self.assertGreater(len(task_executions), 0)
182
183
        self.st2client.liveactions.delete(task_executions[0])
184
        subworkflow_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)
185
186
        # Expecting task1 and main workflow execution to be canceling.
187
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['canceling'])
188
        execution = self._wait_for_state(execution, ['canceling'])
189
190
        # Delete the temporary file.
191
        os.remove(path)
192
        self.assertFalse(os.path.exists(path))
193
194
        # Wait for the executions to be canceled.
195
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['canceled'])
196
        execution = self._wait_for_state(execution, ['canceled'])
197
198
        # Task is canceled in the execution result.
199
        self.assertEqual(len(execution.result.get('tasks', [])), 1)
200
        self.assertEqual(execution.result['tasks'][0]['state'], 'CANCELLED')
201
202
    def test_cancellation_cascade_from_subchain(self):
203
        # A temp file is created during test setup. Ensure the temp file exists.
204
        path = self.temp_dir_path
205
        self.assertTrue(os.path.exists(path))
206
207
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
208
        params = {'tempfile': path, 'message': 'foobar'}
209
        action_ref = 'examples.mistral-test-cancel-subworkflow-chain'
210
        execution = self._execute_workflow(action_ref, params)
211
        execution = self._wait_for_task(execution, 'task1', 'RUNNING')
212
213
        # Identify and cancel the task execution.
214
        task_executions = [e for e in self.st2client.liveactions.get_all()
215
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
216
217
        self.assertGreater(len(task_executions), 0)
218
219
        self.st2client.liveactions.delete(task_executions[0])
220
        subworkflow_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)
221
222
        # Expecting task1 to be canceling. Main workflow execution is still running.
223
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['canceling'])
224
        execution = self._wait_for_state(execution, ['running'])
225
226
        # Delete the temporary file.
227
        os.remove(path)
228
        self.assertFalse(os.path.exists(path))
229
230
        # Wait for the executions to be canceled.
231
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['canceled'])
232
        execution = self._wait_for_state(execution, ['canceled'])
233
234
        # Task is canceled in the execution result.
235
        self.assertEqual(len(execution.result.get('tasks', [])), 1)
236
        self.assertEqual(execution.result['tasks'][0]['state'], 'CANCELLED')
237
238
    def test_cancellation_chain_cascade_to_subworkflow(self):
239
        # A temp file is created during test setup. Ensure the temp file exists.
240
        path = self.temp_dir_path
241
        self.assertTrue(os.path.exists(path))
242
243
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
244
        params = {'tempfile': path, 'message': 'foobar'}
245
        action_ref = 'examples.chain-test-cancel-with-subworkflow'
246
        execution = self._execute_workflow(action_ref, params)
247
248
        # Expecting the execution to be running.
249
        execution = self._wait_for_state(execution, ['running'])
250
251
        # Cancel the workflow before the temp file is created. The workflow will be canceled
252
        # but task1 will still be running to allow for graceful exit.
253
        self.st2client.liveactions.delete(execution)
254
255
        # Expecting the execution to be cancelinging, waiting for task1 to be completed.
256
        execution = self._wait_for_state(execution, ['canceling'])
257
258
        # Get the subworkflow execution.
259
        task_executions = [e for e in self.st2client.liveactions.get_all()
260
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
261
262
        subworkflow_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)
263
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['canceling'])
264
265
        # Delete the temporary file.
266
        os.remove(path)
267
        self.assertFalse(os.path.exists(path))
268
269
        # Wait for the executions to be paused.
270
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['canceled'])
271
        execution = self._wait_for_state(execution, ['canceled'])
272
273
    def test_cancellation_chain_cascade_from_subworkflow(self):
274
        # A temp file is created during test setup. Ensure the temp file exists.
275
        path = self.temp_dir_path
276
        self.assertTrue(os.path.exists(path))
277
278
        # Launch the workflow. The workflow will wait for the temp file to be deleted.
279
        params = {'tempfile': path, 'message': 'foobar'}
280
        action_ref = 'examples.chain-test-cancel-with-subworkflow'
281
        execution = self._execute_workflow(action_ref, params)
282
283
        # Expecting the execution to be running.
284
        execution = self._wait_for_state(execution, ['running'])
285
286
        # Identify and cancel the task execution.
287
        task_executions = [e for e in self.st2client.liveactions.get_all()
288
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
289
290
        self.assertGreater(len(task_executions), 0)
291
292
        self.st2client.liveactions.delete(task_executions[0])
293
        subworkflow_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)
294
295
        # Expecting task1 and main workflow execution to be canceling.
296
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['canceling'])
297
        execution = self._wait_for_state(execution, ['running'])
298
299
        # Delete the temporary file.
300
        os.remove(path)
301
        self.assertFalse(os.path.exists(path))
302
303
        # Wait for the executions to be paused.
304
        subworkflow_execution = self._wait_for_state(subworkflow_execution, ['canceled'])
305
        execution = self._wait_for_state(execution, ['canceled'])
306