Code Duplication    Length = 38-39 lines in 2 locations

models/evc.py 1 location

@@ 1344-1382 (lines=39) @@
1341
1342
        return uni_flows
1343
1344
    @staticmethod
1345
    @retry(
1346
        stop=stop_after_attempt(3),
1347
        wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
1348
        retry=retry_if_exception_type(FlowModException),
1349
        before_sleep=before_sleep,
1350
        reraise=True,
1351
    )
1352
    def _send_flow_mods(
1353
        data_content: dict,
1354
        command="install",
1355
        force=False,
1356
        by_switch=False
1357
    ):
1358
        """Send a flow_mod list to a specific switch.
1359
1360
        Args:
1361
            dpid(str): The target of flows (i.e. Switch.id).
1362
            flow_mods(dict): Python dictionary with flow_mods.
1363
            command(str): By default is 'flows'. To remove a flow is 'remove'.
1364
            force(bool): True to send via consistency check in case of errors.
1365
            by_switch(bool): True to send to 'flows_by_switch' request instead.
1366
        """
1367
        if by_switch:
1368
            endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}"
1369
        else:
1370
            endpoint = f"{settings.MANAGER_URL}/flows"
1371
            data_content["force"] = force
1372
        try:
1373
            if command == "install":
1374
                res = httpx.post(endpoint, json=data_content, timeout=30)
1375
            elif command == "delete":
1376
                res = httpx.request(
1377
                    "DELETE", endpoint, json=data_content, timeout=30
1378
                )
1379
        except httpx.RequestError as err:
1380
            raise FlowModException(str(err)) from err
1381
        if res.is_server_error or res.status_code >= 400:
1382
            raise FlowModException(res.text)
1383
1384
    def get_cookie(self):
1385
        """Return the cookie integer from evc id."""

utils.py 1 location

@@ 208-245 (lines=38) @@
205
    return dpid_flows
206
207
208
@retry(
209
    stop=stop_after_attempt(3),
210
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
211
    retry=retry_if_exception_type(FlowModException),
212
    before_sleep=before_sleep,
213
    reraise=True,
214
)
215
def send_flow_mods_http(
216
    data_content: dict,
217
    command="install",
218
    force=False,
219
    by_switch=False
220
):
221
    """Send a flow_mod list to a specific switch.
222
223
    Args:
224
        dpid(str): The target of flows (i.e. Switch.id).
225
        flow_mods(dict): Python dictionary with flow_mods.
226
        command(str): By default is 'flows'. To remove a flow is 'remove'.
227
        force(bool): True to send via consistency check in case of errors.
228
        by_switch(bool): True to send to 'flows_by_switch' request instead.
229
    """
230
    if by_switch:
231
        endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}"
232
    else:
233
        endpoint = f"{settings.MANAGER_URL}/flows"
234
        data_content["force"] = force
235
    try:
236
        if command == "install":
237
            res = httpx.post(endpoint, json=data_content, timeout=30)
238
        elif command == "delete":
239
            res = httpx.request(
240
                "DELETE", endpoint, json=data_content, timeout=30
241
            )
242
    except httpx.RequestError as err:
243
        raise FlowModException(str(err)) from err
244
    if res.is_server_error or res.status_code >= 400:
245
        raise FlowModException(res.text)
246