Passed
Pull Request — master (#3753)
by W
09:44
created

WiringTest.test_complex_workbook_with_jinja()   A

Complexity

Conditions 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 6
rs 9.4285
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import shutil
18
import tempfile
19
20
import eventlet
21
22
from integration.mistral import base
23
24
25
class WiringTest(base.TestWorkflowExecution):
26
27
    temp_dir_path = None
28
29
    def setUp(self):
30
        super(WiringTest, self).setUp()
31
32
        # Create temporary directory used by the tests
33
        _, self.temp_dir_path = tempfile.mkstemp()
34
        os.chmod(self.temp_dir_path, 0755)   # nosec
35
36
    def tearDown(self):
37
        if self.temp_dir_path and os.path.exists(self.temp_dir_path):
38
            if os.path.isdir(self.temp_dir_path):
39
                shutil.rmtree(self.temp_dir_path)
40
            else:
41
                os.remove(self.temp_dir_path)
42
43
    def test_basic_workflow(self):
44
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'date'})
45
        execution = self._wait_for_completion(execution)
46
        self._assert_success(execution, num_tasks=1)
47
        self.assertIn('stdout', execution.result)
48
49
    def test_basic_workbook(self):
50
        execution = self._execute_workflow('examples.mistral-workbook-basic', {'cmd': 'date'})
51
        execution = self._wait_for_completion(execution)
52
        self._assert_success(execution, num_tasks=1)
53
        self.assertIn('stdout', execution.result)
54
55
    def test_complex_workbook_with_yaql(self):
56
        execution = self._execute_workflow(
57
            'examples.mistral-workbook-complex', {'vm_name': 'demo1'})
58
        execution = self._wait_for_completion(execution)
59
        self._assert_success(execution, num_tasks=8)
60
        self.assertIn('vm_id', execution.result)
61
62
    def test_complex_workbook_with_jinja(self):
63
        execution = self._execute_workflow(
64
            'examples.mistral-jinja-workbook-complex', {'vm_name': 'demo2'})
65
        execution = self._wait_for_completion(execution)
66
        self._assert_success(execution, num_tasks=8)
67
        self.assertIn('vm_id', execution.result)
68
69
    def test_complex_workbook_subflow_actions(self):
70
        execution = self._execute_workflow(
71
            'examples.mistral-workbook-subflows', {'subject': 'st2', 'adjective': 'cool'})
72
        execution = self._wait_for_completion(execution)
73
        self._assert_success(execution, num_tasks=2)
74
        self.assertIn('tagline', execution.result)
75
        self.assertEqual(execution.result['tagline'], 'st2 is cool!')
76
77
    def test_with_items(self):
78
        params = {'cmd': 'date', 'count': 8}
79
        execution = self._execute_workflow('examples.mistral-repeat', params)
80
        execution = self._wait_for_completion(execution)
81
        self._assert_success(execution, num_tasks=1)
82
        self.assertEqual(len(execution.result['result']), params['count'])
83
84
    def test_concurrent_load(self):
85
        wf_name = 'examples.mistral-workbook-complex'
86
        wf_params = {'vm_name': 'demo1'}
87
        executions = [self._execute_workflow(wf_name, wf_params) for i in range(3)]
88
89
        eventlet.sleep(30)
90
91
        for execution in executions:
92
            e = self._wait_for_completion(execution)
93
            self._assert_success(e, num_tasks=8)
94
            self.assertIn('vm_id', e.result)
95
96
    def test_execution_failure(self):
97
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'foo'})
98
        execution = self._wait_for_completion(execution)
99
        self._assert_failure(execution)
100
101
    def test_invoke_from_action_chain(self):
102
        execution = self._execute_workflow('examples.invoke-mistral-with-jinja', {'cmd': 'date'})
103
        execution = self._wait_for_state(execution, ['succeeded'])
104