Code Duplication: 31 lines duplicated in 2 locations

main.py 2 locations

@@ 385-415 (lines=31) @@
382
        self.notify_topology_update()
383
        return JSONResponse("Operation successful")
384
385
    @rest('v3/interfaces/switch/{dpid}/disable', methods=['POST'])
    @rest('v3/interfaces/{interface_disable_id}/disable', methods=['POST'])
    def disable_interface(self, request: Request) -> JSONResponse:
        """Administratively disable interfaces in the topology.

        Two routes share this handler: the ``{interface_disable_id}`` route
        disables a single interface, while the ``switch/{dpid}`` route
        disables every interface of the given switch.

        Args:
            request: Incoming request; only its path parameters are read.

        Returns:
            A JSONResponse carrying "Operation successful".

        Raises:
            HTTPException: 404 when the switch is unknown, or when the
                interface id does not name an interface of that switch
                (including a non-numeric port component).
        """
        interface_disable_id = request.path_params.get("interface_disable_id")
        dpid = request.path_params.get("dpid")
        if dpid is None:
            # Single-interface route: the id is parsed as "<dpid>:<port>",
            # so the dpid is everything before the last colon.
            dpid = ":".join(interface_disable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError:
            raise HTTPException(404, detail="Switch not found") from None

        if interface_disable_id:
            port_text = interface_disable_id.split(":")[-1]
            # Only the parsing and lookup live in the try body: a missing
            # or non-numeric port maps to 404, while a KeyError raised by
            # the persistence/notification calls below is no longer
            # misreported as "interface not found".
            try:
                interface = switch.interfaces[int(port_text)]
            except (KeyError, ValueError):
                msg = f"Switch {dpid} interface {port_text} not found"
                raise HTTPException(404, detail=msg) from None
            self.topo_controller.disable_interface(interface.id)
            interface.disable()
            self.notify_interface_link_status(interface, "link disabled")
        else:
            # Iterate over a copy of the mapping (presumably because it may
            # change while the loop runs -- confirm).
            for interface in switch.interfaces.copy().values():
                interface.disable()
                self.notify_interface_link_status(interface, "link disabled")
            # Persist the whole switch once instead of once per interface.
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
        self.notify_topology_update()
        return JSONResponse("Operation successful")
416
417
    @rest('v3/interfaces/{interface_id}/metadata')
418
    def get_interface_metadata(self, request: Request) -> JSONResponse:
@@ 353-383 (lines=31) @@
350
351
        return JSONResponse({'interfaces': interfaces})
352
353
    @rest('v3/interfaces/switch/{dpid}/enable', methods=['POST'])
    @rest('v3/interfaces/{interface_enable_id}/enable', methods=['POST'])
    def enable_interface(self, request: Request) -> JSONResponse:
        """Administratively enable interfaces in the topology.

        Two routes share this handler: the ``{interface_enable_id}`` route
        enables a single interface, while the ``switch/{dpid}`` route
        enables every interface of the given switch.

        Args:
            request: Incoming request; only its path parameters are read.

        Returns:
            A JSONResponse carrying "Operation successful".

        Raises:
            HTTPException: 404 when the switch is unknown, or when the
                interface id does not name an interface of that switch
                (including a non-numeric port component).
        """
        interface_enable_id = request.path_params.get("interface_enable_id")
        dpid = request.path_params.get("dpid")
        if dpid is None:
            # Single-interface route: the id is parsed as "<dpid>:<port>",
            # so the dpid is everything before the last colon.
            dpid = ":".join(interface_enable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError:
            raise HTTPException(404, detail="Switch not found") from None

        if interface_enable_id:
            port_text = interface_enable_id.split(":")[-1]
            # Only the parsing and lookup live in the try body: a missing
            # or non-numeric port maps to 404, while a KeyError raised by
            # the persistence/notification calls below is no longer
            # misreported as "interface not found".
            try:
                interface = switch.interfaces[int(port_text)]
            except (KeyError, ValueError):
                msg = f"Switch {dpid} interface {port_text} not found"
                raise HTTPException(404, detail=msg) from None
            self.topo_controller.enable_interface(interface.id)
            interface.enable()
            self.notify_interface_link_status(interface, "link enabled")
        else:
            # Iterate over a copy of the mapping (presumably because it may
            # change while the loop runs -- confirm).
            for interface in switch.interfaces.copy().values():
                interface.enable()
                self.notify_interface_link_status(interface, "link enabled")
            # Persist the whole switch once instead of once per interface.
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
        self.notify_topology_update()
        return JSONResponse("Operation successful")
384
385
    @rest('v3/interfaces/switch/{dpid}/disable', methods=['POST'])
386
    @rest('v3/interfaces/{interface_disable_id}/disable', methods=['POST'])