async_rx.observable.rx_dict._RxDict.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
from collections import UserDict
2
from typing import Dict, Optional, Union
3
4
import curio
5
6
from ..protocol import Observable, Observer, Subscription
7
8
__all__ = ["rx_dict"]
9
10
11
class _RxDict(UserDict):
    """Dictionary that pushes snapshots of its content to a single observer.

    Every item assignment or deletion flags an internal curio event. While an
    observer is subscribed, a background task waits on that event and sends
    the observer a plain ``dict`` copy of the current content.
    """

    def __init__(self, dict: Union[Dict, "_RxDict"]):
        """Initialise the dictionary with the content of ``dict``.

        The parameter keeps its builtin-shadowing name because ``rx_dict``
        passes it by keyword.

        Args:
            dict (Union[Dict, _RxDict]): initial content, copied into this instance
        """
        # Both attributes must exist before UserDict.__init__ runs: it fills
        # the mapping through __setitem__, which calls _set_event().
        self._event = curio.UniversalEvent()
        self._subscribers = 0
        super().__init__(dict)

    async def subscribe(self, an_observer: Observer) -> Subscription:
        """Subscribe ``an_observer`` to the changes of this dictionary.

        The observer immediately receives a snapshot of the current content,
        then a new snapshot after each flagged change.

        Args:
            an_observer (Observer): receives the snapshots through ``on_next``

        Returns:
            (Subscription): async callable which ends the subscription; calling
                it more than once is harmless.

        Raises:
            RuntimeError: if an observer is already subscribed

        """
        if self._subscribers > 0:
            raise RuntimeError("Only one subscription is supported")

        self._subscribers += 1

        # A change flagged just before a previous unsubscription may have left
        # the event set; the initial snapshot sent below already covers it.
        self._event.clear()

        _consumer_task = None
        _active = True

        async def consumer():
            try:
                while True:
                    await self._event.wait()
                    # Re-arm the event *before* taking the snapshot: a change
                    # made while the observer is being notified then sets the
                    # event again instead of being wiped by a late clear().
                    self._event.clear()
                    await an_observer.on_next(dict(self.data))
            except curio.TaskCancelled:
                # Cancelled by the subscription callable: it's time to finish.
                pass

        async def _subscription():
            nonlocal _consumer_task, _active
            if not _active:
                # Already unsubscribed: do not release the slot twice.
                return
            _active = False
            task, _consumer_task = _consumer_task, None
            try:
                if task is not None:
                    await task.cancel()
            finally:
                self._subscribers -= 1

        try:
            _consumer_task = await curio.spawn(consumer())
            await an_observer.on_next(dict(self.data))
        except BaseException:
            # The caller never gets the subscription handle in this case, so
            # stop the consumer and free the subscriber slot here.
            await _subscription()
            raise

        return _subscription

    def _set_event(self):
        """Flag a change, but only when someone is subscribed."""
        if not self._event.is_set() and self._subscribers:
            self._event.set()

    def __setitem__(self, key, item):
        """Store ``item`` under ``key`` and flag the change."""
        self.data[key] = item
        self._set_event()

    def __delitem__(self, key):
        """Remove ``key`` (``KeyError`` if missing) and flag the change."""
        del self.data[key]
        self._set_event()

    def copy(self):
        """Return an independent ``_RxDict`` with the same content.

        The copy is built from the underlying data rather than through
        ``UserDict.copy()``: the latter shallow-copies the instance (sharing
        the event and the subscriber counter) and refills it item by item,
        which would notify the observer of the original dictionary.
        """
        return _RxDict(self.data.copy())
63
64
65
def rx_dict(initial_value: Optional[Dict] = None) -> Observable:
    """Create an observable dictionary.

    The observer receives the current content of the dictionary when it
    subscribes, and again whenever a key changes (added, updated or deleted).
    The returned object is a UserDict subclass, so it can be used like a
    regular dictionary.

    Args:
        initial_value (Optional[Dict]): initial content (default: {})

    Returns:
        (Observable): observable instance

    """
    content = initial_value or {}
    return _RxDict(dict=content)
80