async_rx.subject.rx_subject   A
last analyzed

Complexity

Total Complexity 8

Size/Duplication

Total Lines 103
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 8
eloc 35
dl 0
loc 103
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
B rx_subject() 0 95 8
1
from typing import Any, Optional, Union
2
3
from ..protocol import Observer, Subject, SubjectHandler, Subscription, ensure_observable_contract_operator, rx_observer, subject
4
5
__all__ = ["rx_subject"]
6
7
8
def rx_subject(subject_handler: Optional[SubjectHandler] = None) -> Subject:
9
    """Create a subject.
10
11
    A Subject is like an Observable, but can multicast to many Observers.
12
    Subjects are like EventEmitters: they maintain a registry of many listeners,
13
    and then dispatch events/items to them.
14
15
    As subject is also an Observer, it can subscribe to an observable which act at his stream data source.
16
17
    Args:
18
        subject_handler (Optional[SubjectHandler]): optional suject handler callback
19
20
    Returns:
21
        (Subject): the subject
22
23
    Example 1:
24
25
    .. highlight:: python
26
    .. code-block:: python
27
28
        # create a subject
29
        a_subject = subject(subject_handler=my_handler)
30
31
        # few observer subscribe on this subject
32
        sub_1 = await a_subject.subscribe(obs_1)
33
        sub_2 = await a_subject.subscribe(obs_2)
34
35
        # the subject subscribe himself on an observable
36
        await rx_range(start=0, stop=10).subscribe(a_subject)
37
38
        # obs_1 and obs_2 receive 10 #items
39
40
    Example 2:
41
    A subject as event emitter
42
43
    .. highlight:: python
44
    .. code-block:: python
45
46
        # create a subject
47
        a_subject = subject()
48
49
        # few observer subscribe on this subject
50
        sub_1 = await a_subject.subscribe(obs_1)
51
        sub_2 = await a_subject.subscribe(obs_2)
52
53
        # send your data by your self
54
        await a_subject.on_next("my value") # obs_1 and obs_2 receive "my value"
55
        await a_subject.on_completed() # obs_1 and obs_2 receive on_completed
56
57
58
    """
59
    _registry = []  # list of registered observer
60
61
    async def _subscribe(an_observer: Observer) -> Subscription:
62
        nonlocal _registry
63
64
        _registry.append(an_observer)
65
66
        if subject_handler:
67
            await subject_handler.on_subscribe(count=len(_registry), source=an_observer)
68
69
        async def unsubscribe() -> None:
70
            nonlocal _registry
71
            if an_observer in _registry:
72
                _registry.remove(an_observer)
73
74
                if subject_handler:
75
                    await subject_handler.on_unsubscribe(count=len(_registry), source=an_observer)
76
77
        return unsubscribe
78
79
    async def _on_next(item: Any) -> None:
80
        nonlocal _registry
81
82
        for o in _registry:
83
            await o.on_next(item)
84
85
    async def _on_error(err: Union[Any, Exception]) -> None:
86
        nonlocal _registry
87
88
        for o in _registry:
89
            try:
90
                await o.on_error(err=err)
91
            except Exception:  # pragma: no cover
92
                pass
93
94
    async def _on_completed() -> None:
95
        nonlocal _registry
96
97
        for o in _registry:
98
            await o.on_completed()
99
100
    _obs = ensure_observable_contract_operator(rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed))
101
102
    return subject(subscribe=_subscribe, on_next=_obs.on_next, on_error=_obs.on_error, on_completed=_obs.on_completed)
103