Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.observables.test_rx_debounce   A

Complexity

Total Complexity 6

Size/Duplication

Total Lines 76
Duplicated Lines 23.68 %

Importance

Changes 0
Metric Value
wmc 6
eloc 40
dl 18
loc 76
rs 10
c 0
b 0
f 0

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
from datetime import timedelta
2
3
import curio
4
import pytest
5
6
from async_rx import rx_concat, rx_debounce, rx_from, rx_repeat_series, rx_throw
7
8
from ..model import ObserverCounterCollectorWithTime
9
10
11
def test_rx_debounce_default():
12
    with pytest.raises(RuntimeError):
13
        rx_debounce(None, None)
14
    with pytest.raises(RuntimeError):
15
        rx_debounce(rx_from([1, 2]), None)
16
    with pytest.raises(RuntimeError):
17
        rx_debounce(None, timedelta(seconds=1))
18
19
20
def test_rx_debounce(kernel):
21
22
    source = rx_debounce(
23
        rx_repeat_series([(0.1, "A0"), (0.1, "A1"), (0.1, "A2"), (0.1, "A3"), (0.5, "B"), (1.0, "C"), (3.0, "D")]), duration=timedelta(seconds=0.5)
24
    )
25
    # A0:     -> 0.5
26
    # A1: 0.1 -> 0.6
27
    # A2: 0.2 -> 0.7
28
    # A3: 0.3 -> 0.8
29
    #     0.5 ? old(A3) = 0.2, wake up 1.0
30
    # B:  0.8 -> 1.3
31
    #     1.0 ? old(B) = 0.2, wake up 1.5
32
    #     1.5 ? old(B) = 0.7 -> send, wake ip 2.0
33
    #     2.0 ? none, wake up 2.5
34
    # C:  1.8 -> 2.3
35
    #     2.5 -> old(C) = 0.2, wake up 3.0
36
    #     3.0 -> olf(C) = 0.7, send, wakup 3.5
37
    #     3.5 -> none, wake up 4.0
38
    #     4.0 -> none, wake up 4.5
39
    #     4.5 -> none, wake up 5.0
40
    # D:  4.8 -> 5.3
41
    #     5.0 -> none, wake up 5.5
42
    #     5.3 -> complete
43
    seeker = ObserverCounterCollectorWithTime()
44
45
    sub = kernel.run(source.subscribe(seeker))
46
    kernel.run(curio.sleep(6))
47
    kernel.run(sub())
48
    assert seeker.on_next_count == 2
49
    assert seeker.on_error_count == 0
50
    assert seeker.on_completed_count == 1
51
52
    assert len(seeker.items) == 2
53
    assert seeker.items[0][1] == "B"
54
    assert seeker.items[1][1] == "C"
55
    assert seeker.get_delta() == [1.0]
56
57
58
def test_rx_debounce_with_error(kernel):
59
60
    source = rx_debounce(
61
        rx_concat(rx_repeat_series([(0.1, "A0"), (0.1, "A1"), (0.1, "A2"), (0.1, "A3"), (0.5, "B"), (1.0, "C"), (3.0, "D")]), rx_throw("oups")),
62
        duration=timedelta(seconds=0.5),
63
    )
64
65
    seeker = ObserverCounterCollectorWithTime()
66
67
    sub = kernel.run(source.subscribe(seeker))
68
    kernel.run(curio.sleep(6))
69
    kernel.run(sub())
70
71
    assert len(seeker.items) == 2
72
    assert seeker.get_delta() == [1.0]
73
    assert seeker.on_next_count == 2
74
    assert seeker.on_error_count == 1
75
    assert seeker.on_completed_count == 0
76