Code Duplication    Length = 42-49 lines in 3 locations

tests/unit/test_main.py 3 locations

@@ 478-519 (lines=42) @@
475
        mock_save_flow.assert_called()
476
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
477
478
    # Non-strict delete, IPv4 "fail" case: stored_flow's ipv4_src
    # (192.168.2.1) lies outside the /24 subnet named by the delete request,
    # and stored_flow2 has no ipv4_src at all.
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
479
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
480
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
481
    def test_no_strict_delete_with_ipv4_fail(self, *args):
482
        """Test the non-strict matching method.
483
484
        Test non-strict Fail case matching to delete a Flow using IPv4.
485
        """
486
        # Assuming `patch` is unittest.mock.patch, mocks are injected
        # bottom-up, so the first one belongs to StoreHouse.save_flow.
        (mock_save_flow, _, _) = args
487
        dpid = "00:00:00:00:00:00:00:01"
488
        # NOTE(review): 0x04 is presumably the OpenFlow version - confirm
        # against get_switch_mock.
        switch = get_switch_mock(dpid, 0x04)
489
        switch.id = dpid
490
        # Stored flow whose ipv4_src is NOT inside 192.168.1.x/24.
        stored_flow = {
491
            "command": "add",
492
            "flow": {
493
                "priority": 10,
494
                "cookie": 84114904,
495
                "match": {
496
                    "ipv4_src": "192.168.2.1",
497
                    "ipv4_dst": "192.168.0.2",
498
                },
499
                "actions": [],
500
            },
501
        }
502
        # Second stored flow; it matches on in_port only.
        stored_flow2 = {
503
            "command": "add",
504
            "flow": {
505
                "actions": [],
506
                "cookie": 4961162389751548787,
507
                "match": {"in_port": 2},
508
            },
509
        }
510
        # The delete request: matches on a /24 source subnet.
        flow_to_install = {"match": {"ipv4_src": '192.168.1.1/24'}}
511
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
512
        command = "delete"
513
        # Seed the napp with exactly two stored flows for this dpid.
        self.napp.stored_flows = {dpid: flow_list}
514
515
        self.napp._store_changed_flows(command, flow_to_install, switch)
516
        mock_save_flow.assert_called()
517
        # flow_list started with 2 entries; 3 are expected afterwards.
        # NOTE(review): presumably nothing is removed and the delete request
        # itself gets stored - confirm against _store_changed_flows.
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 3)
518
519
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
520
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
521
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
522
    def test_no_strict_delete_of10(self, *args):
@@ 437-478 (lines=42) @@
434
        mock_save_flow.assert_called()
435
        self.assertEqual(len(self.napp.stored_flows), 1)
436
437
    # Non-strict delete, IPv4 matching case: stored_flow's ipv4_src
    # (192.168.1.120) falls inside the /24 subnet named by the delete request.
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
438
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
439
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
440
    def test_no_strict_delete_with_ipv4(self, *args):
441
        """Test the non-strict matching method.
442
443
        Test non-strict matching to delete a Flow using IPv4.
444
        """
445
        # Assuming `patch` is unittest.mock.patch, mocks are injected
        # bottom-up, so the first one belongs to StoreHouse.save_flow.
        (mock_save_flow, _, _) = args
446
        dpid = "00:00:00:00:00:00:00:01"
447
        # NOTE(review): 0x04 is presumably the OpenFlow version - confirm
        # against get_switch_mock.
        switch = get_switch_mock(dpid, 0x04)
448
        switch.id = dpid
449
        # Stored flow whose ipv4_src IS inside 192.168.1.x/24.
        stored_flow = {
450
            "command": "add",
451
            "flow": {
452
                "priority": 10,
453
                "cookie": 84114904,
454
                "match": {
455
                    "ipv4_src": "192.168.1.120",
456
                    "ipv4_dst": "192.168.0.2",
457
                },
458
                "actions": [],
459
            },
460
        }
461
        # Second stored flow; it matches on in_port only.
        stored_flow2 = {
462
            "command": "add",
463
            "flow": {
464
                "actions": [],
465
                "cookie": 4961162389751548787,
466
                "match": {"in_port": 2},
467
            },
468
        }
469
        # The delete request: matches on a /24 source subnet.
        flow_to_install = {"match": {"ipv4_src": '192.168.1.1/24'}}
470
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
471
        command = "delete"
472
        # Seed the napp with exactly two stored flows for this dpid.
        self.napp.stored_flows = {dpid: flow_list}
473
474
        self.napp._store_changed_flows(command, flow_to_install, switch)
475
        mock_save_flow.assert_called()
476
        # flow_list started with 2 entries and is expected to hold 2 again.
        # NOTE(review): presumably stored_flow is removed and the delete
        # request itself gets stored - confirm against _store_changed_flows.
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
477
478
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
479
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
480
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
481
    def test_no_strict_delete_with_ipv4_fail(self, *args):
@@ 397-445 (lines=49) @@
394
        self.napp.check_storehouse_consistency(switch)
395
        mock_install_flows.assert_called()
396
397
    # Non-strict delete by cookie: the delete request carries the same cookie
    # value as stored_flow together with an all-ones 64-bit cookie mask.
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
398
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
399
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
400
    def test_no_strict_delete(self, *args):
401
        """Test the non-strict matching method.
402
403
        Test non-strict matching to delete a Flow using a cookie.
404
        """
405
        # Assuming `patch` is unittest.mock.patch, mocks are injected
        # bottom-up, so the first one belongs to StoreHouse.save_flow.
        (mock_save_flow, _, _) = args
406
        dpid = "00:00:00:00:00:00:00:01"
407
        # NOTE(review): 0x04 is presumably the OpenFlow version - confirm
        # against get_switch_mock.
        switch = get_switch_mock(dpid, 0x04)
408
        switch.id = dpid
409
        # Stored flow whose cookie equals the one in the delete request.
        stored_flow = {
410
            "command": "add",
411
            "flow": {
412
                "actions": [{"action_type": "set_vlan", "vlan_id": 300}],
413
                "cookie": 6191162389751548793,
414
                "match": {"dl_vlan": 300, "in_port": 1},
415
            },
416
        }
417
        # Second stored flow; it carries a different cookie.
        stored_flow2 = {
418
            "command": "add",
419
            "flow": {
420
                "actions": [],
421
                "cookie": 4961162389751548787,
422
                "match": {"in_port": 2},
423
            },
424
        }
425
        # The delete request; 18446744073709551615 == 2**64 - 1, i.e. every
        # bit of the cookie mask is set.
        flow_to_install = {
426
            "cookie": 6191162389751548793,
427
            "cookie_mask": 18446744073709551615,
428
        }
429
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
430
        command = "delete"
431
        # Seed the napp with exactly two stored flows for this dpid.
        self.napp.stored_flows = {dpid: flow_list}
432
433
        self.napp._store_changed_flows(command, flow_to_install, switch)
434
        mock_save_flow.assert_called()
435
        # NOTE(review): this measures the number of dpid keys in
        # stored_flows (seeded above with a single dpid), not the length of
        # that dpid's 'flow_list' - confirm this is the intended check.
        self.assertEqual(len(self.napp.stored_flows), 1)
436
437
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
438
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
439
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
440
    def test_no_strict_delete_with_ipv4(self, *args):
441
        """Test the non-strict matching method.
442
443
        Test non-strict matching to delete a Flow using IPv4.
444
        """
445
        (mock_save_flow, _, _) = args
446
        dpid = "00:00:00:00:00:00:00:01"
447
        switch = get_switch_mock(dpid, 0x04)
448
        switch.id = dpid