Code Duplication    Length = 27-27 lines in 2 locations

main.py 2 locations

@@ 366-392 (lines=27) @@
363
        self.notify_topology_update()
364
        return jsonify("Operation successful"), 200
365
366
    @rest('v3/interfaces/switch/<dpid>/disable', methods=['POST'])
367
    @rest('v3/interfaces/<interface_disable_id>/disable', methods=['POST'])
368
    def disable_interface(self, interface_disable_id=None, dpid=None):
        """Administratively disable interfaces in the topology.

        Handles either a single interface, addressed by an id of the form
        ``<dpid>:<port_number>``, or every interface of the switch
        identified by ``dpid``.

        Args:
            interface_disable_id: Interface id whose last ``:``-separated
                field is the port number; ``None`` when a whole switch is
                targeted.
            dpid: Switch id. When ``None`` it is derived from
                ``interface_disable_id`` by dropping the last ``:`` field.

        Returns:
            A ``(response, status)`` tuple: 200 on success, 400 when the
            port number in ``interface_disable_id`` is not an integer, and
            404 when the switch or the interface is not found.
        """
        if dpid is None:
            dpid = ":".join(interface_disable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError as exc:
            return jsonify(f"Switch not found: {exc}"), 404

        if interface_disable_id:
            port_field = interface_disable_id.split(":")[-1]
            try:
                interface_number = int(port_field)
            except ValueError:
                # A non-numeric port would otherwise escape as an unhandled
                # exception instead of being reported as a client error.
                msg = f"Invalid interface id: {interface_disable_id}"
                return jsonify(msg), 400

            try:
                interface = switch.interfaces[interface_number]
                self.topo_controller.disable_interface(interface.id)
                interface.disable()
            except KeyError:
                msg = f"Switch {dpid} interface {interface_number} not found"
                return jsonify(msg), 404
        else:
            # Loop over a snapshot of the mapping (presumably because it can
            # change while iterating -- confirm), then store the switch once.
            for interface in switch.interfaces.copy().values():
                interface.disable()
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
        self.notify_topology_update()
        return jsonify("Operation successful"), 200
393
394
    @rest('v3/interfaces/<interface_id>/metadata')
395
    def get_interface_metadata(self, interface_id):
@@ 338-364 (lines=27) @@
335
336
        return jsonify({'interfaces': interfaces})
337
338
    @rest('v3/interfaces/switch/<dpid>/enable', methods=['POST'])
339
    @rest('v3/interfaces/<interface_enable_id>/enable', methods=['POST'])
340
    def enable_interface(self, interface_enable_id=None, dpid=None):
        """Administratively enable interfaces in the topology.

        Handles either a single interface, addressed by an id of the form
        ``<dpid>:<port_number>``, or every interface of the switch
        identified by ``dpid``.

        Args:
            interface_enable_id: Interface id whose last ``:``-separated
                field is the port number; ``None`` when a whole switch is
                targeted.
            dpid: Switch id. When ``None`` it is derived from
                ``interface_enable_id`` by dropping the last ``:`` field.

        Returns:
            A ``(response, status)`` tuple: 200 on success, 400 when the
            port number in ``interface_enable_id`` is not an integer, and
            404 when the switch or the interface is not found.
        """
        if dpid is None:
            dpid = ":".join(interface_enable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError as exc:
            return jsonify(f"Switch not found: {exc}"), 404

        if interface_enable_id:
            port_field = interface_enable_id.split(":")[-1]
            try:
                interface_number = int(port_field)
            except ValueError:
                # A non-numeric port would otherwise escape as an unhandled
                # exception instead of being reported as a client error.
                msg = f"Invalid interface id: {interface_enable_id}"
                return jsonify(msg), 400

            try:
                interface = switch.interfaces[interface_number]
                self.topo_controller.enable_interface(interface.id)
                interface.enable()
            except KeyError:
                msg = f"Switch {dpid} interface {interface_number} not found"
                return jsonify(msg), 404
        else:
            # Loop over a snapshot of the mapping (presumably because it can
            # change while iterating -- confirm), then store the switch once.
            for interface in switch.interfaces.copy().values():
                interface.enable()
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
        self.notify_topology_update()
        return jsonify("Operation successful"), 200
365
366
    @rest('v3/interfaces/switch/<dpid>/disable', methods=['POST'])
367
    @rest('v3/interfaces/<interface_disable_id>/disable', methods=['POST'])