Passed
Push — develop ( c48c6c...de488a )
by
unknown
05:55 queued 02:50
created

test_cancellation_cascade_subworkflow_action()   A

Complexity

Conditions 3

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 21
rs 9.3142
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import shutil
18
import tempfile
19
20
import eventlet
21
22
from integration.mistral import base
23
24
25
class WiringTest(base.TestWorkflowExecution):
26
27
    temp_dir_path = None
28
29
    def setUp(self):
30
        super(WiringTest, self).setUp()
31
32
        # Create temporary directory used by the tests
33
        _, self.temp_dir_path = tempfile.mkstemp()
34
        os.chmod(self.temp_dir_path, 0666)   # nosec
35
36
    def tearDown(self):
37
        if self.temp_dir_path and os.path.exists(self.temp_dir_path):
38
            if os.path.isdir(self.temp_dir_path):
39
                shutil.rmtree(self.temp_dir_path)
40
            else:
41
                os.remove(self.temp_dir_path)
42
43
    def test_basic_workflow(self):
44
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'date'})
45
        execution = self._wait_for_completion(execution)
46
        self._assert_success(execution, num_tasks=1)
47
        self.assertIn('stdout', execution.result)
48
49
    def test_basic_workbook(self):
50
        execution = self._execute_workflow('examples.mistral-workbook-basic', {'cmd': 'date'})
51
        execution = self._wait_for_completion(execution)
52
        self._assert_success(execution, num_tasks=1)
53
        self.assertIn('stdout', execution.result)
54
55
    def test_complex_workbook_with_yaql(self):
56
        execution = self._execute_workflow(
57
            'examples.mistral-workbook-complex', {'vm_name': 'demo1'})
58
        execution = self._wait_for_completion(execution)
59
        self._assert_success(execution, num_tasks=8)
60
        self.assertIn('vm_id', execution.result)
61
62
    def test_complex_workbook_with_jinja(self):
63
        execution = self._execute_workflow(
64
            'examples.mistral-jinja-workbook-complex', {'vm_name': 'demo2'})
65
        execution = self._wait_for_completion(execution)
66
        self._assert_success(execution, num_tasks=8)
67
        self.assertIn('vm_id', execution.result)
68
69
    def test_complex_workbook_subflow_actions(self):
70
        execution = self._execute_workflow(
71
            'examples.mistral-workbook-subflows', {'subject': 'st2', 'adjective': 'cool'})
72
        execution = self._wait_for_completion(execution)
73
        self._assert_success(execution, num_tasks=2)
74
        self.assertIn('tagline', execution.result)
75
        self.assertEqual(execution.result['tagline'], 'st2 is cool!')
76
77
    def test_with_items(self):
78
        params = {'cmd': 'date', 'count': 8}
79
        execution = self._execute_workflow('examples.mistral-repeat', params)
80
        execution = self._wait_for_completion(execution)
81
        self._assert_success(execution, num_tasks=1)
82
        self.assertEqual(len(execution.result['result']), params['count'])
83
84
    def test_concurrent_load(self):
85
        wf_name = 'examples.mistral-workbook-complex'
86
        wf_params = {'vm_name': 'demo1'}
87
        executions = [self._execute_workflow(wf_name, wf_params) for i in range(3)]
88
89
        eventlet.sleep(30)
90
91
        for execution in executions:
92
            e = self._wait_for_completion(execution)
93
            self._assert_success(e, num_tasks=8)
94
            self.assertIn('vm_id', e.result)
95
96
    def test_execution_failure(self):
97
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'foo'})
98
        execution = self._wait_for_completion(execution)
99
        self._assert_failure(execution)
100
101
    def test_cancellation(self):
102
        execution = self._execute_workflow('examples.mistral-test-cancel', {'sleep': 10})
103
        execution = self._wait_for_state(execution, ['running'])
104
        self.st2client.liveactions.delete(execution)
105
        execution = self._wait_for_completion(execution, expect_tasks_completed=False)
106
        self._assert_canceled(execution, are_tasks_completed=False)
107
108
    def test_cancellation_cascade_subworkflow_action(self):
109
        execution = self._execute_workflow(
110
            'examples.mistral-test-cancel-subworkflow-action',
111
            {'sleep': 10}
112
        )
113
114
        execution = self._wait_for_state(execution, ['running'])
115
        self.st2client.liveactions.delete(execution)
116
        execution = self._wait_for_completion(execution, expect_tasks_completed=False)
117
        self.assertEqual(execution.status, 'canceled')
118
119
        task_executions = [e for e in self.st2client.liveactions.get_all()
120
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
121
122
        subworkflow_execution = self.st2client.liveactions.get_by_id(task_executions[0].id)
123
        subworkflow_execution = self._wait_for_completion(
124
            subworkflow_execution,
125
            expect_tasks_completed=False
126
        )
127
128
        self.assertEqual(execution.status, 'canceled')
129
130
    def test_task_cancellation(self):
131
        execution = self._execute_workflow('examples.mistral-test-cancel', {'sleep': 30})
132
        execution = self._wait_for_state(execution, ['running'])
133
134
        task_executions = [e for e in self.st2client.liveactions.get_all()
135
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
136
137
        self.assertGreater(len(task_executions), 0)
138
139
        self.st2client.liveactions.delete(task_executions[0])
140
        execution = self._wait_for_completion(execution, expect_tasks_completed=True)
141
        self._assert_canceled(execution, are_tasks_completed=True)
142
143
        task_results = execution.result.get('tasks', [])
144
        self.assertGreater(len(task_results), 0)
145
        self.assertEqual(task_results[0]['state'], 'CANCELLED')
146
147
    def test_basic_rerun(self):
148
        path = self.temp_dir_path
149
150
        with open(path, 'w') as f:
151
            f.write('1')
152
153
        execution = self._execute_workflow('examples.mistral-test-rerun', {'tempfile': path})
154
        execution = self._wait_for_completion(execution)
155
        self._assert_failure(execution)
156
        orig_st2_ex_id = execution.id
157
        orig_wf_ex_id = execution.context['mistral']['execution_id']
158
159
        with open(path, 'w') as f:
160
            f.write('0')
161
162
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id)
163
        self.assertNotEqual(execution.id, orig_st2_ex_id)
164
        execution = self._wait_for_completion(execution)
165
        self._assert_success(execution, num_tasks=1)
166
        self.assertNotEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
167
168
    def test_basic_rerun_task(self):
169
        path = self.temp_dir_path
170
171
        with open(path, 'w') as f:
172
            f.write('1')
173
174
        execution = self._execute_workflow('examples.mistral-test-rerun', {'tempfile': path})
175
        execution = self._wait_for_completion(execution)
176
        self._assert_failure(execution)
177
        orig_st2_ex_id = execution.id
178
        orig_wf_ex_id = execution.context['mistral']['execution_id']
179
180
        with open(path, 'w') as f:
181
            f.write('0')
182
183
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1'])
184
        self.assertNotEqual(execution.id, orig_st2_ex_id)
185
        execution = self._wait_for_completion(execution)
186
        self._assert_success(execution, num_tasks=1)
187
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
188
189
    def test_rerun_subflow_task(self):
190
        path = self.temp_dir_path
191
192
        with open(path, 'w') as f:
193
            f.write('1')
194
195
        workflow_name = 'examples.mistral-test-rerun-subflow'
196
        execution = self._execute_workflow(workflow_name, {'tempfile': path})
197
        execution = self._wait_for_completion(execution)
198
        self._assert_failure(execution)
199
        orig_st2_ex_id = execution.id
200
        orig_wf_ex_id = execution.context['mistral']['execution_id']
201
202
        with open(path, 'w') as f:
203
            f.write('0')
204
205
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1.task1'])
206
        self.assertNotEqual(execution.id, orig_st2_ex_id)
207
        execution = self._wait_for_completion(execution)
208
        self._assert_success(execution, num_tasks=1)
209
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
210
211
    def test_basic_rerun_and_reset_with_items_task(self):
212
        path = self.temp_dir_path
213
214
        with open(path, 'w') as f:
215
            f.write('1')
216
217
        execution = self._execute_workflow(
218
            'examples.mistral-test-rerun-with-items',
219
            {'tempfile': path}
220
        )
221
222
        execution = self._wait_for_completion(execution)
223
        self._assert_failure(execution)
224
        orig_st2_ex_id = execution.id
225
        orig_wf_ex_id = execution.context['mistral']['execution_id']
226
227
        with open(path, 'w') as f:
228
            f.write('0')
229
230
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1'])
231
        self.assertNotEqual(execution.id, orig_st2_ex_id)
232
233
        execution = self._wait_for_completion(execution)
234
        self._assert_success(execution, num_tasks=1)
235
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
236
237
        children = self.st2client.liveactions.get_property(execution.id, 'children')
238
        self.assertEqual(len(children), 4)
239
240
    def test_basic_rerun_and_resume_with_items_task(self):
241
        path = self.temp_dir_path
242
243
        with open(path, 'w') as f:
244
            f.write('1')
245
246
        execution = self._execute_workflow(
247
            'examples.mistral-test-rerun-with-items',
248
            {'tempfile': path}
249
        )
250
251
        execution = self._wait_for_completion(execution)
252
        self._assert_failure(execution)
253
        orig_st2_ex_id = execution.id
254
        orig_wf_ex_id = execution.context['mistral']['execution_id']
255
256
        with open(path, 'w') as f:
257
            f.write('0')
258
259
        execution = self.st2client.liveactions.re_run(
260
            orig_st2_ex_id,
261
            tasks=['task1'],
262
            no_reset=['task1']
263
        )
264
265
        self.assertNotEqual(execution.id, orig_st2_ex_id)
266
267
        execution = self._wait_for_completion(execution)
268
        self._assert_success(execution, num_tasks=1)
269
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
270
271
        children = self.st2client.liveactions.get_property(execution.id, 'children')
272
        self.assertEqual(len(children), 2)
273
274
    def test_rerun_subflow_and_reset_with_items_task(self):
275
        path = self.temp_dir_path
276
277
        with open(path, 'w') as f:
278
            f.write('1')
279
280
        execution = self._execute_workflow(
281
            'examples.mistral-test-rerun-subflow-with-items',
282
            {'tempfile': path}
283
        )
284
285
        execution = self._wait_for_completion(execution)
286
        self._assert_failure(execution)
287
        orig_st2_ex_id = execution.id
288
        orig_wf_ex_id = execution.context['mistral']['execution_id']
289
290
        with open(path, 'w') as f:
291
            f.write('0')
292
293
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1.task1'])
294
        self.assertNotEqual(execution.id, orig_st2_ex_id)
295
296
        execution = self._wait_for_completion(execution)
297
        self._assert_success(execution, num_tasks=1)
298
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
299
300
        children = self.st2client.liveactions.get_property(execution.id, 'children')
301
        self.assertEqual(len(children), 4)
302
303
    def test_rerun_subflow_and_resume_with_items_task(self):
304
        path = self.temp_dir_path
305
306
        with open(path, 'w') as f:
307
            f.write('1')
308
309
        execution = self._execute_workflow(
310
            'examples.mistral-test-rerun-subflow-with-items',
311
            {'tempfile': path}
312
        )
313
314
        execution = self._wait_for_completion(execution)
315
        self._assert_failure(execution)
316
        orig_st2_ex_id = execution.id
317
        orig_wf_ex_id = execution.context['mistral']['execution_id']
318
319
        with open(path, 'w') as f:
320
            f.write('0')
321
322
        execution = self.st2client.liveactions.re_run(
323
            orig_st2_ex_id,
324
            tasks=['task1.task1'],
325
            no_reset=['task1.task1']
326
        )
327
328
        self.assertNotEqual(execution.id, orig_st2_ex_id)
329
330
        execution = self._wait_for_completion(execution)
331
        self._assert_success(execution, num_tasks=1)
332
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
333
334
        children = self.st2client.liveactions.get_property(execution.id, 'children')
335
        self.assertEqual(len(children), 2)
336