@@ 901-938 (lines=38) @@ | ||
898 | # for t in threads: |
|
899 | # t.join() |
|
900 | ||
@pytest.mark.level(2)
@pytest.mark.timeout(30)
def test_search_concurrent_multithreads(self, args):
    '''
    target: test concurrent search with multiple threads
    method: search with 4 threads, each thread uses its own connection
    expected: status ok, one result set per query vector, and the top-1 hit
              of every query has an inserted id and a distance of 0.0
    '''
    nb = 100
    top_k = 10
    threads_num = 4
    threads = []
    # Exceptions raised inside worker threads. They are gathered here because
    # an exception in a Thread target never reaches the thread running pytest,
    # so without this the test would pass even when an assertion failed.
    errors = []
    collection = gen_unique_str("test_search_concurrent_multiprocessing")
    param = {'collection_name': collection,
             'dimension': dim,
             'index_type': IndexType.FLAT,
             'store_raw_vector': False}
    # create collection
    milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
    milvus.create_collection(param)
    vectors, ids = self.init_data(milvus, collection, nb=nb)
    # query with the second half of the inserted vectors
    query_vecs = vectors[nb // 2:nb]

    def search(milvus):
        # Worker body: run one search and verify it, recording any failure.
        try:
            status, result = milvus.search(collection, top_k, query_vecs)
            assert status.OK()
            assert len(result) == len(query_vecs)
            for i in range(len(query_vecs)):
                assert result[i][0].id in ids
                assert result[i][0].distance == 0.0
        except Exception as e:
            errors.append(e)

    for _ in range(threads_num):
        # each thread gets a connection of its own
        milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
        t = threading.Thread(target=search, args=(milvus,))
        threads.append(t)
        t.start()
        time.sleep(0.2)
    for t in threads:
        t.join()
    # Surface the first worker failure in the main thread so pytest reports it.
    if errors:
        raise errors[0]
|
940 | ||
941 | # TODO: enable |
|
@@ 942-978 (lines=37) @@ | ||
939 | t.join() |
|
940 | ||
941 | # TODO: enable |
|
@pytest.mark.timeout(30)
def _test_search_concurrent_multiprocessing(self, args):
    '''
    target: test concurrent search with multiple processes
    method: search with 4 processes, each process is given its own connection
    expected: status ok, one result set per query vector, and the top-1 hit
              of every query has an inserted id and a distance of 0.0
    '''
    nb = 100
    top_k = 10
    process_num = 4
    processes = []
    collection = gen_unique_str("test_search_concurrent_multiprocessing")
    param = {'collection_name': collection,
             'dimension': dim,
             'index_type': IndexType.FLAT,
             'store_raw_vector': False}
    # create collection
    milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
    milvus.create_collection(param)
    vectors, ids = self.init_data(milvus, collection, nb=nb)
    # query with the second half of the inserted vectors
    query_vecs = vectors[nb // 2:nb]

    def search(milvus):
        # Child-process body: run one search and verify it.
        status, result = milvus.search(collection, top_k, query_vecs)
        assert status.OK()
        assert len(result) == len(query_vecs)
        for i in range(len(query_vecs)):
            assert result[i][0].id in ids
            assert result[i][0].distance == 0.0

    for _ in range(process_num):
        # NOTE(review): the connection is created in the parent and handed to
        # the child; whether the client object survives that hand-off cannot
        # be told from this block -- confirm before enabling the test.
        milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
        p = Process(target=search, args=(milvus,))
        processes.append(p)
        p.start()
        time.sleep(0.2)
    for p in processes:
        p.join()
    # A failed assertion in a child only shows up as a non-zero exit code in
    # the parent, so check it explicitly; otherwise the test always passes.
    for p in processes:
        assert p.exitcode == 0
|
980 | ||
981 | def test_search_multi_collection_L2(search, args): |