Passed
Push — master ( b86822...b6e78f )
by
unknown
02:07
created

TestCollectionNameInvalid.get_invalid_collection_name()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 2
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
import time
2
import pdb
3
import threading
4
import logging
5
from multiprocessing import Pool, Process
6
import pytest
7
from utils import *
8
from constants import *
9
10
# NOTE(review): presumably a timeout in seconds; it is not referenced by any
# test visible in this module -- confirm before removing.
DELETE_TIMEOUT = 60

# Query body shared by the search sanity checks below: a single "must" clause
# holding one vector condition on the default float-vector field, built from
# one generated query vector.
default_single_query = {
    "bool": {
        "must": [
            {
                "vector": {
                    default_float_vec_field_name: {
                        "topk": 10,
                        "query": gen_vectors(1, default_dim),
                        "metric_type": "L2",
                        "params": {"nprobe": 10},
                    }
                }
            }
        ]
    }
}
19
20
21
class TestFlushBase:
    """
    ******************************************************************
      The following cases are used to test `flush` function
    ******************************************************************
    """

    @pytest.fixture(
        scope="function",
        params=gen_simple_index()
    )
    def get_simple_index(self, request, connect):
        """Return one index configuration from `gen_simple_index()`.

        When the server reports "GPU" mode, configurations whose
        `index_type` is not listed by `ivf()` are skipped.
        """
        gpu_mode = str(connect._cmd("mode")[1]) == "GPU"
        if gpu_mode and request.param["index_type"] not in ivf():
            pytest.skip("Only support index_type: idmap/flat")
        return request.param

    @pytest.fixture(
        scope="function",
        params=gen_single_filter_fields()
    )
    def get_filter_field(self, request):
        """Yield one filter-field definition per parametrized run."""
        yield request.param

    @pytest.fixture(
        scope="function",
        params=gen_single_vector_fields()
    )
    def get_vector_field(self, request):
        """Yield one vector-field definition per parametrized run."""
        yield request.param

    def test_flush_collection_not_existed(self, connect, collection):
        '''
        target: test flush, params collection_name not existed
        method: flush, with collection not existed
        expected: error raised
        '''
        collection_new = gen_unique_str("test_flush_1")
        with pytest.raises(Exception):
            connect.flush([collection_new])

    def test_flush_empty_collection(self, connect, collection):
        '''
        method: insert entities, delete all of them by id, then flush
        expected: no error raised, row count is 0 after the flush
        '''
        ids = connect.bulk_insert(collection, default_entities)
        assert len(ids) == default_nb
        status = connect.delete_entity_by_id(collection, ids)
        assert status.OK()
        connect.flush([collection])
        assert connect.count_entities(collection) == 0

    def test_add_partition_flush(self, connect, id_collection):
        '''
        method: add entities into collection and into a partition, flush after each insert
        expected: the length of ids and the collection row count
        '''
        connect.create_partition(id_collection, default_tag)
        ids = list(range(default_nb))
        # The ids returned by the first insert are reused for the partition insert.
        ids = connect.bulk_insert(id_collection, default_entities, ids)
        connect.flush([id_collection])
        assert connect.count_entities(id_collection) == default_nb
        ids = connect.bulk_insert(id_collection, default_entities, ids, partition_tag=default_tag)
        assert len(ids) == default_nb
        connect.flush([id_collection])
        assert connect.count_entities(id_collection) == default_nb * 2

    def test_add_partitions_flush(self, connect, id_collection):
        '''
        method: add entities into two partitions of one collection, flush after each insert
        expected: the collection row count is the sum of both inserts
        '''
        tag_new = gen_unique_str()
        connect.create_partition(id_collection, default_tag)
        connect.create_partition(id_collection, tag_new)
        ids = list(range(default_nb))
        ids = connect.bulk_insert(id_collection, default_entities, ids, partition_tag=default_tag)
        connect.flush([id_collection])
        connect.bulk_insert(id_collection, default_entities, ids, partition_tag=tag_new)
        connect.flush([id_collection])
        assert connect.count_entities(id_collection) == 2 * default_nb

    def test_add_collections_flush(self, connect, id_collection):
        '''
        method: add entities into two collections, flush each of them
        expected: each collection row count equals the inserted count
        '''
        collection_new = gen_unique_str()
        default_fields = gen_default_fields(False)
        connect.create_collection(collection_new, default_fields)
        connect.create_partition(id_collection, default_tag)
        connect.create_partition(collection_new, default_tag)
        ids = list(range(default_nb))
        ids = connect.bulk_insert(id_collection, default_entities, ids, partition_tag=default_tag)
        connect.bulk_insert(collection_new, default_entities, ids, partition_tag=default_tag)
        connect.flush([id_collection])
        connect.flush([collection_new])
        assert connect.count_entities(id_collection) == default_nb
        assert connect.count_entities(collection_new) == default_nb

    def test_add_collections_fields_flush(self, connect, id_collection, get_filter_field, get_vector_field):
        '''
        method: create collection with different fields, and add entities into collections, flush each
        expected: each collection row count equals the inserted count
        '''
        nb_new = 5
        collection_new = gen_unique_str("test_flush")
        fields = {
            "fields": [get_filter_field, get_vector_field],
            "segment_row_limit": default_segment_row_limit,
            "auto_id": False
        }
        connect.create_collection(collection_new, fields)
        connect.create_partition(id_collection, default_tag)
        connect.create_partition(collection_new, default_tag)
        entities_new = gen_entities_by_fields(fields["fields"], nb_new, default_dim)
        ids = list(range(default_nb))
        ids_new = list(range(nb_new))
        connect.bulk_insert(id_collection, default_entities, ids, partition_tag=default_tag)
        connect.bulk_insert(collection_new, entities_new, ids_new, partition_tag=default_tag)
        connect.flush([id_collection])
        connect.flush([collection_new])
        assert connect.count_entities(id_collection) == default_nb
        assert connect.count_entities(collection_new) == nb_new

    def test_add_flush_multiable_times(self, connect, collection):
        '''
        method: add entities, flush several times
        expected: no error raised
        '''
        ids = connect.bulk_insert(collection, default_entities)
        for _ in range(10):
            connect.flush([collection])
        assert connect.count_entities(collection) == len(ids)
        res = connect.search(collection, default_single_query)
        logging.getLogger().debug(res)
        assert res

    def test_add_flush_auto(self, connect, id_collection):
        '''
        method: add entities without calling flush, poll the row count once per second
        expected: the row count reaches the inserted count within the timeout
        '''
        ids = list(range(default_nb))
        connect.bulk_insert(id_collection, default_entities, ids)
        timeout = 20
        deadline = time.monotonic() + timeout
        # The `else` branch runs only when the loop ends without `break`, so
        # the verdict depends on whether the expected count was observed and
        # not on a second, racy clock reading taken after the loop.
        while time.monotonic() < deadline:
            time.sleep(1)
            if connect.count_entities(id_collection) == default_nb:
                break
        else:
            pytest.fail("row count did not reach %d within %d seconds" % (default_nb, timeout))

    @pytest.fixture(
        scope="function",
        params=[
            1,
            100
        ],
    )
    def same_ids(self, request):
        """Yield the upper bound of the id range that is collapsed to id 0."""
        yield request.param

    def test_add_flush_same_ids(self, connect, id_collection, same_ids):
        '''
        method: add entities, with same ids, count(same ids) < 15, > 15
        expected: the length of ids and the collection row count
        '''
        # Every id up to and including `same_ids` is replaced by 0.
        ids = [0 if i <= same_ids else i for i in range(default_nb)]
        connect.bulk_insert(id_collection, default_entities, ids)
        connect.flush([id_collection])
        assert connect.count_entities(id_collection) == default_nb

    def test_delete_flush_multiable_times(self, connect, collection):
        '''
        method: delete entities, flush several times
        expected: no error raised
        '''
        ids = connect.bulk_insert(collection, default_entities)
        status = connect.delete_entity_by_id(collection, [ids[-1]])
        assert status.OK()
        for _ in range(10):
            connect.flush([collection])
        res = connect.search(collection, default_single_query)
        logging.getLogger().debug(res)
        assert res

    # TODO: unable to set config
    @pytest.mark.level(2)
    def _test_collection_count_during_flush(self, connect, collection, args):
        '''
        method: flush collection at background, call `count_entities`
        expected: no timeout
        '''
        ids = []
        for _ in range(5):
            tmp_ids = connect.bulk_insert(collection, default_entities)
            connect.flush([collection])
            ids.extend(tmp_ids)
        disable_flush(connect)
        connect.delete_entity_by_id(collection, ids)

        def flush():
            # Uses a separate client so the flush runs independently of `connect`.
            milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
            logging.error("start flush")
            milvus.flush([collection])
            logging.error("end flush")

        p = TestThread(target=flush, args=())
        p.start()
        time.sleep(0.2)
        logging.error("start count")
        # Issued while the background flush may still be running; only the
        # absence of a timeout matters here, the value is checked after join.
        connect.count_entities(collection, timeout=10)
        p.join()
        assert connect.count_entities(collection) == 0

    @pytest.mark.level(2)
    def test_delete_flush_during_search(self, connect, collection, args):
        '''
        method: search at background, call `delete and flush`
        expected: no timeout
        '''
        ids = []
        loops = 5
        for _ in range(loops):
            tmp_ids = connect.bulk_insert(collection, default_entities)
            connect.flush([collection])
            ids.extend(tmp_ids)
        nq = 10000
        query, query_vecs = gen_query_vectors(default_float_vec_field_name, default_entities, default_top_k, nq)
        time.sleep(0.1)
        future = connect.search(collection, query, _async=True)
        delete_ids = [ids[0], ids[-1]]
        connect.delete_entity_by_id(collection, delete_ids)
        connect.flush([collection])
        # Wait for the asynchronous search to finish before counting.
        future.result()
        res_count = connect.count_entities(collection, timeout=120)
        assert res_count == loops * default_nb - len(delete_ids)
282
283
284
class TestFlushAsync:
    """
    ******************************************************************
      The following cases are used to test `flush` function
    ******************************************************************
    """

    @pytest.fixture(scope="function", autouse=True)
    def skip_http_check(self, args):
        """Skip every test of this class when the handler is HTTP."""
        handler = args["handler"]
        if handler == "HTTP":
            pytest.skip("skip in http mode")

    def check_status(self):
        """Callback handed to the asynchronous flush; it only logs a message."""
        logger = logging.getLogger()
        logger.info("In callback check status")

    def test_flush_empty_collection(self, connect, collection):
        '''
        method: flush collection with no vectors
        expected: status ok
        '''
        pending = connect.flush([collection], _async=True)
        pending.result()

    def test_flush_async_long(self, connect, collection):
        '''
        method: insert entities, flush asynchronously and wait for the result
        expected: no error raised
        '''
        connect.bulk_insert(collection, default_entities)
        pending = connect.flush([collection], _async=True)
        pending.result()

    def test_flush_async_long_drop_collection(self, connect, collection):
        '''
        method: insert entities five times, start an asynchronous flush, drop the collection without waiting
        expected: no error raised
        '''
        for _ in range(5):
            connect.bulk_insert(collection, default_entities)
        # The future is deliberately not awaited before the drop.
        pending = connect.flush([collection], _async=True)
        logging.getLogger().info("DROP")
        connect.drop_collection(collection)

    def test_flush_async(self, connect, collection):
        '''
        method: insert entities, flush asynchronously with a callback, then wait for the result
        expected: no error raised
        '''
        logger = logging.getLogger()
        connect.bulk_insert(collection, default_entities)
        logger.info("before")
        pending = connect.flush([collection], _async=True, _callback=self.check_status)
        logger.info("after")
        pending.done()
        pending.result()
326
327
328
class TestCollectionNameInvalid(object):
    """
    Test flush with invalid collection names
    """

    @pytest.fixture(
        scope="function",
        params=gen_invalid_strs()
    )
    def get_invalid_collection_name(self, request):
        """Yield one invalid value from `gen_invalid_strs()` per run."""
        yield request.param

    @pytest.mark.level(2)
    def test_flush_with_invalid_collection_name(self, connect, get_invalid_collection_name):
        '''
        method: flush with an invalid collection name
        expected: error raised; None and empty values are skipped
        '''
        invalid_name = get_invalid_collection_name
        if not invalid_name:
            pytest.skip("while collection_name is None, then flush all collections")
        # NOTE(review): the other flush tests in this module pass a list of
        # names; here the raw value is passed -- confirm this is intended.
        with pytest.raises(Exception):
            connect.flush(invalid_name)
348