Passed
Push — master ( 127cfb...14f54d )
by Vinicius
02:06 queued 12s
created

build.managers.liveness.ILSM.__repr__()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""LivenessManager."""
2 1
from typing import Optional, Tuple
3 1
from datetime import datetime
4 1
from kytos.core import KytosEvent, log
5 1
from kytos.core.common import EntityStatus
6
7
8 1
class ILSM:
9
10
    """InterfaceLivenessStateMachine.
11
12
    This state machine represents the logical liveness state of the interface.
13
    If an interface is admin disabled or isn't active then a manager that uses
14
    this state machine should call the transition accordingly.
15
    """
16
17 1
    def __init__(self, state="init") -> None:
18
        """InterfaceLivenessStateMachine."""
19 1
        self.transitions = {
20
            "init": ["up", "down"],
21
            "up": ["down", "init"],
22
            "down": ["up", "init"],
23
        }
24 1
        self.state = state
25 1
        self.last_hello_at: Optional[datetime] = None
26
27 1
    def __repr__(self) -> str:
28
        """Repr."""
29 1
        return f"ILSM({self.state}, {self.last_hello_at})"
30
31 1
    def transition_to(self, to_state: str) -> Optional[str]:
32
        """Try to transition to a state."""
33 1
        if to_state not in self.transitions[self.state]:
34 1
            return None
35 1
        self.state = to_state
36 1
        return self.state
37
38 1
    def reaper_check(self, dead_interval: int) -> Optional[str]:
39
        """Try to transition to down. It must be called every dead_interval."""
40 1
        if (
41
            self.last_hello_at and
42
            (datetime.utcnow() - self.last_hello_at).seconds > dead_interval
43
        ):
44 1
            return self.transition_to("down")
45 1
        return None
46
47 1
    def consume_hello(self, received_at: datetime) -> Optional[str]:
48
        """Consume hello. It must be called on every received hello."""
49 1
        self.last_hello_at = received_at
50 1
        if self.transition_to("up"):
51 1
            return "up"
52 1
        return None
53
54
55 1
class LSM:
56
57
    """LivenessStateMachine aggregates two resulting ILSM acts like a link."""
58
59 1
    def __init__(self, ilsm_a: ILSM, ilsm_b: ILSM, state="init") -> None:
60
        """LinkLivenessStateMachine."""
61 1
        self.ilsm_a = ilsm_a
62 1
        self.ilsm_b = ilsm_b
63 1
        self.state = state
64 1
        self.transitions = self.ilsm_a.transitions
65
66 1
    def __repr__(self) -> str:
67
        """Repr."""
68 1
        return f"LSM({self.agg_state()}, {self.ilsm_a}, {self.ilsm_b})"
69
70 1
    def agg_state(self) -> str:
71
        """Aggregated state."""
72 1
        if self.ilsm_a.state == "init" or self.ilsm_b.state == "init":
73 1
            return "init"
74 1
        if self.ilsm_a.state == "down" or self.ilsm_b.state == "down":
75 1
            return "down"
76 1
        if self.ilsm_a.state == "up" and self.ilsm_b.state == "up":
77 1
            return "up"
78
        return "init"
79
80 1
    def _transition_to(self, to_state: str) -> Optional[str]:
81
        """Try to transition to a state."""
82 1
        if to_state not in self.transitions[self.state]:
83 1
            return None
84 1
        self.state = to_state
85 1
        return self.state
86
87 1
    def next_state(self) -> Optional[str]:
88
        """Next state."""
89 1
        return self._transition_to(self.agg_state())
90
91
92 1
class LivenessManager:
93
94
    """LivenessManager."""
95
96 1
    def __init__(self, controller) -> None:
97
        """LivenessManager."""
98
99 1
        self.controller = controller
100
101 1
        self.interfaces = {}  # liveness enabled
102 1
        self.liveness = {}  # indexed by the lowest interface id of the pair
103 1
        self.liveness_ids = {}  # interface id to lowest id of the pair
104
105 1
    @classmethod
106 1
    def link_status_hook_liveness(cls, link) -> Optional[EntityStatus]:
107
        """Link status hook liveness."""
108 1
        if (
109
            link.is_active()
110
            and link.is_enabled()
111
            and "liveness_status" in link.metadata
112
            and link.metadata["liveness_status"] != "up"
113
        ):
114 1
            return EntityStatus.DOWN
115 1
        return None
116
117 1
    def is_enabled(self, *interfaces) -> bool:
118
        """Check if liveness is enabled on an interface."""
119 1
        for interface in interfaces:
120 1
            if interface.id not in self.interfaces:
121 1
                return False
122 1
        return True
123
124 1
    def enable(self, *interfaces) -> None:
125
        """Enable liveness on interface."""
126 1
        for interface in interfaces:
127 1
            self.interfaces[interface.id] = interface
128
129 1
    def disable(self, *interfaces) -> None:
130
        """Disable liveness interface."""
131 1
        for interface in interfaces:
132 1
            self.interfaces.pop(interface.id, None)
133 1
            min_id = self.liveness_ids.pop(interface.id, None)
134 1
            self.liveness.pop(min_id, None)
135
136 1
    async def atry_to_publish_lsm_event(
137
        self, event_suffix: str, interface_a, interface_b
138
    ) -> None:
139
        """Async try to publish a LSM event."""
140 1
        if not event_suffix:
141 1
            return
142 1
        name = f"kytos/of_lldp.liveness.{event_suffix}"
143 1
        content = {"interface_a": interface_a, "interface_b": interface_b}
144 1
        event = KytosEvent(name=name, content=content)
145 1
        await self.controller.buffers.app.aput(event)
146
147 1
    def try_to_publish_lsm_event(
148
        self, event_suffix: str, interface_a, interface_b
149
    ) -> None:
150
        """Try to publish a LSM event."""
151 1
        if not event_suffix:
152 1
            return
153 1
        name = f"kytos/of_lldp.liveness.{event_suffix}"
154 1
        content = {"interface_a": interface_a, "interface_b": interface_b}
155 1
        event = KytosEvent(name=name, content=content)
156 1
        self.controller.buffers.app.put(event)
157
158 1
    async def consume_hello_if_enabled(self, interface_a, interface_b):
159
        """Consume liveness hello if enabled."""
160 1
        if not self.is_enabled(interface_a, interface_b):
161 1
            return
162 1
        await self.consume_hello(interface_a, interface_b, datetime.utcnow())
163
164 1
    def get_interface_status(
165
        self, interface_id
166
    ) -> Tuple[Optional[str], Optional[datetime]]:
167
        """Get interface status."""
168 1
        if interface_id not in self.interfaces:
169 1
            return None, None
170 1
        min_id = self.liveness_ids.get(interface_id)
171 1
        if min_id and min_id in self.liveness:
172 1
            lsm = self.liveness[min_id]["lsm"]
173 1
            if interface_id == min_id:
174 1
                return lsm.ilsm_a.state, lsm.ilsm_a.last_hello_at
175
            else:
176
                return lsm.ilsm_b.state, lsm.ilsm_b.last_hello_at
177 1
        return "init", None
178
179 1
    async def consume_hello(
180
        self, interface_a, interface_b, received_at: datetime
181
    ) -> None:
182
        """Consume liveness hello event."""
183 1
        min_id = min(interface_a.id, interface_b.id)
184 1
        is_interface_a_min_id = min_id == interface_a.id
185 1
        if min_id not in self.liveness:
186 1
            lsm = LSM(ILSM(state="init"), ILSM(state="init"))
187 1
            entry = {
188
                "lsm": lsm,
189
            }
190 1
            if is_interface_a_min_id:
191 1
                entry["interface_a"], entry["interface_b"] = interface_a, interface_b
192
            else:
193
                entry["interface_a"], entry["interface_b"] = interface_b, interface_a
194 1
            self.liveness[min_id] = entry
195 1
            self.liveness_ids[interface_a.id] = min_id
196 1
            self.liveness_ids[interface_b.id] = min_id
197
198 1
        entry = self.liveness[min_id]
199 1
        lsm = entry["lsm"]
200 1
        if is_interface_a_min_id:
201 1
            lsm.ilsm_a.consume_hello(received_at)
202 1
            if entry["interface_b"].id != interface_b.id:
203
                """
204
                Implies that the topology connection has changed, needs new ref
205
                """
206 1
                entry["interface_b"] = interface_b
207 1
                entry["lsm"].ilsm_b = ILSM(state="init")
208
        else:
209 1
            lsm.ilsm_b.consume_hello(received_at)
210
211 1
        lsm_next_state = lsm.next_state()
212 1
        log.debug(
213
            f"Liveness hello {interface_a.id} <- {interface_b.id}"
214
            f" next state: {lsm_next_state}, lsm: {lsm}"
215
        )
216 1
        await self.atry_to_publish_lsm_event(lsm_next_state, interface_a, interface_b)
217
218 1
    def should_call_reaper(self, interface) -> bool:
219
        """Should call reaper."""
220 1
        if all(
221
            (
222
                interface.switch.is_connected(),
223
                interface.lldp,
224
                self.is_enabled(interface),
225
            )
226
        ):
227 1
            return True
228 1
        return False
229
230 1
    def reaper(self, dead_interval: int):
231
        """Reaper check processable interfaces."""
232 1
        for value in list(self.liveness.values()):
233 1
            lsm, intf_a, intf_b = (
234
                value["lsm"],
235
                value["interface_a"],
236
                value["interface_b"],
237
            )
238 1
            if any(
239
                (
240
                    lsm.state == "down",
241
                    not self.should_call_reaper(intf_a),
242
                    not self.should_call_reaper(intf_b),
243
                )
244
            ):
245
                continue
246
247 1
            lsm.ilsm_a.reaper_check(dead_interval)
248 1
            lsm.ilsm_b.reaper_check(dead_interval)
249 1
            lsm_next_state = lsm.next_state()
250
251 1
            if all(
252
                (
253
                    intf_a.status == EntityStatus.UP,
254
                    intf_b.status == EntityStatus.UP,
255
                )
256
            ):
257 1
                self.try_to_publish_lsm_event(
258
                    lsm_next_state, value["interface_a"], value["interface_b"]
259
                )
260