Passed
Push — master ( 63bf6a...8cf7d6 )
by Vinicius
08:14 queued 06:34
created

INTManager.load_uni_src_proxy_ports()   C

Complexity

Conditions 9

Size

Total Lines 31
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 9.3752

Importance

Changes 0
Metric Value
cc 9
eloc 26
nop 2
dl 0
loc 31
ccs 20
cts 24
cp 0.8333
crap 9.3752
rs 6.6666
c 0
b 0
f 0
1
"""INTManager module."""
2 1
import asyncio
3 1
from collections import defaultdict
4 1
from datetime import datetime
5
6 1
from pyof.v0x04.controller2switch.table_mod import Table
7
8 1
from kytos.core.controller import Controller
9 1
from kytos.core.events import KytosEvent
10 1
from napps.kytos.telemetry_int import utils
11 1
from napps.kytos.telemetry_int import settings
12 1
from kytos.core import log
13 1
from kytos.core.link import Link
14 1
import napps.kytos.telemetry_int.kytos_api_helper as api
15 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
16 1
from kytos.core.common import EntityStatus
17 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
18
19 1
from napps.kytos.telemetry_int.exceptions import (
20
    EVCError,
21
    EVCNotFound,
22
    EVCHasINT,
23
    EVCHasNoINT,
24
    FlowsNotFound,
25
    ProxyPortStatusNotUP,
26
    ProxyPortDestNotFound,
27
    ProxyPortNotFound,
28
    ProxyPortSameSourceIntraEVC,
29
)
30
31
32 1
class INTManager:
33
34
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
35
36 1
    def __init__(self, controller: Controller) -> None:
        """Initialize the manager state for the given controller."""
        # Source interface id -> ProxyPort instance built on top of it
        self.srcs_pp: dict[str, ProxyPort] = {}
        # UNI interface id -> id of the source interface used as its proxy port
        self.unis_src: dict[str, str] = {}

        # Held by the proxy port link up/down handlers while they work on EVCs
        self._topo_link_lock = asyncio.Lock()
        self.flow_builder = FlowBuilder()
        self.controller = controller
46
47 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
        """Load UNI ids src ids and their ProxyPort instances.

        For every EVC that has INT enabled, each UNI whose interface carries a
        "proxy_port" metadata entry is registered in ``self.unis_src`` and the
        EVC id is added to the ``evc_ids`` of the matching ProxyPort.

        Args:
            evcs: EVC dicts keyed by EVC id.
        """
        for evc_id, evc in evcs.items():
            if not utils.has_int_enabled(evc):
                continue

            # Resolve both UNIs upfront, so a malformed EVC fails before any
            # state is updated on its behalf.
            uni_ids = (evc["uni_a"]["interface_id"], evc["uni_z"]["interface_id"])
            unis = [self.controller.get_interface_by_id(uni_id) for uni_id in uni_ids]

            for uni in unis:
                if not uni or "proxy_port" not in uni.metadata:
                    continue

                src = uni.switch.get_interface_by_port_no(uni.metadata["proxy_port"])
                if not src:
                    # The configured port number doesn't resolve to an interface
                    # on the UNI switch, so there is nothing to track for it.
                    log.warning(
                        f"proxy_port of {uni.id} source interface not found, "
                        f"skipping it for EVC {evc_id}"
                    )
                    continue

                self.unis_src[uni.id] = src.id
                try:
                    pp = self.get_proxy_port_or_raise(uni.id, evc_id)
                except ProxyPortDestNotFound:
                    # get_proxy_port_or_raise registers the ProxyPort in srcs_pp
                    # before checking its destination, so it's available here.
                    pp = self.srcs_pp[src.id]
                pp.evc_ids.add(evc_id)
78
79 1
    async def handle_pp_link_down(self, link: Link) -> None:
        """Handle proxy_port link_down.

        When settings.FALLBACK_TO_MEF_LOOP_DOWN is set and one of the link
        endpoints is a tracked proxy port source with EVCs on it, the INT flows
        of those EVCs currently reported as UP are removed and their telemetry
        metadata is set to DOWN with the "proxy_port_down" reason.
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        proxy_port = self.srcs_pp.get(link.endpoint_a.id)
        if not proxy_port:
            proxy_port = self.srcs_pp.get(link.endpoint_b.id)
        if not (proxy_port and proxy_port.evc_ids):
            return

        async with self._topo_link_lock:
            filters = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "UP",
            }
            up_evcs = await api.get_evcs(**filters)

            # Only the EVCs that rely on this proxy port are affected
            to_deactivate = {}
            for evc_id, evc in up_evcs.items():
                if evc_id in proxy_port.evc_ids:
                    to_deactivate[evc_id] = evc
            if not to_deactivate:
                return

            log.info(
                f"Handling link_down {link}, removing INT flows falling back to "
                f"mef_eline, EVC ids: {list(to_deactivate)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            telemetry = {
                "enabled": True,
                "status": "DOWN",
                "status_reason": ["proxy_port_down"],
                "status_updated_at": updated_at,
            }
            await self.remove_int_flows(to_deactivate, {"telemetry": telemetry})
117
118 1
    async def handle_pp_link_up(self, link: Link) -> None:
        """Handle proxy_port link_up.

        When settings.FALLBACK_TO_MEF_LOOP_DOWN is set and one of the link
        endpoints is a tracked proxy port source with EVCs on it, INT flows are
        installed again for the EVCs reported as DOWN whose two proxy ports are
        UP, and their telemetry metadata is set back to UP.
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        proxy_port = self.srcs_pp.get(link.endpoint_a.id)
        if not proxy_port:
            proxy_port = self.srcs_pp.get(link.endpoint_b.id)
        if not (proxy_port and proxy_port.evc_ids):
            return

        # Wait a few seconds so the other proxy port has a chance of also being
        # considered up, since the request on topology setting metadata might
        # end up delaying, check out issue
        # https://github.com/kytos-ng/of_lldp/issues/100
        # TODO this will be optimized later on before releasing this NApp
        await asyncio.sleep(5)

        async with self._topo_link_lock:
            if link.status != EntityStatus.UP or link.status_reason:
                return

            filters = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "DOWN",
            }
            down_evcs = await api.get_evcs(**filters)

            to_install = {}
            for evc_id, evc in down_evcs.items():
                # Every condition is computed upfront (no short-circuit), so a
                # malformed EVC dict fails the same way regardless of its state.
                inactive = not evc["active"]
                archived = evc["archived"]
                untracked = evc_id not in proxy_port.evc_ids
                uni_a_id = evc["uni_a"]["interface_id"]
                uni_z_id = evc["uni_z"]["interface_id"]
                a_unknown = uni_a_id not in self.unis_src
                z_unknown = uni_z_id not in self.unis_src
                if inactive or archived or untracked or a_unknown or z_unknown:
                    continue

                src_a_id = self.unis_src[uni_a_id]
                src_z_id = self.unis_src[uni_z_id]
                if src_a_id not in self.srcs_pp or src_z_id not in self.srcs_pp:
                    continue
                if self.srcs_pp[src_a_id].status != EntityStatus.UP:
                    continue
                if self.srcs_pp[src_z_id].status != EntityStatus.UP:
                    continue
                to_install[evc_id] = evc

            if not to_install:
                return

            try:
                to_install = self._validate_map_enable_evcs(to_install, force=True)
            except EVCError as exc:
                log.exception(exc)
                return

            log.info(
                f"Handling link_up {link}, deploying INT flows, "
                f"EVC ids: {list(to_install)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            telemetry = {
                "enabled": True,
                "status": "UP",
                "status_reason": [],
                "status_updated_at": updated_at,
            }
            try:
                await self.install_int_flows(to_install, {"telemetry": telemetry})
            except FlowsNotFound as exc:
                log.exception(f"FlowsNotFound {str(exc)}")
196
197 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Disable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.

        When force is True, the following validations are bypassed:

        1 - EVC not found
        2 - EVC doesn't have INT

        After validation the INT flows are removed, the telemetry metadata is
        set as disabled and the EVC ids are discarded from their proxy ports.
        """
        self._validate_disable_evcs(evcs, force)
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": False,
            "status": "DOWN",
            "status_reason": ["disabled"],
            "status_updated_at": updated_at,
        }
        await self.remove_int_flows(evcs, {"telemetry": telemetry}, force=force)
        self._discard_pps_evc_ids(evcs)
221
222 1
    async def remove_int_flows(
        self, evcs: dict[str, dict], metadata: dict, force=False
    ) -> None:
        """Remove INT flows and set metadata on EVCs.

        The stored flows are looked up by the INT cookie of each EVC id, then
        the flow removal and the metadata update are awaited together.
        """
        int_cookies = []
        for evc_id in evcs:
            int_cookies.append(utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX))
        stored_flows = await api.get_stored_flows(int_cookies)

        flows_removal = self._remove_int_flows(stored_flows)
        metadata_update = api.add_evcs_metadata(evcs, metadata, force)
        await asyncio.gather(flows_removal, metadata_update)
233
234 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Enable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.

        When force is True, the following validations are bypassed:

        1 - EVC already has INT
        2 - ProxyPort isn't UP

        Other cases won't be bypassed since at that point it won't have the
        data needed.

        After validation the INT flows are installed, the telemetry metadata is
        set as enabled and the EVC ids are added to their proxy ports.
        """
        evcs = self._validate_map_enable_evcs(evcs, force)
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": True,
            "status": "UP",
            "status_reason": [],
            "status_updated_at": updated_at,
        }
        await self.install_int_flows(evcs, {"telemetry": telemetry})
        self._add_pps_evc_ids(evcs)
259
260 1
    async def install_int_flows(self, evcs: dict[str, dict], metadata: dict) -> None:
        """Install INT flows and set metadata on EVCs.

        The INT flows are built from the stored flows found by the mef_eline
        cookie of each EVC id, then the flow installation and the metadata
        update are awaited together.
        """
        mef_cookies = []
        for evc_id in evcs:
            mef_cookies.append(utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX))
        mef_flows = await utils.get_found_stored_flows(mef_cookies)
        stored_flows = self.flow_builder.build_int_flows(evcs, mef_flows)

        flows_install = self._install_int_flows(stored_flows)
        metadata_update = api.add_evcs_metadata(evcs, metadata)
        await asyncio.gather(flows_install, metadata_update)
275
276 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
        """Return the ProxyPort assigned to a UNI or raise.

        The ProxyPort is created and cached in ``self.srcs_pp`` on first use,
        which happens before its destination is checked.

        Raises:
            ProxyPortNotFound: the UNI interface, its "proxy_port" metadata or
                the source interface it points to can't be found.
            ProxyPortDestNotFound: the ProxyPort has no destination.
        """
        uni = self.controller.get_interface_by_id(intf_id)
        if not uni:
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")
        if "proxy_port" not in uni.metadata:
            raise ProxyPortNotFound(
                evc_id, f"proxy_port metadata not found in {intf_id}"
            )

        port_no = uni.metadata.get("proxy_port")
        src = uni.switch.get_interface_by_port_no(port_no)
        if not src:
            raise ProxyPortNotFound(
                evc_id,
                f"proxy_port of {intf_id} source interface not found",
            )

        proxy_port = self.srcs_pp.get(src.id)
        if not proxy_port:
            proxy_port = self.srcs_pp[src.id] = ProxyPort(self.controller, src)

        if proxy_port.destination:
            return proxy_port

        raise ProxyPortDestNotFound(
            evc_id,
            f"proxy_port of {intf_id} isn't looped or destination interface "
            "not found",
        )
310
311 1
    def _validate_disable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> None:
        """Validate disable EVCs.

        Args:
            evcs: EVC dicts keyed by EVC id; a falsy value means the EVC wasn't
                found.
            force: if True, every validation below is bypassed.

        Raises:
            EVCNotFound: an EVC wasn't found and force is False.
            EVCHasNoINT: an EVC doesn't have INT enabled and force is False.
        """
        if force:
            # Nothing can be raised in force mode, so don't even inspect the
            # EVCs (some of them may be missing/empty payloads).
            return

        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if not utils.has_int_enabled(evc):
                raise EVCHasNoINT(evc_id)
322
323 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
        """Validate that an intra EVC is using different proxy ports.

        If the same proxy port is used on both UNIs, one of the sink/pop related
        matches would end up being overwritten since they'd be the same.
        Currently, an external loop will have unidirectional flows matching in
        the lower (source) port number.

        Raises:
            ProxyPortSameSourceIntraEVC: both UNIs of an intra switch EVC have
                proxy ports with the same source.
        """
        pp_a = evc["uni_a"].get("proxy_port")
        pp_z = evc["uni_z"].get("proxy_port")

        # Only intra switch EVCs with both proxy ports mapped are checked
        if not utils.is_intra_switch_evc(evc) or pp_a is None or pp_z is None:
            return
        if pp_a.source != pp_z.source:
            return

        raise ProxyPortSameSourceIntraEVC(
            evc["id"], "intra EVC UNIs must use different proxy ports"
        )
347
348 1
    def _validate_map_enable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> dict[str, dict]:
        """Validate map enabling EVCs.

        This function also maps both uni_a and uni_z dicts with their ProxyPorts
        (under the "proxy_port" key), just so it can be reused later during
        provisioning.

        Args:
            evcs: EVC dicts keyed by EVC id; a falsy value means the EVC wasn't
                found.
            force: if True, bypass the "already has INT" and "proxy port isn't
                UP" validations.

        Returns:
            The same evcs dict, with the proxy ports mapped on each UNI.

        Raises:
            EVCNotFound: an EVC wasn't found.
            EVCHasINT: an EVC already has INT enabled and force is False.
            ProxyPortStatusNotUP: a proxy port isn't UP and force is False.

        Exceptions raised by get_proxy_port_or_raise and by
        _validate_intra_evc_different_proxy_ports are propagated as is.
        """
        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if utils.has_int_enabled(evc) and not force:
                raise EVCHasINT(evc_id)

            uni_a, uni_z = utils.get_evc_unis(evc)
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)

            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z

            # uni_a is checked before uni_z, so it's reported first when both
            # proxy ports aren't UP.
            for uni, pp in ((uni_a, pp_a), (uni_z, pp_z)):
                if pp.status != EntityStatus.UP and not force:
                    dest = pp.destination
                    dest_id = dest.id if dest else None
                    # Report the destination's own status, not the aggregated
                    # proxy port status which is already known to not be UP.
                    dest_status = dest.status if dest else None
                    raise ProxyPortStatusNotUP(
                        evc_id,
                        f"proxy_port of {uni['interface_id']} isn't UP. "
                        f"source {pp.source.id} status {pp.source.status}, "
                        f"destination {dest_id} status {dest_status}",
                    )

            self._validate_intra_evc_different_proxy_ports(evc)
        return evcs
393
394 1
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
        """Add proxy ports evc_ids.

        This is meant to be called after an EVC is enabled. It also records
        the source interface id of each UNI in ``self.unis_src``.
        """
        for evc_id, evc in evcs.items():
            uni_a, uni_z = utils.get_evc_unis(evc)
            # Both proxy ports are resolved before any of them is updated
            proxy_ports = [
                self.get_proxy_port_or_raise(uni["interface_id"], evc_id)
                for uni in (uni_a, uni_z)
            ]
            for proxy_port in proxy_ports:
                proxy_port.evc_ids.add(evc_id)
            for side, proxy_port in zip(("uni_a", "uni_z"), proxy_ports):
                self.unis_src[evc[side]["interface_id"]] = proxy_port.source.id
407
408 1
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
        """Discard proxy port evc_ids.

        This is meant to be called when an EVC is disabled, after its INT flows
        were already removed. For that reason a proxy port that can no longer
        be resolved (e.g. it isn't looped anymore or its metadata was removed)
        doesn't abort the cleanup: the ProxyPort previously tracked for the UNI,
        if any, is used instead, and each UNI is handled independently.
        """
        for evc_id, evc in evcs.items():
            for uni in utils.get_evc_unis(evc):
                uni_id = uni["interface_id"]
                try:
                    pp = self.get_proxy_port_or_raise(uni_id, evc_id)
                except (ProxyPortDestNotFound, ProxyPortNotFound):
                    # Fall back to what was tracked when the EVC was enabled
                    pp = self.srcs_pp.get(self.unis_src.get(uni_id))
                if pp is not None:
                    pp.evc_ids.discard(evc_id)
419
420 1
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Delete INT flows given a prefiltered stored_flows.

        Removal is driven by the stored flows instead of EVC ids and dpids to
        also be able to handle the force mode when an EVC no longer exists. It
        also follows the same pattern that mef_eline currently uses.

        The distinct cookies of each dpid are batched based on
        settings.BATCH_SIZE (a value <= 0 means a single batch), waiting for
        settings.BATCH_INTERVAL between the batches of a dpid.
        """
        cookies_by_dpid = defaultdict(set)
        for flow_entries in stored_flows.values():
            for entry in flow_entries:
                cookies_by_dpid[entry["switch"]].add(entry["flow"]["cookie"])

        for dpid, cookie_set in cookies_by_dpid.items():
            cookies = list(cookie_set)
            configured_size = settings.BATCH_SIZE
            chunk_size = configured_size if configured_size > 0 else len(cookies)

            for start in range(0, len(cookies), chunk_size):
                if start > 0:
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                batch = []
                for cookie in cookies[start : start + chunk_size]:
                    batch.append(
                        {
                            "cookie": cookie,
                            "cookie_mask": 0xFFFFFFFFFFFFFFFF,
                            "table_id": Table.OFPTT_ALL.value,
                        }
                    )
                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": batch},
                }
                event = KytosEvent("kytos.flow_manager.flows.delete", content=content)
                await self.controller.buffers.app.aput(event)
462
463 1
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Install INT flow mods.

        The flows of each dpid are batched based on settings.BATCH_SIZE (a
        value <= 0 means a single batch), waiting for settings.BATCH_INTERVAL
        between the batches of a dpid.
        """
        flows_by_dpid = defaultdict(list)
        for flow_entries in stored_flows.values():
            for entry in flow_entries:
                flows_by_dpid[entry["switch"]].append(entry["flow"])

        for dpid, dpid_flows in flows_by_dpid.items():
            pending = list(dpid_flows)
            configured_size = settings.BATCH_SIZE
            chunk_size = configured_size if configured_size > 0 else len(pending)

            for start in range(0, len(pending), chunk_size):
                if start > 0:
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": pending[start : start + chunk_size]},
                }
                event = KytosEvent("kytos.flow_manager.flows.install", content=content)
                await self.controller.buffers.app.aput(event)
494