Test Setup Failed
Pull Request — master (#4154)
by W
03:36
created

OrchestraRunnerCancelTest   A

Complexity

Total Complexity 7

Size/Duplication

Total Lines 183
Duplicated Lines 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
c 3
b 0
f 0
dl 0
loc 183
rs 10
wmc 7

6 Methods

Rating   Name   Duplication   Size   Complexity  
A test_cancel() 0 14 1
A test_cancel_subworkflow_cascade_up_to_workflow_with_other_subworkflows() 0 55 1
B test_cancel_subworkflow_cascade_up_to_workflow() 0 37 1
B test_cancel_workflow_cascade_down_to_subworkflow() 0 32 1
A setUpClass() 0 15 2
A get_runner_class() 0 3 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import mock
19
20
import st2tests
21
22
from oslo_config import cfg
23
24
# XXX: actionsensor import depends on config being set up.
25
import st2tests.config as tests_config
26
tests_config.parse_args()
27
28
from tests.unit import base
29
30
from st2common.bootstrap import actionsregistrar
31
from st2common.bootstrap import runnersregistrar
32
from st2common.constants import action as ac_const
33
from st2common.models.db import liveaction as lv_db_models
34
from st2common.persistence import execution as ex_db_access
35
from st2common.persistence import liveaction as lv_db_access
36
from st2common.persistence import workflow as wf_db_access
37
from st2common.runners import base as runners
38
from st2common.services import action as ac_svc
39
from st2common.services import workflows as wf_svc
40
from st2common.transport import liveaction as lv_ac_xport
41
from st2common.transport import workflow as wf_ex_xport
42
from st2common.transport import publishers
43
from st2tests.mocks import liveaction as mock_lv_ac_xport
44
from st2tests.mocks import workflow as mock_wf_ex_xport
45
46
47
# Fixture file names, keyed by resource type. The tests below select a
# workflow by its position in the 'workflows' list.
TEST_FIXTURES = dict(
    workflows=['sequential.yaml', 'subworkflow.yaml', 'subworkflows.yaml'],
    actions=['sequential.yaml', 'subworkflow.yaml', 'subworkflows.yaml']
)

# Name and location of the fixture pack that holds the test workflows.
TEST_PACK = 'orchestra_tests'
TEST_PACK_PATH = '/'.join([
    st2tests.fixturesloader.get_fixtures_packs_base_path(),
    TEST_PACK
])

# Pack directories passed to the actions registrar, in registration order.
PACKS = [
    TEST_PACK_PATH,
    '/'.join([st2tests.fixturesloader.get_fixtures_packs_base_path(), 'core'])
]
67
68
69
@mock.patch.object(
    publishers.CUDPublisher,
    'publish_update',
    mock.MagicMock(return_value=None))
@mock.patch.object(
    lv_ac_xport.LiveActionPublisher,
    'publish_create',
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_create))
@mock.patch.object(
    lv_ac_xport.LiveActionPublisher,
    'publish_state',
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_state))
@mock.patch.object(
    wf_ex_xport.WorkflowExecutionPublisher,
    'publish_create',
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_create))
@mock.patch.object(
    wf_ex_xport.WorkflowExecutionPublisher,
    'publish_state',
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_state))
class OrchestraRunnerCancelTest(st2tests.DbTestCase):
    """Cancellation tests for workflows and their subworkflows.

    The class-level patches replace ``CUDPublisher.publish_update`` with a
    no-op and route the liveaction / workflow execution ``publish_create`` and
    ``publish_state`` calls to the mock publishers from ``st2tests.mocks``.
    NOTE(review): what the mock publishers do with a message is defined in
    ``st2tests.mocks`` and is not visible from this module.
    """

    @classmethod
    def setUpClass(cls):
        """Register the runners and the actions of every pack in ``PACKS``."""
        super(OrchestraRunnerCancelTest, cls).setUpClass()

        # Register runners.
        runnersregistrar.register_runners()

        # Register test pack(s).
        actions_registrar = actionsregistrar.ActionsRegistrar(
            use_pack_cache=False,
            fail_on_failure=True
        )

        for pack in PACKS:
            actions_registrar.register_from_pack(pack)

    @classmethod
    def get_runner_class(cls, runner_name):
        """Return the class of the runner object obtained for ``runner_name``."""
        return runners.get_runner(runner_name, runner_name).__class__

    def _request_workflow(self, fixture_file):
        """Request the workflow described by ``fixture_file`` and check it is running.

        :param fixture_file: Workflow fixture file name inside the test pack.
        :return: Tuple of the liveaction reloaded from the database and the
                 action execution returned by the request.
        """
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, fixture_file)
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)

        # Reload so the assertion sees the persisted status; the result is
        # passed as the failure message to help diagnose a failed launch.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)

        return lv_ac_db, ac_ex_db

    def _get_task_executions(self, ac_ex_db, expected_count):
        """Return the task executions of the workflow started by ``ac_ex_db``.

        Asserts that exactly one workflow execution is recorded for the action
        execution and that it has ``expected_count`` task executions.
        """
        wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
        self.assertEqual(len(wf_ex_dbs), 1)

        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_dbs[0].id))
        self.assertEqual(len(tk_ex_dbs), expected_count)

        return tk_ex_dbs

    def _get_running_task_records(self, tk_ex_db):
        """Return the action executions and the liveaction of a running task.

        Asserts that the task has exactly one action execution and that the
        liveaction it points to is in running status.

        :return: Tuple of the action execution query result and the liveaction.
        """
        tk_ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_db.id))
        self.assertEqual(len(tk_ac_ex_dbs), 1)

        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_dbs[0].liveaction['id'])
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        return tk_ac_ex_dbs, tk_lv_ac_db

    def _complete_canceled_task(self, tk_ac_ex_db):
        """Reload a task's action execution, check it is canceled and handle its completion.

        The completion handler is invoked by hand here, after asserting the
        reloaded record is in canceled status.
        """
        tk_ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(tk_ac_ex_db.id))
        self.assertEqual(tk_ac_ex_db.status, ac_const.LIVEACTION_STATUS_CANCELED)
        wf_svc.handle_action_execution_completion(tk_ac_ex_db)

    @mock.patch.object(
        ac_svc, 'is_children_active',
        mock.MagicMock(return_value=True))
    def test_cancel(self):
        """A cancel request leaves the workflow canceling while children are reported active."""
        lv_ac_db, _ = self._request_workflow(TEST_FIXTURES['workflows'][0])

        requester = cfg.CONF.system_user.user
        lv_ac_db, _ = ac_svc.request_cancellation(lv_ac_db, requester)
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELING)

    def test_cancel_workflow_cascade_down_to_subworkflow(self):
        """Canceling the main workflow also cancels its subworkflow."""
        lv_ac_db, ac_ex_db = self._request_workflow(TEST_FIXTURES['workflows'][1])

        # Identify the records for the subworkflow.
        tk_ex_dbs = self._get_task_executions(ac_ex_db, 1)
        _, tk_lv_ac_db = self._get_running_task_records(tk_ex_dbs[0])

        # Cancel the main workflow. The object returned by the request is
        # expected to be in canceling status.
        requester = cfg.CONF.system_user.user
        lv_ac_db, _ = ac_svc.request_cancellation(lv_ac_db, requester)
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELING)

        # Assert the subworkflow is canceled.
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(tk_lv_ac_db.id))
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)

        # Assert the main workflow is canceled.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)

    def test_cancel_subworkflow_cascade_up_to_workflow(self):
        """Canceling the only subworkflow cancels the main workflow."""
        lv_ac_db, ac_ex_db = self._request_workflow(TEST_FIXTURES['workflows'][1])

        # Identify the records for the subworkflow.
        tk_ex_dbs = self._get_task_executions(ac_ex_db, 1)
        _, tk_lv_ac_db = self._get_running_task_records(tk_ex_dbs[0])

        # Cancel the subworkflow.
        requester = cfg.CONF.system_user.user
        tk_lv_ac_db, tk_ac_ex_db = ac_svc.request_cancellation(tk_lv_ac_db, requester)
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELING)

        # Assert the subworkflow is canceled.
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(tk_lv_ac_db.id))
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)

        # Manually handle action execution completion for the task.
        self._complete_canceled_task(tk_ac_ex_db)

        # Assert the main workflow is canceled.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)

    def test_cancel_subworkflow_cascade_up_to_workflow_with_other_subworkflows(self):
        """Canceling one of two subworkflows cancels its sibling and the main workflow."""
        lv_ac_db, ac_ex_db = self._request_workflow(TEST_FIXTURES['workflows'][2])

        # Identify the records for the two subworkflows.
        tk_ex_dbs = self._get_task_executions(ac_ex_db, 2)
        _, tk1_lv_ac_db = self._get_running_task_records(tk_ex_dbs[0])
        tk2_ac_ex_dbs, tk2_lv_ac_db = self._get_running_task_records(tk_ex_dbs[1])

        # Cancel the subworkflow which should cascade up to the root.
        requester = cfg.CONF.system_user.user
        tk1_lv_ac_db, tk1_ac_ex_db = ac_svc.request_cancellation(tk1_lv_ac_db, requester)
        self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELING)

        # Assert the main workflow is canceling.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELING)

        # Assert both subworkflows are canceled.
        tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(tk1_lv_ac_db.id))
        self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)
        tk2_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(tk2_lv_ac_db.id))
        self.assertEqual(tk2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)

        # Manually handle action execution completion for one of the tasks.
        self._complete_canceled_task(tk1_ac_ex_db)

        # Manually handle action execution completion for the other task.
        self._complete_canceled_task(tk2_ac_ex_dbs[0])

        # Assert the main workflow is canceled.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)
252