Passed
Push — master ( eaed3c...825740 )
by Vinicius
04:58 queued 16s
created

build.managers.liveness.ILSM.__init__()   A

Complexity

Conditions 1

Size

Total Lines 13
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 11
nop 3
dl 0
loc 13
ccs 8
cts 8
cp 1
crap 1
rs 9.85
c 0
b 0
f 0
1
"""LivenessManager."""
2 1
from typing import Optional, Tuple
3 1
from datetime import datetime
4 1
from kytos.core import KytosEvent, log
5 1
from kytos.core.common import EntityStatus
6 1
from napps.kytos.of_lldp import settings as napp_settings
7
8
9 1
class ILSM:
    """InterfaceLivenessStateMachine.

    This state machine represents the logical liveness state of the interface.
    If an interface is admin disabled or isn't active then a manager that uses
    this state machine should call the transition accordingly.

    The known states are "init", "up" and "down"; ``transitions`` maps each
    state to the list of states that can be reached from it.
    """

    def __init__(self, state="init", min_hellos=1) -> None:
        """Create the state machine.

        Args:
            state: Initial state, it must be a key of ``transitions``.
            min_hellos: Number of hellos that must be consumed while not "up"
                before moving to "up". It must be at least 1.

        Raises:
            AssertionError: If ``min_hellos`` is lower than 1 or ``state`` is
                not a known state.
        """
        self.transitions = {
            "init": ["up", "down"],
            "up": ["down", "init"],
            "down": ["up", "init"],
        }
        self.state = state
        self.min_hellos = min_hellos
        # NOTE: asserts are kept (instead of raising ValueError) so that the
        # exception type seen by existing callers does not change.
        assert self.min_hellos >= 1, self.min_hellos
        assert self.state in self.transitions, self.state
        self.hello_counter = 0
        self.last_hello_at: Optional[datetime] = None

    def __repr__(self) -> str:
        """Repr."""
        return f"ILSM({self.state}, {self.last_hello_at})"

    def transition_to(self, to_state: str) -> Optional[str]:
        """Try to transition to a state.

        Returns the new state when the transition is allowed from the current
        state, otherwise None and the state is left untouched.
        """
        if to_state not in self.transitions[self.state]:
            return None
        self.state = to_state
        return self.state

    def reaper_check(self, dead_interval: int) -> Optional[str]:
        """Try to transition to down. It must be called every dead_interval.

        The transition is attempted when the last consumed hello is older
        than ``dead_interval`` whole seconds; in that case the hello counter
        is reset as well. Returns "down" if the transition happened,
        otherwise None.
        """
        if self.last_hello_at is None:
            return None

        elapsed = datetime.utcnow() - self.last_hello_at
        # timedelta.seconds is only the 0..86399 component of the delta: it
        # drops whole days and wraps around for negative deltas. Use the full
        # duration, truncated so the comparison stays in whole seconds.
        if int(elapsed.total_seconds()) <= dead_interval:
            return None

        self.hello_counter = 0
        return self.transition_to("down")

    def consume_hello(self, received_at: datetime) -> Optional[str]:
        """Consume hello. It must be called on every received hello.

        Records ``received_at`` as the last hello time and, while the state
        is not "up", counts the hello. Once ``min_hellos`` were counted it
        tries to move to "up". Returns "up" if that transition happened,
        otherwise None.
        """
        self.last_hello_at = received_at
        if self.state != "up":
            self.hello_counter += 1
        if self.hello_counter >= self.min_hellos and self.transition_to("up"):
            self.hello_counter = 0
            return "up"
        return None
62
63
64 1
class LSM:
    """LivenessStateMachine aggregates two resulting ILSM acts like a link."""

    def __init__(self, ilsm_a: ILSM, ilsm_b: ILSM, state="init") -> None:
        """LinkLivenessStateMachine.

        Keeps both interface state machines, the link state and reuses the
        transition table of ``ilsm_a``.
        """
        self.ilsm_a, self.ilsm_b = ilsm_a, ilsm_b
        self.state = state
        self.transitions = self.ilsm_a.transitions

    def __repr__(self) -> str:
        """Repr."""
        aggregated = self.agg_state()
        return f"LSM({aggregated}, {self.ilsm_a}, {self.ilsm_b})"

    def agg_state(self) -> str:
        """Aggregated state.

        "init" wins over "down", which wins over "up"; any other combination
        of interface states is reported as "init".
        """
        pair = (self.ilsm_a.state, self.ilsm_b.state)
        if "init" in pair:
            return "init"
        if "down" in pair:
            return "down"
        if pair == ("up", "up"):
            return "up"
        return "init"

    def _transition_to(self, to_state: str) -> Optional[str]:
        """Try to transition to a state.

        Returns the new state, or None when the current state does not allow
        the transition.
        """
        reachable = self.transitions[self.state]
        if to_state in reachable:
            self.state = to_state
            return to_state
        return None

    def next_state(self) -> Optional[str]:
        """Next state."""
        target = self.agg_state()
        return self._transition_to(target)
99
100
101 1
class LivenessManager:
    """LivenessManager.

    Keeps track of the interfaces that have liveness enabled and of one LSM
    per pair of interfaces that exchanged hellos. Each pair is indexed by the
    lowest interface id of the pair; in an entry "interface_a" is the
    interface with the lowest id and "interface_b" is its peer.
    """

    def __init__(self, controller, settings=napp_settings) -> None:
        """LivenessManager.

        Args:
            controller: Object whose ``buffers.app`` receives the events.
            settings: Object providing ``LIVENESS_MIN_HELLOS_UP``.
        """
        self.controller = controller

        self.interfaces = {}  # liveness enabled
        self.liveness = {}  # indexed by the lowest interface id of the pair
        self.liveness_ids = {}  # interface id to lowest id of the pair
        self.settings = settings

    @classmethod
    def link_status_hook_liveness(cls, link) -> Optional[EntityStatus]:
        """Link status hook liveness.

        Returns EntityStatus.DOWN for an active and enabled link whose
        "liveness_status" metadata is set and differs from "up", else None.
        """
        if (
            link.is_active()
            and link.is_enabled()
            and "liveness_status" in link.metadata
            and link.metadata["liveness_status"] != "up"
        ):
            return EntityStatus.DOWN
        return None

    @classmethod
    def link_status_reason_hook_liveness(cls, link) -> frozenset[str]:
        """Link status reason hook liveness.

        Returns ``frozenset({"liveness"})`` when the status hook reports the
        link as down, otherwise an empty frozenset.
        """
        if cls.link_status_hook_liveness(link) == EntityStatus.DOWN:
            return frozenset({"liveness"})
        return frozenset()

    def is_enabled(self, *interfaces) -> bool:
        """Check if liveness is enabled on every given interface."""
        return all(
            interface.id in self.interfaces for interface in interfaces
        )

    def enable(self, *interfaces) -> None:
        """Enable liveness on interface."""
        for interface in interfaces:
            self.interfaces[interface.id] = interface

    def disable(self, *interfaces) -> None:
        """Disable liveness interface.

        Besides disabling the interface, the LSM entry of its pair is
        discarded together with the id mappings that pointed to it.
        """
        for interface in interfaces:
            self.interfaces.pop(interface.id, None)
            min_id = self.liveness_ids.pop(interface.id, None)
            entry = self.liveness.pop(min_id, None)
            if entry is None:
                continue
            # The paired interface would otherwise keep a mapping to an
            # entry that no longer exists.
            for paired in (entry["interface_a"], entry["interface_b"]):
                if self.liveness_ids.get(paired.id) == min_id:
                    del self.liveness_ids[paired.id]

    @staticmethod
    def _build_lsm_event(
        event_suffix: str, interface_a, interface_b
    ) -> KytosEvent:
        """Build the liveness event shared by the sync and async publishers."""
        name = f"kytos/of_lldp.liveness.{event_suffix}"
        content = {"interface_a": interface_a, "interface_b": interface_b}
        return KytosEvent(name=name, content=content)

    async def atry_to_publish_lsm_event(
        self, event_suffix: Optional[str], interface_a, interface_b
    ) -> None:
        """Async try to publish a LSM event.

        Nothing is published when ``event_suffix`` is empty or None.
        """
        if not event_suffix:
            return
        event = self._build_lsm_event(event_suffix, interface_a, interface_b)
        await self.controller.buffers.app.aput(event)

    def try_to_publish_lsm_event(
        self, event_suffix: Optional[str], interface_a, interface_b
    ) -> None:
        """Try to publish a LSM event.

        Nothing is published when ``event_suffix`` is empty or None.
        """
        if not event_suffix:
            return
        event = self._build_lsm_event(event_suffix, interface_a, interface_b)
        self.controller.buffers.app.put(event)

    async def consume_hello_if_enabled(
        self, interface_a, interface_b
    ) -> None:
        """Consume liveness hello if enabled on both interfaces."""
        if not self.is_enabled(interface_a, interface_b):
            return
        await self.consume_hello(interface_a, interface_b, datetime.utcnow())

    def get_interface_status(
        self, interface_id
    ) -> Tuple[Optional[str], Optional[datetime]]:
        """Get interface status.

        Returns ``(None, None)`` if liveness isn't enabled on the interface,
        ``("init", None)`` if it has no LSM entry yet, otherwise the state
        and the last hello time of the interface's own ILSM.
        """
        if interface_id not in self.interfaces:
            return None, None
        min_id = self.liveness_ids.get(interface_id)
        if min_id and min_id in self.liveness:
            lsm = self.liveness[min_id]["lsm"]
            # ilsm_a always belongs to the lowest interface id of the pair.
            ilsm = lsm.ilsm_a if interface_id == min_id else lsm.ilsm_b
            return ilsm.state, ilsm.last_hello_at
        return "init", None

    def _sync_peer(self, entry, min_id, peer) -> None:
        """Make ``peer`` the "interface_b" of the entry if it has changed."""
        old_peer = entry["interface_b"]
        if old_peer.id == peer.id:
            return
        # Implies that the topology connection has changed, needs new ref
        if self.liveness_ids.get(old_peer.id) == min_id:
            del self.liveness_ids[old_peer.id]
        min_hellos = self.settings.LIVENESS_MIN_HELLOS_UP
        entry["interface_b"] = peer
        entry["lsm"].ilsm_b = ILSM(state="init", min_hellos=min_hellos)
        self.liveness_ids[peer.id] = min_id

    async def consume_hello(
        self, interface_a, interface_b, received_at: datetime
    ) -> None:
        """Consume liveness hello event.

        The hello is consumed by the ILSM of ``interface_a``. The LSM entry
        of the pair is created on the first hello, and its peer side is
        reset when the peer of the lowest id interface has changed. An event
        is published if the LSM moved to a new state.
        """
        min_id = min(interface_a.id, interface_b.id)
        is_interface_a_min_id = min_id == interface_a.id
        if min_id not in self.liveness:
            min_hellos = self.settings.LIVENESS_MIN_HELLOS_UP
            lsm = LSM(
                ILSM(state="init", min_hellos=min_hellos),
                ILSM(state="init", min_hellos=min_hellos)
            )
            if is_interface_a_min_id:
                lowest, peer = interface_a, interface_b
            else:
                lowest, peer = interface_b, interface_a
            self.liveness[min_id] = {
                "lsm": lsm,
                "interface_a": lowest,
                "interface_b": peer,
            }
            self.liveness_ids[interface_a.id] = min_id
            self.liveness_ids[interface_b.id] = min_id

        entry = self.liveness[min_id]
        lsm = entry["lsm"]
        if is_interface_a_min_id:
            self._sync_peer(entry, min_id, interface_b)
            lsm.ilsm_a.consume_hello(received_at)
        else:
            # Refresh the peer before consuming, otherwise the hello would be
            # counted on the state machine of the previous peer.
            self._sync_peer(entry, min_id, interface_a)
            lsm.ilsm_b.consume_hello(received_at)

        lsm_next_state = lsm.next_state()
        log.debug(
            f"Liveness hello {interface_a.id} <- {interface_b.id}"
            f" next state: {lsm_next_state}, lsm: {lsm}"
        )
        await self.atry_to_publish_lsm_event(
            lsm_next_state, interface_a, interface_b
        )

    def should_call_reaper(self, interface) -> bool:
        """Should call reaper.

        True when the interface's switch is connected, the interface has
        lldp set and liveness is enabled on it.
        """
        return all(
            (
                interface.switch.is_connected(),
                interface.lldp,
                self.is_enabled(interface),
            )
        )

    def reaper(self, dead_interval: int) -> None:
        """Reaper check processable interfaces.

        Entries whose LSM is already "down", or with an interface that
        shouldn't be checked, are skipped. An event is only published when
        both interfaces have status UP.
        """
        # Iterate over a snapshot of the entries.
        for value in list(self.liveness.values()):
            lsm, intf_a, intf_b = (
                value["lsm"],
                value["interface_a"],
                value["interface_b"],
            )
            if any(
                (
                    lsm.state == "down",
                    not self.should_call_reaper(intf_a),
                    not self.should_call_reaper(intf_b),
                )
            ):
                continue

            lsm.ilsm_a.reaper_check(dead_interval)
            lsm.ilsm_b.reaper_check(dead_interval)
            lsm_next_state = lsm.next_state()

            if all(
                (
                    intf_a.status == EntityStatus.UP,
                    intf_b.status == EntityStatus.UP,
                )
            ):
                self.try_to_publish_lsm_event(lsm_next_state, intf_a, intf_b)
282