Code Duplication    Length = 42-49 lines in 3 locations

tests/unit/test_main.py 3 locations

@@ 522-563 (lines=42) @@
519
        mock_save_flow.assert_called()
520
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
521
522
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
    def test_no_strict_delete_with_ipv4_fail(self, *args):
        """Check a non-strict IPv4 delete that matches no stored flow.

        The delete request targets source ``192.168.1.1/24`` while the only
        stored IPv4 flow has source ``192.168.2.1``.  After the call the
        stored list is expected to hold three entries (it starts with two),
        i.e. nothing was removed.
        """
        # Patches are injected bottom-up, so the ``save_flow`` mock is first.
        mock_save_flow, _get_class, _install_flows = args

        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        switch.id = dpid

        # Stored flow whose source address lies outside the requested subnet.
        ipv4_flow = {
            "command": "add",
            "flow": {
                "priority": 10,
                "cookie": 84114904,
                "match": {
                    "ipv4_src": "192.168.2.1",
                    "ipv4_dst": "192.168.0.2",
                },
                "actions": [],
            },
        }
        # Unrelated stored flow that only matches on the ingress port.
        port_flow = {
            "command": "add",
            "flow": {
                "actions": [],
                "cookie": 4961162389751548787,
                "match": {"in_port": 2},
            },
        }
        self.napp.stored_flows = {
            dpid: {"flow_list": [ipv4_flow, port_flow]},
        }
        delete_request = {"match": {"ipv4_src": "192.168.1.1/24"}}

        self.napp._store_changed_flows("delete", delete_request, switch)

        mock_save_flow.assert_called()
        remaining = self.napp.stored_flows[dpid]['flow_list']
        self.assertEqual(len(remaining), 3)
562
563
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
564
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
565
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
566
    def test_no_strict_delete_of10(self, *args):
@@ 481-522 (lines=42) @@
478
        mock_save_flow.assert_called()
479
        self.assertEqual(len(self.napp.stored_flows), 1)
480
481
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
    def test_no_strict_delete_with_ipv4(self, *args):
        """Check a non-strict IPv4 delete that matches a stored flow.

        The delete request targets source ``192.168.1.1/24`` and one stored
        flow has source ``192.168.1.120``.  After the call the stored list
        is expected to hold two entries.
        """
        # Patches are injected bottom-up, so the ``save_flow`` mock is first.
        mock_save_flow, _get_class, _install_flows = args

        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        switch.id = dpid

        # Stored flow whose source address falls inside the requested subnet.
        ipv4_flow = {
            "command": "add",
            "flow": {
                "priority": 10,
                "cookie": 84114904,
                "match": {
                    "ipv4_src": "192.168.1.120",
                    "ipv4_dst": "192.168.0.2",
                },
                "actions": [],
            },
        }
        # Unrelated stored flow that only matches on the ingress port.
        port_flow = {
            "command": "add",
            "flow": {
                "actions": [],
                "cookie": 4961162389751548787,
                "match": {"in_port": 2},
            },
        }
        self.napp.stored_flows = {
            dpid: {"flow_list": [ipv4_flow, port_flow]},
        }
        delete_request = {"match": {"ipv4_src": "192.168.1.1/24"}}

        self.napp._store_changed_flows("delete", delete_request, switch)

        mock_save_flow.assert_called()
        remaining = self.napp.stored_flows[dpid]['flow_list']
        self.assertEqual(len(remaining), 2)
521
522
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
523
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
524
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
525
    def test_no_strict_delete_with_ipv4_fail(self, *args):
@@ 441-489 (lines=49) @@
438
        self.napp.check_storehouse_consistency(switch)
439
        mock_install_flows.assert_called()
440
441
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
    def test_no_strict_delete(self, *args):
        """Check a non-strict delete that selects a flow by cookie.

        The delete request carries the cookie of the first stored flow
        together with a mask that has all 64 bits set.
        """
        # Patches are injected bottom-up, so the ``save_flow`` mock is first.
        mock_save_flow, _get_class, _install_flows = args

        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        switch.id = dpid

        # Stored flow carrying the cookie the delete request asks for.
        vlan_flow = {
            "command": "add",
            "flow": {
                "actions": [{"action_type": "set_vlan", "vlan_id": 300}],
                "cookie": 6191162389751548793,
                "match": {"dl_vlan": 300, "in_port": 1},
            },
        }
        # Stored flow with a different cookie.
        port_flow = {
            "command": "add",
            "flow": {
                "actions": [],
                "cookie": 4961162389751548787,
                "match": {"in_port": 2},
            },
        }
        self.napp.stored_flows = {
            dpid: {"flow_list": [vlan_flow, port_flow]},
        }
        delete_request = {
            "cookie": 6191162389751548793,
            "cookie_mask": 18446744073709551615,  # 2**64 - 1
        }

        self.napp._store_changed_flows("delete", delete_request, switch)

        mock_save_flow.assert_called()
        # NOTE(review): this counts the dpid keys of ``stored_flows``, which
        # is already 1 before the call; it does not inspect the flow list.
        # Presumably the length of ``stored_flows[dpid]['flow_list']`` was
        # intended -- confirm the expected count before tightening.
        self.assertEqual(len(self.napp.stored_flows), 1)
480
481
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
482
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
483
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
484
    def test_no_strict_delete_with_ipv4(self, *args):
485
        """Test the non-strict matching method.
486
487
        Test non-strict matching to delete a Flow using IPv4.
488
        """
489
        (mock_save_flow, _, _) = args
490
        dpid = "00:00:00:00:00:00:00:01"
491
        switch = get_switch_mock(dpid, 0x04)
492
        switch.id = dpid