1
|
|
|
"""Define rx_create.""" |
2
|
|
|
from typing import NoReturn, Optional, Union |
3
|
|
|
|
4
|
|
|
from ..protocol import Observable, Observer, Subscribe, Subscription, observable |
5
|
|
|
|
6
|
|
|
__all__ = ["rx_create"] |
7
|
|
|
|
8
|
|
|
|
9
|
|
|
def rx_create(
    subscribe: Subscribe,
    ensure_contract: Optional[bool] = True,
    max_observer: Optional[int] = None,
) -> Union[Observable, NoReturn]:
    """Create an observable with specific delayed execution 'subscribe'.

    Observables can be created with create, but usually we use the so-called
    creation operators, like of, from, interval, etc.
    Subscribing to an Observable is like calling a function, providing callbacks
    where the data will be delivered to.

    When ``max_observer`` is set, the given ``subscribe`` function is wrapped so
    that the number of live subscriptions is counted: a slot is taken when an
    observer subscribes and given back the first time the returned
    subscription is invoked.

    Args:
        subscribe (Subscribe): subscribe function to use on observable
        ensure_contract (bool): boolean flag (default True) to ensure that
            this observable will follow Observable contract.
        max_observer (int): maximum observer on this Observable
            (default None <=> unlimited; any falsy value such as 0 is also
            treated as unlimited)

    Returns:
        (Observable): an observable instance.

    Raises:
        (RuntimeError): if subscribe parameter is undefined. When
            ``max_observer`` is set, the wrapped subscribe function also
            raises RuntimeError if the observer limit is already reached.

    """
    if subscribe is None:
        raise RuntimeError('a subscribe function must be provided')

    if not max_observer:
        # No limit requested: use the caller's subscribe function untouched.
        return observable(subscribe=subscribe, ensure_contract=ensure_contract)

    current_observer = 0

    async def _subscribe_tracked(an_observer: Observer) -> Subscription:
        nonlocal current_observer

        if current_observer == max_observer:
            raise RuntimeError(f'{max_observer} #observers limit reached')

        # Take the slot *before* awaiting so that subscribers racing on the
        # event loop cannot all pass the check above and overshoot the limit.
        current_observer = current_observer + 1
        try:
            subscription = await subscribe(an_observer)
        except BaseException:
            # The subscription never came to life (error or cancellation):
            # give the slot back, otherwise it would be lost forever.
            current_observer = current_observer - 1
            raise

        released = False

        async def _unsubscribe():
            nonlocal current_observer, released

            # Release the slot only once, even if the subscription is
            # disposed several times; otherwise the counter would drift
            # below the real number of observers and defeat the limit.
            if not released:
                released = True
                current_observer = current_observer - 1
            return await subscription()

        return _unsubscribe

    return observable(subscribe=_subscribe_tracked, ensure_contract=ensure_contract)
55
|
|
|
|