Total Complexity | 3 |
Total Lines | 47 |
Duplicated Lines | 78.72 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
1 | from collections import deque |
||
2 | from typing import Any, Deque |
||
3 | |||
4 | from ..protocol import Observable, Observer, Subscription, rx_observer_from |
||
5 | from .rx_create import rx_create |
||
6 | |||
7 | __all__ = ["rx_last"] |
||
8 | |||
9 | |||
def rx_last(observable: Observable, count: int = 1) -> Observable:
    """Create an observable which emits only the last #count (or fewer) events, then completes.

    Items from the source are buffered as they arrive; nothing is forwarded
    by the on_next handler itself. When the source completes, the buffered
    items are emitted in arrival order, followed by the completion signal.

    Args:
        observable (Observable): observable source
        count (int): number of events to keep (default 1)

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if count <= 0

    """
    if count <= 0:
        # NOTE: the message text (including its spelling) is kept unchanged,
        # since callers may match on it.
        raise RuntimeError('count must be greather than zero')

    async def _subscribe(an_observer: Observer) -> Subscription:
        # Bounded buffer: once it holds #count items, appending a new one
        # discards the oldest, so it always contains the most recent items.
        _q: Deque = deque(maxlen=count)

        # The handlers below only mutate the deque in place (they never
        # rebind the name), so no ``nonlocal`` declaration is required.
        async def _on_next(item: Any) -> None:
            _q.append(item)

        async def _on_completed() -> None:
            # Flush the buffered items first, then signal completion.
            for item in _q:
                await an_observer.on_next(item)
            _q.clear()
            await an_observer.on_completed()

        return await observable.subscribe(
            an_observer=rx_observer_from(
                observer=an_observer,
                on_next=_on_next,
                on_completed=_on_completed,
            )
        )

    return rx_create(subscribe=_subscribe)
||
47 |