Conditions | 8 |
Total Lines | 95 |
Code Lines | 31 |
Lines | 0 |
Ratio | 0 % |
Changes | 0 |
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
1 | from typing import Any, Optional, Union |
||
def rx_subject(subject_handler: Optional[SubjectHandler] = None) -> Subject:
    """Create a subject.

    A Subject is like an Observable, but can multicast to many Observers.
    Subjects are like EventEmitters: they maintain a registry of many listeners,
    and then dispatch events/items to them.

    As a subject is also an Observer, it can subscribe to an observable which
    then acts as its stream data source.

    Items, errors and completion are dispatched to a snapshot of the registry
    taken when the event arrives, so an observer that subscribes or
    unsubscribes while an event is being dispatched does not disturb the
    delivery of that event to the other observers.

    Args:
        subject_handler (Optional[SubjectHandler]): optional subject handler
            callback; when given, its ``on_subscribe`` / ``on_unsubscribe``
            coroutines are awaited with the current observer count and the
            observer concerned.

    Returns:
        (Subject): the subject

    Example 1:

    .. highlight:: python
    .. code-block:: python

        # create a subject
        a_subject = rx_subject(subject_handler=my_handler)

        # a few observers subscribe to this subject
        sub_1 = await a_subject.subscribe(obs_1)
        sub_2 = await a_subject.subscribe(obs_2)

        # the subject subscribes itself to an observable
        await rx_range(start=0, stop=10).subscribe(a_subject)

        # obs_1 and obs_2 receive 10 items

    Example 2:
        A subject as event emitter

    .. highlight:: python
    .. code-block:: python

        # create a subject
        a_subject = rx_subject()

        # a few observers subscribe to this subject
        sub_1 = await a_subject.subscribe(obs_1)
        sub_2 = await a_subject.subscribe(obs_2)

        # send your data by yourself
        await a_subject.on_next("my value")  # obs_1 and obs_2 receive "my value"
        await a_subject.on_completed()  # obs_1 and obs_2 receive on_completed

    """
    # Registered observers, in subscription order. The closures below only
    # mutate this list (never rebind it), so no ``nonlocal`` is required.
    _registry = []

    async def _subscribe(an_observer: Observer) -> Subscription:
        """Register ``an_observer`` and return its unsubscribe coroutine function."""
        _registry.append(an_observer)

        if subject_handler:
            await subject_handler.on_subscribe(count=len(_registry), source=an_observer)

        async def unsubscribe() -> None:
            """Remove the observer from the registry and notify the handler."""
            if an_observer in _registry:
                _registry.remove(an_observer)

            if subject_handler:
                await subject_handler.on_unsubscribe(count=len(_registry), source=an_observer)

        return unsubscribe

    async def _on_next(item: Any) -> None:
        """Forward ``item`` to every observer registered when the call started."""
        # Iterate over a snapshot: an observer may (un)subscribe while we are
        # suspended in ``await``, and mutating a list during iteration would
        # skip or repeat entries.
        for o in tuple(_registry):
            await o.on_next(item)

    async def _on_error(err: Union[Any, Exception]) -> None:
        """Forward ``err`` to every observer, ignoring failures of individual observers."""
        for o in tuple(_registry):
            try:
                await o.on_error(err=err)
            except Exception:  # pragma: no cover
                # Deliberate best effort: one failing observer must not
                # prevent the remaining observers from being notified.
                pass

    async def _on_completed() -> None:
        """Notify every observer registered when the call started of completion."""
        for o in tuple(_registry):
            await o.on_completed()

    _obs = ensure_observable_contract_operator(rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed))

    return subject(subscribe=_subscribe, on_next=_obs.on_next, on_error=_obs.on_error, on_completed=_obs.on_completed)
||
103 |