Test Setup Failed
Pull Request — master (#4154)
by W
03:36
created

OrchestraRunnerTest.test_run_workflow()   B

Complexity

Conditions 1

Size

Total Lines 92

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 92
rs 8.491

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive group of statements into its own well-named method.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import mock
19
20
from orchestra import states as wf_states
21
22
import st2tests
23
24
# XXX: The actionsensor import depends on the config being set up first.
25
import st2tests.config as tests_config
26
tests_config.parse_args()
27
28
from tests.unit import base
29
30
from st2common.bootstrap import actionsregistrar
31
from st2common.bootstrap import runnersregistrar
32
from st2common.constants import action as ac_const
33
from st2common.models.db import liveaction as lv_db_models
34
from st2common.persistence import execution as ex_db_access
35
from st2common.persistence import liveaction as lv_db_access
36
from st2common.persistence import workflow as wf_db_access
37
from st2common.runners import base as runners
38
from st2common.services import action as ac_svc
39
from st2common.services import workflows as wf_svc
40
from st2common.transport import liveaction as lv_ac_xport
41
from st2common.transport import workflow as wf_ex_xport
42
from st2common.transport import publishers
43
from st2tests.mocks import liveaction as mock_lv_ac_xport
44
from st2tests.mocks import workflow as mock_wf_ex_xport
45
46
47
# Fixture file names used by the tests in this module, grouped by resource type.
TEST_FIXTURES = dict(
    workflows=['sequential.yaml'],
    actions=['sequential.yaml'],
)

# Name of the fixture pack under test and its location on disk.
TEST_PACK = 'orchestra_tests'
TEST_PACK_PATH = '/'.join([st2tests.fixturesloader.get_fixtures_packs_base_path(), TEST_PACK])

# Packs whose actions are registered before the tests run.
PACKS = [
    TEST_PACK_PATH,
    '/'.join([st2tests.fixturesloader.get_fixtures_packs_base_path(), 'core']),
]
63
64
65
@mock.patch.object(
    publishers.CUDPublisher,
    'publish_update',
    mock.MagicMock(return_value=None))
@mock.patch.object(
    lv_ac_xport.LiveActionPublisher,
    'publish_create',
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_create))
@mock.patch.object(
    lv_ac_xport.LiveActionPublisher,
    'publish_state',
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_state))
@mock.patch.object(
    wf_ex_xport.WorkflowExecutionPublisher,
    'publish_create',
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_create))
@mock.patch.object(
    wf_ex_xport.WorkflowExecutionPublisher,
    'publish_state',
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_state))
class OrchestraRunnerTest(st2tests.DbTestCase):
    """Run the ``sequential.yaml`` workflow fixture and verify its records.

    The publisher methods are patched at class level: ``CUDPublisher.publish_update``
    returns ``None``, and the live action / workflow execution ``publish_create`` and
    ``publish_state`` calls are routed to the mock publishers from ``st2tests.mocks``.
    """

    @classmethod
    def setUpClass(cls):
        """Register the runners and the actions of every pack listed in ``PACKS``."""
        super(OrchestraRunnerTest, cls).setUpClass()

        # Register runners.
        runnersregistrar.register_runners()

        # Register test pack(s).
        actions_registrar = actionsregistrar.ActionsRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)

    @classmethod
    def get_runner_class(cls, runner_name):
        """Return the class of the object given by ``runners.get_runner`` for ``runner_name``."""
        return runners.get_runner(runner_name, runner_name).__class__

    def _assert_task_completes(self, wf_ex_db, task_id, expected_wf_status):
        """Verify one task succeeded, process its completion, and re-check statuses.

        :param wf_ex_db: Workflow execution record that the task belongs to.
        :param task_id: ID of the task in the workflow definition (e.g. ``'task1'``).
        :param expected_wf_status: Status the workflow execution is expected to
            have once the task completion has been handled.
        :return: The workflow execution record, reloaded from the database.
        """
        # Locate the task execution and the action execution / live action behind it.
        query_filters = {'workflow_execution': str(wf_ex_db.id), 'task_id': task_id}
        tk_ex_db = wf_db_access.TaskExecution.query(**query_filters)[0]
        tk_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_db.id))[0]
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_db.liveaction['id'])

        # The task's live action is expected to be finished already at this point.
        # NOTE(review): presumably because the mock publishers run it inline -- confirm.
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)

        # Manually handle action execution completion.
        wf_svc.handle_action_execution_completion(tk_ac_ex_db)

        # Reload both records and check the resulting task and workflow statuses.
        tk_ex_db = wf_db_access.TaskExecution.get_by_id(tk_ex_db.id)
        self.assertEqual(tk_ex_db.status, wf_states.SUCCEEDED)
        wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
        self.assertEqual(wf_ex_db.status, expected_wf_status)

        return wf_ex_db

    def test_run_workflow(self):
        """Request the sequential workflow and follow it through task1, task2 and task3."""
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][0])
        wf_input = {'who': 'Thanos'}
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'], parameters=wf_input)
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)

        # Assert action execution is running.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))

        self.assertTrue(lv_ac_db.action_is_workflow)
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)

        # Check the record count before indexing so that a missing workflow
        # execution is reported as an assertion failure, not an IndexError.
        wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
        self.assertEqual(len(wf_ex_dbs), 1)
        wf_ex_db = wf_ex_dbs[0]

        # Check required attributes.
        self.assertIsNotNone(wf_ex_db.id)
        self.assertGreater(wf_ex_db.rev, 0)
        self.assertEqual(wf_ex_db.action_execution, str(ac_ex_db.id))
        self.assertEqual(wf_ex_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Check context.
        self.assertIn('workflow_execution', lv_ac_db.context)
        self.assertEqual(lv_ac_db.context['workflow_execution'], str(wf_ex_db.id))

        # Check graph.
        self.assertIsNotNone(wf_ex_db.graph)
        self.assertIsInstance(wf_ex_db.graph, dict)
        self.assertIn('nodes', wf_ex_db.graph)
        self.assertIn('adjacency', wf_ex_db.graph)

        # Check task flow.
        self.assertIsNotNone(wf_ex_db.flow)
        self.assertIsInstance(wf_ex_db.flow, dict)
        self.assertIn('tasks', wf_ex_db.flow)
        self.assertIn('sequence', wf_ex_db.flow)

        # Check input.
        self.assertDictEqual(wf_ex_db.input, wf_input)

        # task1 and task2 succeed while the workflow is still running.
        wf_ex_db = self._assert_task_completes(wf_ex_db, 'task1', wf_states.RUNNING)
        wf_ex_db = self._assert_task_completes(wf_ex_db, 'task2', wf_states.RUNNING)

        # task3 is the last task, so the workflow is completed afterwards.
        wf_ex_db = self._assert_task_completes(wf_ex_db, 'task3', wf_states.SUCCEEDED)

        # Check workflow output.
        expected_output = {'msg': '%s, All your base are belong to us!' % wf_input['who']}
        self.assertDictEqual(wf_ex_db.output, expected_output)
199