async_rx.protocol.rx_collector   A
last analyzed

Complexity

Total Complexity 3

Size/Duplication

Total Lines 91
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 3
eloc 59
dl 0
loc 91
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
B rx_collector() 0 78 3
1
from collections import namedtuple
2
from typing import Any, TypeVar
3
4
from .definition import Collector
5
6
# Public API of this module: only the collector factory is exported.
__all__ = ["rx_collector"]
7
8
# Generic type of the initial value handed to ``rx_collector``.
T = TypeVar("T")
9
10
# Immutable record bundling the callables that make up a collector:
# three observer handlers followed by four state accessors.
CollectorDefinition = namedtuple(
    "CollectorDefinition",
    (
        "on_next",
        "on_error",
        "on_completed",
        "result",
        "is_finish",
        "has_error",
        "error",
    ),
)
11
12
13
def rx_collector(initial_value: T) -> Collector:
    """Create an observer collector.

    The type of ``initial_value`` selects how emitted items are accumulated:

    - ``dict``: each item is unpacked as a ``(key, value)`` pair and stored
      in a shallow copy of ``initial_value``;
    - ``list``: each item is appended to a shallow copy of ``initial_value``;
    - anything else: the most recent item replaces the stored value.

    Args:
        initial_value (T): initial value which determines the result type
            (list, dict, base type)

    Returns:
        (Collector[T]): a collector instance exposing the ``on_next``,
            ``on_error`` and ``on_completed`` coroutine functions and the
            ``result``, ``is_finish``, ``has_error`` and ``error`` accessors.

    """
    _is_finish = False
    _has_error = False
    _error = None

    if isinstance(initial_value, dict):
        # Shallow copy: collecting never mutates the caller's dict.
        _dict = dict(initial_value)

        async def _on_next(item: Any) -> None:
            # Mutation in place, so no ``nonlocal`` declaration is required.
            key, value = item
            _dict[key] = value

        def _get_result() -> Any:
            return _dict

    elif isinstance(initial_value, list):
        # Shallow copy: collecting never mutates the caller's list.
        _list = list(initial_value)

        async def _on_next(item: Any) -> None:
            _list.append(item)

        def _get_result() -> Any:
            return _list

    else:
        _value = initial_value

        async def _on_next(item: Any) -> None:
            # Rebinding the enclosing variable does require ``nonlocal``.
            nonlocal _value
            _value = item

        def _get_result() -> Any:
            return _value

    async def _on_completed() -> None:
        nonlocal _is_finish
        _is_finish = True

    async def _on_error(err: Any) -> None:
        nonlocal _has_error, _error
        _error = err
        _has_error = True

    # The accessors below only read enclosing variables, which closures can
    # do without any ``nonlocal`` declaration.
    def _get_is_finish() -> bool:
        return _is_finish

    def _get_has_error() -> bool:
        return _has_error

    def _get_error() -> Any:
        return _error

    return CollectorDefinition(
        on_next=_on_next,
        on_error=_on_error,
        on_completed=_on_completed,
        result=_get_result,
        is_finish=_get_is_finish,
        has_error=_get_has_error,
        error=_get_error,
    )
92