Conditions | 8 |
Total Lines | 74 |
Code Lines | 47 |
Lines | 0 |
Ratio | 0 % |
Changes | 0 |
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
1 | """rx_zip module. |
||
18 | def rx_zip(*observables: Observable) -> Observable: |
||
19 | """Combine multiple Observables to create an Observable. |
||
20 | |||
21 | The Obsevable values are calculated from the values, in order, |
||
22 | of each of its input Observables. |
||
23 | |||
24 | Args: |
||
25 | (Observable): a list of observable instance |
||
26 | |||
27 | Returns: |
||
28 | (Observable): observable instance |
||
29 | |||
30 | """ |
||
31 | |||
32 | async def _subscribe(an_observer: Observer) -> Subscription: |
||
33 | |||
34 | subscriptions: List[Subscription] = [] |
||
35 | sources = list(observables) |
||
36 | n = len(sources) |
||
37 | queues: List[List] = [[] for _ in range(n)] |
||
38 | _disposable = True # error or completed not send |
||
39 | _is_done = [False] * n |
||
40 | |||
41 | async def _subscription_handler() -> None: |
||
42 | nonlocal subscriptions |
||
43 | for s in subscriptions: |
||
44 | await s() |
||
45 | |||
46 | def _on_completed(i: int) -> CompleteHandler: |
||
47 | async def __on_completed(): |
||
48 | nonlocal _disposable, _is_done |
||
49 | _is_done[i] = True |
||
50 | if _disposable and all(_is_done): |
||
51 | await an_observer.on_completed() |
||
52 | _disposable = False |
||
53 | |||
54 | return __on_completed |
||
55 | |||
56 | async def _on_error(err: Any) -> Optional[NoReturn]: |
||
57 | nonlocal _disposable |
||
58 | _disposable = False |
||
59 | return await an_observer.on_error(err=err) |
||
60 | |||
61 | async def _on_next_tuple(i: int) -> None: |
||
62 | nonlocal _disposable, queues, _is_done |
||
63 | |||
64 | if all(len(q) for q in queues): |
||
65 | try: |
||
66 | queued_values = [x.pop(0) for x in queues] |
||
67 | await an_observer.on_next(tuple(queued_values)) |
||
68 | except Exception as ex: # pragma: no cover |
||
69 | await _on_error(ex) |
||
70 | elif all(x for j, x in enumerate(_is_done) if j != i): |
||
71 | _disposable = False |
||
72 | await an_observer.on_completed() |
||
73 | |||
74 | def _on_next(i: int) -> NextHandler: |
||
75 | async def __on_next(item: Any) -> None: |
||
76 | nonlocal _disposable, queues |
||
77 | if _disposable: |
||
78 | queues[i].append(item) |
||
79 | await _on_next_tuple(i) |
||
80 | |||
81 | return __on_next |
||
82 | |||
83 | # observer factory |
||
84 | def _observer_factory(i: int) -> Observer: |
||
85 | return rx_observer(on_next=_on_next(i), on_completed=_on_completed(i), on_error=_on_error) |
||
86 | |||
87 | subscriptions = [await an_observable.subscribe(_observer_factory(i)) for i, an_observable in enumerate(sources)] |
||
88 | |||
89 | return _subscription_handler |
||
90 | |||
91 | return rx_create(subscribe=_subscribe, max_observer=1) |
||
92 |