Code Duplication    Length = 26-31 lines in 2 locations

models/evc.py 2 locations

@@ 1266-1296 (lines=31) @@
1263
1264
        return uni_flows
1265
1266
    @staticmethod
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
        retry=retry_if_exception_type(FlowModException),
        before_sleep=before_sleep,
        reraise=True,
    )
    def _send_flow_mods(data_content: dict, command="install", force=False):
        """Send a flow_mod payload to the flow manager ``/flows`` endpoint.

        Args:
            data_content(dict): JSON payload of the request. It is updated
                in place: its ``"force"`` key is set to ``force``.
            command(str): ``"install"`` (default) issues a POST,
                ``"delete"`` issues a DELETE.
            force(bool): True to send via consistency check in case of errors.

        Raises:
            ValueError: If ``command`` is neither ``"install"`` nor
                ``"delete"``.
            FlowModException: If the HTTP request itself fails, or the
                response has a status code >= 400. This is the exception
                type the ``retry`` decorator is configured to retry on.

        """
        # Reject unknown commands up front; otherwise no request is made and
        # the status check below would fail on an unbound ``res``.
        if command not in ("install", "delete"):
            raise ValueError(
                f"Unsupported flow mod command {command!r}; "
                "expected 'install' or 'delete'"
            )
        endpoint = f"{settings.MANAGER_URL}/flows"
        data_content["force"] = force
        try:
            if command == "install":
                res = httpx.post(endpoint, json=data_content, timeout=30)
            else:
                res = httpx.request(
                    "DELETE", endpoint, json=data_content, timeout=30
                )
        except httpx.RequestError as err:
            # Normalize transport errors so the retry policy applies to them.
            raise FlowModException(str(err)) from err
        if res.is_server_error or res.status_code >= 400:
            raise FlowModException(res.text)
1297
1298
    @staticmethod
1299
    @retry(
@@ 1298-1323 (lines=26) @@
1295
        if res.is_server_error or res.status_code >= 400:
1296
            raise FlowModException(res.text)
1297
1298
    @staticmethod
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
        retry=retry_if_exception_type(FlowModException),
        before_sleep=before_sleep,
        reraise=True,
    )
    def _send_flow_mods_by_switch(
        flows_dict: dict[str, list],
        command: str,
        force=False
    ):
        """Send flow mods to the flow manager ``/flows_by_switch`` endpoint.

        Args:
            flows_dict(dict): JSON payload of the request, sent unmodified.
                NOTE(review): presumably maps a switch id to its list of
                flows, as the endpoint name suggests; confirm with callers.
            command(str): ``"install"`` issues a POST, ``"delete"`` issues
                a DELETE.
            force(bool): Interpolated into the ``force`` query parameter of
                the endpoint URL.

        Raises:
            ValueError: If ``command`` is neither ``"install"`` nor
                ``"delete"``.
            FlowModException: If the HTTP request itself fails, or the
                response has a status code >= 400. This is the exception
                type the ``retry`` decorator is configured to retry on.

        """
        # Reject unknown commands up front; otherwise no request is made and
        # the status check below would fail on an unbound ``res``.
        if command not in ("install", "delete"):
            raise ValueError(
                f"Unsupported flow mod command {command!r}; "
                "expected 'install' or 'delete'"
            )
        endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}"
        try:
            if command == "install":
                res = httpx.post(endpoint, json=flows_dict, timeout=30)
            else:
                res = httpx.request(
                    "DELETE", endpoint, json=flows_dict, timeout=30
                )
        except httpx.RequestError as err:
            # Normalize transport errors so the retry policy applies to them.
            raise FlowModException(str(err)) from err
        if res.is_server_error or res.status_code >= 400:
            raise FlowModException(res.text)
1324
1325
    def get_cookie(self):
1326
        """Return the cookie integer from evc id."""