Passed
Pull Request — master (#101)
by Simon
01:37
created

distribution.multiprocessing_wrapper()   A

Complexity

Conditions 2

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 3
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
# Author: Simon Blanke
2
# Email: [email protected]
3
# License: MIT License
4
5
from sys import platform
6
from tqdm import tqdm
7
8
# Worker start-up hooks used by the pool-based wrappers in this module.
# On Linux every pool worker is initialised with ``tqdm.set_lock`` and receives
# the lock obtained from ``tqdm.get_lock()`` in the parent process; on all
# other platforms no initializer is installed.
# NOTE(review): presumably this lets progress bars from several workers share
# one write lock -- confirm why it is restricted to Linux.
if platform.startswith("linux"):
    initializer = tqdm.set_lock
    initargs = (tqdm.get_lock(),)
else:
    initializer = None
    initargs = ()
14
15
16
def single_process(process_func, process_infos):
    """Run ``process_func`` on every entry of ``process_infos`` sequentially.

    Everything executes in the calling process; no worker pool is created.

    Args:
        process_func: Callable invoked with one entry at a time.
        process_infos: Iterable of entries to hand to ``process_func``.

    Returns:
        list: One result per entry, in the iteration order of
        ``process_infos``.
    """
    return [process_func(info) for info in process_infos]
18
19
20
def multiprocessing_wrapper(process_func, process_infos, n_processes):
    """Map ``process_func`` over ``process_infos`` with ``multiprocessing.Pool``.

    Args:
        process_func: Callable applied to each entry; it is passed one entry
            as its single argument.
        process_infos: Iterable of entries to process.
        n_processes: Number of worker processes passed to ``Pool``.

    Returns:
        list: The results of ``pool.map``, one per entry of
        ``process_infos``.
    """
    # Imported lazily so the module can be loaded without touching
    # multiprocessing until this backend is actually requested.
    import multiprocessing as mp

    # Materialise the iterable once so the pool receives a fixed sequence.
    process_infos = tuple(process_infos)

    # ``initializer``/``initargs`` are the module-level worker start-up hooks.
    with mp.Pool(
        n_processes, initializer=initializer, initargs=initargs
    ) as pool:
        return pool.map(process_func, process_infos)
29
30
31
def pathos_wrapper(process_func, search_processes_paras, n_processes):
    """Map ``process_func`` over ``search_processes_paras`` with a pathos pool.

    Args:
        process_func: Callable applied to each entry; it is passed one entry
            as its single argument.
        search_processes_paras: Iterable of entries to process.
        n_processes: Number of worker processes passed to the pool.

    Returns:
        The value returned by ``pool.map`` for the given entries.
    """
    # Imported lazily: pathos is only needed when this backend is selected.
    import pathos.multiprocessing as pmp

    # ``initializer``/``initargs`` are the module-level worker start-up hooks.
    with pmp.Pool(
        n_processes, initializer=initializer, initargs=initargs
    ) as pool:
        return pool.map(process_func, search_processes_paras)
38
39
40
def joblib_wrapper(process_func, search_processes_paras, n_processes):
    """Run ``process_func`` for every entry via ``joblib.Parallel``.

    Args:
        process_func: Callable to execute for each entry.
        search_processes_paras: Iterable of entries; each entry is
            star-unpacked into the positional arguments of ``process_func``.
        n_processes: Value passed to ``Parallel`` as ``n_jobs``.

    Returns:
        The value returned by calling the ``Parallel`` object on the jobs.
    """
    # Imported lazily: joblib is only needed when this backend is selected.
    from joblib import Parallel, delayed

    # NOTE(review): entries are unpacked with ``*`` here, whereas the other
    # wrappers in this module pass each entry as a single argument. If the
    # entries are dicts this forwards only their keys -- confirm with callers.
    jobs = [
        delayed(process_func)(*info_dict)
        for info_dict in search_processes_paras
    ]
    return Parallel(n_jobs=n_processes)(jobs)
48