Passed
Push — master ( 5304dc...5eaab6 )
by Guibert
02:41
created

tests.model   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 87
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 18
eloc 56
dl 0
loc 87
rs 10
c 0
b 0
f 0
1
from datetime import datetime
2
from typing import Any, NoReturn
3
4
5
class ObserverCounter:
6
    def __init__(self):
7
        self.on_next_count = 0
8
        self.on_completed_count = 0
9
        self.on_error_count = 0
10
11
    async def on_next(self, item: Any) -> None:
12
        """Process item."""
13
        self.on_next_count += 1
14
15
    async def on_completed(self) -> None:
16
        """Signal completion of this observable."""
17
        self.on_completed_count += 1
18
19
    async def on_error(self, err: Any) -> NoReturn:
20
        self.on_error_count += 1
21
        raise RuntimeError(err)
22
23
24
class ObserverCounterSilentError:
25
    def __init__(self):
26
        self.on_next_count = 0
27
        self.on_completed_count = 0
28
        self.on_error_count = 0
29
30
    async def on_next(self, item: Any) -> None:
31
        """Process item."""
32
        self.on_next_count += 1
33
34
    async def on_completed(self) -> None:
35
        """Signal completion of this observable."""
36
        self.on_completed_count += 1
37
38
    async def on_error(self, err: Any) -> None:
39
        self.on_error_count += 1
40
41
42
class ObserverCounterCollector:
43
    def __init__(self):
44
        self.on_next_count = 0
45
        self.on_completed_count = 0
46
        self.on_error_count = 0
47
        self.items: Any = list([])
48
49
    async def on_next(self, item: Any) -> None:
50
        """Process item."""
51
        self.items.append(item)
52
        self.on_next_count += 1
53
54
    async def on_completed(self) -> None:
55
        """Signal completion of this observable."""
56
        self.on_completed_count += 1
57
58
    async def on_error(self, err: Any) -> None:
59
        self.on_error_count += 1
60
61
62
class ObserverCounterCollectorWithTime:
63
    """Store itema as tuple (utc, value)."""
64
65
    def __init__(self):
66
        self.on_next_count = 0
67
        self.on_completed_count = 0
68
        self.on_error_count = 0
69
        self.items: Any = list([])
70
71
    async def on_next(self, item: Any) -> None:
72
        """Process item."""
73
        self.items.append((datetime.utcnow(), item))
74
        self.on_next_count += 1
75
76
    async def on_completed(self) -> None:
77
        """Signal completion of this observable."""
78
        self.on_completed_count += 1
79
80
    async def on_error(self, err: Any) -> None:
81
        self.on_error_count += 1
82
83
    def get_delta(self):
84
        if len(self.items) <= 1:
85
            return []
86
        return [round((self.items[i][0] - self.items[i - 1][0]).total_seconds(), ndigits=1) for i in range(1, len(self.items))]
87