Code Duplication    Length = 42-49 lines in 3 locations

tests/unit/test_main.py 3 locations

@@ 339-387 (lines=49) @@
336
        self.napp.check_storehouse_consistency(switch)
337
        mock_install_flows.assert_called()
338
339
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
    def test_no_strict_delete(self, *args):
        """Check a non-strict delete request that selects flows by cookie.

        Two "add" entries are stored for the switch, then a "delete"
        request carrying the first entry's cookie and a cookie mask with
        all 64 bits set is handed to ``_store_changed_flows``.
        """
        # Decorators apply bottom-up, so the save_flow mock is passed first.
        mock_save_flow = args[0]

        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        switch.id = dpid

        target_cookie = 6191162389751548793
        vlan_entry = {
            "command": "add",
            "flow": {
                "actions": [{"action_type": "set_vlan", "vlan_id": 300}],
                "cookie": target_cookie,
                "match": {"dl_vlan": 300, "in_port": 1},
            },
        }
        port_entry = {
            "command": "add",
            "flow": {
                "actions": [],
                "cookie": 4961162389751548787,
                "match": {"in_port": 2},
            },
        }
        delete_request = {
            "cookie": target_cookie,
            "cookie_mask": 0xFFFFFFFFFFFFFFFF,  # 18446744073709551615
        }
        self.napp.stored_flows = {
            dpid: {"flow_list": [vlan_entry, port_entry]},
        }

        self.napp._store_changed_flows("delete", delete_request, switch)

        mock_save_flow.assert_called()
        # NOTE(review): stored_flows is keyed by dpid here, so this counts
        # switches rather than flow-list entries; the sibling IPv4 tests
        # check stored_flows[dpid]['flow_list'] instead -- confirm intent.
        self.assertEqual(len(self.napp.stored_flows), 1)
388
389
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
390
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
@@ 432-473 (lines=42) @@
429
        mock_save_flow.assert_called()
430
        self.assertEqual(len(self.napp.stored_flows[dpid]['flow_list']), 2)
431
432
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
    def test_no_strict_delete_with_ipv4_fail(self, *args):
        """Check a non-strict IPv4 delete request that matches nothing.

        The stored IPv4 entry has source 192.168.2.1, while the "delete"
        request asks for ``ipv4_src`` 192.168.1.1/24. The test expects
        the switch's flow list to hold three entries after the call.
        """
        # Decorators apply bottom-up, so the save_flow mock is passed first.
        mock_save_flow = args[0]

        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        switch.id = dpid

        ipv4_entry = {
            "command": "add",
            "flow": {
                "priority": 10,
                "cookie": 84114904,
                "match": {
                    "ipv4_src": "192.168.2.1",
                    "ipv4_dst": "192.168.0.2",
                },
                "actions": [],
            },
        }
        port_entry = {
            "command": "add",
            "flow": {
                "actions": [],
                "cookie": 4961162389751548787,
                "match": {"in_port": 2},
            },
        }
        delete_request = {"match": {"ipv4_src": '192.168.1.1/24'}}
        self.napp.stored_flows = {
            dpid: {"flow_list": [ipv4_entry, port_entry]},
        }

        self.napp._store_changed_flows("delete", delete_request, switch)

        mock_save_flow.assert_called()
        # Two entries were stored up front; three are expected afterwards.
        # NOTE(review): presumably the unmatched delete request itself is
        # appended to the list -- confirm against _store_changed_flows.
        remaining = self.napp.stored_flows[dpid]['flow_list']
        self.assertEqual(len(remaining), 3)
474
@@ 389-430 (lines=42) @@
386
        mock_save_flow.assert_called()
387
        self.assertEqual(len(self.napp.stored_flows), 1)
388
389
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')
    @patch("napps.kytos.flow_manager.main.StoreHouse.save_flow")
    def test_no_strict_delete_with_ipv4(self, *args):
        """Check a non-strict IPv4 delete request against a stored flow.

        The stored IPv4 entry has source 192.168.1.120 and the "delete"
        request asks for ``ipv4_src`` 192.168.1.1/24. The test expects
        the switch's flow list to hold two entries after the call.
        """
        # Decorators apply bottom-up, so the save_flow mock is passed first.
        mock_save_flow = args[0]

        dpid = "00:00:00:00:00:00:00:01"
        switch = get_switch_mock(dpid, 0x04)
        switch.id = dpid

        ipv4_entry = {
            "command": "add",
            "flow": {
                "priority": 10,
                "cookie": 84114904,
                "match": {
                    "ipv4_src": "192.168.1.120",
                    "ipv4_dst": "192.168.0.2",
                },
                "actions": [],
            },
        }
        port_entry = {
            "command": "add",
            "flow": {
                "actions": [],
                "cookie": 4961162389751548787,
                "match": {"in_port": 2},
            },
        }
        delete_request = {"match": {"ipv4_src": '192.168.1.1/24'}}
        self.napp.stored_flows = {
            dpid: {"flow_list": [ipv4_entry, port_entry]},
        }

        self.napp._store_changed_flows("delete", delete_request, switch)

        mock_save_flow.assert_called()
        # Two entries were stored up front; two are expected afterwards.
        # NOTE(review): presumably the matching entry is dropped and the
        # delete request recorded -- confirm against _store_changed_flows.
        remaining = self.napp.stored_flows[dpid]['flow_list']
        self.assertEqual(len(remaining), 2)
431
432
    @patch('napps.kytos.flow_manager.main.Main._install_flows')
433
    @patch('napps.kytos.flow_manager.main.FlowFactory.get_class')