async_rx.observable.rx_list   A
last analyzed

Complexity

Total Complexity 25

Size/Duplication

Total Lines 131
Duplicated Lines 22.9 %

Importance

Changes 0
Metric Value
wmc 25
eloc 86
dl 30
loc 131
rs 10
c 0
b 0
f 0

18 Methods

Rating   Name   Duplication   Size   Complexity  
A _RxList.copy() 0 2 1
A _RxList.append() 0 3 1
B _RxList.subscribe() 30 30 5
A _RxList.sort() 0 3 1
A _RxList.remove() 0 3 1
A _RxList.__iadd__() 0 4 1
A _RxList._set_event() 0 3 2
A _RxList.__imul__() 0 4 1
A _RxList.__add__() 0 4 1
A _RxList.clear() 0 3 1
A _RxList.pop() 0 3 1
A _RxList.extend() 0 3 1
A _RxList.__init__() 0 4 2
A _RxList.reverse() 0 3 1
A _RxList.__delitem__() 0 3 1
A _RxList.__mul__() 0 4 1
A _RxList.__setitem__() 0 3 1
A _RxList.insert() 0 3 1

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_list() 0 15 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
from collections import UserList
2
from typing import List, Optional, Union
3
4
import curio
5
6
from ..protocol import Observable, Observer, Subscription
7
8
__all__ = ["rx_list"]
9
10
11
class _RxList(UserList):
12
    def __init__(self, initlist: Optional[Union[List, "_RxList"]] = None):
13
        self._event = curio.UniversalEvent()
14
        self._subscribers = 0
15
        super().__init__(initlist=initlist if initlist else [])
16
17 View Code Duplication
    async def subscribe(self, an_observer: Observer) -> Subscription:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
18
        if self._subscribers > 0:
19
            raise RuntimeError("Only one subscription is supported")
20
21
        self._subscribers += 1
22
23
        _consumer_task = None
24
25
        async def consumer():
26
            try:
27
                while True:
28
                    await self._event.wait()
29
                    await an_observer.on_next(list(self.data))
30
                    self._event.clear()
31
            except curio.TaskCancelled:
32
                # it's time to finish
33
                pass
34
35
        async def _subscription():
36
            nonlocal _consumer_task
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _consumer_task does not seem to be defined.
Loading history...
37
            if _consumer_task:
38
                await _consumer_task.cancel()
39
                _consumer_task = None
40
            self._subscribers -= 1
41
42
        _consumer_task = await curio.spawn(consumer())
43
44
        await an_observer.on_next(list(self.data))
45
46
        return _subscription
47
48
    def _set_event(self):
49
        if not self._event.is_set():
50
            self._event.set()
51
52
    def __setitem__(self, i, item):
53
        super().__setitem__(i, item)
54
        self._set_event()
55
56
    def __delitem__(self, i):
57
        super().__delitem__(i)
58
        self._set_event()
59
60
    def __add__(self, other):
61
        result = super().__add__(other)
62
        self._set_event()
63
        return _RxList(result)
64
65
    def __iadd__(self, other):
66
        super().__iadd__(other)
67
        self._set_event()
68
        return self
69
70
    def __mul__(self, n):
71
        result = super().__mul__(n)
72
        self._set_event()
73
        return _RxList(result)
74
75
    def __imul__(self, n):
76
        super().__imul__(n)
77
        self._set_event()
78
        return self
79
80
    def append(self, item):
81
        super().append(item)
82
        self._set_event()
83
84
    def insert(self, i, item):
85
        super().insert(i, item)
86
        self._set_event()
87
88
    def pop(self, i=-1):
89
        super().pop(i)
90
        self._set_event()
91
92
    def remove(self, item):
93
        super().remove(item)
94
        self._set_event()
95
96
    def clear(self):
97
        super().clear()
98
        self._set_event()
99
100
    def copy(self):
101
        return _RxList(super().copy())
102
103
    def reverse(self):
104
        super().reverse()
105
        self._set_event()
106
107
    def sort(self, *args, **kwds):
108
        super().sort(*args, **kwds)
109
        self._set_event()
110
111
    def extend(self, other):
112
        super().extend(other)
113
        self._set_event()
114
115
116
def rx_list(initial_value: Optional[List] = None) -> Observable:
117
    """Create an observable on list.
118
119
    The observer receive the current value of list on subscribe
120
    and when an item change (added, updated or deleted, ...).
121
    This observable implements a UserList, so you can use it as a classic list.
122
123
    Args:
124
        initial_value (Optional[List]): intial value (default: [])
125
126
    Returns:
127
        (Observable): observable instance
128
129
    """
130
    return _RxList(initlist=initial_value)
131