Passed
Push — master ( de6073...127cfb )
by Vinicius
04:59 queued 03:03
created

LoopManager.publish_loop_actions()   A

Complexity

Conditions 2

Size

Total Lines 16
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 12
nop 3
dl 0
loc 16
ccs 5
cts 5
cp 1
crap 2
rs 9.8
c 0
b 0
f 0
1
"""LoopManager."""
2 1
from collections import defaultdict
3 1
from enum import Enum
4 1
from threading import Lock
5
6 1
import requests
7
8 1
from kytos.core import KytosEvent, log
9 1
from kytos.core.helpers import get_time, now
10 1
from napps.kytos.of_lldp import settings as napp_settings
11
12
13 1
class LoopState(str, Enum):
    """States a tracked LLDP loop can be in.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # A loop is currently being observed on the port pair.
    detected = "detected"
    # A previously detected loop is no longer being observed.
    stopped = "stopped"
18
19
20 1
class LoopManager:
    """Track LLDP loops on a switch and run the configured loop actions.

    A loop is keyed by a switch dpid plus a ``(port_a, port_b)`` tuple.
    ``loop_state`` holds the per-loop state dictionaries and
    ``loop_counter`` the per-loop counters used to throttle log messages.
    Every mutation of ``loop_state``, ``loop_counter`` and
    ``ignored_loops`` done by this class happens while holding
    ``loop_lock``.
    """

    def __init__(self, controller, settings=napp_settings):
        """Build a LoopManager.

        Args:
            controller: Controller object; its ``buffers.app`` queue
                receives the events published by this class and its
                ``get_switch_by_dpid`` method is used to resolve switches.
            settings: Object exposing the settings read below. Defaults to
                the of_lldp NApp settings module.
        """
        self.controller = controller
        self.loop_lock = Lock()
        self.loop_counter = defaultdict(dict)
        self.loop_state = defaultdict(dict)

        self.settings = settings
        self.ignored_loops = settings.LLDP_IGNORED_LOOPS
        self.actions = settings.LLDP_LOOP_ACTIONS
        # Honour the injected settings like every other attribute here;
        # fall back to the NApp settings when the injected object does not
        # define the multiplier, so partial settings objects keep working.
        self.dead_multiplier = int(
            getattr(
                settings,
                "LLDP_LOOP_DEAD_MULTIPLIER",
                napp_settings.LLDP_LOOP_DEAD_MULTIPLIER,
            )
        )
        # Seconds without a new detection after which a loop counts as
        # stopped (see has_loop_stopped).
        self.stopped_interval = self.dead_multiplier * settings.POLLING_TIME
        self.log_every = settings.LOOP_LOG_EVERY

    def is_loop_ignored(self, dpid, port_a, port_b):
        """Check if a loop is ignored.

        Returns:
            True when ``[port_a, port_b]`` (in either order) is listed in
            ``ignored_loops`` for ``dpid``, False otherwise.
        """
        if dpid not in self.ignored_loops:
            return False
        ignored_pairs = self.ignored_loops[dpid]
        return (
            [port_a, port_b] in ignored_pairs
            or [port_b, port_a] in ignored_pairs
        )

    @staticmethod
    def is_looped(dpid_a, port_a, dpid_b, port_b):
        """Check if the given dpids and ports are looped.

        Returns:
            True when both ends are on the same dpid and
            ``port_a <= port_b``, False otherwise.
        """
        # The port ordering makes sure only one direction of a pair
        # is accepted.
        return dpid_a == dpid_b and port_a <= port_b

    def process_if_looped(
        self,
        interface_a,
        interface_b,
    ):
        """Process if interface_a and interface_b are looped.

        When the interfaces form a loop that is not ignored, publish the
        "detected" state event and the configured action events.

        Returns:
            True if the loop events were published, False otherwise.
        """
        dpid_a = interface_a.switch.dpid
        dpid_b = interface_b.switch.dpid
        port_a = interface_a.port_number
        port_b = interface_b.port_number

        if not self.is_looped(dpid_a, port_a, dpid_b, port_b):
            return False
        if self.is_loop_ignored(dpid_a, port_a, port_b):
            return False

        self.publish_loop_state(
            interface_a, interface_b, LoopState.detected.value
        )
        self.publish_loop_actions(interface_a, interface_b)
        return True

    def publish_loop_state(
        self,
        interface_a,
        interface_b,
        state,
    ):
        """Publish loop state event.

        Puts a ``kytos/of_lldp.loop.{state}`` event on the controller's
        app buffer carrying interface_a's id, its switch dpid and both
        port numbers.
        """
        dpid = interface_a.switch.dpid
        port_a = interface_a.port_number
        port_b = interface_b.port_number
        event = KytosEvent(
            name=f"kytos/of_lldp.loop.{state}",
            content={
                "interface_id": interface_a.id,
                "dpid": dpid,
                "port_numbers": [port_a, port_b],
            },
        )
        self.controller.buffers.app.put(event)

    def publish_loop_actions(
        self,
        interface_a,
        interface_b,
    ):
        """Publish loop action events.

        One ``kytos/of_lldp.loop.action.{action}`` event is put on the
        controller's app buffer per configured action that is supported;
        unsupported configured actions are skipped.
        """
        supported_actions = {"log", "disable"}
        for action in set(self.actions).intersection(supported_actions):
            event = KytosEvent(
                name=f"kytos/of_lldp.loop.action.{action}",
                content={
                    "interface_a": interface_a,
                    "interface_b": interface_b,
                },
            )
            self.controller.buffers.app.put(event)

    def handle_loop_detected(self, interface_id, dpid, port_pair):
        """Handle loop detected.

        Creates or refreshes the state entry of the loop. When the loop is
        new, or was previously stopped, the "looped" metadata is also
        posted for ``interface_id``; a response status other than 201 is
        logged as an error.
        """
        port_pair = tuple(port_pair)
        is_new_loop = False
        metadata = None
        with self.loop_lock:
            dt_at = now().strftime("%Y-%m-%dT%H:%M:%S")
            loops = self.loop_state[dpid]
            entry = loops.get(port_pair)
            if entry is None:
                entry = {
                    "state": LoopState.detected.value,
                    "port_numbers": list(port_pair),
                    "updated_at": dt_at,
                    "detected_at": dt_at,
                }
                loops[port_pair] = entry
                is_new_loop = True
            elif entry["state"] != LoopState.detected.value:
                # A stopped loop showed up again: restart its detection.
                entry.update(
                    {
                        "state": LoopState.detected.value,
                        "updated_at": dt_at,
                        "detected_at": dt_at,
                    }
                )
                entry.pop("stopped_at", None)
                is_new_loop = True
            else:
                entry["updated_at"] = dt_at

            if is_new_loop:
                # Snapshot what the request needs while still holding the
                # lock, so the entry is not read again after other threads
                # may have changed it.
                metadata = {
                    "looped": {
                        "port_numbers": list(entry["port_numbers"]),
                        "detected_at": entry["detected_at"],
                    }
                }

        if not is_new_loop:
            return

        # The HTTP request is made outside the lock on purpose.
        response = self.add_interface_metadata(interface_id, metadata)
        if response.status_code != 201:
            log.error(
                f"Failed to add metadata {metadata} on interface "
                f"{interface_id}, response: {response.json()}"
            )

    def has_loop_stopped(self, dpid, port_pair):
        """Check if a loop has stopped by checking within an interval
        or based on their operational state.

        Returns:
            None when the loop, the switch or one of the interfaces is
            unknown; True when either interface is not active or the loop
            was last updated more than ``stopped_interval`` seconds ago;
            False otherwise.
        """
        # Plain .get() so probing an unknown dpid does not insert an empty
        # entry into the defaultdict.
        data = self.loop_state.get(dpid, {}).get(port_pair)
        switch = self.controller.get_switch_by_dpid(dpid)
        if not data or not switch:
            return None
        try:
            interface_a = switch.interfaces[port_pair[0]]
            interface_b = switch.interfaces[port_pair[1]]
        except KeyError:
            return None

        if not interface_a.is_active() or not interface_b.is_active():
            return True

        # total_seconds() covers the whole elapsed time; timedelta.seconds
        # only holds the sub-day component, so it drops whole days and
        # reports a large positive value for negative deltas.
        elapsed = now() - get_time(data["updated_at"])
        return elapsed.total_seconds() > self.stopped_interval

    def get_stopped_loops(self):
        """Get stopped loops.

        Returns:
            Dict mapping dpid to the list of port pairs whose state is
            still "detected" but for which has_loop_stopped() is truthy.
        """
        stopped_loops = {}
        for dpid, state_dict in self.loop_state.items():
            for port_pair, values in state_dict.items():
                if values["state"] != LoopState.detected.value:
                    continue
                if self.has_loop_stopped(dpid, port_pair):
                    stopped_loops.setdefault(dpid, []).append(port_pair)
        return stopped_loops

    def add_interface_metadata(self, interface_id, metadata):
        """Add interface metadata.

        POSTs ``metadata`` as JSON to the interface metadata endpoint under
        ``settings.TOPOLOGY_URL`` and returns the response object.
        """
        base_url = self.settings.TOPOLOGY_URL
        endpoint = f"{base_url}/interfaces/{interface_id}/metadata"
        return requests.post(endpoint, json=metadata)

    def del_interface_metadata(self, interface_id, key):
        """Delete interface metadata.

        Sends a DELETE for metadata ``key`` of the interface under
        ``settings.TOPOLOGY_URL`` and returns the response object.
        """
        base_url = self.settings.TOPOLOGY_URL
        endpoint = f"{base_url}/interfaces/{interface_id}/metadata/{key}"
        return requests.delete(endpoint)

    def handle_loop_stopped(self, interface_a, interface_b):
        """Handle loop stopped.

        Marks the loop as stopped, runs the stop side of the configured
        actions ("log" logs it, "disable" asks for interface_a to be
        enabled again) and deletes the "looped" metadata of interface_a.
        Nothing is done when the loop is not being tracked.
        """
        dpid = interface_a.switch.dpid
        port_a = interface_a.port_number
        port_b = interface_b.port_number
        port_pair = (port_a, port_b)

        with self.loop_lock:
            # Look up and update under the same lock acquisition so the
            # entry cannot change between the check and the write; .get()
            # avoids inserting an empty entry for an unknown dpid.
            entry = self.loop_state.get(dpid, {}).get(port_pair)
            if entry is None:
                return
            dt_at = now().strftime("%Y-%m-%dT%H:%M:%S")
            entry.update(
                {
                    "state": LoopState.stopped.value,
                    "updated_at": dt_at,
                    "stopped_at": dt_at,
                }
            )

        if "log" in self.actions:
            log.info(
                f"LLDP loop stopped on switch: {dpid}, "
                f"interfaces: {[interface_a.name, interface_b.name]}, "
                f"port_numbers: {[port_a, port_b]}"
            )
        if "disable" in self.actions:
            base_url = self.settings.TOPOLOGY_URL
            endpoint = f"{base_url}/interfaces/{interface_a.id}/enable"
            response = requests.post(endpoint)
            if response.status_code != 200:
                log.error(
                    f"Failed to enable interface: {interface_a.id},"
                    f" status code: {response.status_code}"
                )
            else:
                log.info(
                    "LLDP loop detection enabled interface "
                    f"{interface_a.id}, looped interfaces: "
                    f"{[interface_a.name, interface_b.name]},"
                    f"port_numbers: {[port_a, port_b]}"
                )

        key = "looped"
        response = self.del_interface_metadata(interface_a.id, key)
        # A 404 is not reported as an error.
        if response.status_code >= 400 and response.status_code != 404:
            log.error(
                f"Failed to delete metadata key {key} on interface: "
                f"{interface_a.id}, status code: {response.status_code}",
            )

    def handle_log_action(
        self,
        interface_a,
        interface_b,
    ):
        """Execute loop log action.

        Logs a warning on the first call for a loop and then once every
        ``log_every`` calls for that same loop.
        """
        dpid = interface_a.switch.dpid
        port_a = interface_a.port_number
        port_b = interface_b.port_number
        port_pair = (port_a, port_b)
        log_every = self.log_every
        with self.loop_lock:
            counters = self.loop_counter[dpid]
            if port_pair not in counters:
                counters[port_pair] = 0
            else:
                counters[port_pair] = (counters[port_pair] + 1) % log_every
            # Only a counter at zero triggers the log message.
            if counters[port_pair] != 0:
                return

        log.warning(
            f"LLDP loop detected on switch: {dpid}, "
            f"interfaces: {[interface_a.name, interface_b.name]}, "
            f"port_numbers: {[port_a, port_b]}"
        )

    def handle_disable_action(
        self,
        interface_a,
        interface_b,
    ):
        """Execute LLDP loop disable action idempotently.

        Does nothing when interface_a is already not enabled; otherwise
        POSTs to the disable endpoint of interface_a and logs the outcome
        (any status other than 200 is logged as an error).
        """
        if not interface_a.is_enabled():
            return

        port_a = interface_a.port_number
        port_b = interface_b.port_number
        intf_id = interface_a.id
        base_url = self.settings.TOPOLOGY_URL
        endpoint = f"{base_url}/interfaces/{intf_id}/disable"
        response = requests.post(endpoint)
        if response.status_code != 200:
            log.error(
                f"Failed to disable interface: {intf_id},"
                f" status code: {response.status_code}"
            )
            return

        log.info(
            f"LLDP loop detection disabled interface {interface_a.id}, "
            f"looped interfaces: {[interface_a.name, interface_b.name]}, "
            f"port_numbers: {[port_a, port_b]}"
        )

    def handle_switch_metadata_changed(self, switch):
        """Handle switch metadata changed.

        Drops the ignored loops of the switch when its metadata no longer
        has the "ignored_loops" key, otherwise reloads them.

        Returns:
            The removed ignored loops entry (or None) when the key is gone,
            otherwise the result of try_to_load_ignored_switch().
        """
        if "ignored_loops" not in switch.metadata:
            with self.loop_lock:
                return self.ignored_loops.pop(switch.dpid, None)
        return self.try_to_load_ignored_switch(switch)

    def try_to_load_ignored_switch(self, switch):
        """Try to load an ignored switch.

        Replaces the ignored loops of the switch with the list-typed items
        of its "ignored_loops" metadata. Nothing is changed when the key is
        missing or its value is not a list.
        """
        if "ignored_loops" not in switch.metadata:
            return
        configured = switch.metadata["ignored_loops"]
        if not isinstance(configured, list):
            return

        # Build the filtered list first and install it in one assignment,
        # so readers that do not take the lock never see a partial list.
        port_pairs = [pair for pair in configured if isinstance(pair, list)]
        with self.loop_lock:
            self.ignored_loops[switch.dpid] = port_pairs

    def handle_topology_loaded(self, topology):
        """Handle on topology loaded.

        Loads the ignored loops of every switch in ``topology.switches``.
        """
        for switch in topology.switches.values():
            self.try_to_load_ignored_switch(switch)
340