Code Duplication    Length = 42-49 lines in 3 locations

tests/unit/test_main.py 3 locations

@@ 366-414 (lines=49) @@
363
        self.napp.check_storehouse_consistency(switch)
364
        mock_install_flows.assert_called()
365
366
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
367
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
368
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
369
    def test_no_strict_delete(self, *args):
370
        """Test the non-strict matching method.
371
372
        Test non-strict matching to delete a Flow using a cookie.
373
        """
374
        (mock_save_flow, _, _) = args
375
        dpid = "00:00:00:00:00:00:00:01"
376
        switch = get_switch_mock(dpid, 0x04)
377
        switch.id = dpid
378
        stored_flow = {
379
            "command": "add",
380
            "flow": {
381
                "actions": [{"action_type": "set_vlan", "vlan_id": 300}],
382
                "cookie": 6191162389751548793,
383
                "match": {"dl_vlan": 300, "in_port": 1},
384
            },
385
        }
386
        stored_flow2 = {
387
            "command": "add",
388
            "flow": {
389
                "actions": [],
390
                "cookie": 4961162389751548787,
391
                "match": {"in_port": 2},
392
            },
393
        }
394
        flow_to_install = {
395
            "cookie": 6191162389751548793,
396
            "cookie_mask": 18446744073709551615,
397
        }
398
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
399
        command = "delete"
400
        self.napp.stored_flows = {dpid: flow_list}
401
402
        self.napp._store_changed_flows(command, flow_to_install, switch)
403
        mock_save_flow.assert_called()
404
        self.assertEqual(len(self.napp.stored_flows), 1)
405
406
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
407
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
408
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
409
    def test_no_strict_delete_with_ipv4(self, *args):
410
        """Test the non-strict matching method.
411
412
        Test non-strict matching to delete a Flow using IPv4.
413
        """
414
        (mock_save_flow, _, _) = args
415
        dpid = "00:00:00:00:00:00:00:01"
416
        switch = get_switch_mock(dpid, 0x04)
417
        switch.id = dpid
@@ 447-488 (lines=42) @@
444
        mock_save_flow.assert_called()
445
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
446
447
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
448
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
449
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
450
    def test_no_strict_delete_with_ipv4_fail(self, *args):
451
        """Test the non-strict matching method.
452
453
        Test non-strict Fail case matching to delete a Flow using IPv4.
454
        """
455
        (mock_save_flow, _, _) = args
456
        dpid = "00:00:00:00:00:00:00:01"
457
        switch = get_switch_mock(dpid, 0x04)
458
        switch.id = dpid
459
        stored_flow = {
460
            "command": "add",
461
            "flow": {
462
                "priority": 10,
463
                "cookie": 84114904,
464
                "match": {
465
                    "ipv4_src": "192.168.2.1",
466
                    "ipv4_dst": "192.168.0.2",
467
                },
468
                "actions": [],
469
            },
470
        }
471
        stored_flow2 = {
472
            "command": "add",
473
            "flow": {
474
                "actions": [],
475
                "cookie": 4961162389751548787,
476
                "match": {"in_port": 2},
477
            },
478
        }
479
        flow_to_install = {"match": {"ipv4_src": '192.168.1.1/24'}}
480
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
481
        command = "delete"
482
        self.napp.stored_flows = {dpid: flow_list}
483
484
        self.napp._store_changed_flows(command, flow_to_install, switch)
485
        mock_save_flow.assert_called()
486
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 3)
487
@@ 406-447 (lines=42) @@
403
        mock_save_flow.assert_called()
404
        self.assertEqual(len(self.napp.stored_flows), 1)
405
406
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
407
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
408
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
409
    def test_no_strict_delete_with_ipv4(self, *args):
410
        """Test the non-strict matching method.
411
412
        Test non-strict matching to delete a Flow using IPv4.
413
        """
414
        (mock_save_flow, _, _) = args
415
        dpid = "00:00:00:00:00:00:00:01"
416
        switch = get_switch_mock(dpid, 0x04)
417
        switch.id = dpid
418
        stored_flow = {
419
            "command": "add",
420
            "flow": {
421
                "priority": 10,
422
                "cookie": 84114904,
423
                "match": {
424
                    "ipv4_src": "192.168.1.120",
425
                    "ipv4_dst": "192.168.0.2",
426
                },
427
                "actions": [],
428
            },
429
        }
430
        stored_flow2 = {
431
            "command": "add",
432
            "flow": {
433
                "actions": [],
434
                "cookie": 4961162389751548787,
435
                "match": {"in_port": 2},
436
            },
437
        }
438
        flow_to_install = {"match": {"ipv4_src": '192.168.1.1/24'}}
439
        flow_list = {"flow_list": [stored_flow, stored_flow2]}
440
        command = "delete"
441
        self.napp.stored_flows = {dpid: flow_list}
442
443
        self.napp._store_changed_flows(command, flow_to_install, switch)
444
        mock_save_flow.assert_called()
445
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
446
447
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
448
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
449
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
450
    def test_no_strict_delete_with_ipv4_fail(self, *args):