for testing and deploying your application
for finding and fixing issues
for empowering human code reviews
"""Subscription utilities."""
from typing import Optional
import curio
from .definition import Observable, Observer, Subscription
__all__ = ["default_subscription", "disposable_subscription_on_cancel"]
async def default_subscription() -> None:
"""Default subcribe implementation method.
Do nothing.
Returns:
(None) - nothing to return.
"""
pass
async def disposable_subscription_on_cancel(an_observable: Observable, an_observer: Observer) -> Optional[Subscription]:
"""Subscribe implementation wich dispose herself on cancel.
Arguments:
an_observable (Observable): observable
an_observer (Observer): observer
(Optional[Subscription]): the subscription if task is not cancelled
_subscription: Optional[Subscription] = None
try:
_subscription = await an_observable.subscribe(an_observer=an_observer)
except curio.CancelledError: # pragma: no cover
if _subscription:
await _subscription()
_subscription = None
return _subscription