Completed
Push — master ( d1f0a7...3b2ece )
by Edward
21:04 queued 05:38
created

integration.mistral.ExamplesTest   A

Complexity

Total Complexity 19

Size/Duplication

Total Lines 105
Duplicated Lines 0 %
Metric Value
wmc 19
dl 0
loc 105
rs 10

10 Methods

Rating   Name   Duplication   Size   Complexity  
A test_repeat_with_items() 0 7 1
A test_handle_retry() 0 4 1
A test_handle_error() 0 9 2
A test_repeat() 0 7 1
A test_join() 0 7 3
A test_with_items_batch_processing() 0 13 2
A test_workbook_multiple_subflows() 0 4 1
B test_branching() 0 27 4
A test_handle_error_task_default() 0 9 2
A test_environment() 0 7 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import six
17
18
from integration.mistral import base
19
20
21
class ExamplesTest(base.TestWorkflowExecution):
22
23
    def test_environment(self):
24
        execution = self._execute_workflow('examples.mistral-env-var')
25
        execution = self._wait_for_completion(execution)
26
        self._assert_success(execution, num_tasks=1)
27
        tasks = {t['name']: t for t in execution.result['tasks']}
28
        expected_output = 'https://127.0.0.1:9101/history/' + execution.id
29
        self.assertEqual(tasks['task1']['result']['stdout'], expected_output)
30
31
    def test_branching(self):
32
        # Execute with path a.
33
        inputs = {'which': 'a'}
34
        execution = self._execute_workflow('examples.mistral-branching', parameters=inputs)
35
        execution = self._wait_for_completion(execution)
36
        self._assert_success(execution, num_tasks=2)
37
        tasks = {t['name']: t for t in execution.result['tasks']}
38
        self.assertEqual(tasks['a']['state'], 'SUCCESS')
39
        self.assertEqual(tasks['a']['result']['stdout'], 'Took path A.')
40
41
        # Execute with path b.
42
        inputs = {'which': 'b'}
43
        execution = self._execute_workflow('examples.mistral-branching', parameters=inputs)
44
        execution = self._wait_for_completion(execution)
45
        self._assert_success(execution, num_tasks=2)
46
        tasks = {t['name']: t for t in execution.result['tasks']}
47
        self.assertEqual(tasks['b']['state'], 'SUCCESS')
48
        self.assertEqual(tasks['b']['result']['stdout'], 'Took path B.')
49
50
        # Execute with path c.
51
        inputs = {'which': 'c'}
52
        execution = self._execute_workflow('examples.mistral-branching', parameters=inputs)
53
        execution = self._wait_for_completion(execution)
54
        self._assert_success(execution, num_tasks=2)
55
        tasks = {t['name']: t for t in execution.result['tasks']}
56
        self.assertEqual(tasks['c']['state'], 'SUCCESS')
57
        self.assertEqual(tasks['c']['result']['stdout'], 'Took path C.')
58
59
    def test_join(self):
60
        execution = self._execute_workflow('examples.mistral-join')
61
        execution = self._wait_for_completion(execution)
62
        self._assert_success(execution, num_tasks=5)
63
        tasks = {t['name']: t for t in execution.result['tasks']}
64
        for task_name, task_result in six.iteritems(tasks):
65
            self.assertEqual(task_result['result']['stdout'], task_name)
66
67
    def test_handle_error(self):
68
        execution = self._execute_workflow('examples.mistral-handle-error')
69
        execution = self._wait_for_completion(execution, expect_tasks_completed=False)
70
        self.assertEqual(execution.status, 'failed')
71
        self.assertIn('tasks', execution.result)
72
        self.assertEqual(2, len(execution.result['tasks']))
73
        tasks = {t['name']: t for t in execution.result['tasks']}
74
        self.assertEqual(tasks['task1']['state'], 'ERROR')
75
        self.assertIn(tasks['notify_on_error']['state'], ['RUNNING', 'SUCCESS'])
76
77
    def test_handle_error_task_default(self):
78
        execution = self._execute_workflow('examples.mistral-handle-error-task-default')
79
        execution = self._wait_for_completion(execution, expect_tasks_completed=False)
80
        self.assertEqual(execution.status, 'failed')
81
        self.assertIn('tasks', execution.result)
82
        self.assertEqual(2, len(execution.result['tasks']))
83
        tasks = {t['name']: t for t in execution.result['tasks']}
84
        self.assertEqual(tasks['task1']['state'], 'ERROR')
85
        self.assertIn(tasks['notify_on_error']['state'], ['RUNNING', 'SUCCESS'])
86
87
    def test_handle_retry(self):
88
        execution = self._execute_workflow('examples.mistral-handle-retry')
89
        execution = self._wait_for_completion(execution)
90
        self._assert_success(execution, num_tasks=4)
91
92
    def test_repeat(self):
93
        inputs = {'cmd': 'echo "Yo!"', 'count': 3}
94
        execution = self._execute_workflow('examples.mistral-repeat', parameters=inputs)
95
        execution = self._wait_for_completion(execution)
96
        self._assert_success(execution, num_tasks=1)
97
        self.assertEqual(len(execution.result['result']), inputs['count'])
98
        self.assertListEqual(execution.result['result'], ['Yo!'] * inputs['count'])
99
100
    def test_repeat_with_items(self):
101
        inputs = {'cmds': ['echo "a"', 'echo "b"', 'echo "c"']}
102
        execution = self._execute_workflow('examples.mistral-repeat-with-items', parameters=inputs)
103
        execution = self._wait_for_completion(execution)
104
        self._assert_success(execution, num_tasks=1)
105
        self.assertEqual(len(execution.result['result']), len(inputs['cmds']))
106
        self.assertListEqual(sorted(execution.result['result']), ['a', 'b', 'c'])
107
108
    def test_with_items_batch_processing(self):
109
        inputs = {'cmd': 'date +%s', 'count': 4}
110
        execution = self._execute_workflow('examples.mistral-with-items-concurrency',
111
                                           parameters=inputs)
112
113
        execution = self._wait_for_completion(execution)
114
        self._assert_success(execution, num_tasks=1)
115
        self.assertEqual(len(execution.result['result']), inputs['count'])
116
117
        timestamps = [int(dt) for dt in execution.result['result']]
118
        self.assertTrue(timestamps[1] - timestamps[0] <= 1)
119
        self.assertTrue(timestamps[3] - timestamps[2] <= 1)
120
        self.assertTrue(timestamps[2] - timestamps[1] >= 2)
121
122
    def test_workbook_multiple_subflows(self):
123
        execution = self._execute_workflow('examples.mistral-workbook-multiple-subflows')
124
        execution = self._wait_for_completion(execution)
125
        self._assert_success(execution, num_tasks=4)
126