Test Setup Failed
Pull Request — master (#4154)
by W
03:25
created

OrchestraRunnerTest.test_run_workflow()   B

Complexity

Conditions 1

Size

Total Lines 103

Duplication

Lines 0
Ratio 0 %

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 1
dl 0
loc 103
rs 8.2857
c 4
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, particularly when they are combined with good names. Moreover, the smaller a method is, the easier it usually is to find a good name for it.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

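Commonly applied refactorings include Extract Method: move a coherent fragment of the long method into a new, well-named method and call it from the original.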

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import mock
19
20
from orchestra import states as wf_states
21
22
import st2tests
23
24
# XXX: actionsensor import depends on config being set up.
25
import st2tests.config as tests_config
26
tests_config.parse_args()
27
28
from tests.unit import base
29
30
from st2common.bootstrap import actionsregistrar
31
from st2common.bootstrap import runnersregistrar
32
from st2common.constants import action as ac_const
33
from st2common.models.db import liveaction as lv_db_models
34
from st2common.persistence import execution as ex_db_access
35
from st2common.persistence import liveaction as lv_db_access
36
from st2common.persistence import workflow as wf_db_access
37
from st2common.runners import base as runners
38
from st2common.services import action as ac_svc
39
from st2common.services import workflows as wf_svc
40
from st2common.transport import liveaction as lv_ac_xport
41
from st2common.transport import workflow as wf_ex_xport
42
from st2common.transport import publishers
43
from st2tests.mocks import liveaction as mock_lv_ac_xport
44
from st2tests.mocks import workflow as mock_wf_ex_xport
45
46
47
# Name and on-disk location of the fixture pack that holds the test workflows.
TEST_PACK = 'orchestra_tests'
TEST_PACK_PATH = '/'.join([st2tests.fixturesloader.get_fixtures_packs_base_path(), TEST_PACK])

# Pack directories handed to the actions registrar: the test pack plus "core".
PACKS = [
    TEST_PACK_PATH,
    '%s/core' % st2tests.fixturesloader.get_fixtures_packs_base_path(),
]
54
55
56
@mock.patch.object(
    publishers.CUDPublisher,
    'publish_update',
    mock.MagicMock(return_value=None))
@mock.patch.object(
    lv_ac_xport.LiveActionPublisher,
    'publish_create',
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_create))
@mock.patch.object(
    lv_ac_xport.LiveActionPublisher,
    'publish_state',
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_state))
@mock.patch.object(
    wf_ex_xport.WorkflowExecutionPublisher,
    'publish_create',
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_create))
@mock.patch.object(
    wf_ex_xport.WorkflowExecutionPublisher,
    'publish_state',
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_state))
class OrchestraRunnerTest(st2tests.DbTestCase):
    """Run the ``sequential.yaml`` fixture workflow and verify each stage.

    The live action and workflow execution publishers are replaced with the
    mock publishers from ``st2tests.mocks`` and ``CUDPublisher.publish_update``
    is stubbed out for every test method in this class.
    """

    @classmethod
    def setUpClass(cls):
        """Register the runners and the actions of every pack in ``PACKS``."""
        super(OrchestraRunnerTest, cls).setUpClass()

        # Register runners.
        runnersregistrar.register_runners()

        # Register test pack(s).
        actions_registrar = actionsregistrar.ActionsRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)

    @classmethod
    def get_runner_class(cls, runner_name):
        """Return the class of the runner that ``runners.get_runner`` provides.

        :param runner_name: Passed to ``runners.get_runner`` as both arguments.
        """
        return runners.get_runner(runner_name, runner_name).__class__

    def _assert_task_completion(self, wf_ex_db, task_id, expected_wf_status):
        """Complete one task of the workflow and verify the resulting states.

        Looks up the task execution and its action execution, asserts that the
        task's live action has already succeeded, manually triggers the
        completion handler, then asserts that the task execution is marked as
        succeeded and that the workflow execution has the expected status.

        :param wf_ex_db: Workflow execution record that the task belongs to.
        :param task_id: ID of the task in the workflow definition.
        :param expected_wf_status: Status the workflow execution must have
            once the completion of the task has been handled.
        :returns: The workflow execution record reloaded from the database.
        """
        query_filters = {'workflow_execution': str(wf_ex_db.id), 'task_id': task_id}

        # Fail with a readable assertion instead of an IndexError when the
        # task was never scheduled.
        tk_ex_dbs = wf_db_access.TaskExecution.query(**query_filters)
        self.assertGreater(len(tk_ex_dbs), 0, 'No task execution found for "%s".' % task_id)
        tk_ex_db = tk_ex_dbs[0]

        tk_ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_db.id))
        self.assertGreater(len(tk_ac_ex_dbs), 0, 'No action execution found for "%s".' % task_id)
        tk_ac_ex_db = tk_ac_ex_dbs[0]

        # Assert the action of the task is already completed.
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_db.liveaction['id'])
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)

        # Manually handle action execution completion.
        wf_svc.handle_action_execution_completion(tk_ac_ex_db)

        # Assert the task succeeded and the workflow is in the expected state.
        tk_ex_db = wf_db_access.TaskExecution.get_by_id(tk_ex_db.id)
        self.assertEqual(tk_ex_db.status, wf_states.SUCCEEDED)
        wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
        self.assertEqual(wf_ex_db.status, expected_wf_status)

        return wf_ex_db

    def test_run_workflow(self):
        """Request the sequential workflow and follow it through to success."""
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml')
        wf_input = {'who': 'Thanos'}
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'], parameters=wf_input)
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)

        # Assert action execution is running.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))

        self.assertTrue(lv_ac_db.action_is_workflow)
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)

        # Check the number of records before indexing so that a missing
        # workflow execution is reported as an assertion failure.
        wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
        self.assertEqual(len(wf_ex_dbs), 1)
        wf_ex_db = wf_ex_dbs[0]

        # Check required attributes.
        self.assertIsNotNone(wf_ex_db.id)
        self.assertGreater(wf_ex_db.rev, 0)
        self.assertEqual(wf_ex_db.action_execution, str(ac_ex_db.id))
        self.assertEqual(wf_ex_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Check context.
        self.assertIn('workflow_execution', lv_ac_db.context)
        self.assertEqual(lv_ac_db.context['workflow_execution'], str(wf_ex_db.id))

        # Check graph.
        self.assertIsNotNone(wf_ex_db.graph)
        self.assertIsInstance(wf_ex_db.graph, dict)
        self.assertIn('nodes', wf_ex_db.graph)
        self.assertIn('adjacency', wf_ex_db.graph)

        # Check task flow.
        self.assertIsNotNone(wf_ex_db.flow)
        self.assertIsInstance(wf_ex_db.flow, dict)
        self.assertIn('tasks', wf_ex_db.flow)
        self.assertIn('sequence', wf_ex_db.flow)

        # Check input.
        self.assertDictEqual(wf_ex_db.input, wf_input)

        # Walk through the three tasks in order. The workflow keeps running
        # until the completion of the last task has been handled.
        wf_ex_db = self._assert_task_completion(wf_ex_db, 'task1', wf_states.RUNNING)
        wf_ex_db = self._assert_task_completion(wf_ex_db, 'task2', wf_states.RUNNING)
        wf_ex_db = self._assert_task_completion(wf_ex_db, 'task3', wf_states.SUCCEEDED)

        # Assert the liveaction and action execution of the workflow succeeded.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
        ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
        self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)

        # Check workflow output.
        expected_output = {'msg': '%s, All your base are belong to us!' % wf_input['who']}

        self.assertDictEqual(wf_ex_db.output, expected_output)

        # Check liveaction and action execution result.
        expected_result = {'output': expected_output}

        self.assertDictEqual(lv_ac_db.result, expected_result)
        self.assertDictEqual(ac_ex_db.result, expected_result)
201