async_rx.observable.rx_last.rx_last()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 37
Code Lines 16

Duplication

Lines 37
Ratio 100 %

Importance

Changes 0
Metric Value
eloc 16
dl 37
loc 37
rs 9.6
c 0
b 0
f 0
cc 3
nop 2
1
from collections import deque
2
from typing import Any, Deque
3
4
from ..protocol import Observable, Observer, Subscription, rx_observer_from
5
from .rx_create import rx_create
6
7
__all__ = ["rx_last"]
8
9
10 View Code Duplication
def rx_last(observable: Observable, count: int = 1) -> Observable:
    """Create an observable that emits only the last #count (or fewer) events, then completes.

    Items coming from the source are held in a bounded buffer and nothing is
    forwarded until the source completes; at that point the buffered items
    are emitted in arrival order, followed by the completion signal.

    Args:
        observable (Observable): observable source
        count (int): number of trailing events to keep (default 1)

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if count <= 0

    """
    if count <= 0:
        raise RuntimeError('count must be greather than zero')

    async def _subscribe(an_observer: Observer) -> Subscription:
        # Bounded window: once it is full, each append discards the oldest entry.
        tail: Deque[Any] = deque(maxlen=count)

        async def _buffer_item(item: Any):
            # Hold the item back; it is only forwarded once the source completes.
            tail.append(item)

        async def _flush_and_complete():
            # Replay the retained items oldest-first, then signal completion.
            for buffered in tail:
                await an_observer.on_next(buffered)
            tail.clear()
            await an_observer.on_completed()

        buffering_observer = rx_observer_from(
            observer=an_observer,
            on_next=_buffer_item,
            on_completed=_flush_and_complete,
        )
        return await observable.subscribe(an_observer=buffering_observer)

    return rx_create(subscribe=_subscribe)
47