Passed
Pull Request — master (#3177)
by W
04:38
created

WiringTest.test_complex_workbook_subflow_actions()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 7
rs 9.4285
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import ast
17
import os
18
import shutil
19
import tempfile
20
21
import eventlet
22
23
from integration.mistral import base
24
25
26
class WiringTest(base.TestWorkflowExecution):
27
28
    temp_dir_path = None
29
30
    def setUp(self):
31
        super(WiringTest, self).setUp()
32
33
        # Create temporary directory used by the tests
34
        _, self.temp_dir_path = tempfile.mkstemp()
35
        os.chmod(self.temp_dir_path, 0666)   # nosec
36
37
    def tearDown(self):
38
        if self.temp_dir_path and os.path.exists(self.temp_dir_path):
39
            if os.path.isdir(self.temp_dir_path):
40
                shutil.rmtree(self.temp_dir_path)
41
            else:
42
                os.remove(self.temp_dir_path)
43
44
    def test_basic_workflow(self):
45
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'date'})
46
        execution = self._wait_for_completion(execution)
47
        self._assert_success(execution, num_tasks=1)
48
        self.assertIn('stdout', execution.result)
49
50
    def test_basic_workbook(self):
51
        execution = self._execute_workflow('examples.mistral-workbook-basic', {'cmd': 'date'})
52
        execution = self._wait_for_completion(execution)
53
        self._assert_success(execution, num_tasks=1)
54
        self.assertIn('stdout', execution.result)
55
56
    def test_complex_workbook_with_yaql(self):
57
        execution = self._execute_workflow(
58
            'examples.mistral-workbook-complex', {'vm_name': 'demo1'})
59
        execution = self._wait_for_completion(execution)
60
        self._assert_success(execution, num_tasks=8)
61
        self.assertIn('vm_id', execution.result)
62
63
    def test_complex_workbook_with_jinja(self):
64
        execution = self._execute_workflow(
65
            'examples.mistral-jinja-workbook-complex', {'vm_name': 'demo2'})
66
        execution = self._wait_for_completion(execution)
67
        self._assert_success(execution, num_tasks=8)
68
        self.assertIn('vm_id', execution.result)
69
70
    def test_complex_workbook_subflow_actions(self):
71
        execution = self._execute_workflow(
72
            'examples.mistral-workbook-subflows', {'subject': 'st2', 'adjective': 'cool'})
73
        execution = self._wait_for_completion(execution)
74
        self._assert_success(execution, num_tasks=2)
75
        self.assertIn('tagline', execution.result)
76
        self.assertEqual(execution.result['tagline'], 'st2 is cool!')
77
78
    def test_with_items(self):
79
        params = {'cmd': 'date', 'count': 8}
80
        execution = self._execute_workflow('examples.mistral-repeat', params)
81
        execution = self._wait_for_completion(execution)
82
        self._assert_success(execution, num_tasks=1)
83
        self.assertEqual(len(execution.result['result']), params['count'])
84
85
    def test_concurrent_load(self):
86
        wf_name = 'examples.mistral-workbook-complex'
87
        wf_params = {'vm_name': 'demo1'}
88
        executions = [self._execute_workflow(wf_name, wf_params) for i in range(3)]
89
90
        eventlet.sleep(30)
91
92
        for execution in executions:
93
            e = self._wait_for_completion(execution)
94
            self._assert_success(e, num_tasks=8)
95
            self.assertIn('vm_id', e.result)
96
97
    def test_execution_failure(self):
98
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'foo'})
99
        execution = self._wait_for_completion(execution)
100
        self._assert_failure(execution)
101
102
    def test_cancellation(self):
103
        execution = self._execute_workflow('examples.mistral-test-cancel', {'sleep': 10})
104
        execution = self._wait_for_state(execution, ['running'])
105
        self.st2client.liveactions.delete(execution)
106
        execution = self._wait_for_completion(execution, expect_tasks_completed=False)
107
        self._assert_canceled(execution, are_tasks_completed=False)
108
109
    def test_task_cancellation(self):
110
        execution = self._execute_workflow('examples.mistral-test-cancel', {'sleep': 30})
111
        execution = self._wait_for_state(execution, ['running'])
112
113
        task_executions = [e for e in self.st2client.liveactions.get_all()
114
                           if e.context.get('parent', {}).get('execution_id') == execution.id]
115
116
        self.assertGreater(len(task_executions), 0)
117
118
        self.st2client.liveactions.delete(task_executions[0])
119
        execution = self._wait_for_completion(execution, expect_tasks_completed=True)
120
        self._assert_failure(execution)
121
122
        task_results = execution.result.get('tasks', [])
123
        self.assertGreater(len(task_results), 0)
124
        expected_state_info = {'error': 'Execution canceled by user.'}
125
        self.assertDictEqual(ast.literal_eval(task_results[0]['state_info']), expected_state_info)
126
127
    def test_basic_rerun(self):
128
        path = self.temp_dir_path
129
130
        with open(path, 'w') as f:
131
            f.write('1')
132
133
        execution = self._execute_workflow('examples.mistral-test-rerun', {'tempfile': path})
134
        execution = self._wait_for_completion(execution)
135
        self._assert_failure(execution)
136
        orig_st2_ex_id = execution.id
137
        orig_wf_ex_id = execution.context['mistral']['execution_id']
138
139
        with open(path, 'w') as f:
140
            f.write('0')
141
142
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id)
143
        self.assertNotEqual(execution.id, orig_st2_ex_id)
144
        execution = self._wait_for_completion(execution)
145
        self._assert_success(execution, num_tasks=1)
146
        self.assertNotEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
147
148
    def test_basic_rerun_task(self):
149
        path = self.temp_dir_path
150
151
        with open(path, 'w') as f:
152
            f.write('1')
153
154
        execution = self._execute_workflow('examples.mistral-test-rerun', {'tempfile': path})
155
        execution = self._wait_for_completion(execution)
156
        self._assert_failure(execution)
157
        orig_st2_ex_id = execution.id
158
        orig_wf_ex_id = execution.context['mistral']['execution_id']
159
160
        with open(path, 'w') as f:
161
            f.write('0')
162
163
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1'])
164
        self.assertNotEqual(execution.id, orig_st2_ex_id)
165
        execution = self._wait_for_completion(execution)
166
        self._assert_success(execution, num_tasks=1)
167
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
168
169
    def test_rerun_subflow_task(self):
170
        path = self.temp_dir_path
171
172
        with open(path, 'w') as f:
173
            f.write('1')
174
175
        workflow_name = 'examples.mistral-test-rerun-subflow'
176
        execution = self._execute_workflow(workflow_name, {'tempfile': path})
177
        execution = self._wait_for_completion(execution)
178
        self._assert_failure(execution)
179
        orig_st2_ex_id = execution.id
180
        orig_wf_ex_id = execution.context['mistral']['execution_id']
181
182
        with open(path, 'w') as f:
183
            f.write('0')
184
185
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1.task1'])
186
        self.assertNotEqual(execution.id, orig_st2_ex_id)
187
        execution = self._wait_for_completion(execution)
188
        self._assert_success(execution, num_tasks=1)
189
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
190
191
    def test_basic_rerun_and_reset_with_items_task(self):
192
        path = self.temp_dir_path
193
194
        with open(path, 'w') as f:
195
            f.write('1')
196
197
        execution = self._execute_workflow(
198
            'examples.mistral-test-rerun-with-items',
199
            {'tempfile': path}
200
        )
201
202
        execution = self._wait_for_completion(execution)
203
        self._assert_failure(execution)
204
        orig_st2_ex_id = execution.id
205
        orig_wf_ex_id = execution.context['mistral']['execution_id']
206
207
        with open(path, 'w') as f:
208
            f.write('0')
209
210
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1'])
211
        self.assertNotEqual(execution.id, orig_st2_ex_id)
212
213
        execution = self._wait_for_completion(execution)
214
        self._assert_success(execution, num_tasks=1)
215
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
216
217
        children = self.st2client.liveactions.get_property(execution.id, 'children')
218
        self.assertEqual(len(children), 4)
219
220
    def test_basic_rerun_and_resume_with_items_task(self):
221
        path = self.temp_dir_path
222
223
        with open(path, 'w') as f:
224
            f.write('1')
225
226
        execution = self._execute_workflow(
227
            'examples.mistral-test-rerun-with-items',
228
            {'tempfile': path}
229
        )
230
231
        execution = self._wait_for_completion(execution)
232
        self._assert_failure(execution)
233
        orig_st2_ex_id = execution.id
234
        orig_wf_ex_id = execution.context['mistral']['execution_id']
235
236
        with open(path, 'w') as f:
237
            f.write('0')
238
239
        execution = self.st2client.liveactions.re_run(
240
            orig_st2_ex_id,
241
            tasks=['task1'],
242
            no_reset=['task1']
243
        )
244
245
        self.assertNotEqual(execution.id, orig_st2_ex_id)
246
247
        execution = self._wait_for_completion(execution)
248
        self._assert_success(execution, num_tasks=1)
249
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
250
251
        children = self.st2client.liveactions.get_property(execution.id, 'children')
252
        self.assertEqual(len(children), 2)
253
254
    def test_rerun_subflow_and_reset_with_items_task(self):
255
        path = self.temp_dir_path
256
257
        with open(path, 'w') as f:
258
            f.write('1')
259
260
        execution = self._execute_workflow(
261
            'examples.mistral-test-rerun-subflow-with-items',
262
            {'tempfile': path}
263
        )
264
265
        execution = self._wait_for_completion(execution)
266
        self._assert_failure(execution)
267
        orig_st2_ex_id = execution.id
268
        orig_wf_ex_id = execution.context['mistral']['execution_id']
269
270
        with open(path, 'w') as f:
271
            f.write('0')
272
273
        execution = self.st2client.liveactions.re_run(orig_st2_ex_id, tasks=['task1.task1'])
274
        self.assertNotEqual(execution.id, orig_st2_ex_id)
275
276
        execution = self._wait_for_completion(execution)
277
        self._assert_success(execution, num_tasks=1)
278
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
279
280
        children = self.st2client.liveactions.get_property(execution.id, 'children')
281
        self.assertEqual(len(children), 4)
282
283
    def test_rerun_subflow_and_resume_with_items_task(self):
284
        path = self.temp_dir_path
285
286
        with open(path, 'w') as f:
287
            f.write('1')
288
289
        execution = self._execute_workflow(
290
            'examples.mistral-test-rerun-subflow-with-items',
291
            {'tempfile': path}
292
        )
293
294
        execution = self._wait_for_completion(execution)
295
        self._assert_failure(execution)
296
        orig_st2_ex_id = execution.id
297
        orig_wf_ex_id = execution.context['mistral']['execution_id']
298
299
        with open(path, 'w') as f:
300
            f.write('0')
301
302
        execution = self.st2client.liveactions.re_run(
303
            orig_st2_ex_id,
304
            tasks=['task1.task1'],
305
            no_reset=['task1.task1']
306
        )
307
308
        self.assertNotEqual(execution.id, orig_st2_ex_id)
309
310
        execution = self._wait_for_completion(execution)
311
        self._assert_success(execution, num_tasks=1)
312
        self.assertEqual(execution.context['mistral']['execution_id'], orig_wf_ex_id)
313
314
        children = self.st2client.liveactions.get_property(execution.id, 'children')
315
        self.assertEqual(len(children), 2)
316