Passed
Pull Request — master (#67)
by Vinicius
06:56
created

build.managers.int   F

Complexity

Total Complexity 84

Size/Duplication

Total Lines 491
Duplicated Lines 0 %

Test Coverage

Coverage 66.36%

Importance

Changes 0
Metric Value
eloc 313
dl 0
loc 491
ccs 142
cts 214
cp 0.6636
rs 2
c 0
b 0
f 0
wmc 84

16 Methods

Rating   Name   Duplication   Size   Complexity  
C INTManager.load_uni_src_proxy_ports() 0 31 9
A INTManager.enable_int() 0 25 1
B INTManager._install_int_flows() 0 31 7
F INTManager.handle_pp_link_up() 0 75 17
B INTManager._validate_disable_evcs() 0 11 6
A INTManager._add_pps_evc_ids() 0 13 2
D INTManager._validate_map_enable_evcs() 0 45 13
B INTManager.get_proxy_port_or_raise() 0 34 6
A INTManager.remove_int_flows() 0 10 1
A INTManager._discard_pps_evc_ids() 0 11 2
A INTManager.install_int_flows() 0 14 1
B INTManager.handle_pp_link_down() 0 38 7
A INTManager.__init__() 0 10 1
B INTManager._remove_int_flows() 0 42 7
A INTManager.disable_int() 0 24 1
A INTManager._validate_intra_evc_different_proxy_ports() 0 23 3

How to fix   Complexity   

Complexity

Complex classes like build.managers.int often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""INTManager module."""
2 1
import asyncio
3 1
from collections import defaultdict
4 1
from datetime import datetime
5
6 1
from pyof.v0x04.controller2switch.table_mod import Table
7
8 1
from kytos.core.controller import Controller
9 1
from kytos.core.events import KytosEvent
10 1
from napps.kytos.telemetry_int import utils
11 1
from napps.kytos.telemetry_int import settings
12 1
from kytos.core import log
13 1
from kytos.core.link import Link
14 1
import napps.kytos.telemetry_int.kytos_api_helper as api
15 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
16 1
from kytos.core.common import EntityStatus
17 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
18
19 1
from napps.kytos.telemetry_int.exceptions import (
20
    EVCError,
21
    EVCNotFound,
22
    EVCHasINT,
23
    EVCHasNoINT,
24
    FlowsNotFound,
25
    ProxyPortStatusNotUP,
26
    ProxyPortDestNotFound,
27
    ProxyPortNotFound,
28
    ProxyPortSameSourceIntraEVC,
29
)
30
31
32 1
class INTManager:
33
34
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
35
36 1
    def __init__(self, controller: Controller) -> None:
37
        """INTManager."""
38 1
        self.controller = controller
39 1
        self.flow_builder = FlowBuilder()
40 1
        self._topo_link_lock = asyncio.Lock()
41
42
        # Keep track between each uni intf id and its src intf id port
43 1
        self.unis_src: dict[str, str] = {}
44
        # Keep track between src intf id and its ProxyPort instance
45 1
        self.srcs_pp: dict[str, ProxyPort] = {}
46
47 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
48
        """Load UNI ids src ids and their ProxyPort instances."""
49 1
        for evc_id, evc in evcs.items():
50 1
            if not utils.has_int_enabled(evc):
51 1
                continue
52
53 1
            uni_a_id = evc["uni_a"]["interface_id"]
54 1
            uni_z_id = evc["uni_z"]["interface_id"]
55 1
            uni_a = self.controller.get_interface_by_id(uni_a_id)
56 1
            uni_z = self.controller.get_interface_by_id(uni_z_id)
57 1
            if uni_a and "proxy_port" in uni_a.metadata:
58 1
                src_a = uni_a.switch.get_interface_by_port_no(
59
                    uni_a.metadata["proxy_port"]
60
                )
61 1
                self.unis_src[uni_a.id] = src_a.id
62 1
                try:
63 1
                    pp = self.get_proxy_port_or_raise(uni_a.id, evc_id)
64
                except ProxyPortDestNotFound:
65
                    pp = self.srcs_pp[src_a.id]
66 1
                pp.evc_ids.add(evc_id)
67
68 1
            if uni_z and "proxy_port" in uni_z.metadata:
69 1
                src_z = uni_z.switch.get_interface_by_port_no(
70
                    uni_z.metadata["proxy_port"]
71
                )
72 1
                self.unis_src[uni_z.id] = src_z.id
73 1
                try:
74 1
                    pp = self.get_proxy_port_or_raise(uni_z.id, evc_id)
75
                except ProxyPortDestNotFound:
76
                    pp = self.srcs_pp[src_z.id]
77 1
                pp.evc_ids.add(evc_id)
78
79 1
    async def handle_pp_link_down(self, link: Link) -> None:
80
        """Handle proxy_port link_down."""
81 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
82
            return
83 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
84 1
        if not pp:
85
            pp = self.srcs_pp.get(link.endpoint_b.id)
86 1
        if not pp or not pp.evc_ids:
87
            return
88
89 1
        async with self._topo_link_lock:
90 1
            evcs = await api.get_evcs(
91
                **{
92
                    "metadata.telemetry.enabled": "true",
93
                    "metadata.telemetry.status": "UP",
94
                }
95
            )
96 1
            to_deactivate = {
97
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
98
            }
99 1
            if not to_deactivate:
100
                return
101
102 1
            log.info(
103
                f"Handling link_down {link}, removing INT flows falling back to "
104
                f"mef_eline, EVC ids: {list(to_deactivate)}"
105
            )
106 1
            metadata = {
107
                "telemetry": {
108
                    "enabled": True,
109
                    "status": "DOWN",
110
                    "status_reason": ["proxy_port_down"],
111
                    "status_updated_at": datetime.utcnow().strftime(
112
                        "%Y-%m-%dT%H:%M:%S"
113
                    ),
114
                }
115
            }
116 1
            await self.remove_int_flows(to_deactivate, metadata)
117
118 1
    async def handle_pp_link_up(self, link: Link) -> None:
119
        """Handle proxy_port link_up."""
120 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
121
            return
122 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
123 1
        if not pp:
124
            pp = self.srcs_pp.get(link.endpoint_b.id)
125 1
        if not pp or not pp.evc_ids:
126
            return
127
128
        # This sleep is to wait for at least a few seconds to ensure that the other
129
        # proxy port would also have been considered up since the request
130
        # on topology setting metadata might end up delaying, check out issue
131
        # https://github.com/kytos-ng/of_lldp/issues/100
132
        # TODO this will be optimized later on before releasing this NApp
133 1
        await asyncio.sleep(5)
134
135 1
        async with self._topo_link_lock:
136 1
            if link.status != EntityStatus.UP or link.status_reason:
137
                return
138 1
            evcs = await api.get_evcs(
139
                **{
140
                    "metadata.telemetry.enabled": "true",
141
                    "metadata.telemetry.status": "DOWN",
142
                }
143
            )
144
145 1
            to_install = {}
146 1
            for evc_id, evc in evcs.items():
147 1
                if any(
148
                    (
149
                        not evc["active"],
150
                        evc["archived"],
151
                        evc_id not in pp.evc_ids,
152
                        evc["uni_a"]["interface_id"] not in self.unis_src,
153
                        evc["uni_z"]["interface_id"] not in self.unis_src,
154
                    )
155
                ):
156
                    continue
157
158 1
                src_a_id = self.unis_src[evc["uni_a"]["interface_id"]]
159 1
                src_z_id = self.unis_src[evc["uni_z"]["interface_id"]]
160 1
                if (
161
                    src_a_id in self.srcs_pp
162
                    and src_z_id in self.srcs_pp
163
                    and self.srcs_pp[src_a_id].status == EntityStatus.UP
164
                    and self.srcs_pp[src_z_id].status == EntityStatus.UP
165
                ):
166 1
                    to_install[evc_id] = evc
167
168 1
            if not to_install:
169
                return
170
171 1
            try:
172 1
                to_install = self._validate_map_enable_evcs(to_install, force=True)
173
            except EVCError as exc:
174
                log.exception(exc)
175
                return
176
177 1
            log.info(f"Handling link_up {link}, deploying EVC ids: {list(to_install)}")
178 1
            metadata = {
179
                "telemetry": {
180
                    "enabled": True,
181
                    "status": "UP",
182
                    "status_reason": [],
183
                    "status_updated_at": datetime.utcnow().strftime(
184
                        "%Y-%m-%dT%H:%M:%S"
185
                    ),
186
                }
187
            }
188 1
            try:
189 1
                await self.install_int_flows(to_install, metadata)
190
            except FlowsNotFound as exc:
191
                log.exception(f"FlowsNotFound {str(exc)}")
192
                return
193
194 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
195
        """Disable INT on EVCs.
196
197
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
198
199
        The force bool option, if True, will bypass the following:
200
201
        1 - EVC not found
202
        2 - EVC doesn't have INT
203
204
        """
205 1
        self._validate_disable_evcs(evcs, force)
206 1
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
207
208 1
        metadata = {
209
            "telemetry": {
210
                "enabled": False,
211
                "status": "DOWN",
212
                "status_reason": ["disabled"],
213
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
214
            }
215
        }
216 1
        await self.remove_int_flows(evcs, metadata, force=force)
217 1
        self._discard_pps_evc_ids(evcs)
218
219 1
    async def remove_int_flows(
220
        self, evcs: dict[str, dict], metadata: dict, force=False
221
    ) -> None:
222
        """Remove INT flows and set metadata on EVCs."""
223 1
        stored_flows = await api.get_stored_flows(
224
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
225
        )
226 1
        await asyncio.gather(
227
            self._remove_int_flows(stored_flows),
228
            api.add_evcs_metadata(evcs, metadata, force),
229
        )
230
231 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
232
        """Enable INT on EVCs.
233
234
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
235
236
        The force bool option, if True, will bypass the following:
237
238
        1 - EVC already has INT
239
        2 - ProxyPort isn't UP
240
        Other cases won't be bypassed since at the point it won't have the data needed.
241
242
        """
243 1
        evcs = self._validate_map_enable_evcs(evcs, force)
244 1
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
245
246 1
        metadata = {
247
            "telemetry": {
248
                "enabled": True,
249
                "status": "UP",
250
                "status_reason": [],
251
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
252
            }
253
        }
254 1
        await self.install_int_flows(evcs, metadata)
255 1
        self._add_pps_evc_ids(evcs)
256
257 1
    async def install_int_flows(self, evcs: dict[str, dict], metadata: dict) -> None:
258
        """Install INT flows and set metadata on EVCs."""
259 1
        stored_flows = self.flow_builder.build_int_flows(
260
            evcs,
261
            await utils.get_found_stored_flows(
262
                [
263
                    utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
264
                    for evc_id in evcs
265
                ]
266
            ),
267
        )
268 1
        await asyncio.gather(
269
            self._install_int_flows(stored_flows),
270
            api.add_evcs_metadata(evcs, metadata),
271
        )
272
273 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
274
        """Return a ProxyPort assigned to a UNI or raise."""
275
276 1
        interface = self.controller.get_interface_by_id(intf_id)
277 1
        if not interface:
278 1
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")
279
280 1
        if "proxy_port" not in interface.metadata:
281 1
            raise ProxyPortNotFound(
282
                evc_id, f"proxy_port metadata not found in {intf_id}"
283
            )
284
285 1
        source_intf = interface.switch.get_interface_by_port_no(
286
            interface.metadata.get("proxy_port")
287
        )
288 1
        if not source_intf:
289
            raise ProxyPortNotFound(
290
                evc_id,
291
                f"proxy_port of {intf_id} source interface not found",
292
            )
293
294 1
        pp = self.srcs_pp.get(source_intf.id)
295 1
        if not pp:
296 1
            pp = ProxyPort(self.controller, source_intf)
297 1
            self.srcs_pp[source_intf.id] = pp
298
299 1
        if not pp.destination:
300 1
            raise ProxyPortDestNotFound(
301
                evc_id,
302
                f"proxy_port of {intf_id} isn't looped or destination interface "
303
                "not found",
304
            )
305
306 1
        return pp
307
308 1
    def _validate_disable_evcs(
309
        self,
310
        evcs: dict[str, dict],
311
        force=False,
312
    ) -> None:
313
        """Validate disable EVCs."""
314 1
        for evc_id, evc in evcs.items():
315
            if not evc and not force:
316
                raise EVCNotFound(evc_id)
317
            if not utils.has_int_enabled(evc) and not force:
318
                raise EVCHasNoINT(evc_id)
319
320 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
321
        """Validate that an intra EVC is using different proxy ports.
322
323
        If the same proxy port is used on both UNIs, of one the sink/pop related matches
324
        would ended up being overwritten since they'd be the same. Currently, an
325
        external loop will have unidirectional flows matching in the lower (source)
326
        port number.
327
        """
328 1
        pp_a = evc["uni_a"].get("proxy_port")
329 1
        pp_z = evc["uni_z"].get("proxy_port")
330 1
        if any(
331
            (
332
                not utils.is_intra_switch_evc(evc),
333
                pp_a is None,
334
                pp_z is None,
335
            )
336
        ):
337 1
            return
338 1
        if pp_a.source != pp_z.source:
339 1
            return
340
341 1
        raise ProxyPortSameSourceIntraEVC(
342
            evc["id"], "intra EVC UNIs must use different proxy ports"
343
        )
344
345 1
    def _validate_map_enable_evcs(
346
        self,
347
        evcs: dict[str, dict],
348
        force=False,
349
    ) -> dict[str, dict]:
350
        """Validate map enabling EVCs.
351
352
        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
353
        so it can be reused later during provisioning.
354
355
        """
356 1
        for evc_id, evc in evcs.items():
357 1
            if not evc:
358
                raise EVCNotFound(evc_id)
359 1
            if utils.has_int_enabled(evc) and not force:
360
                raise EVCHasINT(evc_id)
361
362 1
            uni_a, uni_z = utils.get_evc_unis(evc)
363 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
364 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
365
366 1
            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
367 1
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z
368
369 1
            if pp_a.status != EntityStatus.UP and not force:
370
                dest_id = pp_a.destination.id if pp_a.destination else None
371
                dest_status = pp_a.status if pp_a.destination else None
372
                raise ProxyPortStatusNotUP(
373
                    evc_id,
374
                    f"proxy_port of {uni_a['interface_id']} isn't UP. "
375
                    f"source {pp_a.source.id} status {pp_a.source.status}, "
376
                    f"destination {dest_id} status {dest_status}",
377
                )
378 1
            if pp_z.status != EntityStatus.UP and not force:
379
                dest_id = pp_z.destination.id if pp_z.destination else None
380
                dest_status = pp_z.status if pp_z.destination else None
381
                raise ProxyPortStatusNotUP(
382
                    evc_id,
383
                    f"proxy_port of {uni_z['interface_id']} isn't UP."
384
                    f"source {pp_z.source.id} status {pp_z.source.status}, "
385
                    f"destination {dest_id} status {dest_status}",
386
                )
387
388 1
            self._validate_intra_evc_different_proxy_ports(evc)
389 1
        return evcs
390
391 1
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
392
        """Add proxy ports evc_ids.
393
394
        This is meant to be called after an EVC is enabled.
395
        """
396 1
        for evc_id, evc in evcs.items():
397
            uni_a, uni_z = utils.get_evc_unis(evc)
398
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
399
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
400
            pp_a.evc_ids.add(evc_id)
401
            pp_z.evc_ids.add(evc_id)
402
            self.unis_src[evc["uni_a"]["interface_id"]] = pp_a.source.id
403
            self.unis_src[evc["uni_z"]["interface_id"]] = pp_z.source.id
404
405 1
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
406
        """Discard proxy port evc_ids.
407
408
        This is meant to be called when an EVC is disabled.
409
        """
410 1
        for evc_id, evc in evcs.items():
411
            uni_a, uni_z = utils.get_evc_unis(evc)
412
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
413
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
414
            pp_a.evc_ids.discard(evc_id)
415
            pp_z.evc_ids.discard(evc_id)
416
417 1
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
418
        """Delete int flows given a prefiltered stored_flows.
419
420
        Removal is driven by the stored flows instead of EVC ids and dpids to also
421
        be able to handle the force mode when an EVC no longer exists. It also follows
422
        the same pattern that mef_eline currently uses.
423
424
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
425
        for settings.BATCH_INTERVAL per batch iteration.
426
427
        """
428
        switch_flows = defaultdict(set)
429
        for flows in stored_flows.values():
430
            for flow in flows:
431
                switch_flows[flow["switch"]].add(flow["flow"]["cookie"])
432
433
        for dpid, cookies in switch_flows.items():
434
            cookie_vals = list(cookies)
435
            batch_size = settings.BATCH_SIZE
436
            if batch_size <= 0:
437
                batch_size = len(cookie_vals)
438
439
            for i in range(0, len(cookie_vals), batch_size):
440
                if i > 0:
441
                    await asyncio.sleep(settings.BATCH_INTERVAL)
442
                flows = [
443
                    {
444
                        "cookie": cookie,
445
                        "cookie_mask": int(0xFFFFFFFFFFFFFFFF),
446
                        "table_id": Table.OFPTT_ALL.value,
447
                    }
448
                    for cookie in cookie_vals[i : i + batch_size]
449
                ]
450
                event = KytosEvent(
451
                    "kytos.flow_manager.flows.delete",
452
                    content={
453
                        "dpid": dpid,
454
                        "force": True,
455
                        "flow_dict": {"flows": flows},
456
                    },
457
                )
458
                await self.controller.buffers.app.aput(event)
459
460 1
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
461
        """Install INT flow mods.
462
463
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
464
        for settings.BATCH_INTERVAL per batch iteration.
465
        """
466
467 1
        switch_flows = defaultdict(list)
468 1
        for flows in stored_flows.values():
469
            for flow in flows:
470
                switch_flows[flow["switch"]].append(flow["flow"])
471
472 1
        for dpid, flows in switch_flows.items():
473
            flow_vals = list(flows)
474
            batch_size = settings.BATCH_SIZE
475
            if batch_size <= 0:
476
                batch_size = len(flow_vals)
477
478
            for i in range(0, len(flow_vals), batch_size):
479
                if i > 0:
480
                    await asyncio.sleep(settings.BATCH_INTERVAL)
481
                flows = flow_vals[i : i + batch_size]
482
                event = KytosEvent(
483
                    "kytos.flow_manager.flows.install",
484
                    content={
485
                        "dpid": dpid,
486
                        "force": True,
487
                        "flow_dict": {"flows": flows},
488
                    },
489
                )
490
                await self.controller.buffers.app.aput(event)
491