rx_subject_replay()   B

Complexity

Conditions 6

Size

Total Lines 64
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0

Metric  Value
eloc    32
dl      0
loc     64
rs      8.1786
c       0
b       0
f       0
cc      6
nop     2

How to fix: Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments inside a method's body, that is usually a sign that the commented part should be extracted into a new method. The comment is then a good starting point for naming the new method.
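The extraction described above can be sketched as follows. This is a minimal, hypothetical example: `report_before`, `report`, `mean`, and `format_line` are illustrative names and are not part of the analyzed project. Each comment in the long version becomes the name of a small helper.

```python
from typing import List


def report_before(label: str, values: List[float]) -> str:
    """Long version: comments mark the steps hidden inside one body."""
    # compute the mean
    total = 0.0
    for value in values:
        total += value
    average = total / len(values)
    # format the line
    return f"{label}: {average:.2f}"


def mean(values: List[float]) -> float:
    """Extracted from the 'compute the mean' comment."""
    return sum(values) / len(values)


def format_line(label: str, value: float) -> str:
    """Extracted from the 'format the line' comment."""
    return f"{label}: {value:.2f}"


def report(label: str, values: List[float]) -> str:
    """Short version: the body reads as a sequence of named steps."""
    return format_line(label, mean(values))
```

Both versions return the same string for the same input, so the extraction changes the structure and not the behavior.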

Commonly applied refactorings include Extract Method.

from collections import deque
from typing import Any, Deque, NoReturn, Optional

from ..protocol import Observer, Subject, SubjectHandler, Subscription, subject
from .rx_subject import rx_subject

__all__ = ["rx_subject_replay"]


def rx_subject_replay(buffer_size: int, subject_handler: Optional[SubjectHandler] = None) -> Subject:
    """Create a replay subject.

    A ReplaySubject is similar to a BehaviorSubject in that it can send
    old values to new subscribers, but it can also record a part
    of the Observable execution.

    A ReplaySubject records multiple values from the Observable
    execution and replays them to new subscribers.
    When a replay occurs, completed and error events are also replayed.

    Args:
        buffer_size (int): buffer size, i.e. the number of items replayed on subscription
        subject_handler (Optional[SubjectHandler]): optional subject handler callback

    Returns:
        (Subject): the subject

    Raises:
        (RuntimeError): if buffer_size <= 0

    """
    if buffer_size <= 0:
        raise RuntimeError("buffer_size must be greater than zero!")

    # Bounded buffer: once full, appending drops the oldest item.
    _queue: Deque = deque(maxlen=buffer_size)
    _has_completed = False
    _error = None
    _subject = rx_subject(subject_handler=subject_handler)

    async def _on_next(item: Any) -> None:
        nonlocal _queue, _subject

        # Record the item for later subscribers, then forward it to current ones.
        _queue.append(item)
        await _subject.on_next(item)

    async def _subscribe(an_observer: Observer) -> Subscription:
        nonlocal _queue, _subject, _has_completed, _error

        subscription = await _subject.subscribe(an_observer)

        # Replay the buffered items, oldest first.
        if _queue:
            for value in _queue:
                await an_observer.on_next(value)

        # Replay the terminal event, if any. Comparing against None also
        # replays error values that happen to be falsy.
        if _error is not None:
            await an_observer.on_error(_error)
        elif _has_completed:
            await an_observer.on_completed()

        return subscription

    async def _on_complete():
        # Analyzer note on the next line: "The variable _has_completed does not
        # seem to be defined." It is bound in the enclosing function scope above.
        nonlocal _has_completed
        _has_completed = True
        await _subject.on_completed()

    async def _on_error(err: Any) -> Optional[NoReturn]:
        # Analyzer note on the next line: "The variable _error does not seem
        # to be defined." It is bound in the enclosing function scope above.
        nonlocal _error
        _error = err
        await _subject.on_error(err=err)
        return None

    return subject(subscribe=_subscribe, on_next=_on_next, on_error=_on_error, on_completed=_on_complete)
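To make the replay behavior in the listing easier to see in isolation, here is a self-contained sketch. It assumes nothing about the package's `rx_subject`, `Observer`, or `subject` helpers: `ReplaySketch`, `Callback`, and `demo` are hypothetical names, observers are reduced to a single async callback, and completion and error replay are left out. Only the `deque(maxlen=...)` buffering and the replay on subscription are modeled.

```python
import asyncio
from collections import deque
from typing import Any, Awaitable, Callable, Deque, List

Callback = Callable[[Any], Awaitable[None]]


class ReplaySketch:
    """Hypothetical stand-in for a replay subject; not the library's Subject."""

    def __init__(self, buffer_size: int) -> None:
        if buffer_size <= 0:
            raise RuntimeError("buffer_size must be greater than zero!")
        # Bounded buffer: appending to a full deque drops the oldest item.
        self._buffer: Deque[Any] = deque(maxlen=buffer_size)
        self._callbacks: List[Callback] = []

    async def on_next(self, item: Any) -> None:
        # Record the item, then forward it to the current subscribers.
        self._buffer.append(item)
        for callback in list(self._callbacks):
            await callback(item)

    async def subscribe(self, callback: Callback) -> None:
        # Register first, then replay the buffered items, oldest first.
        self._callbacks.append(callback)
        for item in list(self._buffer):
            await callback(item)


async def demo() -> List[int]:
    received: List[int] = []

    async def collect(item: int) -> None:
        received.append(item)

    replay = ReplaySketch(buffer_size=2)
    for i in (1, 2, 3):
        await replay.on_next(i)      # buffer ends up holding 2 and 3
    await replay.subscribe(collect)  # late subscriber gets the buffered items
    await replay.on_next(4)          # live item is delivered as usual
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A subscriber that joins after three items with a buffer size of two sees only the last two buffered items, followed by any live items.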