Test Setup Failed
Pull Request — master (#4154)
by W
03:36
created

WiringTest.test_data_flow()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 7
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import eventlet
19
20
from integration.orchestra import base
21
from six.moves import range
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in range.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
22
23
from st2common.constants import action as ac_const
24
25
26
class WiringTest(base.TestWorkflowExecution):
    """Workflow execution tests for the ``examples.orchestra-*`` workflows."""

    def test_concurrent_load(self):
        # Kick off three executions of the same workflow, all with the same
        # input, before waiting on any of them.
        workflow = 'examples.orchestra-mock-create-vm'
        params = {'vm_name': 'demo1'}
        executions = []

        for _ in range(3):
            executions.append(self._execute_workflow(workflow, params))

        # NOTE(review): fixed 20 second pause before polling; presumably gives
        # the executions time to make progress -- confirm it is still needed.
        eventlet.sleep(20)

        # Every execution must end in the succeeded state and carry a
        # 'vm_id' entry in its result.
        for execution in executions:
            done = self._wait_for_completion(execution)
            self.assertEqual(done.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
            self.assertIn('vm_id', done.result)

    def test_data_flow(self):
        # The value supplied as 'a1' is expected to come back unchanged under
        # both the 'a5' and 'b5' keys of the execution result.
        phrase = 'fee fi fo fum'
        expected = {'a5': phrase, 'b5': phrase}

        started = self._execute_workflow('examples.orchestra-data-flow', {'a1': phrase})
        finished = self._wait_for_completion(started)

        self.assertDictEqual(finished.result, expected)
47