Code Duplication    Length = 21-22 lines in 3 locations

tests/unit/test_main.py 3 locations

@@ 27-48 (lines=22) @@
24
        self.api_client = get_test_client(controller, self.napp)
25
        self.base_endpoint = "kytos/telemetry_int/v1"
26
27
    async def test_enable_telemetry(self, monkeypatch) -> None:
28
        """Test enable telemetry."""
29
        api_mock, flow = AsyncMock(), MagicMock()
30
        flow.cookie = 0xA800000000000001
31
        monkeypatch.setattr(
32
            "napps.kytos.telemetry_int.main.api",
33
            api_mock,
34
        )
35
36
        evc_id = utils.get_id_from_cookie(flow.cookie)
37
        api_mock.get_evcs.return_value = {
38
            evc_id: {"metadata": {"telemetry": {"enabled": False}}}
39
        }
40
41
        self.napp.int_manager = AsyncMock()
42
43
        endpoint = f"{self.base_endpoint}/evc/enable"
44
        response = await self.api_client.post(endpoint, json={"evc_ids": [evc_id]})
45
        assert self.napp.int_manager.enable_int.call_count == 1
46
        assert self.napp.int_manager._remove_int_flows_by_cookies.call_count == 1
47
        assert response.status_code == 201
48
        assert response.json() == [evc_id]
49
50
    async def test_redeploy_telemetry_enabled(self, monkeypatch) -> None:
51
        """Test redeploy telemetry enabled."""
@@ 107-127 (lines=21) @@
104
        assert response.status_code == 400
105
        assert "evc_ids" in response.json()["description"]
106
107
    async def test_disable_telemetry(self, monkeypatch) -> None:
108
        """Test disable telemetry."""
109
        api_mock, flow = AsyncMock(), MagicMock()
110
        flow.cookie = 0xA800000000000001
111
        monkeypatch.setattr(
112
            "napps.kytos.telemetry_int.main.api",
113
            api_mock,
114
        )
115
116
        evc_id = utils.get_id_from_cookie(flow.cookie)
117
        api_mock.get_evcs.return_value = {
118
            evc_id: {"metadata": {"telemetry": {"enabled": True}}}
119
        }
120
121
        self.napp.int_manager = AsyncMock()
122
123
        endpoint = f"{self.base_endpoint}/evc/disable"
124
        response = await self.api_client.post(endpoint, json={"evc_ids": [evc_id]})
125
        assert self.napp.int_manager.disable_int.call_count == 1
126
        assert response.status_code == 200
127
        assert response.json() == [evc_id]
128
129
    async def test_get_enabled_evcs(self, monkeypatch) -> None:
130
        """Test get enabled evcs."""
@@ 50-70 (lines=21) @@
47
        assert response.status_code == 201
48
        assert response.json() == [evc_id]
49
50
    async def test_redeploy_telemetry_enabled(self, monkeypatch) -> None:
51
        """Test redeploy telemetry enabled."""
52
        api_mock, flow = AsyncMock(), MagicMock()
53
        flow.cookie = 0xA800000000000001
54
        monkeypatch.setattr(
55
            "napps.kytos.telemetry_int.main.api",
56
            api_mock,
57
        )
58
59
        evc_id = utils.get_id_from_cookie(flow.cookie)
60
        api_mock.get_evc.return_value = {
61
            evc_id: {"metadata": {"telemetry": {"enabled": True}}}
62
        }
63
64
        self.napp.int_manager = AsyncMock()
65
66
        endpoint = f"{self.base_endpoint}/evc/redeploy"
67
        response = await self.api_client.patch(endpoint, json={"evc_ids": [evc_id]})
68
        assert self.napp.int_manager.redeploy_int.call_count == 1
69
        assert response.status_code == 201
70
        assert response.json() == [evc_id]
71
72
    async def test_redeploy_telemetry_not_enabled(self, monkeypatch) -> None:
73
        """Test redeploy telemetry not enabled."""