async_rx.multicast.rx_publish_replay   A
last analyzed

Complexity

Total Complexity 1

Size/Duplication

Total Lines 36
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 1
eloc 14
dl 0
loc 36
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_publish_replay() 0 26 1
1
from typing import Optional
2
3
from ..protocol import ConnectableObservable, ConnectableObservableHandler, Observable, Subject, SubjectHandler
4
from ..subject import rx_subject_replay
5
from .rx_publish import rx_publish
6
7
__all__ = ["rx_publish_replay"]
8
9
10
def rx_publish_replay(
11
    an_observable: Observable,
12
    buffer_size: int,
13
    subject_handler: Optional[SubjectHandler] = None,
14
    connection_handler: Optional[ConnectableObservableHandler] = None,
15
) -> ConnectableObservable:
16
    """Create a publish_replay.
17
18
    A publish_replay uses a replay_subject under the hood to make
19
    multiple Observers see the same Observable execution.
20
21
    Args:
22
        buffer_size (int): max #items to replay
23
        an_observable (Observable): observable to connect
24
        subject_handler (Optional[SubjectHandler]): optional subject handler
25
        connection_handler (Optional[ConnectableObservableHandler]): optional connection handler
26
27
    Returns:
28
        (ConnectableObservable): the publish_replay instance
29
30
    """
31
32
    def _subject_factory(subject_handler: Optional[SubjectHandler] = None) -> Subject:
33
        return rx_subject_replay(buffer_size=buffer_size, subject_handler=subject_handler)
34
35
    return rx_publish(subject_factory=_subject_factory, an_observable=an_observable, subject_handler=subject_handler, connection_handler=connection_handler)
36