Test Setup Failed
Pull Request — master (#4154)
by W
03:25
created

OrchestraRunnerCancelTest   A

Complexity

Total Complexity 7

Size/Duplication

Total Lines 183
Duplicated Lines 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
c 3
b 0
f 0
dl 0
loc 183
rs 10
wmc 7

6 Methods

Rating   Name   Duplication   Size   Complexity  
A test_cancel() 0 14 1
A test_cancel_subworkflow_cascade_up_to_workflow_with_other_subworkflows() 0 55 1
B test_cancel_subworkflow_cascade_up_to_workflow() 0 37 1
B test_cancel_workflow_cascade_down_to_subworkflow() 0 32 1
A setUpClass() 0 15 2
A get_runner_class() 0 3 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import mock
19
20
import st2tests
21
22
from oslo_config import cfg
23
24
# XXX: actionsensor import depends on config being setup.
25
import st2tests.config as tests_config
26
tests_config.parse_args()
27
28
from tests.unit import base
29
30
from st2common.bootstrap import actionsregistrar
31
from st2common.bootstrap import runnersregistrar
32
from st2common.constants import action as ac_const
33
from st2common.models.db import liveaction as lv_db_models
34
from st2common.persistence import execution as ex_db_access
35
from st2common.persistence import liveaction as lv_db_access
36
from st2common.persistence import workflow as wf_db_access
37
from st2common.runners import base as runners
38
from st2common.services import action as ac_svc
39
from st2common.services import workflows as wf_svc
40
from st2common.transport import liveaction as lv_ac_xport
41
from st2common.transport import workflow as wf_ex_xport
42
from st2common.transport import publishers
43
from st2tests.mocks import liveaction as mock_lv_ac_xport
44
from st2tests.mocks import workflow as mock_wf_ex_xport
45
46
47
TEST_PACK = 'orchestra_tests'
48
TEST_PACK_PATH = st2tests.fixturesloader.get_fixtures_packs_base_path() + '/' + TEST_PACK
49
50
PACKS = [
51
    TEST_PACK_PATH,
52
    st2tests.fixturesloader.get_fixtures_packs_base_path() + '/core'
53
]
54
55
56
@mock.patch.object(
57
    publishers.CUDPublisher,
58
    'publish_update',
59
    mock.MagicMock(return_value=None))
60
@mock.patch.object(
61
    lv_ac_xport.LiveActionPublisher,
62
    'publish_create',
63
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_create))
64
@mock.patch.object(
65
    lv_ac_xport.LiveActionPublisher,
66
    'publish_state',
67
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_state))
68
@mock.patch.object(
69
    wf_ex_xport.WorkflowExecutionPublisher,
70
    'publish_create',
71
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_create))
72
@mock.patch.object(
73
    wf_ex_xport.WorkflowExecutionPublisher,
74
    'publish_state',
75
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_state))
76
class OrchestraRunnerCancelTest(st2tests.DbTestCase):
77
78
    @classmethod
79
    def setUpClass(cls):
80
        super(OrchestraRunnerCancelTest, cls).setUpClass()
81
82
        # Register runners.
83
        runnersregistrar.register_runners()
84
85
        # Register test pack(s).
86
        actions_registrar = actionsregistrar.ActionsRegistrar(
87
            use_pack_cache=False,
88
            fail_on_failure=True
89
        )
90
91
        for pack in PACKS:
92
            actions_registrar.register_from_pack(pack)
93
94
    @classmethod
95
    def get_runner_class(cls, runner_name):
96
        return runners.get_runner(runner_name, runner_name).__class__
97
98
    @mock.patch.object(
99
        ac_svc, 'is_children_active',
100
        mock.MagicMock(return_value=True))
101
    def test_cancel(self):
102
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml')
103
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
104
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
105
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
106
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
107
108
        requester = cfg.CONF.system_user.user
109
        lv_ac_db, ac_ex_db = ac_svc.request_cancellation(lv_ac_db, requester)
110
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
111
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELING)
112
113
    def test_cancel_workflow_cascade_down_to_subworkflow(self):
114
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'subworkflow.yaml')
115
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
116
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
117
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
118
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
119
120
        # Identify the records for the subworkflow.
121
        wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
122
        self.assertEqual(len(wf_ex_dbs), 1)
123
124
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_dbs[0].id))
125
        self.assertEqual(len(tk_ex_dbs), 1)
126
127
        tk_ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[0].id))
128
        self.assertEqual(len(tk_ac_ex_dbs), 1)
129
130
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_dbs[0].liveaction['id'])
131
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
132
133
        # Cancel the main workflow.
134
        requester = cfg.CONF.system_user.user
135
        lv_ac_db, ac_ex_db = ac_svc.request_cancellation(lv_ac_db, requester)
136
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELING)
137
138
        # Assert the subworkflow is canceled.
139
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(tk_lv_ac_db.id))
140
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)
141
142
        # Assert the main workflow is canceled.
143
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
144
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)
145
146
    def test_cancel_subworkflow_cascade_up_to_workflow(self):
147
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'subworkflow.yaml')
148
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
149
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
150
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
151
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
152
153
        # Identify the records for the subworkflow.
154
        wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
155
        self.assertEqual(len(wf_ex_dbs), 1)
156
157
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_dbs[0].id))
158
        self.assertEqual(len(tk_ex_dbs), 1)
159
160
        tk_ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[0].id))
161
        self.assertEqual(len(tk_ac_ex_dbs), 1)
162
163
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_dbs[0].liveaction['id'])
164
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
165
166
        # Cancel the subworkflow.
167
        requester = cfg.CONF.system_user.user
168
        tk_lv_ac_db, tk_ac_ex_db = ac_svc.request_cancellation(tk_lv_ac_db, requester)
169
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELING)
170
171
        # Assert the subworkflow is canceled.
172
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(tk_lv_ac_db.id))
173
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)
174
175
        # Manually handle action execution completion for the task.
176
        tk_ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(tk_ac_ex_db.id))
177
        self.assertEqual(tk_ac_ex_db.status, ac_const.LIVEACTION_STATUS_CANCELED)
178
        wf_svc.handle_action_execution_completion(tk_ac_ex_db)
179
180
        # Assert the main workflow is canceled.
181
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
182
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)
183
184
    def test_cancel_subworkflow_cascade_up_to_workflow_with_other_subworkflows(self):
185
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'subworkflows.yaml')
186
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
187
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
188
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
189
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
190
191
        # Identify the records for the subworkflow.
192
        wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
193
        self.assertEqual(len(wf_ex_dbs), 1)
194
195
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_dbs[0].id))
196
        self.assertEqual(len(tk_ex_dbs), 2)
197
198
        tk1_ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[0].id))
199
        self.assertEqual(len(tk1_ac_ex_dbs), 1)
200
201
        tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_dbs[0].liveaction['id'])
202
        self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
203
204
        tk2_ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[1].id))
205
        self.assertEqual(len(tk2_ac_ex_dbs), 1)
206
207
        tk2_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk2_ac_ex_dbs[0].liveaction['id'])
208
        self.assertEqual(tk2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
209
210
        # Cancel the subworkflow which should cascade up to the root.
211
        requester = cfg.CONF.system_user.user
212
        tk1_lv_ac_db, tk1_ac_ex_db = ac_svc.request_cancellation(tk1_lv_ac_db, requester)
213
        self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELING)
214
215
        # Assert the main workflow is canceling.
216
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
217
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELING)
218
219
        # Assert both subworkflows are canceled.
220
        tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(tk1_lv_ac_db.id))
221
        self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)
222
        tk2_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(tk2_lv_ac_db.id))
223
        self.assertEqual(tk2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)
224
225
        # Manually handle action execution completion for one of the tasks.
226
        tk1_ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(tk1_ac_ex_db.id))
227
        self.assertEqual(tk1_ac_ex_db.status, ac_const.LIVEACTION_STATUS_CANCELED)
228
        wf_svc.handle_action_execution_completion(tk1_ac_ex_db)
229
230
        # Manually handle action execution completion for the other task.
231
        tk2_ac_ex_db = tk2_ac_ex_dbs[0]
232
        tk2_ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(tk2_ac_ex_db.id))
233
        self.assertEqual(tk2_ac_ex_db.status, ac_const.LIVEACTION_STATUS_CANCELED)
234
        wf_svc.handle_action_execution_completion(tk2_ac_ex_db)
235
236
        # Assert the main workflow is canceling.
237
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
238
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_CANCELED)
239