Completed
Pull Request — master (#65)
by Ramon
01:41
created

jsons.deserializers.default_list._fill()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 12
rs 9.85
c 0
b 0
f 0
cc 2
nop 7
1
from threading import Thread
2
from jsons._load_impl import load
3
from jsons.exceptions import JsonsError
4
5
6
def default_list_deserializer(
        obj: list,
        cls: type = None,
        *,
        threads: int = 1,
        **kwargs) -> list:
    """
    Turn every item of the given list into its deserialized form.
    :param obj: the list whose items are to be deserialized.
    :param cls: the target type; when it carries a generic argument (e.g.
    List[str]), the first argument is used as the type of the items.
    :param threads: how many threads may be used; 1 loads the items in the
    calling thread, a higher number spreads the work over threads.
    :param kwargs: any keyword arguments, forwarded to the loading of items.
    :return: a list with the deserialized items.
    :raises JsonsError: if threads is neither 1 nor greater than 1.
    """
    item_cls = None
    # Work on a copy so that the caller's keyword arguments stay untouched.
    forwarded = dict(kwargs)
    if cls and hasattr(cls, '__args__'):
        item_cls = cls.__args__[0]
        # Flag the item type as 'inferred', so that further down it is known
        # that it originates from the generic and not from the caller.
        forwarded['_inferred_cls'] = True

    if threads == 1:
        return _single_threaded(obj, item_cls, threads, **forwarded)
    if threads > 1:
        return _multi_threaded(obj, item_cls, threads, **forwarded)
    raise JsonsError('Invalid number of threads: {}'.format(threads))
35
36
37
def _single_threaded(
38
        # Load the elements of the list in a single thread.
39
        obj: list,
40
        cls: type,
41
        threads: int,
42
        **kwargs):
43
    return [load(x, cls, threads=1, **kwargs) for x in obj]
44
45
46
def _multi_threaded(
47
        obj: list,
48
        cls: type,
49
        threads: int,
50
        **kwargs):
51
    # Load the elements of the list with multiple threads.
52
53
    # First, create a list with the correct size for the threads to fill.
54
    result = [0] * len(obj)
55
56
    threads_used = min(threads, len(obj))
57
    threads_left = threads - threads_used or 1
58
59
    # Divide the list in parts.
60
    slice_size = int(len(obj) / threads_used)
61
    rest_size = len(obj) % threads_used
62
63
    # Start the threads and store them to join them later.
64
    threads_instances = []
65
    for i in range(threads_used):
66
        start = i
67
        end = (i + 1) * slice_size
68
        if i == threads_used - 1:
69
            end += rest_size
70
        thread = Thread(
71
            target=_fill,
72
            args=(obj, cls, result, start, end, threads_left, kwargs))
73
        thread.start()
74
        threads_instances.append(thread)
75
76
    for thread in threads_instances:
77
        thread.join()
78
79
    return result
80
81
82
def _fill(
83
        obj: list,
84
        cls: type,
85
        result: list,
86
        start: int,
87
        end: int,
88
        threads: int,
89
        kwargs: dict):
90
    # Fill result with the loaded objects of obj within the range start - end.
91
    for i_ in range(start, end):
92
        loaded = load(obj[i_], cls, threads=threads, **kwargs)
93
        result[i_] = loaded
94