Code Duplication    Length = 46-47 lines in 2 locations

main.py 2 locations

@@ 60-106 (lines=47) @@
57
        If you have some cleanup procedure, insert it here.
58
        """
59
60
    @rest("v1/evc/enable", methods=["POST"])
    async def enable_telemetry(self, request: Request) -> JSONResponse:
        """REST to enable INT flows on EVCs.

        The JSON payload must contain ``evc_ids`` (a list) and may contain
        ``force`` (a bool, defaults to False). If the list of evc_ids is
        empty, it'll enable on non-INT EVCs.

        Returns:
            An empty JSON response with status 201 once ``enable_int``
            succeeds, or an empty JSON response with the default status
            code when ``evc_ids`` is empty and no non-INT EVC was found.

        Raises:
            HTTPException: 400 on an invalid payload, EVCHasINT or
                ProxyPortStatusNotUP; 404 on EVCNotFound, FlowsNotFound or
                ProxyPortNotFound; 503 on RetryError; 500 on
                UnrecoverableError.
        """
        try:
            content = await aget_json_or_400(request)
            evc_ids = content["evc_ids"]
            # Validate here so a non-list value is reported as a 400 instead
            # of failing later in len()/indexing outside this handler.
            if not isinstance(evc_ids, list):
                raise TypeError(
                    f"'evc_ids' wrong type: {type(evc_ids)} expected list"
                )
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc

        try:
            # A single id is fetched individually; any other amount
            # (including none) fetches the EVCs in bulk.
            if len(evc_ids) == 1:
                evcs = await get_evc(evc_ids[0])
            else:
                evcs = await get_evcs()
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc

        if evc_ids:
            # Requested ids that were not fetched map to an empty dict.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
            if not evcs:
                # There are no non-INT EVCs to get enabled.
                return JSONResponse({})

        try:
            await self.int_manager.enable_int(evcs, force)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except (EVCHasINT, ProxyPortStatusNotUP) as exc:
            raise HTTPException(400, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse({}, status_code=201)
107
108
    @rest("v1/evc/disable", methods=["POST"])
109
    async def disable_telemetry(self, request: Request) -> JSONResponse:
@@ 108-153 (lines=46) @@
105
106
        return JSONResponse({}, status_code=201)
107
108
    @rest("v1/evc/disable", methods=["POST"])
    async def disable_telemetry(self, request: Request) -> JSONResponse:
        """REST to disable/remove INT flows for an EVC_ID

        The JSON payload must contain ``evc_ids`` (a list) and may contain
        ``force`` (a bool, defaults to False). If the list of evc_ids is
        empty, it'll disable on all INT EVCs.

        Returns:
            An empty JSON response with the default status code, both when
            ``disable_int`` succeeds and when ``evc_ids`` is empty and no
            INT EVC was found.

        Raises:
            HTTPException: 400 on an invalid payload or EVCHasNoINT; 404 on
                EVCNotFound; 503 on RetryError; 500 on UnrecoverableError.
        """
        try:
            content = await aget_json_or_400(request)
            evc_ids = content["evc_ids"]
            # Validate here so a non-list value is reported as a 400 instead
            # of failing later in len()/indexing outside this handler.
            if not isinstance(evc_ids, list):
                raise TypeError(
                    f"'evc_ids' wrong type: {type(evc_ids)} expected list"
                )
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc

        try:
            # A single id is fetched individually; any other amount
            # (including none) fetches the EVCs in bulk.
            if len(evc_ids) == 1:
                evcs = await get_evc(evc_ids[0])
            else:
                evcs = await get_evcs()
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc

        if evc_ids:
            # Requested ids that were not fetched map to an empty dict.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
            if not evcs:
                # There are no INT EVCs to get disabled.
                return JSONResponse({})

        try:
            await self.int_manager.disable_int(evcs, force)
        except EVCNotFound as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except EVCHasNoINT as exc:
            raise HTTPException(400, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse({})
154
155
    @rest("v1/evc")
156
    def get_evcs(self, _request: Request) -> JSONResponse: