Passed
Pull Request — master (#71)
by Vinicius
07:39 queued 04:37
created

INTManager.handle_pp_metadata_removed()   B

Complexity

Conditions 6

Size

Total Lines 39
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 6.9157

Importance

Changes 0
Metric Value
cc 6
eloc 28
nop 2
dl 0
loc 39
ccs 12
cts 17
cp 0.7059
crap 6.9157
rs 8.2746
c 0
b 0
f 0
1
"""INTManager module."""
2 1
import asyncio
3 1
from collections import defaultdict
4 1
from datetime import datetime
5
6 1
from pyof.v0x04.controller2switch.table_mod import Table
7
8 1
from kytos.core.controller import Controller
9 1
from kytos.core.events import KytosEvent
10 1
from kytos.core.interface import Interface
11 1
from napps.kytos.telemetry_int import utils
12 1
from napps.kytos.telemetry_int import settings
13 1
from kytos.core import log
14 1
from kytos.core.link import Link
15 1
import napps.kytos.telemetry_int.kytos_api_helper as api
16 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
17 1
from kytos.core.common import EntityStatus
18 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
19
20 1
from napps.kytos.telemetry_int.exceptions import (
21
    EVCError,
22
    EVCNotFound,
23
    EVCHasINT,
24
    EVCHasNoINT,
25
    FlowsNotFound,
26
    ProxyPortError,
27
    ProxyPortStatusNotUP,
28
    ProxyPortDestNotFound,
29
    ProxyPortNotFound,
30
    ProxyPortSameSourceIntraEVC,
31
)
32
33
34 1
class INTManager:
35
36
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
37
38 1
    def __init__(self, controller: Controller) -> None:
        """Set up the manager state bound to the given controller."""
        # Maps each UNI interface id to the id of its proxy port source interface.
        self.unis_src: dict[str, str] = {}
        # Maps a proxy port source interface id to its ProxyPort instance.
        self.srcs_pp: dict[str, ProxyPort] = {}

        # One lock for the link up/down handlers, another one for the
        # interface metadata handler.
        self._topo_link_lock = asyncio.Lock()
        self._intf_meta_lock = asyncio.Lock()

        self.flow_builder = FlowBuilder()
        self.controller = controller
49
50 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
        """Fill unis_src and the proxy ports evc_ids from INT-enabled EVCs.

        EVCs for which utils.has_int_enabled is false are skipped. For each UNI
        that is found and has "proxy_port" in its metadata, the UNI id is mapped
        to its source interface id and the EVC id is added to the ProxyPort.
        """
        for evc_id, evc in evcs.items():
            if not utils.has_int_enabled(evc):
                continue

            # Resolve both UNIs up front, before any bookkeeping is touched.
            uni_ids = [evc[key]["interface_id"] for key in ("uni_a", "uni_z")]
            unis = [self.controller.get_interface_by_id(uni_id) for uni_id in uni_ids]

            for uni in unis:
                if not uni or "proxy_port" not in uni.metadata:
                    continue

                src = uni.switch.get_interface_by_port_no(uni.metadata["proxy_port"])
                self.unis_src[uni.id] = src.id
                try:
                    proxy_port = self.get_proxy_port_or_raise(uni.id, evc_id)
                except ProxyPortDestNotFound:
                    # get_proxy_port_or_raise stores the ProxyPort in srcs_pp
                    # before checking its destination, so it's available here.
                    proxy_port = self.srcs_pp[src.id]
                proxy_port.evc_ids.add(evc_id)
81
82 1
    async def handle_pp_link_down(self, link: Link) -> None:
        """Remove INT flows of the EVCs using a proxy port whose link went down.

        Nothing is done when settings.FALLBACK_TO_MEF_LOOP_DOWN is falsy or when
        neither link endpoint is a known proxy port source holding EVC ids.
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        lookup = self.srcs_pp.get
        pp = lookup(link.endpoint_a.id) or lookup(link.endpoint_b.id)
        if not (pp and pp.evc_ids):
            return

        async with self._topo_link_lock:
            query = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "UP",
            }
            up_evcs = await api.get_evcs(**query)

            # Only the EVCs tracked by this proxy port are affected.
            to_deactivate = {}
            for evc_id, evc in up_evcs.items():
                if evc_id in pp.evc_ids:
                    to_deactivate[evc_id] = evc
            if not to_deactivate:
                return

            log.info(
                f"Handling link_down {link}, removing INT flows falling back to "
                f"mef_eline, EVC ids: {list(to_deactivate)}"
            )
            telemetry = {
                "enabled": True,
                "status": "DOWN",
                "status_reason": ["proxy_port_down"],
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
            }
            await self.remove_int_flows(to_deactivate, {"telemetry": telemetry})
120
121 1
    async def handle_pp_link_up(self, link: Link) -> None:
        """Redeploy INT flows of the EVCs using a proxy port whose link went up.

        Nothing is done when settings.FALLBACK_TO_MEF_LOOP_DOWN is falsy or when
        neither link endpoint is a known proxy port source holding EVC ids.
        Validation and FlowsNotFound errors are logged and end the handling.
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        lookup = self.srcs_pp.get
        pp = lookup(link.endpoint_a.id) or lookup(link.endpoint_b.id)
        if not (pp and pp.evc_ids):
            return

        # Wait a few seconds so the other proxy port has a chance of being
        # considered up too, since the request on topology setting metadata
        # might get delayed, see https://github.com/kytos-ng/of_lldp/issues/100
        # TODO this will be optimized later on before releasing this NApp
        await asyncio.sleep(5)

        async with self._topo_link_lock:
            if link.status != EntityStatus.UP or link.status_reason:
                return

            query = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "DOWN",
            }
            down_evcs = await api.get_evcs(**query)

            to_install = {}
            for evc_id, evc in down_evcs.items():
                uni_a_id = evc["uni_a"]["interface_id"]
                uni_z_id = evc["uni_z"]["interface_id"]
                # All the conditions are computed before being checked.
                skip_conditions = (
                    not evc["active"],
                    evc["archived"],
                    evc_id not in pp.evc_ids,
                    uni_a_id not in self.unis_src,
                    uni_z_id not in self.unis_src,
                )
                if any(skip_conditions):
                    continue

                src_a_id = self.unis_src[uni_a_id]
                src_z_id = self.unis_src[uni_z_id]
                if src_a_id not in self.srcs_pp or src_z_id not in self.srcs_pp:
                    continue

                # Both proxy ports of the EVC must be UP for it to be installed.
                both_up = (
                    self.srcs_pp[src_a_id].status == EntityStatus.UP
                    and self.srcs_pp[src_z_id].status == EntityStatus.UP
                )
                if both_up:
                    to_install[evc_id] = evc

            if not to_install:
                return

            try:
                to_install = self._validate_map_enable_evcs(to_install, force=True)
            except EVCError as exc:
                log.exception(exc)
                return

            log.info(
                f"Handling link_up {link}, deploying INT flows, "
                f"EVC ids: {list(to_install)}"
            )
            telemetry = {
                "enabled": True,
                "status": "UP",
                "status_reason": [],
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
            }
            try:
                await self.install_int_flows(to_install, {"telemetry": telemetry})
            except FlowsNotFound as exc:
                log.exception(f"FlowsNotFound {str(exc)}")
199
200 1
    async def handle_pp_metadata_removed(self, intf: Interface) -> None:
        """Remove INT flows of the EVCs of a UNI that lost its proxy_port metadata.

        Nothing is done when the interface still has "proxy_port" in its
        metadata, when it has no known source/ProxyPort mapping, or when its
        ProxyPort holds no EVC ids.
        """
        if "proxy_port" in intf.metadata:
            return

        if intf.id not in self.unis_src:
            return
        src_id = self.unis_src[intf.id]
        if src_id not in self.srcs_pp:
            return
        pp = self.srcs_pp[src_id]
        if not pp.evc_ids:
            return

        async with self._intf_meta_lock:
            query = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "UP",
            }
            up_evcs = await api.get_evcs(**query)

            # Only the EVCs tracked by this proxy port are affected.
            to_deactivate = {}
            for evc_id, evc in up_evcs.items():
                if evc_id in pp.evc_ids:
                    to_deactivate[evc_id] = evc
            if not to_deactivate:
                return

            log.info(
                f"Handling interface metadata removed on {intf}, removing INT flows "
                f"falling back to mef_eline, EVC ids: {list(to_deactivate)}"
            )
            telemetry = {
                "enabled": True,
                "status": "DOWN",
                "status_reason": ["proxy_port_metadata_removed"],
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
            }
            await self.remove_int_flows(to_deactivate, {"telemetry": telemetry})
239
240 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Disable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline keyed by EVC id.

        When force is True the following are bypassed:

        1 - EVC not found
        2 - EVC doesn't have INT
        3 - ProxyPortError raised while discarding the proxy ports EVC ids
        """
        self._validate_disable_evcs(evcs, force)
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        telemetry = {
            "enabled": False,
            "status": "DOWN",
            "status_reason": ["disabled"],
            "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
        }
        await self.remove_int_flows(evcs, {"telemetry": telemetry}, force=force)

        try:
            self._discard_pps_evc_ids(evcs)
        except ProxyPortError:
            if force:
                return
            raise
269
270 1
    async def remove_int_flows(
        self, evcs: dict[str, dict], metadata: dict, force=False
    ) -> None:
        """Delete the stored INT flows of the EVCs and set metadata on them.

        The stored flows are fetched by the INT cookie of each EVC id, then the
        flows removal and the metadata update are awaited together.
        """
        int_cookies = []
        for evc_id in evcs:
            int_cookies.append(utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX))
        stored_flows = await api.get_stored_flows(int_cookies)

        flows_removal = self._remove_int_flows(stored_flows)
        metadata_update = api.add_evcs_metadata(evcs, metadata, force)
        await asyncio.gather(flows_removal, metadata_update)
281
282 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Enable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline keyed by EVC id.

        When force is True the following are bypassed:

        1 - EVC already has INT
        2 - ProxyPort isn't UP

        Other cases won't be bypassed since at that point the needed data
        wouldn't be available.
        """
        mapped_evcs = self._validate_map_enable_evcs(evcs, force)
        log.info(f"Enabling INT on EVC ids: {list(mapped_evcs.keys())}, force: {force}")

        telemetry = {
            "enabled": True,
            "status": "UP",
            "status_reason": [],
            "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
        }
        await self.install_int_flows(mapped_evcs, {"telemetry": telemetry})
        # Proxy ports only start tracking the EVCs once the install went through.
        self._add_pps_evc_ids(mapped_evcs)
307
308 1
    async def install_int_flows(self, evcs: dict[str, dict], metadata: dict) -> None:
        """Build and install the INT flows of the EVCs and set metadata on them.

        The INT flows are built from the stored flows found by the mef_eline
        cookie of each EVC id, then the flows installation and the metadata
        update are awaited together.
        """
        mef_cookies = []
        for evc_id in evcs:
            mef_cookies.append(utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX))
        found_flows = await utils.get_found_stored_flows(mef_cookies)
        int_flows = self.flow_builder.build_int_flows(evcs, found_flows)

        flows_install = self._install_int_flows(int_flows)
        metadata_update = api.add_evcs_metadata(evcs, metadata)
        await asyncio.gather(flows_install, metadata_update)
323
324 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
        """Return the ProxyPort assigned to a UNI, creating it when needed.

        Raises ProxyPortNotFound when the UNI, its proxy_port metadata or the
        source interface isn't found, and ProxyPortDestNotFound when the
        ProxyPort has no destination.
        """
        uni = self.controller.get_interface_by_id(intf_id)
        if not uni:
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")

        if "proxy_port" not in uni.metadata:
            raise ProxyPortNotFound(
                evc_id, f"proxy_port metadata not found in {intf_id}"
            )

        port_no = uni.metadata.get("proxy_port")
        source_intf = uni.switch.get_interface_by_port_no(port_no)
        if not source_intf:
            raise ProxyPortNotFound(
                evc_id,
                f"proxy_port of {intf_id} source interface not found",
            )

        proxy_port = self.srcs_pp.get(source_intf.id)
        if not proxy_port:
            # Stored before the destination check below, so it stays tracked
            # even when ProxyPortDestNotFound ends up being raised.
            proxy_port = ProxyPort(self.controller, source_intf)
            self.srcs_pp[source_intf.id] = proxy_port

        if proxy_port.destination:
            return proxy_port

        raise ProxyPortDestNotFound(
            evc_id,
            f"proxy_port of {intf_id} isn't looped or destination interface "
            "not found",
        )
358
359 1
    def _validate_disable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> None:
        """Validate that INT can be disabled on the given EVCs.

        Raises EVCNotFound when an EVC value is falsy and EVCHasNoINT when
        utils.has_int_enabled is false for it. Both checks are bypassed when
        force is True.
        """
        if force:
            # Every check is bypassed in force mode, so EVC values aren't
            # inspected at all; this also avoids calling utils.has_int_enabled
            # on a missing (falsy) EVC entry.
            return

        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if not utils.has_int_enabled(evc):
                raise EVCHasNoINT(evc_id)
370
371 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
        """Validate that an intra EVC is using different proxy ports.

        If the same proxy port were used on both UNIs, one of the sink/pop
        related matches would end up being overwritten since they'd be the
        same. Currently, an external loop has unidirectional flows matching on
        the lower (source) port number.

        Raises ProxyPortSameSourceIntraEVC when both UNIs of an intra switch EVC
        have proxy ports with the same source.
        """
        pp_a = evc["uni_a"].get("proxy_port")
        pp_z = evc["uni_z"].get("proxy_port")
        is_intra_switch = utils.is_intra_switch_evc(evc)

        # Only intra switch EVCs with both proxy ports mapped are validated.
        if not is_intra_switch or pp_a is None or pp_z is None:
            return

        has_distinct_sources = pp_a.source != pp_z.source
        if has_distinct_sources:
            return

        raise ProxyPortSameSourceIntraEVC(
            evc["id"], "intra EVC UNIs must use different proxy ports"
        )
395
396 1
    def _validate_map_enable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> dict[str, dict]:
        """Validate the EVCs to be enabled and map their UNIs to ProxyPorts.

        Each uni_a and uni_z dict gets a "proxy_port" key with its ProxyPort,
        just so it can be reused later during provisioning. The same evcs dict
        is returned.

        Raises EVCNotFound when an EVC value is falsy, EVCHasINT when the EVC
        already has INT enabled, ProxyPortStatusNotUP when a proxy port isn't
        UP, plus whatever get_proxy_port_or_raise and the intra EVC validation
        raise. EVCHasINT and ProxyPortStatusNotUP are bypassed when force is
        True.
        """
        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if utils.has_int_enabled(evc) and not force:
                raise EVCHasINT(evc_id)

            uni_a, uni_z = utils.get_evc_unis(evc)
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)

            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z

            if not force:
                # uni_a is checked first, then uni_z.
                for uni, pp in ((uni_a, pp_a), (uni_z, pp_z)):
                    if pp.status != EntityStatus.UP:
                        self._raise_proxy_port_not_up(evc_id, uni, pp)

            self._validate_intra_evc_different_proxy_ports(evc)
        return evcs

    @staticmethod
    def _raise_proxy_port_not_up(evc_id: str, uni: dict, pp: ProxyPort) -> None:
        """Raise ProxyPortStatusNotUP describing the source and destination."""
        destination = pp.destination
        dest_id = destination.id if destination else None
        # Report the destination interface status itself, not the aggregated
        # proxy port status (assumes the destination exposes `status` just
        # like the source interface does).
        dest_status = destination.status if destination else None
        raise ProxyPortStatusNotUP(
            evc_id,
            f"proxy_port of {uni['interface_id']} isn't UP. "
            f"source {pp.source.id} status {pp.source.status}, "
            f"destination {dest_id} status {dest_status}",
        )
441
442 1
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
        """Track the EVC ids on their proxy ports and map the UNIs sources.

        This is meant to be called after an EVC is enabled.
        """
        for evc_id, evc in evcs.items():
            uni_a, uni_z = utils.get_evc_unis(evc)
            # Both proxy ports are resolved before any state is updated.
            proxy_ports = [
                self.get_proxy_port_or_raise(uni["interface_id"], evc_id)
                for uni in (uni_a, uni_z)
            ]
            for proxy_port in proxy_ports:
                proxy_port.evc_ids.add(evc_id)
            for uni_key, proxy_port in zip(("uni_a", "uni_z"), proxy_ports):
                self.unis_src[evc[uni_key]["interface_id"]] = proxy_port.source.id
455
456 1
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
        """Stop tracking the EVC ids on their proxy ports.

        This is meant to be called when an EVC is disabled.
        """
        for evc_id, evc in evcs.items():
            uni_a, uni_z = utils.get_evc_unis(evc)
            # Both proxy ports are resolved before any evc_ids set is changed.
            proxy_ports = [
                self.get_proxy_port_or_raise(uni["interface_id"], evc_id)
                for uni in (uni_a, uni_z)
            ]
            for proxy_port in proxy_ports:
                proxy_port.evc_ids.discard(evc_id)
467
468 1
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Delete INT flows given a prefiltered stored_flows.

        Removal is driven by the stored flows instead of EVC ids and dpids so
        the force mode can also be handled when an EVC no longer exists. It
        also follows the same pattern that mef_eline currently uses.

        The cookies are batched per dpid based on settings.BATCH_SIZE, waiting
        for settings.BATCH_INTERVAL between the batches of a dpid. A
        non-positive batch size means a single batch per dpid.
        """
        cookies_by_dpid = defaultdict(set)
        all_flows = (flow for flows in stored_flows.values() for flow in flows)
        for flow in all_flows:
            cookies_by_dpid[flow["switch"]].add(flow["flow"]["cookie"])

        for dpid, cookies in cookies_by_dpid.items():
            pending = list(cookies)
            configured_size = settings.BATCH_SIZE
            step = configured_size if configured_size > 0 else len(pending)

            for start in range(0, len(pending), step):
                # No wait is needed before the first batch.
                if start > 0:
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                deletions = []
                for cookie in pending[start : start + step]:
                    deletions.append(
                        {
                            "cookie": cookie,
                            "cookie_mask": int(0xFFFFFFFFFFFFFFFF),
                            "table_id": Table.OFPTT_ALL.value,
                        }
                    )
                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": deletions},
                }
                event = KytosEvent("kytos.flow_manager.flows.delete", content=content)
                await self.controller.buffers.app.aput(event)
510
511 1
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Install INT flow mods.

        The flows are batched per dpid based on settings.BATCH_SIZE, waiting
        for settings.BATCH_INTERVAL between the batches of a dpid. A
        non-positive batch size means a single batch per dpid.
        """
        flows_by_dpid = defaultdict(list)
        all_flows = (flow for flows in stored_flows.values() for flow in flows)
        for flow in all_flows:
            flows_by_dpid[flow["switch"]].append(flow["flow"])

        for dpid, dpid_flows in flows_by_dpid.items():
            pending = list(dpid_flows)
            configured_size = settings.BATCH_SIZE
            step = configured_size if configured_size > 0 else len(pending)

            for start in range(0, len(pending), step):
                # No wait is needed before the first batch.
                if start > 0:
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": pending[start : start + step]},
                }
                event = KytosEvent("kytos.flow_manager.flows.install", content=content)
                await self.controller.buffers.app.aput(event)
542