Code Duplication    Length = 42-49 lines in 3 locations

tests/unit/test_main.py 3 locations

@@ 477-518 (lines=42) @@
474
        mock_save_flow.assert_called()
475
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
476
477
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
    def test_no_strict_delete_with_ipv4_fail(self, *args):
        """Test a non-strict IPv4 delete request that matches no stored flow.

        The delete request targets ``192.168.1.1/24`` while the only stored
        IPv4 flow has source ``192.168.2.1``, which lies outside that
        network. After the call the stored flow list is expected to hold
        three entries.
        """
        # Patches are injected bottom-up, so the save_flow mock comes first.
        mock_save_flow, _, _ = args

        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        switch.id = dpid

        # Stored flow whose IPv4 source is outside the requested network.
        ipv4_flow = {
            "command": "add",
            "flow": {
                "priority": 10,
                "cookie": 84114904,
                "match": {
                    "ipv4_src": "192.168.2.1",
                    "ipv4_dst": "192.168.0.2",
                },
                "actions": [],
            },
        }
        # Stored flow without any IPv4 field in its match.
        port_flow = {
            "command": "add",
            "flow": {
                "actions": [],
                "cookie": 4961162389751548787,
                "match": {"in_port": 2},
            },
        }
        self.napp.stored_flows = {
            dpid: {"flow_list": [ipv4_flow, port_flow]},
        }

        delete_request = {"match": {"ipv4_src": '192.168.1.1/24'}}
        self.napp._store_changed_flows("delete", delete_request, switch)

        mock_save_flow.assert_called()
        # NOTE(review): presumably the two untouched flows plus the stored
        # delete request itself - confirm against _store_changed_flows.
        flows_after = self.napp.stored_flows[dpid]['flow_list']
        self.assertEqual(len(flows_after), 3)
517
518
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
519
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
520
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
521
    def test_no_strict_delete_of10(self, *args):
@@ 436-477 (lines=42) @@
433
        mock_save_flow.assert_called()
434
        self.assertEqual(len(self.napp.stored_flows), 1)
435
436
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
    def test_no_strict_delete_with_ipv4(self, *args):
        """Test a non-strict IPv4 delete request that matches a stored flow.

        The delete request targets ``192.168.1.1/24`` and one stored flow has
        source ``192.168.1.120``, which falls inside that network. After the
        call the stored flow list is expected to hold two entries.
        """
        # Patches are injected bottom-up, so the save_flow mock comes first.
        mock_save_flow, _, _ = args

        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        switch.id = dpid

        # Stored flow whose IPv4 source is inside the requested network.
        ipv4_flow = {
            "command": "add",
            "flow": {
                "priority": 10,
                "cookie": 84114904,
                "match": {
                    "ipv4_src": "192.168.1.120",
                    "ipv4_dst": "192.168.0.2",
                },
                "actions": [],
            },
        }
        # Stored flow without any IPv4 field in its match.
        port_flow = {
            "command": "add",
            "flow": {
                "actions": [],
                "cookie": 4961162389751548787,
                "match": {"in_port": 2},
            },
        }
        self.napp.stored_flows = {
            dpid: {"flow_list": [ipv4_flow, port_flow]},
        }

        delete_request = {"match": {"ipv4_src": '192.168.1.1/24'}}
        self.napp._store_changed_flows("delete", delete_request, switch)

        mock_save_flow.assert_called()
        # NOTE(review): presumably the matching flow is dropped and the
        # delete request is stored next to the remaining flow - confirm
        # against _store_changed_flows.
        flows_after = self.napp.stored_flows[dpid]['flow_list']
        self.assertEqual(len(flows_after), 2)
476
477
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
478
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
479
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
480
    def test_no_strict_delete_with_ipv4_fail(self, *args):
@@ 396-444 (lines=49) @@
393
        self.napp.check_storehouse_consistency(switch)
394
        mock_install_flows.assert_called()
395
396
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
    def test_no_strict_delete(self, *args):
        """Test a non-strict delete request selected by cookie.

        The delete request carries the cookie of the first stored flow
        together with an all-ones 64-bit cookie mask. The test checks that
        the flows are saved and that ``stored_flows`` still holds exactly
        one switch entry.
        """
        # Patches are injected bottom-up, so the save_flow mock comes first.
        mock_save_flow, _, _ = args

        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        switch.id = dpid

        # Stored flow carrying the cookie used by the delete request.
        vlan_flow = {
            "command": "add",
            "flow": {
                "actions": [{"action_type": "set_vlan", "vlan_id": 300}],
                "cookie": 6191162389751548793,
                "match": {"dl_vlan": 300, "in_port": 1},
            },
        }
        # Stored flow with a different cookie.
        port_flow = {
            "command": "add",
            "flow": {
                "actions": [],
                "cookie": 4961162389751548787,
                "match": {"in_port": 2},
            },
        }
        self.napp.stored_flows = {
            dpid: {"flow_list": [vlan_flow, port_flow]},
        }

        delete_request = {
            "cookie": 6191162389751548793,
            # 2**64 - 1: every cookie bit takes part in the comparison.
            "cookie_mask": 18446744073709551615,
        }
        self.napp._store_changed_flows("delete", delete_request, switch)

        mock_save_flow.assert_called()
        # NOTE(review): this counts switch entries, not flows, so it stays
        # 1 regardless of what was deleted; the sibling IPv4 tests assert on
        # stored_flows[dpid]['flow_list'] instead - consider doing the same.
        switch_entries = len(self.napp.stored_flows)
        self.assertEqual(switch_entries, 1)
435
436
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
437
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
438
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
439
    def test_no_strict_delete_with_ipv4(self, *args):
440
        """Test the non-strict matching method.
441
442
        Test non-strict matching to delete a Flow using IPv4.
443
        """
444
        (mock_save_flow, _, _) = args
445
        dpid = "00:00:00:00:00:00:00:01"
446
        switch = get_switch_mock(dpid, 0x04)
447
        switch.id = dpid