Passed
Pull Request — master (#75)
by Vinicius
08:27 queued 01:13
created

build.managers.int   F

Complexity

Total Complexity 93

Size/Duplication

Total Lines 562
Duplicated Lines 0 %

Test Coverage

Coverage 67.07%

Importance

Changes 0
Metric Value
eloc 363
dl 0
loc 562
ccs 163
cts 243
cp 0.6707
rs 2
c 0
b 0
f 0
wmc 93

18 Methods

Rating   Name   Duplication   Size   Complexity  
C INTManager.load_uni_src_proxy_ports() 0 31 9
B INTManager.handle_pp_link_down() 0 38 7
A INTManager.__init__() 0 11 1
A INTManager.enable_int() 0 25 1
B INTManager._install_int_flows() 0 31 7
F INTManager.handle_pp_link_up() 0 71 17
B INTManager._validate_disable_evcs() 0 11 6
A INTManager._add_pps_evc_ids() 0 13 2
D INTManager._validate_map_enable_evcs() 0 45 13
B INTManager.get_proxy_port_or_raise() 0 34 6
A INTManager.remove_int_flows() 0 10 1
A INTManager._discard_pps_evc_ids() 0 11 2
A INTManager.redeploy_int() 0 26 1
A INTManager.install_int_flows() 0 14 1
B INTManager._remove_int_flows() 0 42 7
A INTManager.disable_int() 0 29 3
A INTManager._validate_intra_evc_different_proxy_ports() 0 23 3
B INTManager.handle_pp_metadata_removed() 0 39 6

How to fix   Complexity   

Complexity

Complex classes like build.managers.int often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""INTManager module."""
2 1
import asyncio
3 1
from collections import defaultdict
4 1
from datetime import datetime
5
6 1
from pyof.v0x04.controller2switch.table_mod import Table
7
8 1
from kytos.core.controller import Controller
9 1
from kytos.core.events import KytosEvent
10 1
from kytos.core.interface import Interface
11 1
from napps.kytos.telemetry_int import utils
12 1
from napps.kytos.telemetry_int import settings
13 1
from kytos.core import log
14 1
from kytos.core.link import Link
15 1
import napps.kytos.telemetry_int.kytos_api_helper as api
16 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
17 1
from kytos.core.common import EntityStatus
18 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
19
20 1
from napps.kytos.telemetry_int.exceptions import (
21
    EVCError,
22
    EVCNotFound,
23
    EVCHasINT,
24
    EVCHasNoINT,
25
    FlowsNotFound,
26
    ProxyPortError,
27
    ProxyPortStatusNotUP,
28
    ProxyPortDestNotFound,
29
    ProxyPortNotFound,
30
    ProxyPortSameSourceIntraEVC,
31
)
32
33
34 1
class INTManager:
35
36
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
37
38 1
    def __init__(self, controller: Controller) -> None:
39
        """INTManager."""
40 1
        self.controller = controller
41 1
        self.flow_builder = FlowBuilder()
42 1
        self._topo_link_lock = asyncio.Lock()
43 1
        self._intf_meta_lock = asyncio.Lock()
44
45
        # Keep track between each uni intf id and its src intf id port
46 1
        self.unis_src: dict[str, str] = {}
47
        # Keep track between src intf id and its ProxyPort instance
48 1
        self.srcs_pp: dict[str, ProxyPort] = {}
49
50 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
51
        """Load UNI ids src ids and their ProxyPort instances."""
52 1
        for evc_id, evc in evcs.items():
53 1
            if not utils.has_int_enabled(evc):
54 1
                continue
55
56 1
            uni_a_id = evc["uni_a"]["interface_id"]
57 1
            uni_z_id = evc["uni_z"]["interface_id"]
58 1
            uni_a = self.controller.get_interface_by_id(uni_a_id)
59 1
            uni_z = self.controller.get_interface_by_id(uni_z_id)
60 1
            if uni_a and "proxy_port" in uni_a.metadata:
61 1
                src_a = uni_a.switch.get_interface_by_port_no(
62
                    uni_a.metadata["proxy_port"]
63
                )
64 1
                self.unis_src[uni_a.id] = src_a.id
65 1
                try:
66 1
                    pp = self.get_proxy_port_or_raise(uni_a.id, evc_id)
67
                except ProxyPortDestNotFound:
68
                    pp = self.srcs_pp[src_a.id]
69 1
                pp.evc_ids.add(evc_id)
70
71 1
            if uni_z and "proxy_port" in uni_z.metadata:
72 1
                src_z = uni_z.switch.get_interface_by_port_no(
73
                    uni_z.metadata["proxy_port"]
74
                )
75 1
                self.unis_src[uni_z.id] = src_z.id
76 1
                try:
77 1
                    pp = self.get_proxy_port_or_raise(uni_z.id, evc_id)
78
                except ProxyPortDestNotFound:
79
                    pp = self.srcs_pp[src_z.id]
80 1
                pp.evc_ids.add(evc_id)
81
82 1
    async def handle_pp_link_down(self, link: Link) -> None:
83
        """Handle proxy_port link_down."""
84 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
85
            return
86 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
87 1
        if not pp:
88
            pp = self.srcs_pp.get(link.endpoint_b.id)
89 1
        if not pp or not pp.evc_ids:
90
            return
91
92 1
        async with self._topo_link_lock:
93 1
            evcs = await api.get_evcs(
94
                **{
95
                    "metadata.telemetry.enabled": "true",
96
                    "metadata.telemetry.status": "UP",
97
                }
98
            )
99 1
            to_deactivate = {
100
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
101
            }
102 1
            if not to_deactivate:
103
                return
104
105 1
            log.info(
106
                f"Handling link_down {link}, removing INT flows falling back to "
107
                f"mef_eline, EVC ids: {list(to_deactivate)}"
108
            )
109 1
            metadata = {
110
                "telemetry": {
111
                    "enabled": True,
112
                    "status": "DOWN",
113
                    "status_reason": ["proxy_port_down"],
114
                    "status_updated_at": datetime.utcnow().strftime(
115
                        "%Y-%m-%dT%H:%M:%S"
116
                    ),
117
                }
118
            }
119 1
            await self.remove_int_flows(to_deactivate, metadata)
120
121 1
    async def handle_pp_link_up(self, link: Link) -> None:
122
        """Handle proxy_port link_up."""
123 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
124
            return
125 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
126 1
        if not pp:
127
            pp = self.srcs_pp.get(link.endpoint_b.id)
128 1
        if not pp or not pp.evc_ids:
129
            return
130
131 1
        async with self._topo_link_lock:
132 1
            if link.status != EntityStatus.UP or link.status_reason:
133
                return
134 1
            evcs = await api.get_evcs(
135
                **{
136
                    "metadata.telemetry.enabled": "true",
137
                    "metadata.telemetry.status": "DOWN",
138
                }
139
            )
140
141 1
            to_install = {}
142 1
            for evc_id, evc in evcs.items():
143 1
                if any(
144
                    (
145
                        not evc["active"],
146
                        evc["archived"],
147
                        evc_id not in pp.evc_ids,
148
                        evc["uni_a"]["interface_id"] not in self.unis_src,
149
                        evc["uni_z"]["interface_id"] not in self.unis_src,
150
                    )
151
                ):
152
                    continue
153
154 1
                src_a_id = self.unis_src[evc["uni_a"]["interface_id"]]
155 1
                src_z_id = self.unis_src[evc["uni_z"]["interface_id"]]
156 1
                if (
157
                    src_a_id in self.srcs_pp
158
                    and src_z_id in self.srcs_pp
159
                    and self.srcs_pp[src_a_id].status == EntityStatus.UP
160
                    and self.srcs_pp[src_z_id].status == EntityStatus.UP
161
                ):
162 1
                    to_install[evc_id] = evc
163
164 1
            if not to_install:
165
                return
166
167 1
            try:
168 1
                to_install = self._validate_map_enable_evcs(to_install, force=True)
169
            except EVCError as exc:
170
                log.exception(exc)
171
                return
172
173 1
            log.info(
174
                f"Handling link_up {link}, deploying INT flows, "
175
                f"EVC ids: {list(to_install)}"
176
            )
177 1
            metadata = {
178
                "telemetry": {
179
                    "enabled": True,
180
                    "status": "UP",
181
                    "status_reason": [],
182
                    "status_updated_at": datetime.utcnow().strftime(
183
                        "%Y-%m-%dT%H:%M:%S"
184
                    ),
185
                }
186
            }
187 1
            try:
188 1
                await self.install_int_flows(to_install, metadata)
189
            except FlowsNotFound as exc:
190
                log.exception(f"FlowsNotFound {str(exc)}")
191
                return
192
193 1
    async def handle_pp_metadata_removed(self, intf: Interface) -> None:
194
        """Handle proxy port metadata removed."""
195 1
        if "proxy_port" in intf.metadata:
196
            return
197 1
        try:
198 1
            pp = self.srcs_pp[self.unis_src[intf.id]]
199 1
            if not pp.evc_ids:
200
                return
201
        except KeyError:
202
            return
203
204 1
        async with self._intf_meta_lock:
205 1
            evcs = await api.get_evcs(
206
                **{
207
                    "metadata.telemetry.enabled": "true",
208
                    "metadata.telemetry.status": "UP",
209
                }
210
            )
211 1
            to_deactivate = {
212
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
213
            }
214 1
            if not to_deactivate:
215
                return
216
217 1
            log.info(
218
                f"Handling interface metadata removed on {intf}, removing INT flows "
219
                f"falling back to mef_eline, EVC ids: {list(to_deactivate)}"
220
            )
221 1
            metadata = {
222
                "telemetry": {
223
                    "enabled": True,
224
                    "status": "DOWN",
225
                    "status_reason": ["proxy_port_metadata_removed"],
226
                    "status_updated_at": datetime.utcnow().strftime(
227
                        "%Y-%m-%dT%H:%M:%S"
228
                    ),
229
                }
230
            }
231 1
            await self.remove_int_flows(to_deactivate, metadata)
232
233 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
234
        """Disable INT on EVCs.
235
236
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
237
238
        The force bool option, if True, will bypass the following:
239
240
        1 - EVC not found
241
        2 - EVC doesn't have INT
242
        3 - ProxyPortNotFound or ProxyPortDestNotFound
243
244
        """
245 1
        self._validate_disable_evcs(evcs, force)
246 1
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
247
248 1
        metadata = {
249
            "telemetry": {
250
                "enabled": False,
251
                "status": "DOWN",
252
                "status_reason": ["disabled"],
253
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
254
            }
255
        }
256 1
        await self.remove_int_flows(evcs, metadata, force=force)
257 1
        try:
258 1
            self._discard_pps_evc_ids(evcs)
259
        except ProxyPortError:
260
            if not force:
261
                raise
262
263 1
    async def remove_int_flows(
264
        self, evcs: dict[str, dict], metadata: dict, force=False
265
    ) -> None:
266
        """Remove INT flows and set metadata on EVCs."""
267 1
        stored_flows = await api.get_stored_flows(
268
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
269
        )
270 1
        await asyncio.gather(
271
            self._remove_int_flows(stored_flows),
272
            api.add_evcs_metadata(evcs, metadata, force),
273
        )
274
275 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
276
        """Enable INT on EVCs.
277
278
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
279
280
        The force bool option, if True, will bypass the following:
281
282
        1 - EVC already has INT
283
        2 - ProxyPort isn't UP
284
        Other cases won't be bypassed since at the point it won't have the data needed.
285
286
        """
287 1
        evcs = self._validate_map_enable_evcs(evcs, force)
288 1
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
289
290 1
        metadata = {
291
            "telemetry": {
292
                "enabled": True,
293
                "status": "UP",
294
                "status_reason": [],
295
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
296
            }
297
        }
298 1
        await self.install_int_flows(evcs, metadata)
299 1
        self._add_pps_evc_ids(evcs)
300
301 1
    async def redeploy_int(self, evcs: dict[str, dict]) -> None:
302
        """Redeploy INT on EVCs. It'll only delete and install the flows again.
303
304
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
305
        """
306 1
        evcs = self._validate_map_enable_evcs(evcs, force=True)
307 1
        log.info(f"Redeploying INT on EVC ids: {list(evcs.keys())}, force: True")
308
309 1
        stored_flows = await api.get_stored_flows(
310
            [
311
                utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)
312
                for evc_id in evcs
313
            ]
314
        )
315 1
        await self._remove_int_flows(stored_flows)
316
317 1
        found_stored_flows = self.flow_builder.build_int_flows(
318
            evcs,
319
            await utils.get_found_stored_flows(
320
                [
321
                    utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
322
                    for evc_id in evcs
323
                ]
324
            ),
325
        )
326 1
        await self._install_int_flows(found_stored_flows)
327
328 1
    async def install_int_flows(self, evcs: dict[str, dict], metadata: dict) -> None:
329
        """Install INT flows and set metadata on EVCs."""
330 1
        stored_flows = self.flow_builder.build_int_flows(
331
            evcs,
332
            await utils.get_found_stored_flows(
333
                [
334
                    utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
335
                    for evc_id in evcs
336
                ]
337
            ),
338
        )
339 1
        await asyncio.gather(
340
            self._install_int_flows(stored_flows),
341
            api.add_evcs_metadata(evcs, metadata),
342
        )
343
344 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
345
        """Return a ProxyPort assigned to a UNI or raise."""
346
347 1
        interface = self.controller.get_interface_by_id(intf_id)
348 1
        if not interface:
349 1
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")
350
351 1
        if "proxy_port" not in interface.metadata:
352 1
            raise ProxyPortNotFound(
353
                evc_id, f"proxy_port metadata not found in {intf_id}"
354
            )
355
356 1
        source_intf = interface.switch.get_interface_by_port_no(
357
            interface.metadata.get("proxy_port")
358
        )
359 1
        if not source_intf:
360
            raise ProxyPortNotFound(
361
                evc_id,
362
                f"proxy_port of {intf_id} source interface not found",
363
            )
364
365 1
        pp = self.srcs_pp.get(source_intf.id)
366 1
        if not pp:
367 1
            pp = ProxyPort(self.controller, source_intf)
368 1
            self.srcs_pp[source_intf.id] = pp
369
370 1
        if not pp.destination:
371 1
            raise ProxyPortDestNotFound(
372
                evc_id,
373
                f"proxy_port of {intf_id} isn't looped or destination interface "
374
                "not found",
375
            )
376
377 1
        return pp
378
379 1
    def _validate_disable_evcs(
380
        self,
381
        evcs: dict[str, dict],
382
        force=False,
383
    ) -> None:
384
        """Validate disable EVCs."""
385 1
        for evc_id, evc in evcs.items():
386
            if not evc and not force:
387
                raise EVCNotFound(evc_id)
388
            if not utils.has_int_enabled(evc) and not force:
389
                raise EVCHasNoINT(evc_id)
390
391 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
392
        """Validate that an intra EVC is using different proxy ports.
393
394
        If the same proxy port is used on both UNIs, of one the sink/pop related matches
395
        would ended up being overwritten since they'd be the same. Currently, an
396
        external loop will have unidirectional flows matching in the lower (source)
397
        port number.
398
        """
399 1
        pp_a = evc["uni_a"].get("proxy_port")
400 1
        pp_z = evc["uni_z"].get("proxy_port")
401 1
        if any(
402
            (
403
                not utils.is_intra_switch_evc(evc),
404
                pp_a is None,
405
                pp_z is None,
406
            )
407
        ):
408 1
            return
409 1
        if pp_a.source != pp_z.source:
410 1
            return
411
412 1
        raise ProxyPortSameSourceIntraEVC(
413
            evc["id"], "intra EVC UNIs must use different proxy ports"
414
        )
415
416 1
    def _validate_map_enable_evcs(
417
        self,
418
        evcs: dict[str, dict],
419
        force=False,
420
    ) -> dict[str, dict]:
421
        """Validate map enabling EVCs.
422
423
        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
424
        so it can be reused later during provisioning.
425
426
        """
427 1
        for evc_id, evc in evcs.items():
428 1
            if not evc:
429
                raise EVCNotFound(evc_id)
430 1
            if utils.has_int_enabled(evc) and not force:
431
                raise EVCHasINT(evc_id)
432
433 1
            uni_a, uni_z = utils.get_evc_unis(evc)
434 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
435 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
436
437 1
            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
438 1
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z
439
440 1
            if pp_a.status != EntityStatus.UP and not force:
441
                dest_id = pp_a.destination.id if pp_a.destination else None
442
                dest_status = pp_a.status if pp_a.destination else None
443
                raise ProxyPortStatusNotUP(
444
                    evc_id,
445
                    f"proxy_port of {uni_a['interface_id']} isn't UP. "
446
                    f"source {pp_a.source.id} status {pp_a.source.status}, "
447
                    f"destination {dest_id} status {dest_status}",
448
                )
449 1
            if pp_z.status != EntityStatus.UP and not force:
450
                dest_id = pp_z.destination.id if pp_z.destination else None
451
                dest_status = pp_z.status if pp_z.destination else None
452
                raise ProxyPortStatusNotUP(
453
                    evc_id,
454
                    f"proxy_port of {uni_z['interface_id']} isn't UP."
455
                    f"source {pp_z.source.id} status {pp_z.source.status}, "
456
                    f"destination {dest_id} status {dest_status}",
457
                )
458
459 1
            self._validate_intra_evc_different_proxy_ports(evc)
460 1
        return evcs
461
462 1
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
463
        """Add proxy ports evc_ids.
464
465
        This is meant to be called after an EVC is enabled.
466
        """
467 1
        for evc_id, evc in evcs.items():
468
            uni_a, uni_z = utils.get_evc_unis(evc)
469
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
470
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
471
            pp_a.evc_ids.add(evc_id)
472
            pp_z.evc_ids.add(evc_id)
473
            self.unis_src[evc["uni_a"]["interface_id"]] = pp_a.source.id
474
            self.unis_src[evc["uni_z"]["interface_id"]] = pp_z.source.id
475
476 1
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
477
        """Discard proxy port evc_ids.
478
479
        This is meant to be called when an EVC is disabled.
480
        """
481 1
        for evc_id, evc in evcs.items():
482
            uni_a, uni_z = utils.get_evc_unis(evc)
483
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
484
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
485
            pp_a.evc_ids.discard(evc_id)
486
            pp_z.evc_ids.discard(evc_id)
487
488 1
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
489
        """Delete int flows given a prefiltered stored_flows.
490
491
        Removal is driven by the stored flows instead of EVC ids and dpids to also
492
        be able to handle the force mode when an EVC no longer exists. It also follows
493
        the same pattern that mef_eline currently uses.
494
495
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
496
        for settings.BATCH_INTERVAL per batch iteration.
497
498
        """
499
        switch_flows = defaultdict(set)
500
        for flows in stored_flows.values():
501
            for flow in flows:
502
                switch_flows[flow["switch"]].add(flow["flow"]["cookie"])
503
504
        for dpid, cookies in switch_flows.items():
505
            cookie_vals = list(cookies)
506
            batch_size = settings.BATCH_SIZE
507
            if batch_size <= 0:
508
                batch_size = len(cookie_vals)
509
510
            for i in range(0, len(cookie_vals), batch_size):
511
                if i > 0:
512
                    await asyncio.sleep(settings.BATCH_INTERVAL)
513
                flows = [
514
                    {
515
                        "cookie": cookie,
516
                        "cookie_mask": int(0xFFFFFFFFFFFFFFFF),
517
                        "table_id": Table.OFPTT_ALL.value,
518
                    }
519
                    for cookie in cookie_vals[i : i + batch_size]
520
                ]
521
                event = KytosEvent(
522
                    "kytos.flow_manager.flows.delete",
523
                    content={
524
                        "dpid": dpid,
525
                        "force": True,
526
                        "flow_dict": {"flows": flows},
527
                    },
528
                )
529
                await self.controller.buffers.app.aput(event)
530
531 1
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
532
        """Install INT flow mods.
533
534
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
535
        for settings.BATCH_INTERVAL per batch iteration.
536
        """
537
538 1
        switch_flows = defaultdict(list)
539 1
        for flows in stored_flows.values():
540
            for flow in flows:
541
                switch_flows[flow["switch"]].append(flow["flow"])
542
543 1
        for dpid, flows in switch_flows.items():
544
            flow_vals = list(flows)
545
            batch_size = settings.BATCH_SIZE
546
            if batch_size <= 0:
547
                batch_size = len(flow_vals)
548
549
            for i in range(0, len(flow_vals), batch_size):
550
                if i > 0:
551
                    await asyncio.sleep(settings.BATCH_INTERVAL)
552
                flows = flow_vals[i : i + batch_size]
553
                event = KytosEvent(
554
                    "kytos.flow_manager.flows.install",
555
                    content={
556
                        "dpid": dpid,
557
                        "force": True,
558
                        "flow_dict": {"flows": flows},
559
                    },
560
                )
561
                await self.controller.buffers.app.aput(event)
562