Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.observables.test_rx_repeat   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 46
Duplicated Lines 52.17 %

Importance

Changes 0
Metric Value
wmc 8
eloc 33
dl 24
loc 46
rs 10
c 0
b 0
f 0

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
from datetime import timedelta
2
3
import curio
4
import pytest
5
6
from async_rx import rx_concat, rx_from, rx_repeat, rx_repeat_series, rx_throw
7
8
from ..model import ObserverCounterCollectorWithTime
9
10
11
def test_rx_repeat_default():
12
    with pytest.raises(RuntimeError):
13
        rx_repeat(None, None)
14
    with pytest.raises(RuntimeError):
15
        rx_repeat(rx_from([1, 2]), None)
16
    with pytest.raises(RuntimeError):
17
        rx_repeat(None, lambda a: not a)
18
19
20
def test_rx_repeat(kernel):
21
22
    seeker = ObserverCounterCollectorWithTime()
23
    sub = kernel.run(rx_repeat(duration=timedelta(seconds=0.2), producer=lambda: True).subscribe(seeker))
24
    kernel.run(curio.sleep(1.1))
25
    kernel.run(sub())
26
27
    assert seeker.on_completed_count == 1
28
    assert seeker.on_next_count == 5
29
    assert seeker.on_error_count == 0
30
    assert seeker.get_delta() == [0.2, 0.2, 0.2, 0.2]
31
32
33
def test_rx_repeat_async(kernel):
34
    async def _producer():
35
        return True
36
37
    seeker = ObserverCounterCollectorWithTime()
38
    sub = kernel.run(rx_repeat(duration=timedelta(seconds=0.2), producer=_producer).subscribe(seeker))
39
    kernel.run(curio.sleep(1.1))
40
    kernel.run(sub())
41
42
    assert seeker.on_completed_count == 1
43
    assert seeker.on_next_count == 5
44
    assert seeker.on_error_count == 0
45
    assert seeker.get_delta() == [0.2, 0.2, 0.2, 0.2]
46