Completed
Pull Request — master (#2304)
by Arma
07:07
created

integration.mistral.WiringTest   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 61
Duplicated Lines 0 %
Metric Value
wmc 10
dl 0
loc 61
rs 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import eventlet
17
18
from integration.mistral import base
19
20
21
class WiringTest(base.TestWorkflowExecution):
22
23
    def test_basic_workflow(self):
24
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'date'})
25
        execution = self._wait_for_completion(execution)
26
        self._assert_success(execution, num_tasks=1)
27
        self.assertIn('stdout', execution.result)
28
29
    def test_basic_workbook(self):
30
        execution = self._execute_workflow('examples.mistral-workbook-basic', {'cmd': 'date'})
31
        execution = self._wait_for_completion(execution)
32
        self._assert_success(execution, num_tasks=1)
33
        self.assertIn('stdout', execution.result)
34
35
    def test_complex_workbook(self):
36
        execution = self._execute_workflow(
37
            'examples.mistral-workbook-complex', {'vm_name': 'demo1'})
38
        execution = self._wait_for_completion(execution)
39
        self._assert_success(execution, num_tasks=8)
40
        self.assertIn('vm_id', execution.result)
41
42
    def test_complex_workbook_subflow_actions(self):
43
        execution = self._execute_workflow(
44
            'examples.mistral-workbook-subflows', {'subject': 'st2', 'adjective': 'cool'})
45
        execution = self._wait_for_completion(execution)
46
        self._assert_success(execution, num_tasks=2)
47
        self.assertIn('tagline', execution.result)
48
        self.assertEqual(execution.result['tagline'], 'st2 is cool!')
49
50
    def test_with_items(self):
51
        params = {'cmd': 'date', 'count': 8}
52
        execution = self._execute_workflow('examples.mistral-repeat', params)
53
        execution = self._wait_for_completion(execution)
54
        self._assert_success(execution, num_tasks=1)
55
        self.assertEqual(len(execution.result['result']), params['count'])
56
57
    def test_concurrent_load(self):
58
        wf_name = 'examples.mistral-workbook-complex'
59
        wf_params = {'vm_name': 'demo1'}
60
        executions = [self._execute_workflow(wf_name, wf_params) for i in range(3)]
61
62
        eventlet.sleep(30)
63
64
        for execution in executions:
65
            e = self._wait_for_completion(execution)
66
            self._assert_success(e, num_tasks=8)
67
            self.assertIn('vm_id', e.result)
68
69
    def test_execution_failure(self):
70
        execution = self._execute_workflow('examples.mistral-basic', {'cmd': 'foo'})
71
        execution = self._wait_for_completion(execution)
72
        self._assert_failure(execution)
73
74
    def test_cancellation(self):
75
        execution = self._execute_workflow('examples.mistral-test-cancel', {'sleep': 10})
76
        execution = self._wait_for_state(execution, ['running'])
77
        self.st2client.liveactions.delete(execution)
78
        execution = self._wait_for_completion(execution, expect_tasks_completed=False)
79
        self._assert_canceled(execution, are_tasks_completed=False)
80
        execution = self._wait_for_completion(execution, expect_tasks_completed=True)
81
        self._assert_canceled(execution, are_tasks_completed=True)
82