Passed
Pull Request — master (#64)
by Vinicius
07:12 queued 04:20
created

build.managers.int   A

Complexity

Total Complexity 39

Size/Duplication

Total Lines 266
Duplicated Lines 0 %

Test Coverage

Coverage 59.38%

Importance

Changes 0
Metric Value
eloc 153
dl 0
loc 266
ccs 57
cts 96
cp 0.5938
rs 9.28
c 0
b 0
f 0
wmc 39

8 Methods

Rating   Name   Duplication   Size   Complexity  
A INTManager.enable_int() 0 36 1
B INTManager._validate_disable_evcs() 0 11 6
D INTManager._validate_map_enable_evcs() 0 49 13
B INTManager.remove_int_flows() 0 42 7
B INTManager.install_int_flows() 0 31 7
A INTManager.__init__() 0 4 1
A INTManager._validate_intra_evc_different_proxy_ports() 0 23 3
A INTManager.disable_int() 0 30 1
1
"""INTManager module."""
2 1
import asyncio
3 1
from collections import defaultdict
4 1
from datetime import datetime
5
6 1
from pyof.v0x04.controller2switch.table_mod import Table
7
8 1
from kytos.core.controller import Controller
9 1
from kytos.core.events import KytosEvent
10 1
from napps.kytos.telemetry_int import utils
11 1
from napps.kytos.telemetry_int import settings
12 1
from kytos.core import log
13 1
import napps.kytos.telemetry_int.kytos_api_helper as api
14 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
15 1
from kytos.core.common import EntityStatus
16
17 1
from napps.kytos.telemetry_int.exceptions import (
18
    EVCNotFound,
19
    EVCHasINT,
20
    EVCHasNoINT,
21
    ProxyPortStatusNotUP,
22
    ProxyPortSameSourceIntraEVC
23
)
24
25
26 1
class INTManager:
27
28
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
29
30 1
    def __init__(self, controller: Controller) -> None:
31
        """INTManager."""
32 1
        self.controller = controller
33 1
        self.flow_builder = FlowBuilder()
34
35 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
36
        """Disable INT on EVCs.
37
38
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
39
40
        The force bool option, if True, will bypass the following:
41
42
        1 - EVC not found
43
        2 - EVC doesn't have INT
44
45
        """
46 1
        self._validate_disable_evcs(evcs, force)
47
48 1
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
49
50 1
        stored_flows = await api.get_stored_flows(
51
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
52
        )
53
54 1
        metadata = {
55
            "telemetry": {
56
                "enabled": False,
57
                "status": "DOWN",
58
                "status_reason": ["disabled"],
59
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
60
            }
61
        }
62 1
        await asyncio.gather(
63
            self.remove_int_flows(stored_flows),
64
            api.add_evcs_metadata(evcs, metadata, force),
65
        )
66
67 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
68
        """Enable INT on EVCs.
69
70
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
71
72
        The force bool option, if True, will bypass the following:
73
74
        1 - EVC already has INT
75
        2 - ProxyPort isn't UP
76
        Other cases won't be bypassed since at the point it won't have the data needed.
77
78
        """
79 1
        evcs = self._validate_map_enable_evcs(evcs, force)
80
81 1
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
82
83 1
        stored_flows = self.flow_builder.build_int_flows(
84
            evcs,
85
            await utils.get_found_stored_flows(
86
                [
87
                    utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
88
                    for evc_id in evcs
89
                ]
90
            ),
91
        )
92
93 1
        metadata = {
94
            "telemetry": {
95
                "enabled": True,
96
                "status": "UP",
97
                "status_reason": [],
98
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
99
            }
100
        }
101 1
        await asyncio.gather(
102
            self.install_int_flows(stored_flows), api.add_evcs_metadata(evcs, metadata)
103
        )
104
105 1
    def _validate_disable_evcs(
106
        self,
107
        evcs: dict[str, dict],
108
        force=False,
109
    ) -> None:
110
        """Validate disable EVCs."""
111 1
        for evc_id, evc in evcs.items():
112
            if not evc and not force:
113
                raise EVCNotFound(evc_id)
114
            if not utils.has_int_enabled(evc) and not force:
115
                raise EVCHasNoINT(evc_id)
116
117 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
118
        """Validate that an intra EVC is using different proxy ports.
119
120
        If the same proxy port is used on both UNIs, of one the sink/pop related matches
121
        would ended up being overwritten since they'd be the same. Currently, an
122
        external loop will have unidirectional flows matching in the lower (source)
123
        port number.
124
        """
125 1
        pp_a = evc["uni_a"].get("proxy_port")
126 1
        pp_z = evc["uni_z"].get("proxy_port")
127 1
        if any(
128
            (
129
                not utils.is_intra_switch_evc(evc),
130
                pp_a is None,
131
                pp_z is None,
132
            )
133
        ):
134 1
            return
135 1
        if pp_a.source != pp_z.source:
136 1
            return
137
138 1
        raise ProxyPortSameSourceIntraEVC(
139
            evc["id"], "intra EVC UNIs must use different proxy ports"
140
        )
141
142 1
    def _validate_map_enable_evcs(
143
        self,
144
        evcs: dict[str, dict],
145
        force=False,
146
    ) -> dict[str, dict]:
147
        """Validate map enabling EVCs.
148
149
        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
150
        so it can be reused later during provisioning.
151
152
        """
153 1
        for evc_id, evc in evcs.items():
154 1
            if not evc:
155
                raise EVCNotFound(evc_id)
156 1
            if utils.has_int_enabled(evc) and not force:
157
                raise EVCHasINT(evc_id)
158
159 1
            uni_a, uni_z = utils.get_evc_unis(evc)
160 1
            pp_a = utils.get_proxy_port_or_raise(
161
                self.controller, uni_a["interface_id"], evc_id
162
            )
163 1
            pp_z = utils.get_proxy_port_or_raise(
164
                self.controller, uni_z["interface_id"], evc_id
165
            )
166
167 1
            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
168 1
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z
169
170 1
            if pp_a.status != EntityStatus.UP and not force:
171
                dest_id = pp_a.destination.id if pp_a.destination else None
172
                dest_status = pp_a.status if pp_a.destination else None
173
                raise ProxyPortStatusNotUP(
174
                    evc_id,
175
                    f"proxy_port of {uni_a['interface_id']} isn't UP. "
176
                    f"source {pp_a.source.id} status {pp_a.source.status}, "
177
                    f"destination {dest_id} status {dest_status}",
178
                )
179 1
            if pp_z.status != EntityStatus.UP and not force:
180
                dest_id = pp_z.destination.id if pp_z.destination else None
181
                dest_status = pp_z.status if pp_z.destination else None
182
                raise ProxyPortStatusNotUP(
183
                    evc_id,
184
                    f"proxy_port of {uni_z['interface_id']} isn't UP."
185
                    f"source {pp_z.source.id} status {pp_z.source.status}, "
186
                    f"destination {dest_id} status {dest_status}",
187
                )
188
189 1
            self._validate_intra_evc_different_proxy_ports(evc)
190 1
        return evcs
191
192 1
    async def remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
193
        """Delete int flows given a prefiltered stored_flows.
194
195
        Removal is driven by the stored flows instead of EVC ids and dpids to also
196
        be able to handle the force mode when an EVC no longer exists. It also follows
197
        the same pattern that mef_eline currently uses.
198
199
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
200
        for settings.BATCH_INTERVAL per batch iteration.
201
202
        """
203
        switch_flows = defaultdict(set)
204
        for flows in stored_flows.values():
205
            for flow in flows:
206
                switch_flows[flow["switch"]].add(flow["flow"]["cookie"])
207
208
        for dpid, cookies in switch_flows.items():
209
            cookie_vals = list(cookies)
210
            batch_size = settings.BATCH_SIZE
211
            if batch_size <= 0:
212
                batch_size = len(cookie_vals)
213
214
            for i in range(0, len(cookie_vals), batch_size):
215
                if i > 0:
216
                    await asyncio.sleep(settings.BATCH_INTERVAL)
217
                flows = [
218
                    {
219
                        "cookie": cookie,
220
                        "cookie_mask": int(0xFFFFFFFFFFFFFFFF),
221
                        "table_id": Table.OFPTT_ALL.value,
222
                    }
223
                    for cookie in cookie_vals[i : i + batch_size]
224
                ]
225
                event = KytosEvent(
226
                    "kytos.flow_manager.flows.delete",
227
                    content={
228
                        "dpid": dpid,
229
                        "force": True,
230
                        "flow_dict": {"flows": flows},
231
                    },
232
                )
233
                await self.controller.buffers.app.aput(event)
234
235 1
    async def install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
236
        """Install INT flow mods.
237
238
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
239
        for settings.BATCH_INTERVAL per batch iteration.
240
        """
241
242 1
        switch_flows = defaultdict(list)
243 1
        for flows in stored_flows.values():
244
            for flow in flows:
245
                switch_flows[flow["switch"]].append(flow["flow"])
246
247 1
        for dpid, flows in switch_flows.items():
248
            flow_vals = list(flows)
249
            batch_size = settings.BATCH_SIZE
250
            if batch_size <= 0:
251
                batch_size = len(flow_vals)
252
253
            for i in range(0, len(flow_vals), batch_size):
254
                if i > 0:
255
                    await asyncio.sleep(settings.BATCH_INTERVAL)
256
                flows = flow_vals[i : i + batch_size]
257
                event = KytosEvent(
258
                    "kytos.flow_manager.flows.install",
259
                    content={
260
                        "dpid": dpid,
261
                        "force": True,
262
                        "flow_dict": {"flows": flows},
263
                    },
264
                )
265
                await self.controller.buffers.app.aput(event)
266