async_rx.protocol.connectable   A
last analyzed

Complexity

Total Complexity 2

Size/Duplication

Total Lines 25
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 2
eloc 11
dl 0
loc 25
rs 10
c 0
b 0
f 0

2 Functions

Rating   Name   Duplication   Size   Complexity  
A connectable_observable() 0 2 1
A connectable_observable_handler() 0 4 1
1
"""Connectable Observable utilities."""
2
3
from collections import namedtuple
4
5
from .definition import ConnectableObservable, ConnectableObservableEventHandler, ConnectableObservableHandler, ConnectHandler, RefCountHandler, Subscribe
6
7
__all__ = ["connectable_observable", "connectable_observable_handler"]
8
9
10
ConnectableObservableDefinition = namedtuple("ConnectableObservableDefinition", ["connect", "ref_count", "subscribe"])
11
"""Implements ConnectableObservable Protocol."""
12
13
ConnectableObservableHandlerDefinition = namedtuple("ConnectableObservableHandlerDefinition", ["on_connect", "on_disconnect"])
14
"""Implements ConnectableObservableHandler Protocol."""
15
16
17
def connectable_observable(connect: ConnectHandler, ref_count: RefCountHandler, subscribe: Subscribe) -> ConnectableObservable:
18
    return ConnectableObservableDefinition(connect=connect, ref_count=ref_count, subscribe=subscribe)
19
20
21
def connectable_observable_handler(
22
    on_connect: ConnectableObservableEventHandler, on_disconnect: ConnectableObservableEventHandler
23
) -> ConnectableObservableHandler:
24
    return ConnectableObservableHandlerDefinition(on_connect=on_connect, on_disconnect=on_disconnect)
25