| Conditions | 3 |
| Total Lines | 19 |
| Code Lines | 9 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
| 1 | """Subscription utilities.""" |
||
| 23 | async def disposable_subscription_on_cancel(an_observable: Observable, an_observer: Observer) -> Optional[Subscription]: |
||
| 24 | """Subscribe implementation wich dispose herself on cancel. |
||
| 25 | |||
| 26 | Arguments: |
||
| 27 | an_observable (Observable): observable |
||
| 28 | an_observer (Observer): observer |
||
| 29 | |||
| 30 | Returns: |
||
| 31 | (Optional[Subscription]): the subscription if task is not cancelled |
||
| 32 | |||
| 33 | """ |
||
| 34 | _subscription: Optional[Subscription] = None |
||
| 35 | try: |
||
| 36 | _subscription = await an_observable.subscribe(an_observer=an_observer) |
||
| 37 | except curio.CancelledError: # pragma: no cover |
||
| 38 | if _subscription: |
||
| 39 | await _subscription() |
||
| 40 | _subscription = None |
||
| 41 | return _subscription |
||
| 42 |