@@ 297-345 (lines=49) @@ | ||
294 | assert result[0][0].distance == 0.0 |
|
295 | ||
296 | # TODO: enable |
|
297 | @pytest.mark.timeout(BUILD_TIMEOUT) |
|
298 | def _test_create_index_multiprocessing_multicollection(self, connect, args): |
|
299 | ''' |
|
300 | target: test create index interface with multiprocess |
|
301 | method: create collection and add vectors in it, create index |
|
302 | expected: return code equals to 0, and search success |
|
303 | ''' |
|
304 | process_num = 8 |
|
305 | loop_num = 8 |
|
306 | processes = [] |
|
307 | ||
308 | collection = [] |
|
309 | j = 0 |
|
310 | while j < (process_num*loop_num): |
|
311 | collection_name = gen_unique_str("test_create_index_multiprocessing") |
|
312 | collection.append(collection_name) |
|
313 | param = {'collection_name': collection_name, |
|
314 | 'dimension': dim, |
|
315 | 'index_type': IndexType.FLAT, |
|
316 | 'store_raw_vector': False} |
|
317 | connect.create_collection(param) |
|
318 | j = j + 1 |
|
319 | ||
320 | def create_index(): |
|
321 | i = 0 |
|
322 | while i < loop_num: |
|
323 | # assert connect.has_collection(collection[ids*process_num+i]) |
|
324 | status, ids = connect.insert(collection[ids*process_num+i], vectors) |
|
325 | ||
326 | status = connect.create_index(collection[ids*process_num+i], IndexType.IVFLAT, {"nlist": NLIST}) |
|
327 | assert status.OK() |
|
328 | query_vec = [vectors[0]] |
|
329 | top_k = 1 |
|
330 | search_param = {"nprobe": nprobe} |
|
331 | status, result = connect.search(collection[ids*process_num+i], top_k, query_vec, params=search_param) |
|
332 | assert len(result) == 1 |
|
333 | assert len(result[0]) == top_k |
|
334 | assert result[0][0].distance == 0.0 |
|
335 | i = i + 1 |
|
336 | ||
337 | for i in range(process_num): |
|
338 | m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"]) |
|
339 | ids = i |
|
340 | p = Process(target=create_index, args=(m,ids)) |
|
341 | processes.append(p) |
|
342 | p.start() |
|
343 | time.sleep(0.2) |
|
344 | for p in processes: |
|
345 | p.join() |
|
346 | ||
347 | def test_create_index_collection_not_existed(self, connect): |
|
348 | ''' |
|
@@ 186-232 (lines=47) @@ | ||
183 | assert len(result[0]) == top_k |
|
184 | assert result[0][0].distance == 0.0 |
|
185 | ||
186 | @pytest.mark.level(2) |
|
187 | @pytest.mark.timeout(BUILD_TIMEOUT) |
|
188 | def test_create_index_multithread_multicollection(self, connect, args): |
|
189 | ''' |
|
190 | target: test create index interface with multiprocess |
|
191 | method: create collection and add vectors in it, create index |
|
192 | expected: return code equals to 0, and search success |
|
193 | ''' |
|
194 | threads_num = 8 |
|
195 | loop_num = 8 |
|
196 | threads = [] |
|
197 | collection = [] |
|
198 | j = 0 |
|
199 | while j < (threads_num*loop_num): |
|
200 | collection_name = gen_unique_str("test_create_index_multiprocessing") |
|
201 | collection.append(collection_name) |
|
202 | param = {'collection_name': collection_name, |
|
203 | 'dimension': dim, |
|
204 | 'index_type': IndexType.FLAT, |
|
205 | 'store_raw_vector': False} |
|
206 | connect.create_collection(param) |
|
207 | j = j + 1 |
|
208 | ||
209 | def create_index(): |
|
210 | i = 0 |
|
211 | while i < loop_num: |
|
212 | # assert connect.has_collection(collection[ids*process_num+i]) |
|
213 | status, ids = connect.insert(collection[ids*threads_num+i], vectors) |
|
214 | status = connect.create_index(collection[ids*threads_num+i], IndexType.IVFLAT, {"nlist": NLIST}) |
|
215 | assert status.OK() |
|
216 | query_vec = [vectors[0]] |
|
217 | top_k = 1 |
|
218 | search_param = {"nprobe": nprobe} |
|
219 | status, result = connect.search(collection[ids*threads_num+i], top_k, query_vec, params=search_param) |
|
220 | assert len(result) == 1 |
|
221 | assert len(result[0]) == top_k |
|
222 | assert result[0][0].distance == 0.0 |
|
223 | i = i + 1 |
|
224 | for i in range(threads_num): |
|
225 | m = get_milvus(host=args["ip"], port=args["port"], handler=args["handler"]) |
|
226 | ids = i |
|
227 | t = threading.Thread(target=create_index, args=(m, ids)) |
|
228 | threads.append(t) |
|
229 | t.start() |
|
230 | time.sleep(0.2) |
|
231 | for t in threads: |
|
232 | t.join() |
|
233 | ||
234 | @pytest.mark.timeout(BUILD_TIMEOUT) |
|
235 | @pytest.mark.level(2) |