Completed
Push — master ( a8a578...6c485c )
by Ramon
19s queued 10s
created

jsons.deserializers.default_list._single_task()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
from multiprocessing import Process, Manager
2
from jsons._load_impl import load
3
from jsons.exceptions import JsonsError
4
5
6
def default_list_deserializer(
7
        obj: list,
8
        cls: type = None,
9
        *,
10
        tasks: int = 1,
11
        task_type: type = Process,
12
        **kwargs) -> list:
13
    """
14
    Deserialize a list by deserializing all items of that list.
15
    :param obj: the list that needs deserializing.
16
    :param cls: the type optionally with a generic (e.g. List[str]).
17
    :param tasks: the allowed number of tasks (threads or processes).
18
    :param task_type: the type that is used for multitasking.
19
    :param kwargs: any keyword arguments.
20
    :return: a deserialized list instance.
21
    """
22
    cls_ = None
23
    kwargs_ = {**kwargs}
24
    if cls and hasattr(cls, '__args__'):
25
        cls_ = cls.__args__[0]
26
        # Mark the cls as 'inferred' so that later it is known where cls came
27
        # from and the precedence of classes can be determined.
28
        kwargs_['_inferred_cls'] = True
29
30
    if tasks == 1:
31
        result = _single_task(obj, cls_, **kwargs_)
32
    elif tasks > 1:
33
        result = _multi_task(obj, cls_, tasks, task_type, **kwargs_)
34
    else:
35
        raise JsonsError('Invalid number of tasks: {}'.format(tasks))
36
    return result
37
38
39
def _single_task(
40
        # Load the elements of the list in a single task.
41
        obj: list,
42
        cls: type,
43
        **kwargs):
44
    return [load(x, cls, tasks=1, **kwargs) for x in obj]
45
46
47
def _multi_task(
48
        obj: list,
49
        cls: type,
50
        tasks: int,
51
        task_type: type,
52
        **kwargs):
53
    # Load the elements of the list with multiple tasks.
54
55
    # First, create a list with the correct size for the tasks to fill.
56
    if issubclass(task_type, Process):
57
        manager = Manager()
58
        result = manager.list([0] * len(obj))
59
    else:
60
        result = [0] * len(obj)
61
62
    tasks_used = min(tasks, len(obj))
63
    tasks_left = tasks - tasks_used or 1
64
65
    # Divide the list in parts.
66
    slice_size = int(len(obj) / tasks_used)
67
    rest_size = len(obj) % tasks_used
68
69
    # Start the tasks and store them to join them later.
70
    tasks_instances = []
71
    for i in range(tasks_used):
72
        start = i
73
        end = (i + 1) * slice_size
74
        if i == tasks_used - 1:
75
            end += rest_size
76
        task = task_type(
77
            target=_fill,
78
            args=(obj, cls, result, start, end, tasks_left, kwargs))
79
        task.start()
80
        tasks_instances.append(task)
81
82
    for task in tasks_instances:
83
        task.join()
84
85
    return list(result)
86
87
88
def _fill(
89
        obj: list,
90
        cls: type,
91
        result: list,
92
        start: int,
93
        end: int,
94
        tasks: int,
95
        kwargs: dict):
96
    # Fill result with the loaded objects of obj within the range start - end.
97
    for i_ in range(start, end):
98
        loaded = load(obj[i_], cls, tasks=tasks, **kwargs)
99
        result[i_] = loaded
100