| Total Complexity | 2 |
| Total Lines | 25 |
| Duplicated Lines | 0 % |
| Changes | 0 | ||
| 1 | """Connectable Observable utilities.""" |
||
| 2 | |||
| 3 | from collections import namedtuple |
||
| 4 | |||
| 5 | from .definition import ConnectableObservable, ConnectableObservableEventHandler, ConnectableObservableHandler, ConnectHandler, RefCountHandler, Subscribe |
||
| 6 | |||
| 7 | __all__ = ["connectable_observable", "connectable_observable_handler"] |
||
| 8 | |||
| 9 | |||
# Immutable record holding the three handlers of a connectable observable.
# Field order is (connect, ref_count, subscribe); instances are produced by
# connectable_observable().
ConnectableObservableDefinition = namedtuple(
    "ConnectableObservableDefinition",
    ("connect", "ref_count", "subscribe"),
)
"""Implements ConnectableObservable Protocol."""
||
| 12 | |||
# Immutable record pairing the two connection lifecycle callbacks.
# Field order is (on_connect, on_disconnect); instances are produced by
# connectable_observable_handler().
ConnectableObservableHandlerDefinition = namedtuple(
    "ConnectableObservableHandlerDefinition",
    ("on_connect", "on_disconnect"),
)
"""Implements ConnectableObservableHandler Protocol."""
||
| 15 | |||
| 16 | |||
def connectable_observable(
    connect: ConnectHandler,
    ref_count: RefCountHandler,
    subscribe: Subscribe,
) -> ConnectableObservable:
    """Bundle the given handlers into a ConnectableObservableDefinition.

    Args:
        connect: Value stored as the record's ``connect`` field.
        ref_count: Value stored as the record's ``ref_count`` field.
        subscribe: Value stored as the record's ``subscribe`` field.

    Returns:
        A ``ConnectableObservableDefinition`` named tuple carrying the three
        arguments unchanged.
    """
    # Positional order matches the named tuple's declared field order:
    # (connect, ref_count, subscribe).
    return ConnectableObservableDefinition(connect, ref_count, subscribe)
||
| 19 | |||
| 20 | |||
def connectable_observable_handler(
    on_connect: ConnectableObservableEventHandler,
    on_disconnect: ConnectableObservableEventHandler,
) -> ConnectableObservableHandler:
    """Bundle the given callbacks into a ConnectableObservableHandlerDefinition.

    Args:
        on_connect: Value stored as the record's ``on_connect`` field.
        on_disconnect: Value stored as the record's ``on_disconnect`` field.

    Returns:
        A ``ConnectableObservableHandlerDefinition`` named tuple carrying the
        two arguments unchanged.
    """
    # Positional order matches the named tuple's declared field order:
    # (on_connect, on_disconnect).
    return ConnectableObservableHandlerDefinition(on_connect, on_disconnect)
||
| 25 |