Passed
Pull Request — master (#52)
by Vinicius
12:20 queued 05:21
created

LoopManager.del_interface_metadata()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 3
dl 0
loc 5
ccs 4
cts 4
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""LoopManager."""
2 1
from collections import defaultdict
3 1
from enum import Enum
4 1
from threading import Lock
5
6 1
import requests
7
8 1
from kytos.core import KytosEvent, log
9 1
from kytos.core.helpers import get_time, now
10 1
from napps.kytos.of_lldp import settings as napp_settings
11
12
13 1
class LoopState(str, Enum):
14
    """LoopState Enum."""
15
16 1
    detected = "detected"
17 1
    stopped = "stopped"
18
19
20 1
class LoopManager:
21
    """LoopManager."""
22
23 1
    def __init__(self, controller, settings=napp_settings):
24
        """Constructor of LoopDetection."""
25 1
        self.controller = controller
26 1
        self.loop_lock = Lock()
27 1
        self.loop_counter = defaultdict(dict)
28 1
        self.loop_state = defaultdict(dict)
29
30 1
        self.settings = settings
31 1
        self.ignored_loops = settings.LLDP_IGNORED_LOOPS
32 1
        self.actions = settings.LLDP_LOOP_ACTIONS
33 1
        self.stopped_interval = 3 * settings.POLLING_TIME
34 1
        self.log_every = settings.LOOP_LOG_EVERY
35
36 1
    def is_loop_ignored(self, dpid, port_a, port_b):
37
        """Check if a loop is ignored."""
38 1
        if dpid not in self.ignored_loops:
39 1
            return False
40 1
        if any(
41
            (
42
                [port_a, port_b] in self.ignored_loops[dpid],
43
                [port_b, port_a] in self.ignored_loops[dpid],
44
            )
45
        ):
46 1
            return True
47 1
        return False
48
49 1
    @staticmethod
50 1
    def is_looped(dpid_a, port_a, dpid_b, port_b):
51
        """Check if the given dpids and ports are looped."""
52 1
        if all((dpid_a == dpid_b, port_a <= port_b)):  # only enter one pair
53 1
            return True
54 1
        return False
55
56 1
    def process_if_looped(
57
        self,
58
        interface_a,
59
        interface_b,
60
    ):
61
        """Process if interface_a and interface_b are looped."""
62 1
        dpid_a = interface_a.switch.dpid
63 1
        dpid_b = interface_b.switch.dpid
64 1
        port_a = interface_a.port_number
65 1
        port_b = interface_b.port_number
66 1
        if all(
67
            (
68
                self.is_looped(dpid_a, port_a, dpid_b, port_b),
69
                not self.is_loop_ignored(dpid_a, port_a, port_b),
70
            )
71
        ):
72 1
            self.publish_loop_state(
73
                interface_a, interface_b, LoopState.detected.value
74
            )
75 1
            self.publish_loop_actions(interface_a, interface_b)
76 1
            return True
77
        return False
78
79 1
    def publish_loop_state(
80
        self,
81
        interface_a,
82
        interface_b,
83
        state,
84
    ):
85
        """Publish loop state event."""
86 1
        dpid = interface_a.switch.dpid
87 1
        port_a = interface_a.port_number
88 1
        port_b = interface_b.port_number
89 1
        event = KytosEvent(
90
            name=f"kytos/of_lldp.loop.{state}",
91
            content={
92
                "interface_id": interface_a.id,
93
                "dpid": dpid,
94
                "port_numbers": [port_a, port_b],
95
            },
96
        )
97 1
        self.controller.buffers.app.put(event)
98
99 1
    def publish_loop_actions(
100
        self,
101
        interface_a,
102
        interface_b,
103
    ):
104
        """Publish loop action events."""
105 1
        supported_actions = {"log", "disable"}
106 1
        for action in set(self.actions).intersection(supported_actions):
107 1
            event = KytosEvent(
108
                name=f"kytos/of_lldp.loop.action.{action}",
109
                content={
110
                    "interface_a": interface_a,
111
                    "interface_b": interface_b,
112
                },
113
            )
114 1
            self.controller.buffers.app.put(event)
115
116 1
    def handle_loop_detected(self, interface_id, dpid, port_pair):
117
        """Handle loop detected."""
118 1
        is_new_loop = False
119 1
        port_pair = tuple(port_pair)
120 1
        with self.loop_lock:
121 1
            if port_pair not in self.loop_state[dpid]:
122 1
                dt_at = now().strftime("%Y-%m-%dT%H:%M:%S")
123 1
                data = {
124
                    "state": LoopState.detected.value,
125
                    "port_numbers": list(port_pair),
126
                    "updated_at": dt_at,
127
                    "detected_at": dt_at,
128
                }
129 1
                self.loop_state[dpid][port_pair] = data
130 1
                is_new_loop = True
131 1
            if (
132
                self.loop_state[dpid][port_pair]["state"]
133
                != LoopState.detected.value
134
            ):
135 1
                dt_at = now().strftime("%Y-%m-%dT%H:%M:%S")
136 1
                data = {
137
                    "state": LoopState.detected.value,
138
                    "updated_at": dt_at,
139
                    "detected_at": dt_at,
140
                }
141 1
                self.loop_state[dpid][port_pair].update(data)
142 1
                self.loop_state[dpid][port_pair].pop("stopped_at", None)
143 1
                is_new_loop = True
144
            else:
145 1
                data = {"updated_at": now().strftime("%Y-%m-%dT%H:%M:%S")}
146 1
                self.loop_state[dpid][port_pair].update(data)
147 1
        if is_new_loop:
148 1
            port_numbers = self.loop_state[dpid][port_pair]["port_numbers"]
149 1
            detected_at = self.loop_state[dpid][port_pair]["detected_at"]
150 1
            metadata = {
151
                "looped": {
152
                    "port_numbers": port_numbers,
153
                    "detected_at": detected_at,
154
                }
155
            }
156 1
            response = self.add_interface_metadata(interface_id, metadata)
157 1
            if response.status_code != 201:
158 1
                log.error(
159
                    f"Failed to add metadata {metadata} on interface "
160
                    f"{interface_id}, response: {response.json()}"
161
                )
162
163 1
    def has_loop_stopped(self, dpid, port_pair):
164
        """Check if a loop has stopped by checking within an interval
165
        or based on their operational state."""
166 1
        data = self.loop_state[dpid].get(port_pair)
167 1
        switch = self.controller.get_switch_by_dpid(dpid)
168 1
        if not data or not switch:
169
            return None
170 1
        try:
171 1
            interface_a = switch.interfaces[port_pair[0]]
172 1
            interface_b = switch.interfaces[port_pair[1]]
173
        except KeyError:
174
            return None
175
176 1
        if not interface_a.is_active() or not interface_b.is_active():
177
            return True
178
179 1
        delta_seconds = (now() - get_time(data["updated_at"])).seconds
180 1
        if delta_seconds > self.stopped_interval:
181 1
            return True
182
        return False
183
184 1
    def get_stopped_loops(self):
185
        """Get stopped loops."""
186 1
        stopped_loops = {}
187 1
        for key, state_dict in self.loop_state.items():
188 1
            for port_pair, values in state_dict.items():
189 1
                if values["state"] != LoopState.detected.value:
190
                    continue
191 1
                if self.has_loop_stopped(key, port_pair):
192 1
                    if key not in stopped_loops:
193 1
                        stopped_loops[key] = [port_pair]
194
                    else:
195 1
                        stopped_loops[key].append(port_pair)
196 1
        return stopped_loops
197
198 1
    def add_interface_metadata(self, interface_id, metadata):
199
        """Add interface metadata."""
200 1
        base_url = self.settings.TOPOLOGY_URL
201 1
        endpoint = f"{base_url}/interfaces/{interface_id}/metadata"
202 1
        return requests.post(endpoint, json=metadata)
203
204 1
    def del_interface_metadata(self, interface_id, key):
205
        """Delete interface metadata."""
206 1
        base_url = self.settings.TOPOLOGY_URL
207 1
        endpoint = f"{base_url}/interfaces/{interface_id}/metadata/{key}"
208 1
        return requests.delete(endpoint)
209
210 1
    def handle_loop_stopped(self, interface_a, interface_b):
211
        """Handle loop stopped."""
212 1
        dpid = interface_a.switch.dpid
213 1
        port_a = interface_a.port_number
214 1
        port_b = interface_b.port_number
215 1
        port_pair = (port_a, port_b)
216
217 1
        if port_pair not in self.loop_state[dpid]:
218
            return
219 1
        with self.loop_lock:
220 1
            dt_at = now().strftime("%Y-%m-%dT%H:%M:%S")
221 1
            data = {
222
                "state": "stopped",
223
                "updated_at": dt_at,
224
                "stopped_at": dt_at,
225
            }
226 1
            self.loop_state[dpid][port_pair].update(data)
227
228 1
        if "log" in self.actions:
229 1
            log.info(
230
                f"LLDP loop stopped on switch: {dpid}, "
231
                f"interfaces: {[interface_a.name, interface_b.name]}, "
232
                f"port_numbers: {[port_a, port_b]}"
233
            )
234 1
        if "disable" in self.actions:
235 1
            base_url = self.settings.TOPOLOGY_URL
236 1
            endpoint = f"{base_url}/interfaces/{interface_a.id}/enable"
237 1
            response = requests.post(endpoint)
238 1
            if response.status_code != 200:
239
                log.error(
240
                    f"Failed to enable interface: {interface_a.id},"
241
                    f" status code: {response.status_code}"
242
                )
243
            else:
244 1
                log.info(
245
                    "LLDP loop detection enabled interface "
246
                    f"{interface_a.id}, looped interfaces: "
247
                    f"{[interface_a.name, interface_b.name]},"
248
                    f"port_numbers: {[port_a, port_b]}"
249
                )
250
251 1
        key = "looped"
252 1
        response = self.del_interface_metadata(interface_a.id, key)
253 1
        if response.status_code != 200:
254 1
            log.error(
255
                f"Failed to delete metadata key {key} on interface ",
256
                f"{interface_a.id}",
257
            )
258
259 1
    def handle_log_action(
260
        self,
261
        interface_a,
262
        interface_b,
263
    ):
264
        """Execute loop log action."""
265 1
        dpid = interface_a.switch.dpid
266 1
        port_a = interface_a.port_number
267 1
        port_b = interface_b.port_number
268 1
        port_pair = (port_a, port_b)
269 1
        log_every = self.log_every
270 1
        with self.loop_lock:
271 1
            if port_pair not in self.loop_counter[dpid]:
272 1
                self.loop_counter[dpid][port_pair] = 0
273
            else:
274 1
                self.loop_counter[dpid][port_pair] += 1
275 1
                self.loop_counter[dpid][port_pair] %= log_every
276 1
            count = self.loop_counter[dpid][port_pair]
277 1
            if count != 0:
278 1
                return
279
280 1
        log.warning(
281
            f"LLDP loop detected on switch: {dpid}, "
282
            f"interfaces: {[interface_a.name, interface_b.name]}, "
283
            f"port_numbers: {[port_a, port_b]}"
284
        )
285
286 1
    def handle_disable_action(
287
        self,
288
        interface_a,
289
        interface_b,
290
    ):
291
        """Execute LLDP loop disable action idempotently."""
292 1
        if not interface_a.is_enabled():
293
            return
294
295 1
        port_a = interface_a.port_number
296 1
        port_b = interface_b.port_number
297 1
        intf_id = interface_a.id
298 1
        base_url = self.settings.TOPOLOGY_URL
299 1
        endpoint = f"{base_url}/interfaces/{intf_id}/disable"
300 1
        response = requests.post(endpoint)
301 1
        if response.status_code != 200:
302
            log.error(
303
                f"Failed to disable interface: {intf_id},"
304
                f" status code: {response.status_code}"
305
            )
306
            return
307
308 1
        log.info(
309
            f"LLDP loop detection disabled interface {interface_a.id}, "
310
            f"looped interfaces: {[interface_a.name, interface_b.name]}, "
311
            f"port_numbers: {[port_a, port_b]}"
312
        )
313
314 1
    def handle_switch_metadata_changed(self, switch):
315
        """Handle switch metadata changed."""
316 1
        if "ignored_loops" not in switch.metadata:
317 1
            with self.loop_lock:
318 1
                return self.ignored_loops.pop(switch.dpid, None)
319 1
        return self.try_to_load_ignored_switch(switch)
320
321 1
    def try_to_load_ignored_switch(self, switch):
322
        """Try to load an ignored switch."""
323 1
        if "ignored_loops" not in switch.metadata:
324
            return
325 1
        if not isinstance(switch.metadata["ignored_loops"], list):
326
            return
327
328 1
        dpid = switch.dpid
329 1
        with self.loop_lock:
330 1
            self.ignored_loops[dpid] = []
331 1
            for port_pair in switch.metadata["ignored_loops"]:
332 1
                if isinstance(port_pair, list):
333 1
                    self.ignored_loops[dpid].append(port_pair)
334
335 1
    def handle_topology_loaded(self, topology):
336
        """Handle on topology loaded."""
337 1
        for switch in topology.switches.values():
338
            self.try_to_load_ignored_switch(switch)
339