Code Duplication    Length = 45-46 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 358-403 (lines=46) @@
355
        response = await self.api_client.put(self.trace_endpoint, json=payload)
356
        assert response.status_code == 424
357
358
    @patch("napps.amlight.sdntrace_cp.main.get_stored_flows")
    async def test_get_traces(self, mock_stored_flows, event_loop):
        """Test that the traces endpoint returns a single one-step trace.

        One trace is requested for in_port 1 / VLAN 100 on a switch whose
        only stored flow (mocked) matches that VLAN and port, pops the VLAN
        and outputs on port 2. The test expects a 200 response whose result
        holds one trace with one step of type "last" and out port 2.
        """
        dpid = "00:00:00:00:00:00:00:01"
        self.napp.controller.loop = event_loop

        # The only flow the mocked flow store reports for the switch.
        flow_entry = {
            "id": 1,
            "flow": {
                "table_id": 0,
                "cookie": 84114964,
                "hard_timeout": 0,
                "idle_timeout": 0,
                "priority": 10,
                "match": {"dl_vlan": 100, "in_port": 1},
                "actions": [
                    {"action_type": "pop_vlan"},
                    {"action_type": "output", "port": 2},
                ],
            },
        }
        mock_stored_flows.return_value = {dpid: [flow_entry]}

        request_body = [
            {
                "trace": {
                    "switch": {"dpid": dpid, "in_port": 1},
                    "eth": {"dl_vlan": 100},
                }
            }
        ]

        resp = await self.api_client.put(
            self.traces_endpoint, json=request_body
        )
        assert resp.status_code == 200

        traces = resp.json()["result"]
        assert len(traces) == 1
        assert len(traces[0]) == 1

        # Inspect the single step of the single trace.
        step = traces[0][0]
        assert step["dpid"] == dpid
        assert step["port"] == 1
        assert step["type"] == "last"
        assert step["vlan"] == 100
        assert step["out"] == {"port": 2}
404
405
    @patch("napps.amlight.sdntrace_cp.main.get_stored_flows")
406
    async def test_traces(self, mock_stored_flows, event_loop):
@@ 504-548 (lines=45) @@
501
        resp = await self.api_client.put(self.traces_endpoint, json=payload)
502
        assert resp.status_code == 424
503
504
    @patch("napps.amlight.sdntrace_cp.main.get_stored_flows")
    async def test_traces_with_loop(self, mock_stored_flows, event_loop):
        """Test that the traces endpoint reports a step of type "loop".

        One trace is requested for in_port 1 / VLAN 100 on a switch whose
        only stored flow (mocked) matches that VLAN and port and outputs on
        port 1, the same port the trace enters on. The test expects a 200
        response whose first step has type "loop" and keeps the VLAN in
        its output.
        """
        switch_id = "00:00:00:00:00:00:00:01"
        self.napp.controller.loop = event_loop

        # The only flow the mocked flow store reports for the switch.
        looping_flow = {
            "id": 1,
            "flow": {
                "table_id": 0,
                "cookie": 84114964,
                "hard_timeout": 0,
                "idle_timeout": 0,
                "priority": 10,
                "match": {"dl_vlan": 100, "in_port": 1},
                "actions": [{"action_type": "output", "port": 1}],
            },
        }
        mock_stored_flows.return_value = {switch_id: [looping_flow]}

        request_body = [
            {
                "trace": {
                    "switch": {"dpid": switch_id, "in_port": 1},
                    "eth": {"dl_vlan": 100},
                }
            }
        ]

        resp = await self.api_client.put(
            self.traces_endpoint, json=request_body
        )
        assert resp.status_code == 200

        traces = resp.json()["result"]
        assert len(traces) == 1

        # Inspect the first step of the single trace.
        first_step = traces[0][0]
        assert first_step["dpid"] == switch_id
        assert first_step["port"] == 1
        assert first_step["type"] == "loop"
        assert first_step["vlan"] == 100
        assert first_step["out"] == {"port": 1, "vlan": 100}
549
550
    @patch("napps.amlight.sdntrace_cp.main.get_stored_flows")
551
    async def test_traces_no_action(self, mock_stored_flows, event_loop):