1
|
|
|
from inspect import iscoroutinefunction |
2
|
|
|
from typing import Any, Callable, Optional |
3
|
|
|
|
4
|
|
|
from ..protocol import Observable, Observer, Subscription, rx_observer_from |
5
|
|
|
from .rx_create import rx_create |
6
|
|
|
|
7
|
|
|
__all__ = ["rx_map"] |
8
|
|
|
|
9
|
|
|
|
10
|
|
|
def rx_map( |
11
|
|
|
observable: Observable, transform: Callable, expand_arg_parameters: Optional[bool] = False, expand_kwarg_parameters: Optional[bool] = False |
12
|
|
|
) -> Observable: |
13
|
|
|
"""Map operator. |
14
|
|
|
|
15
|
|
|
Map operator modifies an Observable<A> into Observable<B> given a function with the type A->B. |
16
|
|
|
|
17
|
|
|
For example, if we take the function x => 10 ∗ x and a list of 1,2,3. The result is 10,20,30, see figure 4. |
18
|
|
|
Note that this function did not change the type of the Observable but did change the values. |
19
|
|
|
|
20
|
|
|
Args: |
21
|
|
|
observable (Observable): an observable instance |
22
|
|
|
transform (Callable): transform function (sync or async) |
23
|
|
|
expand_arg_parameters (Optional[bool]): if true each item will be expanded as args before call transform |
24
|
|
|
(implique expand_kwarg_parameters = False). |
25
|
|
|
expand_kwarg_parameters (Optional[bool]): if true each item will be expanded as kwargs before call transform. |
26
|
|
|
|
27
|
|
|
Returns: |
28
|
|
|
(Observable): observable instance |
29
|
|
|
|
30
|
|
|
""" |
31
|
|
|
|
32
|
|
|
_is_awaitable = iscoroutinefunction(transform) |
33
|
|
|
|
34
|
|
|
async def _subscribe(an_observer: Observer) -> Subscription: |
35
|
|
|
async def _on_next(item: Any): |
36
|
|
|
nonlocal _is_awaitable |
37
|
|
|
|
38
|
|
|
if expand_kwarg_parameters: |
39
|
|
|
_next_item = await transform(**item) if _is_awaitable else transform(**item) |
40
|
|
|
elif expand_arg_parameters: |
41
|
|
|
_next_item = await transform(*item) if _is_awaitable else transform(*item) |
42
|
|
|
else: |
43
|
|
|
_next_item = await transform(item) if _is_awaitable else transform(item) |
44
|
|
|
|
45
|
|
|
await an_observer.on_next(_next_item) |
46
|
|
|
|
47
|
|
|
return await observable.subscribe(rx_observer_from(observer=an_observer, on_next=_on_next)) |
48
|
|
|
|
49
|
|
|
return rx_create(subscribe=_subscribe) |
50
|
|
|
|