Code Duplication    Length = 42-49 lines in 3 locations

tests/unit/test_main.py 3 locations

@@ 490-531 (lines=42) @@
487
        mock_save_flow.assert_called()
488
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
489
490
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
491
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
492
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
493
    def test_no_strict_delete_with_ipv4_fail(self, *args):
494
        """Test the non-strict matching method.
495
496
        Test non-strict Fail case matching to delete a Flow using IPv4.
497
        """
498
        (mock_save_flow, _, _) = args
499
        dpid = "00:00:00:00:00:00:00:01"
500
        switch = get_switch_mock(dpid, 0x04)
501
        switch.id = dpid
502
        stored_flow = {
503
            "command": "add",
504
            "flow": {
505
                "priority": 10,
506
                "cookie": 84114904,
507
                "match": {
508
                    "ipv4_src": "192.168.2.1",
509
                    "ipv4_dst": "192.168.0.2",
510
                },
511
                "actions": [],
512
            },
513
        }
514
        stored_flow2 = {
515
            "command": "add",
516
            "flow": {
517
                "actions": [],
518
                "cookie": 4961162389751548787,
519
                "match": {"in_port": 2},
520
            },
521
        }
522
        flow_to_install = {"match": {"ipv4_src": '192.168.1.1/24'}}
523
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
524
        command = "delete"
525
        self.napp.stored_flows = {dpid: flow_list}
526
527
        self.napp._store_changed_flows(command, flow_to_install, switch)
528
        mock_save_flow.assert_called()
529
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 3)
530
531
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
532
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
533
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
534
    def test_no_strict_delete_of10(self, *args):
@@ 449-490 (lines=42) @@
446
        mock_save_flow.assert_called()
447
        self.assertEqual(len(self.napp.stored_flows), 1)
448
449
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
450
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
451
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
452
    def test_no_strict_delete_with_ipv4(self, *args):
453
        """Test the non-strict matching method.
454
455
        Test non-strict matching to delete a Flow using IPv4.
456
        """
457
        (mock_save_flow, _, _) = args
458
        dpid = "00:00:00:00:00:00:00:01"
459
        switch = get_switch_mock(dpid, 0x04)
460
        switch.id = dpid
461
        stored_flow = {
462
            "command": "add",
463
            "flow": {
464
                "priority": 10,
465
                "cookie": 84114904,
466
                "match": {
467
                    "ipv4_src": "192.168.1.120",
468
                    "ipv4_dst": "192.168.0.2",
469
                },
470
                "actions": [],
471
            },
472
        }
473
        stored_flow2 = {
474
            "command": "add",
475
            "flow": {
476
                "actions": [],
477
                "cookie": 4961162389751548787,
478
                "match": {"in_port": 2},
479
            },
480
        }
481
        flow_to_install = {"match": {"ipv4_src": '192.168.1.1/24'}}
482
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
483
        command = "delete"
484
        self.napp.stored_flows = {dpid: flow_list}
485
486
        self.napp._store_changed_flows(command, flow_to_install, switch)
487
        mock_save_flow.assert_called()
488
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
489
490
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
491
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
492
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
493
    def test_no_strict_delete_with_ipv4_fail(self, *args):
@@ 409-457 (lines=49) @@
406
        self.napp.check_storehouse_consistency(switch)
407
        mock_install_flows.assert_called()
408
409
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
410
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
411
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
412
    def test_no_strict_delete(self, *args):
413
        """Test the non-strict matching method.
414
415
        Test non-strict matching to delete a Flow using a cookie.
416
        """
417
        (mock_save_flow, _, _) = args
418
        dpid = "00:00:00:00:00:00:00:01"
419
        switch = get_switch_mock(dpid, 0x04)
420
        switch.id = dpid
421
        stored_flow = {
422
            "command": "add",
423
            "flow": {
424
                "actions": [{"action_type": "set_vlan", "vlan_id": 300}],
425
                "cookie": 6191162389751548793,
426
                "match": {"dl_vlan": 300, "in_port": 1},
427
            },
428
        }
429
        stored_flow2 = {
430
            "command": "add",
431
            "flow": {
432
                "actions": [],
433
                "cookie": 4961162389751548787,
434
                "match": {"in_port": 2},
435
            },
436
        }
437
        flow_to_install = {
438
            "cookie": 6191162389751548793,
439
            "cookie_mask": 18446744073709551615,
440
        }
441
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
442
        command = "delete"
443
        self.napp.stored_flows = {dpid: flow_list}
444
445
        self.napp._store_changed_flows(command, flow_to_install, switch)
446
        mock_save_flow.assert_called()
447
        self.assertEqual(len(self.napp.stored_flows), 1)
448
449
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
450
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
451
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
452
    def test_no_strict_delete_with_ipv4(self, *args):
453
        """Test the non-strict matching method.
454
455
        Test non-strict matching to delete a Flow using IPv4.
456
        """
457
        (mock_save_flow, _, _) = args
458
        dpid = "00:00:00:00:00:00:00:01"
459
        switch = get_switch_mock(dpid, 0x04)
460
        switch.id = dpid