Total Complexity | 66 |
Total Lines | 748 |
Duplicated Lines | 3.07 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like bench_gpu_1bn often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # Copyright (c) Facebook, Inc. and its affiliates. |
||
2 | # |
||
3 | # This source code is licensed under the MIT license found in the |
||
4 | # LICENSE file in the root directory of this source tree. |
||
5 | |||
6 | #! /usr/bin/env python2 |
||
7 | |||
8 | from __future__ import print_function |
||
9 | import numpy as np |
||
10 | import time |
||
11 | import os |
||
12 | import sys |
||
13 | import faiss |
||
14 | import re |
||
15 | |||
16 | from multiprocessing.dummy import Pool as ThreadPool |
||
17 | from datasets import ivecs_read |
||
18 | |||
19 | #################################################################### |
||
20 | # Parse command line |
||
21 | #################################################################### |
||
22 | |||
23 | |||
def usage():
    """Print the command-line help text to stderr and exit with status 1."""
    print("""

Usage: bench_gpu_1bn.py dataset indextype [options]

dataset: set of vectors to operate on.
Supported: SIFT1M, SIFT2M, ..., SIFT1000M or Deep1B

indextype: any index type supported by index_factory that runs on GPU.

General options

-ngpu ngpu nb of GPUs to use (default = all)
-tempmem N use N bytes of temporary GPU memory
-nocache do not read or write intermediate files
-float16 use 16-bit floats on the GPU side

Add options

-abs N split adds in blocks of no more than N vectors
-max_add N copy sharded dataset to CPU each max_add additions
(to avoid memory overflows with geometric reallocations)
-altadd Alternative add function, where the index is not stored
on GPU during add. Slightly faster for big datasets on
slow GPUs

Search options

-R R: nb of replicas of the same dataset (the dataset
will be copied across ngpu/R, default R=1)
-noptables do not use precomputed tables in IVFPQ.
-qbs N split queries in blocks of no more than N vectors
-nnn N search N neighbors for each query
-nprobe 4,16,64 try this number of probes
-knngraph instead of the standard setup for the dataset,
compute a k-nn graph with nnn neighbors per element
-oI xx%d.npy output the search result indices to this numpy file,
%d will be replaced with the nprobe
-oD xx%d.npy output the search result distances to this file

""", file=sys.stderr)
    # the help path always terminates the script
    sys.exit(1)
||
66 | |||
67 | |||
# Default values of every command-line tunable. They are module-level
# globals read by the functions defined further down in this script.

dbname = None
index_key = None

ngpu = faiss.get_num_gpus()

replicas = 1  # nb of replicas of sharded dataset
add_batch_size = 32768
query_batch_size = 16384
nprobes = [1 << l for l in range(9)]
knngraph = False
use_precomputed_tables = True
tempmem = -1  # if -1, use system default
max_add = -1
use_float16 = False
use_cache = True
nnn = 10
altadd = False
I_fname = None
D_fname = None

args = sys.argv[1:]

# Options are consumed left to right; the first two arguments that are not
# recognised options are taken as the dataset name and the index key.
while args:
    a = args.pop(0)
    if a == '-h': usage()
    elif a == '-ngpu': ngpu = int(args.pop(0))
    elif a == '-R': replicas = int(args.pop(0))
    elif a == '-noptables': use_precomputed_tables = False
    elif a == '-abs': add_batch_size = int(args.pop(0))
    elif a == '-qbs': query_batch_size = int(args.pop(0))
    elif a == '-nnn': nnn = int(args.pop(0))
    elif a == '-tempmem': tempmem = int(args.pop(0))
    elif a == '-nocache': use_cache = False
    elif a == '-knngraph': knngraph = True
    elif a == '-altadd': altadd = True
    elif a == '-float16': use_float16 = True
    elif a == '-nprobe': nprobes = [int(x) for x in args.pop(0).split(',')]
    elif a == '-max_add': max_add = int(args.pop(0))
    # -oI / -oD are advertised by usage(); they must be matched before the
    # positional fall-through below or they would be taken for the dataset
    # name or the index key.
    elif a == '-oI': I_fname = args.pop(0)
    elif a == '-oD': D_fname = args.pop(0)
    elif not dbname: dbname = a
    elif not index_key: index_key = a
    else:
        print("argument %s unknown" % a, file=sys.stderr)
        sys.exit(1)

# Both positional arguments are required by the rest of the script; show the
# help instead of failing later on a None value.
if dbname is None or index_key is None:
    usage()

# directory that receives the intermediate (cache) files
cacheroot = '/tmp/bench_gpu_1bn'

if not os.path.isdir(cacheroot):
    print("%s does not exist, creating it" % cacheroot)
    os.mkdir(cacheroot)
||
119 | |||
120 | ################################################################# |
||
121 | # Small Utility Functions |
||
122 | ################################################################# |
||
123 | |||
124 | # we mem-map the biggest files to avoid having them in memory all at |
||
125 | # once |
||
126 | |||
def mmap_fvecs(fname):
    """Memory-map a .fvecs file and return its vectors as a 2-D float32 view.

    The file is mapped read-only as int32 words. The first word is read as
    the dimension d, every record is taken to be d + 1 words long, and the
    leading word of each record is dropped from the returned view.
    """
    words = np.memmap(fname, dtype='int32', mode='r')
    dim = words[0]
    records = words.view('float32').reshape(-1, dim + 1)
    return records[:, 1:]
||
131 | |||
def mmap_bvecs(fname):
    """Memory-map a .bvecs file and return its vectors as a 2-D uint8 view.

    The file is mapped read-only as bytes. The first four bytes are read as
    an int32 dimension d, every record is taken to be d + 4 bytes long, and
    the four leading bytes of each record are dropped from the returned view.
    """
    raw = np.memmap(fname, dtype='uint8', mode='r')
    header = raw[:4].view('int32')
    dim = header[0]
    records = raw.reshape(-1, dim + 4)
    return records[:, 4:]
||
136 | |||
137 | |||
def rate_limited_imap(f, l):
    """A threaded imap that does not produce elements faster than they
    are consumed.

    f is applied to the elements of l on a single worker thread. At most one
    result is computed ahead of the one currently being handed to the
    consumer. Results are yielded in input order. An empty l yields nothing.
    """
    pool = ThreadPool(1)
    try:
        res = None
        for i in l:
            # schedule the next element before blocking on the previous one,
            # so the worker runs while the consumer handles the last result
            res_next = pool.apply_async(f, (i, ))
            if res is not None:
                yield res.get()
            res = res_next
        # res is still None when l was empty: there is nothing left to yield
        if res is not None:
            yield res.get()
    finally:
        # stop accepting work so the worker thread can exit, also when the
        # generator is abandoned before being exhausted
        pool.close()
||
149 | |||
150 | |||
class IdentPreproc:
    """Identity pre-processor.

    A pre-processor is either a faiss.VectorTransform or an IdentPreproc;
    this one exposes the d_in / d_out / apply_py members used by the script
    and returns its input unchanged.
    """

    def __init__(self, d):
        # input and output dimensions are the same
        self.d_in = d
        self.d_out = d

    def apply_py(self, x):
        """Return x itself (no copy, no transformation)."""
        return x
||
159 | |||
160 | |||
def sanitize(x):
    """Return a C-contiguous float32 copy of the array x."""
    as_float = x.astype('float32')
    return np.ascontiguousarray(as_float)
||
164 | |||
165 | |||
def dataset_iterator(x, preproc, bs):
    """Iterate over the rows of x in blocks of at most bs rows.

    Yields (i0, block) pairs, where i0 is the index of the first row of the
    block and block is the sanitized slice after preproc.apply_py. Blocks are
    prepared through rate_limited_imap.
    """
    total = x.shape[0]
    starts = range(0, total, bs)
    block_ranges = [(lo, min(total, lo + bs)) for lo in starts]

    def prepare_block(bounds):
        lo, hi = bounds
        chunk = sanitize(x[lo:hi])
        return lo, preproc.apply_py(chunk)

    return rate_limited_imap(prepare_block, block_ranges)
||
179 | |||
180 | |||
def eval_intersection_measure(gt_I, I):
    """Measure the intersection measure (used for knngraph).

    For every row q of I, counts the ids shared by the first `rank` entries
    of gt_I[q] and I[q], where rank = I.shape[1]. Returns the total count
    divided by rank * number of rows of I.

    gt_I must have at least as many rows and columns as I.
    """
    nq, rank = I.shape
    assert gt_I.shape[1] >= rank
    # the row count comes from the arguments rather than from a module
    # global that is only defined in knngraph mode
    assert gt_I.shape[0] >= nq

    # Convert once, outside the loop, and keep the int64 arrays referenced
    # for the duration of the C calls: swig_ptr is assumed to hand out a raw
    # pointer that does not keep a temporary array alive.
    gt_ids = np.ascontiguousarray(gt_I[:nq], dtype='int64')
    found_ids = np.ascontiguousarray(I, dtype='int64')

    inter = 0
    for q in range(nq):
        inter += faiss.ranklist_intersection_size(
            rank, faiss.swig_ptr(gt_ids[q]),
            rank, faiss.swig_ptr(found_ids[q]))
    return inter / float(rank * nq)
||
191 | |||
192 | |||
193 | ################################################################# |
||
194 | # Prepare dataset |
||
195 | ################################################################# |
||
196 | |||
# Select the base (xb), query (xq) and training (xt) sets plus the
# ground-truth ids (gt_I) according to the dataset name given on the
# command line. The big files are memory-mapped, not loaded.
print("Preparing dataset", dbname)

if dbname.startswith('SIFT'):
    # SIFT1M to SIFT1000M
    # the size in millions is the number between 'SIFT' and the last character
    dbsize = int(dbname[4:-1])
    xb = mmap_bvecs('bigann/bigann_base.bvecs')
    xq = mmap_bvecs('bigann/bigann_query.bvecs')
    xt = mmap_bvecs('bigann/bigann_learn.bvecs')

    # trim xb to correct size
    xb = xb[:dbsize * 1000 * 1000]

    gt_I = ivecs_read('bigann/gnd/idx_%dM.ivecs' % dbsize)

elif dbname == 'Deep1B':
    xb = mmap_fvecs('deep1b/base.fvecs')
    xq = mmap_fvecs('deep1b/deep1B_queries.fvecs')
    xt = mmap_fvecs('deep1b/learn.fvecs')
    # deep1B's train set is outrageously big
    xt = xt[:10 * 1000 * 1000]
    gt_I = ivecs_read('deep1b/deep1B_groundtruth.ivecs')

else:
    print('unknown dataset', dbname, file=sys.stderr)
    sys.exit(1)


if knngraph:
    # convert to knn-graph dataset
    # queries and training vectors are the database vectors themselves
    xq = xb
    xt = xb
    # we compute the ground-truth on this number of queries for validation
    nq_gt = 10000
    # number of ground-truth neighbors kept per query
    gt_sl = 100

    # ground truth will be computed below
    gt_I = None


print("sizes: B %s Q %s T %s gt %s" % (
    xb.shape, xq.shape, xt.shape,
    gt_I.shape if gt_I is not None else None))
||
239 | |||
240 | |||
241 | |||
242 | ################################################################# |
||
243 | # Parse index_key and set cache files |
||
244 | # |
||
245 | # The index_key is a valid factory key that would work, but we |
||
246 | # decompose the training to do it faster |
||
247 | ################################################################# |
||
248 | |||
249 | |||
# Accepted keys: optional "OPQ<m>[_<dout>]," or "PCAR<dout>," prefix,
# then "IVF<ncent>,", then "PQ<m>" or "Flat".
pat = re.compile('(OPQ[0-9]+(_[0-9]+)?,|PCAR[0-9]+,)?' +
                 '(IVF[0-9]+),' +
                 '(PQ[0-9]+|Flat)')

matchobject = pat.match(index_key)

assert matchobject, 'could not parse ' + index_key

mog = matchobject.groups()

preproc_str = mog[0]    # includes the trailing comma, or None
ivf_str = mog[2]
pqflat_str = mog[3]

# number of centroids of the coarse quantizer
ncent = int(ivf_str[3:])

prefix = ''

# Always defined, so that code testing "if gt_cachefile" also works outside
# knngraph mode.
gt_cachefile = None

if knngraph:
    gt_cachefile = '%s/BK_gt_%s.npy' % (cacheroot, dbname)
    prefix = 'BK_'
    # files must be kept distinct because the training set is not the
    # same for the knngraph

if preproc_str:
    preproc_cachefile = '%s/%spreproc_%s_%s.vectrans' % (
        cacheroot, prefix, dbname, preproc_str[:-1])
else:
    preproc_cachefile = None
    preproc_str = ''

cent_cachefile = '%s/%scent_%s_%s%s.npy' % (
    cacheroot, prefix, dbname, preproc_str, ivf_str)

index_cachefile = '%s/%s%s_%s%s,%s.index' % (
    cacheroot, prefix, dbname, preproc_str, ivf_str, pqflat_str)


if not use_cache:
    # -nocache: no intermediate file is read or written, and that includes
    # the knn-graph ground truth
    gt_cachefile = None
    preproc_cachefile = None
    cent_cachefile = None
    index_cachefile = None

print("cachefiles:")
print(preproc_cachefile)
print(cent_cachefile)
print(index_cachefile)
||
297 | |||
298 | |||
299 | ################################################################# |
||
300 | # Wake up GPUs |
||
301 | ################################################################# |
||
302 | |||
print("preparing resources for %d GPUs" % ngpu)

# One StandardGpuResources object per GPU, stored at the GPU's index;
# make_vres_vdev looks them up by that index.
gpu_resources = []

for i in range(ngpu):
    res = faiss.StandardGpuResources()
    if tempmem >= 0:
        # only override the temporary memory when -tempmem was given
        # (the default of -1 skips this call)
        res.setTempMemory(tempmem)
    gpu_resources.append(res)
||
312 | |||
313 | |||
def make_vres_vdev(i0=0, i1=-1):
    """Return vectors of device ids and resources useful for gpu_multiple.

    Covers GPUs i0 (inclusive) to i1 (exclusive); i1 == -1 means up to ngpu.
    Returns the pair (vres, vdev).
    """
    stop = ngpu if i1 == -1 else i1
    vres = faiss.GpuResourcesVector()
    vdev = faiss.IntVector()
    for gpu_no in range(i0, stop):
        vdev.push_back(gpu_no)
        vres.push_back(gpu_resources[gpu_no])
    return vres, vdev
||
324 | |||
325 | |||
326 | ################################################################# |
||
327 | # Prepare ground truth (for the knngraph) |
||
328 | ################################################################# |
||
329 | |||
330 | |||
def compute_GT():
    """Compute the knn-graph ground truth by exhaustive search on the GPUs.

    The first nq_gt rows of xq are searched against xb, which is scanned in
    blocks; the per-block results are merged into one heap per query.

    Returns the (nq_gt, gt_sl) int64 array of neighbor ids.
    """
    print("compute GT")
    t0 = time.time()

    # result buffers; the heap structure below works directly on their memory
    gt_I = np.zeros((nq_gt, gt_sl), dtype='int64')
    gt_D = np.zeros((nq_gt, gt_sl), dtype='float32')
    heaps = faiss.float_maxheap_array_t()
    heaps.k = gt_sl
    heaps.nh = nq_gt
    heaps.val = faiss.swig_ptr(gt_D)
    heaps.ids = faiss.swig_ptr(gt_I)
    heaps.heapify()
    # number of database vectors handled per block
    bs = 10 ** 5

    n, d = xb.shape
    xqs = sanitize(xq[:nq_gt])

    # flat L2 index copied to the GPUs; it is refilled for every block
    db_gt = faiss.IndexFlatL2(d)
    vres, vdev = make_vres_vdev()
    db_gt_gpu = faiss.index_cpu_to_gpu_multiple(
        vres, vdev, db_gt)

    # compute ground-truth by blocks of bs, and add to heaps
    for i0, xsl in dataset_iterator(xb, IdentPreproc(d), bs):
        db_gt_gpu.add(xsl)
        D, I = db_gt_gpu.search(xqs, gt_sl)
        # ids returned by the search are relative to the block: shift them
        # by the block's first row to get ids in xb
        I += i0
        heaps.addn_with_ids(
            gt_sl, faiss.swig_ptr(D), faiss.swig_ptr(I), gt_sl)
        # empty the index before the next block is added
        db_gt_gpu.reset()
        print("\r %d/%d, %.3f s" % (i0, n, time.time() - t0), end=' ')
    print()
    # NOTE(review): presumably turns each heap into a list sorted by
    # distance - confirm against the faiss heap documentation
    heaps.reorder()

    print("GT time: %.3f s" % (time.time() - t0))
    return gt_I
||
367 | |||
368 | |||
if knngraph:
    # Reuse the cached ground truth when a cache file is configured and
    # present; otherwise compute it and store it if a cache file is set.
    if not gt_cachefile or not os.path.exists(gt_cachefile):
        gt_I = compute_GT()
        if gt_cachefile:
            print("store GT", gt_cachefile)
            np.save(gt_cachefile, gt_I)
    else:
        print("load GT", gt_cachefile)
        gt_I = np.load(gt_cachefile)
||
379 | |||
380 | ################################################################# |
||
381 | # Prepare the vector transformation object (pure CPU) |
||
382 | ################################################################# |
||
383 | |||
384 | |||
def train_preprocessor():
    """Build and train the vector transform described by preproc_str.

    Handles "OPQ<m>[_<dout>]," (faiss.OPQMatrix) and "PCAR<dout>,"
    (faiss.PCAMatrix); anything else trips an assertion. Training uses the
    first 1000000 rows of xt. Returns the trained transform.
    """
    print("train preproc", preproc_str)
    dim_in = xt.shape[1]
    start = time.time()
    # the [..:-1] slices below drop the trailing comma of preproc_str
    if preproc_str.startswith('OPQ'):
        fields = preproc_str[3:-1].split('_')
        n_sub = int(fields[0])
        if len(fields) == 2:
            dim_out = int(fields[1])
        else:
            dim_out = dim_in
        preproc = faiss.OPQMatrix(dim_in, n_sub, dim_out)
    elif preproc_str.startswith('PCAR'):
        dim_out = int(preproc_str[4:-1])
        preproc = faiss.PCAMatrix(dim_in, dim_out, 0, True)
    else:
        assert False
    train_set = sanitize(xt[:1000000])
    preproc.train(train_set)
    print("preproc train done in %.3f s" % (time.time() - start))
    return preproc
||
402 | |||
403 | |||
def get_preprocessor():
    """Return the pre-processor to apply to all vectors.

    Without a preprocessing prefix in the index key this is an IdentPreproc
    of the database dimension. Otherwise the transform is loaded from
    preproc_cachefile when that file exists, else trained and, if a cache
    file is configured, stored there.
    """
    if not preproc_str:
        return IdentPreproc(xb.shape[1])

    if preproc_cachefile and os.path.exists(preproc_cachefile):
        print("load", preproc_cachefile)
        return faiss.read_VectorTransform(preproc_cachefile)

    preproc = train_preprocessor()
    if preproc_cachefile:
        print("store", preproc_cachefile)
        faiss.write_VectorTransform(preproc, preproc_cachefile)
    return preproc
||
418 | |||
419 | |||
420 | ################################################################# |
||
421 | # Prepare the coarse quantizer |
||
422 | ################################################################# |
||
423 | |||
424 | |||
def train_coarse_quantizer(x, k, preproc):
    """Cluster the pre-processed rows of x into k centroids on the GPUs.

    Returns the centroids as an array of shape (k, preproc.d_out).
    """
    dim = preproc.d_out
    clus = faiss.Clustering(dim, k)
    clus.verbose = True
    # (clus.niter is left at its default value)
    clus.max_points_per_centroid = 10000000

    print("apply preproc on shape", x.shape, 'k=', k)
    start = time.time()
    x = preproc.apply_py(sanitize(x))
    elapsed = time.time() - start
    print(" preproc %.3f s output shape %s" % (elapsed, x.shape))

    # the assignment step of the clustering runs on a flat L2 GPU index
    vres, vdev = make_vres_vdev()
    flat_cpu = faiss.IndexFlatL2(dim)
    index = faiss.index_cpu_to_gpu_multiple(vres, vdev, flat_cpu)

    clus.train(x, index)
    flat_centroids = faiss.vector_float_to_array(clus.centroids)
    return flat_centroids.reshape(k, dim)
||
446 | |||
447 | |||
def prepare_coarse_quantizer(preproc):
    """Return a faiss.IndexFlatL2 holding the ncent coarse centroids.

    The centroids are loaded from cent_cachefile when that file exists;
    otherwise they are trained on the first max(1000000, 256 * ncent) rows
    of xt and stored if a cache file is configured.
    """
    if not cent_cachefile or not os.path.exists(cent_cachefile):
        nt = max(1000000, 256 * ncent)
        print("train coarse quantizer...")
        start = time.time()
        centroids = train_coarse_quantizer(xt[:nt], ncent, preproc)
        print("Coarse train time: %.3f s" % (time.time() - start))
        if cent_cachefile:
            print("store centroids", cent_cachefile)
            np.save(cent_cachefile, centroids)
    else:
        print("load centroids", cent_cachefile)
        centroids = np.load(cent_cachefile)

    quantizer = faiss.IndexFlatL2(preproc.d_out)
    quantizer.add(centroids)
    return quantizer
||
467 | |||
468 | |||
469 | ################################################################# |
||
470 | # Make index and add elements to it |
||
471 | ################################################################# |
||
472 | |||
473 | |||
def prepare_trained_index(preproc):
    """Build the CPU IVF index (Flat or PQ codes) and train it.

    The coarse quantizer comes from prepare_coarse_quantizer; the index is
    then trained on the first 1000000 pre-processed rows of xt. Returns the
    trained, still empty, index.
    """
    quantizer = prepare_coarse_quantizer(preproc)
    dim = preproc.d_out
    if pqflat_str != 'Flat':
        n_sub = int(pqflat_str[2:])
        assert n_sub < 56 or use_float16, \
            "PQ%d will work only with -float16" % n_sub
        print("making an IVFPQ index, m = ", n_sub)
        idx_model = faiss.IndexIVFPQ(quantizer, dim, ncent, n_sub, 8)
    else:
        print("making an IVFFlat index")
        idx_model = faiss.IndexIVFFlat(quantizer, dim, ncent,
                                       faiss.METRIC_L2)

    # hand ownership of the quantizer over from Python to the index
    quantizer.this.disown()
    idx_model.own_fields = True

    # finish training on CPU
    start = time.time()
    print("Training vector codes")
    train_set = preproc.apply_py(sanitize(xt[:1000000]))
    idx_model.train(train_set)
    print(" done %.3f s" % (time.time() - start))

    return idx_model
||
499 | |||
500 | |||
def compute_populated_index(preproc):
    """Add elements to a sharded index. Return the index and if available
    a sharded gpu_index that contains the same data.

    The return value is the pair (gpu_index, indexall): indexall is the CPU
    index, gpu_index is None when max_add > 0.
    """

    indexall = prepare_trained_index(preproc)

    # cloner options for the sharded GPU copy of the (still empty) index
    co = faiss.GpuMultipleClonerOptions()
    co.useFloat16 = use_float16
    co.useFloat16CoarseQuantizer = False
    co.usePrecomputed = use_precomputed_tables
    co.indicesOptions = faiss.INDICES_CPU
    co.verbose = True
    # reserve room for one flush period when -max_add is given, else for
    # the whole database
    co.reserveVecs = max_add if max_add > 0 else xb.shape[0]
    co.shard = True
    assert co.shard_type in (0, 1, 2)
    vres, vdev = make_vres_vdev()
    gpu_index = faiss.index_cpu_to_gpu_multiple(
        vres, vdev, indexall, co)

    print("add...")
    t0 = time.time()
    nb = xb.shape[0]
    for i0, xs in dataset_iterator(xb, preproc, add_batch_size):
        i1 = i0 + xs.shape[0]
        # ids are the row numbers of the vectors in xb
        gpu_index.add_with_ids(xs, np.arange(i0, i1))
        if max_add > 0 and gpu_index.ntotal > max_add:
            # move what the GPU shards hold so far into the CPU index and
            # empty the shards
            print("Flush indexes to CPU")
            for i in range(ngpu):
                index_src_gpu = faiss.downcast_index(gpu_index.at(i))
                index_src = faiss.index_gpu_to_cpu(index_src_gpu)
                print(" index %d size %d" % (i, index_src.ntotal))
                index_src.copy_subset_to(indexall, 0, 0, nb)
                index_src_gpu.reset()
                index_src_gpu.reserveMemory(max_add)
            gpu_index.sync_with_shard_indexes()

        print('\r%d/%d (%.3f s) ' % (
            i0, nb, time.time() - t0), end=' ')
        sys.stdout.flush()
    print("Add time: %.3f s" % (time.time() - t0))

    print("Aggregate indexes to CPU")
    t0 = time.time()

    # copy whatever is still on the GPUs into the CPU index
    if hasattr(gpu_index, 'at'):
        # it is a sharded index
        for i in range(ngpu):
            index_src = faiss.index_gpu_to_cpu(gpu_index.at(i))
            print(" index %d size %d" % (i, index_src.ntotal))
            index_src.copy_subset_to(indexall, 0, 0, nb)
    else:
        # simple index
        index_src = faiss.index_gpu_to_cpu(gpu_index)
        index_src.copy_subset_to(indexall, 0, 0, nb)

    print(" done in %.3f s" % (time.time() - t0))

    if max_add > 0:
        # it does not contain all the vectors
        gpu_index = None

    return gpu_index, indexall
||
563 | |||
def compute_populated_index_2(preproc):
    """Alternative add: fill the CPU index, using the GPUs only to assign
    vectors to their coarse centroid.

    Returns (None, indexall); no populated GPU index is produced.
    """
    indexall = prepare_trained_index(preproc)

    # Three-stage pipeline:
    #   1. load + pre-process a block of vectors
    #   2. find the nearest coarse centroid of each vector on the GPUs
    #   3. add the block to the CPU index with that assignment
    loaded_blocks = dataset_iterator(xb, preproc, add_batch_size)

    vres, vdev = make_vres_vdev()
    coarse_quantizer_gpu = faiss.index_cpu_to_gpu_multiple(
        vres, vdev, indexall.quantizer)

    def quantize(block):
        start, vecs = block
        nearest = coarse_quantizer_gpu.search(vecs, 1)[1]
        return start, vecs, nearest.ravel()

    assigned_blocks = rate_limited_imap(quantize, loaded_blocks)

    print("add...")
    t0 = time.time()
    nb = xb.shape[0]

    for start, vecs, assign in assigned_blocks:
        count = vecs.shape[0]
        index_type = indexall.__class__
        if index_type == faiss.IndexIVFPQ:
            indexall.add_core_o(count, faiss.swig_ptr(vecs),
                                None, None, faiss.swig_ptr(assign))
        elif index_type == faiss.IndexIVFFlat:
            indexall.add_core(count, faiss.swig_ptr(vecs), None,
                              faiss.swig_ptr(assign))
        else:
            assert False

        progress = '\r%d/%d (%.3f s) ' % (start, nb, time.time() - t0)
        print(progress, end=' ')
        sys.stdout.flush()
    print("Add time: %.3f s" % (time.time() - t0))

    return None, indexall
||
607 | |||
608 | |||
609 | |||
def get_populated_index(preproc):
    """Return a GPU index that contains the whole database.

    The CPU index is read from index_cachefile when that file exists;
    otherwise it is built (compute_populated_index, or
    compute_populated_index_2 with -altadd) and written to the cache file if
    one is configured. It is then copied to the GPUs: as one sharded index
    when replicas == 1, else as `replicas` sharded copies grouped in a
    faiss.IndexReplicas, each copy using its own contiguous range of GPUs.
    """

    if not index_cachefile or not os.path.exists(index_cachefile):
        if not altadd:
            gpu_index, indexall = compute_populated_index(preproc)
        else:
            gpu_index, indexall = compute_populated_index_2(preproc)
        if index_cachefile:
            print("store", index_cachefile)
            faiss.write_index(indexall, index_cachefile)
    else:
        print("load", index_cachefile)
        indexall = faiss.read_index(index_cachefile)
        gpu_index = None

    co = faiss.GpuMultipleClonerOptions()
    co.useFloat16 = use_float16
    co.useFloat16CoarseQuantizer = False
    co.usePrecomputed = use_precomputed_tables
    co.indicesOptions = 0
    co.verbose = True
    co.shard = True  # the replicas will be made "manually"
    t0 = time.time()
    print("CPU index contains %d vectors, move to GPU" % indexall.ntotal)
    if replicas == 1:

        if not gpu_index:
            print("copying loaded index to GPUs")
            vres, vdev = make_vres_vdev()
            index = faiss.index_cpu_to_gpu_multiple(
                vres, vdev, indexall, co)
        else:
            # the index built on the GPUs already holds all the vectors
            index = gpu_index

    else:
        del gpu_index  # We override the GPU index

        print("Copy CPU index to %d sharded GPU indexes" % replicas)

        index = faiss.IndexReplicas()

        for i in range(replicas):
            # Floor division keeps the GPU bounds integral: make_vres_vdev
            # feeds them to range(), which rejects the floats that "/"
            # produces under Python 3.
            gpu0 = ngpu * i // replicas
            gpu1 = ngpu * (i + 1) // replicas
            vres, vdev = make_vres_vdev(gpu0, gpu1)

            print(" dispatch to GPUs %d:%d" % (gpu0, gpu1))

            index1 = faiss.index_cpu_to_gpu_multiple(
                vres, vdev, indexall, co)
            # ownership of the replica goes to the IndexReplicas container
            index1.this.disown()
            index.addIndex(index1)
        index.own_fields = True
    del indexall
    print("move to GPU done in %.3f s" % (time.time() - t0))
    return index
||
666 | |||
667 | |||
668 | |||
669 | ################################################################# |
||
670 | # Perform search |
||
671 | ################################################################# |
||
672 | |||
673 | |||
def eval_dataset(index, preproc):
    """Search the queries for every value in nprobes and report the results.

    For each nprobe the queries xq are searched for nnn neighbors, in blocks
    of query_batch_size (or all at once when that is 0). Then the function
    prints the search time and either the intersection measure (knngraph
    mode) or the 1-recall at ranks 1, 10 and 100 (ranks above nnn are
    skipped). When I_fname / D_fname are set, the ids / distances are saved
    to the file name obtained by substituting nprobe for %d.
    """

    ps = faiss.GpuParameterSpace()
    ps.initialize(index)

    nq_gt = gt_I.shape[0]
    print("search...")
    sl = query_batch_size
    nq = xq.shape[0]
    for nprobe in nprobes:
        ps.set_index_parameter(index, 'nprobe', nprobe)
        t0 = time.time()

        if sl == 0:
            D, I = index.search(preproc.apply_py(sanitize(xq)), nnn)
        else:
            I = np.empty((nq, nnn), dtype='int32')
            D = np.empty((nq, nnn), dtype='float32')

            # intermediate intersection measure shown in the progress line;
            # computed once, as soon as the first nq_gt queries are done
            inter_res = ''

            for i0, xs in dataset_iterator(xq, preproc, sl):
                print('\r%d/%d (%.3f s%s) ' % (
                    i0, nq, time.time() - t0, inter_res), end=' ')
                sys.stdout.flush()

                i1 = i0 + xs.shape[0]
                Di, Ii = index.search(xs, nnn)

                I[i0:i1] = Ii
                D[i0:i1] = Di

                if knngraph and not inter_res and i1 >= nq_gt:
                    ires = eval_intersection_measure(
                        gt_I[:, :nnn], I[:nq_gt])
                    inter_res = ', %.4f' % ires

        t1 = time.time()
        if knngraph:
            ires = eval_intersection_measure(gt_I[:, :nnn], I[:nq_gt])
            print(" probe=%-3d: %.3f s rank-%d intersection results: %.4f" % (
                nprobe, t1 - t0, nnn, ires))
        else:
            print(" probe=%-3d: %.3f s" % (nprobe, t1 - t0), end=' ')
            # first ground-truth neighbor of every query, as a column
            gtc = gt_I[:, :1]
            for rank in 1, 10, 100:
                if rank > nnn: continue
                nok = (I[:, :rank] == gtc).sum()
                print("1-R@%d: %.4f" % (rank, nok / float(nq)), end=' ')
            print()
        if I_fname:
            # the %d of the file name pattern stands for the nprobe value
            I_fname_i = I_fname % nprobe
            print("storing", I_fname_i)
            # np.save takes the file first, then the array
            np.save(I_fname_i, I)
        if D_fname:
            D_fname_i = D_fname % nprobe
            print("storing", D_fname_i)
            np.save(D_fname_i, D)
||
733 | |||
734 | |||
735 | ################################################################# |
||
736 | # Driver |
||
737 | ################################################################# |
||
738 | |||
739 | |||
# vector transform, or the identity when the index key has no prefix
preproc = get_preprocessor()

# populated index on the GPUs
index = get_populated_index(preproc)

# search with every nprobe setting and print the results
eval_dataset(index, preproc)

# make sure index is deleted before the resources
del index
||
748 |