Conditions | 8 |
Total Lines | 77 |
Code Lines | 46 |
Lines | 77 |
Ratio | 100 % |
Changes | 0 |
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments inside a method's body, that is usually a good sign that the commented part should be extracted into a new method; the comment itself is a good starting point for choosing that method's name.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
1 | from datetime import timedelta |
||
def rx_sample(observable: Observable, duration: timedelta) -> Observable:
    """Sample operator used to rate-limit the sequence.

    Sample filters out elements based on timing: on every interval it emits
    the LATEST value received from the source, or nothing if no new value
    arrived during the last interval.

    Args:
        observable (Observable): an observable instance
        duration (timedelta): timedelta of interval (the duration)

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if no observable or duration are provided

    """
    # NOTE: a zero-length timedelta is falsy and is therefore rejected too.
    if not observable or not duration:
        raise RuntimeError("observable and duration are mandatory")

    async def _subscribe(an_observer: Observer) -> Subscription:

        _receive_value = False
        _latest_value = None
        _consumer_task = None
        _subscription: Optional[Subscription] = None
        _duration = duration.total_seconds()

        async def consumer():
            """Wake up every interval and forward the pending value, if any."""
            nonlocal _receive_value
            try:
                while True:
                    await curio.sleep(_duration)  # wait one interval before sampling
                    if _receive_value:
                        # Snapshot the value and clear the flag BEFORE awaiting
                        # the observer: an item arriving while on_next() is
                        # suspended must stay pending for the next interval
                        # instead of being wiped out by a late flag reset.
                        value = _latest_value
                        _receive_value = False
                        await an_observer.on_next(value)

            except curio.TaskCancelled:
                # it's time to finish
                pass

        async def _on_next(item: Any):
            """Remember the most recent item; the consumer emits it later."""
            nonlocal _latest_value, _receive_value
            _latest_value = item
            _receive_value = True

        async def _cancel_consumer():
            """Cancel the sampling task once, if it is still registered."""
            nonlocal _consumer_task
            # Clear the handle before awaiting so an overlapping call cannot
            # cancel the same task a second time.
            task, _consumer_task = _consumer_task, None
            if task:
                await task.cancel()

        async def _on_completed():
            await _cancel_consumer()
            await an_observer.on_completed()

        async def _on_error(err: Any):
            await _cancel_consumer()
            await an_observer.on_error(err=err)

        async def _unsubscribe():
            """Stop sampling and release the upstream subscription."""
            nonlocal _subscription
            await _cancel_consumer()
            if _subscription:
                await _subscription()
                _subscription = None

        _consumer_task = await curio.spawn(consumer())

        try:
            _subscription = await observable.subscribe(
                rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed)
            )
        except Exception:
            # Do not leave the sampling task running when the source refuses
            # the subscription.
            await _cancel_consumer()
            raise

        return _unsubscribe

    return rx_create(subscribe=_subscribe, max_observer=1)
||
89 |