Test Failed
Push — master (764a4a...05d9c7)
by Kolen
created 01:39

map_parallel._starmap_parallel_dask()   A

Complexity
    Conditions    1

Size
    Total Lines   9
    Code Lines    7

Duplication
    Lines         0
    Ratio         0 %

Importance
    Changes       0

Metric   Value
cc       1
eloc     7
nop      3
dl       0
loc      9
rs       10
c        0
b        0
f        0
__version__ = '0.1.1'

from functools import partial
from itertools import starmap
from typing import Optional


def _starfunc(f, x):
    '''return f(*x)
    '''
    return f(*x)


def _map_parallel_multiprocessing(
    f, *args,
    processes: Optional[int] = None,
):
    from concurrent.futures import ProcessPoolExecutor

    with ProcessPoolExecutor(max_workers=processes) as process_pool_executor:
        return list(process_pool_executor.map(f, *args))


def _starmap_parallel_multiprocessing(
    f, args,
    processes: Optional[int] = None,
):
    from concurrent.futures import ProcessPoolExecutor

    with ProcessPoolExecutor(max_workers=processes) as process_pool_executor:
        return list(process_pool_executor.map(partial(_starfunc, f), args))


def _map_parallel_multithreading(
    f, *args,
    processes: Optional[int] = None,
):
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=processes) as thread_pool_executor:
        return list(thread_pool_executor.map(f, *args))


def _starmap_parallel_multithreading(
    f, args,
    processes: Optional[int] = None,
):
    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=processes) as thread_pool_executor:
        return list(thread_pool_executor.map(partial(_starfunc, f), args))


def _map_parallel_dask(
    f, *args,
    processes: Optional[int] = None,
):
    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(n_workers=processes, dashboard_address=None)
    client = Client(cluster)
    return [future.result() for future in client.map(f, *args)]


def _starmap_parallel_dask(
    f, args,
    processes: Optional[int] = None,
):
    from dask.distributed import Client, LocalCluster

    cluster = LocalCluster(n_workers=processes, dashboard_address=None)
    client = Client(cluster)
    return [future.result() for future in client.map(partial(_starfunc, f), args)]


def _map_parallel_mpi(f, *args, **kwargs):
    from mpi4py.futures import MPIPoolExecutor

    with MPIPoolExecutor() as mpi_pool_executor:
        return list(mpi_pool_executor.map(f, *args))


def _starmap_parallel_mpi(f, args, **kwargs):
    from mpi4py.futures import MPIPoolExecutor

    with MPIPoolExecutor() as mpi_pool_executor:
        return list(mpi_pool_executor.starmap(f, args))


_map_parallel_func = {
    'multiprocessing': _map_parallel_multiprocessing,
    'multithreading': _map_parallel_multithreading,
    'dask': _map_parallel_dask,
    'mpi': _map_parallel_mpi,
}


_starmap_parallel_func = {
    'multiprocessing': _starmap_parallel_multiprocessing,
    'multithreading': _starmap_parallel_multithreading,
    'dask': _starmap_parallel_dask,
    'mpi': _starmap_parallel_mpi,
}


def map_parallel(
    f, *args,
    processes: Optional[int] = None,
    mode: str = 'multiprocessing',
):
    '''equiv to `map(f, *args)` but in parallel

    :param str mode: backend for parallelization
        - multiprocessing: using multiprocessing from standard library
        - multithreading: using multithreading from standard library
        - dask: using dask.distributed
        - mpi: using mpi4py.futures. May not work depending on your MPI vendor
        - serial: using map
    :param int processes: no. of parallel processes

    (in the case of mpi, it is determined by mpiexec/mpirun args)
    '''
    if processes is None or processes > 1:
        try:
            return _map_parallel_func[mode](f, *args, processes=processes)
        except KeyError:
            pass
    return list(map(f, *args))


def starmap_parallel(
    f, args,
    processes: Optional[int] = None,
    mode: str = 'multiprocessing',
):
    '''equiv to `starmap(f, args)` but in parallel

    :param str mode: backend for parallelization
        - multiprocessing: using multiprocessing from standard library
        - multithreading: using multithreading from standard library
        - dask: using dask.distributed
        - mpi: using mpi4py.futures. May not work depending on your MPI vendor
        - serial: using map
    :param int processes: no. of parallel processes

    (in the case of mpi, it is determined by mpiexec/mpirun args)
    '''
    if processes is None or processes > 1:
        try:
            return _starmap_parallel_func[mode](f, args, processes=processes)
        except KeyError:
            pass
    return list(starmap(f, args))
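
For reference, a minimal usage sketch of the two public entry points, assuming the module above is importable as `map_parallel`; the `add` helper and the `if __name__ == '__main__'` guard (needed for the multiprocessing backend under the spawn start method) are illustrative and not part of the file:

from map_parallel import map_parallel, starmap_parallel


def add(x, y):
    # module-level so it stays picklable for the multiprocessing backend
    return x + y


if __name__ == '__main__':
    # map semantics: one iterable per positional argument of f
    print(map_parallel(add, [1, 2, 3], [10, 20, 30], processes=2))
    # -> [11, 22, 33]

    # starmap semantics: one iterable of argument tuples
    print(starmap_parallel(add, [(1, 2), (3, 4)], mode='multithreading', processes=2))
    # -> [3, 7]

    # any mode not in the backend dicts (e.g. 'serial') raises KeyError internally
    # and falls back to the serial map/starmap path
    print(map_parallel(add, [1], [2], mode='serial'))
    # -> [3]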