Code Duplication: similar blocks of 46-51 lines found in 2 locations

tests/unit/test_main.py 2 locations

@@ 343-393 (lines=51) @@
340
        assert result[0]["vlan"] == 100
341
        assert result[0]["out"] == {"port": 2, "vlan": 200}
342
343
    @patch("napps.amlight.sdntrace_cp.main.get_stored_flows")
    async def test_trace_instructions(self, mock_stored_flows):
        """Check the trace REST call when the stored flow uses instructions.

        The mocked flow store returns one flow for the probed switch whose
        push_vlan / set_vlan / output actions are wrapped in an
        ``apply_actions`` instruction.  The PUT response must contain
        exactly one step with the expected fields.
        """
        dpid = "00:00:00:00:00:00:00:01"
        self.napp.controller.loop = asyncio.get_running_loop()

        # Request body: probe entering the switch on port 1, tagged VLAN 100.
        request_body = {
            "trace": {
                "switch": {"dpid": dpid, "in_port": 1},
                "eth": {"dl_vlan": 100},
            }
        }

        # Single stored flow; its actions live inside an instruction.
        apply_actions = {
            "instruction_type": "apply_actions",
            "actions": [
                {"action_type": "push_vlan"},
                {"action_type": "set_vlan", "vlan_id": 200},
                {"action_type": "output", "port": 2},
            ],
        }
        stored_flow = {
            "flow": {
                "table_id": 0,
                "cookie": 84114964,
                "hard_timeout": 0,
                "idle_timeout": 0,
                "priority": 10,
                "match": {"dl_vlan": 100, "in_port": 1},
                "instructions": [apply_actions],
            },
            "flow_id": 1,
            "state": "installed",
            "switch": dpid,
        }
        mock_stored_flows.return_value = {dpid: [stored_flow]}

        response = await self.api_client.put(
            self.trace_endpoint, json=request_body
        )
        assert response.status_code == 200
        steps = response.json()["result"]

        assert len(steps) == 1
        # Field-by-field comparison, in the same order as the expectations
        # are listed here.
        expected_step = {
            "dpid": dpid,
            "port": 1,
            "type": "last",
            "vlan": 100,
            "out": {"port": 2, "vlan": 200},
        }
        for field, expected in expected_step.items():
            assert steps[0][field] == expected
394
395
    @patch("napps.amlight.sdntrace_cp.main.get_stored_flows")
396
    async def test_instructions_no_match(self, mock_stored_flows):
@@ 296-341 (lines=46) @@
293
        result = self.napp.has_loop(trace_step, trace_result)
294
        assert not result
295
296
    @patch("napps.amlight.sdntrace_cp.main.get_stored_flows")
    async def test_trace(self, mock_stored_flows):
        """Check the trace REST call when the stored flow lists plain actions.

        The mocked flow store returns one flow for the probed switch with
        push_vlan / set_vlan / output listed directly under ``actions``.
        The PUT response must contain exactly one step with the expected
        fields.
        """
        dpid = "00:00:00:00:00:00:00:01"
        self.napp.controller.loop = asyncio.get_running_loop()

        # Request body: probe entering the switch on port 1, tagged VLAN 100.
        request_body = {
            "trace": {
                "switch": {"dpid": dpid, "in_port": 1},
                "eth": {"dl_vlan": 100},
            }
        }

        # Single stored flow; actions sit directly on the flow.
        flow_actions = [
            {"action_type": "push_vlan"},
            {"action_type": "set_vlan", "vlan_id": 200},
            {"action_type": "output", "port": 2},
        ]
        stored_flow = {
            "flow": {
                "table_id": 0,
                "cookie": 84114964,
                "hard_timeout": 0,
                "idle_timeout": 0,
                "priority": 10,
                "match": {"dl_vlan": 100, "in_port": 1},
                "actions": flow_actions,
            },
            "flow_id": 1,
            "state": "installed",
            "switch": dpid,
        }
        mock_stored_flows.return_value = {dpid: [stored_flow]}

        response = await self.api_client.put(
            self.trace_endpoint, json=request_body
        )
        assert response.status_code == 200
        steps = response.json()["result"]

        assert len(steps) == 1
        # Field-by-field comparison, in the same order as the expectations
        # are listed here.
        expected_step = {
            "dpid": dpid,
            "port": 1,
            "type": "last",
            "vlan": 100,
            "out": {"port": 2, "vlan": 200},
        }
        for field, expected in expected_step.items():
            assert steps[0][field] == expected
342
343
    @patch("napps.amlight.sdntrace_cp.main.get_stored_flows")
344
    async def test_trace_instructions(self, mock_stored_flows):