Code Duplication    Length = 42-49 lines in 3 locations

tests/unit/test_main.py 3 locations

@@ 504-545 (lines=42) @@
501
        mock_save_flow.assert_called()
502
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
503
504
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
505
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
506
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
507
    def test_no_strict_delete_with_ipv4_fail(self, *args):
508
        """Test the non-strict matching method.
509
510
        Test non-strict Fail case matching to delete a Flow using IPv4.
511
        """
512
        (mock_save_flow, _, _) = args
513
        dpid = "00:00:00:00:00:00:00:01"
514
        switch = get_switch_mock(dpid, 0x04)
515
        switch.id = dpid
516
        stored_flow = {
517
            "command": "add",
518
            "flow": {
519
                "priority": 10,
520
                "cookie": 84114904,
521
                "match": {
522
                    "ipv4_src": "192.168.2.1",
523
                    "ipv4_dst": "192.168.0.2",
524
                },
525
                "actions": [],
526
            },
527
        }
528
        stored_flow2 = {
529
            "command": "add",
530
            "flow": {
531
                "actions": [],
532
                "cookie": 4961162389751548787,
533
                "match": {"in_port": 2},
534
            },
535
        }
536
        flow_to_install = {"match": {"ipv4_src": '192.168.1.1/24'}}
537
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
538
        command = "delete"
539
        self.napp.stored_flows = {dpid: flow_list}
540
541
        self.napp._store_changed_flows(command, flow_to_install, switch)
542
        mock_save_flow.assert_called()
543
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 3)
544
545
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
546
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
547
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
548
    def test_no_strict_delete_of10(self, *args):
@@ 463-504 (lines=42) @@
460
        mock_save_flow.assert_called()
461
        self.assertEqual(len(self.napp.stored_flows), 1)
462
463
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
464
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
465
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
466
    def test_no_strict_delete_with_ipv4(self, *args):
467
        """Test the non-strict matching method.
468
469
        Test non-strict matching to delete a Flow using IPv4.
470
        """
471
        (mock_save_flow, _, _) = args
472
        dpid = "00:00:00:00:00:00:00:01"
473
        switch = get_switch_mock(dpid, 0x04)
474
        switch.id = dpid
475
        stored_flow = {
476
            "command": "add",
477
            "flow": {
478
                "priority": 10,
479
                "cookie": 84114904,
480
                "match": {
481
                    "ipv4_src": "192.168.1.120",
482
                    "ipv4_dst": "192.168.0.2",
483
                },
484
                "actions": [],
485
            },
486
        }
487
        stored_flow2 = {
488
            "command": "add",
489
            "flow": {
490
                "actions": [],
491
                "cookie": 4961162389751548787,
492
                "match": {"in_port": 2},
493
            },
494
        }
495
        flow_to_install = {"match": {"ipv4_src": '192.168.1.1/24'}}
496
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
497
        command = "delete"
498
        self.napp.stored_flows = {dpid: flow_list}
499
500
        self.napp._store_changed_flows(command, flow_to_install, switch)
501
        mock_save_flow.assert_called()
502
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
503
504
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
505
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
506
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
507
    def test_no_strict_delete_with_ipv4_fail(self, *args):
@@ 423-471 (lines=49) @@
420
        self.napp.check_storehouse_consistency(switch)
421
        mock_install_flows.assert_called()
422
423
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
424
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
425
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
426
    def test_no_strict_delete(self, *args):
427
        """Test the non-strict matching method.
428
429
        Test non-strict matching to delete a Flow using a cookie.
430
        """
431
        (mock_save_flow, _, _) = args
432
        dpid = "00:00:00:00:00:00:00:01"
433
        switch = get_switch_mock(dpid, 0x04)
434
        switch.id = dpid
435
        stored_flow = {
436
            "command": "add",
437
            "flow": {
438
                "actions": [{"action_type": "set_vlan", "vlan_id": 300}],
439
                "cookie": 6191162389751548793,
440
                "match": {"dl_vlan": 300, "in_port": 1},
441
            },
442
        }
443
        stored_flow2 = {
444
            "command": "add",
445
            "flow": {
446
                "actions": [],
447
                "cookie": 4961162389751548787,
448
                "match": {"in_port": 2},
449
            },
450
        }
451
        flow_to_install = {
452
            "cookie": 6191162389751548793,
453
            "cookie_mask": 18446744073709551615,
454
        }
455
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
456
        command = "delete"
457
        self.napp.stored_flows = {dpid: flow_list}
458
459
        self.napp._store_changed_flows(command, flow_to_install, switch)
460
        mock_save_flow.assert_called()
461
        self.assertEqual(len(self.napp.stored_flows), 1)
462
463
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
464
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
465
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
466
    def test_no_strict_delete_with_ipv4(self, *args):
467
        """Test the non-strict matching method.
468
469
        Test non-strict matching to delete a Flow using IPv4.
470
        """
471
        (mock_save_flow, _, _) = args
472
        dpid = "00:00:00:00:00:00:00:01"
473
        switch = get_switch_mock(dpid, 0x04)
474
        switch.id = dpid