Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.observables.test_rx_amb   A

Complexity

Total Complexity 5

Size/Duplication

Total Lines 69
Duplicated Lines 43.48 %

Importance

Changes 0
Metric Value
wmc 5
eloc 46
dl 30
loc 69
rs 10
c 0
b 0
f 0

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
import curio
2
import pytest
3
4
from async_rx import Observer, Subscription, rx_amb, rx_create, rx_from, rx_range
5
from async_rx.protocol import default_subscription
6
7
from ..model import ObserverCounterCollector
8
from .model import countdown
9
10
11
def test_rx_amb_init():
    """Check that calling ``rx_amb`` with no arguments raises ``RuntimeError``."""
    expected_failure = pytest.raises(RuntimeError)
    with expected_failure:
        rx_amb()
14
15
16
def test_rx_amb(kernel):
    """Subscribe to ``rx_amb`` over two countdown-based observables.

    The first observable is created from ``countdown(5, 0.1)`` and the second
    from ``countdown(10, 0.2)``. After subscribing and then running the
    returned subscription, the collector must hold exactly one item (the
    first observable itself), one completion and no error.
    """
    collector = ObserverCounterCollector()

    first = rx_create(subscribe=kernel.run(countdown(5, 0.1)))
    second = rx_create(subscribe=kernel.run(countdown(10, 0.2)))

    # Subscribe through the kernel, then run the subscription it hands back.
    subscription = kernel.run(rx_amb(first, second).subscribe(collector))
    kernel.run(subscription())

    counters = (
        collector.on_completed_count,
        collector.on_error_count,
        collector.on_next_count,
    )
    assert counters == (1, 0, 1)
    assert collector.items == [first]
31
32
33
def test_rx_amb2(kernel):
    """Subscribe to ``rx_amb`` and expect the second observable to be emitted.

    The first observable is created from ``countdown(5, 0.1)`` and the second
    from ``countdown(10, 0.05)``. After subscribing and then running the
    returned subscription, the collector must hold exactly one item (the
    second observable itself), one completion and no error.
    """
    collector = ObserverCounterCollector()

    first = rx_create(subscribe=kernel.run(countdown(5, 0.1)))
    second = rx_create(subscribe=kernel.run(countdown(10, 0.05)))

    # Subscribe through the kernel, then run the subscription it hands back.
    subscription = kernel.run(rx_amb(first, second).subscribe(collector))
    kernel.run(subscription())

    counters = (
        collector.on_completed_count,
        collector.on_error_count,
        collector.on_next_count,
    )
    assert counters == (1, 0, 1)
    assert collector.items == [second]
48
49
50
def test_rx_amb_with_error(kernel):
    """Subscribe to ``rx_amb`` when one of the sources reports an error.

    One observable is created from ``countdown(5, 0.1)``; the other uses a
    subscribe coroutine that sleeps 0.05 and then calls ``on_error`` on its
    observer. The test expects the collector to hold exactly one item (the
    countdown observable itself), one completion and no error.
    """
    collector = ObserverCounterCollector()

    async def _failing_subscribe(an_observer: Observer) -> Subscription:
        # Sleep briefly, signal an error, then return the default subscription.
        await curio.sleep(0.05)
        await an_observer.on_error(err="Args")
        return default_subscription

    countdown_source = rx_create(subscribe=kernel.run(countdown(5, 0.1)))
    failing_source = rx_create(subscribe=_failing_subscribe)

    # Subscribe through the kernel, then run the subscription it hands back.
    subscription = kernel.run(
        rx_amb(countdown_source, failing_source).subscribe(collector)
    )
    kernel.run(subscription())

    counters = (
        collector.on_completed_count,
        collector.on_error_count,
        collector.on_next_count,
    )
    assert counters == (1, 0, 1)
    assert collector.items == [countdown_source]
69