Test Failed
Pull Request — master (#121)
by Vinicius
05:36
created

build.main.Main.on_switch_deleted()   A

Complexity

Conditions 4

Size

Total Lines 15
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 4.0582

Importance

Changes 0
Metric Value
cc 4
eloc 12
nop 2
dl 0
loc 15
ccs 11
cts 13
cp 0.8462
crap 4.0582
rs 9.8
c 0
b 0
f 0
1
"""NApp responsible to discover new switches and hosts."""
2 1
import struct
3
4 1
import httpx
5 1
import tenacity
6 1
from threading import Lock
7 1
from napps.kytos.of_core.msg_prios import of_msg_prio
8 1
from napps.kytos.of_lldp import constants, settings
9 1
from napps.kytos.of_lldp.managers import LivenessManager, LoopManager
10 1
from napps.kytos.of_lldp.managers.loop_manager import LoopState
11
from napps.kytos.of_lldp.utils import (get_cookie, try_to_gen_intf_mac,
12 1
                                       update_flow)
13 1
from pyof.foundation.basic_types import DPID, UBInt32
14 1
from pyof.foundation.network_types import LLDP, VLAN, Ethernet, EtherType
15 1
from pyof.v0x04.common.action import ActionOutput as AO13
16 1
from pyof.v0x04.common.port import PortNo as Port13
17 1
from pyof.v0x04.controller2switch.packet_out import PacketOut as PO13
18
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
19
                      wait_combine, wait_fixed, wait_random)
20 1
21 1
from kytos.core import KytosEvent, KytosNApp, log, rest
22 1
from kytos.core.exceptions import KytosTagError
23 1
from kytos.core.helpers import alisten_to, listen_to
24 1
from kytos.core.link import Link
25
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
26 1
                                 aget_json_or_400, get_json_or_400)
27 1
from kytos.core.retry import before_sleep
28
from kytos.core.switch import Switch
29 1
30
from .controllers import LivenessController
31
32 1
33
class Main(KytosNApp):
34
    """Main OF_LLDP NApp Class."""
35 1
36
    def setup(self):
        """Initialise NApp state and schedule the periodic ``execute`` loop.

        Reads the polling time, the optional LLDP VLAN and the liveness dead
        multiplier from ``settings``, creates the loop and liveness helpers
        and registers the liveness status hooks on ``Link``.
        """
        # FLOW_VLAN_VID is optional in settings; fall back to no VLAN.
        self.vlan_id = getattr(settings, "FLOW_VLAN_VID", None)
        self.polling_time = settings.POLLING_TIME
        self.liveness_dead_multipler = settings.LIVENESS_DEAD_MULTIPLIER
        self.execute_as_loop(self.polling_time)

        self.loop_manager = LoopManager(self.controller)
        self.dead_interval = self.polling_time * self.liveness_dead_multipler

        self.liveness_controller = self.get_liveness_controller()
        self.liveness_controller.bootstrap_indexes()
        self.liveness_manager = LivenessManager(self.controller)
        # Serialises liveness enable/disable/delete operations.
        self._liveness_ops_lock = Lock()

        hook_name = f"{self.napp_id}_liveness"
        Link.register_status_func(
            hook_name, LivenessManager.link_status_hook_liveness
        )
        Link.register_status_reason_func(
            f"{self.napp_id}_liveness",
            LivenessManager.link_status_reason_hook_liveness,
        )
        self.table_group = {"base": 0}
56 1
57
    @staticmethod
    def get_liveness_controller() -> LivenessController:
        """Build and return a new ``LivenessController`` instance."""
        controller = LivenessController()
        return controller
61
62 1
    def execute(self):
        """Send an LLDP PacketOut through every eligible interface.

        Runs on each polling cycle. Only connected OpenFlow 1.3 (0x04)
        switches are considered, and on those only interfaces that have LLDP
        set, are active and enabled, and are not the local port. After the
        packets are queued, stopped loops are published and the liveness
        reaper is run with ``self.dead_interval``.
        """
        for switch in list(self.controller.switches.values()):
            try:
                of_version = switch.connection.protocol.version
            except AttributeError:
                # No connection/protocol information available.
                of_version = None

            # Skip disconnected switches and unsupported OpenFlow versions.
            if not switch.is_connected() or of_version != 0x04:
                continue

            port_type = UBInt32
            local_port = Port13.OFPP_LOCAL

            for interface in list(switch.interfaces.values()):
                # Interface must be marked for LLDP, active and enabled.
                eligible = (interface.lldp and interface.is_active()
                            and interface.is_enabled())
                if not eligible:
                    continue
                # Avoid the interface that connects to the controller.
                if interface.port_number == local_port:
                    continue

                lldp = LLDP()
                lldp.chassis_id.sub_value = DPID(switch.dpid)
                lldp.port_id.sub_value = port_type(interface.port_number)

                ethernet = Ethernet()
                ethernet.ether_type = EtherType.LLDP
                ethernet.source = try_to_gen_intf_mac(
                    interface.address, switch.id, interface.port_number
                )
                ethernet.destination = constants.LLDP_MULTICAST_MAC
                ethernet.data = lldp.pack()
                # self.vlan_id == None will result in a packet with no VLAN.
                ethernet.vlans.append(VLAN(vid=self.vlan_id))

                packet_out = self._build_lldp_packet_out(
                    of_version, interface.port_number, ethernet.pack()
                )
                if packet_out is None:
                    continue

                priority = of_msg_prio(packet_out.header.message_type.value)
                event_out = KytosEvent(
                    name='kytos/of_lldp.messages.out.ofpt_packet_out',
                    priority=priority,
                    content={'destination': switch.connection,
                             'message': packet_out},
                )
                self.controller.buffers.msg_out.put(event_out)

                log.debug("Sending a LLDP PacketOut to the switch %s",
                          switch.dpid)
                log.debug(
                    'Switch: %s (%s)'
                    ' Interface: %s'
                    ' -- LLDP PacketOut --'
                    ' Ethernet: eth_type (%s) | src (%s) | dst (%s) /'
                    ' LLDP: Switch (%s) | portno (%s)',
                    switch.connection, switch.dpid,
                    interface.id, ethernet.ether_type,
                    ethernet.source, ethernet.destination,
                    switch.dpid, interface.port_number,
                )

        self.try_to_publish_stopped_loops()
        self.liveness_manager.reaper(self.dead_interval)
140
141 1
    def load_liveness(self) -> None:
        """Re-enable liveness on interfaces the controller lists as enabled.

        The ids reported by ``liveness_controller.get_enabled_interfaces()``
        are matched against the interfaces currently known to the NApp;
        ids without a matching interface are ignored.
        """
        stored_ids = set()
        for entry in self.liveness_controller.get_enabled_interfaces():
            stored_ids.add(entry["id"])

        matching = []
        for candidate in self._get_interfaces():
            if candidate.id in stored_ids:
                matching.append(candidate)

        self.liveness_manager.enable(*matching)
153
154
    def try_to_publish_stopped_loops(self):
        """Publish the ``stopped`` loop state for every stopped port pair.

        Iterates the ``{dpid: port_pairs}`` mapping returned by
        ``loop_manager.get_stopped_loops()``. A missing switch or interface
        (``AttributeError``/``KeyError``) is logged and processing moves on
        to the next dpid.
        """
        stopped_loops = self.loop_manager.get_stopped_loops()
        for dpid, port_pairs in stopped_loops.items():
            try:
                switch = self.controller.get_switch_by_dpid(dpid)
                for port_pair in port_pairs:
                    intf_a = switch.interfaces[port_pair[0]]
                    intf_b = switch.interfaces[port_pair[1]]
                    self.loop_manager.publish_loop_state(
                        intf_a, intf_b, LoopState.stopped.value
                    )
            except (KeyError, AttributeError) as exc:
                log.error(
                    "try_to_publish_stopped_loops failed with switch:"
                    f"{dpid}, port_pair: {port_pair}. {exc!s}"
                )
168
169
    @listen_to('kytos/topology.switch.(enabled|disabled)')
    def handle_lldp_flows(self, event):
        """React to a switch being enabled or disabled.

        Event-listener entry point; the actual work is delegated to
        ``_handle_lldp_flows``, which installs the LLDP flow for an enabled
        switch and removes it for a disabled one.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                Event carrying the switch information.

        """
        self._handle_lldp_flows(event)
183 1
184 1
    @alisten_to("kytos/of_lldp.loop.action.log")
    async def on_lldp_loop_log_action(self, event: KytosEvent):
        """Forward a loop ``log`` action to the loop manager.

        Expects ``interface_a`` and ``interface_b`` in the event content.
        """
        content = event.content
        intf_a, intf_b = content["interface_a"], content["interface_b"]
        await self.loop_manager.handle_log_action(intf_a, intf_b)
190 1
191 1
    @alisten_to("kytos/of_lldp.loop.action.disable")
    async def on_lldp_loop_disable_action(self, event: KytosEvent):
        """Forward a loop ``disable`` action to the loop manager.

        Expects ``interface_a`` and ``interface_b`` in the event content.
        """
        content = event.content
        intf_a, intf_b = content["interface_a"], content["interface_b"]
        await self.loop_manager.handle_disable_action(intf_a, intf_b)
197
198
    @alisten_to("kytos/of_lldp.loop.stopped")
    async def on_lldp_loop_stopped(self, event: KytosEvent):
        """Tell the loop manager that a loop on a port pair has stopped.

        The event content must carry ``dpid`` and ``port_numbers``. If the
        switch or one of its interfaces cannot be resolved, the failure is
        logged and nothing else happens.
        """
        content = event.content
        dpid = content["dpid"]
        port_pair = content["port_numbers"]
        try:
            switch = self.controller.get_switch_by_dpid(dpid)
            intf_a = switch.interfaces[port_pair[0]]
            intf_b = switch.interfaces[port_pair[1]]
            await self.loop_manager.handle_loop_stopped(intf_a, intf_b)
        except (KeyError, AttributeError) as exc:
            log.error(
                f"on_lldp_loop_stopped failed with: {event.content} {exc!s}"
            )
212 1
213 1
    @alisten_to("kytos/topology.topology_loaded")
    async def on_topology_loaded(self, event):
        """Hand the loaded topology to the loop manager, then load liveness."""
        loaded_topology = event.content["topology"]
        await self.loop_manager.handle_topology_loaded(loaded_topology)
        self.load_liveness()
219
220
    @listen_to("kytos/topology.switch.deleted")
    def handle_switch_deleted(self, event: KytosEvent) -> None:
        """Listener for switch deletion.

        Runs ``on_switch_deleted`` while holding ``_liveness_ops_lock``.
        """
        with self._liveness_ops_lock:
            self.on_switch_deleted(event)
225 1
226 1
    def on_switch_deleted(self, event: KytosEvent) -> None:
227 1
        """Handle on switch deleted."""
228 1
        switch = event.content["switch"]
229
        found_intfs = []
230
        for intf in list(switch.interfaces.values()):
231
            if intf.id in self.liveness_manager.interfaces:
232 1
                found_intfs.append(intf)
233 1
        if found_intfs:
234 1
            intf_ids = [intf.id for id in found_intfs]
0 ignored issues
show
introduced by
The variable intf does not seem to be defined in case the for loop on line 230 is not entered. Are you sure this can never be the case?
Loading history...
235 1
            log.info(
236
                f"Disabling liveness on {switch} with interfaces "
237 1
                f" deleted {intf_ids} "
238 1
            )
239 1
            self.liveness_manager.disable(*found_intfs)
240 1
            self.liveness_controller.delete_interfaces(intf_ids)
241 1
242
    @listen_to("kytos/topology.interface.deleted")
    def handle_interface_deleted(self, event: KytosEvent) -> None:
        """Listener for interface deletion.

        Runs ``on_interface_deleted`` while holding ``_liveness_ops_lock``.
        """
        with self._liveness_ops_lock:
            self.on_interface_deleted(event)
247
248 1
    def on_interface_deleted(self, event: KytosEvent) -> None:
249 1
        """Handle on interface deleted."""
250 1
        intf = event.content["interface"]
251 1
        if intf.id in self.liveness_manager.interfaces:
252 1
            log.info(f"Disabling liveness on {intf} deleted")
253 1
            self.liveness_manager.disable(intf)
254
            self.liveness_controller.delete_interface(intf.id)
255 1
256
    @alisten_to("kytos/topology.switches.metadata.(added|removed)")
    async def on_switches_metadata_changed(self, event):
        """Pass the switch whose metadata changed on to the loop manager."""
        changed_switch = event.content["switch"]
        await self.loop_manager.handle_switch_metadata_changed(changed_switch)
261
262
    def _handle_lldp_flows(self, event):
263
        """Install or remove flows in a switch.
264
265 1
        Install a flow to send LLDP packets to the controller. The proactive
266
        flow is installed whenever a switch is enabled. If the switch is
267 1
        disabled the flow is removed.
268 1
        """
269 1
        try:
270 1
            dpid = event.content['dpid']
271 1
            switch = self.controller.get_switch_by_dpid(dpid)
272 1
            of_version = switch.connection.protocol.version
273 1
        except AttributeError:
274 1
            of_version = None
275 1
276
        try:
277 1
            installed_flows = self.get_flows_by_switch(switch.id)
278 1
        except tenacity.RetryError as err:
279 1
            msg = f"Error: {err.last_attempt.exception()} when "\
280
                   "obtaining flows."
281 1
            log.error(msg)
282
            return
283 1
        except ValueError as err:
284
            log.error(f"Error when getting flows, error: {err}")
285 1
            return
286 1
287 1
        flow = None
288 1
        if ("switch.enabled" in event.name and not installed_flows or
289 1
                "switch.disabled" in event.name and installed_flows):
290 1
            flow = self._build_lldp_flow(of_version, get_cookie(switch.dpid))
291 1
292 1
        if flow:
293
            data = {'flows': [flow]}
294 1
            try:
295
                self.send_flow(switch, event.name, data=data)
296 1
            except tenacity.RetryError as err:
297 1
                msg = f"Error: {err.last_attempt.exception()} when"\
298 1
                      f" sending flows to {switch.id}, {data}"
299 1
                log.error(msg)
300 1
301 1
    # pylint: disable=unexpected-keyword-arg
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
        before_sleep=before_sleep,
        retry=retry_if_exception_type(httpx.RequestError),
        after=update_flow(),
    )
    def send_flow(self, switch, event_name, data=None):
        """Ask flow_manager to install or delete the given flows.

        For a ``switch.enabled`` event the flows are POSTed (without their
        ``cookie_mask``) and the LLDP VLAN is then marked as used on the
        switch; for ``switch.disabled`` they are sent with DELETE and the
        VLAN is made available again. Any other event name is a no-op.

        Raises:
            httpx.RequestError: on a server error or a 400/404/424 response,
                which is what the ``retry`` decorator retries on.
        """
        endpoint = f'{settings.FLOW_MANAGER_URL}/flows/{switch.id}'
        retryable_statuses = {424, 404, 400}

        installing = event_name == 'kytos/topology.switch.enabled'
        removing = event_name == 'kytos/topology.switch.disabled'
        if not installing and not removing:
            return

        if installing:
            for flow in data['flows']:
                flow.pop("cookie_mask", None)
            response = httpx.post(endpoint, json=data, timeout=10)
        else:
            response = httpx.request(
                "DELETE", endpoint, json=data, timeout=10
            )

        failed = (response.is_server_error
                  or response.status_code in retryable_statuses)
        if failed:
            raise httpx.RequestError(response.text)

        if installing:
            self.use_vlan(switch)
        else:
            self.make_vlan_available(switch)
326 1
327 1
    def use_vlan(self, switch: Switch) -> None:
328
        """Use vlan from interface"""
329
        if self.vlan_id is None:
330 1
            return
331 1
        for interface_id in switch.interfaces:
332
            interface = switch.interfaces[interface_id]
333
            try:
334
                interface.use_tags(self.controller, self.vlan_id)
335
            except KytosTagError as err:
336
                log.error(err)
337
338
    def make_vlan_available(self, switch: Switch) -> None:
339 1
        """Makes vlan from interface available"""
340 1
        if self.vlan_id is None:
341 1
            return
342 1
        for interface_id in switch.interfaces:
343 1
            interface = switch.interfaces[interface_id]
344
            try:
345
                conflict = interface.make_tags_available(
346
                    self.controller, self.vlan_id
347
                )
348
                if conflict:
349
                    log.warning(f"Tags {conflict} was already available"
350
                                f" in {switch.id}:{interface_id}")
351 1
            except KytosTagError as err:
352 1
                log.error(err)
353 1
354 1
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
        before_sleep=before_sleep,
        retry=retry_if_exception_type(httpx.RequestError),
    )
    def get_flows_by_switch(self, dpid: str) -> dict:
        """Fetch the installed of_lldp flows stored for a switch.

        Queries flow_manager's ``stored_flows`` endpoint for installed flows
        whose cookie falls in the range owned by ``settings.COOKIE_PREFIX``.

        Raises:
            httpx.RequestError: on a server error or a 404 response (this is
                the exception type the ``retry`` decorator retries on).
            ValueError: on a 400 response, carrying its ``description``.
        """
        # The prefix occupies the top byte; the low 56 bits span the range.
        cookie_start = settings.COOKIE_PREFIX << 56
        cookie_end = cookie_start | ((1 << 56) - 1)
        endpoint = (
            f'{settings.FLOW_MANAGER_URL}/stored_flows?state=installed'
            f'&cookie_range={cookie_start}&cookie_range={cookie_end}'
            f'&dpid={dpid}'
        )
        response = httpx.get(endpoint)
        if response.is_server_error or response.status_code == 404:
            raise httpx.RequestError(response.text)
        if response.status_code == 400:
            raise ValueError(response.json()["description"])
        return response.json()
373 1
374 1
    @alisten_to('kytos/of_core.v0x04.messages.in.ofpt_packet_in')
    async def on_ofpt_packet_in(self, event):
        """Process a received LLDP packet and announce the discovered link.

        Non-LLDP packets and LLDP packets that cannot be unpacked are
        ignored. When both endpoints resolve to known interfaces, the pair
        is checked for loops, fed to the liveness manager, and a
        ``kytos/of_lldp.interface.is.nni`` event is put on the app buffer.

        Args:
            event (:class:`~kytos.core.events.KytosEvent`):
                Event with an LLDP packet as data.

        """
        ethernet = self._unpack_non_empty(Ethernet, event.message.data)
        if ethernet.ether_type != EtherType.LLDP:
            return

        try:
            lldp = self._unpack_non_empty(LLDP, ethernet.data)
            dpid = self._unpack_non_empty(DPID, lldp.chassis_id.sub_value)
        except struct.error:
            #: An LLDP packet that cannot be unpacked, or that lacks the
            #: dpid, was generated by someone else; it is of no use here.
            return

        switch_a = event.source.switch
        port_a = event.message.in_port
        # in_port is currently an Int in v0x04.
        if isinstance(port_a, int):
            port_a = UBInt32(port_a)

        switch_b = None
        port_b = None
        try:
            switch_b = self.controller.get_switch_by_dpid(dpid.value)
            port_type = UBInt32
            port_b = self._unpack_non_empty(port_type,
                                            lldp.port_id.sub_value)
        except AttributeError:
            log.debug("Couldn't find datapath %s.", dpid.value)

        # Give up if any of the needed information is not available.
        if not switch_a or not port_a or not switch_b or not port_b:
            return

        interface_a = switch_a.get_interface_by_port_no(port_a.value)
        interface_b = switch_b.get_interface_by_port_no(port_b.value)
        if not (interface_a and interface_b):
            return

        await self.loop_manager.process_if_looped(interface_a, interface_b)
        await self.liveness_manager.consume_hello_if_enabled(interface_a,
                                                             interface_b)
        nni_event = KytosEvent(
            name='kytos/of_lldp.interface.is.nni',
            content={'interface_a': interface_a,
                     'interface_b': interface_b},
        )
        await self.controller.buffers.app.aput(nni_event)
428 1
429 1
    def notify_lldp_change(self, state, interface_ids):
        """Publish a ``network_status.updated`` event for an LLDP change.

        Args:
            state: New LLDP state, passed through as given.
            interface_ids: Ids of the interfaces whose state changed.
        """
        event_out = KytosEvent(
            name='kytos/of_lldp.network_status.updated',
            content={
                'attribute': 'LLDP',
                'state': state,
                'interface_ids': interface_ids,
            },
        )
        self.controller.buffers.app.put(event_out)
437 1
438
    def publish_liveness_status(self, event_suffix, interfaces):
        """Publish a ``kytos/of_lldp.liveness.<event_suffix>`` event.

        The event content carries the given ``interfaces``.
        """
        event_out = KytosEvent(
            name=f"kytos/of_lldp.liveness.{event_suffix}",
            content={"interfaces": interfaces},
        )
        self.controller.buffers.app.put(event_out)
444
445
    def shutdown(self):
        """Log that the NApp is shutting down; nothing else is cleaned up."""
        log.debug('Shutting down...')
448
449
    @staticmethod
    def _build_lldp_packet_out(version, port_number, data):
        """Build an LLDP PacketOut message.

        Args:
            version (int): OpenFlow version.
            port_number (int): Switch port number the packet must be
                forwarded to.
            data (bytes): Binary data to be sent through the port.

        Returns:
            A PacketOut message with a single output action when ``version``
            is 0x04; None (after logging) for any other version.

        """
        if version != 0x04:
            log.info('Openflow version %s is not yet supported.', version)
            return None

        output_action = AO13()
        output_action.port = port_number

        packet_out = PO13()
        packet_out.data = data
        packet_out.actions.append(output_action)
        return packet_out
480
481
    def _build_lldp_flow(self, version, cookie,
482
                         cookie_mask=0xffffffffffffffff):
483
        """Build a Flow message to send LLDP to the controller.
484 1
485
        Args:
486 1
            version (int): OpenFlow version.
487 1
488
        Returns:
489 1
            Flow dictionary message for the specific given OpenFlow version,
490
            if it is supported.
491 1
            None if the OpenFlow version is not supported.
492
493 1
        """
494
        flow = {}
495 1
        if version == 0x04:
496 1
            flow['actions'] = [{'action_type': 'output',
497
                                'port': Port13.OFPP_CONTROLLER}]
498 1
        else:
499
            return None
500 1
501 1
        match = {}
502 1
        self.set_flow_table_group_owner(flow)
503 1
        flow['priority'] = settings.FLOW_PRIORITY
504
        flow['cookie'] = cookie
505 1
        flow['cookie_mask'] = cookie_mask
506 1
        match['dl_type'] = EtherType.LLDP
507
        if self.vlan_id:
508 1
            match['dl_vlan'] = self.vlan_id
509
        flow['match'] = match
510 1
511
        return flow
512 1
513
    @staticmethod
514 1
    def _unpack_non_empty(desired_class, data):
515 1
        """Unpack data using an instance of desired_class.
516
517 1
        Args:
518
            desired_class (class): The class to be used to unpack data.
519 1
            data (bytes): bytes to be unpacked.
520 1
521
        Return:
522 1
            An instance of desired_class class with data unpacked into it.
523 1
524 1
        Raises:
525 1
            UnpackException if the unpack could not be performed.
526 1
527 1
        """
528 1
        obj = desired_class()
529
530 1
        if hasattr(data, 'value'):
531 1
            data = data.value
532 1
533 1
        obj.unpack(data)
534 1
535 1
        return obj
536 1
537
    def _get_data(self, request: Request) -> list:
        """Return the ``interfaces`` entry of the request's JSON body.

        Falls back to an empty list when the key is absent.
        """
        payload = get_json_or_400(request, self.controller.loop)
        return payload.get('interfaces', [])
541 1
542 1
    def _get_interfaces(self):
543 1
        """Get all interfaces."""
544 1
        interfaces = []
545 1
        for switch in list(self.controller.switches.values()):
546 1
            interfaces += list(switch.interfaces.values())
547
        return interfaces
548
549
    @staticmethod
550 1
    def _get_interfaces_dict(interfaces):
551 1
        """Return a dict of interfaces."""
552
        return {inter.id: inter for inter in interfaces}
553 1
554 1
    def _get_lldp_interfaces(self):
555
        """Get interfaces enabled to receive LLDP packets."""
556 1
        return [inter.id for inter in self._get_interfaces() if inter.lldp]
557 1
558 1
    @rest('v1/interfaces', methods=['GET'])
    async def get_lldp_interfaces(self, _request: Request) -> JSONResponse:
        """Return the ids of the interfaces that have LLDP enabled."""
        body = {"interfaces": self._get_lldp_interfaces()}
        return JSONResponse(body)
562
563 1
    @rest('v1/interfaces/disable', methods=['POST'])
    def disable_lldp(self, request: Request) -> JSONResponse:
        """Stop sending LLDP on the requested interfaces.

        Interfaces that are found get ``lldp`` cleared and liveness disabled;
        a change notification and a liveness ``disabled`` event are then
        published. Responds with 404 when the NApp knows no interfaces at
        all, and with 400 plus the unknown ids when some were not found.
        """
        requested_ids = [id_ for id_ in self._get_data(request) if id_]
        known = self._get_interfaces()
        if not known:
            raise HTTPException(404, detail="No interfaces were found.")
        by_id = self._get_interfaces_dict(known)

        not_found = []  # Requested ids with no matching interface.
        changed_ids = []
        changed_intfs = []
        for id_ in requested_ids:
            interface = by_id.get(id_)
            if not interface:
                not_found.append(id_)
                continue
            interface.lldp = False
            changed_ids.append(id_)
            changed_intfs.append(interface)

        if changed_ids:
            self.notify_lldp_change('disabled', changed_ids)
            # Liveness depends on LLDP, so it is switched off as well.
            liveness_ids = [intf.id for intf in changed_intfs]
            self.liveness_controller.disable_interfaces(liveness_ids)
            self.liveness_manager.disable(*changed_intfs)
            self.publish_liveness_status("disabled", changed_intfs)

        if not_found:
            # Report the interfaces that couldn't be disabled.
            msg_error = "Some interfaces couldn't be found and deactivated: "
            return JSONResponse({msg_error: not_found}, status_code=400)
        return JSONResponse(
            "All the requested interfaces have been disabled.")
596
597 1
    @rest('v1/interfaces/enable', methods=['POST'])
    def enable_lldp(self, request: Request) -> JSONResponse:
        """Start sending LLDP on the requested interfaces.

        Interfaces that are found get ``lldp`` set and a change notification
        is published. Responds with 404 when the NApp knows no interfaces at
        all, and with 400 plus the unknown ids when some were not found.
        """
        requested_ids = [id_ for id_ in self._get_data(request) if id_]
        known = self._get_interfaces()
        if not known:
            raise HTTPException(404, detail="No interfaces were found.")
        by_id = self._get_interfaces_dict(known)

        not_found = []  # Requested ids with no matching interface.
        changed_ids = []
        for id_ in requested_ids:
            interface = by_id.get(id_)
            if not interface:
                not_found.append(id_)
                continue
            interface.lldp = True
            changed_ids.append(id_)

        if changed_ids:
            self.notify_lldp_change('enabled', changed_ids)

        if not_found:
            # Report the interfaces that couldn't be enabled.
            msg_error = "Some interfaces couldn't be found and activated: "
            return JSONResponse({msg_error: not_found}, status_code=400)
        return JSONResponse(
            "All the requested interfaces have been enabled.")
624 1
625 1
    @rest("v1/liveness/enable", methods=["POST"])
    def enable_liveness(self, request: Request) -> JSONResponse:
        """Enable liveness link detection on the requested interfaces.

        Responds with 400 for an empty payload or when any requested
        interface does not have LLDP enabled, and with 404 when any id is
        unknown. On success the change is persisted, applied and published
        while holding ``_liveness_ops_lock``.
        """
        intf_ids = self._get_data(request)
        if not intf_ids:
            raise HTTPException(400, "Interfaces payload is empty")

        by_id = {intf.id: intf for intf in self._get_interfaces()}
        diff = set(intf_ids).difference(by_id)
        if diff:
            raise HTTPException(404, f"Interface IDs {diff} not found")

        intfs = [by_id[_id] for _id in intf_ids]
        non_lldp = [intf.id for intf in intfs if not intf.lldp]
        if non_lldp:
            # Liveness relies on LLDP hellos, so LLDP must be on first.
            msg = f"Interface IDs {non_lldp} don't have LLDP enabled"
            raise HTTPException(400, msg)

        with self._liveness_ops_lock:
            self.liveness_controller.enable_interfaces(intf_ids)
            self.liveness_manager.enable(*intfs)
            self.publish_liveness_status("enabled", intfs)
        return JSONResponse({})
646
647
    @rest("v1/liveness/disable", methods=["POST"])
    def disable_liveness(self, request: Request) -> JSONResponse:
        """Disable liveness link detection on the requested interfaces.

        Responds with 400 for an empty payload and with 404 when any id is
        unknown. On success the change is persisted, applied and published
        while holding ``_liveness_ops_lock``.
        """
        intf_ids = self._get_data(request)
        if not intf_ids:
            raise HTTPException(400, "Interfaces payload is empty")

        by_id = {intf.id: intf for intf in self._get_interfaces()}
        diff = set(intf_ids).difference(by_id)
        if diff:
            raise HTTPException(404, f"Interface IDs {diff} not found")

        intfs = []
        for _id in intf_ids:
            if _id in by_id:
                intfs.append(by_id[_id])

        with self._liveness_ops_lock:
            self.liveness_controller.disable_interfaces(intf_ids)
            self.liveness_manager.disable(*intfs)
            self.publish_liveness_status("disabled", intfs)
        return JSONResponse({})
665
666
    @rest("v1/liveness/", methods=["GET"])
    async def get_liveness_interfaces(self, request: Request) -> JSONResponse:
        """Return the liveness status of one or all interfaces.

        With an ``interface_id`` query parameter only that interface is
        reported, and the list is empty when the liveness manager has no
        status for it. Without the parameter every interface tracked by the
        liveness manager is listed.

        Returns:
            JSONResponse with an ``interfaces`` list whose entries hold
            ``id``, ``status`` and ``last_hello_at``.
        """
        get_status = self.liveness_manager.get_interface_status

        interface_id = request.query_params.get("interface_id")
        if interface_id:
            status, last_hello_at = get_status(interface_id)
            if not status:
                # Every branch must return a response object; a
                # ``(body, status)`` tuple is not one.
                return JSONResponse({"interfaces": []})
            body = {
                "interfaces": [
                    {
                        "id": interface_id,
                        "status": status,
                        "last_hello_at": last_hello_at,
                    }
                ]
            }
            return JSONResponse(body)

        interfaces = []
        for tracked_id in list(self.liveness_manager.interfaces.keys()):
            status, last_hello_at = get_status(tracked_id)
            interfaces.append({"id": tracked_id, "status": status,
                               "last_hello_at": last_hello_at})
        return JSONResponse({"interfaces": interfaces})
695 1
696 1
    @rest("v1/liveness/pair", methods=["GET"])
    async def get_liveness_interface_pairs(self,
                                           _request: Request) -> JSONResponse:
        """Report every liveness interface pair known to the manager.

        For each entry of ``self.liveness_manager.liveness`` the response
        carries the id, state and last hello time of both interfaces, plus
        the state of the pair itself, under the ``pairs`` key.
        """

        def describe(interface, ilsm) -> dict:
            """Serialize one side of a pair from its interface and ILSM."""
            return {
                "id": interface.id,
                "status": ilsm.state,
                "last_hello_at": ilsm.last_hello_at,
            }

        # Work on a snapshot of the values, as the original loop did.
        snapshot = list(self.liveness_manager.liveness.values())
        pairs = [
            {
                "interface_a": describe(item["interface_a"],
                                        item["lsm"].ilsm_a),
                "interface_b": describe(item["interface_b"],
                                        item["lsm"].ilsm_b),
                "status": item["lsm"].state,
            }
            for item in snapshot
        ]
        return JSONResponse({"pairs": pairs})
718 1
719 1
    @rest('v1/polling_time', methods=['GET'])
    async def get_time(self, _request: Request) -> JSONResponse:
        """Return the current LLDP polling time, in seconds.

        The value is read from ``self.polling_time`` and sent back under
        the ``polling_time`` key.
        """
        body = {"polling_time": self.polling_time}
        return JSONResponse(body)
723 1
724 1
    @rest('v1/polling_time', methods=['POST'])
    async def set_time(self, request: Request) -> JSONResponse:
        """Set LLDP polling time.

        Expects a JSON body with a ``polling_time`` entry convertible to an
        integer greater than zero. The new value is stored on the instance
        and passed to ``execute_as_loop``; as the log message states, it is
        not saved permanently.

        Raises:
            HTTPException: 400 when ``polling_time`` is missing, cannot be
                converted to an integer, or is not greater than zero.
        """
        # pylint: disable=attribute-defined-outside-init
        try:
            payload = await aget_json_or_400(request)
            polling_time = int(payload['polling_time'])
        except (ValueError, KeyError, TypeError) as error:
            # TypeError covers a null/nested ``polling_time`` value and a
            # JSON body that cannot be indexed by key (e.g. a list); these
            # are client errors just like a missing or non-numeric value.
            msg = f"This operation is not completed: {error}"
            raise HTTPException(400, detail=msg) from error

        if polling_time <= 0:
            msg = f"invalid polling_time {polling_time}, " \
                    "must be greater than zero"
            raise HTTPException(400, detail=msg)

        self.polling_time = polling_time
        self.execute_as_loop(self.polling_time)
        log.info("Polling time has been updated to %s"
                 " second(s), but this change will not be saved"
                 " permanently.", self.polling_time)
        return JSONResponse("Polling time has been updated.")
744
745
    def set_flow_table_group_owner(self,
                                   flow: dict,
                                   group: str = "base") -> dict:
        """Stamp a flow with its table_id, owner and table_group.

        The table id is looked up in ``self.table_group`` by ``group``; the
        owner is always "of_lldp". ``flow`` is modified in place and the
        same dict is returned.
        """
        flow.update({
            "table_id": self.table_group[group],
            "owner": "of_lldp",
            "table_group": group,
        })
        return flow
753
754
    # pylint: disable=attribute-defined-outside-init
755
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """React to an of_multi_table "enable_table" event.

        Reads the "of_lldp" entry of the event content. Nothing happens if
        it is missing or empty. If any of its groups is not listed in
        ``settings.TABLE_GROUP_ALLOWED``, an error is logged and the whole
        request is discarded. Otherwise ``self.table_group`` is updated and
        a "kytos/of_lldp.enable_table" event carrying it is put on the app
        buffer.
        """
        requested = event.content.get("of_lldp")
        if not requested:
            return

        for name in requested:
            if name in settings.TABLE_GROUP_ALLOWED:
                continue
            # First disallowed group aborts the update entirely.
            log.error(f'The table group "{name}" is not allowed for '
                      f'of_lldp. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return

        self.table_group.update(requested)
        notification = KytosEvent(
            name="kytos/of_lldp.enable_table",
            content={"group_table": self.table_group},
        )
        await self.controller.buffers.app.aput(notification)
774