async_rx.protocol.subscription   A
last analyzed

Complexity

Total Complexity 4

Size/Duplication

Total Lines 42
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 4
eloc 16
dl 0
loc 42
rs 10
c 0
b 0
f 0

2 Functions

Rating   Name   Duplication   Size   Complexity  
A disposable_subscription_on_cancel() 0 19 3
A default_subscription() 0 10 1
1
"""Subscription utilities."""
2
from typing import Optional
3
4
import curio
5
6
from .definition import Observable, Observer, Subscription
7
8
__all__ = ["default_subscription", "disposable_subscription_on_cancel"]
9
10
11
async def default_subscription() -> None:
12
    """Default subcribe implementation method.
13
14
    Do nothing.
15
16
    Returns:
17
        (None) - nothing to return.
18
19
    """
20
    pass
21
22
23
async def disposable_subscription_on_cancel(an_observable: Observable, an_observer: Observer) -> Optional[Subscription]:
24
    """Subscribe implementation wich dispose herself on cancel.
25
26
    Arguments:
27
        an_observable (Observable): observable
28
        an_observer (Observer): observer
29
30
    Returns:
31
        (Optional[Subscription]): the subscription if task is not cancelled
32
33
    """
34
    _subscription: Optional[Subscription] = None
35
    try:
36
        _subscription = await an_observable.subscribe(an_observer=an_observer)
37
    except curio.CancelledError:  # pragma: no cover
38
        if _subscription:
39
            await _subscription()
40
            _subscription = None
41
    return _subscription
42