Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.observables.test_rx_concat.test_rx_concat()   B

Complexity

Conditions 1

Size

Total Lines 47
Code Lines 44

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 44
dl 0
loc 47
rs 8.824
c 0
b 0
f 0
cc 1
nop 1
1
import curio
2
import pytest
3
4
from async_rx import Observer, Subscription, rx_concat, rx_create, rx_from, rx_range, rx_repeat_series, rx_throw
5
6
from ..model import ObserverCounterCollector
7
from .model import countdown
8
9
10
def test_rx_concat_concurrent(kernel):
11
12
    seeker = ObserverCounterCollector()
13
14
    async def _build():
15
        return rx_concat(rx_create(subscribe=await countdown(5, 0.1)), rx_create(subscribe=await countdown(5, 0.2)))
16
17
    obs = kernel.run(_build())
18
    sub_a = kernel.run(obs.subscribe(seeker))
19
    kernel.run(sub_a())
20
    assert seeker.on_completed_count == 1
21
    assert seeker.on_error_count == 0
22
    assert seeker.on_next_count == 10
23
    assert seeker.items == [5, 4, 3, 2, 1, 5, 4, 3, 2, 1]
24
25
26
def test_rx_concat(kernel):
27
28
    seeker = ObserverCounterCollector()
29
30
    obs = rx_concat(rx_range(start=1, stop=20), rx_from("i am an iterable"))
31
    sub_a = kernel.run(obs.subscribe(seeker))
32
    kernel.run(sub_a())
33
34
    assert seeker.on_completed_count == 1
35
    assert seeker.on_error_count == 0
36
    assert seeker.on_next_count == (20 - 1 + len("i am an iterable"))
37
    assert seeker.items == [
38
        1,
39
        2,
40
        3,
41
        4,
42
        5,
43
        6,
44
        7,
45
        8,
46
        9,
47
        10,
48
        11,
49
        12,
50
        13,
51
        14,
52
        15,
53
        16,
54
        17,
55
        18,
56
        19,
57
        "i",
58
        " ",
59
        "a",
60
        "m",
61
        " ",
62
        "a",
63
        "n",
64
        " ",
65
        "i",
66
        "t",
67
        "e",
68
        "r",
69
        "a",
70
        "b",
71
        "l",
72
        "e",
73
    ]
74
75
76
def test_rx_concat_with_error(kernel):
77
    async def sub(an_observer: Observer) -> Subscription:
78
        await an_observer.on_error("AA")
79
80
    seeker = ObserverCounterCollector()
81
82
    obs = rx_concat(rx_range(start=1, stop=20), rx_create(subscribe=sub))
83
    sub_a = kernel.run(obs.subscribe(seeker))
84
    kernel.run(sub_a())
85
86
    assert seeker.on_completed_count == 0
87
    assert seeker.on_error_count == 1
88
    assert seeker.on_next_count == 19
89
90
91
def test_rx_concat_with_throw(kernel):
92
93
    seeker = ObserverCounterCollector()
94
95
    obs = rx_concat(rx_range(start=1, stop=20), rx_throw("oups"))
96
    sub_a = kernel.run(obs.subscribe(seeker))
97
    kernel.run(sub_a())
98
99
    assert seeker.on_completed_count == 0
100
    assert seeker.on_error_count == 1
101
    assert seeker.on_next_count == 19
102
103
104
def test_rx_concat_with_series_throw(kernel):
105
106
    seeker = ObserverCounterCollector()
107
108
    obs = rx_concat(rx_repeat_series([(0.1, "A"), (0.5, "B"), (1.0, "C")]), rx_throw("oups"))
109
    sub_a = kernel.run(obs.subscribe(seeker))
110
    kernel.run(curio.sleep(3))
111
    kernel.run(sub_a())
112
113
    assert seeker.on_next_count == 3
114
    assert seeker.on_completed_count == 0
115
    assert seeker.on_error_count == 1
116
117
118
def test_rx_concat_with_no_observable(kernel):
119
120
    with pytest.raises(RuntimeError):
121
        rx_concat()
122