Conditions | 5 |
Total Lines | 57 |
Code Lines | 29 |
Lines | 0 |
Ratio | 0 % |
Changes | 0 |
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
1 | from typing import Any, Optional |
||
9 | def rx_concat(*observables: Observable) -> Observable: |
||
10 | """Concat operator. |
||
11 | |||
12 | Merge and Concat combine multiple sequences into one. |
||
13 | Merge might interweave elements from different sequence |
||
14 | whereas Concat emits all elements from the first sequence before turning to the next one. |
||
15 | |||
16 | Args: |
||
17 | observables (Observable): a list of observable instance |
||
18 | |||
19 | Returns: |
||
20 | (Observable): observable instance |
||
21 | |||
22 | Raise: |
||
23 | (RuntimeError): if len(observables) <= 0 |
||
24 | |||
25 | """ |
||
26 | |||
27 | if len(observables) <= 0: |
||
28 | raise RuntimeError("#observables must be greather than zero") |
||
29 | |||
30 | async def _subscribe(an_observer: Observer) -> Subscription: |
||
31 | _source = list(observables) |
||
32 | _current_subscription: Optional[Subscription] = None |
||
33 | _observer: Optional[Observer] = None |
||
34 | |||
35 | async def _on_completed() -> None: |
||
36 | nonlocal _source, _current_subscription, _observer |
||
|
|||
37 | await _unsubscribe() |
||
38 | if _source: |
||
39 | obs = _source.pop(0) |
||
40 | if _observer: |
||
41 | _current_subscription = await obs.subscribe(_observer) |
||
42 | else: |
||
43 | await an_observer.on_completed() |
||
44 | return None |
||
45 | |||
46 | async def _on_error(err: Any) -> None: |
||
47 | await _unsubscribe() |
||
48 | await an_observer.on_error(err=err) |
||
49 | return None |
||
50 | |||
51 | async def _unsubscribe() -> None: |
||
52 | nonlocal _current_subscription |
||
53 | |||
54 | if _current_subscription: |
||
55 | await _current_subscription() |
||
56 | _current_subscription = None |
||
57 | |||
58 | _observer = rx_observer_from(observer=an_observer, on_completed=_on_completed, on_error=_on_error) |
||
59 | |||
60 | # initiate |
||
61 | _current_subscription = await _source.pop(0).subscribe(_observer) |
||
62 | |||
63 | return _unsubscribe |
||
64 | |||
65 | return rx_create(subscribe=_subscribe, max_observer=1) |
||
66 |