Passed
Pull Request — master (#497)
by
unknown
04:17
created

filters.RepeateMessageFilter.filter()   A

Complexity

Conditions 4

Size

Total Lines 11
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 11
nop 2
dl 0
loc 11
rs 9.85
c 0
b 0
f 0
1
2
import time
3
from logging import LogRecord
4
from threading import Lock
5
from typing import Generic, Self, TypeVar
6
7
T = TypeVar("T")
8
9
10
class _DoublyLinkedList(Generic[T]):
11
    value: T
12
    previous: Self
13
    next: Self
14
15
    def __init__(self, value: T):
16
        self.value = value
17
        self.previous = self
18
        self.next = self
19
20
    def swap_next(self, other: Self):
21
22
        curr_next = self.next
23
        other_curr_next = other.next
24
25
        self.next = other_curr_next
26
        other_curr_next.previous = self
27
        other.next = curr_next
28
        curr_next.previous = other
29
30
    def swap_previous(self, other: Self):
31
        curr_prev = self.previous
32
        other_curr_prev = other.previous
33
34
        self.previous = other_curr_prev
35
        other_curr_prev.next = self
36
        other.previous = curr_prev
37
        curr_prev.next = other
38
39
    def remove(self):
40
        self.swap_next(self.previous)
41
42
    def __iter__(self):
43
        yield self.value
44
        curr = self.next
45
        while curr is not self:
46
            yield curr.value
47
            curr = curr.next
48
49
50
# Key and value types of a ``_LimitedCache``.
K = TypeVar("K")
V = TypeVar("V")


class _LimitedCache(Generic[K, V]):
    """Size-bounded mapping that evicts the least recently written key.

    Entries live in a dict for lookup and, in parallel, in a circular linked
    list anchored at the sentinel ``root`` for ordering: ``root.next`` is the
    most recently written entry and ``root.previous`` the oldest one.
    Reading a key with ``cache[key]`` does not change the order; only
    assignments do.
    """

    # Sentinel node; its value is ``None`` and it is never stored in ``cache``.
    root: _DoublyLinkedList[tuple[K, V] | None]
    # Maps each key to the list node that carries its ``(key, value)`` pair.
    cache: dict[K, _DoublyLinkedList[tuple[K, V]]]
    # Largest number of entries kept; values below one keep nothing.
    max_size: int

    def __init__(self, max_size: int = 512) -> None:
        """Create an empty cache holding at most ``max_size`` entries."""
        self.root = _DoublyLinkedList(None)
        self.cache = {}
        self.max_size = max_size

    def __getitem__(self, key: K) -> V:
        """Return the value stored for ``key``; raise ``KeyError`` if absent."""
        _, value = self.cache[key].value
        return value

    def __contains__(self, key: K) -> bool:
        """Return whether ``key`` is currently stored."""
        return key in self.cache

    def __setitem__(self, key: K, value: V) -> None:
        """Store ``value`` under ``key`` and mark it as most recently written.

        If the cache then holds more than ``max_size`` entries, the oldest
        ones are dropped until it fits again.
        """
        entry = self.cache.get(key)
        if entry is not None:
            # Existing key: unlink the node so it can be re-inserted up front.
            entry.remove()
            entry.value = (key, value)
        else:
            entry = _DoublyLinkedList((key, value))
            self.cache[key] = entry

        # Link the (single-node) entry directly after the sentinel.
        self.root.swap_next(entry)

        # Evict only after the new entry is linked, and only once the limit
        # is actually exceeded, so that exactly ``max_size`` entries fit and
        # the sentinel is never mistaken for an entry.  The second condition
        # stops the loop if the list is empty.
        limit = max(self.max_size, 0)
        while len(self.cache) > limit and self.root.previous is not self.root:
            self._remove_oldest()

    def __delitem__(self, key: K) -> None:
        """Remove ``key``; raise ``KeyError`` if it is not stored."""
        entry = self.cache.pop(key)
        entry.remove()

    def _remove_oldest(self) -> None:
        """Drop the least recently written entry, if there is one."""
        oldest_entry = self.root.previous
        if oldest_entry is self.root:
            # Empty list: the sentinel carries ``None``, not a (key, value).
            return
        oldest_key, _ = oldest_entry.value
        del self.cache[oldest_key]
        oldest_entry.remove()
98
99
class RepeateMessageFilter:
    """Logging filter that suppresses repeats of the same message.

    A record is identified by its module, level, message template and
    arguments.  The first record with a given identity passes; further
    records with the same identity are rejected until more than
    ``lockout_time`` seconds have elapsed since the last one that passed.
    Only a bounded number of identities is remembered.
    """

    # Minimum number of seconds between two accepted identical records.
    lockout_time: float
    # Maps a record key to the ``time.time()`` at which it last passed.
    _cache: "_LimitedCache[tuple, float]"
    # Guards ``_cache``, which is read and written on every ``filter`` call.
    _lock: Lock

    def __init__(self, lockout_time: float, cache_size: int = 512):
        """Create the filter.

        Args:
            lockout_time: seconds during which repeats of an accepted record
                are rejected.
            cache_size: size limit handed to the internal cache of recently
                seen record keys.
        """
        self.lockout_time = lockout_time
        self._cache = _LimitedCache(cache_size)
        self._lock = Lock()

    def filter(self, record: LogRecord) -> bool:
        """Return ``True`` if ``record`` should pass, ``False`` if it is a
        repeat seen within the lockout window."""
        key = self._record_key(record)
        # NOTE(review): time.time() is wall-clock time and can jump when the
        # system clock is adjusted; time.monotonic() may be a better fit.
        current_time = time.time()
        with self._lock:
            if (
                key in self._cache
                and current_time - self._cache[key] <= self.lockout_time
            ):
                return False
            # First occurrence, or the lockout has expired: remember when
            # this record was let through.
            self._cache[key] = current_time
            return True

    @staticmethod
    def _record_key(record: LogRecord) -> tuple:
        """Build the hashable identity used to recognise repeated records."""
        key = (record.module, record.levelno, record.msg, record.args)
        try:
            hash(key)
        except TypeError:
            # ``msg`` may be any object and ``args`` may be a mapping or
            # contain lists; fall back to their repr so the key can still be
            # used for cache lookups instead of raising inside the filter.
            key = (
                record.module,
                record.levelno,
                repr(record.msg),
                repr(record.args),
            )
        return key
124