Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.observables.test_rx_zip   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 65
Duplicated Lines 55.38 %

Importance

Changes 0
Metric Value
wmc 4
eloc 44
dl 36
loc 65
rs 10
c 0
b 0
f 0

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
from async_rx import Observer, Subscription, rx_create, rx_from, rx_range, rx_zip
2
from async_rx.protocol import default_subscription
3
4
from ..model import ObserverCounterCollector
5
6
7
def test_rx_zip_equals(kernel):
    """Zip ``rx_range(0, 3)`` with ``rx_from("abc")`` and expect three pairs.

    The collector must record three items, one completion and no error.
    """
    collector = ObserverCounterCollector()
    zipped = rx_zip(rx_range(start=0, stop=3), rx_from("abc"))

    # Subscribe through the kernel, then run the callable that subscribe returned.
    subscription = kernel.run(zipped.subscribe(collector))
    kernel.run(subscription())

    expected_pairs = [(0, 'a'), (1, 'b'), (2, 'c')]
    assert collector.on_completed_count == 1
    assert collector.on_error_count == 0
    assert collector.on_next_count == 3
    assert collector.items == expected_pairs
19
20
21
def test_rx_zip_limit_left(kernel):
    """Zip ``rx_range(0, 2)`` with ``rx_from("abc")`` and expect two pairs.

    The first (left) source is the shorter one here; the collector must record
    two items, one completion and no error.
    """
    collector = ObserverCounterCollector()
    zipped = rx_zip(rx_range(start=0, stop=2), rx_from("abc"))

    # Subscribe through the kernel, then run the callable that subscribe returned.
    subscription = kernel.run(zipped.subscribe(collector))
    kernel.run(subscription())

    expected_pairs = [(0, 'a'), (1, 'b')]
    assert collector.on_completed_count == 1
    assert collector.on_error_count == 0
    assert collector.on_next_count == 2
    assert collector.items == expected_pairs
33
34
35
def test_rx_zip_limit_right(kernel):
    """Zip ``rx_range(0, 5)`` with ``rx_from("abc")`` and expect three pairs.

    The second (right) source is the shorter one here; the collector must
    record three items, one completion and no error.
    """
    collector = ObserverCounterCollector()
    zipped = rx_zip(rx_range(start=0, stop=5), rx_from("abc"))

    # Subscribe through the kernel, then run the callable that subscribe returned.
    subscription = kernel.run(zipped.subscribe(collector))
    kernel.run(subscription())

    expected_pairs = [(0, 'a'), (1, 'b'), (2, 'c')]
    assert collector.on_completed_count == 1
    assert collector.on_error_count == 0
    assert collector.on_next_count == 3
    assert collector.items == expected_pairs
47
48
49
def test_rx_zip_with_error(kernel):
    """Zip a range with a source that emits one item and then an error.

    The collector must record the single pair produced before the error,
    one error notification and no completion.
    """

    async def _emit_then_fail(an_observer: Observer) -> Subscription:
        # Push exactly one value, then signal the error.
        await an_observer.on_next("a")
        await an_observer.on_error("Oups")
        return default_subscription

    collector = ObserverCounterCollector()
    failing_source = rx_create(subscribe=_emit_then_fail)
    zipped = rx_zip(rx_range(start=0, stop=3), failing_source)

    # Subscribe through the kernel, then run the callable that subscribe returned.
    subscription = kernel.run(zipped.subscribe(collector))
    kernel.run(subscription())

    expected_pairs = [(0, 'a')]
    assert collector.on_completed_count == 0
    assert collector.on_error_count == 1
    assert collector.on_next_count == 1
    assert collector.items == expected_pairs
65