Conditions | 12
Total Lines | 85
Code Lines | 50
Lines | 0
Ratio | 0 %
Changes | 0
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
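As a minimal sketch of that advice (the order-summary functions below are hypothetical and not part of async_rx), a commented block inside a method can be moved into its own function whose name is derived from the comment:

```python
def summarize_orders_before(orders):
    """Original shape: the total is computed inline under a comment."""
    # compute the order total
    total = 0.0
    for _name, price, quantity in orders:
        total += price * quantity
    return f"{len(orders)} orders, total {total:.2f}"


def order_total(orders):
    """Extracted method: the old comment became the function name."""
    return sum(price * quantity for _name, price, quantity in orders)


def summarize_orders(orders):
    """Refactored shape: the body now reads as a short sequence of named steps."""
    return f"{len(orders)} orders, total {order_total(orders):.2f}"
```

Both versions return the same string; the refactored one no longer needs the comment.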
Commonly applied refactorings include: Extract Method.
If many parameters/temporary variables are present: Replace Temp with Query, Introduce Parameter Object, Preserve Whole Object.
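When several parameters always travel together, one option is to group them into a small parameter object. The sketch below assumes a hypothetical `DelayOptions` dataclass (it does not exist in async_rx) that bundles the delay settings and replaces a temporary variable with a property:

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass(frozen=True)
class DelayOptions:
    """Hypothetical parameter object grouping the delay settings."""

    duration: timedelta
    buffer_size: Optional[int] = None
    ignore_events_if_full: bool = True

    def __post_init__(self):
        # Validation lives next to the data it checks.
        if not self.duration:
            raise ValueError("duration is mandatory and must be non-zero")
        if self.buffer_size is not None and self.buffer_size <= 0:
            raise ValueError("buffer_size must be greater than zero or None")

    @property
    def delay_seconds(self) -> float:
        # A query on the object instead of a temporary variable.
        return self.duration.total_seconds()
```

A caller would then pass a single `DelayOptions` value instead of three separate arguments.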
Complex classes like async_rx.observable.rx_delay.rx_delay() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
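The following is an illustrative sketch of Extract Class, using made-up names (`Report`, `SmtpEndpoint`) rather than anything from async_rx. The fields sharing the `smtp_` prefix form the cohesive component that moves into its own class:

```python
class ReportBefore:
    """Before: report data and SMTP details are mixed in one class."""

    def __init__(self, title, smtp_host, smtp_port):
        self.title = title
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port

    def smtp_address(self):
        return f"{self.smtp_host}:{self.smtp_port}"


class SmtpEndpoint:
    """After: the smtp_* fields and their method live in a dedicated class."""

    def __init__(self, host, port):
        self.host = host
        self.port = port

    def address(self):
        return f"{self.host}:{self.port}"


class Report:
    """After: the report only holds a reference to the extracted component."""

    def __init__(self, title, smtp):
        self.title = title
        self.smtp = smtp
```

The shared prefix disappears from the field names because the new class name now carries that meaning.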
from datetime import timedelta
from typing import Any, Optional

import curio

# Observable, Observer, Subscription, rx_observer and rx_create are provided by
# the async_rx package; their import lines are not part of this listing.


def rx_delay(observable: Observable, duration: timedelta, buffer_size: Optional[int] = None, ignore_events_if_full: Optional[bool] = True) -> Observable:
    """Delay operator.

    Delay will project the sequence unmodified, but shifted into the future by the specified
    delay.

    The underlying implementation uses a queue and a dedicated consumer.

    Args:
        observable (Observable): an observable instance
        duration (timedelta): the delay to apply
        buffer_size (Optional[int]): optional buffer size; if not specified, the size is unlimited
            (ignore_events_if_full then has no effect, but memory use is unbounded)
        ignore_events_if_full (Optional[bool]): when true, if the internal buffer (here a queue) is full,
            events are ignored until older ones have been consumed.
            Otherwise, the producer blocks until older ones have been consumed.

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if observable or duration is missing, or if buffer_size <= 0

    """
    if not observable or not duration:
        raise RuntimeError("observable and duration are mandatory")
    # Compare against None explicitly so that buffer_size=0 is rejected as documented
    if buffer_size is not None and buffer_size <= 0:
        raise RuntimeError("buffer_size must be greater than zero or None")

    async def _subscribe(an_observer: Observer) -> Subscription:
        _queue = curio.Queue(buffer_size) if buffer_size else curio.Queue()
        _consumer_task = None
        _subscription: Optional[Subscription] = None
        _duration = duration.total_seconds()

        async def consumer():
            nonlocal _queue, _duration
            try:
                while True:
                    item = await _queue.get()  # retrieve an item (blocks until one is available)
                    await curio.sleep(_duration)  # apply the delay before forwarding
                    await an_observer.on_next(item)
                    await _queue.task_done()  # notify that the item has been processed
            except curio.TaskCancelled:
                # it's time to finish
                pass

        async def _cancel_consumer():
            nonlocal _consumer_task
            if _consumer_task:
                await _consumer_task.cancel()
                _consumer_task = None

        async def _on_next(item: Any):
            nonlocal _queue
            # drop the event when the buffer is full and dropping is enabled
            if ignore_events_if_full and _queue.full():
                return
            await _queue.put(item)

        async def _on_completed():
            nonlocal _queue, _consumer_task
            await _queue.join()  # wait until every queued item has been processed
            await _cancel_consumer()
            await an_observer.on_completed()

        async def _on_error(err: Any):
            nonlocal _consumer_task
            await curio.sleep(_duration)  # errors are delayed as well
            await _cancel_consumer()
            await an_observer.on_error(err=err)

        # Named _unsubscribe so that it no longer shadows the enclosing _subscribe
        async def _unsubscribe():
            nonlocal _consumer_task, _subscription
            await _cancel_consumer()
            if _subscription:
                await _subscription()
                _subscription = None

        _consumer_task = await curio.spawn(consumer())

        _subscription = await observable.subscribe(rx_observer(on_next=_on_next, on_error=_on_error, on_completed=_on_completed))

        return _unsubscribe

    return rx_create(subscribe=_subscribe, max_observer=1)
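To make the "queue and a dedicated consumer" idea from the docstring easier to follow in isolation, here is a simplified sketch. It uses the standard library's asyncio instead of curio (so `task_done()` is a plain call rather than awaited), and `delayed_relay` and `received` are hypothetical names, not part of async_rx:

```python
import asyncio


async def delayed_relay(items, delay_seconds, sink):
    """Forward each item to `sink` after a delay, using a queue and one consumer task."""
    queue: asyncio.Queue = asyncio.Queue()

    async def consumer():
        while True:
            item = await queue.get()            # wait until an item is available
            await asyncio.sleep(delay_seconds)  # apply the delay before forwarding
            sink.append(item)
            queue.task_done()                   # mark the item as processed

    task = asyncio.create_task(consumer())
    for item in items:
        await queue.put(item)
    await queue.join()   # wait until every queued item has been forwarded
    task.cancel()        # then stop the consumer, as on completion
    try:
        await task
    except asyncio.CancelledError:
        pass


received = []
asyncio.run(delayed_relay([1, 2, 3], 0.01, received))
```

As in the listing, the consumer sleeps once per item after dequeuing it, so items arrive in order and the producer is only released once the queue has drained.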