| Conditions | 8 |
| Total Lines | 95 |
| Code Lines | 31 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments inside a method's body, that is usually a good sign that the commented part should be extracted into a new method; the comment itself is a good starting point for choosing that method's name.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
| 1 | from typing import Any, Optional, Union |
||
def rx_subject(subject_handler: Optional[SubjectHandler] = None) -> Subject:
    """Create a subject.

    A Subject is like an Observable, but can multicast to many Observers.
    Subjects are like EventEmitters: they maintain a registry of many listeners,
    and then dispatch events/items to them.

    As a subject is also an Observer, it can subscribe to an observable, which
    then acts as its stream data source.

    Args:
        subject_handler (Optional[SubjectHandler]): optional subject handler.
            When provided, its ``on_subscribe`` / ``on_unsubscribe`` coroutines
            are awaited with the current number of registered observers
            (``count``) and the observer concerned (``source``).

    Returns:
        (Subject): the subject

    Example 1:

    .. highlight:: python
    .. code-block:: python

        # create a subject
        a_subject = rx_subject(subject_handler=my_handler)

        # a few observers subscribe to this subject
        sub_1 = await a_subject.subscribe(obs_1)
        sub_2 = await a_subject.subscribe(obs_2)

        # the subject itself subscribes to an observable
        await rx_range(start=0, stop=10).subscribe(a_subject)

        # obs_1 and obs_2 receive 10 items

    Example 2:
        A subject as event emitter

    .. highlight:: python
    .. code-block:: python

        # create a subject
        a_subject = rx_subject()

        # a few observers subscribe to this subject
        sub_1 = await a_subject.subscribe(obs_1)
        sub_2 = await a_subject.subscribe(obs_2)

        # send your data yourself
        await a_subject.on_next("my value")  # obs_1 and obs_2 receive "my value"
        await a_subject.on_completed()  # obs_1 and obs_2 receive on_completed

    """
    # Registered observers, in subscription order. The list is only mutated in
    # place (never rebound), so the closures below need no ``nonlocal``.
    _registry = []

    async def _subscribe(an_observer: Observer) -> Subscription:
        """Register ``an_observer`` and return the coroutine that unregisters it."""
        _registry.append(an_observer)

        if subject_handler:
            await subject_handler.on_subscribe(count=len(_registry), source=an_observer)

        async def unsubscribe() -> None:
            # Membership check keeps a repeated unsubscribe from raising ValueError.
            if an_observer in _registry:
                _registry.remove(an_observer)

            # NOTE(review): the handler is notified on every call, mirroring
            # _subscribe; confirm against the original indentation.
            if subject_handler:
                await subject_handler.on_unsubscribe(count=len(_registry), source=an_observer)

        return unsubscribe

    # The dispatchers below iterate over a snapshot of the registry: each
    # ``await`` yields control, so an observer may subscribe or unsubscribe
    # while a dispatch is in flight. Iterating the live list would then skip
    # observers when an element is removed mid-iteration.

    async def _on_next(item: Any) -> None:
        """Forward ``item`` to every observer registered when dispatch started."""
        for observer in tuple(_registry):
            await observer.on_next(item)

    async def _on_error(err: Union[Any, Exception]) -> None:
        """Forward ``err`` to every observer registered when dispatch started."""
        for observer in tuple(_registry):
            try:
                await observer.on_error(err=err)
            except Exception:  # pragma: no cover
                # Deliberate best effort: one failing observer must not keep
                # the remaining observers from being told about the error.
                pass

        # (no re-raise: the error has been delivered to the observers)

    async def _on_completed() -> None:
        """Signal completion to every observer registered when dispatch started."""
        for observer in tuple(_registry):
            await observer.on_completed()

    _obs = ensure_observable_contract_operator(
        rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed)
    )

    return subject(
        subscribe=_subscribe,
        on_next=_obs.on_next,
        on_error=_obs.on_error,
        on_completed=_obs.on_completed,
    )
||
| 103 |