Passed
Pull Request — master (#3640)
by Lakshmi
06:19
created

MistralQuerierTest   F

Complexity

Total Complexity 59

Size/Duplication

Total Lines 855
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 855
rs 3.6363
wmc 59

46 Methods

Rating   Name   Duplication   Size   Complexity  
A test_determine_status_wf_running_exec_running_tasks_running() 0 13 1
A test_determine_status_wf_resuming_exec_running_tasks_running() 0 13 1
A test_determine_status_wf_pausing_exec_running_tasks_running() 0 13 1
B test_query_with_up_to_date_tasks_in_liveaction_result() 0 43 2
A test_determine_status_wf_resuming_exec_running_tasks_completed() 0 13 1
A test_determine_status_wf_canceling_exec_running_tasks_running() 0 13 1
A test_determine_status_wf_canceling_exec_canceled_tasks_completed() 0 13 1
A test_query_missing_mistral_execution_id() 0 2 1
B test_query_get_workflow_retry() 0 43 3
A test_query_liveaction_not_found() 0 9 1
A test_determine_status_wf_running_exec_cancelled_tasks_running() 0 13 1
A test_query_get_workflow_tasks_not_found() 0 19 1
A test_query_get_workflow_retry_exhausted() 0 15 2
A test_query_list_workflow_tasks_retry_exhausted() 0 19 2
B test_query_with_outdated_tasks_in_liveaction_result() 0 43 2
A test_query_get_workflow_tasks_retry() 0 47 2
A test_determine_status_wf_running_exec_failed_tasks_running() 0 13 1
A test_determine_status_wf_canceling_exec_running_tasks_waiting() 0 13 1
B test_query_with_last_query_time() 0 42 2
A test_determine_status_wf_canceling_exec_canceled_tasks_paused() 0 13 1
A test_determine_status_wf_canceling_exec_canceled_tasks_waiting() 0 13 1
A setUp() 0 3 1
A setUpClass() 0 12 1
A test_determine_status_wf_running_exec_running_tasks_completed() 0 13 1
A test_determine_status_wf_pausing_exec_paused_tasks_completed() 0 13 1
B test_query() 0 38 2
A test_determine_status_wf_resuming_exec_canceled_tasks_running() 0 13 1
A test_get_workflow_tasks() 0 19 3
A test_query_list_workflow_tasks_retry() 0 47 3
A test_determine_status_wf_pausing_exec_paused_tasks_paused() 0 13 1
A test_determine_status_wf_resuming_exec_paused_tasks_running() 0 13 1
B test_query_get_workflow_tasks_retry_exhausted() 0 25 1
A test_determine_status_wf_canceling_exec_canceled_tasks_running() 0 13 1
A test_determine_status_wf_running_exec_paused_tasks_completed() 0 13 1
A test_determine_status_wf_canceling_exec_running_tasks_completed() 0 13 1
A test_determine_status_wf_pausing_exec_paused_tasks_completed_child_running() 0 13 1
A test_determine_status_wf_running_exec_succeeded_tasks_completed_child_running() 0 13 1
A test_get_workflow_result() 0 15 1
A test_query_get_workflow_not_found() 0 13 1
A test_determine_status_wf_resuming_exec_paused_tasks_completed() 0 13 1
A test_determine_status_wf_running_exec_failed_tasks_completed() 0 13 1
A test_determine_status_wf_running_exec_succeeded_tasks_running() 0 13 1
A test_determine_status_wf_running_exec_succeeded_tasks_completed() 0 13 1
A test_query_missing_context() 0 2 1
A test_determine_status_wf_pausing_exec_paused_tasks_running() 0 13 1
A test_determine_status_wf_running_exec_paused_tasks_running() 0 13 1

How to fix   Complexity   

Complex Class

Complex classes like MistralQuerierTest often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import datetime
18
import json
19
import requests
20
import time
21
import uuid
22
23
import mock
24
from mock import call
25
26
from mistralclient.api import base as mistralclient_base
27
from mistralclient.api.v2 import executions
28
from mistralclient.api.v2 import tasks
29
from oslo_config import cfg
30
31
import st2tests.config as tests_config
32
tests_config.parse_args()
33
34
from st2common.constants import action as action_constants
35
from st2common.exceptions import db as db_exc
36
from st2common.models.db.execution import ActionExecutionDB
37
from st2common.models.db.liveaction import LiveActionDB
38
from st2common.persistence.execution import ActionExecution
39
from st2common.util import action_db as action_utils
40
from st2common.util import loader
41
from st2tests import DbTestCase
42
43
44
MOCK_WF_TASKS_SUCCEEDED = [
45
    {'name': 'task1', 'state': 'SUCCESS'},
46
    {'name': 'task2', 'state': 'SUCCESS'}
47
]
48
49
MOCK_WF_TASKS_ERRORED = [
50
    {'name': 'task1', 'state': 'SUCCESS'},
51
    {'name': 'task2', 'state': 'ERROR'}
52
]
53
54
MOCK_WF_TASKS_RUNNING = [
55
    {'name': 'task1', 'state': 'SUCCESS'},
56
    {'name': 'task2', 'state': 'RUNNING'}
57
]
58
59
MOCK_WF_TASKS_WAITING = [
60
    {'name': 'task1', 'state': 'SUCCESS'},
61
    {'name': 'task2', 'state': 'WAITING'}
62
]
63
64
MOCK_WF_TASKS_PAUSED = [
65
    {'name': 'task1', 'state': 'SUCCESS'},
66
    {'name': 'task2', 'state': 'PAUSED'}
67
]
68
69
MOCK_WF_EX_DATA = {
70
    'id': uuid.uuid4().hex,
71
    'name': 'main',
72
    'output': '{"k1": "v1"}',
73
    'state': 'SUCCESS',
74
    'state_info': None
75
}
76
77
MOCK_WF_EX = executions.Execution(None, MOCK_WF_EX_DATA)
78
79
MOCK_WF_EX_TASKS_DATA = [
80
    {
81
        'id': uuid.uuid4().hex,
82
        'name': 'task1',
83
        'workflow_execution_id': MOCK_WF_EX_DATA['id'],
84
        'workflow_name': MOCK_WF_EX_DATA['name'],
85
        'created_at': str(datetime.datetime.utcnow()),
86
        'updated_at': str(datetime.datetime.utcnow()),
87
        'state': 'SUCCESS',
88
        'state_info': None,
89
        'input': '{"a": "b"}',
90
        'result': '{"c": "d"}',
91
        'published': '{"c": "d"}'
92
    },
93
    {
94
        'id': uuid.uuid4().hex,
95
        'name': 'task2',
96
        'workflow_execution_id': MOCK_WF_EX_DATA['id'],
97
        'workflow_name': MOCK_WF_EX_DATA['name'],
98
        'created_at': str(datetime.datetime.utcnow()),
99
        'updated_at': str(datetime.datetime.utcnow()),
100
        'state': 'SUCCESS',
101
        'state_info': None,
102
        'input': '{"e": "f", "g": "h"}',
103
        'result': '{"i": "j", "k": "l"}',
104
        'published': '{"k": "l"}'
105
    }
106
]
107
108
MOCK_WF_EX_TASKS = [
109
    tasks.Task(None, MOCK_WF_EX_TASKS_DATA[0]),
110
    tasks.Task(None, MOCK_WF_EX_TASKS_DATA[1])
111
]
112
113
MOCK_QRY_CONTEXT = {
114
    'mistral': {
115
        'execution_id': uuid.uuid4().hex
116
    }
117
}
118
119
MOCK_LIVEACTION_RESULT = {
120
    'tasks': [
121
        {
122
            'id': MOCK_WF_EX_TASKS_DATA[0]['id'],
123
            'name': MOCK_WF_EX_TASKS_DATA[0]['name'],
124
            'workflow_execution_id': MOCK_WF_EX_TASKS_DATA[0]['workflow_execution_id'],
125
            'workflow_name': MOCK_WF_EX_TASKS_DATA[0]['workflow_name'],
126
            'created_at': MOCK_WF_EX_TASKS_DATA[0]['created_at'],
127
            'updated_at': MOCK_WF_EX_TASKS_DATA[0]['updated_at'],
128
            'state': MOCK_WF_EX_TASKS_DATA[0]['state'],
129
            'state_info': MOCK_WF_EX_TASKS_DATA[0]['state_info'],
130
            'input': json.loads(MOCK_WF_EX_TASKS_DATA[0]['input']),
131
            'result': json.loads(MOCK_WF_EX_TASKS_DATA[0]['result']),
132
            'published': json.loads(MOCK_WF_EX_TASKS_DATA[0]['published'])
133
        }
134
    ]
135
}
136
137
MOCK_WF_EX_INCOMPLETE_TASKS_DATA = [
138
    {
139
        'id': uuid.uuid4().hex,
140
        'name': 'task1',
141
        'workflow_execution_id': MOCK_WF_EX_DATA['id'],
142
        'workflow_name': MOCK_WF_EX_DATA['name'],
143
        'created_at': str(datetime.datetime.utcnow() - datetime.timedelta(seconds=180)),
144
        'updated_at': str(datetime.datetime.utcnow()),
145
        'state': 'RUNNING',
146
        'state_info': None,
147
        'input': '{"a": "b"}',
148
        'result': '{"c": "d"}',
149
        'published': '{"c": "d"}'
150
    },
151
    {
152
        'id': uuid.uuid4().hex,
153
        'name': 'task2',
154
        'workflow_execution_id': MOCK_WF_EX_DATA['id'],
155
        'workflow_name': MOCK_WF_EX_DATA['name'],
156
        'created_at': str(datetime.datetime.utcnow() - datetime.timedelta(seconds=180)),
157
        'updated_at': str(datetime.datetime.utcnow()),
158
        'state': 'RUNNING',
159
        'state_info': None,
160
        'input': '{"e": "f", "g": "h"}',
161
        'result': '{"i": "j", "k": "l"}',
162
        'published': '{"k": "l"}'
163
    }
164
]
165
166
MOCK_WF_EX_INCOMPLETE_TASKS = [
167
    tasks.Task(None, MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]),
168
    tasks.Task(None, MOCK_WF_EX_INCOMPLETE_TASKS_DATA[1])
169
]
170
171
MOCK_LIVEACTION_OUTDATED_INCOMPLETE_TASKS_RESULT = {
172
    'tasks': [
173
        {
174
            'id': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['id'],
175
            'name': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['name'],
176
            'workflow_execution_id': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['workflow_execution_id'],
177
            'workflow_name': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['workflow_name'],
178
            'created_at': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['created_at'],
179
            'updated_at': str(datetime.datetime.utcnow() - datetime.timedelta(seconds=120)),
180
            'state': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['state'],
181
            'state_info': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['state_info'],
182
            'input': json.loads(MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['input']),
183
            'result': json.loads(MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['result']),
184
            'published': json.loads(MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['published'])
185
        }
186
    ]
187
}
188
189
MOCK_LIVEACTION_UP_TO_DATE_INCOMPLETE_TASKS_RESULT = {
190
    'tasks': [
191
        {
192
            'id': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['id'],
193
            'name': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['name'],
194
            'workflow_execution_id': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['workflow_execution_id'],
195
            'workflow_name': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['workflow_name'],
196
            'created_at': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['created_at'],
197
            'updated_at': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['updated_at'],
198
            'state': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['state'],
199
            'state_info': MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['state_info'],
200
            'input': json.loads(MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['input']),
201
            'result': json.loads(MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['result']),
202
            'published': json.loads(MOCK_WF_EX_INCOMPLETE_TASKS_DATA[0]['published'])
203
        }
204
    ]
205
}
206
207
MOCK_CHILD_ACTIONEXECUTION_RUNNING = ActionExecutionDB(
208
    action={'ref': 'mock.task'},
209
    runner={'name': 'local_runner'},
210
    liveaction={'id': uuid.uuid4().hex},
211
    status=action_constants.LIVEACTION_STATUS_RUNNING,
212
    children=[]
213
)
214
215
MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED = ActionExecutionDB(
216
    action={'ref': 'mock.task'},
217
    runner={'name': 'local_runner'},
218
    liveaction={'id': uuid.uuid4().hex},
219
    status=action_constants.LIVEACTION_STATUS_SUCCEEDED,
220
    children=[]
221
)
222
223
MOCK_CHILD_ACTIONEXECUTION_PAUSED = ActionExecutionDB(
224
    action={'ref': 'mock.task'},
225
    runner={'name': 'mistral_v2'},
226
    liveaction={'id': uuid.uuid4().hex},
227
    status=action_constants.LIVEACTION_STATUS_PAUSED,
228
    children=[]
229
)
230
231
MOCK_LIVEACTION_RUNNING = LiveActionDB(
232
    action='mock.workflow',
233
    status=action_constants.LIVEACTION_STATUS_RUNNING
234
)
235
236
MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING = ActionExecutionDB(
237
    action={'ref': 'mock.workflow'},
238
    runner={'name': 'mistral_v2'},
239
    liveaction={'id': MOCK_LIVEACTION_RUNNING.id},
240
    status=action_constants.LIVEACTION_STATUS_RUNNING,
241
    children=[MOCK_CHILD_ACTIONEXECUTION_RUNNING.id]
242
)
243
244
MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED = ActionExecutionDB(
245
    action={'ref': 'mock.workflow'},
246
    runner={'name': 'mistral_v2'},
247
    liveaction={'id': MOCK_LIVEACTION_RUNNING.id},
248
    status=action_constants.LIVEACTION_STATUS_RUNNING,
249
    children=[MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED.id]
250
)
251
252
MOCK_LIVEACTION_RUNNING_WITH_STREAMING_RESULT = LiveActionDB(
253
    action='mock.workflow',
254
    status=action_constants.LIVEACTION_STATUS_RUNNING,
255
    result=MOCK_LIVEACTION_RESULT
256
)
257
258
MOCK_LIVEACTION_RUNNING_WITH_OUTDATED_INCOMPLETE_TASKS_STREAMING_RESULT = LiveActionDB(
259
    action='mock.workflow',
260
    status=action_constants.LIVEACTION_STATUS_RUNNING,
261
    result=MOCK_LIVEACTION_OUTDATED_INCOMPLETE_TASKS_RESULT
262
)
263
264
MOCK_LIVEACTION_RUNNING_WITH_UP_TO_DATE_INCOMPLETE_TASKS_STREAMING_RESULT = LiveActionDB(
265
    action='mock.workflow',
266
    status=action_constants.LIVEACTION_STATUS_RUNNING,
267
    result=MOCK_LIVEACTION_UP_TO_DATE_INCOMPLETE_TASKS_RESULT
268
)
269
270
MOCK_LIVEACTION_CANCELING = LiveActionDB(
271
    action='mock.workflow',
272
    status=action_constants.LIVEACTION_STATUS_CANCELING
273
)
274
275
MOCK_ACTIONEXECUTION_CANCELING_CHILD_RUNNING = ActionExecutionDB(
276
    action={'ref': 'mock.workflow'},
277
    runner={'name': 'mistral_v2'},
278
    liveaction={'id': MOCK_LIVEACTION_CANCELING.id},
279
    status=action_constants.LIVEACTION_STATUS_CANCELING,
280
    children=[MOCK_CHILD_ACTIONEXECUTION_RUNNING.id]
281
)
282
283
MOCK_ACTIONEXECUTION_CANCELING_CHILD_SUCCEEDED = ActionExecutionDB(
284
    action={'ref': 'mock.workflow'},
285
    runner={'name': 'mistral_v2'},
286
    liveaction={'id': MOCK_LIVEACTION_CANCELING.id},
287
    status=action_constants.LIVEACTION_STATUS_CANCELING,
288
    children=[MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED.id]
289
)
290
291
MOCK_ACTIONEXECUTION_CANCELING_CHILD_PAUSED = ActionExecutionDB(
292
    action={'ref': 'mock.workflow'},
293
    runner={'name': 'mistral_v2'},
294
    liveaction={'id': MOCK_LIVEACTION_CANCELING.id},
295
    status=action_constants.LIVEACTION_STATUS_CANCELING,
296
    children=[MOCK_CHILD_ACTIONEXECUTION_PAUSED.id]
297
)
298
299
MOCK_LIVEACTION_PAUSING = LiveActionDB(
300
    action='mock.workflow',
301
    status=action_constants.LIVEACTION_STATUS_PAUSING
302
)
303
304
MOCK_ACTIONEXECUTION_PAUSING = ActionExecutionDB(
305
    action={'ref': 'mock.workflow'},
306
    runner={'name': 'mistral_v2'},
307
    liveaction={'id': MOCK_LIVEACTION_PAUSING.id},
308
    status=action_constants.LIVEACTION_STATUS_PAUSING,
309
    children=[]
310
)
311
312
MOCK_ACTIONEXECUTION_PAUSING_CHILD_RUNNING = ActionExecutionDB(
313
    action={'ref': 'mock.workflow'},
314
    runner={'name': 'mistral_v2'},
315
    liveaction={'id': MOCK_LIVEACTION_PAUSING.id},
316
    status=action_constants.LIVEACTION_STATUS_PAUSING,
317
    children=[MOCK_CHILD_ACTIONEXECUTION_RUNNING.id]
318
)
319
320
MOCK_ACTIONEXECUTION_PAUSING_CHILD_PAUSED = ActionExecutionDB(
321
    action={'ref': 'mock.workflow'},
322
    runner={'name': 'mistral_v2'},
323
    liveaction={'id': MOCK_LIVEACTION_PAUSING.id},
324
    status=action_constants.LIVEACTION_STATUS_PAUSING,
325
    children=[MOCK_CHILD_ACTIONEXECUTION_PAUSED.id]
326
)
327
328
MOCK_LIVEACTION_RESUMING = LiveActionDB(
329
    action='mock.workflow',
330
    status=action_constants.LIVEACTION_STATUS_RESUMING
331
)
332
333
MOCK_ACTIONEXECUTION_RESUMING_CHILD_RUNNING = ActionExecutionDB(
334
    action={'ref': 'mock.workflow'},
335
    runner={'name': 'mistral_v2'},
336
    liveaction={'id': MOCK_LIVEACTION_RESUMING.id},
337
    status=action_constants.LIVEACTION_STATUS_RESUMING,
338
    children=[MOCK_CHILD_ACTIONEXECUTION_RUNNING.id]
339
)
340
341
MOCK_ACTIONEXECUTION_RESUMING_CHILD_SUCCEEDED = ActionExecutionDB(
342
    action={'ref': 'mock.workflow'},
343
    runner={'name': 'mistral_v2'},
344
    liveaction={'id': MOCK_LIVEACTION_RESUMING.id},
345
    status=action_constants.LIVEACTION_STATUS_RESUMING,
346
    children=[MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED.id]
347
)
348
349
350
class MistralQuerierTest(DbTestCase):
351
352
    @classmethod
353
    def setUpClass(cls):
354
        super(MistralQuerierTest, cls).setUpClass()
355
356
        # Override the retry configuration here otherwise st2tests.config.parse_args
357
        # in DbTestCase.setUpClass will reset these overrides.
358
        cfg.CONF.set_override('retry_exp_msec', 100, group='mistral')
359
        cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral')
360
        cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral')
361
362
        # Register query module.
363
        cls.query_module = loader.register_query_module('mistral_v2')
364
365
    def setUp(self):
366
        super(MistralQuerierTest, self).setUp()
367
        self.querier = self.query_module.get_instance()
368
369
    @mock.patch.object(
370
        ActionExecution, 'get',
371
        mock.MagicMock(side_effect=[
372
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING,
373
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
374
    def test_determine_status_wf_running_exec_running_tasks_running(self):
375
        status = self.querier._determine_execution_status(
376
            MOCK_LIVEACTION_RUNNING,
377
            'RUNNING',
378
            MOCK_WF_TASKS_RUNNING
379
        )
380
381
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
382
383
    @mock.patch.object(
384
        ActionExecution, 'get',
385
        mock.MagicMock(side_effect=[
386
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
387
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
388
    def test_determine_status_wf_running_exec_running_tasks_completed(self):
389
        status = self.querier._determine_execution_status(
390
            MOCK_LIVEACTION_RUNNING,
391
            'RUNNING',
392
            MOCK_WF_TASKS_SUCCEEDED
393
        )
394
395
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
396
397
    @mock.patch.object(
398
        ActionExecution, 'get',
399
        mock.MagicMock(side_effect=[
400
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
401
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
402
    def test_determine_status_wf_running_exec_succeeded_tasks_completed(self):
403
        status = self.querier._determine_execution_status(
404
            MOCK_LIVEACTION_RUNNING,
405
            'SUCCESS',
406
            MOCK_WF_TASKS_SUCCEEDED
407
        )
408
409
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
410
411
    @mock.patch.object(
412
        ActionExecution, 'get',
413
        mock.MagicMock(side_effect=[
414
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING,
415
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
416
    def test_determine_status_wf_running_exec_succeeded_tasks_running(self):
417
        status = self.querier._determine_execution_status(
418
            MOCK_LIVEACTION_RUNNING,
419
            'SUCCESS',
420
            MOCK_WF_TASKS_RUNNING
421
        )
422
423
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
424
425
    @mock.patch.object(
426
        ActionExecution, 'get',
427
        mock.MagicMock(side_effect=[
428
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING,
429
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
430
    def test_determine_status_wf_running_exec_succeeded_tasks_completed_child_running(self):
431
        status = self.querier._determine_execution_status(
432
            MOCK_LIVEACTION_RUNNING,
433
            'SUCCESS',
434
            MOCK_WF_TASKS_SUCCEEDED
435
        )
436
437
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
438
439
    @mock.patch.object(
440
        ActionExecution, 'get',
441
        mock.MagicMock(side_effect=[
442
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
443
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
444
    def test_determine_status_wf_running_exec_failed_tasks_completed(self):
445
        status = self.querier._determine_execution_status(
446
            MOCK_LIVEACTION_RUNNING,
447
            'ERROR',
448
            MOCK_WF_TASKS_SUCCEEDED
449
        )
450
451
        self.assertEqual(action_constants.LIVEACTION_STATUS_FAILED, status)
452
453
    @mock.patch.object(
454
        ActionExecution, 'get',
455
        mock.MagicMock(side_effect=[
456
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING,
457
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
458
    def test_determine_status_wf_running_exec_failed_tasks_running(self):
459
        status = self.querier._determine_execution_status(
460
            MOCK_LIVEACTION_RUNNING,
461
            'ERROR',
462
            MOCK_WF_TASKS_RUNNING
463
        )
464
465
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
466
467
    @mock.patch.object(
468
        ActionExecution, 'get',
469
        mock.MagicMock(side_effect=[
470
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_SUCCEEDED,
471
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
472
    def test_determine_status_wf_canceling_exec_canceled_tasks_completed(self):
473
        status = self.querier._determine_execution_status(
474
            MOCK_LIVEACTION_CANCELING,
475
            'CANCELLED',
476
            MOCK_WF_TASKS_SUCCEEDED
477
        )
478
479
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELED, status)
480
481
    @mock.patch.object(
482
        ActionExecution, 'get',
483
        mock.MagicMock(side_effect=[
484
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_RUNNING,
485
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
486
    def test_determine_status_wf_canceling_exec_canceled_tasks_running(self):
487
        status = self.querier._determine_execution_status(
488
            MOCK_LIVEACTION_CANCELING,
489
            'CANCELLED',
490
            MOCK_WF_TASKS_RUNNING
491
        )
492
493
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELING, status)
494
495
    @mock.patch.object(
496
        ActionExecution, 'get',
497
        mock.MagicMock(side_effect=[
498
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_SUCCEEDED,
499
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
500
    def test_determine_status_wf_canceling_exec_canceled_tasks_waiting(self):
501
        status = self.querier._determine_execution_status(
502
            MOCK_LIVEACTION_CANCELING,
503
            'CANCELLED',
504
            MOCK_WF_TASKS_WAITING
505
        )
506
507
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELED, status)
508
509
    @mock.patch.object(
510
        ActionExecution, 'get',
511
        mock.MagicMock(side_effect=[
512
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_PAUSED,
513
            MOCK_CHILD_ACTIONEXECUTION_PAUSED]))
514
    def test_determine_status_wf_canceling_exec_canceled_tasks_paused(self):
515
        status = self.querier._determine_execution_status(
516
            MOCK_LIVEACTION_CANCELING,
517
            'CANCELLED',
518
            MOCK_WF_TASKS_PAUSED
519
        )
520
521
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELED, status)
522
523
    @mock.patch.object(
524
        ActionExecution, 'get',
525
        mock.MagicMock(side_effect=[
526
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_SUCCEEDED,
527
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
528
    def test_determine_status_wf_canceling_exec_running_tasks_completed(self):
529
        status = self.querier._determine_execution_status(
530
            MOCK_LIVEACTION_CANCELING,
531
            'RUNNING',
532
            MOCK_WF_TASKS_SUCCEEDED
533
        )
534
535
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELING, status)
536
537
    @mock.patch.object(
538
        ActionExecution, 'get',
539
        mock.MagicMock(side_effect=[
540
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_RUNNING,
541
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
542
    def test_determine_status_wf_canceling_exec_running_tasks_running(self):
543
        status = self.querier._determine_execution_status(
544
            MOCK_LIVEACTION_CANCELING,
545
            'RUNNING',
546
            MOCK_WF_TASKS_RUNNING
547
        )
548
549
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELING, status)
550
551
    @mock.patch.object(
552
        ActionExecution, 'get',
553
        mock.MagicMock(side_effect=[
554
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_SUCCEEDED,
555
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
556
    def test_determine_status_wf_canceling_exec_running_tasks_waiting(self):
557
        status = self.querier._determine_execution_status(
558
            MOCK_LIVEACTION_CANCELING,
559
            'RUNNING',
560
            MOCK_WF_TASKS_WAITING
561
        )
562
563
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELING, status)
564
565
    @mock.patch.object(
566
        ActionExecution, 'get',
567
        mock.MagicMock(side_effect=[
568
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING,
569
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
570
    def test_determine_status_wf_running_exec_cancelled_tasks_running(self):
571
        status = self.querier._determine_execution_status(
572
            MOCK_LIVEACTION_RUNNING,
573
            'CANCELLED',
574
            MOCK_WF_TASKS_RUNNING
575
        )
576
577
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELING, status)
578
579
    @mock.patch.object(
580
        ActionExecution, 'get',
581
        mock.MagicMock(side_effect=[
582
            MOCK_ACTIONEXECUTION_PAUSING_CHILD_PAUSED,
583
            MOCK_CHILD_ACTIONEXECUTION_PAUSED]))
584
    def test_determine_status_wf_pausing_exec_paused_tasks_completed(self):
585
        status = self.querier._determine_execution_status(
586
            MOCK_LIVEACTION_PAUSING,
587
            'PAUSED',
588
            MOCK_WF_TASKS_SUCCEEDED
589
        )
590
591
        self.assertEqual(action_constants.LIVEACTION_STATUS_PAUSED, status)
592
593
    @mock.patch.object(
594
        ActionExecution, 'get',
595
        mock.MagicMock(side_effect=[
596
            MOCK_ACTIONEXECUTION_PAUSING_CHILD_RUNNING,
597
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
598
    def test_determine_status_wf_pausing_exec_paused_tasks_completed_child_running(self):
599
        status = self.querier._determine_execution_status(
600
            MOCK_LIVEACTION_PAUSING,
601
            'PAUSED',
602
            MOCK_WF_TASKS_SUCCEEDED
603
        )
604
605
        self.assertEqual(action_constants.LIVEACTION_STATUS_PAUSING, status)
606
607
    @mock.patch.object(
608
        ActionExecution, 'get',
609
        mock.MagicMock(side_effect=[
610
            MOCK_ACTIONEXECUTION_PAUSING_CHILD_RUNNING,
611
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
612
    def test_determine_status_wf_pausing_exec_paused_tasks_running(self):
613
        status = self.querier._determine_execution_status(
614
            MOCK_LIVEACTION_PAUSING,
615
            'PAUSED',
616
            MOCK_WF_TASKS_RUNNING
617
        )
618
619
        self.assertEqual(action_constants.LIVEACTION_STATUS_PAUSING, status)
620
621
    @mock.patch.object(
622
        ActionExecution, 'get',
623
        mock.MagicMock(side_effect=[
624
            MOCK_ACTIONEXECUTION_PAUSING_CHILD_PAUSED,
625
            MOCK_CHILD_ACTIONEXECUTION_PAUSED]))
626
    def test_determine_status_wf_pausing_exec_paused_tasks_paused(self):
627
        status = self.querier._determine_execution_status(
628
            MOCK_LIVEACTION_PAUSING,
629
            'PAUSED',
630
            MOCK_WF_TASKS_PAUSED
631
        )
632
633
        self.assertEqual(action_constants.LIVEACTION_STATUS_PAUSED, status)
634
635
    @mock.patch.object(
636
        ActionExecution, 'get',
637
        mock.MagicMock(side_effect=[
638
            MOCK_ACTIONEXECUTION_PAUSING_CHILD_RUNNING,
639
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
640
    def test_determine_status_wf_pausing_exec_running_tasks_running(self):
641
        status = self.querier._determine_execution_status(
642
            MOCK_LIVEACTION_PAUSING,
643
            'RUNNING',
644
            MOCK_WF_TASKS_RUNNING
645
        )
646
647
        self.assertEqual(action_constants.LIVEACTION_STATUS_PAUSING, status)
648
649
    @mock.patch.object(
650
        ActionExecution, 'get',
651
        mock.MagicMock(side_effect=[
652
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
653
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
654
    def test_determine_status_wf_running_exec_paused_tasks_completed(self):
655
        status = self.querier._determine_execution_status(
656
            MOCK_LIVEACTION_RUNNING,
657
            'PAUSED',
658
            MOCK_WF_TASKS_SUCCEEDED
659
        )
660
661
        self.assertEqual(action_constants.LIVEACTION_STATUS_PAUSED, status)
662
663
    @mock.patch.object(
664
        ActionExecution, 'get',
665
        mock.MagicMock(side_effect=[
666
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING,
667
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
668
    def test_determine_status_wf_running_exec_paused_tasks_running(self):
669
        status = self.querier._determine_execution_status(
670
            MOCK_LIVEACTION_RUNNING,
671
            'PAUSED',
672
            MOCK_WF_TASKS_RUNNING
673
        )
674
675
        self.assertEqual(action_constants.LIVEACTION_STATUS_PAUSING, status)
676
677
    @mock.patch.object(
678
        ActionExecution, 'get',
679
        mock.MagicMock(side_effect=[
680
            MOCK_ACTIONEXECUTION_RESUMING_CHILD_SUCCEEDED,
681
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
682
    def test_determine_status_wf_resuming_exec_paused_tasks_completed(self):
683
        status = self.querier._determine_execution_status(
684
            MOCK_LIVEACTION_RESUMING,
685
            'PAUSED',
686
            MOCK_WF_TASKS_SUCCEEDED
687
        )
688
689
        self.assertEqual(action_constants.LIVEACTION_STATUS_RESUMING, status)
690
691
    @mock.patch.object(
692
        ActionExecution, 'get',
693
        mock.MagicMock(side_effect=[
694
            MOCK_ACTIONEXECUTION_RESUMING_CHILD_SUCCEEDED,
695
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
696
    def test_determine_status_wf_resuming_exec_running_tasks_completed(self):
697
        status = self.querier._determine_execution_status(
698
            MOCK_LIVEACTION_RESUMING,
699
            'RUNNING',
700
            MOCK_WF_TASKS_SUCCEEDED
701
        )
702
703
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
704
705
    @mock.patch.object(
706
        ActionExecution, 'get',
707
        mock.MagicMock(side_effect=[
708
            MOCK_ACTIONEXECUTION_RESUMING_CHILD_RUNNING,
709
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
710
    def test_determine_status_wf_resuming_exec_running_tasks_running(self):
711
        status = self.querier._determine_execution_status(
712
            MOCK_LIVEACTION_RESUMING,
713
            'RUNNING',
714
            MOCK_WF_TASKS_RUNNING
715
        )
716
717
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
718
719
    @mock.patch.object(
720
        ActionExecution, 'get',
721
        mock.MagicMock(side_effect=[
722
            MOCK_ACTIONEXECUTION_RESUMING_CHILD_RUNNING,
723
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
724
    def test_determine_status_wf_resuming_exec_paused_tasks_running(self):
725
        status = self.querier._determine_execution_status(
726
            MOCK_LIVEACTION_RESUMING,
727
            'PAUSED',
728
            MOCK_WF_TASKS_RUNNING
729
        )
730
731
        self.assertEqual(action_constants.LIVEACTION_STATUS_PAUSING, status)
732
733
    @mock.patch.object(
734
        ActionExecution, 'get',
735
        mock.MagicMock(side_effect=[
736
            MOCK_ACTIONEXECUTION_RESUMING_CHILD_RUNNING,
737
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
738
    def test_determine_status_wf_resuming_exec_canceled_tasks_running(self):
739
        status = self.querier._determine_execution_status(
740
            MOCK_LIVEACTION_RESUMING,
741
            'CANCELLED',
742
            MOCK_WF_TASKS_RUNNING
743
        )
744
745
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELING, status)
746
747
    @mock.patch.object(
748
        executions.ExecutionManager, 'get',
749
        mock.MagicMock(return_value=MOCK_WF_EX))
750
    def test_get_workflow_result(self):
751
        result = self.querier._get_workflow_result(uuid.uuid4().hex, uuid.uuid4().hex)
752
753
        expected = {
754
            'k1': 'v1',
755
            'extra': {
756
                'state': MOCK_WF_EX.state,
757
                'state_info': MOCK_WF_EX.state_info
758
            }
759
        }
760
761
        self.assertDictEqual(expected, result)
762
763
    @mock.patch.object(
764
        tasks.TaskManager, 'list',
765
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
766
    @mock.patch.object(
767
        tasks.TaskManager, 'get',
768
        mock.MagicMock(side_effect=[
769
            MOCK_WF_EX_TASKS[0],
770
            MOCK_WF_EX_TASKS[1]]))
771
    def test_get_workflow_tasks(self):
772
        tasks = self.querier._get_workflow_tasks(uuid.uuid4().hex, uuid.uuid4().hex)
773
774
        expected = copy.deepcopy(MOCK_WF_EX_TASKS_DATA)
775
        for task in expected:
776
            task['input'] = json.loads(task['input'])
777
            task['result'] = json.loads(task['result'])
778
            task['published'] = json.loads(task['published'])
779
780
        for i in range(0, len(tasks)):
781
            self.assertDictEqual(expected[i], tasks[i])
782
783
    @mock.patch.object(
784
        action_utils, 'get_liveaction_by_id',
785
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
786
    @mock.patch.object(
787
        executions.ExecutionManager, 'get',
788
        mock.MagicMock(return_value=MOCK_WF_EX))
789
    @mock.patch.object(
790
        tasks.TaskManager, 'list',
791
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
792
    @mock.patch.object(
793
        tasks.TaskManager, 'get',
794
        mock.MagicMock(side_effect=[
795
            MOCK_WF_EX_TASKS[0],
796
            MOCK_WF_EX_TASKS[1]]))
797
    @mock.patch.object(
798
        ActionExecution, 'get',
799
        mock.MagicMock(side_effect=[
800
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
801
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
802
    def test_query(self):
803
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
804
805
        expected = {
806
            'k1': 'v1',
807
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
808
            'extra': {
809
                'state': MOCK_WF_EX.state,
810
                'state_info': MOCK_WF_EX.state_info
811
            }
812
        }
813
814
        for task in expected['tasks']:
815
            task['input'] = json.loads(task['input'])
816
            task['result'] = json.loads(task['result'])
817
            task['published'] = json.loads(task['published'])
818
819
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
820
        self.assertDictEqual(expected, result)
821
822
    @mock.patch.object(
823
        action_utils, 'get_liveaction_by_id',
824
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING_WITH_STREAMING_RESULT))
825
    @mock.patch.object(
826
        executions.ExecutionManager, 'get',
827
        mock.MagicMock(return_value=MOCK_WF_EX))
828
    @mock.patch.object(
829
        tasks.TaskManager, 'list',
830
        mock.MagicMock(return_value=[MOCK_WF_EX_TASKS[1]]))
831
    @mock.patch.object(
832
        tasks.TaskManager, 'get',
833
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS[1]))
834
    @mock.patch.object(
835
        ActionExecution, 'get',
836
        mock.MagicMock(side_effect=[
837
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
838
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
839
    def test_query_with_last_query_time(self):
840
        last_query_time = time.time() - 3
841
842
        (status, result) = self.querier.query(
843
            uuid.uuid4().hex,
844
            MOCK_QRY_CONTEXT,
845
            last_query_time=last_query_time
846
        )
847
848
        expected = {
849
            'k1': 'v1',
850
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
851
            'extra': {
852
                'state': MOCK_WF_EX.state,
853
                'state_info': MOCK_WF_EX.state_info
854
            }
855
        }
856
857
        for task in expected['tasks']:
858
            task['input'] = json.loads(task['input'])
859
            task['result'] = json.loads(task['result'])
860
            task['published'] = json.loads(task['published'])
861
862
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
863
        self.assertDictEqual(expected, result)
864
865
    @mock.patch.object(
866
        action_utils, 'get_liveaction_by_id',
867
        mock.MagicMock(side_effect=db_exc.StackStormDBObjectNotFoundError()))
868
    def test_query_liveaction_not_found(self):
869
        self.assertRaises(
870
            db_exc.StackStormDBObjectNotFoundError,
871
            self.querier.query,
872
            uuid.uuid4().hex,
873
            MOCK_QRY_CONTEXT
874
        )
875
876
    @mock.patch.object(
877
        action_utils, 'get_liveaction_by_id',
878
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
879
    @mock.patch.object(
880
        executions.ExecutionManager, 'get',
881
        mock.MagicMock(side_effect=[
882
            requests.exceptions.ConnectionError(),
883
            MOCK_WF_EX]))
884
    @mock.patch.object(
885
        tasks.TaskManager, 'list',
886
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
887
    @mock.patch.object(
888
        tasks.TaskManager, 'get',
889
        mock.MagicMock(side_effect=[
890
            MOCK_WF_EX_TASKS[0],
891
            MOCK_WF_EX_TASKS[1]]))
892
    @mock.patch.object(
893
        ActionExecution, 'get',
894
        mock.MagicMock(side_effect=[
895
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
896
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
897
    def test_query_get_workflow_retry(self):
898
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
899
900
        expected = {
901
            'k1': 'v1',
902
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
903
            'extra': {
904
                'state': MOCK_WF_EX.state,
905
                'state_info': MOCK_WF_EX.state_info
906
            }
907
        }
908
909
        for task in expected['tasks']:
910
            task['input'] = json.loads(task['input'])
911
            task['result'] = json.loads(task['result'])
912
            task['published'] = json.loads(task['published'])
913
914
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
915
        self.assertDictEqual(expected, result)
916
917
        calls = [call(MOCK_QRY_CONTEXT['mistral']['execution_id']) for i in range(0, 2)]
918
        executions.ExecutionManager.get.assert_has_calls(calls)
919
920
    @mock.patch.object(
921
        action_utils, 'get_liveaction_by_id',
922
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
923
    @mock.patch.object(
924
        executions.ExecutionManager, 'get',
925
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError()] * 4))
926
    def test_query_get_workflow_retry_exhausted(self):
927
        self.assertRaises(
928
            requests.exceptions.ConnectionError,
929
            self.querier.query,
930
            uuid.uuid4().hex,
931
            MOCK_QRY_CONTEXT)
932
933
        calls = [call(MOCK_QRY_CONTEXT['mistral']['execution_id']) for i in range(0, 2)]
934
        executions.ExecutionManager.get.assert_has_calls(calls)
935
936
    @mock.patch.object(
937
        action_utils, 'get_liveaction_by_id',
938
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
939
    @mock.patch.object(
940
        executions.ExecutionManager, 'get',
941
        mock.MagicMock(
942
            side_effect=mistralclient_base.APIException(
943
                error_code=404, error_message='Workflow not found.')))
944
    def test_query_get_workflow_not_found(self):
945
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
946
947
        self.assertEqual(action_constants.LIVEACTION_STATUS_FAILED, status)
948
        self.assertEqual('Workflow not found.', result)
949
950
    @mock.patch.object(
951
        action_utils, 'get_liveaction_by_id',
952
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
953
    @mock.patch.object(
954
        executions.ExecutionManager, 'get',
955
        mock.MagicMock(return_value=MOCK_WF_EX))
956
    @mock.patch.object(
957
        tasks.TaskManager, 'list',
958
        mock.MagicMock(side_effect=[
959
            requests.exceptions.ConnectionError(),
960
            MOCK_WF_EX_TASKS]))
961
    @mock.patch.object(
962
        tasks.TaskManager, 'get',
963
        mock.MagicMock(side_effect=[
964
            MOCK_WF_EX_TASKS[0],
965
            MOCK_WF_EX_TASKS[1]]))
966
    @mock.patch.object(
967
        ActionExecution, 'get',
968
        mock.MagicMock(side_effect=[
969
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
970
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
971
    def test_query_list_workflow_tasks_retry(self):
972
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
973
974
        expected = {
975
            'k1': 'v1',
976
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
977
            'extra': {
978
                'state': MOCK_WF_EX.state,
979
                'state_info': MOCK_WF_EX.state_info
980
            }
981
        }
982
983
        for task in expected['tasks']:
984
            task['input'] = json.loads(task['input'])
985
            task['result'] = json.loads(task['result'])
986
            task['published'] = json.loads(task['published'])
987
988
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
989
        self.assertDictEqual(expected, result)
990
991
        mock_call = call(workflow_execution_id=MOCK_QRY_CONTEXT['mistral']['execution_id'])
992
        calls = [mock_call for i in range(0, 2)]
993
        tasks.TaskManager.list.assert_has_calls(calls)
994
995
        calls = [call(MOCK_WF_EX_TASKS[0].id), call(MOCK_WF_EX_TASKS[1].id)]
996
        tasks.TaskManager.get.assert_has_calls(calls)
997
998
    @mock.patch.object(
999
        action_utils, 'get_liveaction_by_id',
1000
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
1001
    @mock.patch.object(
1002
        executions.ExecutionManager, 'get',
1003
        mock.MagicMock(return_value=MOCK_WF_EX))
1004
    @mock.patch.object(
1005
        tasks.TaskManager, 'list',
1006
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
1007
    @mock.patch.object(
1008
        tasks.TaskManager, 'get',
1009
        mock.MagicMock(side_effect=[
1010
            requests.exceptions.ConnectionError(),
1011
            MOCK_WF_EX_TASKS[0],
1012
            MOCK_WF_EX_TASKS[1]]))
1013
    @mock.patch.object(
1014
        ActionExecution, 'get',
1015
        mock.MagicMock(side_effect=[
1016
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
1017
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
1018
    def test_query_get_workflow_tasks_retry(self):
1019
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
1020
1021
        expected = {
1022
            'k1': 'v1',
1023
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
1024
            'extra': {
1025
                'state': MOCK_WF_EX.state,
1026
                'state_info': MOCK_WF_EX.state_info
1027
            }
1028
        }
1029
1030
        for task in expected['tasks']:
1031
            task['input'] = json.loads(task['input'])
1032
            task['result'] = json.loads(task['result'])
1033
            task['published'] = json.loads(task['published'])
1034
1035
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
1036
        self.assertDictEqual(expected, result)
1037
1038
        calls = [
1039
            call(MOCK_WF_EX_TASKS[0].id),
1040
            call(MOCK_WF_EX_TASKS[0].id),
1041
            call(MOCK_WF_EX_TASKS[1].id)
1042
        ]
1043
1044
        tasks.TaskManager.get.assert_has_calls(calls)
1045
1046
    @mock.patch.object(
1047
        action_utils, 'get_liveaction_by_id',
1048
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
1049
    @mock.patch.object(
1050
        executions.ExecutionManager, 'get',
1051
        mock.MagicMock(return_value=MOCK_WF_EX))
1052
    @mock.patch.object(
1053
        tasks.TaskManager, 'list',
1054
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError()] * 4))
1055
    def test_query_list_workflow_tasks_retry_exhausted(self):
1056
        self.assertRaises(
1057
            requests.exceptions.ConnectionError,
1058
            self.querier.query,
1059
            uuid.uuid4().hex,
1060
            MOCK_QRY_CONTEXT)
1061
1062
        mock_call = call(workflow_execution_id=MOCK_QRY_CONTEXT['mistral']['execution_id'])
1063
        calls = [mock_call for i in range(0, 2)]
1064
        tasks.TaskManager.list.assert_has_calls(calls)
1065
1066
    @mock.patch.object(
1067
        action_utils, 'get_liveaction_by_id',
1068
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
1069
    @mock.patch.object(
1070
        executions.ExecutionManager, 'get',
1071
        mock.MagicMock(return_value=MOCK_WF_EX))
1072
    @mock.patch.object(
1073
        tasks.TaskManager, 'list',
1074
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
1075
    @mock.patch.object(
1076
        tasks.TaskManager, 'get',
1077
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError()] * 4))
1078
    def test_query_get_workflow_tasks_retry_exhausted(self):
1079
        self.assertRaises(
1080
            requests.exceptions.ConnectionError,
1081
            self.querier.query,
1082
            uuid.uuid4().hex,
1083
            MOCK_QRY_CONTEXT)
1084
1085
        calls = [
1086
            call(MOCK_WF_EX_TASKS[0].id),
1087
            call(MOCK_WF_EX_TASKS[0].id)
1088
        ]
1089
1090
        tasks.TaskManager.get.assert_has_calls(calls)
1091
1092
    @mock.patch.object(
1093
        action_utils, 'get_liveaction_by_id',
1094
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
1095
    @mock.patch.object(
1096
        executions.ExecutionManager, 'get',
1097
        mock.MagicMock(return_value=MOCK_WF_EX))
1098
    @mock.patch.object(
1099
        tasks.TaskManager, 'list',
1100
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
1101
    @mock.patch.object(
1102
        tasks.TaskManager, 'get',
1103
        mock.MagicMock(
1104
            side_effect=mistralclient_base.APIException(
1105
                error_code=404, error_message='Task not found.')))
1106
    def test_query_get_workflow_tasks_not_found(self):
1107
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
1108
1109
        self.assertEqual(action_constants.LIVEACTION_STATUS_FAILED, status)
1110
        self.assertEqual('Task not found.', result)
1111
1112
    def test_query_missing_context(self):
1113
        self.assertRaises(Exception, self.querier.query, uuid.uuid4().hex, {})
1114
1115
    def test_query_missing_mistral_execution_id(self):
1116
        self.assertRaises(Exception, self.querier.query, uuid.uuid4().hex, {'mistral': {}})
1117
1118
    @mock.patch.object(
1119
        action_utils, 'get_liveaction_by_id',
1120
        mock.MagicMock(
1121
            return_value=MOCK_LIVEACTION_RUNNING_WITH_OUTDATED_INCOMPLETE_TASKS_STREAMING_RESULT))
1122
    @mock.patch.object(
1123
        executions.ExecutionManager, 'get',
1124
        mock.MagicMock(return_value=MOCK_WF_EX))
1125
    @mock.patch.object(
1126
        tasks.TaskManager, 'list',
1127
        mock.MagicMock(return_value=MOCK_WF_EX_INCOMPLETE_TASKS))
1128
    @mock.patch.object(
1129
        tasks.TaskManager, 'get',
1130
        mock.MagicMock(side_effect=MOCK_WF_EX_INCOMPLETE_TASKS))
1131
    @mock.patch.object(
1132
        ActionExecution, 'get',
1133
        mock.MagicMock(side_effect=[
1134
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
1135
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
1136
    def test_query_with_outdated_tasks_in_liveaction_result(self):
1137
        last_query_time = time.time() + 3
1138
1139
        (status, result) = self.querier.query(
1140
            uuid.uuid4().hex,
1141
            MOCK_QRY_CONTEXT,
1142
            last_query_time=last_query_time
1143
        )
1144
1145
        expected = {
1146
            'k1': 'v1',
1147
            'tasks': copy.deepcopy(MOCK_WF_EX_INCOMPLETE_TASKS_DATA),
1148
            'extra': {
1149
                'state': MOCK_WF_EX.state,
1150
                'state_info': MOCK_WF_EX.state_info
1151
            }
1152
        }
1153
1154
        for task in expected['tasks']:
1155
            task['input'] = json.loads(task['input'])
1156
            task['result'] = json.loads(task['result'])
1157
            task['published'] = json.loads(task['published'])
1158
1159
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
1160
        self.assertDictEqual(expected, result)
1161
1162
    @mock.patch.object(
1163
        action_utils, 'get_liveaction_by_id',
1164
        mock.MagicMock(
1165
            return_value=MOCK_LIVEACTION_RUNNING_WITH_UP_TO_DATE_INCOMPLETE_TASKS_STREAMING_RESULT))
1166
    @mock.patch.object(
1167
        executions.ExecutionManager, 'get',
1168
        mock.MagicMock(return_value=MOCK_WF_EX))
1169
    @mock.patch.object(
1170
        tasks.TaskManager, 'list',
1171
        mock.MagicMock(return_value=MOCK_WF_EX_INCOMPLETE_TASKS))
1172
    @mock.patch.object(
1173
        tasks.TaskManager, 'get',
1174
        mock.MagicMock(return_value=MOCK_WF_EX_INCOMPLETE_TASKS[1]))
1175
    @mock.patch.object(
1176
        ActionExecution, 'get',
1177
        mock.MagicMock(side_effect=[
1178
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
1179
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
1180
    def test_query_with_up_to_date_tasks_in_liveaction_result(self):
1181
        last_query_time = time.time() + 3
1182
1183
        (status, result) = self.querier.query(
1184
            uuid.uuid4().hex,
1185
            MOCK_QRY_CONTEXT,
1186
            last_query_time=last_query_time
1187
        )
1188
1189
        expected = {
1190
            'k1': 'v1',
1191
            'tasks': copy.deepcopy(MOCK_WF_EX_INCOMPLETE_TASKS_DATA),
1192
            'extra': {
1193
                'state': MOCK_WF_EX.state,
1194
                'state_info': MOCK_WF_EX.state_info
1195
            }
1196
        }
1197
1198
        for task in expected['tasks']:
1199
            task['input'] = json.loads(task['input'])
1200
            task['result'] = json.loads(task['result'])
1201
            task['published'] = json.loads(task['published'])
1202
1203
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
1204
        self.assertDictEqual(expected, result)
1205