|
@@ 901-938 (lines=38) @@
|
| 898 |
|
# for t in threads: |
| 899 |
|
# t.join() |
| 900 |
|
|
| 901 |
|
@pytest.mark.level(2)
@pytest.mark.timeout(30)
def test_search_concurrent_multithreads(self, args):
    '''
    target: test concurrent search with multiple threads
    method: search with 4 threads, each thread uses its own connection
    expected: every query gets a result whose top hit is one of the
              inserted ids at distance 0.0
    '''
    nb = 100
    top_k = 10
    threads_num = 4
    threads = []
    # An exception raised inside a worker thread never reaches the main
    # thread on its own, so a failed assertion there would leave the test
    # green. Workers record their failure here and it is re-raised below.
    errors = []
    collection = gen_unique_str("test_search_concurrent_multiprocessing")
    param = {'collection_name': collection,
             'dimension': dim,
             'index_type': IndexType.FLAT,
             'store_raw_vector': False}
    # create collection
    milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
    milvus.create_collection(param)
    vectors, ids = self.init_data(milvus, collection, nb=nb)
    # query with the second half of the vectors returned by init_data
    query_vecs = vectors[nb // 2:nb]

    def search(milvus):
        # Worker body: run one search on the given connection and verify it.
        try:
            status, result = milvus.search(collection, top_k, query_vecs)
            assert len(result) == len(query_vecs)
            for i in range(len(query_vecs)):
                top_hit = result[i][0]
                assert top_hit.id in ids
                assert top_hit.distance == 0.0
        except Exception as exc:
            # list.append is atomic in CPython, no extra locking needed
            errors.append(exc)

    for i in range(threads_num):
        # a fresh connection per thread
        milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
        t = threading.Thread(target=search, args=(milvus,))
        threads.append(t)
        t.start()
        # NOTE(review): assumed to be inside the start loop (staggered
        # thread start-up) -- confirm against the original indentation
        time.sleep(0.2)
    for t in threads:
        t.join()
    # surface the first worker failure so the test actually fails
    if errors:
        raise errors[0]
| 940 |
|
|
| 941 |
|
# TODO: enable |
|
@@ 942-978 (lines=37) @@
|
| 939 |
|
t.join() |
| 940 |
|
|
| 941 |
|
# TODO: enable |
| 942 |
|
@pytest.mark.timeout(30)
def _test_search_concurrent_multiprocessing(self, args):
    '''
    target: test concurrent search with multiple processes
    method: search with 4 processes, each process is handed its own connection
    expected: every query gets a result whose top hit is one of the
              inserted ids at distance 0.0
    '''
    nb = 100
    top_k = 10
    process_num = 4
    processes = []
    collection = gen_unique_str("test_search_concurrent_multiprocessing")
    param = {'collection_name': collection,
             'dimension': dim,
             'index_type': IndexType.FLAT,
             'store_raw_vector': False}
    # create collection
    milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
    milvus.create_collection(param)
    vectors, ids = self.init_data(milvus, collection, nb=nb)
    # query with the second half of the vectors returned by init_data
    query_vecs = vectors[nb // 2:nb]

    def search(milvus):
        # Worker body: run one search on the given connection and verify it.
        # A failed assertion here only terminates the child process with a
        # non-zero exit code; the parent checks that code after join().
        status, result = milvus.search(collection, top_k, query_vecs)
        assert len(result) == len(query_vecs)
        for i in range(len(query_vecs)):
            top_hit = result[i][0]
            assert top_hit.id in ids
            assert top_hit.distance == 0.0

    for i in range(process_num):
        # NOTE(review): the connection is created in the parent and passed
        # to the child; this, and the nested target function, presumably
        # only work with the "fork" start method -- verify before enabling
        milvus = get_milvus(args["ip"], args["port"], handler=args["handler"])
        p = Process(target=search, args=(milvus,))
        processes.append(p)
        p.start()
        # NOTE(review): assumed to be inside the start loop (staggered
        # process start-up) -- confirm against the original indentation
        time.sleep(0.2)
    for p in processes:
        p.join()
    # An uncaught exception in a child is not re-raised in the parent, so
    # check the exit codes explicitly to make worker failures fail the test.
    for p in processes:
        assert p.exitcode == 0
| 980 |
|
|
| 981 |
|
def test_search_multi_collection_L2(search, args): |