Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.observables.test_rx_repeat_series   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 49
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 6
eloc 31
dl 0
loc 49
rs 10
c 0
b 0
f 0
1
import curio
2
import pytest
3
4
from async_rx import rx_repeat_series
5
6
from ..model import ObserverCounterCollectorWithTime
7
8
9
def test_rx_repeat_series_default():
10
    with pytest.raises(RuntimeError):
11
        rx_repeat_series(None)
12
13
    with pytest.raises(RuntimeError):
14
        rx_repeat_series(True)
15
16
17
def test_rx_repeat_series(kernel):
18
19
    source = rx_repeat_series([(0.1, "A"), (0.5, "B"), (1.0, "C")])
20
21
    seeker = ObserverCounterCollectorWithTime()
22
23
    sub = kernel.run(source.subscribe(seeker))
24
    kernel.run(curio.sleep(3))
25
    kernel.run(sub())
26
27
    assert len(seeker.items) == 3
28
    assert seeker.get_delta() == [0.5, 1.0]
29
30
31
def test_rx_repeat_series_async(kernel):
32
    async def build():
33
        async def generate():  # this is an asyn generator
34
            for t in [(0.1, "A"), (0.5, "B"), (1.0, "C")]:
35
                yield t
36
37
        return rx_repeat_series(generate())
38
39
    source = kernel.run(build())
40
41
    seeker = ObserverCounterCollectorWithTime()
42
43
    sub = kernel.run(source.subscribe(seeker))
44
    kernel.run(curio.sleep(3))
45
    kernel.run(sub())
46
47
    assert len(seeker.items) == 3
48
    assert seeker.get_delta() == [0.5, 1.0]
49