Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.observables.test_rx_take   A

Complexity

Total Complexity 3

Size/Duplication

Total Lines 28
Duplicated Lines 60.71 %

Importance

Changes 0
Metric Value
wmc 3
eloc 18
dl 17
loc 28
rs 10
c 0
b 0
f 0

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

from typing import Any

import pytest

from async_rx import Observer, rx_take

from ..model import ObserverCounterCollector
from .model import get_observable


def test_rx_take(kernel):

    seeker = ObserverCounterCollector()

    sub = kernel.run(rx_take(observable=get_observable(), count=5).subscribe(an_observer=seeker))
    kernel.run(sub())

    assert seeker.on_next_count == 5
    assert seeker.on_completed_count == 1
    assert seeker.on_error_count == 0
    assert seeker.items[0:2] == [0, 1]

    with pytest.raises(RuntimeError):
        rx_take(observable=get_observable(), count=0)

    with pytest.raises(RuntimeError):
        rx_take(observable=get_observable(), count=-1)
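
One way to reduce the duplication flagged in this report is to drive the repeated invalid-count checks from a single table of inputs. The sketch below is not the async_rx implementation. `take`, `raises_runtime_error`, `INVALID_COUNTS` and `check_invalid_counts` are hypothetical names, and `take` is a synchronous stand-in for rx_take that is only assumed to reject non-positive counts with RuntimeError, as the test above expects.

```python
from typing import Callable, Iterable, List


def take(items: Iterable[int], count: int) -> List[int]:
    """Hypothetical stand-in for rx_take: keep the first `count` items."""
    if count <= 0:
        # Assumption: mirrors the RuntimeError the test above expects.
        raise RuntimeError("count must be greater than zero")
    result: List[int] = []
    for item in items:
        if len(result) >= count:
            break
        result.append(item)
    return result


def raises_runtime_error(call: Callable[[], object]) -> bool:
    """Return True if calling `call` raises RuntimeError, False otherwise."""
    try:
        call()
    except RuntimeError:
        return True
    return False


# Every invalid input lives in one place instead of one copied block per value.
INVALID_COUNTS = (0, -1)


def check_invalid_counts() -> List[int]:
    """Return the invalid counts that were wrongly accepted (empty means pass)."""
    return [
        count
        for count in INVALID_COUNTS
        if not raises_runtime_error(lambda: take(range(10), count))
    ]
```

In a pytest suite the same idea is usually written with `@pytest.mark.parametrize("count", [0, -1])` around a single `with pytest.raises(RuntimeError):` block, so adding another invalid value means adding a list entry, not another copied block.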