rx_publish_behavior()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 18
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 18
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
from typing import Optional
2
3
from ..protocol import ConnectableObservable, ConnectableObservableHandler, Observable, SubjectHandler
4
from ..subject import rx_subject_behavior
5
from .rx_publish import rx_publish
6
7
__all__ = ["rx_publish_behavior"]
8
9
10
def rx_publish_behavior(
11
    an_observable: Observable, subject_handler: Optional[SubjectHandler] = None, connection_handler: Optional[ConnectableObservableHandler] = None
12
) -> ConnectableObservable:
13
    """Create a publish_behavior.
14
15
    A publish_behavior uses a behavior_subject under the hood to make multiple
16
    Observers see the same Observable execution.
17
18
    Args:
19
        an_observable (Observable): observable to connect
20
        subject_handler (Optional[SubjectHandler]): optional subject handler
21
        connection_handler (Optional[ConnectableObservableHandler]): optional connection handler
22
23
    Returns:
24
        (ConnectableObservable): the publish_behavior instance
25
26
    """
27
    return rx_publish(subject_factory=rx_subject_behavior, an_observable=an_observable, subject_handler=subject_handler, connection_handler=connection_handler)
28