| Conditions | 9 |
| Total Lines | 79 |
| Code Lines | 39 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
| 1 | from typing import Any, List, NoReturn, Optional |
||
def rx_merge(*observables: Observable) -> Observable:
    """Flatten several Observables into one by blending their values.

    The returned Observable forwards, untransformed, every item emitted by
    any of the input Observables. It completes only once *all* inputs have
    completed. The first error reported by any input is forwarded
    immediately; after that (or after completion) nothing more is delivered
    to the observer.

    Args:
        observables (Observable): the input observables to merge (at least one)

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if no observable is given

    """
    if not observables:
        raise RuntimeError("#observables must be at least 1")

    async def _subscribe(an_observer: Observer) -> Subscription:
        # All bookkeeping is local to this subscription, so a later
        # subscription never inherits a stale "terminated" state from a
        # previous one.
        terminated_observable = 0  # number of inputs that have completed
        deliver_next = True  # False once completion/error was forwarded
        subscriptions: List[Subscription] = []

        async def _on_next(item: Any) -> None:
            # Forward items only while the merged stream is still live.
            if deliver_next:
                await an_observer.on_next(item)

        async def _on_completed() -> None:
            nonlocal terminated_observable, deliver_next

            if not deliver_next:  # already terminated (error or completion)
                return
            terminated_observable += 1
            if terminated_observable == len(observables):
                # Every input is done: lock further on_next / on_error /
                # on_completed calls, then complete the downstream observer.
                deliver_next = False
                await an_observer.on_completed()

        async def _on_error(err: Any) -> Optional[NoReturn]:
            nonlocal deliver_next

            if deliver_next:
                # First error wins: lock further on_next / on_completed /
                # on_error calls, then forward the error.
                deliver_next = False
                await an_observer.on_error(err)
            return None

        async def _subscription_handler() -> None:
            # Dispose of every upstream subscription, in subscription order.
            for a_subscription in subscriptions:
                await a_subscription()

        # One shared local observer receives events from every input.
        _observer = rx_observer(on_next=_on_next, on_completed=_on_completed, on_error=_on_error)

        # Subscribe the local observer to all observables in parallel;
        # wait=all makes the group wait for every subscribe task before
        # the block is left, so each task's result is available below.
        _tasks = []
        async with curio.TaskGroup(wait=all) as g:
            for an_observable in observables:
                _tasks.append(await g.spawn(an_observable.subscribe, _observer))
        subscriptions = [t.result for t in _tasks]

        return _subscription_handler

    return rx_create(subscribe=_subscribe, max_observer=1)
||
| 90 |