Conditions | 3 |
Total Lines | 19 |
Code Lines | 9 |
Lines | 0 |
Ratio | 0 % |
Changes | 0 |
1 | """Subscription utilities.""" |
||
async def disposable_subscription_on_cancel(an_observable: Observable, an_observer: Observer) -> Optional[Subscription]:
    """Subscribe an observer, giving up cleanly if the task is cancelled.

    Awaits ``an_observable.subscribe`` and hands the resulting subscription
    back to the caller. A ``curio.CancelledError`` raised while subscribing
    is caught here and reported as ``None`` instead of propagating.

    Arguments:
        an_observable (Observable): observable to subscribe to
        an_observer (Observer): observer passed to ``subscribe``

    Returns:
        (Optional[Subscription]): the subscription, or None if the task
            was cancelled while subscribing

    """
    active: Optional[Subscription] = None
    try:
        active = await an_observable.subscribe(an_observer=an_observer)
    except curio.CancelledError:  # pragma: no cover
        # NOTE(review): the assignment above is the only statement in the
        # try body, so ``active`` is still None whenever this handler runs
        # and the dispose call below is never reached. Kept for parity with
        # the existing behaviour; confirm whether disposal after a completed
        # subscribe was the real intent.
        if active:
            await active()
        return None
    return active
||
42 |