Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.observables.test_rx_merge.test_rx_merge()   B

Complexity

Conditions 1

Size

Total Lines 47
Code Lines 44

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 44
dl 0
loc 47
rs 8.824
c 0
b 0
f 0
cc 1
nop 1
1
import curio
2
import pytest
3
4
from async_rx import Observer, Subscription, rx_create, rx_from, rx_merge, rx_range
5
from async_rx.protocol import default_subscription
6
7
from ..model import ObserverCounterCollector
8
from .model import countdown
9
10
11
def test_rx_merge_default():
12
    with pytest.raises(RuntimeError):
13
        rx_merge()
14
15
16
def test_rx_merge_concurrent(kernel):
17
18
    seeker = ObserverCounterCollector()
19
20
    async def _build():
21
        return rx_merge(rx_create(subscribe=await countdown(10, 0.1)), rx_create(subscribe=await countdown(10, 0.2)))
22
23
    obs = kernel.run(_build())
24
    sub_a = kernel.run(obs.subscribe(seeker))
25
    kernel.run(sub_a())
26
    assert seeker.on_completed_count == 1
27
    assert seeker.on_error_count == 0
28
    assert seeker.on_next_count == 20
29
    assert seeker.items == [10, 10, 9, 8, 9, 7, 6, 8, 5, 4, 7, 3, 2, 6, 1, 5, 4, 3, 2, 1]
30
31
32
def test_rx_merge(kernel):
33
34
    seeker = ObserverCounterCollector()
35
36
    obs = rx_merge(rx_range(start=1, stop=20), rx_from("i am an iterable"))
37
    sub_a = kernel.run(obs.subscribe(seeker))
38
    kernel.run(sub_a())
39
40
    assert seeker.on_completed_count == 1
41
    assert seeker.on_error_count == 0
42
    assert seeker.on_next_count == (20 - 1 + len("i am an iterable"))
43
    assert seeker.items == [
44
        1,
45
        2,
46
        3,
47
        4,
48
        5,
49
        6,
50
        7,
51
        8,
52
        9,
53
        10,
54
        11,
55
        12,
56
        13,
57
        14,
58
        15,
59
        16,
60
        17,
61
        18,
62
        19,
63
        "i",
64
        " ",
65
        "a",
66
        "m",
67
        " ",
68
        "a",
69
        "n",
70
        " ",
71
        "i",
72
        "t",
73
        "e",
74
        "r",
75
        "a",
76
        "b",
77
        "l",
78
        "e",
79
    ]
80
81
82
def test_rx_merge_with_error(kernel):
83
84
    seeker = ObserverCounterCollector()
85
86
    async def _subscribe(an_observer: Observer) -> Subscription:
87
        await curio.sleep(0.2)
88
        await an_observer.on_error(err="Args")
89
        return default_subscription
90
91
    async def _build():
92
        return rx_merge(rx_create(subscribe=await countdown(10, 0.1)), rx_create(subscribe=_subscribe))
93
94
    obs = kernel.run(_build())
95
    sub_a = kernel.run(obs.subscribe(seeker))
96
    kernel.run(sub_a())
97
    assert seeker.on_completed_count == 0
98
    assert seeker.on_error_count == 1
99
    assert seeker.on_next_count <= 3
100
    assert seeker.items[0] == 10
101
    if seeker.on_next_count > 2:
102
        assert seeker.items[1] == 9
103