Conditions | 3 |
Total Lines | 38 |
Code Lines | 16 |
Lines | 38 |
Ratio | 100 % |
Changes | 0 |
1 | from collections import deque |
||
10 | View Code Duplication | def rx_distinct(observable: Observable, frame_size: int) -> Observable: |
|
|
|||
11 | """Create an observable which send distinct event inside a windows of size #frame_size. |
||
12 | |||
13 | Args: |
||
14 | observable (Observable): observable source |
||
15 | frame_size (int): windows size |
||
16 | |||
17 | Returns: |
||
18 | (Observable): observable instance |
||
19 | |||
20 | Raise: |
||
21 | (RuntimeError): if frame_size <= 0 |
||
22 | |||
23 | """ |
||
24 | if frame_size <= 0: |
||
25 | raise RuntimeError('framesize must be greather than zero') |
||
26 | |||
27 | async def _subscribe(an_observer: Observer) -> Subscription: |
||
28 | |||
29 | # our frame buffer |
||
30 | _q: Deque = deque(maxlen=frame_size) |
||
31 | |||
32 | async def _on_next(item: Any): |
||
33 | nonlocal _q |
||
34 | |||
35 | if item not in _q: # distinct value |
||
36 | _q.append(item) |
||
37 | await an_observer.on_next(item) |
||
38 | |||
39 | async def _on_completed(): |
||
40 | nonlocal _q |
||
41 | |||
42 | _q.clear() |
||
43 | await an_observer.on_completed() |
||
44 | |||
45 | return await observable.subscribe(an_observer=rx_observer_from(observer=an_observer, on_next=_on_next, on_completed=_on_completed)) |
||
46 | |||
47 | return rx_create(subscribe=_subscribe) |
||
48 |