async_rx.observable.rx_dict   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 80
Duplicated Lines 38.75 %

Importance

Changes 0
Metric Value
wmc 14
eloc 46
dl 31
loc 80
rs 10
c 0
b 0
f 0
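
The duplication percentage reported above appears to be the share of duplicated lines in the file. Reading dl as "duplicated lines" and loc as "lines of code" is an assumption, but the numbers in the table agree with it:

```python
# Values copied from the metric table; their meanings are assumed.
dl, loc = 31, 80

duplicated_pct = 100 * dl / loc
print(duplicated_pct)  # 38.75
```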

6 Methods

Rating   Name   Duplication   Size   Complexity  
A _RxDict.__delitem__() 0 3 1
A _RxDict.__init__() 0 4 1
A _RxDict.copy() 0 2 1
A _RxDict.__setitem__() 0 3 1
A _RxDict._set_event() 0 3 3
B _RxDict.subscribe() 31 31 5

1 Function

Rating   Name   Duplication   Size   Complexity  
A rx_dict() 0 15 2

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
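
One way to act on this for a duplicated subscribe() body is to move it into a shared base class, so each container only says how to snapshot its own data. The following is a minimal sketch, not the project's code: SingleSubscriberMixin, RxDictSketch, RxListSketch, Collector and demo are hypothetical names, the list-backed sibling is assumed purely for illustration, and the standard library's asyncio stands in for curio so the example runs on its own.

```python
import asyncio
from collections import UserDict, UserList


class SingleSubscriberMixin:
    """Hypothetical mixin: the shared subscribe logic lives here once."""

    def _init_rx(self):
        self._event = None  # created in subscribe(), inside the running loop
        self._subscribers = 0

    def _snapshot(self):
        raise NotImplementedError  # each container returns a copy of its data

    def _set_event(self):
        if self._subscribers and self._event is not None:
            self._event.set()

    async def subscribe(self, an_observer):
        if self._subscribers > 0:
            raise RuntimeError("Only one subscription is supported")
        self._subscribers += 1
        self._event = asyncio.Event()

        async def consumer():
            # Same order as the listing in this report: notify, then clear.
            while True:
                await self._event.wait()
                await an_observer.on_next(self._snapshot())
                self._event.clear()

        task = asyncio.ensure_future(consumer())

        async def unsubscribe():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass  # the consumer was asked to stop
            self._subscribers -= 1

        await an_observer.on_next(self._snapshot())
        return unsubscribe


class RxDictSketch(SingleSubscriberMixin, UserDict):
    def __init__(self, initial=None):
        self._init_rx()  # must exist before UserDict fills itself
        super().__init__(initial)

    def __setitem__(self, key, item):
        self.data[key] = item
        self._set_event()

    def _snapshot(self):
        return dict(self.data)


class RxListSketch(SingleSubscriberMixin, UserList):
    def __init__(self, initial=None):
        self._init_rx()
        super().__init__(initial)

    def append(self, item):
        self.data.append(item)
        self._set_event()

    def _snapshot(self):
        return list(self.data)


class Collector:
    """Toy observer that records every value it is handed."""

    def __init__(self):
        self.seen = []

    async def on_next(self, value):
        self.seen.append(value)


async def demo():
    d, lst = RxDictSketch({"a": 1}), RxListSketch([1])
    dict_obs, list_obs = Collector(), Collector()
    stop_d = await d.subscribe(dict_obs)
    stop_l = await lst.subscribe(list_obs)
    d["b"] = 2
    lst.append(2)
    await asyncio.sleep(0.01)  # let both consumers deliver
    await stop_d()
    await stop_l()
    return dict_obs.seen, list_obs.seen
```

Calling asyncio.run(demo()) returns the snapshots each observer received. Both containers reuse one subscribe() implementation, which is the kind of restructuring the rule above points to.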

Common duplication problems and their corresponding solutions are:

from collections import UserDict
from typing import Dict, Optional, Union

import curio

from ..protocol import Observable, Observer, Subscription

__all__ = ["rx_dict"]


class _RxDict(UserDict):
    def __init__(self, dict: Union[Dict, "_RxDict"]):
        # Set these up first: UserDict.__init__ fills the mapping through
        # __setitem__, which calls _set_event().
        self._event = curio.UniversalEvent()
        self._subscribers = 0
        super().__init__(dict)

    # Analyzer finding (Duplication): this code seems to be duplicated in your project.
    async def subscribe(self, an_observer: Observer) -> Subscription:

        if self._subscribers > 0:
            raise RuntimeError("Only one subscription is supported")

        self._subscribers += 1

        _consumer_task = None

        async def consumer():
            try:
                while True:
                    await self._event.wait()
                    await an_observer.on_next(dict(self.data))
                    self._event.clear()
            except curio.TaskCancelled:
                # it's time to finish
                pass

        async def _subscription():
            # Analyzer finding (Comprehensibility, Best Practice): the variable
            # _consumer_task does not seem to be defined. (It is assigned in the
            # enclosing subscribe() scope above.)
            nonlocal _consumer_task
            if _consumer_task:
                await _consumer_task.cancel()
                _consumer_task = None
            self._subscribers -= 1

        _consumer_task = await curio.spawn(consumer())

        # Send the current value immediately on subscribe.
        await an_observer.on_next(dict(self.data))

        return _subscription

    def _set_event(self):
        # Only signal when someone is subscribed and no signal is pending.
        if not self._event.is_set() and self._subscribers:
            self._event.set()

    def __setitem__(self, key, item):
        self.data[key] = item
        self._set_event()

    def __delitem__(self, key):
        del self.data[key]
        self._set_event()

    def copy(self):
        return _RxDict(super().copy())


def rx_dict(initial_value: Optional[Dict] = None) -> Observable:
    """Create an observable on a dictionary.

    The observer receives the current value of the dictionary on subscribe
    and whenever a key changes (added, updated or deleted).
    This observable implements a UserDict, so you can use it as a classic dictionary.

    Args:
        initial_value (Optional[Dict]): initial value (default: {})

    Returns:
        (Observable): observable instance

    """
    return _RxDict(dict=initial_value if initial_value else {})
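
One detail a reader may want to check in the consumer loop of subscribe() is the order of on_next() and clear(). If the observer suspends inside on_next() and a key changes in the meantime, the event is already set, so the change adds no new signal, and the later clear() discards the pending one. That change would then only be delivered with the next mutation. The sketch below reproduces the pattern under stated assumptions: asyncio.Event stands in for curio.UniversalEvent, and run_consumer, write and the timings are hypothetical names and values chosen for the illustration.

```python
import asyncio


async def run_consumer(clear_first):
    """Return the snapshots delivered when a write lands during on_next."""
    data = {"a": 1}
    event = asyncio.Event()
    delivered = []

    async def on_next(snapshot):
        delivered.append(snapshot)
        await asyncio.sleep(0.05)  # a slow observer

    async def consumer():
        while True:
            await event.wait()
            if clear_first:
                event.clear()
                await on_next(dict(data))
            else:
                # Order used in the listing: notify, then clear.
                await on_next(dict(data))
                event.clear()

    def write(key, value):
        data[key] = value
        if not event.is_set():
            event.set()

    task = asyncio.ensure_future(consumer())
    write("b", 2)
    await asyncio.sleep(0.01)  # the consumer is now inside on_next
    write("c", 3)              # lands while the observer is still busy
    await asyncio.sleep(0.2)   # give the consumer time to settle
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return delivered
```

With clear_first=False the write of "c" is never delivered in this run; with clear_first=True a second snapshot containing "c" follows. Whether this matters in practice depends on whether observers actually suspend in on_next().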