Test Setup Failed
Pull Request — master (#4154)
by W
03:36
created

OrchestraRunnerPauseResumeTest   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 640
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 640
rs 9.8867
wmc 13

12 Methods

Rating   Name   Duplication   Size   Complexity  
A test_pause_with_active_children() 0 13 1
B test_resume_from_subworkflow_when_parent_is_running() 0 107 1
A test_resume_cascade_to_subworkflow() 0 67 1
B test_resume() 0 39 1
A test_pause() 0 13 1
B test_pause_subworkflow_not_cascade_up_to_workflow() 0 27 1
A get_runner_class() 0 3 1
A test_pause_workflow_cascade_down_to_subworkflow() 0 58 1
B test_pause_subworkflow_while_another_subworkflow_completed() 0 79 1
B test_resume_from_subworkflow_when_parent_is_paused() 0 107 1
B test_pause_subworkflow_while_another_subworkflow_running() 0 79 1
A setUpClass() 0 15 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import mock
19
20
import st2tests
21
22
from orchestra import states as wf_states
23
from oslo_config import cfg
24
25
# XXX: actionsensor import depends on config being setup.
26
import st2tests.config as tests_config
27
tests_config.parse_args()
28
29
from tests.unit import base
30
31
from st2actions.notifier import notifier
32
from st2common.bootstrap import actionsregistrar
33
from st2common.bootstrap import runnersregistrar
34
from st2common.constants import action as ac_const
35
from st2common.models.db import liveaction as lv_db_models
36
from st2common.persistence import execution as ex_db_access
37
from st2common.persistence import liveaction as lv_db_access
38
from st2common.persistence import workflow as wf_db_access
39
from st2common.runners import base as runners
40
from st2common.services import action as ac_svc
41
from st2common.services import workflows as wf_svc
42
from st2common.transport import liveaction as lv_ac_xport
43
from st2common.transport import workflow as wf_ex_xport
44
from st2common.transport import publishers
45
from st2tests.mocks import liveaction as mock_lv_ac_xport
46
from st2tests.mocks import workflow as mock_wf_ex_xport
47
48
49
TEST_FIXTURES = {
50
    'workflows': [
51
        'sequential.yaml',
52
        'subworkflow.yaml',
53
        'subworkflows.yaml'
54
    ],
55
    'actions': [
56
        'sequential.yaml',
57
        'subworkflow.yaml',
58
        'subworkflows.yaml'
59
    ]
60
}
61
62
TEST_PACK = 'orchestra_tests'
63
TEST_PACK_PATH = st2tests.fixturesloader.get_fixtures_packs_base_path() + '/' + TEST_PACK
64
65
PACKS = [
66
    TEST_PACK_PATH,
67
    st2tests.fixturesloader.get_fixtures_packs_base_path() + '/core'
68
]
69
70
71
@mock.patch.object(
72
    publishers.CUDPublisher,
73
    'publish_update',
74
    mock.MagicMock(return_value=None))
75
@mock.patch.object(
76
    lv_ac_xport.LiveActionPublisher,
77
    'publish_create',
78
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_create))
79
@mock.patch.object(
80
    lv_ac_xport.LiveActionPublisher,
81
    'publish_state',
82
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_state))
83
@mock.patch.object(
84
    wf_ex_xport.WorkflowExecutionPublisher,
85
    'publish_create',
86
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_create))
87
@mock.patch.object(
88
    wf_ex_xport.WorkflowExecutionPublisher,
89
    'publish_state',
90
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_state))
91
class OrchestraRunnerPauseResumeTest(st2tests.DbTestCase):
92
93
    @classmethod
94
    def setUpClass(cls):
95
        super(OrchestraRunnerPauseResumeTest, cls).setUpClass()
96
97
        # Register runners.
98
        runnersregistrar.register_runners()
99
100
        # Register test pack(s).
101
        actions_registrar = actionsregistrar.ActionsRegistrar(
102
            use_pack_cache=False,
103
            fail_on_failure=True
104
        )
105
106
        for pack in PACKS:
107
            actions_registrar.register_from_pack(pack)
108
109
    @classmethod
110
    def get_runner_class(cls, runner_name):
111
        return runners.get_runner(runner_name, runner_name).__class__
112
113
    @mock.patch.object(
114
        ac_svc, 'is_children_active',
115
        mock.MagicMock(return_value=False))
116
    def test_pause(self):
117
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][0])
118
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
119
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
120
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
121
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
122
123
        lv_ac_db, ac_ex_db = ac_svc.request_pause(lv_ac_db, cfg.CONF.system_user.user)
124
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
125
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
126
127
    @mock.patch.object(
128
        ac_svc, 'is_children_active',
129
        mock.MagicMock(return_value=True))
130
    def test_pause_with_active_children(self):
131
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][0])
132
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
133
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
134
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
135
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
136
137
        lv_ac_db, ac_ex_db = ac_svc.request_pause(lv_ac_db, cfg.CONF.system_user.user)
138
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
139
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
140
141
    def test_pause_subworkflow_not_cascade_up_to_workflow(self):
142
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][1])
143
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
144
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
145
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
146
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
147
148
        # Identify the records for the subworkflow.
149
        wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
150
        self.assertEqual(len(wf_ex_dbs), 1)
151
152
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_dbs[0].id))
153
        self.assertEqual(len(tk_ex_dbs), 1)
154
155
        tk_ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[0].id))
156
        self.assertEqual(len(tk_ac_ex_dbs), 1)
157
158
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_dbs[0].liveaction['id'])
159
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
160
161
        # Pause the subworkflow.
162
        tk_lv_ac_db, tk_ac_ex_db = ac_svc.request_pause(tk_lv_ac_db, cfg.CONF.system_user.user)
163
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
164
165
        # Assert the main workflow is still running.
166
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
167
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
168
169
    def test_pause_workflow_cascade_down_to_subworkflow(self):
170
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][1])
171
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
172
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
173
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
174
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
175
176
        # Identify the records for the main workflow.
177
        wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
178
        self.assertEqual(len(wf_ex_dbs), 1)
179
180
        wf_ex_db = wf_ex_dbs[0]
181
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
182
        self.assertEqual(len(tk_ex_dbs), 1)
183
184
        tk_ex_db = tk_ex_dbs[0]
185
        tk_ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_db.id))
186
        self.assertEqual(len(tk_ac_ex_dbs), 1)
187
188
        tk_ac_ex_db = tk_ac_ex_dbs[0]
189
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_db.liveaction['id'])
190
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
191
192
        # Identify the records for the subworkflow.
193
        sub_wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(tk_ac_ex_db.id))
194
        self.assertEqual(len(sub_wf_ex_dbs), 1)
195
196
        sub_wf_ex_db = sub_wf_ex_dbs[0]
197
        sub_tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(sub_wf_ex_db.id))
198
        self.assertEqual(len(sub_tk_ex_dbs), 1)
199
200
        sub_tk_ex_db = sub_tk_ex_dbs[0]
201
        sub_tk_ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=str(sub_tk_ex_db.id))
202
        self.assertEqual(len(sub_tk_ac_ex_dbs), 1)
203
204
        # Pause the main workflow and assert it is pausing because subworkflow is still running.
205
        lv_ac_db, ac_ex_db = ac_svc.request_pause(lv_ac_db, cfg.CONF.system_user.user)
206
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
207
208
        # Assert the subworkflow is pausing.
209
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(tk_lv_ac_db.id))
210
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
211
212
        # Manually handle action execution completion for the task in the subworkflow.
213
        sub_tk_ac_ex_db = sub_tk_ac_ex_dbs[0]
214
        self.assertEqual(sub_tk_ac_ex_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
215
        notifier.get_notifier().process(sub_tk_ac_ex_db)
216
217
        # Assert the subworkflow is paused and manually process the execution update.
218
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(tk_lv_ac_db.id))
219
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
220
        tk_ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(tk_ac_ex_db.id))
221
        self.assertEqual(tk_ac_ex_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
222
        notifier.get_notifier().process(tk_ac_ex_db)
223
224
        # Assert the main workflow is paused.
225
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
226
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
227
228
    def test_pause_subworkflow_while_another_subworkflow_running(self):
229
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][2])
230
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
231
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
232
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
233
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
234
235
        # Identify the records for the main workflow.
236
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
237
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
238
        self.assertEqual(len(tk_ex_dbs), 2)
239
240
        # Identify the records for the subworkflows.
241
        t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[0].id))[0]
242
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(t1_ac_ex_db.liveaction['id'])
243
        t1_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t1_ac_ex_db.id))[0]
244
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
245
        self.assertEqual(t1_wf_ex_db.status, wf_states.RUNNING)
246
247
        t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[1].id))[0]
248
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
249
        t2_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t2_ac_ex_db.id))[0]
250
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
251
        self.assertEqual(t2_wf_ex_db.status, wf_states.RUNNING)
252
253
        # Pause the subworkflow.
254
        t1_lv_ac_db, t1_ac_ex_db = ac_svc.request_pause(t1_lv_ac_db, cfg.CONF.system_user.user)
255
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
256
257
        # Assert the main workflow is still running.
258
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
259
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
260
261
        # Assert the other subworkflow is still running.
262
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
263
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
264
265
        # Manually notify action execution completion for the task in the subworkflow.
266
        t1_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[0]
267
        t1_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t1_ex_db.id))[0]
268
        notifier.get_notifier().process(t1_t1_ac_ex_db)
269
270
        # Assert the subworkflow is paused and manually notify the paused of the
271
        # corresponding action execution in the main workflow.
272
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
273
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
274
        t1_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t1_ac_ex_db.id)
275
        self.assertEqual(t1_ac_ex_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
276
        notifier.get_notifier().process(t1_ac_ex_db)
277
278
        # Assert the main workflow is still running.
279
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
280
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
281
282
        # Assert the other subworkflow is still running.
283
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
284
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
285
286
        # Manually notify action execution completion for the tasks in the other subworkflow.
287
        t2_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[0]
288
        t2_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t1_ex_db.id))[0]
289
        notifier.get_notifier().process(t2_t1_ac_ex_db)
290
        t2_t2_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[1]
291
        t2_t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t2_ex_db.id))[0]
292
        notifier.get_notifier().process(t2_t2_ac_ex_db)
293
        t2_t3_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[2]
294
        t2_t3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t3_ex_db.id))[0]
295
        notifier.get_notifier().process(t2_t3_ac_ex_db)
296
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t2_lv_ac_db.id))
297
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
298
299
        # Assert this other subworkflow is completed and manually notify the
300
        # completion to the corresponding action execution in the main workflow.
301
        t2_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t2_ac_ex_db.id)
302
        notifier.get_notifier().process(t2_ac_ex_db)
303
304
        # Assert the main workflow is paused because no other tasks is running.
305
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
306
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
307
308
    def test_pause_subworkflow_while_another_subworkflow_completed(self):
309
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][2])
310
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
311
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
312
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
313
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
314
315
        # Identify the records for the main workflow.
316
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
317
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
318
        self.assertEqual(len(tk_ex_dbs), 2)
319
320
        # Identify the records for the subworkflows.
321
        t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[0].id))[0]
322
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(t1_ac_ex_db.liveaction['id'])
323
        t1_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t1_ac_ex_db.id))[0]
324
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
325
        self.assertEqual(t1_wf_ex_db.status, wf_states.RUNNING)
326
327
        t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[1].id))[0]
328
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
329
        t2_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t2_ac_ex_db.id))[0]
330
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
331
        self.assertEqual(t2_wf_ex_db.status, wf_states.RUNNING)
332
333
        # Pause the subworkflow.
334
        t1_lv_ac_db, t1_ac_ex_db = ac_svc.request_pause(t1_lv_ac_db, cfg.CONF.system_user.user)
335
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
336
337
        # Assert the main workflow is still running.
338
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
339
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
340
341
        # Assert the other subworkflow is still running.
342
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
343
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
344
345
        # Manually notify action execution completion for the tasks in the other subworkflow.
346
        t2_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[0]
347
        t2_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t1_ex_db.id))[0]
348
        notifier.get_notifier().process(t2_t1_ac_ex_db)
349
        t2_t2_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[1]
350
        t2_t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t2_ex_db.id))[0]
351
        notifier.get_notifier().process(t2_t2_ac_ex_db)
352
        t2_t3_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[2]
353
        t2_t3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t3_ex_db.id))[0]
354
        notifier.get_notifier().process(t2_t3_ac_ex_db)
355
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t2_lv_ac_db.id))
356
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
357
358
        # Assert this other subworkflow is completed and manually notify the
359
        # completion to the corresponding action execution in the main workflow.
360
        t2_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t2_ac_ex_db.id)
361
        notifier.get_notifier().process(t2_ac_ex_db)
362
363
        # Assert the main workflow is pausing.
364
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
365
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
366
367
        # Assert the target subworkflow is still pausing.
368
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(t1_ac_ex_db.liveaction['id'])
369
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
370
371
        # Manually notify action execution completion for the task in the subworkflow.
372
        t1_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[0]
373
        t1_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t1_ex_db.id))[0]
374
        notifier.get_notifier().process(t1_t1_ac_ex_db)
375
376
        # Assert the subworkflow is paused and manually notify the paused of the
377
        # corresponding action execution in the main workflow.
378
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
379
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
380
        t1_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t1_ac_ex_db.id)
381
        self.assertEqual(t1_ac_ex_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
382
        notifier.get_notifier().process(t1_ac_ex_db)
383
384
        # Assert the main workflow is paused because no other tasks is running.
385
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
386
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
387
388
    @mock.patch.object(
389
        ac_svc, 'is_children_active',
390
        mock.MagicMock(return_value=False))
391
    def test_resume(self):
392
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][0])
393
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
394
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
395
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
396
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
397
398
        # Pause the workflow.
399
        lv_ac_db, ac_ex_db = ac_svc.request_pause(lv_ac_db, cfg.CONF.system_user.user)
400
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
401
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
402
403
        # Identify the records for the running task(s) and manually complete it.
404
        wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
405
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_dbs[0].id))
406
        self.assertEqual(len(tk_ex_dbs), 1)
407
        tk_ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[0].id))
408
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_dbs[0].liveaction['id'])
409
        self.assertEqual(tk_ac_ex_dbs[0].status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
410
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
411
        wf_svc.handle_action_execution_completion(tk_ac_ex_dbs[0])
412
413
        # Ensure the workflow is paused.
414
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
415
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED, lv_ac_db.result)
416
        wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
417
        self.assertEqual(wf_ex_dbs[0].status, wf_states.PAUSED)
418
419
        # Resume the workflow.
420
        lv_ac_db, ac_ex_db = ac_svc.request_resume(lv_ac_db, cfg.CONF.system_user.user)
421
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
422
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
423
        wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
424
        self.assertEqual(wf_ex_dbs[0].status, wf_states.RUNNING)
425
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_dbs[0].id))
426
        self.assertEqual(len(tk_ex_dbs), 2)
427
428
    def test_resume_cascade_to_subworkflow(self):
429
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][1])
430
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
431
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
432
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
433
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
434
435
        # Identify the records for the main workflow.
436
        wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))
437
        self.assertEqual(len(wf_ex_dbs), 1)
438
439
        wf_ex_db = wf_ex_dbs[0]
440
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
441
        self.assertEqual(len(tk_ex_dbs), 1)
442
443
        tk_ex_db = tk_ex_dbs[0]
444
        tk_ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_db.id))
445
        self.assertEqual(len(tk_ac_ex_dbs), 1)
446
447
        tk_ac_ex_db = tk_ac_ex_dbs[0]
448
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_db.liveaction['id'])
449
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
450
451
        # Identify the records for the subworkflow.
452
        sub_wf_ex_dbs = wf_db_access.WorkflowExecution.query(action_execution=str(tk_ac_ex_db.id))
453
        self.assertEqual(len(sub_wf_ex_dbs), 1)
454
455
        sub_wf_ex_db = sub_wf_ex_dbs[0]
456
        sub_tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(sub_wf_ex_db.id))
457
        self.assertEqual(len(sub_tk_ex_dbs), 1)
458
459
        sub_tk_ex_db = sub_tk_ex_dbs[0]
460
        sub_tk_ac_ex_dbs = ex_db_access.ActionExecution.query(task_execution=str(sub_tk_ex_db.id))
461
        self.assertEqual(len(sub_tk_ac_ex_dbs), 1)
462
463
        # Pause the main workflow and assert it is pausing because subworkflow is still running.
464
        lv_ac_db, ac_ex_db = ac_svc.request_pause(lv_ac_db, cfg.CONF.system_user.user)
465
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
466
467
        # Assert the subworkflow is pausing.
468
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(tk_lv_ac_db.id))
469
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
470
471
        # Manually handle action execution completion for the task in the subworkflow.
472
        sub_tk_ac_ex_db = sub_tk_ac_ex_dbs[0]
473
        self.assertEqual(sub_tk_ac_ex_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
474
        notifier.get_notifier().process(sub_tk_ac_ex_db)
475
476
        # Assert the subworkflow is paused and manually process the execution update.
477
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(tk_lv_ac_db.id))
478
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
479
        tk_ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(tk_ac_ex_db.id))
480
        self.assertEqual(tk_ac_ex_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
481
        notifier.get_notifier().process(tk_ac_ex_db)
482
483
        # Assert the main workflow is paused.
484
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
485
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
486
487
        # Resume the main workflow and assert it is running.
488
        lv_ac_db, ac_ex_db = ac_svc.request_resume(lv_ac_db, cfg.CONF.system_user.user)
489
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
490
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
491
492
        # Assert the subworkflow is running.
493
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(tk_lv_ac_db.id))
494
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
495
496
    def test_resume_from_subworkflow_when_parent_is_paused(self):
497
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][2])
498
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
499
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
500
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
501
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
502
503
        # Identify the records for the main workflow.
504
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
505
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
506
        self.assertEqual(len(tk_ex_dbs), 2)
507
508
        # Identify the records for the subworkflows.
509
        t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[0].id))[0]
510
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(t1_ac_ex_db.liveaction['id'])
511
        t1_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t1_ac_ex_db.id))[0]
512
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
513
        self.assertEqual(t1_wf_ex_db.status, wf_states.RUNNING)
514
515
        t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[1].id))[0]
516
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
517
        t2_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t2_ac_ex_db.id))[0]
518
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
519
        self.assertEqual(t2_wf_ex_db.status, wf_states.RUNNING)
520
521
        # Pause the subworkflow.
522
        t1_lv_ac_db, t1_ac_ex_db = ac_svc.request_pause(t1_lv_ac_db, cfg.CONF.system_user.user)
523
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
524
525
        # Assert the main workflow is still running.
526
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
527
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
528
529
        # Assert the other subworkflow is still running.
530
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
531
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
532
533
        # Manually notify action execution completion for the task in the subworkflow.
534
        t1_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[0]
535
        t1_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t1_ex_db.id))[0]
536
        notifier.get_notifier().process(t1_t1_ac_ex_db)
537
538
        # Assert the subworkflow is paused and manually notify the paused of the
539
        # corresponding action execution in the main workflow.
540
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
541
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
542
        t1_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t1_ac_ex_db.id)
543
        self.assertEqual(t1_ac_ex_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
544
        notifier.get_notifier().process(t1_ac_ex_db)
545
546
        # Assert the main workflow is pausing.
547
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
548
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
549
550
        # Assert the other subworkflow is still running.
551
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
552
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
553
554
        # Manually notify action execution completion for the tasks in the other subworkflow.
555
        t2_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[0]
556
        t2_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t1_ex_db.id))[0]
557
        notifier.get_notifier().process(t2_t1_ac_ex_db)
558
        t2_t2_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[1]
559
        t2_t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t2_ex_db.id))[0]
560
        notifier.get_notifier().process(t2_t2_ac_ex_db)
561
        t2_t3_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[2]
562
        t2_t3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t3_ex_db.id))[0]
563
        notifier.get_notifier().process(t2_t3_ac_ex_db)
564
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t2_lv_ac_db.id))
565
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
566
567
        # Assert this other subworkflow is completed and manually notify the
568
        # completion to the corresponding action execution in the main workflow.
569
        t2_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t2_ac_ex_db.id)
570
        notifier.get_notifier().process(t2_ac_ex_db)
571
572
        # Assert the main workflow is paused because no other tasks is running.
573
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
574
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
575
576
        # Resume the subworkflow and assert it is running.
577
        t1_lv_ac_db, t1_ac_ex_db = ac_svc.request_resume(t1_lv_ac_db, cfg.CONF.system_user.user)
578
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
579
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
580
581
        # Assert the main workflow is running.
582
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
583
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
584
585
        # Manually notify action execution completion for the tasks in the subworkflow.
586
        t1_t2_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[1]
587
        t1_t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t2_ex_db.id))[0]
588
        notifier.get_notifier().process(t1_t2_ac_ex_db)
589
        t1_t3_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[2]
590
        t1_t3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t3_ex_db.id))[0]
591
        notifier.get_notifier().process(t1_t3_ac_ex_db)
592
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
593
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
594
595
        # Assert the subworkflow is completed and manually notify the
596
        # completion to the corresponding action execution in the main workflow.
597
        t1_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t1_ac_ex_db.id)
598
        notifier.get_notifier().process(t1_ac_ex_db)
599
600
        # Assert the main workflow is completed.
601
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
602
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
603
604
    def test_resume_from_subworkflow_when_parent_is_running(self):
605
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][2])
606
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
607
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
608
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
609
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING, lv_ac_db.result)
610
611
        # Identify the records for the main workflow.
612
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
613
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
614
        self.assertEqual(len(tk_ex_dbs), 2)
615
616
        # Identify the records for the subworkflows.
617
        t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[0].id))[0]
618
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(t1_ac_ex_db.liveaction['id'])
619
        t1_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t1_ac_ex_db.id))[0]
620
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
621
        self.assertEqual(t1_wf_ex_db.status, wf_states.RUNNING)
622
623
        t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_dbs[1].id))[0]
624
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
625
        t2_wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(t2_ac_ex_db.id))[0]
626
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
627
        self.assertEqual(t2_wf_ex_db.status, wf_states.RUNNING)
628
629
        # Pause the subworkflow.
630
        t1_lv_ac_db, t1_ac_ex_db = ac_svc.request_pause(t1_lv_ac_db, cfg.CONF.system_user.user)
631
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSING)
632
633
        # Assert the main workflow is still running.
634
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
635
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
636
637
        # Assert the other subworkflow is still running.
638
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
639
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
640
641
        # Manually notify action execution completion for the task in the subworkflow.
642
        t1_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[0]
643
        t1_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t1_ex_db.id))[0]
644
        notifier.get_notifier().process(t1_t1_ac_ex_db)
645
646
        # Assert the subworkflow is paused and manually notify the paused of the
647
        # corresponding action execution in the main workflow.
648
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
649
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
650
        t1_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t1_ac_ex_db.id)
651
        self.assertEqual(t1_ac_ex_db.status, ac_const.LIVEACTION_STATUS_PAUSED)
652
        notifier.get_notifier().process(t1_ac_ex_db)
653
654
        # Assert the main workflow is still running.
655
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
656
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
657
658
        # Assert the other subworkflow is still running.
659
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
660
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
661
662
        # Resume the subworkflow and assert it is running.
663
        t1_lv_ac_db, t1_ac_ex_db = ac_svc.request_resume(t1_lv_ac_db, cfg.CONF.system_user.user)
664
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
665
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
666
667
        # Assert the main workflow is running.
668
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
669
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
670
671
        # Assert the other subworkflow is still running.
672
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(t2_ac_ex_db.liveaction['id'])
673
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_RUNNING)
674
675
        # Manually notify action execution completion for the tasks in the subworkflow.
676
        t1_t2_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[1]
677
        t1_t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t2_ex_db.id))[0]
678
        notifier.get_notifier().process(t1_t2_ac_ex_db)
679
        t1_t3_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t1_wf_ex_db.id))[2]
680
        t1_t3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t1_t3_ex_db.id))[0]
681
        notifier.get_notifier().process(t1_t3_ac_ex_db)
682
        t1_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t1_lv_ac_db.id))
683
        self.assertEqual(t1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
684
685
        # Assert the subworkflow is completed and manually notify the
686
        # completion to the corresponding action execution in the main workflow.
687
        t1_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t1_ac_ex_db.id)
688
        notifier.get_notifier().process(t1_ac_ex_db)
689
690
        # Manually notify action execution completion for the tasks in the other subworkflow.
691
        t2_t1_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[0]
692
        t2_t1_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t1_ex_db.id))[0]
693
        notifier.get_notifier().process(t2_t1_ac_ex_db)
694
        t2_t2_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[1]
695
        t2_t2_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t2_ex_db.id))[0]
696
        notifier.get_notifier().process(t2_t2_ac_ex_db)
697
        t2_t3_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(t2_wf_ex_db.id))[2]
698
        t2_t3_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(t2_t3_ex_db.id))[0]
699
        notifier.get_notifier().process(t2_t3_ac_ex_db)
700
        t2_lv_ac_db = lv_db_access.LiveAction.get_by_id(str(t2_lv_ac_db.id))
701
        self.assertEqual(t2_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
702
703
        # Assert this other subworkflow is completed and manually notify the
704
        # completion to the corresponding action execution in the main workflow.
705
        t2_ac_ex_db = ex_db_access.ActionExecution.get_by_id(t2_ac_ex_db.id)
706
        notifier.get_notifier().process(t2_ac_ex_db)
707
708
        # Assert the main workflow is completed.
709
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
710
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
711