Code Duplication    Length = 31-31 lines in 2 locations

main.py 2 locations

@@ 369-399 (lines=31) @@
366
        self.notify_topology_update()
367
        return JSONResponse("Operation successful")
368
369
    @rest('v3/interfaces/switch/{dpid}/disable', methods=['POST'])
    @rest('v3/interfaces/{interface_disable_id}/disable', methods=['POST'])
    def disable_interface(self, request: Request) -> JSONResponse:
        """Administratively disable interfaces in the topology.

        Serves two routes. When ``dpid`` is in the path, every interface of
        that switch is disabled. When ``interface_disable_id`` is in the
        path, only that interface is disabled; the id is split on ``":"``,
        the last field being the port number and the preceding fields the
        switch dpid.

        Args:
            request: Incoming request; only ``path_params`` is read.

        Returns:
            JSONResponse carrying ``"Operation successful"``.

        Raises:
            HTTPException: 404 when the switch or the interface is not
                found; 400 when the port field of the interface id is not
                an integer.
        """
        interface_disable_id = request.path_params.get("interface_disable_id")
        dpid = request.path_params.get("dpid")
        if dpid is None:
            # Single-interface route: the dpid is the id minus its last field.
            dpid = ":".join(interface_disable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError as err:
            raise HTTPException(404, detail="Switch not found") from err

        if interface_disable_id:
            try:
                interface_number = int(interface_disable_id.split(":")[-1])
            except ValueError as err:
                # A non-numeric port is a client error, not a server fault.
                msg = f"Invalid interface id {interface_disable_id}"
                raise HTTPException(400, detail=msg) from err

            # Only the lookup is guarded: a KeyError raised while updating
            # or notifying must not be reported as "interface not found".
            try:
                interface = switch.interfaces[interface_number]
            except KeyError as err:
                msg = f"Switch {dpid} interface {interface_number} not found"
                raise HTTPException(404, detail=msg) from err
            self.topo_controller.disable_interface(interface.id)
            interface.disable()
            self.notify_interface_link_status(interface, "link disabled")
        else:
            # Iterate over a copy so the loop is unaffected if the
            # interfaces mapping changes while it runs.
            for interface in switch.interfaces.copy().values():
                interface.disable()
                self.notify_interface_link_status(interface, "link disabled")
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
        self.notify_topology_update()
        return JSONResponse("Operation successful")
400
401
    @rest('v3/interfaces/{interface_id}/metadata')
402
    def get_interface_metadata(self, request: Request) -> JSONResponse:
@@ 337-367 (lines=31) @@
334
335
        return JSONResponse({'interfaces': interfaces})
336
337
    @rest('v3/interfaces/switch/{dpid}/enable', methods=['POST'])
    @rest('v3/interfaces/{interface_enable_id}/enable', methods=['POST'])
    def enable_interface(self, request: Request) -> JSONResponse:
        """Administratively enable interfaces in the topology.

        Serves two routes. When ``dpid`` is in the path, every interface of
        that switch is enabled. When ``interface_enable_id`` is in the path,
        only that interface is enabled; the id is split on ``":"``, the last
        field being the port number and the preceding fields the switch
        dpid.

        Args:
            request: Incoming request; only ``path_params`` is read.

        Returns:
            JSONResponse carrying ``"Operation successful"``.

        Raises:
            HTTPException: 404 when the switch or the interface is not
                found; 400 when the port field of the interface id is not
                an integer.
        """
        interface_enable_id = request.path_params.get("interface_enable_id")
        dpid = request.path_params.get("dpid")
        if dpid is None:
            # Single-interface route: the dpid is the id minus its last field.
            dpid = ":".join(interface_enable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError as err:
            raise HTTPException(404, detail="Switch not found") from err

        if interface_enable_id:
            try:
                interface_number = int(interface_enable_id.split(":")[-1])
            except ValueError as err:
                # A non-numeric port is a client error, not a server fault.
                msg = f"Invalid interface id {interface_enable_id}"
                raise HTTPException(400, detail=msg) from err

            # Only the lookup is guarded: a KeyError raised while updating
            # or notifying must not be reported as "interface not found".
            try:
                interface = switch.interfaces[interface_number]
            except KeyError as err:
                msg = f"Switch {dpid} interface {interface_number} not found"
                raise HTTPException(404, detail=msg) from err
            self.topo_controller.enable_interface(interface.id)
            interface.enable()
            self.notify_interface_link_status(interface, "link enabled")
        else:
            # Iterate over a copy so the loop is unaffected if the
            # interfaces mapping changes while it runs.
            for interface in switch.interfaces.copy().values():
                interface.enable()
                self.notify_interface_link_status(interface, "link enabled")
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
        self.notify_topology_update()
        return JSONResponse("Operation successful")
368
369
    @rest('v3/interfaces/switch/{dpid}/disable', methods=['POST'])
370
    @rest('v3/interfaces/{interface_disable_id}/disable', methods=['POST'])