Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.observables.test_rx_first   A

Complexity

Total Complexity 2

Size/Duplication

Total Lines 33
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 2
eloc 21
dl 0
loc 33
rs 10
c 0
b 0
f 0
1
from typing import Any
2
3
from async_rx import Observer, rx_first, rx_from
4
5
from ..model import ObserverCounterCollector
6
from .model import get_observable
7
8
9
def test_rx_first(kernel):
    """Check what an observer receives from ``rx_first`` over ``get_observable()``.

    The collector is subscribed through the ``kernel`` fixture, the callable
    returned by the subscription is then invoked and run on the same kernel,
    and finally the recorded notifications are verified: exactly one
    ``on_next`` carrying ``0``, one ``on_completed`` and no ``on_error``.
    """
    collector = ObserverCounterCollector()
    first_only = rx_first(observable=get_observable())

    # Running the subscribe call yields a callable; it is invoked and run
    # right away (presumably this disposes the subscription - confirm
    # against the async_rx API).
    subscription = kernel.run(first_only.subscribe(an_observer=collector))
    kernel.run(subscription())

    assert collector.on_next_count == 1
    assert collector.on_completed_count == 1
    assert collector.on_error_count == 0
    assert collector.items == [0]
20
21
22
def test_rx_first_with_just_one(kernel):
    """Check ``rx_first`` over a source built with ``rx_from("A")``.

    The collector is subscribed through the ``kernel`` fixture, the callable
    returned by the subscription is invoked and run on the same kernel, and
    the recorded notifications must be: exactly one ``on_next`` carrying
    ``"A"``, one ``on_completed`` and no ``on_error``.
    """
    collector = ObserverCounterCollector()
    single_item_source = rx_from("A")
    first_only = rx_first(observable=single_item_source)

    # Running the subscribe call yields a callable; it is invoked and run
    # right away (presumably this disposes the subscription - confirm
    # against the async_rx API).
    subscription = kernel.run(first_only.subscribe(an_observer=collector))
    kernel.run(subscription())

    assert collector.on_next_count == 1
    assert collector.on_completed_count == 1
    assert collector.on_error_count == 0
    assert collector.items == ["A"]
33