dirutility.multiprocess   A
last analyzed

Complexity

Total Complexity 13

Size/Duplication

Total Lines 95
Duplicated Lines 91.58 %

Importance

Changes 0
Metric Value
eloc 43
dl 87
loc 95
rs 10
c 0
b 0
f 0
wmc 13

5 Methods

Rating   Name   Duplication   Size   Complexity  
A PoolProcess.map() 6 6 2
A PoolProcess.map_return() 6 6 2
A PoolProcess.result() 5 5 2
A PoolProcess.__init__() 16 16 1
A PoolProcess.map_tqdm() 13 13 2

1 Function

Rating   Name   Duplication   Size   Complexity  
A pool_process() 33 33 4

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
from multiprocessing import cpu_count
2
from multiprocessing.pool import Pool
3
from tqdm import tqdm
4
5
6 View Code Duplication
def pool_process(func, iterable, cpus=cpu_count(), return_vals=False, cpu_reduction=0, progress_bar=False):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
7
    """
8
    Multiprocessing helper function for performing looped operation using multiple processors.
9
10
    :param func: Function to call
11
    :param iterable: Iterable object to perform each function on
12
    :param cpus: Number of cpu cores, defaults to system's cpu count
13
    :param return_vals: Bool, returns output values when True
14
    :param cpu_reduction: Number of cpu core's to not use
15
    :param progress_bar: Display text based progress bar
16
    :return:
17
    """
18
    with Pool(cpus - abs(cpu_reduction)) as pool:
19
        # Return values returned by 'func'
20
        if return_vals:
21
            # Show progress bar
22
            if progress_bar:
23
                vals = [v for v in tqdm(pool.imap_unordered(func, iterable), total=len(iterable))]
24
25
            # No progress bar
26
            else:
27
                vals = pool.map(func, iterable)
28
29
            # Close pool and return values
30
            pool.close()
31
            # pool.join()
32
            return vals
33
34
        # Don't capture values returned by 'func'
35
        else:
36
            pool.map(func, iterable)
37
            pool.close()
38
            return True
39
40
41 View Code Duplication
class PoolProcess:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
42
    _func = None
43
    _iterable = None
44
45
    def __init__(self, func, iterable, cpus=cpu_count(), cpu_reduction=0, filter_nulls=False):
46
        """
47
        Multiprocessing helper function for performing looped operation using multiple processors.
48
49
        :param func: Function to call
50
        :param iterable: Iterable object to perform each function on
51
        :param cpus: Number of cpu cores, defaults to system's cpu count
52
        :param cpu_reduction: Number of cpu core's to not use
53
        :param filter_nulls: Bool, when true None values are removed from the result list before return
54
        """
55
        self._func = func
56
        self._iterable = iterable
57
        self.cpu_count = cpus - abs(cpu_reduction)
58
        self.filter_nulls = filter_nulls
59
60
        self._result = None
61
62
    @property
63
    def result(self):
64
        """Return the results returned by map_return or map_tqdm methods."""
65
        # Remove None values from self._result if filter_nulls is enabled
66
        return [i for i in self._result if i is not None] if self.filter_nulls else self._result
67
68
    def map(self):
69
        """Perform a function on every item in an iterable."""
70
        with Pool(self.cpu_count) as pool:
71
            pool.map(self._func, self._iterable)
72
            pool.close()
73
        return True
74
75
    def map_return(self):
76
        """Perform a function on every item and return a list of yield values."""
77
        with Pool(self.cpu_count) as pool:
78
            self._result = pool.map(self._func, self._iterable)
79
            pool.close()
80
            return self.result
81
82
    def map_tqdm(self, desc=None, unit='it'):
83
        """
84
        Perform a function on every item while displaying a progress bar.
85
86
        :param desc: Optional, progress bar description
87
        :param unit: Optional, progress bar units (default is 'it' for 'iteration')
88
        :return: A list of yielded values
89
        """
90
        tqdm_args = dict(total=len(self._iterable), desc=desc, unit=unit)
91
        with Pool(self.cpu_count) as pool:
92
            self._result = [v for v in tqdm(pool.imap_unordered(self._func, self._iterable), **tqdm_args)]
93
            pool.close()
94
            return self.result
95