@@ 17-47 (lines=31) @@ | ||
14 | self._subscribers = 0 |
|
15 | super().__init__(dict) |
|
16 | ||
async def subscribe(self, an_observer: Observer) -> Subscription:
    """Attach ``an_observer`` as the single subscriber of this collection.

    The observer is sent a ``dict`` copy of ``self.data`` right away, and a
    fresh copy each time ``self._event`` is signalled; the later deliveries
    come from a background curio task spawned here.

    Args:
        an_observer: Object whose awaitable ``on_next`` receives each
            snapshot.

    Returns:
        An async callable that cancels the background task and releases the
        subscription slot. Calling it again afterwards does nothing.

    Raises:
        RuntimeError: If a subscription is already active.
    """
    if self._subscribers > 0:
        raise RuntimeError("Only one subscription is supported")

    self._subscribers += 1

    _consumer_task = None

    async def consumer():
        try:
            while True:
                await self._event.wait()
                # Clear *before* notifying: a change signalled while
                # on_next() is suspended must leave the event set so the
                # next loop iteration delivers it. Clearing afterwards
                # would silently drop that update.
                self._event.clear()
                await an_observer.on_next(dict(self.data))
        except curio.TaskCancelled:
            # it's time to finish
            pass

    async def _subscription():
        nonlocal _consumer_task
        if _consumer_task:
            await _consumer_task.cancel()
            _consumer_task = None
            self._subscribers -= 1

    try:
        _consumer_task = await curio.spawn(consumer())

        # Initial snapshot, delivered synchronously with the subscription.
        await an_observer.on_next(dict(self.data))
    except Exception:
        # Do not leave the collection permanently "subscribed" (or a
        # consumer task running) when setting up the subscription failed.
        if _consumer_task:
            await _subscription()
        else:
            self._subscribers -= 1
        raise

    return _subscription
|
48 | ||
49 | def _set_event(self): |
|
50 | if not self._event.is_set() and self._subscribers: |
@@ 17-46 (lines=30) @@ | ||
14 | self._subscribers = 0 |
|
15 | super().__init__(initlist=initlist if initlist else []) |
|
16 | ||
async def subscribe(self, an_observer: Observer) -> Subscription:
    """Attach ``an_observer`` as the single subscriber of this collection.

    The observer is sent a ``list`` copy of ``self.data`` right away, and a
    fresh copy each time ``self._event`` is signalled; the later deliveries
    come from a background curio task spawned here.

    Args:
        an_observer: Object whose awaitable ``on_next`` receives each
            snapshot.

    Returns:
        An async callable that cancels the background task and releases the
        subscription slot. Calling it again afterwards does nothing.

    Raises:
        RuntimeError: If a subscription is already active.
    """
    if self._subscribers > 0:
        raise RuntimeError("Only one subscription is supported")

    self._subscribers += 1

    _consumer_task = None

    async def consumer():
        try:
            while True:
                await self._event.wait()
                # Clear *before* notifying: a change signalled while
                # on_next() is suspended must leave the event set so the
                # next loop iteration delivers it. Clearing afterwards
                # would silently drop that update.
                self._event.clear()
                await an_observer.on_next(list(self.data))
        except curio.TaskCancelled:
            # it's time to finish
            pass

    async def _subscription():
        nonlocal _consumer_task
        if _consumer_task:
            await _consumer_task.cancel()
            _consumer_task = None
            self._subscribers -= 1

    try:
        _consumer_task = await curio.spawn(consumer())

        # Initial snapshot, delivered synchronously with the subscription.
        await an_observer.on_next(list(self.data))
    except Exception:
        # Do not leave the collection permanently "subscribed" (or a
        # consumer task running) when setting up the subscription failed.
        if _consumer_task:
            await _subscription()
        else:
            self._subscribers -= 1
        raise

    return _subscription
|
47 | ||
48 | def _set_event(self): |
|
49 | if not self._event.is_set(): |