Completed
Pull Request — master (#3048)
by W
04:48
created

MistralQuerierTest.test_get_workflow_result()   A

Complexity

Conditions 1

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
c 2
b 0
f 0
dl 0
loc 15
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import datetime
18
import json
19
import requests
20
import uuid
21
22
import mock
23
from mock import call
24
25
from mistralclient.api.v2 import executions
26
from mistralclient.api.v2 import tasks
27
from oslo_config import cfg
28
29
import st2tests.config as tests_config
30
tests_config.parse_args()
31
32
from st2common.constants import action as action_constants
33
from st2common.services import action as action_service
34
from st2common.util import loader
35
from st2tests import DbTestCase
36
37
38
MOCK_WF_TASKS_SUCCEEDED = [
39
    {'name': 'task1', 'state': 'SUCCESS'},
40
    {'name': 'task2', 'state': 'SUCCESS'}
41
]
42
43
MOCK_WF_TASKS_ERRORED = [
44
    {'name': 'task1', 'state': 'SUCCESS'},
45
    {'name': 'task2', 'state': 'ERROR'}
46
]
47
48
MOCK_WF_TASKS_RUNNING = [
49
    {'name': 'task1', 'state': 'SUCCESS'},
50
    {'name': 'task2', 'state': 'RUNNING'}
51
]
52
53
MOCK_WF_EX_DATA = {
54
    'id': uuid.uuid4().hex,
55
    'name': 'main',
56
    'output': '{"k1": "v1"}',
57
    'state': 'SUCCESS',
58
    'state_info': None
59
}
60
61
MOCK_WF_EX = executions.Execution(None, MOCK_WF_EX_DATA)
62
63
MOCK_WF_EX_TASKS_DATA = [
64
    {
65
        'id': uuid.uuid4().hex,
66
        'name': 'task1',
67
        'workflow_execution_id': MOCK_WF_EX_DATA['id'],
68
        'workflow_name': MOCK_WF_EX_DATA['name'],
69
        'created_at': str(datetime.datetime.utcnow()),
70
        'updated_at': str(datetime.datetime.utcnow()),
71
        'state': 'SUCCESS',
72
        'state_info': None,
73
        'input': '{"a": "b"}',
74
        'result': '{"c": "d"}',
75
        'published': '{"c": "d"}'
76
    },
77
    {
78
        'id': uuid.uuid4().hex,
79
        'name': 'task2',
80
        'workflow_execution_id': MOCK_WF_EX_DATA['id'],
81
        'workflow_name': MOCK_WF_EX_DATA['name'],
82
        'created_at': str(datetime.datetime.utcnow()),
83
        'updated_at': str(datetime.datetime.utcnow()),
84
        'state': 'SUCCESS',
85
        'state_info': None,
86
        'input': '{"e": "f", "g": "h"}',
87
        'result': '{"i": "j", "k": "l"}',
88
        'published': '{"k": "l"}'
89
    }
90
]
91
92
MOCK_WF_EX_TASKS = [
93
    tasks.Task(None, MOCK_WF_EX_TASKS_DATA[0]),
94
    tasks.Task(None, MOCK_WF_EX_TASKS_DATA[1])
95
]
96
97
MOCK_QRY_CONTEXT = {
98
    'mistral': {
99
        'execution_id': uuid.uuid4().hex
100
    }
101
}
102
103
104
class MistralQuerierTest(DbTestCase):
105
106
    @classmethod
107
    def setUpClass(cls):
108
        super(MistralQuerierTest, cls).setUpClass()
109
110
        # Override the retry configuration here otherwise st2tests.config.parse_args
111
        # in DbTestCase.setUpClass will reset these overrides.
112
        cfg.CONF.set_override('retry_exp_msec', 100, group='mistral')
113
        cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral')
114
        cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral')
115
116
        # Register query module.
117
        cls.query_module = loader.register_query_module('mistral_v2')
118
119
    def setUp(self):
120
        super(MistralQuerierTest, self).setUp()
121
        self.querier = self.query_module.get_instance()
122
123
    @mock.patch.object(
124
        action_service, 'is_action_canceled_or_canceling',
125
        mock.MagicMock(return_value=False))
126
    def test_determine_status_wf_running_tasks_running(self):
127
        wf_id = uuid.uuid4().hex
128
        status = self.querier._determine_execution_status(wf_id, 'RUNNING', MOCK_WF_TASKS_RUNNING)
129
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
130
131
    @mock.patch.object(
132
        action_service, 'is_action_canceled_or_canceling',
133
        mock.MagicMock(return_value=False))
134
    def test_determine_status_wf_running_tasks_completed(self):
135
        wf_id = uuid.uuid4().hex
136
        status = self.querier._determine_execution_status(wf_id, 'RUNNING', MOCK_WF_TASKS_SUCCEEDED)
137
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
138
139
    @mock.patch.object(
140
        action_service, 'is_action_canceled_or_canceling',
141
        mock.MagicMock(return_value=False))
142
    def test_determine_status_wf_succeeded_tasks_completed(self):
143
        wf_id = uuid.uuid4().hex
144
        status = self.querier._determine_execution_status(wf_id, 'SUCCESS', MOCK_WF_TASKS_SUCCEEDED)
145
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
146
147
    @mock.patch.object(
148
        action_service, 'is_action_canceled_or_canceling',
149
        mock.MagicMock(return_value=False))
150
    def test_determine_status_wf_succeeded_tasks_running(self):
151
        wf_id = uuid.uuid4().hex
152
        status = self.querier._determine_execution_status(wf_id, 'SUCCESS', MOCK_WF_TASKS_RUNNING)
153
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
154
155
    @mock.patch.object(
156
        action_service, 'is_action_canceled_or_canceling',
157
        mock.MagicMock(return_value=False))
158
    def test_determine_status_wf_errored_tasks_completed(self):
159
        wf_id = uuid.uuid4().hex
160
        status = self.querier._determine_execution_status(wf_id, 'ERROR', MOCK_WF_TASKS_SUCCEEDED)
161
        self.assertEqual(action_constants.LIVEACTION_STATUS_FAILED, status)
162
163
    @mock.patch.object(
164
        action_service, 'is_action_canceled_or_canceling',
165
        mock.MagicMock(return_value=False))
166
    def test_determine_status_wf_errored_tasks_running(self):
167
        wf_id = uuid.uuid4().hex
168
        status = self.querier._determine_execution_status(wf_id, 'ERROR', MOCK_WF_TASKS_RUNNING)
169
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
170
171
    @mock.patch.object(
172
        action_service, 'is_action_canceled_or_canceling',
173
        mock.MagicMock(return_value=True))
174
    def test_determine_status_wf_canceled_tasks_completed(self):
175
        wf_id = uuid.uuid4().hex
176
        status = self.querier._determine_execution_status(wf_id, 'PAUSED', MOCK_WF_TASKS_SUCCEEDED)
177
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELED, status)
178
179
    @mock.patch.object(
180
        action_service, 'is_action_canceled_or_canceling',
181
        mock.MagicMock(return_value=True))
182
    def test_determine_status_wf_canceled_tasks_running(self):
183
        wf_id = uuid.uuid4().hex
184
        status = self.querier._determine_execution_status(wf_id, 'PAUSED', MOCK_WF_TASKS_RUNNING)
185
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELED, status)
186
187
    @mock.patch.object(
188
        action_service, 'is_action_canceled_or_canceling',
189
        mock.MagicMock(return_value=True))
190
    def test_determine_status_wf_canceled_exec_running_tasks_completed(self):
191
        wf_id = uuid.uuid4().hex
192
        status = self.querier._determine_execution_status(wf_id, 'RUNNING', MOCK_WF_TASKS_SUCCEEDED)
193
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
194
195
    @mock.patch.object(
196
        action_service, 'is_action_canceled_or_canceling',
197
        mock.MagicMock(return_value=True))
198
    def test_determine_status_wf_canceled_exec_running_tasks_running(self):
199
        wf_id = uuid.uuid4().hex
200
        status = self.querier._determine_execution_status(wf_id, 'RUNNING', MOCK_WF_TASKS_RUNNING)
201
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
202
203
    @mock.patch.object(
204
        action_service, 'is_action_canceled_or_canceling',
205
        mock.MagicMock(return_value=False))
206
    def test_determine_status_wf_running_exec_paused_tasks_completed(self):
207
        wf_id = uuid.uuid4().hex
208
        status = self.querier._determine_execution_status(wf_id, 'PAUSED', MOCK_WF_TASKS_SUCCEEDED)
209
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
210
211
    @mock.patch.object(
212
        action_service, 'is_action_canceled_or_canceling',
213
        mock.MagicMock(return_value=False))
214
    def test_determine_status_wf_running_exec_paused_tasks_running(self):
215
        wf_id = uuid.uuid4().hex
216
        status = self.querier._determine_execution_status(wf_id, 'PAUSED', MOCK_WF_TASKS_RUNNING)
217
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
218
219
    @mock.patch.object(
220
        executions.ExecutionManager, 'get',
221
        mock.MagicMock(return_value=MOCK_WF_EX))
222
    def test_get_workflow_result(self):
223
        result = self.querier._get_workflow_result(uuid.uuid4().hex)
224
225
        expected = {
226
            'k1': 'v1',
227
            'extra': {
228
                'state': MOCK_WF_EX.state,
229
                'state_info': MOCK_WF_EX.state_info
230
            }
231
        }
232
233
        self.assertDictEqual(expected, result)
234
235
    @mock.patch.object(
236
        tasks.TaskManager, 'list',
237
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
238
    def test_get_workflow_tasks(self):
239
        tasks = self.querier._get_workflow_tasks(uuid.uuid4().hex)
240
241
        expected = copy.deepcopy(MOCK_WF_EX_TASKS_DATA)
242
        for task in expected:
243
            task['input'] = json.loads(task['input'])
244
            task['result'] = json.loads(task['result'])
245
            task['published'] = json.loads(task['published'])
246
247
        for i in range(0, len(tasks)):
248
            self.assertDictEqual(expected[i], tasks[i])
249
250
    @mock.patch.object(
251
        executions.ExecutionManager, 'get',
252
        mock.MagicMock(return_value=MOCK_WF_EX))
253
    @mock.patch.object(
254
        tasks.TaskManager, 'list',
255
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
256
    @mock.patch.object(
257
        action_service, 'is_action_canceled_or_canceling',
258
        mock.MagicMock(return_value=False))
259
    def test_query(self):
260
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
261
262
        expected = {
263
            'k1': 'v1',
264
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
265
            'extra': {
266
                'state': MOCK_WF_EX.state,
267
                'state_info': MOCK_WF_EX.state_info
268
            }
269
        }
270
271
        for task in expected['tasks']:
272
            task['input'] = json.loads(task['input'])
273
            task['result'] = json.loads(task['result'])
274
            task['published'] = json.loads(task['published'])
275
276
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
277
        self.assertDictEqual(expected, result)
278
279
    @mock.patch.object(
280
        executions.ExecutionManager, 'get',
281
        mock.MagicMock(side_effect=[
282
            requests.exceptions.ConnectionError(),
283
            MOCK_WF_EX]))
284
    @mock.patch.object(
285
        tasks.TaskManager, 'list',
286
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
287
    @mock.patch.object(
288
        action_service, 'is_action_canceled_or_canceling',
289
        mock.MagicMock(return_value=False))
290
    def test_query_get_workflow_retry(self):
291
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
292
293
        expected = {
294
            'k1': 'v1',
295
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
296
            'extra': {
297
                'state': MOCK_WF_EX.state,
298
                'state_info': MOCK_WF_EX.state_info
299
            }
300
        }
301
302
        for task in expected['tasks']:
303
            task['input'] = json.loads(task['input'])
304
            task['result'] = json.loads(task['result'])
305
            task['published'] = json.loads(task['published'])
306
307
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
308
        self.assertDictEqual(expected, result)
309
310
        calls = [call(MOCK_QRY_CONTEXT['mistral']['execution_id']) for i in range(0, 2)]
311
        executions.ExecutionManager.get.assert_has_calls(calls)
312
313
    @mock.patch.object(
314
        executions.ExecutionManager, 'get',
315
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError()] * 4))
316
    def test_query_get_workflow_retry_exhausted(self):
317
        self.assertRaises(
318
            requests.exceptions.ConnectionError,
319
            self.querier.query,
320
            uuid.uuid4().hex,
321
            MOCK_QRY_CONTEXT)
322
323
        calls = [call(MOCK_QRY_CONTEXT['mistral']['execution_id']) for i in range(0, 2)]
324
        executions.ExecutionManager.get.assert_has_calls(calls)
325
326
    @mock.patch.object(
327
        executions.ExecutionManager, 'get',
328
        mock.MagicMock(return_value=MOCK_WF_EX))
329
    @mock.patch.object(
330
        tasks.TaskManager, 'list',
331
        mock.MagicMock(side_effect=[
332
            requests.exceptions.ConnectionError(),
333
            MOCK_WF_EX_TASKS]))
334
    @mock.patch.object(
335
        action_service, 'is_action_canceled_or_canceling',
336
        mock.MagicMock(return_value=False))
337
    def test_query_get_workflow_tasks_retry(self):
338
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
339
340
        expected = {
341
            'k1': 'v1',
342
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
343
            'extra': {
344
                'state': MOCK_WF_EX.state,
345
                'state_info': MOCK_WF_EX.state_info
346
            }
347
        }
348
349
        for task in expected['tasks']:
350
            task['input'] = json.loads(task['input'])
351
            task['result'] = json.loads(task['result'])
352
            task['published'] = json.loads(task['published'])
353
354
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
355
        self.assertDictEqual(expected, result)
356
357
        mock_call = call(workflow_execution_id=MOCK_QRY_CONTEXT['mistral']['execution_id'])
358
        calls = [mock_call for i in range(0, 2)]
359
        tasks.TaskManager.list.assert_has_calls(calls)
360
361
    @mock.patch.object(
362
        executions.ExecutionManager, 'get',
363
        mock.MagicMock(return_value=MOCK_WF_EX))
364
    @mock.patch.object(
365
        tasks.TaskManager, 'list',
366
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError()] * 4))
367
    def test_query_get_workflow_tasks_retry_exhausted(self):
368
        self.assertRaises(
369
            requests.exceptions.ConnectionError,
370
            self.querier.query,
371
            uuid.uuid4().hex,
372
            MOCK_QRY_CONTEXT)
373
374
        mock_call = call(workflow_execution_id=MOCK_QRY_CONTEXT['mistral']['execution_id'])
375
        calls = [mock_call for i in range(0, 2)]
376
        tasks.TaskManager.list.assert_has_calls(calls)
377
378
    def test_query_missing_context(self):
379
        self.assertRaises(Exception, self.querier.query, uuid.uuid4().hex, {})
380
381
    def test_query_missing_mistral_execution_id(self):
382
        self.assertRaises(Exception, self.querier.query, uuid.uuid4().hex, {'mistral': {}})
383