Code Duplication    Length = 42-49 lines in 3 locations

tests/unit/test_main.py 3 locations

@@ 466-507 (lines=42) @@
463
        mock_save_flow.assert_called()
464
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
465
466
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
    def test_no_strict_delete_with_ipv4_fail(self, *args):
        """Check a non-strict IPv4 delete that matches no stored flow.

        Two flows are stored for the switch; neither has an ``ipv4_src``
        inside the ``192.168.1.1/24`` prefix carried by the delete request
        (one uses ``192.168.2.1``, the other only matches on ``in_port``).
        After ``_store_changed_flows`` runs, the flow list is expected to
        hold three entries.
        """
        # Mocks are injected bottom-up: the decorator closest to ``def``
        # (``save_flow``) arrives first. Only that one is inspected here.
        save_flow_mock, _get_class_mock, _install_flows_mock = args

        switch_id = "00:00:00:00:00:00:00:01"
        switch_mock = get_switch_mock(switch_id, 0x04)
        switch_mock.id = switch_id

        # ipv4_src is outside 192.168.1.0/24, so the delete should not hit it.
        ipv4_match = {
            "ipv4_src": "192.168.2.1",
            "ipv4_dst": "192.168.0.2",
        }
        port_match = {"in_port": 2}
        stored_entries = [
            {
                "command": "add",
                "flow": {
                    "priority": 10,
                    "cookie": 84114904,
                    "match": ipv4_match,
                    "actions": [],
                },
            },
            {
                "command": "add",
                "flow": {
                    "actions": [],
                    "cookie": 4961162389751548787,
                    "match": port_match,
                },
            },
        ]
        delete_request = {"match": {"ipv4_src": '192.168.1.1/24'}}
        self.napp.stored_flows = {switch_id: {"flow_list": stored_entries}}

        self.napp._store_changed_flows("delete", delete_request, switch_mock)

        save_flow_mock.assert_called()
        # NOTE(review): 3 == the two original entries plus one more;
        # presumably the delete request itself is recorded -- confirm
        # against ``Main._store_changed_flows``.
        resulting_flows = self.napp.stored_flows[switch_id]['flow_list']
        self.assertEqual(len(resulting_flows), 3)
506
507
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
508
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
509
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
510
    def test_no_strict_delete_of10(self, *args):
@@ 425-466 (lines=42) @@
422
        mock_save_flow.assert_called()
423
        self.assertEqual(len(self.napp.stored_flows), 1)
424
425
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
    def test_no_strict_delete_with_ipv4(self, *args):
        """Check a non-strict IPv4 delete that matches one stored flow.

        Two flows are stored for the switch. The first has
        ``ipv4_src`` ``192.168.1.120``, which falls inside the
        ``192.168.1.1/24`` prefix carried by the delete request; the
        second only matches on ``in_port``. After ``_store_changed_flows``
        runs, the flow list is expected to hold two entries.
        """
        # Mocks are injected bottom-up: the decorator closest to ``def``
        # (``save_flow``) arrives first. Only that one is inspected here.
        save_flow_mock, _get_class_mock, _install_flows_mock = args

        switch_id = "00:00:00:00:00:00:00:01"
        switch_mock = get_switch_mock(switch_id, 0x04)
        switch_mock.id = switch_id

        # ipv4_src lies inside 192.168.1.0/24, the prefix being deleted.
        ipv4_match = {
            "ipv4_src": "192.168.1.120",
            "ipv4_dst": "192.168.0.2",
        }
        port_match = {"in_port": 2}
        stored_entries = [
            {
                "command": "add",
                "flow": {
                    "priority": 10,
                    "cookie": 84114904,
                    "match": ipv4_match,
                    "actions": [],
                },
            },
            {
                "command": "add",
                "flow": {
                    "actions": [],
                    "cookie": 4961162389751548787,
                    "match": port_match,
                },
            },
        ]
        delete_request = {"match": {"ipv4_src": '192.168.1.1/24'}}
        self.napp.stored_flows = {switch_id: {"flow_list": stored_entries}}

        self.napp._store_changed_flows("delete", delete_request, switch_mock)

        save_flow_mock.assert_called()
        resulting_flows = self.napp.stored_flows[switch_id]['flow_list']
        self.assertEqual(len(resulting_flows), 2)
465
466
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
467
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
468
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
469
    def test_no_strict_delete_with_ipv4_fail(self, *args):
@@ 385-433 (lines=49) @@
382
        self.napp.check_storehouse_consistency(switch)
383
        mock_install_flows.assert_called()
384
385
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
    def test_no_strict_delete(self, *args):
        """Check a non-strict delete that selects a flow by cookie.

        Two flows are stored for the switch. The delete request carries
        the cookie of the first one together with an all-ones 64-bit
        ``cookie_mask``. After ``_store_changed_flows`` runs, ``save_flow``
        must have been called and ``stored_flows`` must still contain a
        single top-level entry.
        """
        # Mocks are injected bottom-up: the decorator closest to ``def``
        # (``save_flow``) arrives first. Only that one is inspected here.
        save_flow_mock, _get_class_mock, _install_flows_mock = args

        switch_id = "00:00:00:00:00:00:00:01"
        switch_mock = get_switch_mock(switch_id, 0x04)
        switch_mock.id = switch_id

        target_cookie = 6191162389751548793
        vlan_flow = {
            "command": "add",
            "flow": {
                "actions": [{"action_type": "set_vlan", "vlan_id": 300}],
                "cookie": target_cookie,
                "match": {"dl_vlan": 300, "in_port": 1},
            },
        }
        port_flow = {
            "command": "add",
            "flow": {
                "actions": [],
                "cookie": 4961162389751548787,
                "match": {"in_port": 2},
            },
        }
        # 18446744073709551615 == 2**64 - 1, i.e. every cookie bit is compared.
        delete_request = {
            "cookie": target_cookie,
            "cookie_mask": 18446744073709551615,
        }
        self.napp.stored_flows = {
            switch_id: {"flow_list": [vlan_flow, port_flow]},
        }

        self.napp._store_changed_flows("delete", delete_request, switch_mock)

        save_flow_mock.assert_called()
        # NOTE(review): this counts the dpid-keyed entries, not the flows of
        # the switch; the IPv4 variants of this test assert on the length of
        # ``stored_flows[dpid]['flow_list']`` instead -- consider tightening.
        self.assertEqual(len(self.napp.stored_flows), 1)
424
425
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
426
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
427
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
428
    def test_no_strict_delete_with_ipv4(self, *args):
429
        """Test the non-strict matching method.
430
431
        Test non-strict matching to delete a Flow using IPv4.
432
        """
433
        (mock_save_flow, _, _) = args
434
        dpid = "00:00:00:00:00:00:00:01"
435
        switch = get_switch_mock(dpid, 0x04)
436
        switch.id = dpid