async_rx.observable.rx_distinct   A

Complexity

Total Complexity 3

Size/Duplication

Total Lines 48
Duplicated Lines 79.17 %

Importance

Changes 0
Metric Value
wmc 3
eloc 22
dl 38
loc 48
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_distinct() 38 38 3


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
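As a rough illustration of that rule, the sketch below moves a check that would otherwise be copied into several functions into a single helper. The names `_require_positive`, `make_window` and `make_batches` are hypothetical and are not part of async_rx.

```python
from collections import deque
from typing import Deque, List


def _require_positive(name: str, value: int) -> None:
    """Shared guard, written once instead of being copied into each caller."""
    if value <= 0:
        raise RuntimeError(f"{name} must be greater than zero")


def make_window(frame_size: int) -> Deque:
    # Hypothetical caller 1: builds a bounded buffer.
    _require_positive("frame_size", frame_size)
    return deque(maxlen=frame_size)


def make_batches(items: List[int], batch_size: int) -> List[List[int]]:
    # Hypothetical caller 2: reuses the same guard before slicing.
    _require_positive("batch_size", batch_size)
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
```

If the validation rule changes later, only `_require_positive` needs to be edited.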

Common duplication problems and their corresponding solutions are:

from collections import deque
from typing import Any, Deque

from ..protocol import Observable, Observer, Subscription, rx_observer_from
from .rx_create import rx_create

__all__ = ["rx_distinct"]


def rx_distinct(observable: Observable, frame_size: int) -> Observable:
    """Create an observable that emits only items distinct within a window of size frame_size.

    Args:
        observable (Observable): observable source
        frame_size (int): window size

    Returns:
        (Observable): observable instance

    Raises:
        (RuntimeError): if frame_size <= 0

    """
    if frame_size <= 0:
        raise RuntimeError('frame_size must be greater than zero')

    async def _subscribe(an_observer: Observer) -> Subscription:

        # Frame buffer: holds the last frame_size emitted items;
        # the oldest entry is evicted automatically once it is full.
        _q: Deque = deque(maxlen=frame_size)

        async def _on_next(item: Any):
            if item not in _q:  # distinct value within the current window
                _q.append(item)
                await an_observer.on_next(item)

        async def _on_completed():
            # Release buffered items before forwarding completion.
            _q.clear()
            await an_observer.on_completed()

        return await observable.subscribe(
            an_observer=rx_observer_from(observer=an_observer, on_next=_on_next, on_completed=_on_completed)
        )

    return rx_create(subscribe=_subscribe)
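To see what the frame buffer does without an async runtime, here is a minimal synchronous sketch of the same `deque(maxlen=...)` check. The function `distinct_in_window` is a hypothetical stand-in written for this illustration; it is not part of async_rx and it ignores the observer/subscription machinery entirely.

```python
from collections import deque
from typing import Any, Deque, Iterable, List


def distinct_in_window(items: Iterable[Any], frame_size: int) -> List[Any]:
    """Keep an item only if it is not among the last frame_size emitted items."""
    if frame_size <= 0:
        raise RuntimeError("frame_size must be greater than zero")
    window: Deque[Any] = deque(maxlen=frame_size)
    emitted: List[Any] = []
    for item in items:
        if item not in window:
            window.append(item)  # evicts the oldest entry when the window is full
            emitted.append(item)
    return emitted


print(distinct_in_window([1, 1, 2, 3, 1, 2], frame_size=2))  # [1, 2, 3, 1, 2]
print(distinct_in_window([1, 1, 2, 3, 1, 2], frame_size=3))  # [1, 2, 3]
```

With a window of 2, the value 1 has already been evicted by the time it reappears, so it is emitted again; with a window of 3 it is still buffered and is suppressed. Note that the window tracks emitted items only, so a suppressed duplicate does not refresh its position in the buffer.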