Passed
Push — master ( 0b1d4e...594593 )
by W
04:59
created

MistralQuerierTest.test_query_missing_context()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import datetime
18
import json
19
import requests
20
import uuid
21
22
import mock
23
from mock import call
24
25
from mistralclient.api.v2 import executions
26
from mistralclient.api.v2 import tasks
27
from oslo_config import cfg
28
29
import st2tests.config as tests_config
30
tests_config.parse_args()
31
32
from st2common.constants import action as action_constants
33
from st2common.services import action as action_service
34
from st2common.util import loader
35
from st2tests import DbTestCase
36
37
38
MOCK_WF_TASKS_SUCCEEDED = [
39
    {'name': 'task1', 'state': 'SUCCESS'},
40
    {'name': 'task2', 'state': 'SUCCESS'}
41
]
42
43
MOCK_WF_TASKS_ERRORED = [
44
    {'name': 'task1', 'state': 'SUCCESS'},
45
    {'name': 'task2', 'state': 'ERROR'}
46
]
47
48
MOCK_WF_TASKS_RUNNING = [
49
    {'name': 'task1', 'state': 'SUCCESS'},
50
    {'name': 'task2', 'state': 'RUNNING'}
51
]
52
53
MOCK_WF_TASKS_WAITING = [
54
    {'name': 'task1', 'state': 'SUCCESS'},
55
    {'name': 'task2', 'state': 'WAITING'}
56
]
57
58
MOCK_WF_EX_DATA = {
59
    'id': uuid.uuid4().hex,
60
    'name': 'main',
61
    'output': '{"k1": "v1"}',
62
    'state': 'SUCCESS',
63
    'state_info': None
64
}
65
66
MOCK_WF_EX = executions.Execution(None, MOCK_WF_EX_DATA)
67
68
MOCK_WF_EX_TASKS_DATA = [
69
    {
70
        'id': uuid.uuid4().hex,
71
        'name': 'task1',
72
        'workflow_execution_id': MOCK_WF_EX_DATA['id'],
73
        'workflow_name': MOCK_WF_EX_DATA['name'],
74
        'created_at': str(datetime.datetime.utcnow()),
75
        'updated_at': str(datetime.datetime.utcnow()),
76
        'state': 'SUCCESS',
77
        'state_info': None,
78
        'input': '{"a": "b"}',
79
        'result': '{"c": "d"}',
80
        'published': '{"c": "d"}'
81
    },
82
    {
83
        'id': uuid.uuid4().hex,
84
        'name': 'task2',
85
        'workflow_execution_id': MOCK_WF_EX_DATA['id'],
86
        'workflow_name': MOCK_WF_EX_DATA['name'],
87
        'created_at': str(datetime.datetime.utcnow()),
88
        'updated_at': str(datetime.datetime.utcnow()),
89
        'state': 'SUCCESS',
90
        'state_info': None,
91
        'input': '{"e": "f", "g": "h"}',
92
        'result': '{"i": "j", "k": "l"}',
93
        'published': '{"k": "l"}'
94
    }
95
]
96
97
MOCK_WF_EX_TASKS = [
98
    tasks.Task(None, MOCK_WF_EX_TASKS_DATA[0]),
99
    tasks.Task(None, MOCK_WF_EX_TASKS_DATA[1])
100
]
101
102
MOCK_QRY_CONTEXT = {
103
    'mistral': {
104
        'execution_id': uuid.uuid4().hex
105
    }
106
}
107
108
109
class MistralQuerierTest(DbTestCase):
110
111
    @classmethod
112
    def setUpClass(cls):
113
        super(MistralQuerierTest, cls).setUpClass()
114
115
        # Override the retry configuration here otherwise st2tests.config.parse_args
116
        # in DbTestCase.setUpClass will reset these overrides.
117
        cfg.CONF.set_override('retry_exp_msec', 100, group='mistral')
118
        cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral')
119
        cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral')
120
121
        # Register query module.
122
        cls.query_module = loader.register_query_module('mistral_v2')
123
124
    def setUp(self):
125
        super(MistralQuerierTest, self).setUp()
126
        self.querier = self.query_module.get_instance()
127
128
    @mock.patch.object(
129
        action_service, 'is_action_canceled_or_canceling',
130
        mock.MagicMock(return_value=False))
131
    def test_determine_status_wf_running_tasks_running(self):
132
        wf_id = uuid.uuid4().hex
133
        status = self.querier._determine_execution_status(wf_id, 'RUNNING', MOCK_WF_TASKS_RUNNING)
134
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
135
136
    @mock.patch.object(
137
        action_service, 'is_action_canceled_or_canceling',
138
        mock.MagicMock(return_value=False))
139
    def test_determine_status_wf_running_tasks_completed(self):
140
        wf_id = uuid.uuid4().hex
141
        status = self.querier._determine_execution_status(wf_id, 'RUNNING', MOCK_WF_TASKS_SUCCEEDED)
142
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
143
144
    @mock.patch.object(
145
        action_service, 'is_action_canceled_or_canceling',
146
        mock.MagicMock(return_value=False))
147
    def test_determine_status_wf_succeeded_tasks_completed(self):
148
        wf_id = uuid.uuid4().hex
149
        status = self.querier._determine_execution_status(wf_id, 'SUCCESS', MOCK_WF_TASKS_SUCCEEDED)
150
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
151
152
    @mock.patch.object(
153
        action_service, 'is_action_canceled_or_canceling',
154
        mock.MagicMock(return_value=False))
155
    def test_determine_status_wf_succeeded_tasks_running(self):
156
        wf_id = uuid.uuid4().hex
157
        status = self.querier._determine_execution_status(wf_id, 'SUCCESS', MOCK_WF_TASKS_RUNNING)
158
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
159
160
    @mock.patch.object(
161
        action_service, 'is_action_canceled_or_canceling',
162
        mock.MagicMock(return_value=False))
163
    def test_determine_status_wf_errored_tasks_completed(self):
164
        wf_id = uuid.uuid4().hex
165
        status = self.querier._determine_execution_status(wf_id, 'ERROR', MOCK_WF_TASKS_SUCCEEDED)
166
        self.assertEqual(action_constants.LIVEACTION_STATUS_FAILED, status)
167
168
    @mock.patch.object(
169
        action_service, 'is_action_canceled_or_canceling',
170
        mock.MagicMock(return_value=False))
171
    def test_determine_status_wf_errored_tasks_running(self):
172
        wf_id = uuid.uuid4().hex
173
        status = self.querier._determine_execution_status(wf_id, 'ERROR', MOCK_WF_TASKS_RUNNING)
174
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
175
176
    @mock.patch.object(
177
        action_service, 'is_action_canceled_or_canceling',
178
        mock.MagicMock(return_value=True))
179
    def test_determine_status_wf_canceled_tasks_completed(self):
180
        wf_id = uuid.uuid4().hex
181
        status = self.querier._determine_execution_status(
182
            wf_id, 'CANCELLED', MOCK_WF_TASKS_SUCCEEDED)
183
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELED, status)
184
185
    @mock.patch.object(
186
        action_service, 'is_action_canceled_or_canceling',
187
        mock.MagicMock(return_value=True))
188
    def test_determine_status_wf_canceled_tasks_running(self):
189
        wf_id = uuid.uuid4().hex
190
        status = self.querier._determine_execution_status(wf_id, 'CANCELLED', MOCK_WF_TASKS_RUNNING)
191
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELING, status)
192
193
    @mock.patch.object(
194
        action_service, 'is_action_canceled_or_canceling',
195
        mock.MagicMock(return_value=True))
196
    def test_determine_status_wf_canceled_tasks_waiting(self):
197
        wf_id = uuid.uuid4().hex
198
        status = self.querier._determine_execution_status(wf_id, 'CANCELLED', MOCK_WF_TASKS_WAITING)
199
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELED, status)
200
201
    @mock.patch.object(
202
        action_service, 'is_action_canceled_or_canceling',
203
        mock.MagicMock(return_value=True))
204
    def test_determine_status_wf_canceled_exec_running_tasks_completed(self):
205
        wf_id = uuid.uuid4().hex
206
        status = self.querier._determine_execution_status(wf_id, 'RUNNING', MOCK_WF_TASKS_SUCCEEDED)
207
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELING, status)
208
209
    @mock.patch.object(
210
        action_service, 'is_action_canceled_or_canceling',
211
        mock.MagicMock(return_value=True))
212
    def test_determine_status_wf_canceled_exec_running_tasks_running(self):
213
        wf_id = uuid.uuid4().hex
214
        status = self.querier._determine_execution_status(wf_id, 'RUNNING', MOCK_WF_TASKS_RUNNING)
215
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELING, status)
216
217
    @mock.patch.object(
218
        action_service, 'is_action_canceled_or_canceling',
219
        mock.MagicMock(return_value=True))
220
    def test_determine_status_wf_canceled_exec_running_tasks_waiting(self):
221
        wf_id = uuid.uuid4().hex
222
        status = self.querier._determine_execution_status(wf_id, 'RUNNING', MOCK_WF_TASKS_WAITING)
223
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELING, status)
224
225
    @mock.patch.object(
226
        action_service, 'is_action_canceled_or_canceling',
227
        mock.MagicMock(return_value=False))
228
    def test_determine_status_wf_running_exec_paused_tasks_completed(self):
229
        wf_id = uuid.uuid4().hex
230
        status = self.querier._determine_execution_status(
231
            wf_id, 'PAUSED', MOCK_WF_TASKS_SUCCEEDED)
232
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
233
234
    @mock.patch.object(
235
        action_service, 'is_action_canceled_or_canceling',
236
        mock.MagicMock(return_value=False))
237
    def test_determine_status_wf_running_exec_cancelled_tasks_running(self):
238
        wf_id = uuid.uuid4().hex
239
        status = self.querier._determine_execution_status(wf_id, 'CANCELLED', MOCK_WF_TASKS_RUNNING)
240
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELING, status)
241
242
    @mock.patch.object(
243
        executions.ExecutionManager, 'get',
244
        mock.MagicMock(return_value=MOCK_WF_EX))
245
    def test_get_workflow_result(self):
246
        result = self.querier._get_workflow_result(uuid.uuid4().hex)
247
248
        expected = {
249
            'k1': 'v1',
250
            'extra': {
251
                'state': MOCK_WF_EX.state,
252
                'state_info': MOCK_WF_EX.state_info
253
            }
254
        }
255
256
        self.assertDictEqual(expected, result)
257
258
    @mock.patch.object(
259
        tasks.TaskManager, 'list',
260
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
261
    @mock.patch.object(
262
        tasks.TaskManager, 'get',
263
        mock.MagicMock(side_effect=[
264
            MOCK_WF_EX_TASKS[0],
265
            MOCK_WF_EX_TASKS[1]]))
266
    def test_get_workflow_tasks(self):
267
        tasks = self.querier._get_workflow_tasks(uuid.uuid4().hex)
268
269
        expected = copy.deepcopy(MOCK_WF_EX_TASKS_DATA)
270
        for task in expected:
271
            task['input'] = json.loads(task['input'])
272
            task['result'] = json.loads(task['result'])
273
            task['published'] = json.loads(task['published'])
274
275
        for i in range(0, len(tasks)):
276
            self.assertDictEqual(expected[i], tasks[i])
277
278
    @mock.patch.object(
279
        executions.ExecutionManager, 'get',
280
        mock.MagicMock(return_value=MOCK_WF_EX))
281
    @mock.patch.object(
282
        tasks.TaskManager, 'list',
283
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
284
    @mock.patch.object(
285
        tasks.TaskManager, 'get',
286
        mock.MagicMock(side_effect=[
287
            MOCK_WF_EX_TASKS[0],
288
            MOCK_WF_EX_TASKS[1]]))
289
    @mock.patch.object(
290
        action_service, 'is_action_canceled_or_canceling',
291
        mock.MagicMock(return_value=False))
292
    def test_query(self):
293
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
294
295
        expected = {
296
            'k1': 'v1',
297
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
298
            'extra': {
299
                'state': MOCK_WF_EX.state,
300
                'state_info': MOCK_WF_EX.state_info
301
            }
302
        }
303
304
        for task in expected['tasks']:
305
            task['input'] = json.loads(task['input'])
306
            task['result'] = json.loads(task['result'])
307
            task['published'] = json.loads(task['published'])
308
309
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
310
        self.assertDictEqual(expected, result)
311
312
    @mock.patch.object(
313
        executions.ExecutionManager, 'get',
314
        mock.MagicMock(side_effect=[
315
            requests.exceptions.ConnectionError(),
316
            MOCK_WF_EX]))
317
    @mock.patch.object(
318
        tasks.TaskManager, 'list',
319
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
320
    @mock.patch.object(
321
        tasks.TaskManager, 'get',
322
        mock.MagicMock(side_effect=[
323
            MOCK_WF_EX_TASKS[0],
324
            MOCK_WF_EX_TASKS[1]]))
325
    @mock.patch.object(
326
        action_service, 'is_action_canceled_or_canceling',
327
        mock.MagicMock(return_value=False))
328
    def test_query_get_workflow_retry(self):
329
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
330
331
        expected = {
332
            'k1': 'v1',
333
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
334
            'extra': {
335
                'state': MOCK_WF_EX.state,
336
                'state_info': MOCK_WF_EX.state_info
337
            }
338
        }
339
340
        for task in expected['tasks']:
341
            task['input'] = json.loads(task['input'])
342
            task['result'] = json.loads(task['result'])
343
            task['published'] = json.loads(task['published'])
344
345
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
346
        self.assertDictEqual(expected, result)
347
348
        calls = [call(MOCK_QRY_CONTEXT['mistral']['execution_id']) for i in range(0, 2)]
349
        executions.ExecutionManager.get.assert_has_calls(calls)
350
351
    @mock.patch.object(
352
        executions.ExecutionManager, 'get',
353
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError()] * 4))
354
    def test_query_get_workflow_retry_exhausted(self):
355
        self.assertRaises(
356
            requests.exceptions.ConnectionError,
357
            self.querier.query,
358
            uuid.uuid4().hex,
359
            MOCK_QRY_CONTEXT)
360
361
        calls = [call(MOCK_QRY_CONTEXT['mistral']['execution_id']) for i in range(0, 2)]
362
        executions.ExecutionManager.get.assert_has_calls(calls)
363
364
    @mock.patch.object(
365
        executions.ExecutionManager, 'get',
366
        mock.MagicMock(return_value=MOCK_WF_EX))
367
    @mock.patch.object(
368
        tasks.TaskManager, 'list',
369
        mock.MagicMock(side_effect=[
370
            requests.exceptions.ConnectionError(),
371
            MOCK_WF_EX_TASKS]))
372
    @mock.patch.object(
373
        tasks.TaskManager, 'get',
374
        mock.MagicMock(side_effect=[
375
            MOCK_WF_EX_TASKS[0],
376
            MOCK_WF_EX_TASKS[1]]))
377
    @mock.patch.object(
378
        action_service, 'is_action_canceled_or_canceling',
379
        mock.MagicMock(return_value=False))
380
    def test_query_list_workflow_tasks_retry(self):
381
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
382
383
        expected = {
384
            'k1': 'v1',
385
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
386
            'extra': {
387
                'state': MOCK_WF_EX.state,
388
                'state_info': MOCK_WF_EX.state_info
389
            }
390
        }
391
392
        for task in expected['tasks']:
393
            task['input'] = json.loads(task['input'])
394
            task['result'] = json.loads(task['result'])
395
            task['published'] = json.loads(task['published'])
396
397
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
398
        self.assertDictEqual(expected, result)
399
400
        mock_call = call(workflow_execution_id=MOCK_QRY_CONTEXT['mistral']['execution_id'])
401
        calls = [mock_call for i in range(0, 2)]
402
        tasks.TaskManager.list.assert_has_calls(calls)
403
404
        calls = [call(MOCK_WF_EX_TASKS[0].id), call(MOCK_WF_EX_TASKS[1].id)]
405
        tasks.TaskManager.get.assert_has_calls(calls)
406
407
    @mock.patch.object(
408
        executions.ExecutionManager, 'get',
409
        mock.MagicMock(return_value=MOCK_WF_EX))
410
    @mock.patch.object(
411
        tasks.TaskManager, 'list',
412
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
413
    @mock.patch.object(
414
        tasks.TaskManager, 'get',
415
        mock.MagicMock(side_effect=[
416
            requests.exceptions.ConnectionError(),
417
            MOCK_WF_EX_TASKS[0],
418
            MOCK_WF_EX_TASKS[1]]))
419
    @mock.patch.object(
420
        action_service, 'is_action_canceled_or_canceling',
421
        mock.MagicMock(return_value=False))
422
    def test_query_get_workflow_tasks_retry(self):
423
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
424
425
        expected = {
426
            'k1': 'v1',
427
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
428
            'extra': {
429
                'state': MOCK_WF_EX.state,
430
                'state_info': MOCK_WF_EX.state_info
431
            }
432
        }
433
434
        for task in expected['tasks']:
435
            task['input'] = json.loads(task['input'])
436
            task['result'] = json.loads(task['result'])
437
            task['published'] = json.loads(task['published'])
438
439
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
440
        self.assertDictEqual(expected, result)
441
442
        calls = [
443
            call(MOCK_WF_EX_TASKS[0].id),
444
            call(MOCK_WF_EX_TASKS[0].id),
445
            call(MOCK_WF_EX_TASKS[1].id)
446
        ]
447
448
        tasks.TaskManager.get.assert_has_calls(calls)
449
450
    @mock.patch.object(
451
        executions.ExecutionManager, 'get',
452
        mock.MagicMock(return_value=MOCK_WF_EX))
453
    @mock.patch.object(
454
        tasks.TaskManager, 'list',
455
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError()] * 4))
456
    def test_query_list_workflow_tasks_retry_exhausted(self):
457
        self.assertRaises(
458
            requests.exceptions.ConnectionError,
459
            self.querier.query,
460
            uuid.uuid4().hex,
461
            MOCK_QRY_CONTEXT)
462
463
        mock_call = call(workflow_execution_id=MOCK_QRY_CONTEXT['mistral']['execution_id'])
464
        calls = [mock_call for i in range(0, 2)]
465
        tasks.TaskManager.list.assert_has_calls(calls)
466
467
    @mock.patch.object(
468
        executions.ExecutionManager, 'get',
469
        mock.MagicMock(return_value=MOCK_WF_EX))
470
    @mock.patch.object(
471
        tasks.TaskManager, 'list',
472
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
473
    @mock.patch.object(
474
        tasks.TaskManager, 'get',
475
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError()] * 4))
476
    def test_query_get_workflow_tasks_retry_exhausted(self):
477
        self.assertRaises(
478
            requests.exceptions.ConnectionError,
479
            self.querier.query,
480
            uuid.uuid4().hex,
481
            MOCK_QRY_CONTEXT)
482
483
        calls = [
484
            call(MOCK_WF_EX_TASKS[0].id),
485
            call(MOCK_WF_EX_TASKS[0].id)
486
        ]
487
488
        tasks.TaskManager.get.assert_has_calls(calls)
489
490
    def test_query_missing_context(self):
491
        self.assertRaises(Exception, self.querier.query, uuid.uuid4().hex, {})
492
493
    def test_query_missing_mistral_execution_id(self):
494
        self.assertRaises(Exception, self.querier.query, uuid.uuid4().hex, {'mistral': {}})
495