connectable_observable()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
"""Connectable Observable utilities."""
2
3
from collections import namedtuple
4
5
from .definition import ConnectableObservable, ConnectableObservableEventHandler, ConnectableObservableHandler, ConnectHandler, RefCountHandler, Subscribe
6
7
# Public names of this module: the two factory functions defined below.
__all__ = [
    "connectable_observable",
    "connectable_observable_handler",
]
8
9
10
ConnectableObservableDefinition = namedtuple(
    typename="ConnectableObservableDefinition",
    field_names=("connect", "ref_count", "subscribe"),
)
"""Named tuple (connect, ref_count, subscribe) implementing the ConnectableObservable protocol."""
12
13
ConnectableObservableHandlerDefinition = namedtuple(
    typename="ConnectableObservableHandlerDefinition",
    field_names=("on_connect", "on_disconnect"),
)
"""Named tuple (on_connect, on_disconnect) implementing the ConnectableObservableHandler protocol."""
15
16
17
def connectable_observable(
    connect: ConnectHandler, ref_count: RefCountHandler, subscribe: Subscribe
) -> ConnectableObservable:
    """Build a ConnectableObservableDefinition from the given handlers.

    Args:
        connect: Value stored in the ``connect`` field.
        ref_count: Value stored in the ``ref_count`` field.
        subscribe: Value stored in the ``subscribe`` field.

    Returns:
        A ``ConnectableObservableDefinition`` named tuple holding the three
        arguments unchanged, in the order (connect, ref_count, subscribe).
    """
    return ConnectableObservableDefinition(connect=connect, ref_count=ref_count, subscribe=subscribe)
19
20
21
def connectable_observable_handler(
    on_connect: ConnectableObservableEventHandler, on_disconnect: ConnectableObservableEventHandler
) -> ConnectableObservableHandler:
    """Build a ConnectableObservableHandlerDefinition from the given event handlers.

    Args:
        on_connect: Value stored in the ``on_connect`` field.
        on_disconnect: Value stored in the ``on_disconnect`` field.

    Returns:
        A ``ConnectableObservableHandlerDefinition`` named tuple holding both
        arguments unchanged, in the order (on_connect, on_disconnect).
    """
    return ConnectableObservableHandlerDefinition(on_connect=on_connect, on_disconnect=on_disconnect)
25