Code Duplication    Length = 19-20 lines in 2 locations

main.py 2 locations

@@ 651-670 (lines=20) @@
648
        """
649
        return JSONResponse(self._get_links_dict())
650
651
    @rest('v3/links/{link_id}/enable', methods=['POST'])
652
    def enable_link(self, request: Request) -> JSONResponse:
653
        """Administratively enable a link in the topology."""
654
        link_id = request.path_params["link_id"]
655
        try:
656
            with self._links_lock:
657
                link = self.links[link_id]
658
                self.topo_controller.enable_link(link_id)
659
                if (not link.is_enabled() and link.endpoint_a.is_enabled()
660
                        and link.endpoint_b.is_enabled()):
661
                    self.notify_link_status(link, "enabled")
662
                link.enable()
663
        except KeyError:
664
            raise HTTPException(404, detail="Link not found")
665
        self.notify_link_status_change(
666
            self.links[link_id],
667
            reason='link enabled'
668
        )
669
        self.notify_topology_update()
670
        return JSONResponse("Operation successful", status_code=201)
671
672
    @rest('v3/links/{link_id}/disable', methods=['POST'])
673
    def disable_link(self, request: Request) -> JSONResponse:
@@ 672-690 (lines=19) @@
669
        self.notify_topology_update()
670
        return JSONResponse("Operation successful", status_code=201)
671
672
    @rest('v3/links/{link_id}/disable', methods=['POST'])
673
    def disable_link(self, request: Request) -> JSONResponse:
674
        """Administratively disable a link in the topology."""
675
        link_id = request.path_params["link_id"]
676
        try:
677
            with self._links_lock:
678
                link = self.links[link_id]
679
                if link.is_enabled():
680
                    self.notify_link_status(link, "disabled")
681
                self.topo_controller.disable_link(link_id)
682
                link.disable()
683
        except KeyError:
684
            raise HTTPException(404, detail="Link not found")
685
        self.notify_link_status_change(
686
            self.links[link_id],
687
            reason='link disabled'
688
        )
689
        self.notify_topology_update()
690
        return JSONResponse("Operation successful", status_code=201)
691
692
    def notify_link_status(self, link: Link, action: str):
693
        """Send a KytosEvent whether a link status (enabled/disabled)