Passed
Pull Request — master (#36)
by Vinicius
10:59 queued 07:52
created

LoopManager.publish_loop_actions()   A

Complexity

Conditions 2

Size

Total Lines 16
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 12
nop 3
dl 0
loc 16
ccs 5
cts 5
cp 1
crap 2
rs 9.8
c 0
b 0
f 0
1
"""LoopManager."""
2 1
from collections import defaultdict
3 1
from enum import Enum
4 1
from threading import Lock
5
6 1
import requests
7
8 1
from kytos.core import KytosEvent, log
9 1
from kytos.core.helpers import get_time, now
10 1
from napps.kytos.of_lldp import settings as napp_settings
11
12
13 1
class LoopState(str, Enum):
    """States recorded for a tracked loop.

    The enum also derives from ``str``, so each member compares equal to
    its plain string value.
    """

    detected = "detected"
    stopped = "stopped"
18
19
20 1
class LoopManager:
21
    """LoopManager."""
22
23 1
    def __init__(self, controller, settings=napp_settings):
        """Set up loop bookkeeping and read the loop-related settings.

        Args:
            controller: stored as ``self.controller``; other methods use its
                ``buffers.app`` queue and ``get_switch_by_dpid``.
            settings: object providing ``LLDP_IGNORED_LOOPS``,
                ``LLDP_LOOP_ACTIONS``, ``POLLING_TIME`` and
                ``LOOP_LOG_EVERY`` (``TOPOLOGY_URL`` is read later by other
                methods).
        """
        self.controller = controller
        self.settings = settings

        # Per-dpid bookkeeping; each inner dict is keyed by a port pair.
        self.loop_state = defaultdict(dict)
        self.loop_counter = defaultdict(dict)
        self.loop_lock = Lock()

        # Values taken from the settings object at construction time.
        self.log_every = settings.LOOP_LOG_EVERY
        self.stopped_interval = 3 * settings.POLLING_TIME
        self.actions = settings.LLDP_LOOP_ACTIONS
        self.ignored_loops = settings.LLDP_IGNORED_LOOPS
35
36 1
    def _is_loop_ignored(self, dpid, port_a, port_b):
37
        """Check if a loop is ignored."""
38 1
        if dpid not in self.ignored_loops:
39 1
            return False
40 1
        if any(
41
            (
42
                [port_a, port_b] in self.ignored_loops[dpid],
43
                [port_b, port_a] in self.ignored_loops[dpid],
44
            )
45
        ):
46 1
            return True
47 1
        return False
48
49 1
    @staticmethod
50
    def _is_looped(dpid_a, port_a, dpid_b, port_b):
51
        """Check if the given dpids and ports are looped."""
52 1
        if all((dpid_a == dpid_b, port_a <= port_b)):  # only enter one pair
53 1
            return True
54 1
        return False
55
56 1
    def process_if_looped(
57
        self,
58
        interface_a,
59
        interface_b,
60
    ):
61
        """Process if interface_a and interface_b are looped."""
62 1
        dpid_a = interface_a.switch.dpid
63 1
        dpid_b = interface_b.switch.dpid
64 1
        port_a = interface_a.port_number
65 1
        port_b = interface_b.port_number
66 1
        if all(
67
            (
68
                self._is_looped(dpid_a, port_a, dpid_b, port_b),
69
                not self._is_loop_ignored(dpid_a, port_a, port_b),
70
            )
71
        ):
72 1
            self.publish_loop_state(
73
                interface_a, interface_b, LoopState.detected.value
74
            )
75 1
            self.publish_loop_actions(interface_a, interface_b)
76 1
            return True
77
        return False
78
79 1
    def publish_loop_state(
        self,
        interface_a,
        interface_b,
        state,
    ):
        """Put a ``kytos/of_lldp.loop.<state>`` event on the app buffer.

        The event content carries interface_a's id, the dpid of its switch
        and the port numbers of both interfaces.
        """
        ports = [interface_a.port_number, interface_b.port_number]
        content = {
            "interface_id": interface_a.id,
            "dpid": interface_a.switch.dpid,
            "port_numbers": ports,
        }
        loop_event = KytosEvent(
            name=f"kytos/of_lldp.loop.{state}",
            content=content,
        )
        self.controller.buffers.app.put(loop_event)
98
99 1
    def publish_loop_actions(
100
        self,
101
        interface_a,
102
        interface_b,
103
    ):
104
        """Publish loop action events."""
105 1
        supported_actions = {"log", "disable"}
106 1
        for action in set(self.actions).intersection(supported_actions):
107 1
            event = KytosEvent(
108
                name=f"kytos/of_lldp.loop.action.{action}",
109
                content={
110
                    "interface_a": interface_a,
111
                    "interface_b": interface_b,
112
                },
113
            )
114 1
            self.controller.buffers.app.put(event)
115
116 1
    def handle_loop_detected(self, interface_id, dpid, port_pair):
        """Record a detected loop and tag the interface on a new detection.

        When the pair is not tracked yet for ``dpid``, or is tracked in a
        state other than "detected", the entry is (re)marked as detected and
        a ``looped`` metadata entry is posted for ``interface_id``. Otherwise
        only ``updated_at`` is refreshed. A response status other than 201 is
        logged as an error.
        """
        time_format = "%Y-%m-%dT%H:%M:%S"
        detected = LoopState.detected.value
        pair_key = tuple(port_pair)
        newly_detected = False

        with self.loop_lock:
            switch_loops = self.loop_state[dpid]
            if pair_key not in switch_loops:
                stamp = now().strftime(time_format)
                switch_loops[pair_key] = {
                    "state": detected,
                    "port_numbers": list(pair_key),
                    "updated_at": stamp,
                    "detected_at": stamp,
                }
                newly_detected = True

            entry = switch_loops[pair_key]
            if entry["state"] == detected:
                # Already (or just) marked as detected: refresh the time only.
                entry.update({"updated_at": now().strftime(time_format)})
            else:
                stamp = now().strftime(time_format)
                entry.update(
                    {
                        "state": detected,
                        "updated_at": stamp,
                        "detected_at": stamp,
                    }
                )
                entry.pop("stopped_at", None)
                newly_detected = True

        if not newly_detected:
            return

        tracked = self.loop_state[dpid][pair_key]
        metadata = {
            "looped": {
                "port_numbers": tracked["port_numbers"],
                "detected_at": tracked["detected_at"],
            }
        }
        response = self.add_interface_metadata(interface_id, metadata)
        if response.status_code != 201:
            log.error(
                f"Failed to add metadata {metadata} on interface "
                f"{interface_id}, response: {response.json()}"
            )
164
165 1
    def has_loop_stopped(self, dpid, port_pair):
166
        """Check if a loop has stopped by checking within an interval
167
        or based on their operational state."""
168 1
        data = self.loop_state[dpid].get(port_pair)
169 1
        switch = self.controller.get_switch_by_dpid(dpid)
170 1
        if not data or not switch:
171
            return None
172 1
        try:
173 1
            interface_a = switch.interfaces[port_pair[0]]
174 1
            interface_b = switch.interfaces[port_pair[1]]
175
        except KeyError:
176
            return None
177
178 1
        if not interface_a.is_active() or not interface_b.is_active():
179
            return True
180
181 1
        delta_seconds = (now() - get_time(data["updated_at"])).seconds
182 1
        if delta_seconds > self.stopped_interval:
183 1
            return True
184
        return False
185
186 1
    def get_stopped_loops(self):
187
        """Get stopped loops."""
188 1
        stopped_loops = {}
189 1
        for key, state_dict in self.loop_state.items():
190 1
            for port_pair, values in state_dict.items():
191 1
                if values["state"] != LoopState.detected.value:
192
                    continue
193 1
                if self.has_loop_stopped(key, port_pair):
194 1
                    if key not in stopped_loops:
195 1
                        stopped_loops[key] = [port_pair]
196
                    else:
197 1
                        stopped_loops[key].append(port_pair)
198 1
        return stopped_loops
199
200 1
    def add_interface_metadata(self, interface_id, metadata):
        """POST ``metadata`` as JSON to the interface metadata endpoint.

        The endpoint is built from ``settings.TOPOLOGY_URL``. Returns the
        response object from ``requests.post``.
        """
        topology_url = self.settings.TOPOLOGY_URL
        return requests.post(
            f"{topology_url}/interfaces/{interface_id}/metadata",
            json=metadata,
        )
205
206 1
    def del_interface_metadata(self, interface_id, key):
        """Send a DELETE for metadata ``key`` of the given interface.

        The endpoint is built from ``settings.TOPOLOGY_URL``. Returns the
        response object from ``requests.delete``.
        """
        topology_url = self.settings.TOPOLOGY_URL
        return requests.delete(
            f"{topology_url}/interfaces/{interface_id}/metadata/{key}"
        )
211
212 1
    def handle_loop_stopped(self, interface_a, interface_b):
213
        """Handle loop stopped."""
214 1
        dpid = interface_a.switch.dpid
215 1
        port_a = interface_a.port_number
216 1
        port_b = interface_b.port_number
217 1
        port_pair = (port_a, port_b)
218
219 1
        if port_pair not in self.loop_state[dpid]:
220
            return
221 1
        with self.loop_lock:
222 1
            dt_at = now().strftime("%Y-%m-%dT%H:%M:%S")
223 1
            data = {
224
                "state": "stopped",
225
                "updated_at": dt_at,
226
                "stopped_at": dt_at,
227
            }
228 1
            self.loop_state[dpid][port_pair].update(data)
229
230 1
        if "log" in self.actions:
231 1
            log.info(
232
                f"LLDP loop stopped on switch: {dpid}, "
233
                f"interfaces: {[interface_a.name, interface_b.name]}, "
234
                f"port_numbers: {[port_a, port_b]}"
235
            )
236
237 1
        key = "looped"
238 1
        response = self.del_interface_metadata(interface_a.id, key)
239 1
        if response.status_code != 200:
240 1
            log.error(
241
                f"Failed to delete metadata key {key} on interface ",
242
                f"{interface_a.id}",
243
            )
244 1
            return
245
246 1
    def handle_log_action(
247
        self,
248
        interface_a,
249
        interface_b,
250
    ):
251
        """Execute loop log action."""
252 1
        dpid = interface_a.switch.dpid
253 1
        port_a = interface_a.port_number
254 1
        port_b = interface_b.port_number
255 1
        port_pair = (port_a, port_b)
256 1
        log_every = self.log_every
257 1
        with self.loop_lock:
258 1
            if port_pair not in self.loop_counter[dpid]:
259 1
                self.loop_counter[dpid][port_pair] = 0
260
            else:
261 1
                self.loop_counter[dpid][port_pair] += 1
262 1
                self.loop_counter[dpid][port_pair] %= log_every
263 1
            count = self.loop_counter[dpid][port_pair]
264 1
            if count != 0:
265 1
                return
266
267 1
        log.warning(
268
            f"LLDP loop detected on switch: {dpid}, "
269
            f"interfaces: {[interface_a.name, interface_b.name]}, "
270
            f"port_numbers: {[port_a, port_b]}"
271
        )
272
273 1
    def handle_disable_action(
274
        self,
275
        interface_a,
276
        interface_b,
277
    ):
278
        """Execute LLDP loop disable action idempotently."""
279 1
        if not interface_a.is_enabled():
280
            return
281
282 1
        port_a = interface_a.port_number
283 1
        port_b = interface_b.port_number
284 1
        intf_id = interface_a.id
285 1
        base_url = self.settings.TOPOLOGY_URL
286 1
        endpoint = f"{base_url}/interfaces/{intf_id}/disable"
287 1
        response = requests.post(endpoint)
288 1
        if response.status_code != 200:
289
            log.error(
290
                f"Failed to disable interface: {intf_id},"
291
                f" status code: {response.status_code}"
292
            )
293
            return
294
295 1
        log.info(
296
            f"LLDP loop detection disabled interface {interface_a.name}, "
297
            f"looped interfaces: {[interface_a.name, interface_b.name]}, "
298
            f"port_numbers: {[port_a, port_b]}"
299
        )
300
301 1
    def handle_switch_metadata_changed(self, switch):
302
        """Handle switch metadata changed."""
303 1
        if switch.id not in self.ignored_loops:
304 1
            self.try_to_load_ignored_switch(switch)
305
        else:
306 1
            with self.loop_lock:
307 1
                self.ignored_loops.pop(switch.dpid, None)
308
309 1
    def try_to_load_ignored_switch(self, switch):
310
        """Try to load an ignored switch."""
311 1
        if "ignored_loops" not in switch.metadata:
312
            return
313 1
        if not isinstance(switch.metadata["ignored_loops"], list):
314
            return
315
316 1
        dpid = switch.dpid
317 1
        with self.loop_lock:
318 1
            self.ignored_loops[dpid] = []
319 1
            for port_pair in switch.metadata["ignored_loops"]:
320 1
                if isinstance(port_pair, list):
321 1
                    self.ignored_loops[dpid].append(port_pair)
322
323 1
    def handle_topology_loaded(self, topology):
324
        """Handle on topology loaded."""
325 1
        for switch in topology.switches.values():
326
            self.try_to_load_ignored_switch(switch)
327