Passed
Push — master ( 127cfb...14f54d )
by Vinicius
02:06 queued 12s
created

LoopManager.apublish_loop_state()   A

Complexity

Conditions 1

Size

Total Lines 19
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 15
nop 4
dl 0
loc 19
rs 9.65
c 0
b 0
f 0
ccs 6
cts 6
cp 1
crap 1
1
"""LoopManager."""
2 1
from collections import defaultdict
3 1
from enum import Enum
4 1
from threading import Lock
5
6 1
import requests
7
8 1
from kytos.core import KytosEvent, log
9 1
from kytos.core.helpers import get_time, now
10 1
from napps.kytos.of_lldp import settings as napp_settings
11
12
13 1
class LoopState(str, Enum):
14
    """LoopState Enum."""
15
16 1
    detected = "detected"
17 1
    stopped = "stopped"
18
19
20 1
class LoopManager:
21
    """LoopManager."""
22
23 1
    def __init__(self, controller, settings=napp_settings):
24
        """Constructor of LoopDetection."""
25 1
        self.controller = controller
26 1
        self.loop_lock = Lock()
27 1
        self.loop_counter = defaultdict(dict)
28 1
        self.loop_state = defaultdict(dict)
29
30 1
        self.settings = settings
31 1
        self.ignored_loops = settings.LLDP_IGNORED_LOOPS
32 1
        self.actions = settings.LLDP_LOOP_ACTIONS
33 1
        self.dead_multiplier = int(napp_settings.LLDP_LOOP_DEAD_MULTIPLIER)
34 1
        self.stopped_interval = self.dead_multiplier * settings.POLLING_TIME
35 1
        self.log_every = settings.LOOP_LOG_EVERY
36
37 1
    def is_loop_ignored(self, dpid, port_a, port_b):
38
        """Check if a loop is ignored."""
39 1
        if dpid not in self.ignored_loops:
40 1
            return False
41 1
        if any(
42
            (
43
                [port_a, port_b] in self.ignored_loops[dpid],
44
                [port_b, port_a] in self.ignored_loops[dpid],
45
            )
46
        ):
47 1
            return True
48 1
        return False
49
50 1
    @staticmethod
51 1
    def is_looped(dpid_a, port_a, dpid_b, port_b):
52
        """Check if the given dpids and ports are looped."""
53 1
        if all((dpid_a == dpid_b, port_a <= port_b)):  # only enter one pair
54 1
            return True
55 1
        return False
56
57 1
    async def process_if_looped(
58
        self,
59
        interface_a,
60
        interface_b,
61
    ):
62
        """Process if interface_a and interface_b are looped."""
63 1
        dpid_a = interface_a.switch.dpid
64 1
        dpid_b = interface_b.switch.dpid
65 1
        port_a = interface_a.port_number
66 1
        port_b = interface_b.port_number
67 1
        if all(
68
            (
69
                self.is_looped(dpid_a, port_a, dpid_b, port_b),
70
                not self.is_loop_ignored(dpid_a, port_a, port_b),
71
            )
72
        ):
73 1
            await self.apublish_loop_state(
74
                interface_a, interface_b, LoopState.detected.value
75
            )
76 1
            await self.publish_loop_actions(interface_a, interface_b)
77 1
            return True
78
        return False
79
80 1
    def publish_loop_state(
81
        self,
82
        interface_a,
83
        interface_b,
84
        state,
85
    ):
86
        """Publish loop state event."""
87 1
        dpid = interface_a.switch.dpid
88 1
        port_a = interface_a.port_number
89 1
        port_b = interface_b.port_number
90 1
        event = KytosEvent(
91
            name=f"kytos/of_lldp.loop.{state}",
92
            content={
93
                "interface_id": interface_a.id,
94
                "dpid": dpid,
95
                "port_numbers": [port_a, port_b],
96
            },
97
        )
98 1
        self.controller.buffers.app.put(event)
99
100 1
    async def apublish_loop_state(
101
        self,
102
        interface_a,
103
        interface_b,
104
        state,
105
    ):
106
        """Publish loop state event."""
107 1
        dpid = interface_a.switch.dpid
108 1
        port_a = interface_a.port_number
109 1
        port_b = interface_b.port_number
110 1
        event = KytosEvent(
111
            name=f"kytos/of_lldp.loop.{state}",
112
            content={
113
                "interface_id": interface_a.id,
114
                "dpid": dpid,
115
                "port_numbers": [port_a, port_b],
116
            },
117
        )
118 1
        await self.controller.buffers.app.aput(event)
119
120 1
    async def publish_loop_actions(
121
        self,
122
        interface_a,
123
        interface_b,
124
    ):
125
        """Publish loop action events."""
126 1
        supported_actions = {"log", "disable"}
127 1
        for action in set(self.actions).intersection(supported_actions):
128 1
            event = KytosEvent(
129
                name=f"kytos/of_lldp.loop.action.{action}",
130
                content={
131
                    "interface_a": interface_a,
132
                    "interface_b": interface_b,
133
                },
134
            )
135 1
            await self.controller.buffers.app.aput(event)
136
137 1
    def handle_loop_detected(self, interface_id, dpid, port_pair):
138
        """Handle loop detected."""
139 1
        is_new_loop = False
140 1
        port_pair = tuple(port_pair)
141 1
        with self.loop_lock:
142 1
            if port_pair not in self.loop_state[dpid]:
143 1
                dt_at = now().strftime("%Y-%m-%dT%H:%M:%S")
144 1
                data = {
145
                    "state": LoopState.detected.value,
146
                    "port_numbers": list(port_pair),
147
                    "updated_at": dt_at,
148
                    "detected_at": dt_at,
149
                }
150 1
                self.loop_state[dpid][port_pair] = data
151 1
                is_new_loop = True
152 1
            if (
153
                self.loop_state[dpid][port_pair]["state"]
154
                != LoopState.detected.value
155
            ):
156 1
                dt_at = now().strftime("%Y-%m-%dT%H:%M:%S")
157 1
                data = {
158
                    "state": LoopState.detected.value,
159
                    "updated_at": dt_at,
160
                    "detected_at": dt_at,
161
                }
162 1
                self.loop_state[dpid][port_pair].update(data)
163 1
                self.loop_state[dpid][port_pair].pop("stopped_at", None)
164 1
                is_new_loop = True
165
            else:
166 1
                data = {"updated_at": now().strftime("%Y-%m-%dT%H:%M:%S")}
167 1
                self.loop_state[dpid][port_pair].update(data)
168 1
        if is_new_loop:
169 1
            port_numbers = self.loop_state[dpid][port_pair]["port_numbers"]
170 1
            detected_at = self.loop_state[dpid][port_pair]["detected_at"]
171 1
            metadata = {
172
                "looped": {
173
                    "port_numbers": port_numbers,
174
                    "detected_at": detected_at,
175
                }
176
            }
177 1
            response = self.add_interface_metadata(interface_id, metadata)
178 1
            if response.status_code != 201:
179 1
                log.error(
180
                    f"Failed to add metadata {metadata} on interface "
181
                    f"{interface_id}, response: {response.json()}"
182
                )
183
184 1
    def has_loop_stopped(self, dpid, port_pair):
185
        """Check if a loop has stopped by checking within an interval
186
        or based on their operational state."""
187 1
        data = self.loop_state[dpid].get(port_pair)
188 1
        switch = self.controller.get_switch_by_dpid(dpid)
189 1
        if not data or not switch:
190
            return None
191 1
        try:
192 1
            interface_a = switch.interfaces[port_pair[0]]
193 1
            interface_b = switch.interfaces[port_pair[1]]
194
        except KeyError:
195
            return None
196
197 1
        if not interface_a.is_active() or not interface_b.is_active():
198
            return True
199
200 1
        delta_seconds = (now() - get_time(data["updated_at"])).seconds
201 1
        if delta_seconds > self.stopped_interval:
202 1
            return True
203
        return False
204
205 1
    def get_stopped_loops(self):
206
        """Get stopped loops."""
207 1
        stopped_loops = {}
208 1
        for key, state_dict in self.loop_state.items():
209 1
            for port_pair, values in state_dict.items():
210 1
                if values["state"] != LoopState.detected.value:
211
                    continue
212 1
                if self.has_loop_stopped(key, port_pair):
213 1
                    if key not in stopped_loops:
214 1
                        stopped_loops[key] = [port_pair]
215
                    else:
216 1
                        stopped_loops[key].append(port_pair)
217 1
        return stopped_loops
218
219 1
    def add_interface_metadata(self, interface_id, metadata):
220
        """Add interface metadata."""
221 1
        base_url = self.settings.TOPOLOGY_URL
222 1
        endpoint = f"{base_url}/interfaces/{interface_id}/metadata"
223 1
        return requests.post(endpoint, json=metadata)
224
225 1
    def del_interface_metadata(self, interface_id, key):
226
        """Delete interface metadata."""
227 1
        base_url = self.settings.TOPOLOGY_URL
228 1
        endpoint = f"{base_url}/interfaces/{interface_id}/metadata/{key}"
229 1
        return requests.delete(endpoint)
230
231 1
    def handle_loop_stopped(self, interface_a, interface_b):
232
        """Handle loop stopped."""
233 1
        dpid = interface_a.switch.dpid
234 1
        port_a = interface_a.port_number
235 1
        port_b = interface_b.port_number
236 1
        port_pair = (port_a, port_b)
237
238 1
        if port_pair not in self.loop_state[dpid]:
239
            return
240 1
        with self.loop_lock:
241 1
            dt_at = now().strftime("%Y-%m-%dT%H:%M:%S")
242 1
            data = {
243
                "state": "stopped",
244
                "updated_at": dt_at,
245
                "stopped_at": dt_at,
246
            }
247 1
            self.loop_state[dpid][port_pair].update(data)
248
249 1
        if "log" in self.actions:
250 1
            log.info(
251
                f"LLDP loop stopped on switch: {dpid}, "
252
                f"interfaces: {[interface_a.name, interface_b.name]}, "
253
                f"port_numbers: {[port_a, port_b]}"
254
            )
255 1
        if "disable" in self.actions:
256 1
            base_url = self.settings.TOPOLOGY_URL
257 1
            endpoint = f"{base_url}/interfaces/{interface_a.id}/enable"
258 1
            response = requests.post(endpoint)
259 1
            if response.status_code != 200:
260
                log.error(
261
                    f"Failed to enable interface: {interface_a.id},"
262
                    f" status code: {response.status_code}"
263
                )
264
            else:
265 1
                log.info(
266
                    "LLDP loop detection enabled interface "
267
                    f"{interface_a.id}, looped interfaces: "
268
                    f"{[interface_a.name, interface_b.name]},"
269
                    f"port_numbers: {[port_a, port_b]}"
270
                )
271
272 1
        key = "looped"
273 1
        response = self.del_interface_metadata(interface_a.id, key)
274 1
        if response.status_code >= 400 and response.status_code != 404:
275
            log.error(
276
                f"Failed to delete metadata key {key} on interface: "
277
                f"{interface_a.id}, status code: {response.status_code}",
278
            )
279
280 1
    def handle_log_action(
281
        self,
282
        interface_a,
283
        interface_b,
284
    ):
285
        """Execute loop log action."""
286 1
        dpid = interface_a.switch.dpid
287 1
        port_a = interface_a.port_number
288 1
        port_b = interface_b.port_number
289 1
        port_pair = (port_a, port_b)
290 1
        log_every = self.log_every
291 1
        with self.loop_lock:
292 1
            if port_pair not in self.loop_counter[dpid]:
293 1
                self.loop_counter[dpid][port_pair] = 0
294
            else:
295 1
                self.loop_counter[dpid][port_pair] += 1
296 1
                self.loop_counter[dpid][port_pair] %= log_every
297 1
            count = self.loop_counter[dpid][port_pair]
298 1
            if count != 0:
299 1
                return
300
301 1
        log.warning(
302
            f"LLDP loop detected on switch: {dpid}, "
303
            f"interfaces: {[interface_a.name, interface_b.name]}, "
304
            f"port_numbers: {[port_a, port_b]}"
305
        )
306
307 1
    def handle_disable_action(
308
        self,
309
        interface_a,
310
        interface_b,
311
    ):
312
        """Execute LLDP loop disable action idempotently."""
313 1
        if not interface_a.is_enabled():
314
            return
315
316 1
        port_a = interface_a.port_number
317 1
        port_b = interface_b.port_number
318 1
        intf_id = interface_a.id
319 1
        base_url = self.settings.TOPOLOGY_URL
320 1
        endpoint = f"{base_url}/interfaces/{intf_id}/disable"
321 1
        response = requests.post(endpoint)
322 1
        if response.status_code != 200:
323
            log.error(
324
                f"Failed to disable interface: {intf_id},"
325
                f" status code: {response.status_code}"
326
            )
327
            return
328
329 1
        log.info(
330
            f"LLDP loop detection disabled interface {interface_a.id}, "
331
            f"looped interfaces: {[interface_a.name, interface_b.name]}, "
332
            f"port_numbers: {[port_a, port_b]}"
333
        )
334
335 1
    def handle_switch_metadata_changed(self, switch):
336
        """Handle switch metadata changed."""
337 1
        if "ignored_loops" not in switch.metadata:
338 1
            with self.loop_lock:
339 1
                return self.ignored_loops.pop(switch.dpid, None)
340 1
        return self.try_to_load_ignored_switch(switch)
341
342 1
    def try_to_load_ignored_switch(self, switch):
343
        """Try to load an ignored switch."""
344 1
        if "ignored_loops" not in switch.metadata:
345
            return
346 1
        if not isinstance(switch.metadata["ignored_loops"], list):
347
            return
348
349 1
        dpid = switch.dpid
350 1
        with self.loop_lock:
351 1
            self.ignored_loops[dpid] = []
352 1
            for port_pair in switch.metadata["ignored_loops"]:
353 1
                if isinstance(port_pair, list):
354 1
                    self.ignored_loops[dpid].append(port_pair)
355
356 1
    def handle_topology_loaded(self, topology):
357
        """Handle on topology loaded."""
358 1
        for switch in topology.switches.values():
359
            self.try_to_load_ignored_switch(switch)
360