jsons._multitasking.multi_task()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 15
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 14
dl 0
loc 15
rs 9.7
c 0
b 0
f 0
cc 2
nop 6
"""
PRIVATE MODULE: do not import (from) it directly.

Functionality for processing iterables in parallel.
"""
6
from multiprocessing import Process, Manager
7
from typing import List, Callable, Union
8
9
from typish import Something
10
11
# Type alias used to annotate the ``obj`` parameters in this module:
# presumably "any object whose ``__getitem__`` accepts an int index"
# (relies on typish.Something semantics -- TODO confirm against typish).
Subscriptable = Something['__getitem__': Callable[[int], object]]
12
13
14
def multi_task(
        func: Callable,
        obj: Subscriptable,
        tasks: int,
        task_type: type,
        *args,
        **kwargs):
    """
    Apply ``func`` to every element of ``obj`` using several tasks and return
    the outcomes as a list that has the same order as ``obj``.

    :param func: the callable that is invoked once per element of ``obj``;
    it receives the element, ``*args``, a ``tasks`` keyword argument and
    ``**kwargs``.
    :param obj: the indexable collection to process; ``len(obj)`` and
    ``obj[i]`` must be supported.
    :param tasks: the maximum number of tasks to start.
    :param task_type: the class to instantiate for each task; it is created
    with ``target`` and ``args`` keyword arguments and must offer ``start``
    and ``join``.
    :param args: extra positional arguments that are passed on to ``func``.
    :param kwargs: extra keyword arguments that are passed on to ``func``.
    :return: a list with one entry per element of ``obj``.
    """
    if len(obj) == 0:
        # Nothing to process: skip building a result container and starting
        # tasks, since dividing zero elements over the tasks cannot work.
        return []

    result = _get_list_to_fill(obj, task_type)
    tasks_instances = _start_tasks(tasks=tasks, task_type=task_type, func=func,
                                   list_to_fill=result, obj=obj, args=args,
                                   kwargs=kwargs)

    # Wait until every task has finished filling its part of the result.
    # NOTE(review): errors raised inside a task are not collected here; the
    # slots of failed elements presumably keep their placeholder value.
    for task in tasks_instances:
        task.join()

    # Copy into a plain list, whatever container was used while filling.
    return list(result)
29
30
31
def _get_list_to_fill(obj: list, task_type: type) -> Union[list, Manager]:
32
    # Return a list or manager that contains enough spots to fill.
33
    result = [0] * len(obj)
34
    if issubclass(task_type, Process):
35
        manager = Manager()
36
        result = manager.list(result)
37
    return result
38
39
40
def _start_tasks(
41
        tasks: int,
42
        task_type: type,
43
        func: Callable,
44
        list_to_fill: list,
45
        obj: Subscriptable,
46
        args,
47
        kwargs) -> List[Something['join': Callable[[], None]]]:
48
    # Start the tasks and return their instances so they can be joined.
49
50
    tasks_instances = []
51
    tasks_used = min(tasks, len(obj))
52
    tasks_left = tasks - tasks_used or 1
53
54
    # Divide the list in parts.
55
    slice_size = int(len(obj) / tasks_used)
56
    rest_size = len(obj) % tasks_used
57
    for i in range(tasks_used):
58
        start = i * slice_size
59
        end = (i + 1) * slice_size
60
        if i == tasks_used - 1:
61
            end += rest_size
62
        task = task_type(
63
            target=_fill,
64
            args=(func, list_to_fill, obj, start, end, tasks_left, args, kwargs))
65
        task.start()
66
        tasks_instances.append(task)
67
    return tasks_instances
68
69
70
def _fill(
71
        func,
72
        list_to_fill: list,
73
        obj: Subscriptable,
74
        start: int,
75
        end: int,
76
        tasks: int,
77
        args,
78
        kwargs):
79
    # Fill the given list with results from func.
80
    for i_ in range(start, end):
81
        loaded = func(obj[i_], tasks=tasks, *args, **kwargs)
82
        list_to_fill[i_] = loaded
83