| Conditions | 6 |
| Total Lines | 64 |
| Code Lines | 32 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, especially when combined with a good name. Besides, when a method is small, finding a good name for it is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
| 1 | from collections import deque |
||
def rx_subject_replay(buffer_size: int, subject_handler: Optional[SubjectHandler] = None) -> Subject:
    """Create a replay subject.

    A ReplaySubject is similar to a BehaviorSubject in that it can send
    old values to new subscribers, but it can also record a part
    of the Observable execution.

    A ReplaySubject records multiple values from the Observable
    execution and replays them to new subscribers.
    When a replay occurs, completed and error events are also replayed.

    Args:
        buffer_size (int): buffer size, i.e. the maximum number of items
            replayed on subscription
        subject_handler (Optional[SubjectHandler]): optional subject handler callback

    Returns:
        (Subject): the subject

    Raises:
        (RuntimeError): if buffer_size <= 0

    """
    if buffer_size <= 0:
        raise RuntimeError("buffer_size must be greater than zero!")

    # Bounded buffer: once it holds buffer_size items, appending a new one
    # discards the oldest.
    _queue: Deque = deque(maxlen=buffer_size)
    _has_completed = False
    # Tracked separately from _error so that a falsy error payload
    # (None, 0, "") is still replayed to late subscribers.
    _has_failed = False
    _error = None
    _subject = rx_subject(subject_handler=subject_handler)

    async def _on_next(item: Any) -> None:
        """Record the item in the replay buffer, then forward it to the inner subject."""
        _queue.append(item)
        await _subject.on_next(item)

    async def _subscribe(an_observer: Observer) -> Subscription:
        """Subscribe the observer to the inner subject, then replay recorded events to it."""
        subscription = await _subject.subscribe(an_observer)

        # Iterate over a snapshot: each await below may let _on_next run and
        # append to the deque, and mutating a deque while iterating it raises
        # RuntimeError.
        for value in tuple(_queue):
            await an_observer.on_next(value)

        if _has_failed:
            await an_observer.on_error(_error)
        elif _has_completed:
            await an_observer.on_completed()

        return subscription

    async def _on_complete() -> None:
        """Remember the completion for later subscribers and forward it to the inner subject."""
        nonlocal _has_completed

        _has_completed = True
        await _subject.on_completed()

    async def _on_error(err: Any) -> Optional[NoReturn]:
        """Remember the error for later subscribers and forward it to the inner subject."""
        nonlocal _error, _has_failed

        _error = err
        _has_failed = True
        await _subject.on_error(err=err)
        return None

    return subject(subscribe=_subscribe, on_next=_on_next, on_error=_on_error, on_completed=_on_complete)
||
| 74 |