Passed
Push — master ( 442faf...fe62f4 )
by Vinicius
08:31 queued 06:57
created

build.managers.int.INTManager._install_int_flows()   B

Complexity

Conditions 7

Size

Total Lines 31
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 27.6718

Importance

Changes 0
Metric Value
cc 7
eloc 21
nop 2
dl 0
loc 31
ccs 4
cts 16
cp 0.25
crap 27.6718
rs 7.9759
c 0
b 0
f 0
1
"""INTManager module."""
2 1
import asyncio
3 1
from collections import defaultdict
4 1
from datetime import datetime
5
6 1
from pyof.v0x04.controller2switch.table_mod import Table
7
8 1
from kytos.core.controller import Controller
9 1
from kytos.core.events import KytosEvent
10 1
from kytos.core.interface import Interface
11 1
from napps.kytos.telemetry_int import utils
12 1
from napps.kytos.telemetry_int import settings
13 1
from kytos.core import log
14 1
from kytos.core.link import Link
15 1
import napps.kytos.telemetry_int.kytos_api_helper as api
16 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
17 1
from kytos.core.common import EntityStatus
18 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
19
20 1
from napps.kytos.telemetry_int.exceptions import (
21
    EVCError,
22
    EVCNotFound,
23
    EVCHasINT,
24
    EVCHasNoINT,
25
    FlowsNotFound,
26
    ProxyPortError,
27
    ProxyPortStatusNotUP,
28
    ProxyPortDestNotFound,
29
    ProxyPortNotFound,
30
    ProxyPortSameSourceIntraEVC,
31
)
32
33
34 1
class INTManager:
35
36
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
37
38 1
    def __init__(self, controller: Controller) -> None:
39
        """INTManager."""
40 1
        self.controller = controller
41 1
        self.flow_builder = FlowBuilder()
42 1
        self._topo_link_lock = asyncio.Lock()
43 1
        self._intf_meta_lock = asyncio.Lock()
44
45
        # Keep track between each uni intf id and its src intf id port
46 1
        self.unis_src: dict[str, str] = {}
47
        # Keep track between src intf id and its ProxyPort instance
48 1
        self.srcs_pp: dict[str, ProxyPort] = {}
49
50 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
51
        """Load UNI ids src ids and their ProxyPort instances."""
52 1
        for evc_id, evc in evcs.items():
53 1
            if not utils.has_int_enabled(evc):
54 1
                continue
55
56 1
            uni_a_id = evc["uni_a"]["interface_id"]
57 1
            uni_z_id = evc["uni_z"]["interface_id"]
58 1
            uni_a = self.controller.get_interface_by_id(uni_a_id)
59 1
            uni_z = self.controller.get_interface_by_id(uni_z_id)
60 1
            if uni_a and "proxy_port" in uni_a.metadata:
61 1
                src_a = uni_a.switch.get_interface_by_port_no(
62
                    uni_a.metadata["proxy_port"]
63
                )
64 1
                self.unis_src[uni_a.id] = src_a.id
65 1
                try:
66 1
                    pp = self.get_proxy_port_or_raise(uni_a.id, evc_id)
67
                except ProxyPortDestNotFound:
68
                    pp = self.srcs_pp[src_a.id]
69 1
                pp.evc_ids.add(evc_id)
70
71 1
            if uni_z and "proxy_port" in uni_z.metadata:
72 1
                src_z = uni_z.switch.get_interface_by_port_no(
73
                    uni_z.metadata["proxy_port"]
74
                )
75 1
                self.unis_src[uni_z.id] = src_z.id
76 1
                try:
77 1
                    pp = self.get_proxy_port_or_raise(uni_z.id, evc_id)
78
                except ProxyPortDestNotFound:
79
                    pp = self.srcs_pp[src_z.id]
80 1
                pp.evc_ids.add(evc_id)
81
82 1
    async def handle_pp_link_down(self, link: Link) -> None:
83
        """Handle proxy_port link_down."""
84 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
85
            return
86 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
87 1
        if not pp:
88
            pp = self.srcs_pp.get(link.endpoint_b.id)
89 1
        if not pp or not pp.evc_ids:
90
            return
91
92 1
        async with self._topo_link_lock:
93 1
            evcs = await api.get_evcs(
94
                **{
95
                    "metadata.telemetry.enabled": "true",
96
                    "metadata.telemetry.status": "UP",
97
                }
98
            )
99 1
            to_deactivate = {
100
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
101
            }
102 1
            if not to_deactivate:
103
                return
104
105 1
            log.info(
106
                f"Handling link_down {link}, removing INT flows falling back to "
107
                f"mef_eline, EVC ids: {list(to_deactivate)}"
108
            )
109 1
            metadata = {
110
                "telemetry": {
111
                    "enabled": True,
112
                    "status": "DOWN",
113
                    "status_reason": ["proxy_port_down"],
114
                    "status_updated_at": datetime.utcnow().strftime(
115
                        "%Y-%m-%dT%H:%M:%S"
116
                    ),
117
                }
118
            }
119 1
            await self.remove_int_flows(to_deactivate, metadata)
120
121 1
    async def handle_pp_link_up(self, link: Link) -> None:
122
        """Handle proxy_port link_up."""
123 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
124
            return
125 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
126 1
        if not pp:
127
            pp = self.srcs_pp.get(link.endpoint_b.id)
128 1
        if not pp or not pp.evc_ids:
129
            return
130
131
        # This sleep is to wait for at least a few seconds to ensure that the other
132
        # proxy port would also have been considered up since the request
133
        # on topology setting metadata might end up delaying, check out issue
134
        # https://github.com/kytos-ng/of_lldp/issues/100
135
        # TODO this will be optimized later on before releasing this NApp
136 1
        await asyncio.sleep(5)
137
138 1
        async with self._topo_link_lock:
139 1
            if link.status != EntityStatus.UP or link.status_reason:
140
                return
141 1
            evcs = await api.get_evcs(
142
                **{
143
                    "metadata.telemetry.enabled": "true",
144
                    "metadata.telemetry.status": "DOWN",
145
                }
146
            )
147
148 1
            to_install = {}
149 1
            for evc_id, evc in evcs.items():
150 1
                if any(
151
                    (
152
                        not evc["active"],
153
                        evc["archived"],
154
                        evc_id not in pp.evc_ids,
155
                        evc["uni_a"]["interface_id"] not in self.unis_src,
156
                        evc["uni_z"]["interface_id"] not in self.unis_src,
157
                    )
158
                ):
159
                    continue
160
161 1
                src_a_id = self.unis_src[evc["uni_a"]["interface_id"]]
162 1
                src_z_id = self.unis_src[evc["uni_z"]["interface_id"]]
163 1
                if (
164
                    src_a_id in self.srcs_pp
165
                    and src_z_id in self.srcs_pp
166
                    and self.srcs_pp[src_a_id].status == EntityStatus.UP
167
                    and self.srcs_pp[src_z_id].status == EntityStatus.UP
168
                ):
169 1
                    to_install[evc_id] = evc
170
171 1
            if not to_install:
172
                return
173
174 1
            try:
175 1
                to_install = self._validate_map_enable_evcs(to_install, force=True)
176
            except EVCError as exc:
177
                log.exception(exc)
178
                return
179
180 1
            log.info(
181
                f"Handling link_up {link}, deploying INT flows, "
182
                f"EVC ids: {list(to_install)}"
183
            )
184 1
            metadata = {
185
                "telemetry": {
186
                    "enabled": True,
187
                    "status": "UP",
188
                    "status_reason": [],
189
                    "status_updated_at": datetime.utcnow().strftime(
190
                        "%Y-%m-%dT%H:%M:%S"
191
                    ),
192
                }
193
            }
194 1
            try:
195 1
                await self.install_int_flows(to_install, metadata)
196
            except FlowsNotFound as exc:
197
                log.exception(f"FlowsNotFound {str(exc)}")
198
                return
199
200 1
    async def handle_pp_metadata_removed(self, intf: Interface) -> None:
201
        """Handle proxy port metadata removed."""
202 1
        if "proxy_port" in intf.metadata:
203
            return
204 1
        try:
205 1
            pp = self.srcs_pp[self.unis_src[intf.id]]
206 1
            if not pp.evc_ids:
207
                return
208
        except KeyError:
209
            return
210
211 1
        async with self._intf_meta_lock:
212 1
            evcs = await api.get_evcs(
213
                **{
214
                    "metadata.telemetry.enabled": "true",
215
                    "metadata.telemetry.status": "UP",
216
                }
217
            )
218 1
            to_deactivate = {
219
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
220
            }
221 1
            if not to_deactivate:
222
                return
223
224 1
            log.info(
225
                f"Handling interface metadata removed on {intf}, removing INT flows "
226
                f"falling back to mef_eline, EVC ids: {list(to_deactivate)}"
227
            )
228 1
            metadata = {
229
                "telemetry": {
230
                    "enabled": True,
231
                    "status": "DOWN",
232
                    "status_reason": ["proxy_port_metadata_removed"],
233
                    "status_updated_at": datetime.utcnow().strftime(
234
                        "%Y-%m-%dT%H:%M:%S"
235
                    ),
236
                }
237
            }
238 1
            await self.remove_int_flows(to_deactivate, metadata)
239
240 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
241
        """Disable INT on EVCs.
242
243
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
244
245
        The force bool option, if True, will bypass the following:
246
247
        1 - EVC not found
248
        2 - EVC doesn't have INT
249
        3 - ProxyPortNotFound or ProxyPortDestNotFound
250
251
        """
252 1
        self._validate_disable_evcs(evcs, force)
253 1
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
254
255 1
        metadata = {
256
            "telemetry": {
257
                "enabled": False,
258
                "status": "DOWN",
259
                "status_reason": ["disabled"],
260
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
261
            }
262
        }
263 1
        await self.remove_int_flows(evcs, metadata, force=force)
264 1
        try:
265 1
            self._discard_pps_evc_ids(evcs)
266
        except ProxyPortError:
267
            if not force:
268
                raise
269
270 1
    async def remove_int_flows(
271
        self, evcs: dict[str, dict], metadata: dict, force=False
272
    ) -> None:
273
        """Remove INT flows and set metadata on EVCs."""
274 1
        stored_flows = await api.get_stored_flows(
275
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
276
        )
277 1
        await asyncio.gather(
278
            self._remove_int_flows(stored_flows),
279
            api.add_evcs_metadata(evcs, metadata, force),
280
        )
281
282 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
283
        """Enable INT on EVCs.
284
285
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
286
287
        The force bool option, if True, will bypass the following:
288
289
        1 - EVC already has INT
290
        2 - ProxyPort isn't UP
291
        Other cases won't be bypassed since at the point it won't have the data needed.
292
293
        """
294 1
        evcs = self._validate_map_enable_evcs(evcs, force)
295 1
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
296
297 1
        metadata = {
298
            "telemetry": {
299
                "enabled": True,
300
                "status": "UP",
301
                "status_reason": [],
302
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
303
            }
304
        }
305 1
        await self.install_int_flows(evcs, metadata)
306 1
        self._add_pps_evc_ids(evcs)
307
308 1
    async def redeploy_int(self, evcs: dict[str, dict]) -> None:
309
        """Redeploy INT on EVCs. It'll only delete and install the flows again.
310
311
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
312
        """
313 1
        evcs = self._validate_map_enable_evcs(evcs, force=True)
314 1
        log.info(f"Redeploying INT on EVC ids: {list(evcs.keys())}, force: True")
315
316 1
        stored_flows = await api.get_stored_flows(
317
            [
318
                utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)
319
                for evc_id in evcs
320
            ]
321
        )
322 1
        await self._remove_int_flows(stored_flows)
323
324 1
        found_stored_flows = self.flow_builder.build_int_flows(
325
            evcs,
326
            await utils.get_found_stored_flows(
327
                [
328
                    utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
329
                    for evc_id in evcs
330
                ]
331
            ),
332
        )
333 1
        await self._install_int_flows(found_stored_flows)
334
335 1
    async def install_int_flows(self, evcs: dict[str, dict], metadata: dict) -> None:
336
        """Install INT flows and set metadata on EVCs."""
337 1
        stored_flows = self.flow_builder.build_int_flows(
338
            evcs,
339
            await utils.get_found_stored_flows(
340
                [
341
                    utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
342
                    for evc_id in evcs
343
                ]
344
            ),
345
        )
346 1
        await asyncio.gather(
347
            self._install_int_flows(stored_flows),
348
            api.add_evcs_metadata(evcs, metadata),
349
        )
350
351 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
352
        """Return a ProxyPort assigned to a UNI or raise."""
353
354 1
        interface = self.controller.get_interface_by_id(intf_id)
355 1
        if not interface:
356 1
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")
357
358 1
        if "proxy_port" not in interface.metadata:
359 1
            raise ProxyPortNotFound(
360
                evc_id, f"proxy_port metadata not found in {intf_id}"
361
            )
362
363 1
        source_intf = interface.switch.get_interface_by_port_no(
364
            interface.metadata.get("proxy_port")
365
        )
366 1
        if not source_intf:
367
            raise ProxyPortNotFound(
368
                evc_id,
369
                f"proxy_port of {intf_id} source interface not found",
370
            )
371
372 1
        pp = self.srcs_pp.get(source_intf.id)
373 1
        if not pp:
374 1
            pp = ProxyPort(self.controller, source_intf)
375 1
            self.srcs_pp[source_intf.id] = pp
376
377 1
        if not pp.destination:
378 1
            raise ProxyPortDestNotFound(
379
                evc_id,
380
                f"proxy_port of {intf_id} isn't looped or destination interface "
381
                "not found",
382
            )
383
384 1
        return pp
385
386 1
    def _validate_disable_evcs(
387
        self,
388
        evcs: dict[str, dict],
389
        force=False,
390
    ) -> None:
391
        """Validate disable EVCs."""
392 1
        for evc_id, evc in evcs.items():
393
            if not evc and not force:
394
                raise EVCNotFound(evc_id)
395
            if not utils.has_int_enabled(evc) and not force:
396
                raise EVCHasNoINT(evc_id)
397
398 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
399
        """Validate that an intra EVC is using different proxy ports.
400
401
        If the same proxy port is used on both UNIs, of one the sink/pop related matches
402
        would ended up being overwritten since they'd be the same. Currently, an
403
        external loop will have unidirectional flows matching in the lower (source)
404
        port number.
405
        """
406 1
        pp_a = evc["uni_a"].get("proxy_port")
407 1
        pp_z = evc["uni_z"].get("proxy_port")
408 1
        if any(
409
            (
410
                not utils.is_intra_switch_evc(evc),
411
                pp_a is None,
412
                pp_z is None,
413
            )
414
        ):
415 1
            return
416 1
        if pp_a.source != pp_z.source:
417 1
            return
418
419 1
        raise ProxyPortSameSourceIntraEVC(
420
            evc["id"], "intra EVC UNIs must use different proxy ports"
421
        )
422
423 1
    def _validate_map_enable_evcs(
424
        self,
425
        evcs: dict[str, dict],
426
        force=False,
427
    ) -> dict[str, dict]:
428
        """Validate map enabling EVCs.
429
430
        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
431
        so it can be reused later during provisioning.
432
433
        """
434 1
        for evc_id, evc in evcs.items():
435 1
            if not evc:
436
                raise EVCNotFound(evc_id)
437 1
            if utils.has_int_enabled(evc) and not force:
438
                raise EVCHasINT(evc_id)
439
440 1
            uni_a, uni_z = utils.get_evc_unis(evc)
441 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
442 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
443
444 1
            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
445 1
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z
446
447 1
            if pp_a.status != EntityStatus.UP and not force:
448
                dest_id = pp_a.destination.id if pp_a.destination else None
449
                dest_status = pp_a.status if pp_a.destination else None
450
                raise ProxyPortStatusNotUP(
451
                    evc_id,
452
                    f"proxy_port of {uni_a['interface_id']} isn't UP. "
453
                    f"source {pp_a.source.id} status {pp_a.source.status}, "
454
                    f"destination {dest_id} status {dest_status}",
455
                )
456 1
            if pp_z.status != EntityStatus.UP and not force:
457
                dest_id = pp_z.destination.id if pp_z.destination else None
458
                dest_status = pp_z.status if pp_z.destination else None
459
                raise ProxyPortStatusNotUP(
460
                    evc_id,
461
                    f"proxy_port of {uni_z['interface_id']} isn't UP."
462
                    f"source {pp_z.source.id} status {pp_z.source.status}, "
463
                    f"destination {dest_id} status {dest_status}",
464
                )
465
466 1
            self._validate_intra_evc_different_proxy_ports(evc)
467 1
        return evcs
468
469 1
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
470
        """Add proxy ports evc_ids.
471
472
        This is meant to be called after an EVC is enabled.
473
        """
474 1
        for evc_id, evc in evcs.items():
475
            uni_a, uni_z = utils.get_evc_unis(evc)
476
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
477
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
478
            pp_a.evc_ids.add(evc_id)
479
            pp_z.evc_ids.add(evc_id)
480
            self.unis_src[evc["uni_a"]["interface_id"]] = pp_a.source.id
481
            self.unis_src[evc["uni_z"]["interface_id"]] = pp_z.source.id
482
483 1
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
484
        """Discard proxy port evc_ids.
485
486
        This is meant to be called when an EVC is disabled.
487
        """
488 1
        for evc_id, evc in evcs.items():
489
            uni_a, uni_z = utils.get_evc_unis(evc)
490
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
491
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
492
            pp_a.evc_ids.discard(evc_id)
493
            pp_z.evc_ids.discard(evc_id)
494
495 1
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
496
        """Delete int flows given a prefiltered stored_flows.
497
498
        Removal is driven by the stored flows instead of EVC ids and dpids to also
499
        be able to handle the force mode when an EVC no longer exists. It also follows
500
        the same pattern that mef_eline currently uses.
501
502
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
503
        for settings.BATCH_INTERVAL per batch iteration.
504
505
        """
506
        switch_flows = defaultdict(set)
507
        for flows in stored_flows.values():
508
            for flow in flows:
509
                switch_flows[flow["switch"]].add(flow["flow"]["cookie"])
510
511
        for dpid, cookies in switch_flows.items():
512
            cookie_vals = list(cookies)
513
            batch_size = settings.BATCH_SIZE
514
            if batch_size <= 0:
515
                batch_size = len(cookie_vals)
516
517
            for i in range(0, len(cookie_vals), batch_size):
518
                if i > 0:
519
                    await asyncio.sleep(settings.BATCH_INTERVAL)
520
                flows = [
521
                    {
522
                        "cookie": cookie,
523
                        "cookie_mask": int(0xFFFFFFFFFFFFFFFF),
524
                        "table_id": Table.OFPTT_ALL.value,
525
                    }
526
                    for cookie in cookie_vals[i : i + batch_size]
527
                ]
528
                event = KytosEvent(
529
                    "kytos.flow_manager.flows.delete",
530
                    content={
531
                        "dpid": dpid,
532
                        "force": True,
533
                        "flow_dict": {"flows": flows},
534
                    },
535
                )
536
                await self.controller.buffers.app.aput(event)
537
538 1
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
539
        """Install INT flow mods.
540
541
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
542
        for settings.BATCH_INTERVAL per batch iteration.
543
        """
544
545 1
        switch_flows = defaultdict(list)
546 1
        for flows in stored_flows.values():
547
            for flow in flows:
548
                switch_flows[flow["switch"]].append(flow["flow"])
549
550 1
        for dpid, flows in switch_flows.items():
551
            flow_vals = list(flows)
552
            batch_size = settings.BATCH_SIZE
553
            if batch_size <= 0:
554
                batch_size = len(flow_vals)
555
556
            for i in range(0, len(flow_vals), batch_size):
557
                if i > 0:
558
                    await asyncio.sleep(settings.BATCH_INTERVAL)
559
                flows = flow_vals[i : i + batch_size]
560
                event = KytosEvent(
561
                    "kytos.flow_manager.flows.install",
562
                    content={
563
                        "dpid": dpid,
564
                        "force": True,
565
                        "flow_dict": {"flows": flows},
566
                    },
567
                )
568
                await self.controller.buffers.app.aput(event)
569