build.managers.loop_manager   D
last analyzed

Complexity

Total Complexity 59

Size/Duplication

Total Lines 361
Duplicated Lines 0 %

Test Coverage

Coverage 87.3%

Importance

Changes 0
Metric Value
eloc 257
dl 0
loc 361
ccs 165
cts 189
cp 0.873
rs 4.08
c 0
b 0
f 0
wmc 59

16 Methods

Rating   Name   Duplication   Size   Complexity  
A LoopManager.apublish_loop_state() 0 19 1
A LoopManager.publish_loop_actions() 0 16 2
A LoopManager.handle_switch_metadata_changed() 0 6 3
B LoopManager.handle_disable_action() 0 33 5
B LoopManager.has_loop_stopped() 0 20 7
A LoopManager.handle_topology_loaded() 0 4 2
C LoopManager.handle_loop_stopped() 0 53 9
A LoopManager.is_loop_ignored() 0 12 3
B LoopManager.get_stopped_loops() 0 13 6
B LoopManager.try_to_load_ignored_switch() 0 13 6
A LoopManager.process_if_looped() 0 23 2
A LoopManager.publish_loop_state() 0 19 1
A LoopManager.handle_log_action() 0 23 4
A LoopManager.__init__() 0 13 1
A LoopManager.is_looped() 0 6 2
B LoopManager.set_loop_detected() 0 42 5

How to fix   Complexity   

Complexity

Complex classes like build.managers.loop_manager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""LoopManager."""
2 1
import asyncio
3 1
from collections import defaultdict
4 1
from enum import Enum
5
6 1
import httpx
7
8 1
from kytos.core import KytosEvent, log
9 1
from kytos.core.interface import Interface
10 1
from kytos.core.helpers import get_time, now
11 1
from napps.kytos.of_lldp import settings as napp_settings
12
13
14 1
class LoopState(str, Enum):
    """Lifecycle states recorded for a tracked loop.

    The ``str`` mixin makes every member compare equal to its plain string
    value, so a member can be used wherever the raw string is expected.
    """

    # A loop is currently being observed on the port pair.
    detected = "detected"
    # A previously detected loop is no longer being observed.
    stopped = "stopped"
19
20
21 1
class LoopManager:
    """Track LLDP loops per switch and react to them.

    State kept on the instance:

    * ``loop_state``: ``{dpid: {(port_a, port_b): {...}}}`` with the keys
      ``state``, ``port_numbers``, ``updated_at``, ``detected_at`` and,
      once stopped, ``stopped_at``.
    * ``loop_counter``: ``{dpid: {(port_a, port_b): int}}`` used to rate
      limit the "loop detected" log message.
    * ``ignored_loops``: ``{dpid: [[port_a, port_b], ...]}`` port pairs
      that must never be reported as loops.

    ``loop_lock`` guards the mutations of these dictionaries performed by
    the coroutine methods.
    """

    # Format of every timestamp string stored in ``loop_state``.
    # NOTE(review): must stay in sync with what ``get_time`` parses.
    _DT_FORMAT = "%Y-%m-%dT%H:%M:%S"

    def __init__(self, controller, settings=napp_settings):
        """Build a manager bound to ``controller``.

        Args:
            controller: object exposing ``buffers.app`` and
                ``get_switch_by_dpid``.
            settings: settings namespace; defaults to the NApp settings
                module.
        """
        self.controller = controller
        self.loop_lock = asyncio.Lock()
        self.loop_counter = defaultdict(dict)
        self.loop_state = defaultdict(dict)

        self.settings = settings
        self.ignored_loops = settings.LLDP_IGNORED_LOOPS
        self.actions = settings.LLDP_LOOP_ACTIONS
        # Honour the injected settings object; fall back to the NApp
        # settings module only when the injected one lacks the attribute.
        self.dead_multiplier = int(
            getattr(
                settings,
                "LLDP_LOOP_DEAD_MULTIPLIER",
                napp_settings.LLDP_LOOP_DEAD_MULTIPLIER,
            )
        )
        # A loop not refreshed for longer than this many seconds is
        # considered stopped (see ``has_loop_stopped``).
        self.stopped_interval = self.dead_multiplier * settings.POLLING_TIME
        self.log_every = settings.LOOP_LOG_EVERY

    def _timestamp(self):
        """Return the current time formatted with ``_DT_FORMAT``."""
        return now().strftime(self._DT_FORMAT)

    @staticmethod
    def _loop_state_event(interface_a, interface_b, state):
        """Build the ``kytos/of_lldp.loop.<state>`` event for a port pair."""
        return KytosEvent(
            name=f"kytos/of_lldp.loop.{state}",
            content={
                "interface_id": interface_a.id,
                "dpid": interface_a.switch.dpid,
                "port_numbers": [
                    interface_a.port_number,
                    interface_b.port_number,
                ],
            },
        )

    def is_loop_ignored(self, dpid, port_a, port_b):
        """Return True if the port pair is ignored on ``dpid``.

        The pair matches in either order.
        """
        if dpid not in self.ignored_loops:
            return False
        ignored_pairs = self.ignored_loops[dpid]
        return (
            [port_a, port_b] in ignored_pairs
            or [port_b, port_a] in ignored_pairs
        )

    @staticmethod
    def is_looped(dpid_a, port_a, dpid_b, port_b):
        """Return True if both ends are on the same switch.

        ``port_a <= port_b`` is required so that only one ordering of a
        given pair is ever accepted.
        """
        return bool(dpid_a == dpid_b and port_a <= port_b)

    async def process_if_looped(
        self,
        interface_a,
        interface_b,
    ):
        """Record and publish a loop between the two interfaces, if any.

        Returns:
            True when a non-ignored loop was processed, False otherwise.
        """
        dpid_a = interface_a.switch.dpid
        dpid_b = interface_b.switch.dpid
        port_a = interface_a.port_number
        port_b = interface_b.port_number

        if not self.is_looped(dpid_a, port_a, dpid_b, port_b):
            return False
        if self.is_loop_ignored(dpid_a, port_a, port_b):
            return False

        await self.set_loop_detected(interface_a, [port_a, port_b])
        await self.apublish_loop_state(
            interface_a, interface_b, LoopState.detected.value
        )
        await self.publish_loop_actions(interface_a, interface_b)
        return True

    def publish_loop_state(
        self,
        interface_a,
        interface_b,
        state,
    ):
        """Publish the loop state event (synchronous variant)."""
        event = self._loop_state_event(interface_a, interface_b, state)
        self.controller.buffers.app.put(event)

    async def apublish_loop_state(
        self,
        interface_a,
        interface_b,
        state,
    ):
        """Publish the loop state event (coroutine variant)."""
        event = self._loop_state_event(interface_a, interface_b, state)
        await self.controller.buffers.app.aput(event)

    async def publish_loop_actions(
        self,
        interface_a,
        interface_b,
    ):
        """Publish one action event per configured, supported action."""
        supported_actions = {"log", "disable"}
        # Configured actions outside the supported set are silently skipped.
        for action in set(self.actions).intersection(supported_actions):
            event = KytosEvent(
                name=f"kytos/of_lldp.loop.action.{action}",
                content={
                    "interface_a": interface_a,
                    "interface_b": interface_b,
                },
            )
            await self.controller.buffers.app.aput(event)

    async def set_loop_detected(self, interface_a: Interface, port_pair: list):
        """Mark ``port_pair`` on the interface's switch as detected.

        A pair that is new, or that was previously stopped, gets a fresh
        ``detected_at`` and the interface metadata is extended with a
        ``looped`` entry. A pair already in the detected state only has
        its ``updated_at`` refreshed.
        """
        port_pair, dpid = tuple(port_pair), interface_a.switch.dpid
        async with self.loop_lock:
            # One timestamp per call keeps ``updated_at`` and
            # ``detected_at`` identical for a newly detected loop.
            dt_at = self._timestamp()
            entry = self.loop_state[dpid].get(port_pair)

            if entry is None:
                entry = {
                    "state": LoopState.detected.value,
                    "port_numbers": list(port_pair),
                    "updated_at": dt_at,
                    "detected_at": dt_at,
                }
                self.loop_state[dpid][port_pair] = entry
                is_new_loop = True
            elif entry["state"] != LoopState.detected.value:
                # Re-detection of a loop that had stopped.
                entry.update(
                    {
                        "state": LoopState.detected.value,
                        "updated_at": dt_at,
                        "detected_at": dt_at,
                    }
                )
                entry.pop("stopped_at", None)
                is_new_loop = True
            else:
                entry["updated_at"] = dt_at
                is_new_loop = False

            if is_new_loop:
                metadata = {
                    "looped": {
                        "port_numbers": entry["port_numbers"],
                        "detected_at": entry["detected_at"],
                    }
                }
                interface_a.extend_metadata(metadata)

    def has_loop_stopped(self, dpid, port_pair):
        """Tell whether a tracked loop should be considered stopped.

        Returns:
            None when the pair is not tracked, the switch is unknown, or
            one of the ports is not among the switch interfaces; True
            when either interface is not active or the last update is
            older than ``stopped_interval`` seconds; False otherwise.
        """
        data = self.loop_state[dpid].get(port_pair)
        switch = self.controller.get_switch_by_dpid(dpid)
        if not data or not switch:
            return None
        try:
            interface_a = switch.interfaces[port_pair[0]]
            interface_b = switch.interfaces[port_pair[1]]
        except KeyError:
            return None

        if not (interface_a.is_active() and interface_b.is_active()):
            return True

        # total_seconds() keeps the day component; ``.seconds`` alone
        # wraps around every 24 hours and would hide long-idle loops.
        elapsed = (now() - get_time(data["updated_at"])).total_seconds()
        return elapsed > self.stopped_interval

    def get_stopped_loops(self):
        """Return ``{dpid: [port_pair, ...]}`` for detected loops that stopped."""
        stopped_loops = {}
        # Iterate over a shallow copy so new dpid keys added to
        # ``loop_state`` elsewhere do not break the iteration.
        for dpid, state_dict in self.loop_state.copy().items():
            for port_pair, values in state_dict.items():
                if values["state"] != LoopState.detected.value:
                    continue
                if self.has_loop_stopped(dpid, port_pair):
                    stopped_loops.setdefault(dpid, []).append(port_pair)
        return stopped_loops

    async def handle_loop_stopped(self, interface_a: Interface,
                                  interface_b: Interface):
        """Mark the loop as stopped and undo the configured actions.

        Does nothing when the port pair is not tracked. Otherwise the
        state entry is updated, the ``looped`` metadata key is removed
        from ``interface_a``, and, depending on the configured actions,
        a message is logged and/or ``interface_a`` is re-enabled.
        """
        dpid = interface_a.switch.dpid
        port_a = interface_a.port_number
        port_b = interface_b.port_number
        port_pair = (port_a, port_b)

        async with self.loop_lock:
            if port_pair not in self.loop_state[dpid]:
                return

            dt_at = self._timestamp()
            self.loop_state[dpid][port_pair].update(
                {
                    "state": LoopState.stopped.value,
                    "updated_at": dt_at,
                    "stopped_at": dt_at,
                }
            )
            key = "looped"
            if not interface_a.remove_metadata(key):
                log.error(
                    f"Failed to delete metadata key {key} on interface: "
                    f"{interface_a.id}",
                )

        if "log" in self.actions:
            log.info(
                f"LLDP loop stopped on switch: {dpid}, "
                f"interfaces: {[interface_a.name, interface_b.name]}, "
                f"port_numbers: {[port_a, port_b]}"
            )
        if "disable" in self.actions:
            await self._enable_interface(interface_a, interface_b)

    async def _enable_interface(self, interface_a, interface_b):
        """POST to the topology enable endpoint for ``interface_a``.

        Failures (non-200 response or request error) are logged, not
        raised.
        """
        port_a = interface_a.port_number
        port_b = interface_b.port_number
        base_url = self.settings.TOPOLOGY_URL
        async with httpx.AsyncClient(base_url=base_url) as client:
            endpoint = f"/interfaces/{interface_a.id}/enable"
            try:
                resp = await client.post(endpoint, timeout=10)
            except httpx.RequestError as exc:
                log.error(
                    f"Failed to enable interface: {interface_a.id}, "
                    f"error: {exc}"
                )
                return

            if resp.status_code != 200:
                log.error(
                    f"Failed to enable interface: {interface_a.id},"
                    f" status code: {resp.status_code}, {resp.text}"
                )
                return

            log.info(
                "LLDP loop detection enabled interface "
                f"{interface_a.id}, looped interfaces: "
                f"{[interface_a.name, interface_b.name]}, "
                f"port_numbers: {[port_a, port_b]}"
            )

    async def handle_log_action(
        self,
        interface_a,
        interface_b,
    ):
        """Log a loop warning, at most once every ``log_every`` calls.

        The first call for a port pair always logs; later calls log only
        when the per-pair counter wraps back to zero.
        """
        dpid = interface_a.switch.dpid
        port_a = interface_a.port_number
        port_b = interface_b.port_number
        port_pair = (port_a, port_b)
        log_every = self.log_every
        async with self.loop_lock:
            counters = self.loop_counter[dpid]
            if port_pair not in counters:
                counters[port_pair] = 0
            else:
                counters[port_pair] = (counters[port_pair] + 1) % log_every
            if counters[port_pair] != 0:
                return

        log.warning(
            f"LLDP loop detected on switch: {dpid}, "
            f"interfaces: {[interface_a.name, interface_b.name]}, "
            f"port_numbers: {[port_a, port_b]}"
        )

    async def handle_disable_action(
        self,
        interface_a,
        interface_b,
    ):
        """Disable ``interface_a`` through the topology endpoint.

        Returns immediately when the interface is already disabled, so
        repeated calls do not re-issue the request. Failures (non-200
        response or request error) are logged, not raised.
        """
        if not interface_a.is_enabled():
            return

        port_a = interface_a.port_number
        port_b = interface_b.port_number
        intf_id = interface_a.id
        base_url = self.settings.TOPOLOGY_URL
        async with httpx.AsyncClient(base_url=base_url) as client:
            endpoint = f"/interfaces/{intf_id}/disable"
            try:
                resp = await client.post(endpoint, timeout=10)
            except httpx.RequestError as exc:
                log.error(
                    f"Failed to disable interface: {interface_a.id}, "
                    f"error: {exc}"
                )
                return

            if resp.status_code != 200:
                log.error(
                    f"Failed to disable interface: {intf_id},"
                    f" status code: {resp.status_code}, {resp.text}"
                )
                return

            log.info(
                "LLDP loop detection disabled interface "
                f"{interface_a.id}, looped interfaces: "
                f"{[interface_a.name, interface_b.name]}, "
                f"port_numbers: {[port_a, port_b]}"
            )

    async def handle_switch_metadata_changed(self, switch):
        """Sync ``ignored_loops`` with the switch metadata.

        When the ``ignored_loops`` key is absent from the metadata, the
        entry for the switch is removed (and the removed value, or None,
        is returned); otherwise the entry is reloaded from the metadata.
        """
        if "ignored_loops" not in switch.metadata:
            async with self.loop_lock:
                return self.ignored_loops.pop(switch.dpid, None)
        return await self.try_to_load_ignored_switch(switch)

    async def try_to_load_ignored_switch(self, switch):
        """Load the ignored port pairs of ``switch`` from its metadata.

        Nothing is changed unless ``metadata["ignored_loops"]`` exists
        and is a list; items of that list that are not lists themselves
        are skipped.
        """
        if "ignored_loops" not in switch.metadata:
            return
        if not isinstance(switch.metadata["ignored_loops"], list):
            return

        dpid = switch.dpid
        async with self.loop_lock:
            self.ignored_loops[dpid] = [
                port_pair
                for port_pair in switch.metadata["ignored_loops"]
                if isinstance(port_pair, list)
            ]

    async def handle_topology_loaded(self, topology):
        """Load the ignored loops of every switch in ``topology``."""
        for switch in topology.switches.values():
            await self.try_to_load_ignored_switch(switch)
361