Completed
Push — master ( 8f3758...257d3c )
by Ramon
29s queued 11s
created

jsons._multitasking   A

Complexity

Total Complexity 9

Size/Duplication

Total Lines 84
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 9
eloc 61
dl 0
loc 84
rs 10
c 0
b 0
f 0

4 Functions

Rating   Name   Duplication   Size   Complexity  
A multi_task() 0 15 2
A _start_tasks() 0 28 3
A _get_list_to_fill() 0 7 2
A _fill() 0 13 2
1
"""
2
PRIVATE MODULE: do not import (from) it directly.
3
4
Functionality for processing iterables in parallel.
5
"""
6
from multiprocessing import Process, Manager
7
from typing import List, Callable, Union
8
9
from typish import Something
10
11
12
# Structural type alias used in the annotations below: an object that exposes
# ``__getitem__`` accepting an int index (i.e. it can be read as ``obj[i]``).
Subscriptable = Something['__getitem__': Callable[[int], object]]
13
14
15
def multi_task(
        func: Callable,
        obj: Subscriptable,
        tasks: int,
        task_type: type,
        *args,
        **kwargs):
    """
    Call ``func`` on every element of ``obj``, spreading the calls over
    several tasks, and return the results as a list.

    :param func: the callable that is invoked once per element of ``obj``.
    :param obj: the indexable collection whose elements are processed.
    :param tasks: the number of tasks that may be used.
    :param task_type: the class that is instantiated for each task; it is
        created with ``target`` and ``args`` keywords and must offer
        ``start`` and ``join``.
    :param args: extra positional arguments that are forwarded to ``func``.
    :param kwargs: extra keyword arguments that are forwarded to ``func``.
    :return: a list in which position ``i`` holds the result for ``obj[i]``.
    """
    filled = _get_list_to_fill(obj, task_type)
    running = _start_tasks(
        tasks=tasks,
        task_type=task_type,
        func=func,
        list_to_fill=filled,
        obj=obj,
        args=args,
        kwargs=kwargs,
    )

    # Block until every task is done so that all spots have been written.
    for worker in running:
        worker.join()

    # Copy into a plain list; ``filled`` may be a manager-backed proxy.
    return list(filled)
30
31
32
def _get_list_to_fill(obj: list, task_type: type) -> Union[list, Manager]:
33
    # Return a list or manager that contains enough spots to fill.
34
    result = [0] * len(obj)
35
    if issubclass(task_type, Process):
36
        manager = Manager()
37
        result = manager.list(result)
38
    return result
39
40
41
def _start_tasks(
        tasks: int,
        task_type: type,
        func: Callable,
        list_to_fill: list,
        obj: Subscriptable,
        args,
        kwargs) -> List[Something['join': Callable[[], None]]]:
    # Start the tasks and return their instances so they can be joined.
    #
    # The indices of obj are divided into contiguous slices, one per task.
    # Every slice is handed to a new task_type instance that runs _fill on
    # it. No more tasks are started than obj has elements. The tasks
    # argument is expected to be at least 1.

    total = len(obj)
    if not total:
        # Nothing to process. Returning early prevents a division by zero
        # below (tasks_used would be 0) and starts no tasks without work.
        return []

    tasks_used = min(tasks, total)
    # Whatever is not used here is passed on to func as its ``tasks``
    # keyword argument; that value is never lower than 1.
    tasks_left = tasks - tasks_used or 1

    # Divide the list in parts. divmod keeps the arithmetic in exact
    # integers instead of going through a float division.
    slice_size, rest_size = divmod(total, tasks_used)

    tasks_instances = []
    for i in range(tasks_used):
        start = i * slice_size
        end = start + slice_size
        if i == tasks_used - 1:
            # The last task also takes the elements that are left over when
            # the length is not evenly divisible.
            end += rest_size
        task = task_type(
            target=_fill,
            args=(func, list_to_fill, obj, start, end, tasks_left, args, kwargs))
        task.start()
        tasks_instances.append(task)
    return tasks_instances
69
70
71
def _fill(
        func,
        list_to_fill: list,
        obj: Subscriptable,
        start: int,
        end: int,
        tasks: int,
        args,
        kwargs):
    # Fill the spots start up to (not including) end of list_to_fill with
    # the outcome of func for the element of obj at the same index. func
    # receives the element, the extra positional arguments, ``tasks`` as a
    # keyword argument and the extra keyword arguments.
    for index in range(start, end):
        element = obj[index]
        list_to_fill[index] = func(element, *args, tasks=tasks, **kwargs)
84