Passed
Push — master ( ad8a25...eaed3c )
by Vinicius
04:58 queued 02:09
created

build.managers.liveness   D

Complexity

Total Complexity 58

Size/Duplication

Total Lines 274
Duplicated Lines 0 %

Test Coverage

Coverage 97.33%

Importance

Changes 0
Metric Value
eloc 183
dl 0
loc 274
ccs 146
cts 150
cp 0.9733
rs 4.5599
c 0
b 0
f 0
wmc 58

22 Methods

Rating   Name   Duplication   Size   Complexity  
A ILSM.__init__() 0 13 1
A LSM.__init__() 0 6 1
A LivenessManager.disable() 0 6 2
A LivenessManager.__init__() 0 9 1
A ILSM.__repr__() 0 3 1
A LivenessManager.atry_to_publish_lsm_event() 0 10 2
A LivenessManager.try_to_publish_lsm_event() 0 10 2
A LivenessManager.reaper() 0 29 4
A LivenessManager.should_call_reaper() 0 11 2
A LivenessManager.link_status_hook_liveness() 0 11 5
A ILSM.transition_to() 0 6 2
A LivenessManager.is_enabled() 0 6 3
A LivenessManager.get_interface_status() 0 14 5
B LivenessManager.consume_hello() 0 43 5
A ILSM.consume_hello() 0 9 4
A LSM._transition_to() 0 6 2
A LSM.next_state() 0 3 1
A LivenessManager.enable() 0 4 2
A ILSM.reaper_check() 0 9 3
A LivenessManager.consume_hello_if_enabled() 0 5 2
B LSM.agg_state() 0 9 7
A LSM.__repr__() 0 3 1

How to fix   Complexity   

Complexity

Complex classes like build.managers.liveness often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""LivenessManager."""
2 1
from typing import Optional, Tuple
3 1
from datetime import datetime
4 1
from kytos.core import KytosEvent, log
5 1
from kytos.core.common import EntityStatus
6 1
from napps.kytos.of_lldp import settings as napp_settings
7
8
9 1
class ILSM:
10
11
    """InterfaceLivenessStateMachine.
12
13
    This state machine represents the logical liveness state of the interface.
14
    If an interface is admin disabled or isn't active then a manager that uses
15
    this state machine should call the transition accordingly.
16
    """
17
18 1
    def __init__(self, state="init", min_hellos=1) -> None:
19
        """InterfaceLivenessStateMachine."""
20 1
        self.transitions = {
21
            "init": ["up", "down"],
22
            "up": ["down", "init"],
23
            "down": ["up", "init"],
24
        }
25 1
        self.state = state
26 1
        self.min_hellos = min_hellos
27 1
        assert self.min_hellos >= 1, self.min_hellos
28 1
        assert self.state in self.transitions, self.state
29 1
        self.hello_counter = 0
30 1
        self.last_hello_at: Optional[datetime] = None
31
32 1
    def __repr__(self) -> str:
33
        """Repr."""
34 1
        return f"ILSM({self.state}, {self.last_hello_at})"
35
36 1
    def transition_to(self, to_state: str) -> Optional[str]:
37
        """Try to transition to a state."""
38 1
        if to_state not in self.transitions[self.state]:
39 1
            return None
40 1
        self.state = to_state
41 1
        return self.state
42
43 1
    def reaper_check(self, dead_interval: int) -> Optional[str]:
44
        """Try to transition to down. It must be called every dead_interval."""
45 1
        if (
46
            self.last_hello_at and
47
            (datetime.utcnow() - self.last_hello_at).seconds > dead_interval
48
        ):
49 1
            self.hello_counter = 0
50 1
            return self.transition_to("down")
51 1
        return None
52
53 1
    def consume_hello(self, received_at: datetime) -> Optional[str]:
54
        """Consume hello. It must be called on every received hello."""
55 1
        self.last_hello_at = received_at
56 1
        if self.state != "up":
57 1
            self.hello_counter += 1
58 1
        if self.hello_counter >= self.min_hellos and self.transition_to("up"):
59 1
            self.hello_counter = 0
60 1
            return "up"
61 1
        return None
62
63
64 1
class LSM:
65
66
    """LivenessStateMachine aggregates two resulting ILSM acts like a link."""
67
68 1
    def __init__(self, ilsm_a: ILSM, ilsm_b: ILSM, state="init") -> None:
69
        """LinkLivenessStateMachine."""
70 1
        self.ilsm_a = ilsm_a
71 1
        self.ilsm_b = ilsm_b
72 1
        self.state = state
73 1
        self.transitions = self.ilsm_a.transitions
74
75 1
    def __repr__(self) -> str:
76
        """Repr."""
77 1
        return f"LSM({self.agg_state()}, {self.ilsm_a}, {self.ilsm_b})"
78
79 1
    def agg_state(self) -> str:
80
        """Aggregated state."""
81 1
        if self.ilsm_a.state == "init" or self.ilsm_b.state == "init":
82 1
            return "init"
83 1
        if self.ilsm_a.state == "down" or self.ilsm_b.state == "down":
84 1
            return "down"
85 1
        if self.ilsm_a.state == "up" and self.ilsm_b.state == "up":
86 1
            return "up"
87
        return "init"
88
89 1
    def _transition_to(self, to_state: str) -> Optional[str]:
90
        """Try to transition to a state."""
91 1
        if to_state not in self.transitions[self.state]:
92 1
            return None
93 1
        self.state = to_state
94 1
        return self.state
95
96 1
    def next_state(self) -> Optional[str]:
97
        """Next state."""
98 1
        return self._transition_to(self.agg_state())
99
100
101 1
class LivenessManager:
102
103
    """LivenessManager."""
104
105 1
    def __init__(self, controller, settings=napp_settings) -> None:
106
        """LivenessManager."""
107
108 1
        self.controller = controller
109
110 1
        self.interfaces = {}  # liveness enabled
111 1
        self.liveness = {}  # indexed by the lowest interface id of the pair
112 1
        self.liveness_ids = {}  # interface id to lowest id of the pair
113 1
        self.settings = settings
114
115 1
    @classmethod
116 1
    def link_status_hook_liveness(cls, link) -> Optional[EntityStatus]:
117
        """Link status hook liveness."""
118 1
        if (
119
            link.is_active()
120
            and link.is_enabled()
121
            and "liveness_status" in link.metadata
122
            and link.metadata["liveness_status"] != "up"
123
        ):
124 1
            return EntityStatus.DOWN
125 1
        return None
126
127 1
    def is_enabled(self, *interfaces) -> bool:
128
        """Check if liveness is enabled on an interface."""
129 1
        for interface in interfaces:
130 1
            if interface.id not in self.interfaces:
131 1
                return False
132 1
        return True
133
134 1
    def enable(self, *interfaces) -> None:
135
        """Enable liveness on interface."""
136 1
        for interface in interfaces:
137 1
            self.interfaces[interface.id] = interface
138
139 1
    def disable(self, *interfaces) -> None:
140
        """Disable liveness interface."""
141 1
        for interface in interfaces:
142 1
            self.interfaces.pop(interface.id, None)
143 1
            min_id = self.liveness_ids.pop(interface.id, None)
144 1
            self.liveness.pop(min_id, None)
145
146 1
    async def atry_to_publish_lsm_event(
147
        self, event_suffix: str, interface_a, interface_b
148
    ) -> None:
149
        """Async try to publish a LSM event."""
150 1
        if not event_suffix:
151 1
            return
152 1
        name = f"kytos/of_lldp.liveness.{event_suffix}"
153 1
        content = {"interface_a": interface_a, "interface_b": interface_b}
154 1
        event = KytosEvent(name=name, content=content)
155 1
        await self.controller.buffers.app.aput(event)
156
157 1
    def try_to_publish_lsm_event(
158
        self, event_suffix: str, interface_a, interface_b
159
    ) -> None:
160
        """Try to publish a LSM event."""
161 1
        if not event_suffix:
162 1
            return
163 1
        name = f"kytos/of_lldp.liveness.{event_suffix}"
164 1
        content = {"interface_a": interface_a, "interface_b": interface_b}
165 1
        event = KytosEvent(name=name, content=content)
166 1
        self.controller.buffers.app.put(event)
167
168 1
    async def consume_hello_if_enabled(self, interface_a, interface_b):
169
        """Consume liveness hello if enabled."""
170 1
        if not self.is_enabled(interface_a, interface_b):
171 1
            return
172 1
        await self.consume_hello(interface_a, interface_b, datetime.utcnow())
173
174 1
    def get_interface_status(
175
        self, interface_id
176
    ) -> Tuple[Optional[str], Optional[datetime]]:
177
        """Get interface status."""
178 1
        if interface_id not in self.interfaces:
179 1
            return None, None
180 1
        min_id = self.liveness_ids.get(interface_id)
181 1
        if min_id and min_id in self.liveness:
182 1
            lsm = self.liveness[min_id]["lsm"]
183 1
            if interface_id == min_id:
184 1
                return lsm.ilsm_a.state, lsm.ilsm_a.last_hello_at
185
            else:
186
                return lsm.ilsm_b.state, lsm.ilsm_b.last_hello_at
187 1
        return "init", None
188
189 1
    async def consume_hello(
190
        self, interface_a, interface_b, received_at: datetime
191
    ) -> None:
192
        """Consume liveness hello event."""
193 1
        min_id = min(interface_a.id, interface_b.id)
194 1
        is_interface_a_min_id = min_id == interface_a.id
195 1
        if min_id not in self.liveness:
196 1
            min_hellos = self.settings.LIVENESS_MIN_HELLOS_UP
197 1
            lsm = LSM(
198
                ILSM(state="init", min_hellos=min_hellos),
199
                ILSM(state="init", min_hellos=min_hellos)
200
            )
201 1
            entry = {
202
                "lsm": lsm,
203
            }
204 1
            if is_interface_a_min_id:
205 1
                entry["interface_a"], entry["interface_b"] = interface_a, interface_b
206
            else:
207
                entry["interface_a"], entry["interface_b"] = interface_b, interface_a
208 1
            self.liveness[min_id] = entry
209 1
            self.liveness_ids[interface_a.id] = min_id
210 1
            self.liveness_ids[interface_b.id] = min_id
211
212 1
        entry = self.liveness[min_id]
213 1
        lsm = entry["lsm"]
214 1
        if is_interface_a_min_id:
215 1
            lsm.ilsm_a.consume_hello(received_at)
216 1
            if entry["interface_b"].id != interface_b.id:
217 1
                """
218
                Implies that the topology connection has changed, needs new ref
219
                """
220 1
                min_hellos = self.settings.LIVENESS_MIN_HELLOS_UP
221 1
                entry["interface_b"] = interface_b
222 1
                entry["lsm"].ilsm_b = ILSM(state="init", min_hellos=min_hellos)
223
        else:
224 1
            lsm.ilsm_b.consume_hello(received_at)
225
226 1
        lsm_next_state = lsm.next_state()
227 1
        log.debug(
228
            f"Liveness hello {interface_a.id} <- {interface_b.id}"
229
            f" next state: {lsm_next_state}, lsm: {lsm}"
230
        )
231 1
        await self.atry_to_publish_lsm_event(lsm_next_state, interface_a, interface_b)
232
233 1
    def should_call_reaper(self, interface) -> bool:
234
        """Should call reaper."""
235 1
        if all(
236
            (
237
                interface.switch.is_connected(),
238
                interface.lldp,
239
                self.is_enabled(interface),
240
            )
241
        ):
242 1
            return True
243 1
        return False
244
245 1
    def reaper(self, dead_interval: int):
246
        """Reaper check processable interfaces."""
247 1
        for value in list(self.liveness.values()):
248 1
            lsm, intf_a, intf_b = (
249
                value["lsm"],
250
                value["interface_a"],
251
                value["interface_b"],
252
            )
253 1
            if any(
254
                (
255
                    lsm.state == "down",
256
                    not self.should_call_reaper(intf_a),
257
                    not self.should_call_reaper(intf_b),
258
                )
259
            ):
260
                continue
261
262 1
            lsm.ilsm_a.reaper_check(dead_interval)
263 1
            lsm.ilsm_b.reaper_check(dead_interval)
264 1
            lsm_next_state = lsm.next_state()
265
266 1
            if all(
267
                (
268
                    intf_a.status == EntityStatus.UP,
269
                    intf_b.status == EntityStatus.UP,
270
                )
271
            ):
272 1
                self.try_to_publish_lsm_event(
273
                    lsm_next_state, value["interface_a"], value["interface_b"]
274
                )
275