Code Duplication    Length = 42-49 lines in 3 locations

tests/unit/test_main.py 3 locations

@@ 494-535 (lines=42) @@
491
        mock_save_flow.assert_called()
492
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
493
494
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
495
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
496
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
497
    def test_no_strict_delete_with_ipv4_fail(self, *args):
498
        """Test the non-strict matching method.
499
500
        Test non-strict Fail case matching to delete a Flow using IPv4.
501
        """
502
        (mock_save_flow, _, _) = args
503
        dpid = "00:00:00:00:00:00:00:01"
504
        switch = get_switch_mock(dpid, 0x04)
505
        switch.id = dpid
506
        stored_flow = {
507
            "command": "add",
508
            "flow": {
509
                "priority": 10,
510
                "cookie": 84114904,
511
                "match": {
512
                    "ipv4_src": "192.168.2.1",
513
                    "ipv4_dst": "192.168.0.2",
514
                },
515
                "actions": [],
516
            },
517
        }
518
        stored_flow2 = {
519
            "command": "add",
520
            "flow": {
521
                "actions": [],
522
                "cookie": 4961162389751548787,
523
                "match": {"in_port": 2},
524
            },
525
        }
526
        flow_to_install = {"match": {"ipv4_src": '192.168.1.1/24'}}
527
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
528
        command = "delete"
529
        self.napp.stored_flows = {dpid: flow_list}
530
531
        self.napp._store_changed_flows(command, flow_to_install, switch)
532
        mock_save_flow.assert_called()
533
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 3)
534
535
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
536
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
537
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
538
    def test_no_strict_delete_of10(self, *args):
@@ 453-494 (lines=42) @@
450
        mock_save_flow.assert_called()
451
        self.assertEqual(len(self.napp.stored_flows), 1)
452
453
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
454
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
455
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
456
    def test_no_strict_delete_with_ipv4(self, *args):
457
        """Test the non-strict matching method.
458
459
        Test non-strict matching to delete a Flow using IPv4.
460
        """
461
        (mock_save_flow, _, _) = args
462
        dpid = "00:00:00:00:00:00:00:01"
463
        switch = get_switch_mock(dpid, 0x04)
464
        switch.id = dpid
465
        stored_flow = {
466
            "command": "add",
467
            "flow": {
468
                "priority": 10,
469
                "cookie": 84114904,
470
                "match": {
471
                    "ipv4_src": "192.168.1.120",
472
                    "ipv4_dst": "192.168.0.2",
473
                },
474
                "actions": [],
475
            },
476
        }
477
        stored_flow2 = {
478
            "command": "add",
479
            "flow": {
480
                "actions": [],
481
                "cookie": 4961162389751548787,
482
                "match": {"in_port": 2},
483
            },
484
        }
485
        flow_to_install = {"match": {"ipv4_src": '192.168.1.1/24'}}
486
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
487
        command = "delete"
488
        self.napp.stored_flows = {dpid: flow_list}
489
490
        self.napp._store_changed_flows(command, flow_to_install, switch)
491
        mock_save_flow.assert_called()
492
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
493
494
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
495
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
496
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
497
    def test_no_strict_delete_with_ipv4_fail(self, *args):
@@ 413-461 (lines=49) @@
410
        self.napp.check_storehouse_consistency(switch)
411
        mock_install_flows.assert_called()
412
413
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
414
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
415
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
416
    def test_no_strict_delete(self, *args):
417
        """Test the non-strict matching method.
418
419
        Test non-strict matching to delete a Flow using a cookie.
420
        """
421
        (mock_save_flow, _, _) = args
422
        dpid = "00:00:00:00:00:00:00:01"
423
        switch = get_switch_mock(dpid, 0x04)
424
        switch.id = dpid
425
        stored_flow = {
426
            "command": "add",
427
            "flow": {
428
                "actions": [{"action_type": "set_vlan", "vlan_id": 300}],
429
                "cookie": 6191162389751548793,
430
                "match": {"dl_vlan": 300, "in_port": 1},
431
            },
432
        }
433
        stored_flow2 = {
434
            "command": "add",
435
            "flow": {
436
                "actions": [],
437
                "cookie": 4961162389751548787,
438
                "match": {"in_port": 2},
439
            },
440
        }
441
        flow_to_install = {
442
            "cookie": 6191162389751548793,
443
            "cookie_mask": 18446744073709551615,
444
        }
445
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
446
        command = "delete"
447
        self.napp.stored_flows = {dpid: flow_list}
448
449
        self.napp._store_changed_flows(command, flow_to_install, switch)
450
        mock_save_flow.assert_called()
451
        self.assertEqual(len(self.napp.stored_flows), 1)
452
453
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
454
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
455
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
456
    def test_no_strict_delete_with_ipv4(self, *args):
457
        """Test the non-strict matching method.
458
459
        Test non-strict matching to delete a Flow using IPv4.
460
        """
461
        (mock_save_flow, _, _) = args
462
        dpid = "00:00:00:00:00:00:00:01"
463
        switch = get_switch_mock(dpid, 0x04)
464
        switch.id = dpid