Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.observables.test_rx_last   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 41
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 4
eloc 26
dl 0
loc 41
rs 10
c 0
b 0
f 0
1
from typing import Any
2
3
import pytest
4
5
from async_rx import Observer, rx_last
6
7
from ..model import ObserverCounterCollector
8
from .model import get_observable
9
10
11
def test_rx_last_with_default(kernel):
    """Check ``rx_last`` when no ``count`` argument is given.

    The observer is expected to receive exactly one item (``99``), one
    completion notification and no error.
    """
    # NOTE(review): presumably get_observable() emits 0..99 -- confirm in .model
    collector = ObserverCounterCollector()
    last_only = rx_last(observable=get_observable())

    # Subscribe through the kernel fixture, then run the returned callable's
    # result (presumably the unsubscribe step -- TODO confirm).
    unsubscribe = kernel.run(last_only.subscribe(an_observer=collector))
    kernel.run(unsubscribe())

    assert collector.on_next_count == 1
    assert collector.on_completed_count == 1
    assert collector.on_error_count == 0
    assert collector.items == [99]
22
23
24
def test_rx_last(kernel):
    """Check ``rx_last`` with an explicit ``count`` and with invalid counts.

    With ``count=5`` the observer is expected to receive the five items
    ``95..99``, one completion notification and no error. Building the
    operator with ``count=0`` or ``count=-1`` must raise ``RuntimeError``.
    """
    # NOTE(review): presumably get_observable() emits 0..99 -- confirm in .model
    collector = ObserverCounterCollector()
    last_five = rx_last(observable=get_observable(), count=5)

    # Subscribe through the kernel fixture, then run the returned callable's
    # result (presumably the unsubscribe step -- TODO confirm).
    unsubscribe = kernel.run(last_five.subscribe(an_observer=collector))
    kernel.run(unsubscribe())

    assert collector.on_next_count == 5
    assert collector.on_completed_count == 1
    assert collector.on_error_count == 0
    assert collector.items == list(range(95, 100))

    # Non-positive counts are rejected as soon as the operator is built;
    # no subscription is needed to trigger the error.
    for invalid_count in (0, -1):
        with pytest.raises(RuntimeError):
            rx_last(observable=get_observable(), count=invalid_count)
41