rx_publish_replay()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 26
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 8
dl 0
loc 26
rs 10
c 0
b 0
f 0
cc 1
nop 4
1
from typing import Optional
2
3
from ..protocol import ConnectableObservable, ConnectableObservableHandler, Observable, Subject, SubjectHandler
4
from ..subject import rx_subject_replay
5
from .rx_publish import rx_publish
6
7
# Names exported by `from <this module> import *`.
__all__ = ["rx_publish_replay"]
8
9
10
def rx_publish_replay(
    an_observable: Observable,
    buffer_size: int,
    subject_handler: Optional[SubjectHandler] = None,
    connection_handler: Optional[ConnectableObservableHandler] = None,
) -> ConnectableObservable:
    """Create a publish_replay connectable observable.

    Delegates to ``rx_publish``, supplying a subject factory that builds
    its subject with ``rx_subject_replay`` and the given ``buffer_size``.
    The replay subject is what lets multiple Observers see the same
    Observable execution.

    Args:
        an_observable (Observable): observable to connect
        buffer_size (int): max #items to replay
        subject_handler (Optional[SubjectHandler]): optional subject handler,
            forwarded unchanged to ``rx_publish``
        connection_handler (Optional[ConnectableObservableHandler]): optional
            connection handler, forwarded unchanged to ``rx_publish``

    Returns:
        (ConnectableObservable): the publish_replay instance

    """

    def _replay_subject_factory(subject_handler: Optional[SubjectHandler] = None) -> Subject:
        # The parameter deliberately shadows the outer ``subject_handler``:
        # the handler used here is whatever the caller of the factory
        # supplies, while ``buffer_size`` is captured from the enclosing call.
        replay_subject = rx_subject_replay(
            buffer_size=buffer_size,
            subject_handler=subject_handler,
        )
        return replay_subject

    return rx_publish(
        subject_factory=_replay_subject_factory,
        an_observable=an_observable,
        subject_handler=subject_handler,
        connection_handler=connection_handler,
    )
36