1
|
|
|
from collections import deque |
2
|
|
|
from typing import Any, Deque, NoReturn, Optional |
3
|
|
|
|
4
|
|
|
from ..protocol import Observer, Subject, SubjectHandler, Subscription, subject |
5
|
|
|
from .rx_subject import rx_subject |
6
|
|
|
|
7
|
|
|
__all__ = ["rx_subject_replay"] |
8
|
|
|
|
9
|
|
|
|
10
|
|
|
def rx_subject_replay(buffer_size: int, subject_handler: Optional[SubjectHandler] = None) -> Subject:
    """Create a replay subject.

    A replay subject records the most recent ``buffer_size`` items pushed
    through it and re-sends them, in order, to every new subscriber. If an
    error or a completion has already been received, it is replayed as
    well, after the buffered items.

    Args:
        buffer_size (int): maximum number of items kept for replay on
            subscription; must be greater than zero.
        subject_handler (Optional[SubjectHandler]): optional subject handler
            callback, forwarded to the underlying ``rx_subject``.

    Returns:
        (Subject): the replay subject.

    Raises:
        (RuntimeError): if buffer_size <= 0.

    """
    if buffer_size <= 0:
        raise RuntimeError("buffer_size must be greater than zero!")

    # Bounded buffer: once full, appending drops the oldest item.
    _queue: Deque = deque(maxlen=buffer_size)
    _has_completed = False
    # A dedicated flag (rather than the truthiness of the error value) so
    # that falsy errors such as None, 0 or "" are still replayed.
    _has_failed = False
    _error: Any = None
    _subject = rx_subject(subject_handler=subject_handler)

    async def _on_next(item: Any) -> None:
        # Record first, then forward to the currently subscribed observers.
        _queue.append(item)
        await _subject.on_next(item)

    async def _subscribe(an_observer: Observer) -> Subscription:
        subscription = await _subject.subscribe(an_observer)

        # Replay from a snapshot: every on_next below yields control, and an
        # item appended to the live deque meanwhile would otherwise abort the
        # loop with "RuntimeError: deque mutated during iteration".
        for value in tuple(_queue):
            await an_observer.on_next(value)

        # Replay the terminal event, if any; an error wins over a completion.
        if _has_failed:
            await an_observer.on_error(_error)
        elif _has_completed:
            await an_observer.on_completed()

        return subscription

    async def _on_complete():
        nonlocal _has_completed

        _has_completed = True
        await _subject.on_completed()

    async def _on_error(err: Any) -> Optional[NoReturn]:
        nonlocal _error, _has_failed

        _error = err
        _has_failed = True
        await _subject.on_error(err=err)
        return None

    return subject(subscribe=_subscribe, on_next=_on_next, on_error=_on_error, on_completed=_on_complete)
74
|
|
|
|