Test Failed
Pull Request — master (#76)
by Vinicius
06:10
created

INTManager._discard_pps_evc_ids()   A

Complexity

Conditions 2

Size

Total Lines 11
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 3.1852

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 2
dl 0
loc 11
ccs 2
cts 6
cp 0.3333
crap 3.1852
rs 10
c 0
b 0
f 0
1
"""INTManager module."""
2 1
import asyncio
3 1
from collections import defaultdict
4 1
from datetime import datetime
5
6 1
from pyof.v0x04.controller2switch.table_mod import Table
7
8 1
from kytos.core.controller import Controller
9 1
from kytos.core.events import KytosEvent
10 1
from kytos.core.interface import Interface
11 1
from napps.kytos.telemetry_int import utils
12 1
from napps.kytos.telemetry_int import settings
13 1
from kytos.core import log
14 1
from kytos.core.link import Link
15 1
import napps.kytos.telemetry_int.kytos_api_helper as api
16 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
17 1
from kytos.core.common import EntityStatus
18 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
19
20 1
from napps.kytos.telemetry_int.exceptions import (
21
    EVCError,
22
    EVCNotFound,
23
    EVCHasINT,
24
    EVCHasNoINT,
25
    FlowsNotFound,
26
    ProxyPortError,
27
    ProxyPortStatusNotUP,
28
    ProxyPortDestNotFound,
29
    ProxyPortNotFound,
30
    ProxyPortSameSourceIntraEVC,
31
)
32
33
34 1
class INTManager:
35
36
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
37
38 1
    def __init__(self, controller: Controller) -> None:
        """Initialize the manager state bound to ``controller``.

        Besides keeping the controller reference, this creates the flow builder,
        the two asyncio locks used by the event handlers and the empty lookup
        tables that relate UNIs to their proxy ports.
        """
        self.controller = controller
        self.flow_builder = FlowBuilder()

        # One lock for the link up/down handlers, one for the interface
        # metadata handler.
        self._topo_link_lock = asyncio.Lock()
        self._intf_meta_lock = asyncio.Lock()

        # UNI interface id -> id of the source interface of its proxy port
        self.unis_src: dict[str, str] = {}

        # Proxy port source interface id -> ProxyPort instance
        self.srcs_pp: dict[str, ProxyPort] = {}
49
50 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
        """Load UNI ids src ids and their ProxyPort instances.

        For every EVC in ``evcs`` that has INT enabled, each of its UNIs whose
        interface carries a ``proxy_port`` metadata entry gets:

        - its source interface id recorded in ``self.unis_src``;
        - the EVC id added to the ``evc_ids`` of the matching ProxyPort.

        A UNI is skipped when its interface is unknown to the controller, when
        it has no ``proxy_port`` metadata, or when the proxy port number does
        not resolve to an interface on the UNI's switch (a warning is logged in
        that last case instead of failing on a missing interface).

        Args:
            evcs: EVC dicts keyed by EVC id.
        """
        for evc_id, evc in evcs.items():
            if not utils.has_int_enabled(evc):
                continue

            # Read both ids up front so a malformed EVC fails before any state
            # is touched for it.
            uni_ids = (
                evc["uni_a"]["interface_id"],
                evc["uni_z"]["interface_id"],
            )
            for uni_id in uni_ids:
                uni = self.controller.get_interface_by_id(uni_id)
                if not uni or "proxy_port" not in uni.metadata:
                    continue

                port_no = uni.metadata["proxy_port"]
                src = uni.switch.get_interface_by_port_no(port_no)
                if not src:
                    log.warning(
                        f"EVC {evc_id} UNI {uni.id} proxy_port {port_no} source "
                        "interface not found, skipping it"
                    )
                    continue

                self.unis_src[uni.id] = src.id
                try:
                    pp = self.get_proxy_port_or_raise(uni.id, evc_id)
                except ProxyPortDestNotFound:
                    # get_proxy_port_or_raise stores the ProxyPort in srcs_pp
                    # before checking its destination, so it's available here.
                    pp = self.srcs_pp[src.id]
                pp.evc_ids.add(evc_id)
81
82 1
    async def handle_pp_link_down(self, link: Link) -> None:
        """React to a link going down on a proxy port.

        Nothing is done when ``settings.FALLBACK_TO_MEF_LOOP_DOWN`` is falsy, or
        when neither link endpoint is a known proxy port source with EVC ids.
        Otherwise, the EVCs with telemetry enabled and status UP that belong to
        that proxy port have their INT flows removed and their telemetry
        metadata set to DOWN with reason ``proxy_port_down``.
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        # Either endpoint of the link may be the proxy port source.
        pp = self.srcs_pp.get(link.endpoint_a.id) or self.srcs_pp.get(
            link.endpoint_b.id
        )
        if not (pp and pp.evc_ids):
            return

        async with self._topo_link_lock:
            up_evcs = await api.get_evcs(
                **{
                    "metadata.telemetry.enabled": "true",
                    "metadata.telemetry.status": "UP",
                }
            )

            to_deactivate = {}
            for evc_id, evc in up_evcs.items():
                if evc_id in pp.evc_ids:
                    to_deactivate[evc_id] = evc
            if not to_deactivate:
                return

            log.info(
                f"Handling link_down {link}, removing INT flows falling back to "
                f"mef_eline, EVC ids: {list(to_deactivate)}"
            )
            telemetry = {
                "enabled": True,
                "status": "DOWN",
                "status_reason": ["proxy_port_down"],
                "status_updated_at": datetime.utcnow().strftime(
                    "%Y-%m-%dT%H:%M:%S"
                ),
            }
            await self.remove_int_flows(to_deactivate, {"telemetry": telemetry})
120
121 1
    async def handle_pp_link_up(self, link: Link) -> None:
        """React to a link coming up on a proxy port.

        Nothing is done when ``settings.FALLBACK_TO_MEF_LOOP_DOWN`` is falsy,
        when neither link endpoint is a known proxy port source with EVC ids,
        or when the link isn't UP or still has a status reason.

        Otherwise, EVCs with telemetry enabled and status DOWN are selected when
        they are active, not archived, tracked by this proxy port and both of
        their UNIs map to proxy ports that are UP. The selected EVCs are
        validated, get their INT flows installed and their telemetry metadata
        set to UP. EVCError and FlowsNotFound are logged and not propagated.
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        # Either endpoint of the link may be the proxy port source.
        pp = self.srcs_pp.get(link.endpoint_a.id) or self.srcs_pp.get(
            link.endpoint_b.id
        )
        if not (pp and pp.evc_ids):
            return

        async with self._topo_link_lock:
            if link.status != EntityStatus.UP or link.status_reason:
                return

            down_evcs = await api.get_evcs(
                **{
                    "metadata.telemetry.enabled": "true",
                    "metadata.telemetry.status": "DOWN",
                }
            )

            to_install = {}
            for evc_id, evc in down_evcs.items():
                # Every field is read before deciding, so each EVC is inspected
                # the same way regardless of which check rejects it.
                inactive = not evc["active"]
                archived = evc["archived"]
                untracked = evc_id not in pp.evc_ids
                uni_a_id = evc["uni_a"]["interface_id"]
                uni_z_id = evc["uni_z"]["interface_id"]
                if (
                    inactive
                    or archived
                    or untracked
                    or uni_a_id not in self.unis_src
                    or uni_z_id not in self.unis_src
                ):
                    continue

                src_ids = (self.unis_src[uni_a_id], self.unis_src[uni_z_id])
                if any(src_id not in self.srcs_pp for src_id in src_ids):
                    continue
                # Both proxy ports need to be UP for the EVC to be redeployed.
                if all(
                    self.srcs_pp[src_id].status == EntityStatus.UP
                    for src_id in src_ids
                ):
                    to_install[evc_id] = evc

            if not to_install:
                return

            try:
                to_install = self._validate_map_enable_evcs(to_install, force=True)
            except EVCError as exc:
                log.exception(exc)
                return

            log.info(
                f"Handling link_up {link}, deploying INT flows, "
                f"EVC ids: {list(to_install)}"
            )
            telemetry = {
                "enabled": True,
                "status": "UP",
                "status_reason": [],
                "status_updated_at": datetime.utcnow().strftime(
                    "%Y-%m-%dT%H:%M:%S"
                ),
            }
            try:
                await self.install_int_flows(to_install, {"telemetry": telemetry})
            except FlowsNotFound as exc:
                log.exception(f"FlowsNotFound {str(exc)}")
192
193 1
    async def handle_pp_metadata_removed(self, intf: Interface) -> None:
        """React to the ``proxy_port`` metadata being removed from an interface.

        Nothing is done when the interface still has ``proxy_port`` metadata,
        when it isn't a tracked UNI, or when its proxy port has no EVC ids.
        Otherwise, the EVCs with telemetry enabled and status UP that belong to
        that proxy port have their INT flows removed and their telemetry
        metadata set to DOWN with reason ``proxy_port_metadata_removed``.
        """
        if "proxy_port" in intf.metadata:
            return

        try:
            pp = self.srcs_pp[self.unis_src[intf.id]]
        except KeyError:
            # Either the UNI or its source interface isn't being tracked.
            return
        if not pp.evc_ids:
            return

        async with self._intf_meta_lock:
            up_evcs = await api.get_evcs(
                **{
                    "metadata.telemetry.enabled": "true",
                    "metadata.telemetry.status": "UP",
                }
            )

            to_deactivate = {}
            for evc_id, evc in up_evcs.items():
                if evc_id in pp.evc_ids:
                    to_deactivate[evc_id] = evc
            if not to_deactivate:
                return

            log.info(
                f"Handling interface metadata removed on {intf}, removing INT flows "
                f"falling back to mef_eline, EVC ids: {list(to_deactivate)}"
            )
            telemetry = {
                "enabled": True,
                "status": "DOWN",
                "status_reason": ["proxy_port_metadata_removed"],
                "status_updated_at": datetime.utcnow().strftime(
                    "%Y-%m-%dT%H:%M:%S"
                ),
            }
            await self.remove_int_flows(to_deactivate, {"telemetry": telemetry})
232
233 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Disable INT on EVCs.

        ``evcs`` is a dict of prefetched EVCs from mef_eline keyed by EVC id.

        The EVCs are validated, their INT flows are removed, their telemetry
        metadata is set to disabled/DOWN and their ids are discarded from the
        proxy ports.

        When ``force`` is True the following are bypassed:

        1 - EVC not found
        2 - EVC doesn't have INT
        3 - ProxyPortNotFound or ProxyPortDestNotFound
        """
        self._validate_disable_evcs(evcs, force)
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": False,
            "status": "DOWN",
            "status_reason": ["disabled"],
            "status_updated_at": updated_at,
        }
        await self.remove_int_flows(evcs, {"telemetry": telemetry}, force=force)

        try:
            self._discard_pps_evc_ids(evcs)
        except ProxyPortError:
            # A proxy port lookup failure is only tolerated in force mode.
            if force:
                return
            raise
262
263 1
    async def remove_int_flows(
        self, evcs: dict[str, dict], metadata: dict, force=False
    ) -> None:
        """Remove the INT flows of the given EVCs and set their metadata.

        The stored flows are looked up by the INT cookie of each EVC id; the
        flow removal and the metadata update are then awaited together.
        """
        int_cookies = [
            utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs
        ]
        stored_flows = await api.get_stored_flows(int_cookies)

        removing = self._remove_int_flows(stored_flows)
        setting_metadata = api.add_evcs_metadata(evcs, metadata, force)
        await asyncio.gather(removing, setting_metadata)
274
275 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Enable INT on EVCs.

        ``evcs`` is a dict of prefetched EVCs from mef_eline keyed by EVC id.

        The EVCs are validated and mapped to their proxy ports, their INT flows
        are installed, their telemetry metadata is set to enabled/UP and their
        ids are added to the proxy ports.

        When ``force`` is True the following are bypassed:

        1 - EVC already has INT
        2 - ProxyPort isn't UP

        Other cases won't be bypassed since at that point it won't have the
        data needed.
        """
        evcs = self._validate_map_enable_evcs(evcs, force)
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": True,
            "status": "UP",
            "status_reason": [],
            "status_updated_at": updated_at,
        }
        await self.install_int_flows(evcs, {"telemetry": telemetry})
        self._add_pps_evc_ids(evcs)
300
301 1
    async def redeploy_int(self, evcs: dict[str, dict]) -> None:
        """Redeploy INT on EVCs by deleting and installing their flows again.

        ``evcs`` is a dict of prefetched EVCs from mef_eline keyed by EVC id.
        The EVCs are validated in force mode and their metadata isn't touched.
        """
        evcs = self._validate_map_enable_evcs(evcs, force=True)
        log.info(f"Redeploying INT on EVC ids: {list(evcs.keys())}, force: True")

        # Drop whatever INT flows are currently stored for these EVCs.
        int_cookies = [
            utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs
        ]
        await self._remove_int_flows(await api.get_stored_flows(int_cookies))

        # Build the INT flows again out of the stored mef_eline flows.
        mef_cookies = [
            utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX) for evc_id in evcs
        ]
        mef_stored_flows = await utils.get_found_stored_flows(mef_cookies)
        int_flows = self.flow_builder.build_int_flows(evcs, mef_stored_flows)
        await self._install_int_flows(int_flows)
324
325
    async def install_int_flows(self, evcs: dict[str, dict], metadata: dict) -> None:
        """Install the INT flows of the given EVCs and set their metadata.

        The INT flows are built from the stored flows found by the mef_eline
        cookie of each EVC id; the flow installation and the metadata update
        are then awaited together.
        """
        mef_cookies = [
            utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX) for evc_id in evcs
        ]
        mef_stored_flows = await utils.get_found_stored_flows(mef_cookies)
        int_flows = self.flow_builder.build_int_flows(evcs, mef_stored_flows)

        installing = self._install_int_flows(int_flows)
        setting_metadata = api.add_evcs_metadata(evcs, metadata)
        await asyncio.gather(installing, setting_metadata)
340
341
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
        """Return the ProxyPort assigned to a UNI or raise.

        The ProxyPort is cached in ``self.srcs_pp`` by its source interface id,
        and it's created and cached on the first lookup.

        Raises:
            ProxyPortNotFound: the UNI interface doesn't exist, it has no
                ``proxy_port`` metadata or the proxy port number doesn't
                resolve to an interface on the UNI's switch.
            ProxyPortDestNotFound: the ProxyPort has no destination. The
                ProxyPort is already cached when this is raised.
        """
        uni = self.controller.get_interface_by_id(intf_id)
        if not uni:
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")

        if "proxy_port" not in uni.metadata:
            reason = f"proxy_port metadata not found in {intf_id}"
            raise ProxyPortNotFound(evc_id, reason)

        port_no = uni.metadata.get("proxy_port")
        source = uni.switch.get_interface_by_port_no(port_no)
        if not source:
            reason = f"proxy_port of {intf_id} source interface not found"
            raise ProxyPortNotFound(evc_id, reason)

        proxy_port = self.srcs_pp.get(source.id)
        if not proxy_port:
            proxy_port = ProxyPort(self.controller, source)
            self.srcs_pp[source.id] = proxy_port

        if proxy_port.destination:
            return proxy_port

        raise ProxyPortDestNotFound(
            evc_id,
            f"proxy_port of {intf_id} isn't looped or destination interface "
            "not found",
        )
375
376
    def _validate_disable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> None:
        """Validate that INT can be disabled on the given EVCs.

        Raises:
            EVCNotFound: an EVC value is falsy and ``force`` isn't set.
            EVCHasNoINT: an EVC doesn't have INT enabled and ``force`` isn't
                set.
        """
        strict = not force
        for evc_id, evc in evcs.items():
            if not evc and strict:
                raise EVCNotFound(evc_id)

            int_enabled = utils.has_int_enabled(evc)
            if not int_enabled and strict:
                raise EVCHasNoINT(evc_id)
387
388
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
        """Validate that an intra EVC is using different proxy ports.

        If the same proxy port is used on both UNIs, one of the sink/pop related
        matches would end up being overwritten since they'd be the same.
        Currently, an external loop will have unidirectional flows matching in
        the lower (source) port number.

        The check only applies to intra switch EVCs that have a ``proxy_port``
        mapped on both UNIs.

        Raises:
            ProxyPortSameSourceIntraEVC: both UNIs use the same proxy port
                source.
        """
        pp_a = evc["uni_a"].get("proxy_port")
        pp_z = evc["uni_z"].get("proxy_port")

        if not utils.is_intra_switch_evc(evc):
            return
        if pp_a is None or pp_z is None:
            return
        if pp_a.source != pp_z.source:
            return

        reason = "intra EVC UNIs must use different proxy ports"
        raise ProxyPortSameSourceIntraEVC(evc["id"], reason)
412 1
413
    def _validate_map_enable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> dict[str, dict]:
        """Validate map enabling EVCs.

        This function also maps both uni_a and uni_z dicts with their ProxyPorts
        (under the ``proxy_port`` key), just so it can be reused later during
        provisioning. The EVC dicts are updated in place.

        Args:
            evcs: EVC dicts keyed by EVC id.
            force: when True, an EVC that already has INT and a ProxyPort that
                isn't UP are both tolerated.

        Returns:
            The same ``evcs`` dict, with the UNIs mapped to their ProxyPorts.

        Raises:
            EVCNotFound: an EVC value is falsy.
            EVCHasINT: an EVC already has INT enabled and ``force`` isn't set.
            ProxyPortNotFound, ProxyPortDestNotFound: propagated from
                ``get_proxy_port_or_raise``.
            ProxyPortStatusNotUP: a ProxyPort isn't UP and ``force`` isn't set.
            ProxyPortSameSourceIntraEVC: propagated from
                ``_validate_intra_evc_different_proxy_ports``.
        """
        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if utils.has_int_enabled(evc) and not force:
                raise EVCHasINT(evc_id)

            uni_a, uni_z = utils.get_evc_unis(evc)
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)

            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z

            for uni, pp in ((uni_a, pp_a), (uni_z, pp_z)):
                if pp.status != EntityStatus.UP and not force:
                    dest = pp.destination
                    dest_id = dest.id if dest else None
                    # Report the destination interface's own status, which is
                    # what the message labels it as, rather than repeating the
                    # proxy port status.
                    dest_status = dest.status if dest else None
                    raise ProxyPortStatusNotUP(
                        evc_id,
                        f"proxy_port of {uni['interface_id']} isn't UP. "
                        f"source {pp.source.id} status {pp.source.status}, "
                        f"destination {dest_id} status {dest_status}",
                    )

            self._validate_intra_evc_different_proxy_ports(evc)
        return evcs
458
459 1
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
        """Add the EVC ids to the proxy ports of their UNIs.

        This is meant to be called after an EVC is enabled. It also records the
        proxy port source interface id of each UNI in ``self.unis_src``.
        """
        for evc_id, evc in evcs.items():
            # Resolve both proxy ports first, so nothing is updated for this
            # EVC if any of the lookups raises.
            proxy_ports = [
                self.get_proxy_port_or_raise(uni["interface_id"], evc_id)
                for uni in utils.get_evc_unis(evc)
            ]
            for pp in proxy_ports:
                pp.evc_ids.add(evc_id)
            for uni_key, pp in zip(("uni_a", "uni_z"), proxy_ports):
                self.unis_src[evc[uni_key]["interface_id"]] = pp.source.id
472
473
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
        """Discard the EVC ids from the proxy ports of their UNIs.

        This is meant to be called when an EVC is disabled.

        Every UNI of every EVC is processed even if a proxy port lookup fails,
        so a single failure doesn't leave stale EVC ids on the remaining proxy
        ports. When the lookup fails, the ProxyPort still tracked for that UNI
        in ``self.unis_src`` / ``self.srcs_pp`` (if any) is used instead.

        Raises:
            ProxyPortError: the first lookup error found, raised only after all
                the EVCs have been processed.
        """
        first_error = None
        for evc_id, evc in evcs.items():
            for uni in utils.get_evc_unis(evc):
                intf_id = uni["interface_id"]
                try:
                    pp = self.get_proxy_port_or_raise(intf_id, evc_id)
                except ProxyPortError as exc:
                    if first_error is None:
                        first_error = exc
                    # Fall back to whatever is still being tracked for this UNI
                    pp = self.srcs_pp.get(self.unis_src.get(intf_id))
                if pp is not None:
                    pp.evc_ids.discard(evc_id)

        if first_error is not None:
            raise first_error
484
485
    def evc_compare(
        self, stored_int_flows: dict, stored_mef_flows: dict, evcs: dict
    ) -> dict[str, list]:
        """Compare the stored INT and mef_eline flows of each EVC.

        Both stored flows dicts are keyed by cookie and are regrouped by the
        EVC id derived from it.

        Cases:
        - No INT enabled but has INT flows -> wrong_metadata_has_int_flows
        - INT enabled but has less flows than mef flows -> missing_some_int_flows

        Returns:
            A dict mapping each inconsistent EVC id to its list of findings.
        """

        def group_by_evc_id(stored: dict) -> dict:
            return {
                utils.get_id_from_cookie(cookie): flows
                for cookie, flows in stored.items()
            }

        int_flows = group_by_evc_id(stored_int_flows)
        mef_flows = group_by_evc_id(stored_mef_flows)

        results = defaultdict(list)
        for evc in evcs.values():
            evc_id = evc["id"]

            if not utils.has_int_enabled(evc):
                if int_flows.get(evc_id):
                    results[evc_id].append("wrong_metadata_has_int_flows")
                continue

            evc_mef_flows = mef_flows.get(evc_id)
            if not evc_mef_flows:
                continue
            if evc_id not in int_flows or len(int_flows[evc_id]) < len(
                evc_mef_flows
            ):
                results[evc_id].append("missing_some_int_flows")
        return results
527
528
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Delete INT flows given a prefiltered stored_flows.

        Removal is driven by the stored flows instead of EVC ids and dpids to
        also be able to handle the force mode when an EVC no longer exists. It
        also follows the same pattern that mef_eline currently uses.

        The cookies are grouped per dpid and, for each dpid, a
        ``kytos.flow_manager.flows.delete`` event is put in the app buffer per
        batch of settings.BATCH_SIZE cookies (a single batch if it's <= 0),
        waiting for settings.BATCH_INTERVAL between the batches.
        """
        cookies_by_dpid = defaultdict(set)
        for flow_list in stored_flows.values():
            for stored in flow_list:
                cookies_by_dpid[stored["switch"]].add(stored["flow"]["cookie"])

        for dpid, cookie_set in cookies_by_dpid.items():
            cookies = list(cookie_set)
            step = settings.BATCH_SIZE if settings.BATCH_SIZE > 0 else len(cookies)

            for batch_no, start in enumerate(range(0, len(cookies), step)):
                if batch_no:
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                # Match the whole cookie on every table
                batch = []
                for cookie in cookies[start : start + step]:
                    batch.append(
                        {
                            "cookie": cookie,
                            "cookie_mask": 0xFFFFFFFFFFFFFFFF,
                            "table_id": Table.OFPTT_ALL.value,
                        }
                    )
                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": batch},
                }
                event = KytosEvent("kytos.flow_manager.flows.delete", content=content)
                await self.controller.buffers.app.aput(event)
570
571
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Install INT flow mods.

        The flows are grouped per dpid and, for each dpid, a
        ``kytos.flow_manager.flows.install`` event is put in the app buffer per
        batch of settings.BATCH_SIZE flows (a single batch if it's <= 0),
        waiting for settings.BATCH_INTERVAL between the batches.
        """
        flows_by_dpid = defaultdict(list)
        for flow_list in stored_flows.values():
            for stored in flow_list:
                flows_by_dpid[stored["switch"]].append(stored["flow"])

        for dpid, dpid_flows in flows_by_dpid.items():
            step = settings.BATCH_SIZE if settings.BATCH_SIZE > 0 else len(dpid_flows)

            for batch_no, start in enumerate(range(0, len(dpid_flows), step)):
                if batch_no:
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": dpid_flows[start : start + step]},
                }
                event = KytosEvent(
                    "kytos.flow_manager.flows.install", content=content
                )
                await self.controller.buffers.app.aput(event)
602