Total Complexity | 4 |
Total Lines | 42 |
Duplicated Lines | 0 % |
Changes | 0 |
1 | """Subscription utilities.""" |
||
2 | from typing import Optional |
||
3 | |||
4 | import curio |
||
5 | |||
6 | from .definition import Observable, Observer, Subscription |
||
7 | |||
8 | __all__ = ["default_subscription", "disposable_subscription_on_cancel"] |
||
9 | |||
10 | |||
11 | async def default_subscription() -> None: |
||
12 | """Default subcribe implementation method. |
||
13 | |||
14 | Do nothing. |
||
15 | |||
16 | Returns: |
||
17 | (None) - nothing to return. |
||
18 | |||
19 | """ |
||
20 | pass |
||
21 | |||
22 | |||
23 | async def disposable_subscription_on_cancel(an_observable: Observable, an_observer: Observer) -> Optional[Subscription]: |
||
24 | """Subscribe implementation wich dispose herself on cancel. |
||
25 | |||
26 | Arguments: |
||
27 | an_observable (Observable): observable |
||
28 | an_observer (Observer): observer |
||
29 | |||
30 | Returns: |
||
31 | (Optional[Subscription]): the subscription if task is not cancelled |
||
32 | |||
33 | """ |
||
34 | _subscription: Optional[Subscription] = None |
||
35 | try: |
||
36 | _subscription = await an_observable.subscribe(an_observer=an_observer) |
||
37 | except curio.CancelledError: # pragma: no cover |
||
38 | if _subscription: |
||
39 | await _subscription() |
||
40 | _subscription = None |
||
41 | return _subscription |
||
42 |