Passed
Push — develop ( b8d4ca...421369 )
by Plexxi
07:01 queued 03:57
created

test_determine_status_wf_succeeded_tasks_completed()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 1 Features 0
Metric Value
cc 1
c 3
b 1
f 0
dl 0
loc 7
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import copy
17
import datetime
18
import json
19
import requests
20
import uuid
21
22
import mock
23
from mock import call
24
25
from mistralclient.api.v2 import executions
26
from mistralclient.api.v2 import tasks
27
from oslo_config import cfg
28
29
import st2tests.config as tests_config
30
tests_config.parse_args()
31
32
# Set defaults for retry options before loading the query module.
33
cfg.CONF.set_override('retry_exp_msec', 100, group='mistral')
34
cfg.CONF.set_override('retry_exp_max_msec', 200, group='mistral')
35
cfg.CONF.set_override('retry_stop_max_msec', 200, group='mistral')
36
37
from mistral_v2.query import mistral_v2 as mistral
38
from st2common.constants import action as action_constants
39
from st2common.services import action as action_service
40
from st2tests import DbTestCase
41
42
MOCK_WF_TASKS_SUCCEEDED = [
43
    {'name': 'task1', 'state': 'SUCCESS'},
44
    {'name': 'task2', 'state': 'SUCCESS'}
45
]
46
47
MOCK_WF_TASKS_ERRORED = [
48
    {'name': 'task1', 'state': 'SUCCESS'},
49
    {'name': 'task2', 'state': 'ERROR'}
50
]
51
52
MOCK_WF_TASKS_RUNNING = [
53
    {'name': 'task1', 'state': 'SUCCESS'},
54
    {'name': 'task2', 'state': 'RUNNING'}
55
]
56
57
MOCK_WF_EX_DATA = {
58
    'id': uuid.uuid4().hex,
59
    'name': 'main',
60
    'output': '{"k1": "v1"}',
61
    'state': 'SUCCESS',
62
    'state_info': None
63
}
64
65
MOCK_WF_EX = executions.Execution(None, MOCK_WF_EX_DATA)
66
67
MOCK_WF_EX_TASKS_DATA = [
68
    {
69
        'id': uuid.uuid4().hex,
70
        'name': 'task1',
71
        'workflow_execution_id': MOCK_WF_EX_DATA['id'],
72
        'workflow_name': MOCK_WF_EX_DATA['name'],
73
        'created_at': str(datetime.datetime.utcnow()),
74
        'updated_at': str(datetime.datetime.utcnow()),
75
        'state': 'SUCCESS',
76
        'state_info': None,
77
        'input': '{"a": "b"}',
78
        'result': '{"c": "d"}',
79
        'published': '{"c": "d"}'
80
    },
81
    {
82
        'id': uuid.uuid4().hex,
83
        'name': 'task2',
84
        'workflow_execution_id': MOCK_WF_EX_DATA['id'],
85
        'workflow_name': MOCK_WF_EX_DATA['name'],
86
        'created_at': str(datetime.datetime.utcnow()),
87
        'updated_at': str(datetime.datetime.utcnow()),
88
        'state': 'SUCCESS',
89
        'state_info': None,
90
        'input': '{"e": "f", "g": "h"}',
91
        'result': '{"i": "j", "k": "l"}',
92
        'published': '{"k": "l"}'
93
    }
94
]
95
96
MOCK_WF_EX_TASKS = [
97
    tasks.Task(None, MOCK_WF_EX_TASKS_DATA[0]),
98
    tasks.Task(None, MOCK_WF_EX_TASKS_DATA[1])
99
]
100
101
MOCK_QRY_CONTEXT = {
102
    'mistral': {
103
        'execution_id': uuid.uuid4().hex
104
    }
105
}
106
107
108
class MistralQuerierTest(DbTestCase):
109
110
    def setUp(self):
111
        super(MistralQuerierTest, self).setUp()
112
        self.querier = mistral.get_query_instance()
113
114
    @mock.patch.object(
115
        action_service, 'is_action_canceled_or_canceling',
116
        mock.MagicMock(return_value=False))
117
    def test_determine_status_wf_running_tasks_running(self):
118
        wf_id = uuid.uuid4().hex
119
        status = self.querier._determine_execution_status(wf_id, 'RUNNING', MOCK_WF_TASKS_RUNNING)
120
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
121
122
    @mock.patch.object(
123
        action_service, 'is_action_canceled_or_canceling',
124
        mock.MagicMock(return_value=False))
125
    def test_determine_status_wf_running_tasks_completed(self):
126
        wf_id = uuid.uuid4().hex
127
        status = self.querier._determine_execution_status(wf_id, 'RUNNING', MOCK_WF_TASKS_SUCCEEDED)
128
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
129
130
    @mock.patch.object(
131
        action_service, 'is_action_canceled_or_canceling',
132
        mock.MagicMock(return_value=False))
133
    def test_determine_status_wf_succeeded_tasks_completed(self):
134
        wf_id = uuid.uuid4().hex
135
        status = self.querier._determine_execution_status(wf_id, 'SUCCESS', MOCK_WF_TASKS_SUCCEEDED)
136
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
137
138
    @mock.patch.object(
139
        action_service, 'is_action_canceled_or_canceling',
140
        mock.MagicMock(return_value=False))
141
    def test_determine_status_wf_succeeded_tasks_running(self):
142
        wf_id = uuid.uuid4().hex
143
        status = self.querier._determine_execution_status(wf_id, 'SUCCESS', MOCK_WF_TASKS_RUNNING)
144
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
145
146
    @mock.patch.object(
147
        action_service, 'is_action_canceled_or_canceling',
148
        mock.MagicMock(return_value=False))
149
    def test_determine_status_wf_errored_tasks_completed(self):
150
        wf_id = uuid.uuid4().hex
151
        status = self.querier._determine_execution_status(wf_id, 'ERROR', MOCK_WF_TASKS_SUCCEEDED)
152
        self.assertEqual(action_constants.LIVEACTION_STATUS_FAILED, status)
153
154
    @mock.patch.object(
155
        action_service, 'is_action_canceled_or_canceling',
156
        mock.MagicMock(return_value=False))
157
    def test_determine_status_wf_errored_tasks_running(self):
158
        wf_id = uuid.uuid4().hex
159
        status = self.querier._determine_execution_status(wf_id, 'ERROR', MOCK_WF_TASKS_RUNNING)
160
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
161
162
    @mock.patch.object(
163
        action_service, 'is_action_canceled_or_canceling',
164
        mock.MagicMock(return_value=True))
165
    def test_determine_status_wf_canceled_tasks_completed(self):
166
        wf_id = uuid.uuid4().hex
167
        status = self.querier._determine_execution_status(wf_id, 'PAUSED', MOCK_WF_TASKS_SUCCEEDED)
168
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELED, status)
169
170
    @mock.patch.object(
171
        action_service, 'is_action_canceled_or_canceling',
172
        mock.MagicMock(return_value=True))
173
    def test_determine_status_wf_canceled_tasks_running(self):
174
        wf_id = uuid.uuid4().hex
175
        status = self.querier._determine_execution_status(wf_id, 'PAUSED', MOCK_WF_TASKS_RUNNING)
176
        self.assertEqual(action_constants.LIVEACTION_STATUS_CANCELED, status)
177
178
    @mock.patch.object(
179
        action_service, 'is_action_canceled_or_canceling',
180
        mock.MagicMock(return_value=True))
181
    def test_determine_status_wf_canceled_exec_running_tasks_completed(self):
182
        wf_id = uuid.uuid4().hex
183
        status = self.querier._determine_execution_status(wf_id, 'RUNNING', MOCK_WF_TASKS_SUCCEEDED)
184
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
185
186
    @mock.patch.object(
187
        action_service, 'is_action_canceled_or_canceling',
188
        mock.MagicMock(return_value=True))
189
    def test_determine_status_wf_canceled_exec_running_tasks_running(self):
190
        wf_id = uuid.uuid4().hex
191
        status = self.querier._determine_execution_status(wf_id, 'RUNNING', MOCK_WF_TASKS_RUNNING)
192
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
193
194
    @mock.patch.object(
195
        action_service, 'is_action_canceled_or_canceling',
196
        mock.MagicMock(return_value=False))
197
    def test_determine_status_wf_running_exec_paused_tasks_completed(self):
198
        wf_id = uuid.uuid4().hex
199
        status = self.querier._determine_execution_status(wf_id, 'PAUSED', MOCK_WF_TASKS_SUCCEEDED)
200
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
201
202
    @mock.patch.object(
203
        action_service, 'is_action_canceled_or_canceling',
204
        mock.MagicMock(return_value=False))
205
    def test_determine_status_wf_running_exec_paused_tasks_running(self):
206
        wf_id = uuid.uuid4().hex
207
        status = self.querier._determine_execution_status(wf_id, 'PAUSED', MOCK_WF_TASKS_RUNNING)
208
        self.assertEqual(action_constants.LIVEACTION_STATUS_RUNNING, status)
209
210
    @mock.patch.object(
211
        executions.ExecutionManager, 'get',
212
        mock.MagicMock(return_value=MOCK_WF_EX))
213
    def test_get_workflow_result(self):
214
        result = self.querier._get_workflow_result(uuid.uuid4().hex)
215
216
        expected = {
217
            'k1': 'v1',
218
            'extra': {
219
                'state': MOCK_WF_EX.state,
220
                'state_info': MOCK_WF_EX.state_info
221
            }
222
        }
223
224
        self.assertDictEqual(expected, result)
225
226
    @mock.patch.object(
227
        tasks.TaskManager, 'list',
228
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
229
    def test_get_workflow_tasks(self):
230
        tasks = self.querier._get_workflow_tasks(uuid.uuid4().hex)
231
232
        expected = copy.deepcopy(MOCK_WF_EX_TASKS_DATA)
233
        for task in expected:
234
            task['input'] = json.loads(task['input'])
235
            task['result'] = json.loads(task['result'])
236
            task['published'] = json.loads(task['published'])
237
238
        for i in range(0, len(tasks)):
239
            self.assertDictEqual(expected[i], tasks[i])
240
241
    @mock.patch.object(
242
        executions.ExecutionManager, 'get',
243
        mock.MagicMock(return_value=MOCK_WF_EX))
244
    @mock.patch.object(
245
        tasks.TaskManager, 'list',
246
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
247
    @mock.patch.object(
248
        action_service, 'is_action_canceled_or_canceling',
249
        mock.MagicMock(return_value=False))
250
    def test_query(self):
251
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
252
253
        expected = {
254
            'k1': 'v1',
255
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
256
            'extra': {
257
                'state': MOCK_WF_EX.state,
258
                'state_info': MOCK_WF_EX.state_info
259
            }
260
        }
261
262
        for task in expected['tasks']:
263
            task['input'] = json.loads(task['input'])
264
            task['result'] = json.loads(task['result'])
265
            task['published'] = json.loads(task['published'])
266
267
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
268
        self.assertDictEqual(expected, result)
269
270
    @mock.patch.object(
271
        executions.ExecutionManager, 'get',
272
        mock.MagicMock(side_effect=[
273
            requests.exceptions.ConnectionError(),
274
            MOCK_WF_EX]))
275
    @mock.patch.object(
276
        tasks.TaskManager, 'list',
277
        mock.MagicMock(return_value=MOCK_WF_EX_TASKS))
278
    @mock.patch.object(
279
        action_service, 'is_action_canceled_or_canceling',
280
        mock.MagicMock(return_value=False))
281
    def test_query_get_workflow_retry(self):
282
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
283
284
        expected = {
285
            'k1': 'v1',
286
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
287
            'extra': {
288
                'state': MOCK_WF_EX.state,
289
                'state_info': MOCK_WF_EX.state_info
290
            }
291
        }
292
293
        for task in expected['tasks']:
294
            task['input'] = json.loads(task['input'])
295
            task['result'] = json.loads(task['result'])
296
            task['published'] = json.loads(task['published'])
297
298
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
299
        self.assertDictEqual(expected, result)
300
301
        calls = [call(MOCK_QRY_CONTEXT['mistral']['execution_id']) for i in range(0, 2)]
302
        executions.ExecutionManager.get.assert_has_calls(calls)
303
304
    @mock.patch.object(
305
        executions.ExecutionManager, 'get',
306
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError()] * 4))
307
    def test_query_get_workflow_retry_exhausted(self):
308
        self.assertRaises(
309
            requests.exceptions.ConnectionError,
310
            self.querier.query,
311
            uuid.uuid4().hex,
312
            MOCK_QRY_CONTEXT)
313
314
        calls = [call(MOCK_QRY_CONTEXT['mistral']['execution_id']) for i in range(0, 2)]
315
        executions.ExecutionManager.get.assert_has_calls(calls)
316
317
    @mock.patch.object(
318
        executions.ExecutionManager, 'get',
319
        mock.MagicMock(return_value=MOCK_WF_EX))
320
    @mock.patch.object(
321
        tasks.TaskManager, 'list',
322
        mock.MagicMock(side_effect=[
323
            requests.exceptions.ConnectionError(),
324
            MOCK_WF_EX_TASKS]))
325
    @mock.patch.object(
326
        action_service, 'is_action_canceled_or_canceling',
327
        mock.MagicMock(return_value=False))
328
    def test_query_get_workflow_tasks_retry(self):
329
        (status, result) = self.querier.query(uuid.uuid4().hex, MOCK_QRY_CONTEXT)
330
331
        expected = {
332
            'k1': 'v1',
333
            'tasks': copy.deepcopy(MOCK_WF_EX_TASKS_DATA),
334
            'extra': {
335
                'state': MOCK_WF_EX.state,
336
                'state_info': MOCK_WF_EX.state_info
337
            }
338
        }
339
340
        for task in expected['tasks']:
341
            task['input'] = json.loads(task['input'])
342
            task['result'] = json.loads(task['result'])
343
            task['published'] = json.loads(task['published'])
344
345
        self.assertEqual(action_constants.LIVEACTION_STATUS_SUCCEEDED, status)
346
        self.assertDictEqual(expected, result)
347
348
        mock_call = call(workflow_execution_id=MOCK_QRY_CONTEXT['mistral']['execution_id'])
349
        calls = [mock_call for i in range(0, 2)]
350
        tasks.TaskManager.list.assert_has_calls(calls)
351
352
    @mock.patch.object(
353
        executions.ExecutionManager, 'get',
354
        mock.MagicMock(return_value=MOCK_WF_EX))
355
    @mock.patch.object(
356
        tasks.TaskManager, 'list',
357
        mock.MagicMock(side_effect=[requests.exceptions.ConnectionError()] * 4))
358
    def test_query_get_workflow_tasks_retry_exhausted(self):
359
        self.assertRaises(
360
            requests.exceptions.ConnectionError,
361
            self.querier.query,
362
            uuid.uuid4().hex,
363
            MOCK_QRY_CONTEXT)
364
365
        mock_call = call(workflow_execution_id=MOCK_QRY_CONTEXT['mistral']['execution_id'])
366
        calls = [mock_call for i in range(0, 2)]
367
        tasks.TaskManager.list.assert_has_calls(calls)
368
369
    def test_query_missing_context(self):
370
        self.assertRaises(Exception, self.querier.query, uuid.uuid4().hex, {})
371
372
    def test_query_missing_mistral_execution_id(self):
373
        self.assertRaises(Exception, self.querier.query, uuid.uuid4().hex, {'mistral': {}})
374