Test Setup Failed
Pull Request — master (#4154)
by W
03:42
created

OrchestraErrorHandlingTest   A

Complexity

Total Complexity 15

Size/Duplication

Total Lines 376
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 376
rs 10
wmc 15

13 Methods

Rating   Name   Duplication   Size   Complexity  
B test_fail_start_task_action() 0 29 1
B test_fail_next_task_action() 0 39 1
B test_fail_input_rendering() 0 28 1
B test_fail_vars_rendering() 0 28 1
A get_runner_class() 0 3 1
B test_fail_task_transition() 0 40 1
A setUpClass() 0 15 2
A sort_wf_runtime_errors() 0 2 2
B test_fail_next_task_input() 0 39 1
B test_fail_task_publish() 0 39 1
B test_fail_start_task_input() 0 29 1
B test_fail_output_rendering() 0 37 1
A test_fail_inspection() 0 14 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
18
import mock
19
20
from orchestra import states as wf_states
21
22
import st2tests
23
24
# XXX: actionsensor import depends on config being setup.
25
import st2tests.config as tests_config
26
tests_config.parse_args()
27
28
from tests.unit import base
29
30
from st2common.bootstrap import actionsregistrar
31
from st2common.bootstrap import runnersregistrar
32
from st2common.constants import action as ac_const
33
from st2common.models.db import liveaction as lv_db_models
34
from st2common.persistence import execution as ex_db_access
35
from st2common.persistence import liveaction as lv_db_access
36
from st2common.persistence import workflow as wf_db_access
37
from st2common.runners import base as runners
38
from st2common.services import action as ac_svc
39
from st2common.services import workflows as wf_svc
40
from st2common.transport import liveaction as lv_ac_xport
41
from st2common.transport import workflow as wf_ex_xport
42
from st2common.transport import publishers
43
from st2tests.mocks import liveaction as mock_lv_ac_xport
44
from st2tests.mocks import workflow as mock_wf_ex_xport
45
46
47
TEST_FIXTURES = {
48
    'workflows': [
49
        'fail-inspection.yaml',
50
        'fail-input-rendering.yaml',
51
        'fail-vars-rendering.yaml',
52
        'fail-start-task-action.yaml',
53
        'fail-start-task-input.yaml',
54
        'fail-task-action.yaml',
55
        'fail-task-input.yaml',
56
        'fail-task-transition.yaml',
57
        'fail-task-publish.yaml',
58
        'fail-output-rendering.yaml'
59
    ],
60
    'actions': [
61
        'fail-inspection.yaml',
62
        'fail-input-rendering.yaml',
63
        'fail-vars-rendering.yaml',
64
        'fail-start-task-action.yaml',
65
        'fail-start-task-input.yaml',
66
        'fail-task-action.yaml',
67
        'fail-task-input.yaml',
68
        'fail-task-transition.yaml',
69
        'fail-task-publish.yaml',
70
        'fail-output-rendering.yaml'
71
    ]
72
}
73
74
TEST_PACK = 'orchestra_tests'
75
TEST_PACK_PATH = st2tests.fixturesloader.get_fixtures_packs_base_path() + '/' + TEST_PACK
76
77
PACKS = [
78
    TEST_PACK_PATH,
79
    st2tests.fixturesloader.get_fixtures_packs_base_path() + '/core'
80
]
81
82
83
@mock.patch.object(
84
    publishers.CUDPublisher,
85
    'publish_update',
86
    mock.MagicMock(return_value=None))
87
@mock.patch.object(
88
    lv_ac_xport.LiveActionPublisher,
89
    'publish_create',
90
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_create))
91
@mock.patch.object(
92
    lv_ac_xport.LiveActionPublisher,
93
    'publish_state',
94
    mock.MagicMock(side_effect=mock_lv_ac_xport.MockLiveActionPublisher.publish_state))
95
@mock.patch.object(
96
    wf_ex_xport.WorkflowExecutionPublisher,
97
    'publish_create',
98
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_create))
99
@mock.patch.object(
100
    wf_ex_xport.WorkflowExecutionPublisher,
101
    'publish_state',
102
    mock.MagicMock(side_effect=mock_wf_ex_xport.MockWorkflowExecutionPublisher.publish_state))
103
class OrchestraErrorHandlingTest(st2tests.DbTestCase):
104
105
    @classmethod
106
    def setUpClass(cls):
107
        super(OrchestraErrorHandlingTest, cls).setUpClass()
108
109
        # Register runners.
110
        runnersregistrar.register_runners()
111
112
        # Register test pack(s).
113
        actions_registrar = actionsregistrar.ActionsRegistrar(
114
            use_pack_cache=False,
115
            fail_on_failure=True
116
        )
117
118
        for pack in PACKS:
119
            actions_registrar.register_from_pack(pack)
120
121
    @classmethod
122
    def get_runner_class(cls, runner_name):
123
        return runners.get_runner(runner_name, runner_name).__class__
124
125
    def sort_wf_runtime_errors(self, errors):
126
        return sorted(errors, key=lambda x: x.get('task_id', None))
127
128
    def test_fail_inspection(self):
129
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][0])
130
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
131
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
132
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
133
134
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
135
        self.assertIn('errors', lv_ac_db.result)
136
        self.assertIn('expressions', lv_ac_db.result['errors'])
137
        self.assertGreater(len(lv_ac_db.result['errors']['expressions']), 0)
138
        self.assertIn('context', lv_ac_db.result['errors'])
139
        self.assertGreater(len(lv_ac_db.result['errors']['context']), 0)
140
        self.assertIn('syntax', lv_ac_db.result['errors'])
141
        self.assertGreater(len(lv_ac_db.result['errors']['syntax']), 0)
142
143
    def test_fail_input_rendering(self):
144
        expected_errors = [
145
            {
146
                'message': 'Unknown function "#property#value"'
147
            }
148
        ]
149
150
        expected_result = {'errors': expected_errors}
151
152
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][1])
153
154
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
155
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
156
157
        # Assert action execution for task is not started and workflow failed.
158
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
159
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
160
        self.assertEqual(len(tk_ex_dbs), 0)
161
        self.assertEqual(wf_ex_db.status, wf_states.FAILED)
162
        self.assertListEqual(self.sort_wf_runtime_errors(wf_ex_db.errors), expected_errors)
163
164
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
165
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
166
        self.assertDictEqual(lv_ac_db.result, expected_result)
167
168
        ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
169
        self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED)
170
        self.assertDictEqual(ac_ex_db.result, expected_result)
171
172
    def test_fail_vars_rendering(self):
173
        expected_errors = [
174
            {
175
                'message': 'Unknown function "#property#value"'
176
            }
177
        ]
178
179
        expected_result = {'errors': expected_errors}
180
181
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][2])
182
183
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
184
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
185
186
        # Assert action execution for task is not started and workflow failed.
187
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
188
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
189
        self.assertEqual(len(tk_ex_dbs), 0)
190
        self.assertEqual(wf_ex_db.status, wf_states.FAILED)
191
        self.assertListEqual(self.sort_wf_runtime_errors(wf_ex_db.errors), expected_errors)
192
193
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
194
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
195
        self.assertDictEqual(lv_ac_db.result, expected_result)
196
197
        ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
198
        self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED)
199
        self.assertDictEqual(ac_ex_db.result, expected_result)
200
201
    def test_fail_start_task_action(self):
202
        expected_errors = [
203
            {
204
                'message': 'Unknown function "#property#value"',
205
                'task_id': 'task1'
206
            }
207
        ]
208
209
        expected_result = {'errors': expected_errors}
210
211
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][3])
212
213
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
214
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
215
216
        # Assert action execution for task is not started and workflow failed.
217
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
218
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
219
        self.assertEqual(len(tk_ex_dbs), 0)
220
        self.assertEqual(wf_ex_db.status, wf_states.FAILED)
221
        self.assertListEqual(self.sort_wf_runtime_errors(wf_ex_db.errors), expected_errors)
222
223
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
224
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
225
        self.assertDictEqual(lv_ac_db.result, expected_result)
226
227
        ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
228
        self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED)
229
        self.assertDictEqual(ac_ex_db.result, expected_result)
230
231
    def test_fail_start_task_input(self):
232
        expected_errors = [
233
            {
234
                'message': 'Unknown function "#property#value"',
235
                'task_id': 'task1'
236
            }
237
        ]
238
239
        expected_result = {'errors': expected_errors}
240
241
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][4])
242
243
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
244
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
245
246
        # Assert action execution for task is not started and workflow failed.
247
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
248
        tk_ex_dbs = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))
249
        self.assertEqual(len(tk_ex_dbs), 0)
250
        self.assertEqual(wf_ex_db.status, wf_states.FAILED)
251
        self.assertListEqual(self.sort_wf_runtime_errors(wf_ex_db.errors), expected_errors)
252
253
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
254
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
255
        self.assertDictEqual(lv_ac_db.result, expected_result)
256
257
        ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
258
        self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED)
259
        self.assertDictEqual(ac_ex_db.result, expected_result)
260
261
    def test_fail_next_task_action(self):
262
        expected_errors = [
263
            {
264
                'message': 'Unknown function "#property#value"',
265
                'task_id': 'task2'
266
            }
267
        ]
268
269
        expected_result = {'errors': expected_errors}
270
271
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][5])
272
273
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
274
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
275
276
        # Assert task1 is already completed.
277
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
278
        tk_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))[0]
279
        tk_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_db.id))[0]
280
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_db.liveaction['id'])
281
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
282
283
        # Manually handle action execution completion for task1 which has an error in publish.
284
        wf_svc.handle_action_execution_completion(tk_ac_ex_db)
285
286
        # Assert task1 succeeded but workflow failed.
287
        tk_ex_db = wf_db_access.TaskExecution.get_by_id(tk_ex_db.id)
288
        self.assertEqual(tk_ex_db.status, wf_states.SUCCEEDED)
289
        wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
290
        self.assertEqual(wf_ex_db.status, wf_states.FAILED)
291
        self.assertListEqual(self.sort_wf_runtime_errors(wf_ex_db.errors), expected_errors)
292
293
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
294
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
295
        self.assertDictEqual(lv_ac_db.result, expected_result)
296
297
        ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
298
        self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED)
299
        self.assertDictEqual(ac_ex_db.result, expected_result)
300
301
    def test_fail_next_task_input(self):
302
        expected_errors = [
303
            {
304
                'message': 'Unknown function "#property#value"',
305
                'task_id': 'task2'
306
            }
307
        ]
308
309
        expected_result = {'errors': expected_errors}
310
311
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][6])
312
313
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
314
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
315
316
        # Assert task1 is already completed.
317
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
318
        tk_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))[0]
319
        tk_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_db.id))[0]
320
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_db.liveaction['id'])
321
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
322
323
        # Manually handle action execution completion for task1 which has an error in publish.
324
        wf_svc.handle_action_execution_completion(tk_ac_ex_db)
325
326
        # Assert task1 succeeded but workflow failed.
327
        tk_ex_db = wf_db_access.TaskExecution.get_by_id(tk_ex_db.id)
328
        self.assertEqual(tk_ex_db.status, wf_states.SUCCEEDED)
329
        wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
330
        self.assertEqual(wf_ex_db.status, wf_states.FAILED)
331
        self.assertListEqual(self.sort_wf_runtime_errors(wf_ex_db.errors), expected_errors)
332
333
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
334
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
335
        self.assertDictEqual(lv_ac_db.result, expected_result)
336
337
        ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
338
        self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED)
339
        self.assertDictEqual(ac_ex_db.result, expected_result)
340
341
    def test_fail_task_transition(self):
342
        expected_errors = [
343
            {
344
                'message': 'Unable to resolve key \'foobar\' in expression '
345
                           '\'<% succeeded() and result().foobar %>\' from context.',
346
                'task_transition_id': 'task2__0',
347
                'task_id': 'task1'
348
            }
349
        ]
350
351
        expected_result = {'errors': expected_errors}
352
353
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][7])
354
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
355
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
356
357
        # Assert task1 is already completed.
358
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
359
        tk_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))[0]
360
        tk_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_db.id))[0]
361
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_db.liveaction['id'])
362
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
363
364
        # Manually handle action execution completion for task1 which has an error in publish.
365
        wf_svc.handle_action_execution_completion(tk_ac_ex_db)
366
367
        # Assert task1 succeeded but workflow failed.
368
        tk_ex_db = wf_db_access.TaskExecution.get_by_id(tk_ex_db.id)
369
        self.assertEqual(tk_ex_db.status, wf_states.SUCCEEDED)
370
        wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
371
        self.assertEqual(wf_ex_db.status, wf_states.FAILED)
372
        self.assertListEqual(self.sort_wf_runtime_errors(wf_ex_db.errors), expected_errors)
373
374
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
375
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
376
        self.assertDictEqual(lv_ac_db.result, expected_result)
377
378
        ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
379
        self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED)
380
        self.assertDictEqual(ac_ex_db.result, expected_result)
381
382
    def test_fail_task_publish(self):
383
        expected_errors = [
384
            {
385
                'message': 'Unknown function "foobar"',
386
                'task_transition_id': 'task2__0',
387
                'task_id': 'task1'
388
            }
389
        ]
390
391
        expected_result = {'errors': expected_errors}
392
393
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][8])
394
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
395
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
396
397
        # Assert task1 is already completed.
398
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
399
        tk_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))[0]
400
        tk_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_db.id))[0]
401
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_db.liveaction['id'])
402
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
403
404
        # Manually handle action execution completion for task1 which has an error in publish.
405
        wf_svc.handle_action_execution_completion(tk_ac_ex_db)
406
407
        # Assert task1 succeeded but workflow failed.
408
        tk_ex_db = wf_db_access.TaskExecution.get_by_id(tk_ex_db.id)
409
        self.assertEqual(tk_ex_db.status, wf_states.SUCCEEDED)
410
        wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
411
        self.assertEqual(wf_ex_db.status, wf_states.FAILED)
412
        self.assertListEqual(self.sort_wf_runtime_errors(wf_ex_db.errors), expected_errors)
413
414
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
415
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
416
        self.assertDictEqual(lv_ac_db.result, expected_result)
417
418
        ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
419
        self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED)
420
        self.assertDictEqual(ac_ex_db.result, expected_result)
421
422
    def test_fail_output_rendering(self):
423
        expected_errors = [
424
            {
425
                'message': 'Unknown function "#property#value"'
426
            }
427
        ]
428
429
        expected_result = {'errors': expected_errors}
430
431
        wf_meta = base.get_wf_fixture_meta_data(TEST_PACK_PATH, TEST_FIXTURES['workflows'][9])
432
        lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta['name'])
433
        lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
434
435
        # Assert task1 is already completed.
436
        wf_ex_db = wf_db_access.WorkflowExecution.query(action_execution=str(ac_ex_db.id))[0]
437
        tk_ex_db = wf_db_access.TaskExecution.query(workflow_execution=str(wf_ex_db.id))[0]
438
        tk_ac_ex_db = ex_db_access.ActionExecution.query(task_execution=str(tk_ex_db.id))[0]
439
        tk_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk_ac_ex_db.liveaction['id'])
440
        self.assertEqual(tk_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
441
442
        # Manually handle action execution completion for task1 which has an error in publish.
443
        wf_svc.handle_action_execution_completion(tk_ac_ex_db)
444
445
        # Assert task1 succeeded but workflow failed.
446
        tk_ex_db = wf_db_access.TaskExecution.get_by_id(tk_ex_db.id)
447
        self.assertEqual(tk_ex_db.status, wf_states.SUCCEEDED)
448
        wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)
449
        self.assertEqual(wf_ex_db.status, wf_states.FAILED)
450
        self.assertListEqual(self.sort_wf_runtime_errors(wf_ex_db.errors), expected_errors)
451
452
        lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
453
        self.assertEqual(lv_ac_db.status, ac_const.LIVEACTION_STATUS_FAILED)
454
        self.assertDictEqual(lv_ac_db.result, expected_result)
455
456
        ac_ex_db = ex_db_access.ActionExecution.get_by_id(str(ac_ex_db.id))
457
        self.assertEqual(ac_ex_db.status, ac_const.LIVEACTION_STATUS_FAILED)
458
        self.assertDictEqual(ac_ex_db.result, expected_result)
459