Passed
Pull Request — master (#110), created by unknown at 01:27

hyperactive.optimizers._distribution (rating: A)

Complexity
    Total Complexity: 14

Size/Duplication
    Total Lines: 87
    Duplicated Lines: 13.79 %

Importance
    Changes: 0
Metric                              Value
eloc (executable lines of code)     50
dl (duplicated lines)               12
loc (lines of code)                 87
rs                                  10
c (changes)                         0
b                                   0
f                                   0
wmc (weighted method complexity)    14
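
The duplication figure above follows directly from this table: dl / loc = 12 / 87 ≈ 13.79 %, the Duplicated Lines percentage reported in the summary.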

7 Functions

Rating  Name                       Duplication  Size  Complexity
A       joblib_wrapper()           0            5     1
A       pathos_wrapper()           0            5     2
A       multiprocessing_wrapper()  0            9     2
A       run_search()               0            14    3
A       single_process()           0            2     1
A       _get_distribution()        12           12    4
A       proxy()                    0            2     1
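
The per-function complexities sum to the module total: 1 + 2 + 2 + 3 + 1 + 4 + 1 = 14, i.e. the wmc / Total Complexity reported above. Likewise, the 12-line duplicated block in _get_distribution() accounts for all of the module's duplicated lines (dl = 12).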

How to fix: Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

The most common remedy is to extract the duplicated logic into a shared helper that every call site reuses, as in the sketch below.
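
As a minimal, hypothetical illustration (none of these functions belong to the module under review), the same filtering expression duplicated across two functions can be pulled into a single helper:

# Before: the even-number filter is written out twice.
def mean_of_evens(values):
    evens = [v for v in values if v % 2 == 0]
    return sum(evens) / len(evens)

def count_evens(values):
    evens = [v for v in values if v % 2 == 0]
    return len(evens)

# After: one helper owns the filter; both call sites reuse it.
def _evens(values):
    return [v for v in values if v % 2 == 0]

def mean_of_evens(values):
    evens = _evens(values)
    return sum(evens) / len(evens)

def count_evens(values):
    return len(_evens(values))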

# Author: Simon Blanke
# Email: [email protected]
# License: MIT License

from sys import platform

from tqdm import tqdm

from ._process import _process_


# On Linux, share tqdm's write lock with worker processes so their
# progress bars do not garble each other's output.
if platform.startswith("linux"):
    initializer = tqdm.set_lock
    initargs = (tqdm.get_lock(),)
else:
    initializer = None
    initargs = ()


def proxy(args):
    # Unpack the argument tuple for pool.map, which passes a single argument.
    return _process_(*args)


def single_process(process_func, process_infos):
    # Fallback: run every search sequentially in the current process.
    return [process_func(info) for info in process_infos]


def multiprocessing_wrapper(process_func, process_infos, n_processes):
    import multiprocessing as mp

    process_infos = tuple(process_infos)

    with mp.Pool(n_processes, initializer=initializer, initargs=initargs) as pool:
        return pool.map(process_func, process_infos)


def pathos_wrapper(process_func, search_processes_paras, n_processes):
    import pathos.multiprocessing as pmp

    with pmp.Pool(n_processes, initializer=initializer, initargs=initargs) as pool:
        return pool.map(process_func, search_processes_paras)


def joblib_wrapper(process_func, search_processes_paras, n_processes):
    from joblib import Parallel, delayed

    # joblib handles argument unpacking itself, so no proxy is needed.
    jobs = [delayed(process_func)(*info_tuple) for info_tuple in search_processes_paras]
    return Parallel(n_jobs=n_processes)(jobs)


# Maps each supported backend name to its pool wrapper and the callable
# that each worker process ultimately runs.
dist_dict = {
    "joblib": (joblib_wrapper, _process_),
    "multiprocessing": (multiprocessing_wrapper, proxy),
    "pathos": (pathos_wrapper, proxy),
}


def _get_distribution(distribution):
    # Scrutinizer flags this function's 12 lines as duplicated elsewhere
    # in the project.
    if callable(distribution):
        # A user-supplied callable acts as the wrapper itself.
        return (distribution, _process_), {}

    elif isinstance(distribution, dict):
        dist_key = list(distribution.keys())[0]
        dist_paras = list(distribution.values())[0]

        return dist_dict[dist_key], dist_paras

    elif isinstance(distribution, str):
        return dist_dict[distribution], {}

    raise ValueError(f"Unknown distribution: {distribution!r}")


def run_search(searches, distribution, n_processes):
    if n_processes == "auto":
        n_processes = len(searches)

    # Wrap each search in a one-element tuple for the pool wrappers.
    searches_tuple = [(search,) for search in searches]

    if n_processes == 1:
        results_list = single_process(_process_, searches)
    else:
        # dist_paras is currently unused by the wrappers.
        (distribution, process_func), dist_paras = _get_distribution(distribution)

        results_list = distribution(process_func, searches_tuple, n_processes)

    return results_list
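
For context, here is a minimal usage sketch of how _get_distribution resolves the distribution argument into a (wrapper, worker) pair; the sequential_backend callable is a hypothetical stand-in, not part of the module:

# A string selects a built-in backend from dist_dict.
(wrapper, worker), paras = _get_distribution("joblib")
assert wrapper is joblib_wrapper and worker is _process_

# A single-entry dict selects a backend and carries extra parameters
# (dist_paras, which run_search currently leaves unused).
(wrapper, worker), paras = _get_distribution({"multiprocessing": {}})
assert wrapper is multiprocessing_wrapper and worker is proxy

# A callable is used as the wrapper itself and receives
# (process_func, process_infos, n_processes), like the built-ins.
def sequential_backend(process_func, process_infos, n_processes):
    return [process_func(*info) for info in process_infos]

(wrapper, worker), paras = _get_distribution(sequential_backend)
assert wrapper is sequential_backend and worker is _process_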