async_rx.observable.rx_last   A

Complexity

Total Complexity 3

Size/Duplication

Total Lines 47
Duplicated Lines 78.72 %

Importance

Changes 0
Metric  Value
wmc     3
eloc    22
dl      37
loc     47
rs      10
c       0
b       0
f       0

1 Function

Rating  Name       Duplication  Size  Complexity
A       rx_last()  37           37    3

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
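As a minimal sketch of that kind of restructuring: the two functions below are hypothetical (they are not part of async_rx) and would otherwise each repeat the same argument check. Pulling the check into one shared helper, here given the assumed name `_require_positive`, leaves a single place to maintain.

```python
from collections import deque
from itertools import islice
from typing import Deque, Iterable, List, TypeVar

T = TypeVar("T")


def _require_positive(count: int) -> None:
    """Shared validation, extracted so that callers do not repeat it."""
    if count <= 0:
        raise RuntimeError("count must be greater than zero")


def first_items(items: Iterable[T], count: int = 1) -> List[T]:
    """Return at most `count` items from the start of `items` (hypothetical helper)."""
    _require_positive(count)
    return list(islice(items, count))


def last_items(items: Iterable[T], count: int = 1) -> List[T]:
    """Return at most `count` items from the end of `items` (hypothetical helper)."""
    _require_positive(count)
    # A bounded deque keeps only the newest `count` items it is fed.
    buffer: Deque[T] = deque(items, maxlen=count)
    return list(buffer)
```

Whether an extraction like this pays off depends on how closely the duplicated fragments really match; this report does not show which code rx_last is duplicated with.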

Common duplication problems, and corresponding solutions are:

from collections import deque
from typing import Any, Deque

from ..protocol import Observable, Observer, Subscription, rx_observer_from
from .rx_create import rx_create

__all__ = ["rx_last"]


# Analyzer note: this code seems to be duplicated in your project.
def rx_last(observable: Observable, count: int = 1) -> Observable:
    """Create an observable which only takes the last #count (or fewer) events and completes.

    Args:
        observable (Observable): observable source
        count (int): number of events to get (default 1)

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if count <= 0

    """
    if count <= 0:
        raise RuntimeError('count must be greater than zero')

    async def _subscribe(an_observer: Observer) -> Subscription:
        # local buffer holding at most #count items
        _q: Deque = deque(maxlen=count)

        async def _on_next(item: Any):
            # buffer the item; a full deque drops its oldest entry
            _q.append(item)

        async def _on_completed():
            # flush the buffered items downstream, then complete
            for item in _q:
                await an_observer.on_next(item)
            _q.clear()
            await an_observer.on_completed()

        return await observable.subscribe(
            an_observer=rx_observer_from(
                observer=an_observer, on_next=_on_next, on_completed=_on_completed
            )
        )

    return rx_create(subscribe=_subscribe)
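
The buffering idea behind rx_last (keep at most `count` items in a bounded deque, then emit them once the source completes) can be sketched without the async_rx types. The version below is only an illustration built on plain async generators; `take_last`, `numbers`, `collect`, and `demo` are assumed names and not part of the library.

```python
import asyncio
from collections import deque
from typing import AsyncIterable, AsyncIterator, Deque, List, TypeVar

T = TypeVar("T")


def take_last(source: AsyncIterable[T], count: int = 1) -> AsyncIterator[T]:
    """Yield only the last `count` (or fewer) items of `source`."""
    # Validate eagerly, before any iteration starts.
    if count <= 0:
        raise RuntimeError("count must be greater than zero")

    async def _generate() -> AsyncIterator[T]:
        # A bounded deque discards its oldest entry once it is full.
        buffer: Deque[T] = deque(maxlen=count)
        async for item in source:
            buffer.append(item)
        # The source is exhausted: flush what was kept, in arrival order.
        for item in buffer:
            yield item

    return _generate()


async def numbers(n: int) -> AsyncIterator[int]:
    """Hypothetical source emitting 0 .. n-1."""
    for i in range(n):
        await asyncio.sleep(0)  # give control back to the event loop
        yield i


async def collect(stream: AsyncIterable[T]) -> List[T]:
    """Gather every item of an async stream into a list."""
    return [item async for item in stream]


def demo(n: int, count: int) -> List[int]:
    """Run the pipeline to completion and return the emitted items."""
    return asyncio.run(collect(take_last(numbers(n), count)))


if __name__ == "__main__":
    print(demo(5, 2))  # [3, 4]
```

Nothing is emitted until the source finishes, so a source that never completes yields no items, and memory use stays bounded by `count`.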