| Conditions | 5 |
| Total Lines | 57 |
| Code Lines | 29 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments inside a method's body, that is usually a good sign that the commented part should be extracted into a new method, with the comment serving as a starting point for that method's name.
Commonly applied refactorings include: Extract Method.
If many parameters/temporary variables are present: Replace Temp with Query, Introduce Parameter Object, Preserve Whole Object.
| 1 | from typing import Any, Optional |
||
def rx_concat(*observables: Observable) -> Observable:
    """Concat operator.

    Merge and Concat combine multiple sequences into one.
    Merge might interweave elements from different sequences,
    whereas Concat emits all elements from the first sequence before turning
    to the next one.

    Args:
        observables (Observable): the observable instances to chain, in the
            order in which they must be subscribed to

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if no observable is given

    """
    if not observables:
        raise RuntimeError("#observables must be greather than zero")

    async def _subscribe(an_observer: Observer) -> Subscription:
        # A fresh iterator per subscriber: every subscription walks the
        # sources from the first one, independently of earlier subscribers.
        _exhausted = object()
        _sources = iter(observables)
        _current_subscription: Optional[Subscription] = None
        # Bumped on every subscribe() call so that only the most recent call
        # is allowed to publish its handle (see _subscribe_to).
        _generation = 0

        async def _unsubscribe() -> None:
            """Release the subscription on the currently active source, if any."""
            nonlocal _current_subscription

            # Clear the handle *before* awaiting it so a re-entrant call
            # cannot release the same subscription twice.
            subscription, _current_subscription = _current_subscription, None
            if subscription is not None:
                await subscription()

        async def _subscribe_to(source: Observable) -> None:
            """Subscribe the relay observer to `source` and track its handle."""
            nonlocal _current_subscription, _generation

            _generation += 1
            ticket = _generation
            subscription = await source.subscribe(_observer)
            if ticket == _generation:
                _current_subscription = subscription
            else:
                # `source` completed while subscribe() was still in flight and
                # a later source has been subscribed to in the meantime.
                # Storing this handle would overwrite the live one, so the
                # stale handle is released right away instead.
                await subscription()

        async def _on_completed() -> None:
            """Move on to the next source, or complete the downstream observer."""
            await _unsubscribe()
            following = next(_sources, _exhausted)
            if following is _exhausted:
                await an_observer.on_completed()
            else:
                await _subscribe_to(following)

        async def _on_error(err: Any) -> None:
            """Stop the chain and forward the error downstream."""
            await _unsubscribe()
            await an_observer.on_error(err=err)

        # Relay observer built from `an_observer`, with completion and error
        # handling replaced by the chaining handlers above.
        _observer = rx_observer_from(observer=an_observer, on_completed=_on_completed, on_error=_on_error)

        # initiate: `observables` is guaranteed non-empty by the guard above
        await _subscribe_to(next(_sources))

        return _unsubscribe

    return rx_create(subscribe=_subscribe, max_observer=1)
||
| 66 |