Code Duplication    Length = 31-31 lines in 2 locations

main.py 2 locations

@@ 386-416 (lines=31) @@
383
        self.notify_topology_update()
384
        return JSONResponse("Operation successful")
385
386
    @rest('v3/interfaces/switch/{dpid}/disable', methods=['POST'])
387
    @rest('v3/interfaces/{interface_disable_id}/disable', methods=['POST'])
388
    def disable_interface(self, request: Request) -> JSONResponse:
389
        """Administratively disable interfaces in the topology."""
390
        interface_disable_id = request.path_params.get("interface_disable_id")
391
        dpid = request.path_params.get("dpid")
392
        if dpid is None:
393
            dpid = ":".join(interface_disable_id.split(":")[:-1])
394
        try:
395
            switch = self.controller.switches[dpid]
396
        except KeyError:
397
            raise HTTPException(404, detail="Switch not found")
398
399
        if interface_disable_id:
400
            interface_number = int(interface_disable_id.split(":")[-1])
401
402
            try:
403
                interface = switch.interfaces[interface_number]
404
                self.topo_controller.disable_interface(interface.id)
405
                interface.disable()
406
                self.notify_interface_link_status(interface, "link disabled")
407
            except KeyError:
408
                msg = f"Switch {dpid} interface {interface_number} not found"
409
                raise HTTPException(404, detail=msg)
410
        else:
411
            for interface in switch.interfaces.copy().values():
412
                interface.disable()
413
                self.notify_interface_link_status(interface, "link disabled")
414
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
415
        self.notify_topology_update()
416
        return JSONResponse("Operation successful")
417
418
    @rest('v3/interfaces/{interface_id}/metadata')
419
    def get_interface_metadata(self, request: Request) -> JSONResponse:
@@ 354-384 (lines=31) @@
351
352
        return JSONResponse({'interfaces': interfaces})
353
354
    @rest('v3/interfaces/switch/{dpid}/enable', methods=['POST'])
355
    @rest('v3/interfaces/{interface_enable_id}/enable', methods=['POST'])
356
    def enable_interface(self, request: Request) -> JSONResponse:
357
        """Administratively enable interfaces in the topology."""
358
        interface_enable_id = request.path_params.get("interface_enable_id")
359
        dpid = request.path_params.get("dpid")
360
        if dpid is None:
361
            dpid = ":".join(interface_enable_id.split(":")[:-1])
362
        try:
363
            switch = self.controller.switches[dpid]
364
        except KeyError:
365
            raise HTTPException(404, detail="Switch not found")
366
367
        if interface_enable_id:
368
            interface_number = int(interface_enable_id.split(":")[-1])
369
370
            try:
371
                interface = switch.interfaces[interface_number]
372
                self.topo_controller.enable_interface(interface.id)
373
                interface.enable()
374
                self.notify_interface_link_status(interface, "link enabled")
375
            except KeyError:
376
                msg = f"Switch {dpid} interface {interface_number} not found"
377
                raise HTTPException(404, detail=msg)
378
        else:
379
            for interface in switch.interfaces.copy().values():
380
                interface.enable()
381
                self.notify_interface_link_status(interface, "link enabled")
382
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
383
        self.notify_topology_update()
384
        return JSONResponse("Operation successful")
385
386
    @rest('v3/interfaces/switch/{dpid}/disable', methods=['POST'])
387
    @rest('v3/interfaces/{interface_disable_id}/disable', methods=['POST'])