disposable_subscription_on_cancel()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 19
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 9
dl 0
loc 19
rs 9.95
c 0
b 0
f 0
cc 3
nop 2
1
"""Subscription utilities."""
2
from typing import Optional
3
4
import curio
5
6
from .definition import Observable, Observer, Subscription
7
8
__all__ = ["default_subscription", "disposable_subscription_on_cancel"]
9
10
11
async def default_subscription() -> None:
12
    """Default subcribe implementation method.
13
14
    Do nothing.
15
16
    Returns:
17
        (None) - nothing to return.
18
19
    """
20
    pass
21
22
23
async def disposable_subscription_on_cancel(an_observable: Observable, an_observer: Observer) -> Optional[Subscription]:
24
    """Subscribe implementation wich dispose herself on cancel.
25
26
    Arguments:
27
        an_observable (Observable): observable
28
        an_observer (Observer): observer
29
30
    Returns:
31
        (Optional[Subscription]): the subscription if task is not cancelled
32
33
    """
34
    _subscription: Optional[Subscription] = None
35
    try:
36
        _subscription = await an_observable.subscribe(an_observer=an_observer)
37
    except curio.CancelledError:  # pragma: no cover
38
        if _subscription:
39
            await _subscription()
40
            _subscription = None
41
    return _subscription
42