Conditions | 6 |
Total Lines | 64 |
Code Lines | 32 |
Lines | 0 |
Ratio | 0 % |
Changes | 0 |
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
1 | from collections import deque |
||
10 | def rx_subject_replay(buffer_size: int, subject_handler: Optional[SubjectHandler] = None) -> Subject: |
||
11 | """Create a replay subject. |
||
12 | |||
13 | A ReplaySubject is similar to a BehaviorSubject in that it can send |
||
14 | old values to new subscribers, but it can also record a part |
||
15 | of the Observable execution. |
||
16 | |||
17 | A ReplaySubject records multiple values from the Observable |
||
18 | execution and replays them to new subscribers. |
||
19 | When a replay occurs, completed and error events are also replayed. |
||
20 | |||
21 | Args: |
||
22 | buffer_size (int): buffer size, or #items which be replayed on subscription |
||
23 | subject_handler (Optional[SubjectHandler]): optional suject handler callback |
||
24 | |||
25 | Returns: |
||
26 | (Subject): the subject |
||
27 | |||
28 | Raise: |
||
29 | (RuntimeError): if buffer_size <= 0 |
||
30 | |||
31 | """ |
||
32 | if buffer_size <= 0: |
||
33 | raise RuntimeError("buffer_size must be greater than zero!") |
||
34 | |||
35 | _queue: Deque = deque(maxlen=buffer_size) |
||
36 | _has_completed = False |
||
37 | _error = None |
||
38 | _subject = rx_subject(subject_handler=subject_handler) |
||
39 | |||
40 | async def _on_next(item: Any) -> None: |
||
41 | nonlocal _queue, _subject |
||
42 | |||
43 | _queue.append(item) |
||
44 | await _subject.on_next(item) |
||
45 | |||
46 | async def _subscribe(an_observer: Observer) -> Subscription: |
||
47 | nonlocal _queue, _subject, _has_completed, _error |
||
48 | |||
49 | subscription = await _subject.subscribe(an_observer) |
||
50 | |||
51 | if _queue: |
||
52 | for value in _queue: |
||
53 | await an_observer.on_next(value) |
||
54 | |||
55 | if _error: |
||
56 | await an_observer.on_error(_error) |
||
57 | elif _has_completed: |
||
58 | await an_observer.on_completed() |
||
59 | |||
60 | return subscription |
||
61 | |||
62 | async def _on_complete(): |
||
63 | nonlocal _has_completed |
||
|
|||
64 | _has_completed = True |
||
65 | await _subject.on_completed() |
||
66 | |||
67 | async def _on_error(err: Any) -> Optional[NoReturn]: |
||
68 | nonlocal _error |
||
69 | _error = err |
||
70 | await _subject.on_error(err=err) |
||
71 | return None |
||
72 | |||
73 | return subject(subscribe=_subscribe, on_next=_on_next, on_error=_on_error, on_completed=_on_complete) |
||
74 |