Passed
Pull Request — master (#3507)
by W
05:36
created

MistralQuerierTest.test_query_get_workflow_retry()   B

Complexity

Conditions 3

Size

Total Lines 43

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
dl 0
loc 43
rs 8.8571
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import datetime
18
import json
19
import requests
20
import time
21
import uuid
22
23
import mock
24
from mock import call
25
26
from mistralclient.api import base as mistralclient_base
27
from mistralclient.api.v2 import executions
28
from mistralclient.api.v2 import tasks
29
from oslo_config import cfg
30
31
import st2tests.config as tests_config
32
tests_config.parse_args()
33
34
from st2common.constants import action as action_constants
35
from st2common.exceptions import db as db_exc
36
from st2common.models.db.execution import ActionExecutionDB
37
from st2common.models.db.liveaction import LiveActionDB
38
from st2common.persistence.execution import ActionExecution
39
from st2common.util import action_db as action_utils
40
from st2common.util import loader
41
from st2tests import DbTestCase
42
43
44
# Minimal workflow task listings. In every listing task1 has state SUCCESS;
# only the state of task2 differs between the fixtures.
MOCK_WF_TASKS_SUCCEEDED = [
    {'name': 'task1', 'state': 'SUCCESS'},
    {'name': 'task2', 'state': 'SUCCESS'}
]

MOCK_WF_TASKS_ERRORED = [
    {'name': 'task1', 'state': 'SUCCESS'},
    {'name': 'task2', 'state': 'ERROR'}
]

MOCK_WF_TASKS_RUNNING = [
    {'name': 'task1', 'state': 'SUCCESS'},
    {'name': 'task2', 'state': 'RUNNING'}
]

MOCK_WF_TASKS_WAITING = [
    {'name': 'task1', 'state': 'SUCCESS'},
    {'name': 'task2', 'state': 'WAITING'}
]

MOCK_WF_TASKS_PAUSED = [
    {'name': 'task1', 'state': 'SUCCESS'},
    {'name': 'task2', 'state': 'PAUSED'}
]
68
69
# Raw attribute payload for a workflow execution in state SUCCESS. The
# 'output' value is kept as a JSON-encoded string.
MOCK_WF_EX_DATA = {
    'id': uuid.uuid4().hex,
    'name': 'main',
    'output': '{"k1": "v1"}',
    'state': 'SUCCESS',
    'state_info': None
}
76
77
# mistralclient Execution resource built from the payload above; no manager
# is attached (None).
MOCK_WF_EX = executions.Execution(None, MOCK_WF_EX_DATA)

# Raw payloads for the two tasks of the mock workflow execution. 'input',
# 'result' and 'published' are JSON-encoded strings; the tests decode them
# with json.loads before comparing against querier output.
MOCK_WF_EX_TASKS_DATA = [
    {
        'id': uuid.uuid4().hex,
        'name': 'task1',
        'workflow_execution_id': MOCK_WF_EX_DATA['id'],
        'workflow_name': MOCK_WF_EX_DATA['name'],
        'created_at': str(datetime.datetime.utcnow()),
        'updated_at': str(datetime.datetime.utcnow()),
        'state': 'SUCCESS',
        'state_info': None,
        'input': '{"a": "b"}',
        'result': '{"c": "d"}',
        'published': '{"c": "d"}'
    },
    {
        'id': uuid.uuid4().hex,
        'name': 'task2',
        'workflow_execution_id': MOCK_WF_EX_DATA['id'],
        'workflow_name': MOCK_WF_EX_DATA['name'],
        'created_at': str(datetime.datetime.utcnow()),
        'updated_at': str(datetime.datetime.utcnow()),
        'state': 'SUCCESS',
        'state_info': None,
        'input': '{"e": "f", "g": "h"}',
        'result': '{"i": "j", "k": "l"}',
        'published': '{"k": "l"}'
    }
]

# mistralclient Task resources wrapping the payloads above, in the same order.
MOCK_WF_EX_TASKS = [
    tasks.Task(None, MOCK_WF_EX_TASKS_DATA[0]),
    tasks.Task(None, MOCK_WF_EX_TASKS_DATA[1])
]
112
113
# Query context handed to querier.query(); the retry tests assert that
# ExecutionManager.get is called with this 'execution_id'.
MOCK_QRY_CONTEXT = {
    'mistral': {
        'execution_id': uuid.uuid4().hex
    }
}
118
119
# Liveaction result that already holds the first task. It is a shallow copy
# of the first task payload with the three JSON-encoded fields decoded, so
# it carries exactly the same keys as MOCK_WF_EX_TASKS_DATA[0].
MOCK_LIVEACTION_RESULT = {
    'tasks': [
        dict(
            MOCK_WF_EX_TASKS_DATA[0],
            input=json.loads(MOCK_WF_EX_TASKS_DATA[0]['input']),
            result=json.loads(MOCK_WF_EX_TASKS_DATA[0]['result']),
            published=json.loads(MOCK_WF_EX_TASKS_DATA[0]['published'])
        )
    ]
}
136
137
# Child action executions, one per status used by the tests. They are
# referenced through the 'children' lists of the parent execution fixtures
# and returned directly by the mocked ActionExecution.get. None of them has
# children of its own.
MOCK_CHILD_ACTIONEXECUTION_RUNNING = ActionExecutionDB(
    action={'ref': 'mock.task'},
    runner={'name': 'local_runner'},
    liveaction={'id': uuid.uuid4().hex},
    status=action_constants.LIVEACTION_STATUS_RUNNING,
    children=[]
)

MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED = ActionExecutionDB(
    action={'ref': 'mock.task'},
    runner={'name': 'local_runner'},
    liveaction={'id': uuid.uuid4().hex},
    status=action_constants.LIVEACTION_STATUS_SUCCEEDED,
    children=[]
)

# Unlike the two fixtures above, the paused child uses the mistral_v2 runner.
MOCK_CHILD_ACTIONEXECUTION_PAUSED = ActionExecutionDB(
    action={'ref': 'mock.task'},
    runner={'name': 'mistral_v2'},
    liveaction={'id': uuid.uuid4().hex},
    status=action_constants.LIVEACTION_STATUS_PAUSED,
    children=[]
)
160
161
# Workflow liveaction with status RUNNING and the parent executions that
# point at it, one per child status.
MOCK_LIVEACTION_RUNNING = LiveActionDB(
    action='mock.workflow',
    status=action_constants.LIVEACTION_STATUS_RUNNING
)

MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING = ActionExecutionDB(
    action={'ref': 'mock.workflow'},
    runner={'name': 'mistral_v2'},
    liveaction={'id': MOCK_LIVEACTION_RUNNING.id},
    status=action_constants.LIVEACTION_STATUS_RUNNING,
    children=[MOCK_CHILD_ACTIONEXECUTION_RUNNING.id]
)

MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED = ActionExecutionDB(
    action={'ref': 'mock.workflow'},
    runner={'name': 'mistral_v2'},
    liveaction={'id': MOCK_LIVEACTION_RUNNING.id},
    status=action_constants.LIVEACTION_STATUS_RUNNING,
    children=[MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED.id]
)

# RUNNING liveaction whose result is pre-populated with the first task.
MOCK_LIVEACTION_RUNNING_WITH_STREAMING_RESULT = LiveActionDB(
    action='mock.workflow',
    status=action_constants.LIVEACTION_STATUS_RUNNING,
    result=MOCK_LIVEACTION_RESULT
)
187
188
# Workflow liveaction with status CANCELING and the parent executions that
# point at it, one per child status.
MOCK_LIVEACTION_CANCELING = LiveActionDB(
    action='mock.workflow',
    status=action_constants.LIVEACTION_STATUS_CANCELING
)

MOCK_ACTIONEXECUTION_CANCELING_CHILD_RUNNING = ActionExecutionDB(
    action={'ref': 'mock.workflow'},
    runner={'name': 'mistral_v2'},
    liveaction={'id': MOCK_LIVEACTION_CANCELING.id},
    status=action_constants.LIVEACTION_STATUS_CANCELING,
    children=[MOCK_CHILD_ACTIONEXECUTION_RUNNING.id]
)

MOCK_ACTIONEXECUTION_CANCELING_CHILD_SUCCEEDED = ActionExecutionDB(
    action={'ref': 'mock.workflow'},
    runner={'name': 'mistral_v2'},
    liveaction={'id': MOCK_LIVEACTION_CANCELING.id},
    status=action_constants.LIVEACTION_STATUS_CANCELING,
    children=[MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED.id]
)

MOCK_ACTIONEXECUTION_CANCELING_CHILD_PAUSED = ActionExecutionDB(
    action={'ref': 'mock.workflow'},
    runner={'name': 'mistral_v2'},
    liveaction={'id': MOCK_LIVEACTION_CANCELING.id},
    status=action_constants.LIVEACTION_STATUS_CANCELING,
    children=[MOCK_CHILD_ACTIONEXECUTION_PAUSED.id]
)
216
217
# Workflow liveaction with status PAUSING and the parent executions that
# point at it: one without children, one per child status.
MOCK_LIVEACTION_PAUSING = LiveActionDB(
    action='mock.workflow',
    status=action_constants.LIVEACTION_STATUS_PAUSING
)

MOCK_ACTIONEXECUTION_PAUSING = ActionExecutionDB(
    action={'ref': 'mock.workflow'},
    runner={'name': 'mistral_v2'},
    liveaction={'id': MOCK_LIVEACTION_PAUSING.id},
    status=action_constants.LIVEACTION_STATUS_PAUSING,
    children=[]
)

MOCK_ACTIONEXECUTION_PAUSING_CHILD_RUNNING = ActionExecutionDB(
    action={'ref': 'mock.workflow'},
    runner={'name': 'mistral_v2'},
    liveaction={'id': MOCK_LIVEACTION_PAUSING.id},
    status=action_constants.LIVEACTION_STATUS_PAUSING,
    children=[MOCK_CHILD_ACTIONEXECUTION_RUNNING.id]
)

MOCK_ACTIONEXECUTION_PAUSING_CHILD_PAUSED = ActionExecutionDB(
    action={'ref': 'mock.workflow'},
    runner={'name': 'mistral_v2'},
    liveaction={'id': MOCK_LIVEACTION_PAUSING.id},
    status=action_constants.LIVEACTION_STATUS_PAUSING,
    children=[MOCK_CHILD_ACTIONEXECUTION_PAUSED.id]
)
245
246
# Workflow liveaction with status RESUMING and the parent executions that
# point at it, one per child status.
MOCK_LIVEACTION_RESUMING = LiveActionDB(
    action='mock.workflow',
    status=action_constants.LIVEACTION_STATUS_RESUMING
)

MOCK_ACTIONEXECUTION_RESUMING_CHILD_RUNNING = ActionExecutionDB(
    action={'ref': 'mock.workflow'},
    runner={'name': 'mistral_v2'},
    liveaction={'id': MOCK_LIVEACTION_RESUMING.id},
    status=action_constants.LIVEACTION_STATUS_RESUMING,
    children=[MOCK_CHILD_ACTIONEXECUTION_RUNNING.id]
)

MOCK_ACTIONEXECUTION_RESUMING_CHILD_SUCCEEDED = ActionExecutionDB(
    action={'ref': 'mock.workflow'},
    runner={'name': 'mistral_v2'},
    liveaction={'id': MOCK_LIVEACTION_RESUMING.id},
    status=action_constants.LIVEACTION_STATUS_RESUMING,
    children=[MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED.id]
)
266
267
268
class MistralQuerierTest(DbTestCase):
269
270
    @classmethod
    def setUpClass(cls):
        super(MistralQuerierTest, cls).setUpClass()

        # Override the retry configuration here otherwise st2tests.config.parse_args
        # in DbTestCase.setUpClass will reset these overrides.
        retry_overrides = (
            ('retry_exp_msec', 100),
            ('retry_exp_max_msec', 200),
            ('retry_stop_max_msec', 200),
        )

        for option, msec in retry_overrides:
            cfg.CONF.set_override(option, msec, group='mistral')

        # Register query module.
        cls.query_module = loader.register_query_module('mistral_v2')
282
283
    def setUp(self):
        super(MistralQuerierTest, self).setUp()

        # Fresh querier per test, obtained from the query module that
        # setUpClass registered.
        self.querier = self.query_module.get_instance()
286
287
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_running_exec_running_tasks_running(self):
        # Scenario: RUNNING liveaction, state argument 'RUNNING', task2 RUNNING.
        # ActionExecution.get is primed to return the parent execution first
        # and its RUNNING child second.
        expected = action_constants.LIVEACTION_STATUS_RUNNING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RUNNING, 'RUNNING', MOCK_WF_TASKS_RUNNING)

        self.assertEqual(expected, actual)
300
301
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_determine_status_wf_running_exec_running_tasks_completed(self):
        # Scenario: RUNNING liveaction, state argument 'RUNNING', both tasks
        # SUCCESS. ActionExecution.get is primed to return the parent
        # execution first and its SUCCEEDED child second.
        expected = action_constants.LIVEACTION_STATUS_RUNNING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RUNNING, 'RUNNING', MOCK_WF_TASKS_SUCCEEDED)

        self.assertEqual(expected, actual)
314
315
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_determine_status_wf_running_exec_succeeded_tasks_completed(self):
        # Scenario: RUNNING liveaction, state argument 'SUCCESS', both tasks
        # SUCCESS. ActionExecution.get is primed to return the parent
        # execution first and its SUCCEEDED child second.
        expected = action_constants.LIVEACTION_STATUS_SUCCEEDED
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RUNNING, 'SUCCESS', MOCK_WF_TASKS_SUCCEEDED)

        self.assertEqual(expected, actual)
328
329
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_running_exec_succeeded_tasks_running(self):
        # Scenario: RUNNING liveaction, state argument 'SUCCESS', task2
        # RUNNING. ActionExecution.get is primed to return the parent
        # execution first and its RUNNING child second.
        expected = action_constants.LIVEACTION_STATUS_RUNNING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RUNNING, 'SUCCESS', MOCK_WF_TASKS_RUNNING)

        self.assertEqual(expected, actual)
342
343
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_running_exec_succeeded_tasks_completed_child_running(self):
        # Scenario: RUNNING liveaction, state argument 'SUCCESS', both tasks
        # SUCCESS, but ActionExecution.get is primed to return a parent
        # execution whose child is still RUNNING.
        expected = action_constants.LIVEACTION_STATUS_RUNNING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RUNNING, 'SUCCESS', MOCK_WF_TASKS_SUCCEEDED)

        self.assertEqual(expected, actual)
356
357
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_determine_status_wf_running_exec_failed_tasks_completed(self):
        # Scenario: RUNNING liveaction, state argument 'ERROR', both tasks
        # SUCCESS. ActionExecution.get is primed to return the parent
        # execution first and its SUCCEEDED child second.
        expected = action_constants.LIVEACTION_STATUS_FAILED
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RUNNING, 'ERROR', MOCK_WF_TASKS_SUCCEEDED)

        self.assertEqual(expected, actual)
370
371
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_running_exec_failed_tasks_running(self):
        # Scenario: RUNNING liveaction, state argument 'ERROR', task2 RUNNING.
        # ActionExecution.get is primed to return the parent execution first
        # and its RUNNING child second.
        expected = action_constants.LIVEACTION_STATUS_RUNNING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RUNNING, 'ERROR', MOCK_WF_TASKS_RUNNING)

        self.assertEqual(expected, actual)
384
385
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_determine_status_wf_canceling_exec_canceled_tasks_completed(self):
        # Scenario: CANCELING liveaction, state argument 'CANCELLED', both
        # tasks SUCCESS. ActionExecution.get is primed to return the parent
        # execution first and its SUCCEEDED child second.
        expected = action_constants.LIVEACTION_STATUS_CANCELED
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_CANCELING, 'CANCELLED', MOCK_WF_TASKS_SUCCEEDED)

        self.assertEqual(expected, actual)
398
399
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_canceling_exec_canceled_tasks_running(self):
        # Scenario: CANCELING liveaction, state argument 'CANCELLED', task2
        # RUNNING. ActionExecution.get is primed to return the parent
        # execution first and its RUNNING child second.
        expected = action_constants.LIVEACTION_STATUS_CANCELING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_CANCELING, 'CANCELLED', MOCK_WF_TASKS_RUNNING)

        self.assertEqual(expected, actual)
412
413
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_determine_status_wf_canceling_exec_canceled_tasks_waiting(self):
        # Scenario: CANCELING liveaction, state argument 'CANCELLED', task2
        # WAITING. ActionExecution.get is primed to return the parent
        # execution first and its SUCCEEDED child second.
        expected = action_constants.LIVEACTION_STATUS_CANCELED
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_CANCELING, 'CANCELLED', MOCK_WF_TASKS_WAITING)

        self.assertEqual(expected, actual)
426
427
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_PAUSED,
            MOCK_CHILD_ACTIONEXECUTION_PAUSED]))
    def test_determine_status_wf_canceling_exec_canceled_tasks_paused(self):
        # Scenario: CANCELING liveaction, state argument 'CANCELLED', task2
        # PAUSED. ActionExecution.get is primed to return the parent
        # execution first and its PAUSED child second.
        expected = action_constants.LIVEACTION_STATUS_CANCELED
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_CANCELING, 'CANCELLED', MOCK_WF_TASKS_PAUSED)

        self.assertEqual(expected, actual)
440
441
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_determine_status_wf_canceling_exec_running_tasks_completed(self):
        # Scenario: CANCELING liveaction, state argument 'RUNNING', both tasks
        # SUCCESS. ActionExecution.get is primed to return the parent
        # execution first and its SUCCEEDED child second.
        expected = action_constants.LIVEACTION_STATUS_CANCELING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_CANCELING, 'RUNNING', MOCK_WF_TASKS_SUCCEEDED)

        self.assertEqual(expected, actual)
454
455
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_canceling_exec_running_tasks_running(self):
        # Scenario: CANCELING liveaction, state argument 'RUNNING', task2
        # RUNNING. ActionExecution.get is primed to return the parent
        # execution first and its RUNNING child second.
        expected = action_constants.LIVEACTION_STATUS_CANCELING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_CANCELING, 'RUNNING', MOCK_WF_TASKS_RUNNING)

        self.assertEqual(expected, actual)
468
469
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_CANCELING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_determine_status_wf_canceling_exec_running_tasks_waiting(self):
        # Scenario: CANCELING liveaction, state argument 'RUNNING', task2
        # WAITING. ActionExecution.get is primed to return the parent
        # execution first and its SUCCEEDED child second.
        expected = action_constants.LIVEACTION_STATUS_CANCELING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_CANCELING, 'RUNNING', MOCK_WF_TASKS_WAITING)

        self.assertEqual(expected, actual)
482
483
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_running_exec_cancelled_tasks_running(self):
        # Scenario: RUNNING liveaction, state argument 'CANCELLED', task2
        # RUNNING. ActionExecution.get is primed to return the parent
        # execution first and its RUNNING child second.
        expected = action_constants.LIVEACTION_STATUS_CANCELING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RUNNING, 'CANCELLED', MOCK_WF_TASKS_RUNNING)

        self.assertEqual(expected, actual)
496
497
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_PAUSING_CHILD_PAUSED,
            MOCK_CHILD_ACTIONEXECUTION_PAUSED]))
    def test_determine_status_wf_pausing_exec_paused_tasks_completed(self):
        # Scenario: PAUSING liveaction, state argument 'PAUSED', both tasks
        # SUCCESS. ActionExecution.get is primed to return the parent
        # execution first and its PAUSED child second.
        expected = action_constants.LIVEACTION_STATUS_PAUSED
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_PAUSING, 'PAUSED', MOCK_WF_TASKS_SUCCEEDED)

        self.assertEqual(expected, actual)
510
511
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_PAUSING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_pausing_exec_paused_tasks_completed_child_running(self):
        # Scenario: PAUSING liveaction, state argument 'PAUSED', both tasks
        # SUCCESS, but ActionExecution.get is primed to return a parent
        # execution whose child is still RUNNING.
        expected = action_constants.LIVEACTION_STATUS_PAUSING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_PAUSING, 'PAUSED', MOCK_WF_TASKS_SUCCEEDED)

        self.assertEqual(expected, actual)
524
525
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_PAUSING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_pausing_exec_paused_tasks_running(self):
        # Scenario: PAUSING liveaction, state argument 'PAUSED', task2 RUNNING.
        # ActionExecution.get is primed to return the parent execution first
        # and its RUNNING child second.
        expected = action_constants.LIVEACTION_STATUS_PAUSING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_PAUSING, 'PAUSED', MOCK_WF_TASKS_RUNNING)

        self.assertEqual(expected, actual)
538
539
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_PAUSING_CHILD_PAUSED,
            MOCK_CHILD_ACTIONEXECUTION_PAUSED]))
    def test_determine_status_wf_pausing_exec_paused_tasks_paused(self):
        # Scenario: PAUSING liveaction, state argument 'PAUSED', task2 PAUSED.
        # ActionExecution.get is primed to return the parent execution first
        # and its PAUSED child second.
        expected = action_constants.LIVEACTION_STATUS_PAUSED
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_PAUSING, 'PAUSED', MOCK_WF_TASKS_PAUSED)

        self.assertEqual(expected, actual)
552
553
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_PAUSING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_pausing_exec_running_tasks_running(self):
        # Scenario: PAUSING liveaction, state argument 'RUNNING', task2
        # RUNNING. ActionExecution.get is primed to return the parent
        # execution first and its RUNNING child second.
        expected = action_constants.LIVEACTION_STATUS_PAUSING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_PAUSING, 'RUNNING', MOCK_WF_TASKS_RUNNING)

        self.assertEqual(expected, actual)
566
567
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_determine_status_wf_running_exec_paused_tasks_completed(self):
        # Scenario: RUNNING liveaction, state argument 'PAUSED', both tasks
        # SUCCESS. ActionExecution.get is primed to return the parent
        # execution first and its SUCCEEDED child second.
        expected = action_constants.LIVEACTION_STATUS_PAUSED
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RUNNING, 'PAUSED', MOCK_WF_TASKS_SUCCEEDED)

        self.assertEqual(expected, actual)
580
581
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_running_exec_paused_tasks_running(self):
        # Scenario: RUNNING liveaction, state argument 'PAUSED', task2 RUNNING.
        # ActionExecution.get is primed to return the parent execution first
        # and its RUNNING child second.
        expected = action_constants.LIVEACTION_STATUS_PAUSING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RUNNING, 'PAUSED', MOCK_WF_TASKS_RUNNING)

        self.assertEqual(expected, actual)
594
595
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RESUMING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_determine_status_wf_resuming_exec_paused_tasks_completed(self):
        # Scenario: RESUMING liveaction, state argument 'PAUSED', both tasks
        # SUCCESS. ActionExecution.get is primed to return the parent
        # execution first and its SUCCEEDED child second.
        expected = action_constants.LIVEACTION_STATUS_RESUMING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RESUMING, 'PAUSED', MOCK_WF_TASKS_SUCCEEDED)

        self.assertEqual(expected, actual)
608
609
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RESUMING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_determine_status_wf_resuming_exec_running_tasks_completed(self):
        # Scenario: RESUMING liveaction, state argument 'RUNNING', both tasks
        # SUCCESS. ActionExecution.get is primed to return the parent
        # execution first and its SUCCEEDED child second.
        expected = action_constants.LIVEACTION_STATUS_RUNNING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RESUMING, 'RUNNING', MOCK_WF_TASKS_SUCCEEDED)

        self.assertEqual(expected, actual)
622
623
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RESUMING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_resuming_exec_running_tasks_running(self):
        # Scenario: RESUMING liveaction, state argument 'RUNNING', task2
        # RUNNING. ActionExecution.get is primed to return the parent
        # execution first and its RUNNING child second.
        expected = action_constants.LIVEACTION_STATUS_RUNNING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RESUMING, 'RUNNING', MOCK_WF_TASKS_RUNNING)

        self.assertEqual(expected, actual)
636
637
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RESUMING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_resuming_exec_paused_tasks_running(self):
        # Scenario: RESUMING liveaction, state argument 'PAUSED', task2
        # RUNNING. ActionExecution.get is primed to return the parent
        # execution first and its RUNNING child second.
        expected = action_constants.LIVEACTION_STATUS_PAUSING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RESUMING, 'PAUSED', MOCK_WF_TASKS_RUNNING)

        self.assertEqual(expected, actual)
650
651
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RESUMING_CHILD_RUNNING,
            MOCK_CHILD_ACTIONEXECUTION_RUNNING]))
    def test_determine_status_wf_resuming_exec_canceled_tasks_running(self):
        # Scenario: RESUMING liveaction, state argument 'CANCELLED', task2
        # RUNNING. ActionExecution.get is primed to return the parent
        # execution first and its RUNNING child second.
        expected = action_constants.LIVEACTION_STATUS_CANCELING
        actual = self.querier._determine_execution_status(
            MOCK_LIVEACTION_RESUMING, 'CANCELLED', MOCK_WF_TASKS_RUNNING)

        self.assertEqual(expected, actual)
664
665
    @mock.patch.object(
        executions.ExecutionManager, 'get',
        mock.MagicMock(return_value=MOCK_WF_EX))
    def test_get_workflow_result(self):
        # ExecutionManager.get is patched to return MOCK_WF_EX, whose
        # JSON 'output' is '{"k1": "v1"}'.
        result = self.querier._get_workflow_result(uuid.uuid4().hex)

        # Expected: the decoded output keys plus the execution state under
        # 'extra'.
        expected = {
            'k1': 'v1',
            'extra': {
                'state': MOCK_WF_EX.state,
                'state_info': MOCK_WF_EX.state_info
            }
        }

        self.assertDictEqual(expected, result)
680
681
    @mock.patch.object(
        tasks.TaskManager, 'list',
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
    @mock.patch.object(
        tasks.TaskManager, 'get',
        mock.MagicMock(side_effect=[
            MOCK_WF_EX_TASKS[0],
            MOCK_WF_EX_TASKS[1]]))
    def test_get_workflow_tasks(self):
        # The result is bound to wf_tasks rather than tasks so the local does
        # not shadow the imported mistralclient ``tasks`` module.
        wf_tasks = self.querier._get_workflow_tasks(uuid.uuid4().hex)

        # Expected: the raw task payloads with their JSON-encoded fields
        # decoded.
        expected = copy.deepcopy(MOCK_WF_EX_TASKS_DATA)
        for task in expected:
            for field in ('input', 'result', 'published'):
                task[field] = json.loads(task[field])

        # Check the count first: comparing element by element alone would
        # pass vacuously if the querier returned fewer tasks (or none).
        self.assertEqual(len(expected), len(wf_tasks))

        for expected_task, actual_task in zip(expected, wf_tasks):
            self.assertDictEqual(expected_task, actual_task)
700
701
    @mock.patch.object(
        action_utils, 'get_liveaction_by_id',
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
    @mock.patch.object(
        executions.ExecutionManager, 'get',
        mock.MagicMock(return_value=MOCK_WF_EX))
    @mock.patch.object(
        tasks.TaskManager, 'list',
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
    @mock.patch.object(
        tasks.TaskManager, 'get',
        mock.MagicMock(side_effect=[
            MOCK_WF_EX_TASKS[0],
            MOCK_WF_EX_TASKS[1]]))
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_query(self):
        # All lookups are mocked: a RUNNING liveaction, a SUCCESS workflow
        # execution, both of its tasks, and a parent execution with a
        # SUCCEEDED child.
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)

        expected = {
            'k1': 'v1',
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
            'extra': {
                'state': MOCK_WF_EX.state,
                'state_info': MOCK_WF_EX.state_info
            }
        }

        # The fixture stores these fields as JSON strings; decode them for
        # the comparison.
        for task in expected['tasks']:
            for field in ('input', 'result', 'published'):
                task[field] = json.loads(task[field])

        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
        self.assertDictEqual(expected, result)
739
740
    @mock.patch.object(
        action_utils, 'get_liveaction_by_id',
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING_WITH_STREAMING_RESULT))
    @mock.patch.object(
        executions.ExecutionManager, 'get',
        mock.MagicMock(return_value=MOCK_WF_EX))
    @mock.patch.object(
        tasks.TaskManager, 'list',
        mock.MagicMock(return_value=[MOCK_WF_EX_TASKS[1]]))
    @mock.patch.object(
        tasks.TaskManager, 'get',
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS[1]))
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_query_with_last_query_time(self):
        # The mocked liveaction already carries the first task in its result
        # and the mocked task manager only returns the second task; the
        # expected result below nevertheless contains both tasks.
        last_query_time = time.time() - 3

        (status, result) = self.querier.query(
            uuid.uuid4().hex,
            MOCK_QRY_CONTEXT,
            last_query_time=last_query_time
        )

        expected = {
            'k1': 'v1',
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
            'extra': {
                'state': MOCK_WF_EX.state,
                'state_info': MOCK_WF_EX.state_info
            }
        }

        # The fixture stores these fields as JSON strings; decode them for
        # the comparison.
        for task in expected['tasks']:
            for field in ('input', 'result', 'published'):
                task[field] = json.loads(task[field])

        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
        self.assertDictEqual(expected, result)
782
783
    @mock.patch.object(
        action_utils, 'get_liveaction_by_id',
        mock.MagicMock(side_effect=db_exc.StackStormDBObjectNotFoundError()))
    def test_query_liveaction_not_found(self):
        # The liveaction lookup is patched to raise; query() is expected to
        # let that exception propagate to the caller.
        self.assertRaises(
            db_exc.StackStormDBObjectNotFoundError,
            self.querier.query,
            uuid.uuid4().hex,
            MOCK_QRY_CONTEXT
        )
793
794
    @mock.patch.object(
        action_utils, 'get_liveaction_by_id',
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
    @mock.patch.object(
        executions.ExecutionManager, 'get',
        mock.MagicMock(side_effect=[
            requests.exceptions.ConnectionError(),
            MOCK_WF_EX]))
    @mock.patch.object(
        tasks.TaskManager, 'list',
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
    @mock.patch.object(
        tasks.TaskManager, 'get',
        mock.MagicMock(side_effect=[
            MOCK_WF_EX_TASKS[0],
            MOCK_WF_EX_TASKS[1]]))
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_query_get_workflow_retry(self):
        # ExecutionManager.get raises ConnectionError on its first call and
        # returns MOCK_WF_EX on the second, so the query only succeeds if the
        # lookup is retried.
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)

        expected = {
            'k1': 'v1',
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
            'extra': {
                'state': MOCK_WF_EX.state,
                'state_info': MOCK_WF_EX.state_info
            }
        }

        # The fixture stores these fields as JSON strings; decode them for
        # the comparison.
        for task in expected['tasks']:
            for field in ('input', 'result', 'published'):
                task[field] = json.loads(task[field])

        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
        self.assertDictEqual(expected, result)

        # Two consecutive lookups with the execution id from the query context.
        calls = [call(MOCK_QRY_CONTEXT['mistral']['execution_id'])] * 2
        executions.ExecutionManager.get.assert_has_calls(calls)
837
838
    @mock.patch.object(
        action_utils, 'get_liveaction_by_id',
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
    @mock.patch.object(
        executions.ExecutionManager, 'get',
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError()] * 4))
    def test_query_get_workflow_retry_exhausted(self):
        # Every workflow execution lookup raises a connection error, so the
        # error is expected to surface from query().
        with self.assertRaises(requests.exceptions.ConnectionError):
            self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)

        # The lookup must have been attempted at least twice in a row with
        # the execution id taken from the query context.
        execution_id = MOCK_QRY_CONTEXT['mistral']['execution_id']
        executions.ExecutionManager.get.assert_has_calls([call(execution_id)] * 2)
853
854
    @mock.patch.object(
        action_utils, 'get_liveaction_by_id',
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
    @mock.patch.object(
        executions.ExecutionManager, 'get',
        mock.MagicMock(
            side_effect=mistralclient_base.APIException(
                error_code=404, error_message='Workflow not found.')))
    def test_query_get_workflow_not_found(self):
        # A 404 API error from the workflow execution lookup is not raised;
        # it is reported as a failed status with the error message as result.
        status, result = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)

        self.assertEqual(action_constants.LIVEACTION_STATUS_FAILED, status)
        self.assertEqual('Workflow not found.', result)
867
868
    @mock.patch.object(
        action_utils, 'get_liveaction_by_id',
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
    @mock.patch.object(
        executions.ExecutionManager, 'get',
        mock.MagicMock(return_value=MOCK_WF_EX))
    @mock.patch.object(
        tasks.TaskManager, 'list',
        mock.MagicMock(side_effect=[
            requests.exceptions.ConnectionError(),
            MOCK_WF_EX_TASKS]))
    @mock.patch.object(
        tasks.TaskManager, 'get',
        mock.MagicMock(side_effect=[
            MOCK_WF_EX_TASKS[0],
            MOCK_WF_EX_TASKS[1]]))
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_query_list_workflow_tasks_retry(self):
        # Listing the workflow tasks raises a connection error on the first
        # call and returns MOCK_WF_EX_TASKS on the second, so query() only
        # succeeds if the listing is retried.
        status, result = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)

        # The task fixtures carry these fields as JSON text; the query result
        # is compared against their decoded form.
        expected_tasks = copy.deepcopy(MOCK_WF_EX_TASKS_DATA)
        for task_data in expected_tasks:
            for field in ('input', 'result', 'published'):
                task_data[field] = json.loads(task_data[field])

        expected_result = {
            'k1': 'v1',
            'tasks': expected_tasks,
            'extra': {
                'state': MOCK_WF_EX.state,
                'state_info': MOCK_WF_EX.state_info
            }
        }

        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
        self.assertDictEqual(expected_result, result)

        # The failed listing and its retry use the same keyword argument.
        list_call = call(workflow_execution_id=MOCK_QRY_CONTEXT['mistral']['execution_id'])
        tasks.TaskManager.list.assert_has_calls([list_call] * 2)

        # Each listed task is then fetched individually by id, in order.
        get_calls = [call(MOCK_WF_EX_TASKS[0].id), call(MOCK_WF_EX_TASKS[1].id)]
        tasks.TaskManager.get.assert_has_calls(get_calls)
915
916
    @mock.patch.object(
        action_utils, 'get_liveaction_by_id',
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
    @mock.patch.object(
        executions.ExecutionManager, 'get',
        mock.MagicMock(return_value=MOCK_WF_EX))
    @mock.patch.object(
        tasks.TaskManager, 'list',
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
    @mock.patch.object(
        tasks.TaskManager, 'get',
        mock.MagicMock(side_effect=[
            requests.exceptions.ConnectionError(),
            MOCK_WF_EX_TASKS[0],
            MOCK_WF_EX_TASKS[1]]))
    @mock.patch.object(
        ActionExecution, 'get',
        mock.MagicMock(side_effect=[
            MOCK_ACTIONEXECUTION_RUNNING_CHILD_SUCCEEDED,
            MOCK_CHILD_ACTIONEXECUTION_SUCCEEDED]))
    def test_query_get_workflow_tasks_retry(self):
        # The first task fetch raises a connection error; the following two
        # fetches return the mocked tasks, so query() only succeeds if the
        # failed fetch is retried.
        status, result = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)

        # The task fixtures carry these fields as JSON text; the query result
        # is compared against their decoded form.
        expected_tasks = copy.deepcopy(MOCK_WF_EX_TASKS_DATA)
        for task_data in expected_tasks:
            for field in ('input', 'result', 'published'):
                task_data[field] = json.loads(task_data[field])

        expected_result = {
            'k1': 'v1',
            'tasks': expected_tasks,
            'extra': {
                'state': MOCK_WF_EX.state,
                'state_info': MOCK_WF_EX.state_info
            }
        }

        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
        self.assertDictEqual(expected_result, result)

        # The first task id appears twice: once for the failed attempt and
        # once for its retry, followed by the fetch of the second task.
        first_task_id = MOCK_WF_EX_TASKS[0].id
        second_task_id = MOCK_WF_EX_TASKS[1].id
        expected_calls = [call(first_task_id), call(first_task_id), call(second_task_id)]

        tasks.TaskManager.get.assert_has_calls(expected_calls)
963
964
    @mock.patch.object(
        action_utils, 'get_liveaction_by_id',
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
    @mock.patch.object(
        executions.ExecutionManager, 'get',
        mock.MagicMock(return_value=MOCK_WF_EX))
    @mock.patch.object(
        tasks.TaskManager, 'list',
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError()] * 4))
    def test_query_list_workflow_tasks_retry_exhausted(self):
        # Every attempt to list the workflow tasks raises a connection error,
        # so the error is expected to surface from query().
        with self.assertRaises(requests.exceptions.ConnectionError):
            self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)

        # The listing must have been attempted at least twice in a row with
        # the execution id taken from the query context.
        list_call = call(workflow_execution_id=MOCK_QRY_CONTEXT['mistral']['execution_id'])
        tasks.TaskManager.list.assert_has_calls([list_call] * 2)
983
984
    @mock.patch.object(
        action_utils, 'get_liveaction_by_id',
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
    @mock.patch.object(
        executions.ExecutionManager, 'get',
        mock.MagicMock(return_value=MOCK_WF_EX))
    @mock.patch.object(
        tasks.TaskManager, 'list',
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
    @mock.patch.object(
        tasks.TaskManager, 'get',
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError()] * 4))
    def test_query_get_workflow_tasks_retry_exhausted(self):
        # Every task fetch raises a connection error, so the error is
        # expected to surface from query().
        with self.assertRaises(requests.exceptions.ConnectionError):
            self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)

        # The first task must have been fetched at least twice in a row.
        first_task_id = MOCK_WF_EX_TASKS[0].id
        expected_calls = [call(first_task_id), call(first_task_id)]

        tasks.TaskManager.get.assert_has_calls(expected_calls)
1009
1010
    @mock.patch.object(
        action_utils, 'get_liveaction_by_id',
        mock.MagicMock(return_value=MOCK_LIVEACTION_RUNNING))
    @mock.patch.object(
        executions.ExecutionManager, 'get',
        mock.MagicMock(return_value=MOCK_WF_EX))
    @mock.patch.object(
        tasks.TaskManager, 'list',
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
    @mock.patch.object(
        tasks.TaskManager, 'get',
        mock.MagicMock(
            side_effect=mistralclient_base.APIException(
                error_code=404, error_message='Task not found.')))
    def test_query_get_workflow_tasks_not_found(self):
        # A 404 API error from the task fetch is not raised; it is reported
        # as a failed status with the error message as the result.
        status, result = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)

        self.assertEqual(action_constants.LIVEACTION_STATUS_FAILED, status)
        self.assertEqual('Task not found.', result)
1029
1030
    def test_query_missing_context(self):
        # An empty query context must be rejected with an exception.
        with self.assertRaises(Exception):
            self.querier.query(uuid.uuid4().hex, {})
1032
1033
    def test_query_missing_mistral_execution_id(self):
        # A 'mistral' context section without an execution id must be
        # rejected with an exception.
        with self.assertRaises(Exception):
            self.querier.query(uuid.uuid4().hex, {'mistral': {}})
1035