Code Duplication    Length = 42-49 lines in 3 locations

tests/unit/test_main.py 3 locations

@@ 496-537 (lines=42) @@
493
        mock_save_flow.assert_called()
494
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
495
496
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
497
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
498
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
499
    def test_no_strict_delete_with_ipv4_fail(self, *args):
500
        """Test the non-strict matching method.
501
502
        Test non-strict Fail case matching to delete a Flow using IPv4.
503
        """
504
        (mock_save_flow, _, _) = args
505
        dpid = "00:00:00:00:00:00:00:01"
506
        switch = get_switch_mock(dpid, 0x04)
507
        switch.id = dpid
508
        stored_flow = {
509
            "command": "add",
510
            "flow": {
511
                "priority": 10,
512
                "cookie": 84114904,
513
                "match": {
514
                    "ipv4_src": "192.168.2.1",
515
                    "ipv4_dst": "192.168.0.2",
516
                },
517
                "actions": [],
518
            },
519
        }
520
        stored_flow2 = {
521
            "command": "add",
522
            "flow": {
523
                "actions": [],
524
                "cookie": 4961162389751548787,
525
                "match": {"in_port": 2},
526
            },
527
        }
528
        flow_to_install = {"match": {"ipv4_src": '192.168.1.1/24'}}
529
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
530
        command = "delete"
531
        self.napp.stored_flows = {dpid: flow_list}
532
533
        self.napp._store_changed_flows(command, flow_to_install, switch)
534
        mock_save_flow.assert_called()
535
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 3)
536
537
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
538
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
539
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
540
    def test_no_strict_delete_of10(self, *args):
@@ 455-496 (lines=42) @@
452
        mock_save_flow.assert_called()
453
        self.assertEqual(len(self.napp.stored_flows), 1)
454
455
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
456
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
457
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
458
    def test_no_strict_delete_with_ipv4(self, *args):
459
        """Test the non-strict matching method.
460
461
        Test non-strict matching to delete a Flow using IPv4.
462
        """
463
        (mock_save_flow, _, _) = args
464
        dpid = "00:00:00:00:00:00:00:01"
465
        switch = get_switch_mock(dpid, 0x04)
466
        switch.id = dpid
467
        stored_flow = {
468
            "command": "add",
469
            "flow": {
470
                "priority": 10,
471
                "cookie": 84114904,
472
                "match": {
473
                    "ipv4_src": "192.168.1.120",
474
                    "ipv4_dst": "192.168.0.2",
475
                },
476
                "actions": [],
477
            },
478
        }
479
        stored_flow2 = {
480
            "command": "add",
481
            "flow": {
482
                "actions": [],
483
                "cookie": 4961162389751548787,
484
                "match": {"in_port": 2},
485
            },
486
        }
487
        flow_to_install = {"match": {"ipv4_src": '192.168.1.1/24'}}
488
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
489
        command = "delete"
490
        self.napp.stored_flows = {dpid: flow_list}
491
492
        self.napp._store_changed_flows(command, flow_to_install, switch)
493
        mock_save_flow.assert_called()
494
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
495
496
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
497
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
498
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
499
    def test_no_strict_delete_with_ipv4_fail(self, *args):
@@ 415-463 (lines=49) @@
412
        self.napp.check_storehouse_consistency(switch)
413
        mock_install_flows.assert_called()
414
415
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
416
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
417
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
418
    def test_no_strict_delete(self, *args):
419
        """Test the non-strict matching method.
420
421
        Test non-strict matching to delete a Flow using a cookie.
422
        """
423
        (mock_save_flow, _, _) = args
424
        dpid = "00:00:00:00:00:00:00:01"
425
        switch = get_switch_mock(dpid, 0x04)
426
        switch.id = dpid
427
        stored_flow = {
428
            "command": "add",
429
            "flow": {
430
                "actions": [{"action_type": "set_vlan", "vlan_id": 300}],
431
                "cookie": 6191162389751548793,
432
                "match": {"dl_vlan": 300, "in_port": 1},
433
            },
434
        }
435
        stored_flow2 = {
436
            "command": "add",
437
            "flow": {
438
                "actions": [],
439
                "cookie": 4961162389751548787,
440
                "match": {"in_port": 2},
441
            },
442
        }
443
        flow_to_install = {
444
            "cookie": 6191162389751548793,
445
            "cookie_mask": 18446744073709551615,
446
        }
447
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
448
        command = "delete"
449
        self.napp.stored_flows = {dpid: flow_list}
450
451
        self.napp._store_changed_flows(command, flow_to_install, switch)
452
        mock_save_flow.assert_called()
453
        self.assertEqual(len(self.napp.stored_flows), 1)
454
455
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
456
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
457
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
458
    def test_no_strict_delete_with_ipv4(self, *args):
459
        """Test the non-strict matching method.
460
461
        Test non-strict matching to delete a Flow using IPv4.
462
        """
463
        (mock_save_flow, _, _) = args
464
        dpid = "00:00:00:00:00:00:00:01"
465
        switch = get_switch_mock(dpid, 0x04)
466
        switch.id = dpid