Passed
Push — develop ( 0891a6...20e5ca )
by
unknown
06:24 queued 03:05
created

test_cancellation_cascade_subworkflow_action()   B

Complexity

Conditions 3

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 29
rs 8.8571
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import shutil
18
import tempfile
19
20
import eventlet
21
22
from integration.mistral import base
23
24
25
class WiringTest(base.TestWorkflowExecution):
    """Integration tests that run the example Mistral workflows through st2.

    Every test launches one of the ``examples.mistral-*`` workflows with the
    helpers inherited from ``base.TestWorkflowExecution`` and then checks the
    final status and result of the execution.  The rerun tests additionally
    share a scratch file whose content is flipped between runs.
    """

    # Path of the scratch file created for each test by ``setUp``.  Despite the
    # name this is a regular file (``tempfile.mkstemp``), not a directory; the
    # attribute name is kept for backward compatibility.
    temp_dir_path = None

    def setUp(self):
        """Create the per-test scratch file and make it world readable."""
        super(WiringTest, self).setUp()

        # mkstemp() hands back an already open OS-level descriptor.  Close it
        # right away so one descriptor is not leaked per test; the tests
        # reopen the file by path whenever they need to write to it.
        fd, self.temp_dir_path = tempfile.mkstemp()
        os.close(fd)

        # NOTE(review): presumably relaxed from mkstemp's default 0600 so the
        # action runner user can read the file - confirm.
        os.chmod(self.temp_dir_path, 0o755)   # nosec

    def tearDown(self):
        """Remove the scratch file (or directory) created for the test."""
        try:
            if self.temp_dir_path and os.path.exists(self.temp_dir_path):
                if os.path.isdir(self.temp_dir_path):
                    shutil.rmtree(self.temp_dir_path)
                else:
                    os.remove(self.temp_dir_path)
        finally:
            # Always give the parent class a chance to clean up as well.
            super(WiringTest, self).tearDown()

    def _write_temp_file(self, content):
        """Overwrite the scratch file with ``content``."""
        with open(self.temp_dir_path, 'w') as f:
            f.write(content)

    def _get_child_executions(self, execution):
        """Return the executions whose parent context points at ``execution``."""
        return [e for e in self.st2client.liveactions.get_all()
                if e.context.get('parent', {}).get('execution_id') == execution.id]

    def _run_failing_workflow(self, workflow_name):
        """Run ``workflow_name`` against the scratch file and expect it to fail.

        The scratch file is primed with ``'1'`` before the workflow starts.
        NOTE(review): presumably the rerun example workflows fail when the
        file holds a non-zero value - confirm against the workflow definitions.

        :return: Tuple of (st2 execution id, Mistral workflow execution id)
                 of the failed run.
        """
        self._write_temp_file('1')

        execution = self._execute_workflow(workflow_name, {'tempfile': self.temp_dir_path})
        execution = self._wait_for_completion(execution)
        self._assert_failure(execution)

        return execution.id, execution.context['mistral']['execution_id']

    def _wait_for_rerun_success(self, execution, orig_st2_ex_id):
        """Check that a rerun got a new st2 id and succeeded with one task.

        :param execution: Execution returned by ``liveactions.re_run``.
        :param orig_st2_ex_id: Id of the st2 execution that was rerun.
        :return: The completed rerun execution.
        """
        self.assertNotEqual(execution.id, orig_st2_ex_id)
        execution = self._wait_for_completion(execution)
        self._assert_success(execution, num_tasks=1)

        return execution

    def test_basic_workflow(self):
        """A basic workflow succeeds with one task and returns stdout."""
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'date'})
        execution = self._wait_for_completion(execution)
        self._assert_success(execution, num_tasks=1)
        self.assertIn('stdout', execution.result)

    def test_basic_workbook(self):
        """A basic workbook succeeds with one task and returns stdout."""
        execution = self._execute_workflow('examples.mistral-workbook-basic', {'cmd': 'date'})
        execution = self._wait_for_completion(execution)
        self._assert_success(execution, num_tasks=1)
        self.assertIn('stdout', execution.result)

    def test_complex_workbook_with_yaql(self):
        """The complex workbook example succeeds with eight tasks."""
        execution = self._execute_workflow(
            'examples.mistral-workbook-complex', {'vm_name': 'demo1'})
        execution = self._wait_for_completion(execution)
        self._assert_success(execution, num_tasks=8)
        self.assertIn('vm_id', execution.result)

    def test_complex_workbook_with_jinja(self):
        """The Jinja variant of the complex workbook succeeds with eight tasks."""
        execution = self._execute_workflow(
            'examples.mistral-jinja-workbook-complex', {'vm_name': 'demo2'})
        execution = self._wait_for_completion(execution)
        self._assert_success(execution, num_tasks=8)
        self.assertIn('vm_id', execution.result)

    def test_complex_workbook_subflow_actions(self):
        """The subflows workbook builds the expected tagline from its inputs."""
        execution = self._execute_workflow(
            'examples.mistral-workbook-subflows', {'subject': 'st2', 'adjective': 'cool'})
        execution = self._wait_for_completion(execution)
        self._assert_success(execution, num_tasks=2)
        self.assertIn('tagline', execution.result)
        self.assertEqual(execution.result['tagline'], 'st2 is cool!')

    def test_with_items(self):
        """The repeat workflow returns one result entry per requested iteration."""
        params = {'cmd': 'date', 'count': 8}
        execution = self._execute_workflow('examples.mistral-repeat', params)
        execution = self._wait_for_completion(execution)
        self._assert_success(execution, num_tasks=1)
        self.assertEqual(len(execution.result['result']), params['count'])

    def test_concurrent_load(self):
        """Three complex workbook executions started back to back all succeed."""
        wf_name = 'examples.mistral-workbook-complex'
        wf_params = {'vm_name': 'demo1'}
        executions = [self._execute_workflow(wf_name, wf_params) for _ in range(3)]

        # Let the executions make progress before polling each one in turn.
        eventlet.sleep(30)

        for execution in executions:
            e = self._wait_for_completion(execution)
            self._assert_success(e, num_tasks=8)
            self.assertIn('vm_id', e.result)

    def test_execution_failure(self):
        """A basic workflow given the command 'foo' ends in failure."""
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'foo'})
        execution = self._wait_for_completion(execution)
        self._assert_failure(execution)

    def test_cancellation(self):
        """Deleting a running workflow execution leaves it canceled."""
        execution = self._execute_workflow('examples.mistral-test-cancel', {'sleep': 10})
        execution = self._wait_for_state(execution, ['running'])
        self.st2client.liveactions.delete(execution)

        execution = self._wait_for_completion(
            execution,
            expect_tasks=False,
            expect_tasks_completed=False
        )

        self._assert_canceled(execution, are_tasks_completed=False)

    def test_cancellation_cascade_subworkflow_action(self):
        """Canceling a parent workflow also cancels its sub-workflow action."""
        execution = self._execute_workflow(
            'examples.mistral-test-cancel-subworkflow-action',
            {'sleep': 10}
        )

        execution = self._wait_for_state(execution, ['running'])
        self.st2client.liveactions.delete(execution)

        execution = self._wait_for_completion(
            execution,
            expect_tasks=False,
            expect_tasks_completed=False
        )

        self.assertEqual(execution.status, 'canceled')

        task_executions = self._get_child_executions(execution)

        # Fail with a clear assertion instead of an IndexError when the parent
        # has not spawned any child execution.
        self.assertGreater(len(task_executions), 0)

        subworkflow_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)

        subworkflow_execution = self._wait_for_completion(
            subworkflow_execution,
            expect_tasks=False,
            expect_tasks_completed=False
        )

        # The parent status was already verified above; the cascade is only
        # proven by checking the status of the sub-workflow execution itself.
        self.assertEqual(subworkflow_execution.status, 'canceled')

    def test_task_cancellation(self):
        """Deleting a running task cancels the task and the whole workflow."""
        execution = self._execute_workflow('examples.mistral-test-cancel', {'sleep': 30})
        execution = self._wait_for_state(execution, ['running'])

        task_executions = self._get_child_executions(execution)

        self.assertGreater(len(task_executions), 0)

        self.st2client.liveactions.delete(task_executions[0])
        execution = self._wait_for_completion(execution, expect_tasks_completed=True)
        self._assert_canceled(execution, are_tasks_completed=True)

        task_results = execution.result.get('tasks', [])
        self.assertGreater(len(task_results), 0)
        self.assertEqual(task_results[0]['state'], 'CANCELLED')

    def test_basic_rerun(self):
        """Rerunning a failed execution as a whole uses a new workflow execution."""
        orig_st2_ex_id, orig_wf_ex_id = self._run_failing_workflow(
            'examples.mistral-test-rerun')

        self._write_temp_file('0')

        execution = self.st2client.liveactions.re_run(orig_st2_ex_id)
        execution = self._wait_for_rerun_success(execution, orig_st2_ex_id)
        self.assertNotEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)

    def test_basic_rerun_task(self):
        """Rerunning a single task reuses the original workflow execution."""
        orig_st2_ex_id, orig_wf_ex_id = self._run_failing_workflow(
            'examples.mistral-test-rerun')

        self._write_temp_file('0')

        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1'])
        execution = self._wait_for_rerun_success(execution, orig_st2_ex_id)
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)

    def test_rerun_subflow_task(self):
        """Rerunning a task nested in a subflow reuses the original workflow execution."""
        orig_st2_ex_id, orig_wf_ex_id = self._run_failing_workflow(
            'examples.mistral-test-rerun-subflow')

        self._write_temp_file('0')

        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1.task1'])
        execution = self._wait_for_rerun_success(execution, orig_st2_ex_id)
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)

    def test_basic_rerun_and_reset_with_items_task(self):
        """Rerun a with-items task without ``no_reset`` and expect four children."""
        orig_st2_ex_id, orig_wf_ex_id = self._run_failing_workflow(
            'examples.mistral-test-rerun-with-items')

        self._write_temp_file('0')

        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1'])
        execution = self._wait_for_rerun_success(execution, orig_st2_ex_id)
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)

        # NOTE(review): presumably every item is executed again when the task
        # is reset - confirm against the workflow definition.
        children = self.st2client.liveactions.get_property(execution.id, 'children')
        self.assertEqual(len(children), 4)

    def test_basic_rerun_and_resume_with_items_task(self):
        """Rerun a with-items task with ``no_reset`` and expect two children."""
        orig_st2_ex_id, orig_wf_ex_id = self._run_failing_workflow(
            'examples.mistral-test-rerun-with-items')

        self._write_temp_file('0')

        execution = self.st2client.liveactions.re_run(
            orig_st2_ex_id,
            tasks=['task1'],
            no_reset=['task1']
        )

        execution = self._wait_for_rerun_success(execution, orig_st2_ex_id)
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)

        # NOTE(review): presumably only the items that failed are executed
        # again when the task is resumed - confirm.
        children = self.st2client.liveactions.get_property(execution.id, 'children')
        self.assertEqual(len(children), 2)

    def test_rerun_subflow_and_reset_with_items_task(self):
        """Rerun a nested with-items task without ``no_reset``; expect four children."""
        orig_st2_ex_id, orig_wf_ex_id = self._run_failing_workflow(
            'examples.mistral-test-rerun-subflow-with-items')

        self._write_temp_file('0')

        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1.task1'])
        execution = self._wait_for_rerun_success(execution, orig_st2_ex_id)
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)

        children = self.st2client.liveactions.get_property(execution.id, 'children')
        self.assertEqual(len(children), 4)

    def test_rerun_subflow_and_resume_with_items_task(self):
        """Rerun a nested with-items task with ``no_reset``; expect two children."""
        orig_st2_ex_id, orig_wf_ex_id = self._run_failing_workflow(
            'examples.mistral-test-rerun-subflow-with-items')

        self._write_temp_file('0')

        execution = self.st2client.liveactions.re_run(
            orig_st2_ex_id,
            tasks=['task1.task1'],
            no_reset=['task1.task1']
        )

        execution = self._wait_for_rerun_success(execution, orig_st2_ex_id)
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)

        children = self.st2client.liveactions.get_property(execution.id, 'children')
        self.assertEqual(len(children), 2)
350