| @@ 17-47 (lines=31) @@ | ||
| 14 | self._subscribers = 0 |
|
| 15 | super().__init__(dict) |
|
| 16 | ||
| 17 | async def subscribe(self, an_observer: Observer) -> Subscription: |
|
| 18 | ||
| 19 | if self._subscribers > 0: |
|
| 20 | raise RuntimeError("Only one subscription is supported") |
|
| 21 | ||
| 22 | self._subscribers += 1 |
|
| 23 | ||
| 24 | _consumer_task = None |
|
| 25 | ||
| 26 | async def consumer(): |
|
| 27 | try: |
|
| 28 | while True: |
|
| 29 | await self._event.wait() |
|
| 30 | await an_observer.on_next(dict(self.data)) |
|
| 31 | self._event.clear() |
|
| 32 | except curio.TaskCancelled: |
|
| 33 | # it's time to finish |
|
| 34 | pass |
|
| 35 | ||
| 36 | async def _subscription(): |
|
| 37 | nonlocal _consumer_task |
|
| 38 | if _consumer_task: |
|
| 39 | await _consumer_task.cancel() |
|
| 40 | _consumer_task = None |
|
| 41 | self._subscribers -= 1 |
|
| 42 | ||
| 43 | _consumer_task = await curio.spawn(consumer()) |
|
| 44 | ||
| 45 | await an_observer.on_next(dict(self.data)) |
|
| 46 | ||
| 47 | return _subscription |
|
| 48 | ||
| 49 | def _set_event(self): |
|
| 50 | if not self._event.is_set() and self._subscribers: |
|
| @@ 17-46 (lines=30) @@ | ||
| 14 | self._subscribers = 0 |
|
| 15 | super().__init__(initlist=initlist if initlist else []) |
|
| 16 | ||
| 17 | async def subscribe(self, an_observer: Observer) -> Subscription: |
|
| 18 | if self._subscribers > 0: |
|
| 19 | raise RuntimeError("Only one subscription is supported") |
|
| 20 | ||
| 21 | self._subscribers += 1 |
|
| 22 | ||
| 23 | _consumer_task = None |
|
| 24 | ||
| 25 | async def consumer(): |
|
| 26 | try: |
|
| 27 | while True: |
|
| 28 | await self._event.wait() |
|
| 29 | await an_observer.on_next(list(self.data)) |
|
| 30 | self._event.clear() |
|
| 31 | except curio.TaskCancelled: |
|
| 32 | # it's time to finish |
|
| 33 | pass |
|
| 34 | ||
| 35 | async def _subscription(): |
|
| 36 | nonlocal _consumer_task |
|
| 37 | if _consumer_task: |
|
| 38 | await _consumer_task.cancel() |
|
| 39 | _consumer_task = None |
|
| 40 | self._subscribers -= 1 |
|
| 41 | ||
| 42 | _consumer_task = await curio.spawn(consumer()) |
|
| 43 | ||
| 44 | await an_observer.on_next(list(self.data)) |
|
| 45 | ||
| 46 | return _subscription |
|
| 47 | ||
| 48 | def _set_event(self): |
|
| 49 | if not self._event.is_set(): |
|