Code Duplication    Length = 31-31 lines in 2 locations

main.py 2 locations

@@ 383-413 (lines=31) @@
380
        self.notify_topology_update()
381
        return JSONResponse("Operation successful")
382
383
    @rest('v3/interfaces/switch/{dpid}/disable', methods=['POST'])
384
    @rest('v3/interfaces/{interface_disable_id}/disable', methods=['POST'])
385
    def disable_interface(self, request: Request) -> JSONResponse:
386
        """Administratively disable interfaces in the topology."""
387
        interface_disable_id = request.path_params.get("interface_disable_id")
388
        dpid = request.path_params.get("dpid")
389
        if dpid is None:
390
            dpid = ":".join(interface_disable_id.split(":")[:-1])
391
        try:
392
            switch = self.controller.switches[dpid]
393
        except KeyError:
394
            raise HTTPException(404, detail="Switch not found")
395
396
        if interface_disable_id:
397
            interface_number = int(interface_disable_id.split(":")[-1])
398
399
            try:
400
                interface = switch.interfaces[interface_number]
401
                self.topo_controller.disable_interface(interface.id)
402
                interface.disable()
403
                self.notify_interface_link_status(interface, "link disabled")
404
            except KeyError:
405
                msg = f"Switch {dpid} interface {interface_number} not found"
406
                raise HTTPException(404, detail=msg)
407
        else:
408
            for interface in switch.interfaces.copy().values():
409
                interface.disable()
410
                self.notify_interface_link_status(interface, "link disabled")
411
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
412
        self.notify_topology_update()
413
        return JSONResponse("Operation successful")
414
415
    @rest('v3/interfaces/{interface_id}/metadata')
416
    def get_interface_metadata(self, request: Request) -> JSONResponse:
@@ 351-381 (lines=31) @@
348
349
        return JSONResponse({'interfaces': interfaces})
350
351
    @rest('v3/interfaces/switch/{dpid}/enable', methods=['POST'])
352
    @rest('v3/interfaces/{interface_enable_id}/enable', methods=['POST'])
353
    def enable_interface(self, request: Request) -> JSONResponse:
354
        """Administratively enable interfaces in the topology."""
355
        interface_enable_id = request.path_params.get("interface_enable_id")
356
        dpid = request.path_params.get("dpid")
357
        if dpid is None:
358
            dpid = ":".join(interface_enable_id.split(":")[:-1])
359
        try:
360
            switch = self.controller.switches[dpid]
361
        except KeyError:
362
            raise HTTPException(404, detail="Switch not found")
363
364
        if interface_enable_id:
365
            interface_number = int(interface_enable_id.split(":")[-1])
366
367
            try:
368
                interface = switch.interfaces[interface_number]
369
                self.topo_controller.enable_interface(interface.id)
370
                interface.enable()
371
                self.notify_interface_link_status(interface, "link enabled")
372
            except KeyError:
373
                msg = f"Switch {dpid} interface {interface_number} not found"
374
                raise HTTPException(404, detail=msg)
375
        else:
376
            for interface in switch.interfaces.copy().values():
377
                interface.enable()
378
                self.notify_interface_link_status(interface, "link enabled")
379
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
380
        self.notify_topology_update()
381
        return JSONResponse("Operation successful")
382
383
    @rest('v3/interfaces/switch/{dpid}/disable', methods=['POST'])
384
    @rest('v3/interfaces/{interface_disable_id}/disable', methods=['POST'])