Code Duplication    Length = 30-31 lines in 2 locations

async_rx/observable/rx_dict.py 1 location

@@ 17-47 (lines=31) @@
14
        self._subscribers = 0
15
        super().__init__(dict)
16
17
    async def subscribe(self, an_observer: Observer) -> Subscription:
        """Subscribe ``an_observer`` to snapshots of the underlying dict.

        A background curio task waits on ``self._event`` and pushes a fresh
        ``dict(self.data)`` copy to the observer each time the event fires.
        The observer also receives one snapshot immediately on subscription.

        Args:
            an_observer: observer whose ``on_next`` coroutine receives the
                snapshots.

        Returns:
            An async callable that cancels the background task and releases
            the single subscription slot. Calling it more than once is a
            no-op.

        Raises:
            RuntimeError: if a subscription is already active.
        """
        if self._subscribers > 0:
            raise RuntimeError("Only one subscription is supported")

        self._subscribers += 1

        _consumer_task = None
        _released = False

        async def consumer():
            try:
                while True:
                    await self._event.wait()
                    # Clear *before* notifying: an event set while on_next is
                    # being awaited then stays set and triggers another pass,
                    # instead of being erased by a late clear().
                    self._event.clear()
                    await an_observer.on_next(dict(self.data))
            except curio.TaskCancelled:
                # it's time to finish
                pass

        async def _subscription():
            nonlocal _consumer_task, _released
            # Guard against repeated calls, which would otherwise drive the
            # subscriber counter below zero and defeat the single-subscriber
            # check above.
            if _released:
                return
            _released = True
            if _consumer_task:
                await _consumer_task.cancel()
                _consumer_task = None
            self._subscribers -= 1

        try:
            _consumer_task = await curio.spawn(consumer())
            await an_observer.on_next(dict(self.data))
        except BaseException:
            # Setup failed: stop the consumer and free the slot so a later
            # subscribe() is not blocked forever, then propagate the error.
            await _subscription()
            raise

        return _subscription
48
49
    def _set_event(self):
50
        if not self._event.is_set() and self._subscribers:

async_rx/observable/rx_list.py 1 location

@@ 17-46 (lines=30) @@
14
        self._subscribers = 0
15
        super().__init__(initlist=initlist if initlist else [])
16
17
    async def subscribe(self, an_observer: Observer) -> Subscription:
        """Subscribe ``an_observer`` to snapshots of the underlying list.

        A background curio task waits on ``self._event`` and pushes a fresh
        ``list(self.data)`` copy to the observer each time the event fires.
        The observer also receives one snapshot immediately on subscription.

        Args:
            an_observer: observer whose ``on_next`` coroutine receives the
                snapshots.

        Returns:
            An async callable that cancels the background task and releases
            the single subscription slot. Calling it more than once is a
            no-op.

        Raises:
            RuntimeError: if a subscription is already active.
        """
        if self._subscribers > 0:
            raise RuntimeError("Only one subscription is supported")

        self._subscribers += 1

        _consumer_task = None
        _released = False

        async def consumer():
            try:
                while True:
                    await self._event.wait()
                    # Clear *before* notifying: an event set while on_next is
                    # being awaited then stays set and triggers another pass,
                    # instead of being erased by a late clear().
                    self._event.clear()
                    await an_observer.on_next(list(self.data))
            except curio.TaskCancelled:
                # it's time to finish
                pass

        async def _subscription():
            nonlocal _consumer_task, _released
            # Guard against repeated calls, which would otherwise drive the
            # subscriber counter below zero and defeat the single-subscriber
            # check above.
            if _released:
                return
            _released = True
            if _consumer_task:
                await _consumer_task.cancel()
                _consumer_task = None
            self._subscribers -= 1

        try:
            _consumer_task = await curio.spawn(consumer())
            await an_observer.on_next(list(self.data))
        except BaseException:
            # Setup failed: stop the consumer and free the slot so a later
            # subscribe() is not blocked forever, then propagate the error.
            await _subscription()
            raise

        return _subscription
47
48
    def _set_event(self):
49
        if not self._event.is_set():