Test Setup Failed
Pull Request — master (#4154)
by W
03:25
created

OrchestraRunnerPauseResumeTest   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 771
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 771
rs 9.7391
c 1
b 0
f 0
wmc 14

13 Methods

Rating   Name   Duplication   Size   Complexity  
A test_pause_with_active_children() 0 13 1
A test_resume_cascade_to_subworkflow() 0 67 1
B test_resume() 0 39 1
A test_pause() 0 13 1
B test_pause_subworkflow_not_cascade_up_to_workflow() 0 27 1
A get_runner_class() 0 3 1
A test_pause_workflow_cascade_down_to_subworkflow() 0 58 1
B test_pause_subworkflow_while_another_subworkflow_completed() 0 79 1
B test_pause_subworkflow_while_another_subworkflow_running() 0 79 1
A setUpClass() 0 15 2
B test_resume_from_subworkflow_when_parent_is_running() 0 117 1
B test_resume_from_subworkflow_when_parent_is_paused() 0 117 1
B test_resume_from_each_subworkflow_when_parent_is_paused() 0 110 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import mock
19
20
import st2tests
21
22
from orchestra import states as wf_states
23
from oslo_config import cfg
24
25
# XXX: actionsensor import depends on config being setup.
26
import st2tests.config as tests_config
27
tests_config.parse_args()
28
29
from tests.unit import base
30
31
from st2actions.notifier import notifier
32
from st2common.bootstrap import actionsregistrar
33
from st2common.bootstrap import runnersregistrar
34
from st2common.constants import action as ac_const
35
from st2common.models.db import liveaction as lv_db_models
36
from st2common.persistence import execution as ex_db_access
37
from st2common.persistence import liveaction as lv_db_access
38
from st2common.persistence import workflow as wf_db_access
39
from st2common.runners import base as runners
40
from st2common.services import action as ac_svc
41
from st2common.services import workflows as wf_svc
42
from st2common.transport import liveaction as lv_ac_xport
43
from st2common.transport import workflow as wf_ex_xport
44
from st2common.transport import publishers
45
from st2tests.mocks import liveaction as mock_lv_ac_xport
46
from st2tests.mocks import workflow as mock_wf_ex_xport
47
48
49
# Name and location of the fixture pack that holds the workflows used below.
TEST_PACK = 'orchestra_tests'
TEST_PACK_PATH = '/'.join((st2tests.fixturesloader.get_fixtures_packs_base_path(), TEST_PACK))

# Fixture packs to register: the test pack itself plus the core pack.
PACKS = [
    TEST_PACK_PATH,
    st2tests.fixturesloader.get_fixtures_packs_base_path() + '/core',
]
56
57
58
@mock.patch.object(
59
    publishers.CUDPublisher,
60
    'publish_update',
61
    mock.MagicMock(return_value=None))
62
@mock.patch.object(
63
    lv_ac_xport.LiveActionPublisher,
64
    'publish_create',
65
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_create))
66
@mock.patch.object(
67
    lv_ac_xport.LiveActionPublisher,
68
    'publish_state',
69
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_state))
70
@mock.patch.object(
71
    wf_ex_xport.WorkflowExecutionPublisher,
72
    'publish_create',
73
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_create))
74
@mock.patch.object(
75
    wf_ex_xport.WorkflowExecutionPublisher,
76
    'publish_state',
77
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_state))
78
class OrchestraRunnerPauseResumeTest(st2tests.DbTestCase):
79
80
    @classmethod
81
    def setUpClass(cls):
        """Prepare the fixtures shared by every test in this class.

        Runs the parent class setup, registers the runners, and then registers
        the actions of each pack listed in PACKS.
        """
        super(OrchestraRunnerPauseResumeTest, cls).setUpClass()

        # Runners are registered first, followed by the actions of the packs.
        runnersregistrar.register_runners()

        # fail_on_failure=True is passed so registration errors are not ignored.
        registrar = actionsregistrar.ActionsRegistrar(use_pack_cache=False, fail_on_failure=True)

        for pack_path in PACKS:
            registrar.register_from_pack(pack_path)
95
96
    @classmethod
97
    def get_runner_class(cls, runner_name):
        """Return the class of the runner instance obtained for runner_name."""
        runner = runners.get_runner(runner_name, runner_name)
        return runner.__class__
99
100
    @mock.patch.object(
101
        ac_svc, 'is_children_active',
102
        mock.MagicMock(return_value=False))
103
    def test_pause(self):
        """Request a pause on the running 'sequential.yaml' workflow and expect PAUSING."""
        fixture = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml')
        requested = lv_db_models.LiveActionDB(action=fixture['name'])

        # Launch the workflow and reload its live action to confirm it is running.
        requested, _ = ac_svc.request(requested)
        running = lv_db_access.LiveAction.get_by_id(str(requested.id))
        self.assertEqual(running.status, ac_const.LIVEACTION_STATUS_RUNNING, running.result)

        # Ask for a pause, then reload the live action to check the stored status.
        pause_requested, _ = ac_svc.request_pause(running, cfg.CONF.system_user.user)
        reloaded = lv_db_access.LiveAction.get_by_id(str(pause_requested.id))
        self.assertEqual(reloaded.status, ac_const.LIVEACTION_STATUS_PAUSING)
113
114
    @mock.patch.object(
115
        ac_svc, 'is_children_active',
116
        mock.MagicMock(return_value=True))
117
    def test_pause_with_active_children(self):
        """Request a pause while is_children_active is patched to return True.

        The workflow from 'sequential.yaml' is expected to be in the pausing
        status after the request.
        """
        fixture = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml')
        workflow_lv, _ = ac_svc.request(lv_db_models.LiveActionDB(action=fixture['name']))

        # Reload from the database before checking that the workflow is running.
        workflow_lv = lv_db_access.LiveAction.get_by_id(str(workflow_lv.id))
        self.assertEqual(
            workflow_lv.status,
            ac_const.LIVEACTION_STATUS_RUNNING,
            workflow_lv.result
        )

        # Request the pause and verify the persisted status.
        workflow_lv, _ = ac_svc.request_pause(workflow_lv, cfg.CONF.system_user.user)
        workflow_lv = lv_db_access.LiveAction.get_by_id(str(workflow_lv.id))
        self.assertEqual(workflow_lv.status, ac_const.LIVEACTION_STATUS_PAUSING)
127
128
    def test_pause_subworkflow_not_cascade_up_to_workflow(self):
        """Pausing a subworkflow must leave its parent workflow running."""
        fixture = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'subworkflow.yaml')
        parent_lv = lv_db_models.LiveActionDB(action=fixture['name'])
        parent_lv, parent_ex = ac_svc.request(parent_lv)
        parent_lv = lv_db_access.LiveAction.get_by_id(str(parent_lv.id))
        self.assertEqual(parent_lv.status, ac_const.LIVEACTION_STATUS_RUNNING, parent_lv.result)

        # Walk from the parent execution down to the live action of the subworkflow,
        # expecting exactly one record at every level.
        parent_wfs = wf_db_access.WorkflowExecution.query(action_execution=str(parent_ex.id))
        self.assertEqual(len(parent_wfs), 1)

        parent_tasks = wf_db_access.TaskExecution.query(workflow_execution=str(parent_wfs[0].id))
        self.assertEqual(len(parent_tasks), 1)

        child_exs = ex_db_access.ActionExecution.query(task_execution=str(parent_tasks[0].id))
        self.assertEqual(len(child_exs), 1)

        child_lv = lv_db_access.LiveAction.get_by_id(child_exs[0].liveaction['id'])
        self.assertEqual(child_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Request the pause on the subworkflow only.
        child_lv, _ = ac_svc.request_pause(child_lv, cfg.CONF.system_user.user)
        self.assertEqual(child_lv.status, ac_const.LIVEACTION_STATUS_PAUSING)

        # The parent workflow is expected to keep running.
        parent_lv = lv_db_access.LiveAction.get_by_id(str(parent_lv.id))
        self.assertEqual(parent_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)
155
156
    def test_pause_workflow_cascade_down_to_subworkflow(self):
        """A pause requested on the main workflow cascades down to its subworkflow."""

        def notify(execution):
            """Process the given action execution through the notifier."""
            notifier.get_notifier().process(execution)

        fixture = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'subworkflow.yaml')
        main_lv = lv_db_models.LiveActionDB(action=fixture['name'])
        main_lv, main_ex = ac_svc.request(main_lv)
        main_lv = lv_db_access.LiveAction.get_by_id(str(main_lv.id))
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_RUNNING, main_lv.result)

        # Main workflow: one workflow execution, one task, one action execution.
        main_wfs = wf_db_access.WorkflowExecution.query(action_execution=str(main_ex.id))
        self.assertEqual(len(main_wfs), 1)

        main_tasks = wf_db_access.TaskExecution.query(workflow_execution=str(main_wfs[0].id))
        self.assertEqual(len(main_tasks), 1)

        sub_exs = ex_db_access.ActionExecution.query(task_execution=str(main_tasks[0].id))
        self.assertEqual(len(sub_exs), 1)

        sub_ex = sub_exs[0]
        sub_lv = lv_db_access.LiveAction.get_by_id(sub_ex.liveaction['id'])
        self.assertEqual(sub_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Subworkflow: one workflow execution, one task, one action execution.
        sub_wfs = wf_db_access.WorkflowExecution.query(action_execution=str(sub_ex.id))
        self.assertEqual(len(sub_wfs), 1)

        sub_tasks = wf_db_access.TaskExecution.query(workflow_execution=str(sub_wfs[0].id))
        self.assertEqual(len(sub_tasks), 1)

        leaf_exs = ex_db_access.ActionExecution.query(task_execution=str(sub_tasks[0].id))
        self.assertEqual(len(leaf_exs), 1)

        # Pause the main workflow; it stays pausing while the subworkflow is still running.
        main_lv, _ = ac_svc.request_pause(main_lv, cfg.CONF.system_user.user)
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_PAUSING)

        # The subworkflow is expected to be pausing as well.
        sub_lv = lv_db_access.LiveAction.get_by_id(str(sub_lv.id))
        self.assertEqual(sub_lv.status, ac_const.LIVEACTION_STATUS_PAUSING)

        # Handle the completion of the subworkflow task by hand.
        leaf_ex = leaf_exs[0]
        self.assertEqual(leaf_ex.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
        notify(leaf_ex)

        # The subworkflow is now paused; process that execution update by hand.
        sub_lv = lv_db_access.LiveAction.get_by_id(str(sub_lv.id))
        self.assertEqual(sub_lv.status, ac_const.LIVEACTION_STATUS_PAUSED)
        sub_ex = ex_db_access.ActionExecution.get_by_id(str(sub_ex.id))
        self.assertEqual(sub_ex.status, ac_const.LIVEACTION_STATUS_PAUSED)
        notify(sub_ex)

        # Finally the main workflow itself is paused.
        main_lv = lv_db_access.LiveAction.get_by_id(str(main_lv.id))
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_PAUSED)
214
215
    def test_pause_subworkflow_while_another_subworkflow_running(self):
        """Pause one subworkflow, then let the sibling subworkflow run to completion.

        The main workflow is expected to stay running until the sibling has
        completed, and to be paused afterwards.
        """

        def notify(execution):
            """Process the given action execution through the notifier."""
            notifier.get_notifier().process(execution)

        def complete_task(wf_ex, position):
            """Notify completion of the action execution of the task at position in wf_ex."""
            task = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex.id))[position]
            notify(ex_db_access.ActionExecution.query(task_execution=str(task.id))[0])

        fixture = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'subworkflows.yaml')
        main_lv = lv_db_models.LiveActionDB(action=fixture['name'])
        main_lv, main_ex = ac_svc.request(main_lv)
        main_lv = lv_db_access.LiveAction.get_by_id(str(main_lv.id))
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_RUNNING, main_lv.result)

        # Records of the main workflow: it is expected to have two tasks.
        main_wf = wf_db_access.WorkflowExecution.query(action_execution=str(main_ex.id))[0]
        main_tasks = wf_db_access.TaskExecution.query(workflow_execution=str(main_wf.id))
        self.assertEqual(len(main_tasks), 2)

        # Records of the first subworkflow.
        first_ex = ex_db_access.ActionExecution.query(task_execution=str(main_tasks[0].id))[0]
        first_lv = lv_db_access.LiveAction.get_by_id(first_ex.liveaction['id'])
        first_wf = wf_db_access.WorkflowExecution.query(action_execution=str(first_ex.id))[0]
        self.assertEqual(first_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)
        self.assertEqual(first_wf.status, wf_states.RUNNING)

        # Records of the second subworkflow.
        second_ex = ex_db_access.ActionExecution.query(task_execution=str(main_tasks[1].id))[0]
        second_lv = lv_db_access.LiveAction.get_by_id(second_ex.liveaction['id'])
        second_wf = wf_db_access.WorkflowExecution.query(action_execution=str(second_ex.id))[0]
        self.assertEqual(second_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)
        self.assertEqual(second_wf.status, wf_states.RUNNING)

        # Pause the first subworkflow.
        first_lv, first_ex = ac_svc.request_pause(first_lv, cfg.CONF.system_user.user)
        self.assertEqual(first_lv.status, ac_const.LIVEACTION_STATUS_PAUSING)

        # Neither the main workflow nor the second subworkflow is affected yet.
        main_lv = lv_db_access.LiveAction.get_by_id(str(main_lv.id))
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)

        second_lv = lv_db_access.LiveAction.get_by_id(second_ex.liveaction['id'])
        self.assertEqual(second_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Notify completion of the task in the first subworkflow by hand.
        complete_task(first_wf, 0)

        # The first subworkflow is now paused; notify the main workflow of the
        # pause of the corresponding action execution by hand.
        first_lv = lv_db_access.LiveAction.get_by_id(str(first_lv.id))
        self.assertEqual(first_lv.status, ac_const.LIVEACTION_STATUS_PAUSED)
        first_ex = ex_db_access.ActionExecution.get_by_id(first_ex.id)
        self.assertEqual(first_ex.status, ac_const.LIVEACTION_STATUS_PAUSED)
        notify(first_ex)

        # The main workflow and the second subworkflow are still running.
        main_lv = lv_db_access.LiveAction.get_by_id(str(main_lv.id))
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)

        second_lv = lv_db_access.LiveAction.get_by_id(second_ex.liveaction['id'])
        self.assertEqual(second_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Notify completion of the three tasks of the second subworkflow, one
        # at a time; the task list is queried again before each notification.
        for position in range(3):
            complete_task(second_wf, position)

        second_lv = lv_db_access.LiveAction.get_by_id(str(second_lv.id))
        self.assertEqual(second_lv.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)

        # Notify the main workflow that the second subworkflow has completed.
        second_ex = ex_db_access.ActionExecution.get_by_id(second_ex.id)
        notify(second_ex)

        # The main workflow is paused because no other task is running.
        main_lv = lv_db_access.LiveAction.get_by_id(str(main_lv.id))
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_PAUSED)
294
295
    def test_pause_subworkflow_while_another_subworkflow_completed(self):
        """Pause one subworkflow and complete its sibling before the pause takes effect.

        The main workflow is expected to keep running while the paused
        subworkflow is still pausing, and to be paused once it is paused.
        """

        def notify(execution):
            """Process the given action execution through the notifier."""
            notifier.get_notifier().process(execution)

        def complete_task(wf_ex, position):
            """Notify completion of the action execution of the task at position in wf_ex."""
            task = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex.id))[position]
            notify(ex_db_access.ActionExecution.query(task_execution=str(task.id))[0])

        fixture = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'subworkflows.yaml')
        main_lv = lv_db_models.LiveActionDB(action=fixture['name'])
        main_lv, main_ex = ac_svc.request(main_lv)
        main_lv = lv_db_access.LiveAction.get_by_id(str(main_lv.id))
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_RUNNING, main_lv.result)

        # Records of the main workflow: it is expected to have two tasks.
        main_wf = wf_db_access.WorkflowExecution.query(action_execution=str(main_ex.id))[0]
        main_tasks = wf_db_access.TaskExecution.query(workflow_execution=str(main_wf.id))
        self.assertEqual(len(main_tasks), 2)

        # Records of the subworkflow that will be paused.
        target_ex = ex_db_access.ActionExecution.query(task_execution=str(main_tasks[0].id))[0]
        target_lv = lv_db_access.LiveAction.get_by_id(target_ex.liveaction['id'])
        target_wf = wf_db_access.WorkflowExecution.query(action_execution=str(target_ex.id))[0]
        self.assertEqual(target_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)
        self.assertEqual(target_wf.status, wf_states.RUNNING)

        # Records of the other subworkflow.
        other_ex = ex_db_access.ActionExecution.query(task_execution=str(main_tasks[1].id))[0]
        other_lv = lv_db_access.LiveAction.get_by_id(other_ex.liveaction['id'])
        other_wf = wf_db_access.WorkflowExecution.query(action_execution=str(other_ex.id))[0]
        self.assertEqual(other_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)
        self.assertEqual(other_wf.status, wf_states.RUNNING)

        # Pause the target subworkflow.
        target_lv, target_ex = ac_svc.request_pause(target_lv, cfg.CONF.system_user.user)
        self.assertEqual(target_lv.status, ac_const.LIVEACTION_STATUS_PAUSING)

        # Neither the main workflow nor the other subworkflow is affected yet.
        main_lv = lv_db_access.LiveAction.get_by_id(str(main_lv.id))
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)

        other_lv = lv_db_access.LiveAction.get_by_id(other_ex.liveaction['id'])
        self.assertEqual(other_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Notify completion of the three tasks of the other subworkflow, one
        # at a time; the task list is queried again before each notification.
        for position in range(3):
            complete_task(other_wf, position)

        other_lv = lv_db_access.LiveAction.get_by_id(str(other_lv.id))
        self.assertEqual(other_lv.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)

        # Notify the main workflow that the other subworkflow has completed.
        other_ex = ex_db_access.ActionExecution.get_by_id(other_ex.id)
        notify(other_ex)

        # The main workflow is still running.
        main_lv = lv_db_access.LiveAction.get_by_id(str(main_lv.id))
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # The target subworkflow is still pausing.
        target_lv = lv_db_access.LiveAction.get_by_id(target_ex.liveaction['id'])
        self.assertEqual(target_lv.status, ac_const.LIVEACTION_STATUS_PAUSING)

        # Notify completion of the task in the target subworkflow by hand.
        complete_task(target_wf, 0)

        # The target subworkflow is now paused; notify the main workflow of the
        # pause of the corresponding action execution by hand.
        target_lv = lv_db_access.LiveAction.get_by_id(str(target_lv.id))
        self.assertEqual(target_lv.status, ac_const.LIVEACTION_STATUS_PAUSED)
        target_ex = ex_db_access.ActionExecution.get_by_id(target_ex.id)
        self.assertEqual(target_ex.status, ac_const.LIVEACTION_STATUS_PAUSED)
        notify(target_ex)

        # The main workflow is paused because no other task is running.
        main_lv = lv_db_access.LiveAction.get_by_id(str(main_lv.id))
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_PAUSED)
374
375
    @mock.patch.object(
376
        ac_svc, 'is_children_active',
377
        mock.MagicMock(return_value=False))
378
    def test_resume(self):
        """Pause the 'sequential.yaml' workflow, then resume it and expect it to run again."""
        fixture = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'sequential.yaml')
        workflow_lv = lv_db_models.LiveActionDB(action=fixture['name'])
        workflow_lv, _ = ac_svc.request(workflow_lv)
        workflow_lv = lv_db_access.LiveAction.get_by_id(str(workflow_lv.id))
        self.assertEqual(
            workflow_lv.status,
            ac_const.LIVEACTION_STATUS_RUNNING,
            workflow_lv.result
        )

        # Request the pause; the workflow is first reported as pausing.
        workflow_lv, workflow_ex = ac_svc.request_pause(workflow_lv, cfg.CONF.system_user.user)
        workflow_lv = lv_db_access.LiveAction.get_by_id(str(workflow_lv.id))
        self.assertEqual(workflow_lv.status, ac_const.LIVEACTION_STATUS_PAUSING)

        # Find the single task created so far and handle its completion by hand.
        wf_exs = wf_db_access.WorkflowExecution.query(action_execution=str(workflow_ex.id))
        task_exs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_exs[0].id))
        self.assertEqual(len(task_exs), 1)
        task_ac_exs = ex_db_access.ActionExecution.query(task_execution=str(task_exs[0].id))
        task_lv = lv_db_access.LiveAction.get_by_id(task_ac_exs[0].liveaction['id'])
        self.assertEqual(task_ac_exs[0].status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
        self.assertEqual(task_lv.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
        wf_svc.handle_action_execution_completion(task_ac_exs[0])

        # Both the live action and the workflow execution must now be paused.
        workflow_lv = lv_db_access.LiveAction.get_by_id(str(workflow_lv.id))
        self.assertEqual(
            workflow_lv.status,
            ac_const.LIVEACTION_STATUS_PAUSED,
            workflow_lv.result
        )
        wf_exs = wf_db_access.WorkflowExecution.query(action_execution=str(workflow_ex.id))
        self.assertEqual(wf_exs[0].status, wf_states.PAUSED)

        # Resume the workflow and check that it is running again.
        workflow_lv, workflow_ex = ac_svc.request_resume(workflow_lv, cfg.CONF.system_user.user)
        workflow_lv = lv_db_access.LiveAction.get_by_id(str(workflow_lv.id))
        self.assertEqual(workflow_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)
        wf_exs = wf_db_access.WorkflowExecution.query(action_execution=str(workflow_ex.id))
        self.assertEqual(wf_exs[0].status, wf_states.RUNNING)

        # A second task execution is expected to exist after the resume.
        task_exs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_exs[0].id))
        self.assertEqual(len(task_exs), 2)
414
415
    def test_resume_cascade_to_subworkflow(self):
        """A resume requested on the paused main workflow cascades to its subworkflow."""

        def notify(execution):
            """Process the given action execution through the notifier."""
            notifier.get_notifier().process(execution)

        fixture = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'subworkflow.yaml')
        main_lv = lv_db_models.LiveActionDB(action=fixture['name'])
        main_lv, main_ex = ac_svc.request(main_lv)
        main_lv = lv_db_access.LiveAction.get_by_id(str(main_lv.id))
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_RUNNING, main_lv.result)

        # Main workflow: one workflow execution, one task, one action execution.
        main_wfs = wf_db_access.WorkflowExecution.query(action_execution=str(main_ex.id))
        self.assertEqual(len(main_wfs), 1)

        main_tasks = wf_db_access.TaskExecution.query(workflow_execution=str(main_wfs[0].id))
        self.assertEqual(len(main_tasks), 1)

        sub_exs = ex_db_access.ActionExecution.query(task_execution=str(main_tasks[0].id))
        self.assertEqual(len(sub_exs), 1)

        sub_ex = sub_exs[0]
        sub_lv = lv_db_access.LiveAction.get_by_id(sub_ex.liveaction['id'])
        self.assertEqual(sub_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Subworkflow: one workflow execution, one task, one action execution.
        sub_wfs = wf_db_access.WorkflowExecution.query(action_execution=str(sub_ex.id))
        self.assertEqual(len(sub_wfs), 1)

        sub_tasks = wf_db_access.TaskExecution.query(workflow_execution=str(sub_wfs[0].id))
        self.assertEqual(len(sub_tasks), 1)

        leaf_exs = ex_db_access.ActionExecution.query(task_execution=str(sub_tasks[0].id))
        self.assertEqual(len(leaf_exs), 1)

        # Pause the main workflow; it stays pausing while the subworkflow is still running.
        main_lv, _ = ac_svc.request_pause(main_lv, cfg.CONF.system_user.user)
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_PAUSING)

        # The subworkflow is expected to be pausing as well.
        sub_lv = lv_db_access.LiveAction.get_by_id(str(sub_lv.id))
        self.assertEqual(sub_lv.status, ac_const.LIVEACTION_STATUS_PAUSING)

        # Handle the completion of the subworkflow task by hand.
        leaf_ex = leaf_exs[0]
        self.assertEqual(leaf_ex.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
        notify(leaf_ex)

        # The subworkflow is now paused; process that execution update by hand.
        sub_lv = lv_db_access.LiveAction.get_by_id(str(sub_lv.id))
        self.assertEqual(sub_lv.status, ac_const.LIVEACTION_STATUS_PAUSED)
        sub_ex = ex_db_access.ActionExecution.get_by_id(str(sub_ex.id))
        self.assertEqual(sub_ex.status, ac_const.LIVEACTION_STATUS_PAUSED)
        notify(sub_ex)

        # The main workflow is paused.
        main_lv = lv_db_access.LiveAction.get_by_id(str(main_lv.id))
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_PAUSED)

        # Resume the main workflow and check that it is running again.
        main_lv, _ = ac_svc.request_resume(main_lv, cfg.CONF.system_user.user)
        main_lv = lv_db_access.LiveAction.get_by_id(str(main_lv.id))
        self.assertEqual(main_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # The subworkflow is expected to be running again too.
        sub_lv = lv_db_access.LiveAction.get_by_id(str(sub_lv.id))
        self.assertEqual(sub_lv.status, ac_const.LIVEACTION_STATUS_RUNNING)
482
483
    def test_resume_from_each_subworkflow_when_parent_is_paused(self):
484
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'subworkflows.yaml')
485
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
486
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
487
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
488
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
489
490
        # Identify the records for the main workflow.
491
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
492
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
493
        self.assertEqual(len(tk_ex_dbs), 2)
494
495
        # Identify the records for the subworkflows.
496
        t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[0].id))[0]
497
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(t1_ac_ex_db.liveaction['id'])
498
        t1_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t1_ac_ex_db.id))[0]
499
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
500
        self.assertEqual(t1_wf_ex_db.status, wf_states.RUNNING)
501
502
        t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[1].id))[0]
503
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
504
        t2_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t2_ac_ex_db.id))[0]
505
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
506
        self.assertEqual(t2_wf_ex_db.status, wf_states.RUNNING)
507
508
        # Pause one of the subworkflows.
509
        t1_lv_ac_db, t1_ac_ex_db = ac_svc.request_pause(t1_lv_ac_db, cfg.CONF.system_user.user)
510
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
511
512
        # Assert the main workflow is still running.
513
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
514
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
515
516
        # Assert the other subworkflow is still running.
517
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
518
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
519
520
        # Manually notify action execution completion for the task in the subworkflow.
521
        t1_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[0]
522
        t1_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t1_ex_db.id))[0]
523
        notifier.get_notifier().process(t1_t1_ac_ex_db)
524
525
        # Assert the subworkflow is paused and manually notify the pause of the
526
        # corresponding action execution in the main workflow.
527
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
528
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
529
        t1_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t1_ac_ex_db.id)
530
        self.assertEqual(t1_ac_ex_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
531
        notifier.get_notifier().process(t1_ac_ex_db)
532
533
        # Assert the main workflow is still running.
534
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
535
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
536
537
        # Assert the other subworkflow is still running.
538
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
539
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
540
541
        # Pause the other subworkflow.
542
        t2_lv_ac_db, t2_ac_ex_db = ac_svc.request_pause(t2_lv_ac_db, cfg.CONF.system_user.user)
543
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
544
545
        # Assert the main workflow is still running.
546
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
547
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
548
549
        # Manually notify action execution completion for the task in the subworkflow.
550
        t2_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[0]
551
        t2_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t1_ex_db.id))[0]
552
        notifier.get_notifier().process(t2_t1_ac_ex_db)
553
554
        # Assert the subworkflow is paused and manually notify the pause to the
555
        # corresponding action execution in the main workflow.
556
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t2_lv_ac_db.id))
557
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
558
        t2_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t2_ac_ex_db.id)
559
        self.assertEqual(t2_ac_ex_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
560
        notifier.get_notifier().process(t2_ac_ex_db)
561
562
        # Assert the main workflow is paused
563
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
564
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
565
566
        # Resume the subworkflow and assert it is running.
567
        t1_lv_ac_db, t1_ac_ex_db = ac_svc.request_resume(t1_lv_ac_db, cfg.CONF.system_user.user)
568
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
569
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
570
571
        # Assert the main workflow is running.
572
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
573
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
574
575
        # Manually notify action execution completion for the tasks in the subworkflow.
576
        t1_t2_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[1]
577
        t1_t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t2_ex_db.id))[0]
578
        notifier.get_notifier().process(t1_t2_ac_ex_db)
579
        t1_t3_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[2]
580
        t1_t3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t3_ex_db.id))[0]
581
        notifier.get_notifier().process(t1_t3_ac_ex_db)
582
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
583
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
584
585
        # Assert the subworkflow is completed and manually notify the
586
        # completion to the corresponding action execution in the main workflow.
587
        t1_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t1_ac_ex_db.id)
588
        notifier.get_notifier().process(t1_ac_ex_db)
589
590
        # Assert the main workflow is back to paused
591
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
592
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
593
594
    def test_resume_from_subworkflow_when_parent_is_paused(self):
        """Resume a paused subworkflow after the parent workflow has paused.

        Scenario exercised, step by step:

        1. Run the ``subworkflows.yaml`` fixture, which starts two subworkflow tasks.
        2. Pause the first subworkflow and let the second one run to completion.
        3. Assert the parent workflow is paused once nothing else is running.
        4. Resume the first subworkflow and assert the parent is running again.
        5. Complete the first subworkflow, then task3, and assert the parent succeeds.

        Action execution updates are pushed through the notifier by hand at each
        step instead of being delivered asynchronously.
        """
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'subworkflows.yaml')
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)

        # Identify the records for the main workflow.
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
        self.assertEqual(len(tk_ex_dbs), 2)

        # Identify the records for the subworkflows.
        t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[0].id))[0]
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(t1_ac_ex_db.liveaction['id'])
        t1_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t1_ac_ex_db.id))[0]
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
        self.assertEqual(t1_wf_ex_db.status, wf_states.RUNNING)

        t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[1].id))[0]
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
        t2_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t2_ac_ex_db.id))[0]
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
        self.assertEqual(t2_wf_ex_db.status, wf_states.RUNNING)

        # Pause the subworkflow.
        t1_lv_ac_db, t1_ac_ex_db = ac_svc.request_pause(t1_lv_ac_db, cfg.CONF.system_user.user)
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)

        # Assert the main workflow is still running.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Assert the other subworkflow is still running.
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Manually notify action execution completion for the task in the subworkflow.
        t1_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[0]
        t1_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t1_ex_db.id))[0]
        notifier.get_notifier().process(t1_t1_ac_ex_db)

        # Assert the subworkflow is paused and manually notify the pause to the
        # corresponding action execution in the main workflow.
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
        t1_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t1_ac_ex_db.id)
        self.assertEqual(t1_ac_ex_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
        notifier.get_notifier().process(t1_ac_ex_db)

        # Assert the main workflow is still running.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Assert the other subworkflow is still running.
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Manually notify action execution completion for the tasks in the other subworkflow.
        t2_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[0]
        t2_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t1_ex_db.id))[0]
        notifier.get_notifier().process(t2_t1_ac_ex_db)
        t2_t2_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[1]
        t2_t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t2_ex_db.id))[0]
        notifier.get_notifier().process(t2_t2_ac_ex_db)
        t2_t3_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[2]
        t2_t3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t3_ex_db.id))[0]
        notifier.get_notifier().process(t2_t3_ac_ex_db)
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t2_lv_ac_db.id))
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)

        # The other subworkflow is completed; manually notify the completion
        # to the corresponding action execution in the main workflow.
        t2_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t2_ac_ex_db.id)
        notifier.get_notifier().process(t2_ac_ex_db)

        # Assert the main workflow is paused because no other task is running.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)

        # Resume the subworkflow and assert it is running.
        t1_lv_ac_db, t1_ac_ex_db = ac_svc.request_resume(t1_lv_ac_db, cfg.CONF.system_user.user)
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Assert the main workflow is running.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Manually notify action execution completion for the tasks in the subworkflow.
        t1_t2_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[1]
        t1_t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t2_ex_db.id))[0]
        notifier.get_notifier().process(t1_t2_ac_ex_db)
        t1_t3_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[2]
        t1_t3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t3_ex_db.id))[0]
        notifier.get_notifier().process(t1_t3_ac_ex_db)
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)

        # The subworkflow is completed; manually notify the completion to the
        # corresponding action execution in the main workflow.
        t1_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t1_ac_ex_db.id)
        notifier.get_notifier().process(t1_ac_ex_db)

        # Assert task3 has started and completed.
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
        self.assertEqual(len(tk_ex_dbs), 3)
        t3_ex_db_qry = {'workflow_execution': str(wf_ex_db.id), 'task_id': 'task3'}
        t3_ex_db = wf_db_access.TaskExecution.query(**t3_ex_db_qry)[0]
        t3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t3_ex_db.id))[0]
        t3_lv_ac_db = lv_db_access.LiveAction.get_by_id(t3_ac_ex_db.liveaction['id'])
        self.assertEqual(t3_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
        wf_svc.handle_action_execution_completion(t3_ac_ex_db)

        # Assert the main workflow is completed.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
711
712
    def test_resume_from_subworkflow_when_parent_is_running(self):
        """Resume a paused subworkflow while the parent workflow is still running.

        Scenario exercised, step by step:

        1. Run the ``subworkflows.yaml`` fixture, which starts two subworkflow tasks.
        2. Pause the first subworkflow; the second keeps running, so the parent
           is asserted to stay in the running state.
        3. Resume the first subworkflow and assert the parent and the other
           subworkflow are both still running.
        4. Complete both subworkflows, then task3, and assert the parent succeeds.

        Action execution updates are pushed through the notifier by hand at each
        step instead of being delivered asynchronously.
        """
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, 'subworkflows.yaml')
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)

        # Identify the records for the main workflow.
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
        self.assertEqual(len(tk_ex_dbs), 2)

        # Identify the records for the subworkflows.
        t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[0].id))[0]
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(t1_ac_ex_db.liveaction['id'])
        t1_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t1_ac_ex_db.id))[0]
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
        self.assertEqual(t1_wf_ex_db.status, wf_states.RUNNING)

        t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[1].id))[0]
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
        t2_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t2_ac_ex_db.id))[0]
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
        self.assertEqual(t2_wf_ex_db.status, wf_states.RUNNING)

        # Pause the subworkflow.
        t1_lv_ac_db, t1_ac_ex_db = ac_svc.request_pause(t1_lv_ac_db, cfg.CONF.system_user.user)
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)

        # Assert the main workflow is still running.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Assert the other subworkflow is still running.
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Manually notify action execution completion for the task in the subworkflow.
        t1_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[0]
        t1_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t1_ex_db.id))[0]
        notifier.get_notifier().process(t1_t1_ac_ex_db)

        # Assert the subworkflow is paused and manually notify the pause to the
        # corresponding action execution in the main workflow.
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
        t1_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t1_ac_ex_db.id)
        self.assertEqual(t1_ac_ex_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
        notifier.get_notifier().process(t1_ac_ex_db)

        # Assert the main workflow is still running.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Assert the other subworkflow is still running.
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Resume the subworkflow and assert it is running.
        t1_lv_ac_db, t1_ac_ex_db = ac_svc.request_resume(t1_lv_ac_db, cfg.CONF.system_user.user)
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Assert the main workflow is running.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Assert the other subworkflow is still running.
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)

        # Manually notify action execution completion for the tasks in the subworkflow.
        t1_t2_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[1]
        t1_t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t2_ex_db.id))[0]
        notifier.get_notifier().process(t1_t2_ac_ex_db)
        t1_t3_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[2]
        t1_t3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t3_ex_db.id))[0]
        notifier.get_notifier().process(t1_t3_ac_ex_db)
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)

        # The subworkflow is completed; manually notify the completion to the
        # corresponding action execution in the main workflow.
        t1_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t1_ac_ex_db.id)
        notifier.get_notifier().process(t1_ac_ex_db)

        # Manually notify action execution completion for the tasks in the other subworkflow.
        t2_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[0]
        t2_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t1_ex_db.id))[0]
        notifier.get_notifier().process(t2_t1_ac_ex_db)
        t2_t2_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[1]
        t2_t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t2_ex_db.id))[0]
        notifier.get_notifier().process(t2_t2_ac_ex_db)
        t2_t3_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[2]
        t2_t3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t3_ex_db.id))[0]
        notifier.get_notifier().process(t2_t3_ac_ex_db)
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t2_lv_ac_db.id))
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)

        # The other subworkflow is completed; manually notify the completion
        # to the corresponding action execution in the main workflow.
        t2_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t2_ac_ex_db.id)
        notifier.get_notifier().process(t2_ac_ex_db)

        # Assert task3 has started and completed.
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
        self.assertEqual(len(tk_ex_dbs), 3)
        t3_ex_db_qry = {'workflow_execution': str(wf_ex_db.id), 'task_id': 'task3'}
        t3_ex_db = wf_db_access.TaskExecution.query(**t3_ex_db_qry)[0]
        t3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t3_ex_db.id))[0]
        t3_lv_ac_db = lv_db_access.LiveAction.get_by_id(t3_ac_ex_db.liveaction['id'])
        self.assertEqual(t3_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
        wf_svc.handle_action_execution_completion(t3_ac_ex_db)

        # Assert the main workflow is completed.
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
829