Code Duplication    Length = 29-29 lines in 2 locations

main.py 2 locations

@@ 375-403 (lines=29) @@
372
        self.notify_topology_update()
373
        return jsonify("Operation successful"), 200
374
375
    @rest('v3/interfaces/switch/<dpid>/disable', methods=['POST'])
376
    @rest('v3/interfaces/<interface_disable_id>/disable', methods=['POST'])
377
    def disable_interface(self, interface_disable_id=None, dpid=None):
378
        """Administratively disable interfaces in the topology."""
379
        if dpid is None:
380
            dpid = ":".join(interface_disable_id.split(":")[:-1])
381
        try:
382
            switch = self.controller.switches[dpid]
383
        except KeyError as exc:
384
            return jsonify(f"Switch not found: {exc}"), 404
385
386
        if interface_disable_id:
387
            interface_number = int(interface_disable_id.split(":")[-1])
388
389
            try:
390
                interface = switch.interfaces[interface_number]
391
                self.topo_controller.disable_interface(interface.id)
392
                interface.disable()
393
                self.notify_interface_link_status(interface, "link disabled")
394
            except KeyError:
395
                msg = f"Switch {dpid} interface {interface_number} not found"
396
                return jsonify(msg), 404
397
        else:
398
            for interface in switch.interfaces.copy().values():
399
                interface.disable()
400
                self.notify_interface_link_status(interface, "link disabled")
401
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
402
        self.notify_topology_update()
403
        return jsonify("Operation successful"), 200
404
405
    @rest('v3/interfaces/<interface_id>/metadata')
406
    def get_interface_metadata(self, interface_id):
@@ 345-373 (lines=29) @@
342
343
        return jsonify({'interfaces': interfaces})
344
345
    @rest('v3/interfaces/switch/<dpid>/enable', methods=['POST'])
346
    @rest('v3/interfaces/<interface_enable_id>/enable', methods=['POST'])
347
    def enable_interface(self, interface_enable_id=None, dpid=None):
348
        """Administratively enable interfaces in the topology."""
349
        if dpid is None:
350
            dpid = ":".join(interface_enable_id.split(":")[:-1])
351
        try:
352
            switch = self.controller.switches[dpid]
353
        except KeyError as exc:
354
            return jsonify(f"Switch not found: {exc}"), 404
355
356
        if interface_enable_id:
357
            interface_number = int(interface_enable_id.split(":")[-1])
358
359
            try:
360
                interface = switch.interfaces[interface_number]
361
                self.topo_controller.enable_interface(interface.id)
362
                interface.enable()
363
                self.notify_interface_link_status(interface, "link enabled")
364
            except KeyError:
365
                msg = f"Switch {dpid} interface {interface_number} not found"
366
                return jsonify(msg), 404
367
        else:
368
            for interface in switch.interfaces.copy().values():
369
                interface.enable()
370
                self.notify_interface_link_status(interface, "link enabled")
371
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
372
        self.notify_topology_update()
373
        return jsonify("Operation successful"), 200
374
375
    @rest('v3/interfaces/switch/<dpid>/disable', methods=['POST'])
376
    @rest('v3/interfaces/<interface_disable_id>/disable', methods=['POST'])