Test Failed
Push — master ( 690e5c...d09bfb )
by Kolen
02:09 queued 11s
created

map_parallel   A

Complexity

Total Complexity 26

Size/Duplication

Total Lines 162
Duplicated Lines 28.4 %

Importance

Changes 0
Metric Value
eloc 103
dl 46
loc 162
rs 10
c 0
b 0
f 0
wmc 26

7 Functions

Rating   Name   Duplication   Size   Complexity  
A _map_parallel_dask() 0 17 3
A _map_parallel_multithreading() 0 14 3
A _map_parallel_multiprocessing() 0 14 3
A _starmap_parallel_mpi_simple() 0 25 4
A _map_parallel_mpi() 0 9 3
A map_parallel() 29 29 5
A starmap_parallel() 17 17 5

How to fix   Duplicated Code   

Duplicated Code

Duplicated code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
from __future__ import annotations
2
3
__version__ = '0.1.1'  # version string of this module
4
5
from itertools import starmap
6
from typing import TYPE_CHECKING
7
8
if TYPE_CHECKING:
9
    from typing import Optional, Callable, Dict
10
    from collections.abc import Iterable
11
12
13
def _map_parallel_multiprocessing(
14
    f: Callable,
15
    *args: Iterable,
16
    processes: Optional[int] = None,
17
    return_results: bool = True,
18
) -> list:
19
    from concurrent.futures import ProcessPoolExecutor
20
21
    with ProcessPoolExecutor(max_workers=processes) as process_pool_executor:
22
        res = process_pool_executor.map(f, *args)
23
        if return_results:
24
            return list(res)
25
        else:
26
            return []
27
28
29
def _map_parallel_multithreading(
30
    f: Callable,
31
    *args: Iterable,
32
    processes: Optional[int] = None,
33
    return_results: bool = True,
34
) -> list:
35
    from concurrent.futures import ThreadPoolExecutor
36
37
    with ThreadPoolExecutor(max_workers=processes) as thread_pool_executor:
38
        res = thread_pool_executor.map(f, *args)
39
        if return_results:
40
            return list(res)
41
        else:
42
            return []
43
44
45
def _map_parallel_dask(
    f: Callable,
    *args: Iterable,
    processes: Optional[int] = None,
    return_results: bool = True,
) -> list:
    '''Map ``f`` over ``args`` on a temporary local ``dask.distributed`` cluster.

    :param f: function to apply
    :param args: iterables whose items are passed to ``f`` in lockstep, as with :func:`map`
    :param processes: number of dask workers (``n_workers``); ``None`` lets dask decide
    :param return_results: if False, every task is still awaited (so errors
        propagate) but results are discarded and ``[]`` is returned
    :return: results in input order, or ``[]`` when ``return_results`` is False
    '''
    from dask.distributed import Client
    from dask.distributed import LocalCluster

    # Use both as context managers so the client and the cluster are shut
    # down before returning; previously neither was ever closed.
    with LocalCluster(n_workers=processes, dashboard_address=None) as cluster:
        with Client(cluster) as client:
            futures = client.map(f, *args)
            if return_results:
                return [future.result() for future in futures]
            # Wait for each task and re-raise any failure, keeping no results.
            for future in futures:
                future.result()
            return []
62
63
64
def _map_parallel_mpi(f: Callable, *args: Iterable, return_results: bool = True, **kwargs) -> list:
    '''Map ``f`` over ``args`` with :class:`mpi4py.futures.MPIPoolExecutor`.

    The executor is created with default arguments; ``processes`` and any other
    keyword arguments arrive in ``**kwargs`` and are ignored.

    :param f: function to apply
    :param args: iterables whose items are passed to ``f`` in lockstep, as with :func:`map`
    :param return_results: if False, results are discarded and ``[]`` is returned
    :return: results in input order, or ``[]`` when ``return_results`` is False
    '''
    from mpi4py.futures import MPIPoolExecutor

    with MPIPoolExecutor() as executor:
        results = executor.map(f, *args)
        if return_results:
            return list(results)
        # Drain the iterator so exceptions raised by ``f`` propagate instead
        # of being dropped together with the unconsumed results.
        for _ in results:
            pass
        return []
73
74
75
def _starmap_parallel_mpi_simple(
    f: Callable,
    args: Iterable[Iterable],
    return_results: bool = True,
    **kwargs,
) -> list:
    '''Starmap ``f`` over ``args``, giving each MPI rank one contiguous chunk.

    Every rank of ``MPI.COMM_WORLD`` is expected to call this with the same
    ``args`` (presumably launched via mpiexec/mpirun — the split below relies
    on all ranks seeing the same list). ``processes`` and other keyword
    arguments arrive in ``**kwargs`` and are ignored.

    :param f: function to apply; called as ``f(*item)`` for each item of ``args``
    :param args: iterable of argument tuples
    :param return_results: if True, rank 0 gathers and returns all results
    :return: on rank 0 with ``return_results`` True, all results in input
        order; otherwise ``[]``
    '''
    from itertools import chain
    from mpi4py import MPI

    comm = MPI.COMM_WORLD
    size = comm.Get_size()
    rank = comm.Get_rank()

    args_list = list(args)
    if not args_list:
        return []

    n = len(args_list)
    # Rank r handles the half-open slice [r*n//size, (r+1)*n//size); these
    # slices tile range(n) exactly and differ in length by at most one.
    start = (rank * n) // size
    end = ((rank + 1) * n) // size
    local_results = list(starmap(f, args_list[start:end]))

    if return_results:
        gathered = comm.gather(local_results, root=0)
        if rank == 0:
            # gather returns per-rank lists in rank order; concatenating them
            # restores input order. chain avoids sum(..., [])'s quadratic copies.
            return list(chain.from_iterable(gathered))
    return []
100
101
102
# Backend name -> implementation for backends that take the iterables the
# same way as builtin ``map(f, *args)``.
# The ``Dict``/``Callable`` annotation is kept as a string and never evaluated
# at runtime (``from __future__ import annotations``), so importing those names
# only under ``TYPE_CHECKING`` is safe here.
_map_parallel_func: Dict[str, Callable] = dict(
    multiprocessing=_map_parallel_multiprocessing,
    multithreading=_map_parallel_multithreading,
    dask=_map_parallel_dask,
    mpi=_map_parallel_mpi,
)
108
109
# Backend name -> implementation for backends that take a single iterable of
# argument tuples, like ``itertools.starmap(f, args)``.
_starmap_parallel_func: Dict[str, Callable] = dict(
    mpi_simple=_starmap_parallel_mpi_simple,
)
112
113
114 View Code Duplication
def map_parallel(
    f: Callable,
    *args: Iterable,
    processes: Optional[int] = None,
    mode: str = 'multiprocessing',
    return_results: bool = True,
) -> list:
    '''equiv to `map(f, *args)` but in parallel

    :param str mode: backend for parallelization

        - multiprocessing: using concurrent.futures.ProcessPoolExecutor from the standard library
        - multithreading: using concurrent.futures.ThreadPoolExecutor from the standard library
        - dask: using dask.distributed
        - mpi: using mpi4py.futures. May not work depending on your MPI vendor
        - mpi_simple: using mpi4py with simple scheduling that divides works into equal chunks
        - serial: using map

    :param int processes: no. of parallel processes

        (in the case of mpi, it is determined by mpiexec/mpirun args)

        If ``processes`` is 1 or less, ``f`` runs serially with :func:`map`
        regardless of ``mode``.

    :param bool return_results: if False, the parallel backends discard the
        results and return ``[]``; for mpi_simple, results are only returned
        on rank 0. The serial path always returns the results.
    :raises ValueError: if ``mode`` is not one of the modes listed above.
    '''
    if mode == 'serial':
        return list(map(f, *args))
    # Reject unknown modes instead of silently running serially, which would
    # hide typos such as 'multiprocesing'.
    if mode not in _map_parallel_func and mode not in _starmap_parallel_func:
        supported = sorted({'serial', *_map_parallel_func, *_starmap_parallel_func})
        raise ValueError(f'unknown mode {mode!r}; expected one of {supported}')
    if processes is not None and processes <= 1:
        return list(map(f, *args))
    if mode in _map_parallel_func:
        return _map_parallel_func[mode](f, *args, processes=processes, return_results=return_results)
    # starmap-style backends take one iterable of argument tuples.
    return _starmap_parallel_func[mode](f, zip(*args), processes=processes, return_results=return_results)
143
144
145 View Code Duplication
def starmap_parallel(
    f: Callable,
    args: Iterable[Iterable],
    processes: Optional[int] = None,
    mode: str = 'multiprocessing',
    return_results: bool = True,
) -> list:
    '''equiv to `starmap(f, args)` but in parallel

    See docstring from :func:`~map_parallel.map_parallel`

    Backends that use the ``map`` calling convention receive the argument
    tuples transposed with ``zip(*args)``; ``zip`` truncates to the shortest
    tuple, so every tuple in ``args`` is expected to have the same length.

    :raises ValueError: if ``mode`` is not a supported backend name.
    '''
    if mode == 'serial':
        return list(starmap(f, args))
    # Reject unknown modes instead of silently running serially, which would
    # hide typos such as 'multiprocesing'.
    if mode not in _map_parallel_func and mode not in _starmap_parallel_func:
        supported = sorted({'serial', *_map_parallel_func, *_starmap_parallel_func})
        raise ValueError(f'unknown mode {mode!r}; expected one of {supported}')
    if processes is not None and processes <= 1:
        return list(starmap(f, args))
    if mode in _map_parallel_func:
        return _map_parallel_func[mode](f, *zip(*args), processes=processes, return_results=return_results)
    return _starmap_parallel_func[mode](f, args, processes=processes, return_results=return_results)
162