| Total Complexity | 4 |
| Total Lines | 42 |
| Duplicated Lines | 0 % |
| Changes | 0 | ||
| 1 | """Subscription utilities.""" |
||
| 2 | from typing import Optional |
||
| 3 | |||
| 4 | import curio |
||
| 5 | |||
| 6 | from .definition import Observable, Observer, Subscription |
||
| 7 | |||
| 8 | __all__ = ["default_subscription", "disposable_subscription_on_cancel"] |
||
| 9 | |||
| 10 | |||
async def default_subscription() -> None:
    """No-op subscribe implementation.

    Awaiting this coroutine performs no work.

    Returns:
        (None) - there is nothing to return.

    """
    return None
||
| 21 | |||
| 22 | |||
async def disposable_subscription_on_cancel(an_observable: Observable, an_observer: Observer) -> Optional[Subscription]:
    """Subscribe an observer, returning no subscription if cancelled meanwhile.

    Awaits ``an_observable.subscribe(an_observer=an_observer)`` and hands back
    its result. A ``curio.CancelledError`` raised during that await is
    swallowed and ``None`` is returned instead.

    Arguments:
        an_observable (Observable): observable to subscribe to
        an_observer (Observer): observer passed to ``subscribe``

    Returns:
        (Optional[Subscription]): the subscription, or None if the task was
            cancelled while subscribing

    """
    try:
        return await an_observable.subscribe(an_observer=an_observer)
    except curio.CancelledError:  # pragma: no cover
        # Cancellation interrupts the await before any subscription has been
        # handed back, so there is nothing to dispose here; signal the
        # absence of a subscription with None.
        return None
||
| 42 |