async_rx.multicast.rx_publish_behavior   A
last analyzed

Complexity

Total Complexity 1

Size/Duplication

Total Lines 28
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 1
eloc 9
dl 0
loc 28
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_publish_behavior() 0 18 1
1
from typing import Optional
2
3
from ..protocol import ConnectableObservable, ConnectableObservableHandler, Observable, SubjectHandler
4
from ..subject import rx_subject_behavior
5
from .rx_publish import rx_publish
6
7
__all__ = ["rx_publish_behavior"]
8
9
10
def rx_publish_behavior(
    an_observable: Observable,
    subject_handler: Optional[SubjectHandler] = None,
    connection_handler: Optional[ConnectableObservableHandler] = None,
) -> ConnectableObservable:
    """Build a connectable observable backed by a behavior subject.

    This is a thin wrapper around ``rx_publish`` that fixes the subject
    factory to ``rx_subject_behavior``; the behavior subject is what lets
    several Observers share a single execution of the source Observable.

    Args:
        an_observable (Observable): the source observable to connect
        subject_handler (Optional[SubjectHandler]): optional handler,
            forwarded unchanged to ``rx_publish``
        connection_handler (Optional[ConnectableObservableHandler]): optional
            handler, forwarded unchanged to ``rx_publish``

    Returns:
        (ConnectableObservable): whatever ``rx_publish`` returns for the
            given arguments

    """
    # Everything is forwarded by keyword so the call does not depend on the
    # positional order of rx_publish's parameters.
    forwarded = {
        "an_observable": an_observable,
        "subject_handler": subject_handler,
        "connection_handler": connection_handler,
    }
    return rx_publish(subject_factory=rx_subject_behavior, **forwarded)
28