async_rx.observable.rx_create   A
last analyzed

Complexity

Total Complexity 4

Size/Duplication

Total Lines 55
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 4
eloc 22
dl 0
loc 55
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_create() 0 46 4
1
"""Define rx_create."""
2
from typing import NoReturn, Optional, Union
3
4
from ..protocol import Observable, Observer, Subscribe, Subscription, observable
5
6
__all__ = ["rx_create"]
7
8
9
def rx_create(subscribe: Subscribe, ensure_contract: Optional[bool] = True, max_observer: Optional[int] = None) -> Union[Observable, NoReturn]:
10
    """Create an observable with specific delayed execution 'subscribe'.
11
12
    Observables can be created with create, but usually we use the so-called
13
    creation operators, like of, from, interval, etc.
14
    Subscribing to an Observable is like calling a function, providing callbacks
15
    where the data will be delivered to.
16
17
    Args:
18
        subscribe (Subscribe): subcribe function to use on observable
19
        ensure_contract (bool): boolean flag (default True) to ensure that
20
            this observable will follow Observable contract.
21
        max_observer (int): maximum observer on this Observable (default None <=> unlimited)
22
23
    Returns:
24
        (Observable): an observable instance.
25
26
    Raise:
27
        (RuntimeError): if subscribe parameter is undefined
28
29
    """
30
    if subscribe is None:
31
        raise RuntimeError('a subscribe function must be provided')
32
33
    if max_observer:
34
        current_observer = 0
35
36
        async def _subscribe_tracked(an_observer: Observer) -> Subscription:
37
            nonlocal current_observer
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable current_observer does not seem to be defined.
Loading history...
38
39
            if current_observer == max_observer:
40
                raise RuntimeError(f'{max_observer} #observers limit reached')
41
42
            current_observer = current_observer + 1
43
            subscription = await subscribe(an_observer)
44
45
            async def _unsubscribe():
46
                nonlocal current_observer
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable current_observer does not seem to be defined.
Loading history...
47
                current_observer = current_observer - 1
48
                return await subscription()
49
50
            return _unsubscribe
51
52
        return observable(subscribe=_subscribe_tracked, ensure_contract=ensure_contract)
53
54
    return observable(subscribe=subscribe, ensure_contract=ensure_contract)
55