| Total Complexity | 3 |
| Total Lines | 28 |
| Duplicated Lines | 60.71 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
| 1 | from typing import Any |
||
| 2 | |||
| 3 | import pytest |
||
| 4 | |||
| 5 | from async_rx import Observer, rx_take |
||
| 6 | |||
| 7 | from ..model import ObserverCounterCollector |
||
| 8 | from .model import get_observable |
||
| 9 | |||
| 10 | |||
def test_rx_take(kernel):
    """Exercise rx_take with count=5, then with non-positive counts.

    A counting observer is subscribed to ``rx_take(..., count=5)``. The test
    asserts that five items were received, that completion was signalled
    once, that no error was reported, and that the first two collected
    items are 0 and 1. It then asserts that calling rx_take with a count of
    0 or -1 raises RuntimeError.

    Args:
        kernel: fixture exposing ``run(...)``; presumably drives an awaitable
            to completion (defined outside this file -- TODO confirm).
    """
    collector = ObserverCounterCollector()
    limited = rx_take(observable=get_observable(), count=5)

    # Subscribing yields a callable; invoking it and running the result
    # presumably releases the subscription -- TODO confirm against async_rx.
    subscription = kernel.run(limited.subscribe(an_observer=collector))
    kernel.run(subscription())

    assert collector.on_next_count == 5
    assert collector.on_completed_count == 1
    assert collector.on_error_count == 0
    assert collector.items[0:2] == [0, 1]

    # Checked in the same order as before: count=0 first, then count=-1.
    for rejected_count in (0, -1):
        with pytest.raises(RuntimeError):
            rx_take(observable=get_observable(), count=rejected_count)
||
| 28 |