| Conditions | 4 |
| Total Lines | 59 |
| Code Lines | 33 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
| 1 | from collections import deque |
||
def rx_buffer(observable: Observable, buffer_size: int) -> Observable:
    """Buffer operator.

    Buffer and Window collect elements from the source sequence and emit them in groups.
    Buffer projects these elements onto a list and emits them, and starts to process
    the source on first subscription.

    A group is emitted as soon as ``buffer_size`` items have been collected.
    When the source completes, any items still pending (fewer than
    ``buffer_size``) are emitted as a final, shorter list before completion is
    forwarded. When the source fails, the error is forwarded and the pending
    items are not emitted.

    Args:
        observable (Observable): the source
        buffer_size (int): buffer size

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if buffer_size <= 0

    """
    if buffer_size <= 0:
        raise RuntimeError('buffer_size must be greater than zero')

    async def _subscribe(an_observer: Observer) -> Subscription:
        # Per-subscription state: pending items and the upstream unsubscribe handle.
        _queue: Deque = deque(maxlen=buffer_size)
        _unsub: Optional[Subscription] = None

        async def _emit_pending() -> None:
            # Hand the observer a copy so clearing the queue cannot alter
            # the list it received.
            await an_observer.on_next(list(_queue))
            _queue.clear()

        async def _on_next(item: Any) -> None:
            _queue.append(item)
            if len(_queue) >= buffer_size:
                await _emit_pending()

        async def _on_completed() -> None:
            # Emit the trailing partial group; without this the last
            # (count % buffer_size) items would be silently lost.
            if _queue:
                await _emit_pending()
            await an_observer.on_completed()

        async def _on_error(err: Any) -> None:
            # The partially filled group is not emitted on failure; only the
            # error is forwarded.
            await an_observer.on_error(err=err)

        async def _unsubscribe():
            _queue.clear()
            if _unsub:
                await _unsub()

        _unsub = await observable.subscribe(
            rx_observer(on_next=_on_next, on_completed=_on_completed, on_error=_on_error)
        )

        return _unsubscribe

    return rx_create(subscribe=_subscribe, max_observer=1)
||
| 69 |