Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.multicast.test_publish_behavior   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 35
Duplicated Lines 82.86 %

Importance

Changes 0
Metric Value
wmc 1
eloc 22
dl 29
loc 35
rs 10
c 0
b 0
f 0

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
from async_rx import Observer, rx_publish_behavior, rx_range, rx_subject
2
3
from ..model import ObserverCounterCollector
4
5
6
def test_publish_behavior(kernel):
7
8
    seeker_a = ObserverCounterCollector()
9
    seeker_b = ObserverCounterCollector()
10
11
    a_multicast = rx_publish_behavior(an_observable=rx_range(start=0, stop=100))
12
    assert a_multicast
13
14
    # subscribe
15
    sub_a = kernel.run(a_multicast.subscribe(seeker_a))
16
    sub_b = kernel.run(a_multicast.subscribe(seeker_b))
17
18
    kernel.run(a_multicast.connect())
19
20
    # both observer see the same things
21
    assert seeker_a.on_next_count == seeker_b.on_next_count
22
    assert seeker_a.on_error_count == seeker_b.on_error_count
23
    assert seeker_a.on_completed_count == seeker_b.on_completed_count
24
25
    assert seeker_a.on_next_count == 100
26
    assert seeker_a.on_error_count == 0
27
    assert seeker_a.on_completed_count == 1
28
29
    # replay
30
    seeker_c = ObserverCounterCollector()
31
    sub_c = kernel.run(a_multicast.subscribe(seeker_c))
32
    assert seeker_c.on_next_count == 1
33
    assert seeker_c.on_error_count == 0
34
    assert seeker_c.on_completed_count == 1
35