Code Duplication    Length = 46-51 lines in 2 locations

tests/unit/test_main.py 2 locations

@@ 342-392 (lines=51) @@
339
        assert result[0]["vlan"] == 100
340
        assert result[0]["out"] == {"port": 2, "vlan": 200}
341
342
    @patch("napps.amlight.sdntrace_cp.main.get_stored_flows")
343
    async def test_trace_instructions(self, mock_stored_flows, event_loop):
344
        """Test trace rest call with instructions."""
345
        self.napp.controller.loop = event_loop
346
        payload = {
347
            "trace": {
348
                "switch": {
349
                    "dpid": "00:00:00:00:00:00:00:01",
350
                    "in_port": 1
351
                    },
352
                "eth": {"dl_vlan": 100},
353
            }
354
        }
355
        stored_flows = {
356
                "flow": {
357
                    "table_id": 0,
358
                    "cookie": 84114964,
359
                    "hard_timeout": 0,
360
                    "idle_timeout": 0,
361
                    "priority": 10,
362
                    "match": {"dl_vlan": 100, "in_port": 1},
363
                    "instructions": [
364
                        {
365
                            "instruction_type": "apply_actions",
366
                            "actions": [
367
                                {"action_type": "push_vlan"},
368
                                {"action_type": "set_vlan", "vlan_id": 200},
369
                                {"action_type": "output", "port": 2}
370
                            ]
371
                        }
372
                    ]
373
                },
374
                "flow_id": 1,
375
                "state": "installed",
376
                "switch": "00:00:00:00:00:00:00:01",
377
        }
378
        mock_stored_flows.return_value = {
379
            "00:00:00:00:00:00:00:01": [stored_flows]
380
        }
381
382
        resp = await self.api_client.put(self.trace_endpoint, json=payload)
383
        assert resp.status_code == 200
384
        current_data = resp.json()
385
        result = current_data["result"]
386
387
        assert len(result) == 1
388
        assert result[0]["dpid"] == "00:00:00:00:00:00:00:01"
389
        assert result[0]["port"] == 1
390
        assert result[0]["type"] == "last"
391
        assert result[0]["vlan"] == 100
392
        assert result[0]["out"] == {"port": 2, "vlan": 200}
393
394
    @patch("napps.amlight.sdntrace_cp.main.get_stored_flows")
395
    async def test_instructions_no_match(self, mock_stored_flows, event_loop):
@@ 295-340 (lines=46) @@
292
        result = self.napp.has_loop(trace_step, trace_result)
293
        assert not result
294
295
    @patch("napps.amlight.sdntrace_cp.main.get_stored_flows")
296
    async def test_trace(self, mock_stored_flows, event_loop):
297
        """Test trace rest call."""
298
        self.napp.controller.loop = event_loop
299
        payload = {
300
            "trace": {
301
                "switch": {
302
                    "dpid": "00:00:00:00:00:00:00:01",
303
                    "in_port": 1
304
                    },
305
                "eth": {"dl_vlan": 100},
306
            }
307
        }
308
        stored_flows = {
309
                "flow": {
310
                    "table_id": 0,
311
                    "cookie": 84114964,
312
                    "hard_timeout": 0,
313
                    "idle_timeout": 0,
314
                    "priority": 10,
315
                    "match": {"dl_vlan": 100, "in_port": 1},
316
                    "actions": [
317
                        {"action_type": "push_vlan"},
318
                        {"action_type": "set_vlan", "vlan_id": 200},
319
                        {"action_type": "output", "port": 2}
320
                    ],
321
                },
322
                "flow_id": 1,
323
                "state": "installed",
324
                "switch": "00:00:00:00:00:00:00:01",
325
        }
326
        mock_stored_flows.return_value = {
327
            "00:00:00:00:00:00:00:01": [stored_flows]
328
        }
329
330
        resp = await self.api_client.put(self.trace_endpoint, json=payload)
331
        assert resp.status_code == 200
332
        current_data = resp.json()
333
        result = current_data["result"]
334
335
        assert len(result) == 1
336
        assert result[0]["dpid"] == "00:00:00:00:00:00:00:01"
337
        assert result[0]["port"] == 1
338
        assert result[0]["type"] == "last"
339
        assert result[0]["vlan"] == 100
340
        assert result[0]["out"] == {"port": 2, "vlan": 200}
341
342
    @patch("napps.amlight.sdntrace_cp.main.get_stored_flows")
343
    async def test_trace_instructions(self, mock_stored_flows, event_loop):