Passed
Pull Request — master (#4000)
by W
06:13
created

ExamplesTest.test_workbook_multiple_subflows()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
from integration.mistral import base
19
20
from st2common.constants import action as action_constants
21
22
23
class ExamplesTest(base.TestWorkflowExecution):
    """Integration tests for the ``examples.mistral-*`` workflows.

    Every test starts a workflow with ``self._execute_workflow``, hands the
    returned execution to ``self._wait_for_completion`` and then asserts on
    the ``status`` (and, where relevant, the ``result``) of the execution
    that comes back.
    """

    def _assert_branch_output(self, which, expected_stdout):
        """Run ``examples.mistral-branching`` for one branch and verify it.

        :param which: Value passed to the workflow as the ``which`` parameter.
        :param expected_stdout: Expected value of ``result['stdout']``.
        """
        params = {'which': which}
        ex = self._execute_workflow('examples.mistral-branching', params)
        ex = self._wait_for_completion(ex)
        self.assertEqual(ex.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
        self.assertEqual(ex.result['stdout'], expected_stdout)

    def test_environment(self):
        """The env-var workflow succeeds and returns the execution URL."""
        ex = self._execute_workflow('examples.mistral-env-var')
        ex = self._wait_for_completion(ex)
        self.assertEqual(ex.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
        expected_output = 'http://127.0.0.1:9101/executions/' + ex.id
        self.assertEqual(ex.result['url'], expected_output)

    def test_branching(self):
        """Each value of ``which`` produces the matching branch output."""
        # One call per branch (instead of a loop) so that a failure's
        # traceback points at the specific branch that broke.
        self._assert_branch_output('a', 'Took path A.')
        self._assert_branch_output('b', 'Took path B.')
        self._assert_branch_output('c', 'Took path C.')

    def test_join(self):
        """The join workflow finishes with a succeeded status."""
        ex = self._execute_workflow('examples.mistral-join')
        ex = self._wait_for_completion(ex)
        self.assertEqual(ex.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)

    def test_handle_error(self):
        """The workflow ends as failed, with ``error_handled`` set."""
        ex = self._execute_workflow('examples.mistral-handle-error')
        ex = self._wait_for_completion(ex)
        self.assertEqual(ex.status, action_constants.LIVEACTION_STATUS_FAILED)
        self.assertTrue(ex.result['error_handled'])

    def test_handle_error_task_default(self):
        """The task-default variant ends as failed, with ``error_handled`` set."""
        ex = self._execute_workflow('examples.mistral-handle-error-task-default')
        ex = self._wait_for_completion(ex)
        self.assertEqual(ex.status, action_constants.LIVEACTION_STATUS_FAILED)
        self.assertTrue(ex.result['error_handled'])

    def test_handle_retry(self):
        """The retry workflow finishes with a succeeded status."""
        ex = self._execute_workflow('examples.mistral-handle-retry')
        ex = self._wait_for_completion(ex)
        self.assertEqual(ex.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)

    def test_repeat(self):
        """The repeat workflow returns one ``'Yo!'`` per requested iteration."""
        params = {'cmd': 'echo "Yo!"', 'count': 3}
        ex = self._execute_workflow('examples.mistral-repeat', params)
        ex = self._wait_for_completion(ex)
        self.assertEqual(ex.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
        self.assertEqual(len(ex.result['result']), params['count'])
        self.assertListEqual(ex.result['result'], ['Yo!'] * params['count'])

    def test_repeat_with_items(self):
        """The with-items workflow returns one output per supplied command."""
        params = {'cmds': ['echo "a"', 'echo "b"', 'echo "c"']}
        ex = self._execute_workflow('examples.mistral-repeat-with-items', params)
        ex = self._wait_for_completion(ex)
        self.assertEqual(ex.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
        self.assertEqual(len(ex.result['result']), len(params['cmds']))
        # Results are compared after sorting, so their order is not asserted.
        self.assertListEqual(sorted(ex.result['result']), ['a', 'b', 'c'])

    def test_with_items_batch_processing(self):
        """The four timestamps arrive as two pairs separated by >= 3 seconds."""
        params = {'cmd': 'date +%s', 'count': 4}
        ex = self._execute_workflow('examples.mistral-with-items-concurrency', params)
        ex = self._wait_for_completion(ex)
        self.assertEqual(ex.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
        self.assertEqual(len(ex.result['result']), params['count'])

        # NOTE(review): presumably the workflow runs the items two at a time
        # with a delay of about three seconds between batches -- confirm
        # against the workflow definition, which is not visible from here.
        timestamps = [int(dt) for dt in ex.result['result']]
        # assertLess/assertGreaterEqual report the offending values on
        # failure, unlike assertTrue on a pre-computed boolean.
        self.assertLess(timestamps[1] - timestamps[0], 3)
        self.assertLess(timestamps[3] - timestamps[2], 3)
        self.assertGreaterEqual(timestamps[2] - timestamps[1], 3)

    def test_workbook_multiple_subflows(self):
        """The multi-subflow workbook finishes with a succeeded status."""
        ex = self._execute_workflow('examples.mistral-workbook-multiple-subflows')
        ex = self._wait_for_completion(ex)
        self.assertEqual(ex.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
108