Code Duplication    Length = 31-31 lines in 2 locations

main.py 2 locations

@@ 428-458 (lines=31) @@
425
        self.notify_topology_update()
426
        return JSONResponse("Operation successful")
427
428
    @rest('v3/interfaces/switch/{dpid}/disable', methods=['POST'])
    @rest('v3/interfaces/{interface_disable_id}/disable', methods=['POST'])
    def disable_interface(self, request: Request) -> JSONResponse:
        """Administratively disable interfaces in the topology.

        Serves two routes:

        * ``v3/interfaces/{interface_disable_id}/disable`` disables the
          single interface named by ``interface_disable_id``. Everything
          before the last ``:`` of the ID is used as the switch DPID and
          the last segment as the integer port number.
        * ``v3/interfaces/switch/{dpid}/disable`` disables every interface
          of the switch ``dpid``.

        Args:
            request: Incoming request; only ``path_params`` is read.

        Returns:
            A ``JSONResponse`` carrying ``"Operation successful"``.

        Raises:
            HTTPException: 404 when the switch or the interface is not
                known; 400 when the port segment of the interface ID is
                not an integer.
        """
        interface_disable_id = request.path_params.get("interface_disable_id")
        dpid = request.path_params.get("dpid")
        if dpid is None:
            # Only the per-interface route omits ``dpid``: derive it from
            # the interface ID by dropping the trailing port segment.
            dpid = ":".join(interface_disable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError as err:
            raise HTTPException(404, detail="Switch not found") from err

        if interface_disable_id:
            port_text = interface_disable_id.split(":")[-1]
            try:
                interface_number = int(port_text)
            except ValueError as err:
                # The ID comes straight from the URL; a non-numeric port is
                # a client error, not an internal server failure.
                msg = f"Invalid interface number '{port_text}'"
                raise HTTPException(400, detail=msg) from err

            # Guard only the lookup so that a KeyError raised by the calls
            # below is not misreported as a missing interface.
            try:
                interface = switch.interfaces[interface_number]
            except KeyError as err:
                msg = f"Switch {dpid} interface {interface_number} not found"
                raise HTTPException(404, detail=msg) from err

            self.topo_controller.disable_interface(interface.id)
            interface.disable()
            self.notify_interface_link_status(interface, "link disabled")
        else:
            # Iterate over a copy of the mapping (presumably so changes to
            # ``switch.interfaces`` during the loop cannot break iteration).
            for interface in switch.interfaces.copy().values():
                interface.disable()
                self.notify_interface_link_status(interface, "link disabled")
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
        self.notify_topology_update()
        return JSONResponse("Operation successful")
459
460
    @rest('v3/interfaces/{interface_id}/metadata')
461
    def get_interface_metadata(self, request: Request) -> JSONResponse:
@@ 396-426 (lines=31) @@
393
394
        return JSONResponse({'interfaces': interfaces})
395
396
    @rest('v3/interfaces/switch/{dpid}/enable', methods=['POST'])
    @rest('v3/interfaces/{interface_enable_id}/enable', methods=['POST'])
    def enable_interface(self, request: Request) -> JSONResponse:
        """Administratively enable interfaces in the topology.

        Serves two routes:

        * ``v3/interfaces/{interface_enable_id}/enable`` enables the single
          interface named by ``interface_enable_id``. Everything before the
          last ``:`` of the ID is used as the switch DPID and the last
          segment as the integer port number.
        * ``v3/interfaces/switch/{dpid}/enable`` enables every interface of
          the switch ``dpid``.

        Args:
            request: Incoming request; only ``path_params`` is read.

        Returns:
            A ``JSONResponse`` carrying ``"Operation successful"``.

        Raises:
            HTTPException: 404 when the switch or the interface is not
                known; 400 when the port segment of the interface ID is
                not an integer.
        """
        interface_enable_id = request.path_params.get("interface_enable_id")
        dpid = request.path_params.get("dpid")
        if dpid is None:
            # Only the per-interface route omits ``dpid``: derive it from
            # the interface ID by dropping the trailing port segment.
            dpid = ":".join(interface_enable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError as err:
            raise HTTPException(404, detail="Switch not found") from err

        if interface_enable_id:
            port_text = interface_enable_id.split(":")[-1]
            try:
                interface_number = int(port_text)
            except ValueError as err:
                # The ID comes straight from the URL; a non-numeric port is
                # a client error, not an internal server failure.
                msg = f"Invalid interface number '{port_text}'"
                raise HTTPException(400, detail=msg) from err

            # Guard only the lookup so that a KeyError raised by the calls
            # below is not misreported as a missing interface.
            try:
                interface = switch.interfaces[interface_number]
            except KeyError as err:
                msg = f"Switch {dpid} interface {interface_number} not found"
                raise HTTPException(404, detail=msg) from err

            self.topo_controller.enable_interface(interface.id)
            interface.enable()
            self.notify_interface_link_status(interface, "link enabled")
        else:
            # Iterate over a copy of the mapping (presumably so changes to
            # ``switch.interfaces`` during the loop cannot break iteration).
            for interface in switch.interfaces.copy().values():
                interface.enable()
                self.notify_interface_link_status(interface, "link enabled")
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
        self.notify_topology_update()
        return JSONResponse("Operation successful")
427
428
    @rest('v3/interfaces/switch/{dpid}/disable', methods=['POST'])
429
    @rest('v3/interfaces/{interface_disable_id}/disable', methods=['POST'])