Passed
Pull Request — master (#117)
by Vinicius
03:35
created

build.managers.int.INTManager.load_uni_src_proxy_ports()   C

Complexity

Conditions 9

Size

Total Lines 31
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 9.3752

Importance

Changes 0
Metric Value
cc 9
eloc 26
nop 2
dl 0
loc 31
ccs 20
cts 24
cp 0.8333
crap 9.3752
rs 6.6666
c 0
b 0
f 0
1
"""INTManager module."""
2
3 1
import asyncio
4 1
import copy
5 1
from collections import defaultdict
6 1
from datetime import datetime
7 1
from typing import Literal
8
9 1
from pyof.v0x04.controller2switch.table_mod import Table
10
11 1
from kytos.core.controller import Controller
12 1
from kytos.core.events import KytosEvent
13 1
from kytos.core.interface import Interface
14 1
from napps.kytos.telemetry_int import utils
15 1
from napps.kytos.telemetry_int import settings
16 1
from kytos.core import log
17 1
from kytos.core.link import Link
18 1
import napps.kytos.telemetry_int.kytos_api_helper as api
19 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
20 1
from kytos.core.common import EntityStatus
21 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
22
23 1
from napps.kytos.telemetry_int.exceptions import (
24
    EVCError,
25
    EVCNotFound,
26
    EVCHasINT,
27
    EVCHasNoINT,
28
    FlowsNotFound,
29
    ProxyPortError,
30
    ProxyPortStatusNotUP,
31
    ProxyPortDestNotFound,
32
    ProxyPortNotFound,
33
    ProxyPortSameSourceIntraEVC,
34
    ProxyPortShared,
35
)
36
37
38 1
class INTManager:
39
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
40
41 1
    def __init__(self, controller: Controller) -> None:
        """Set up the manager state bound to the given controller.

        Creates the flow builder, the asyncio locks used by the link and
        interface metadata handlers, and the empty UNI/proxy port lookup maps.
        """
        self.controller = controller
        self.flow_builder = FlowBuilder()

        # Held while handling proxy port link_down/link_up events
        self._topo_link_lock = asyncio.Lock()
        # Held while handling proxy_port interface metadata changes
        self._intf_meta_lock = asyncio.Lock()

        # UNI interface id -> id of its proxy port source interface
        self.unis_src: dict[str, str] = {}
        # Proxy port source interface id -> ProxyPort instance
        self.srcs_pp: dict[str, ProxyPort] = {}
52
53 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
        """Load UNI ids src ids and their ProxyPort instances.

        For every EVC with INT enabled, each UNI that exists on the controller
        and has ``proxy_port`` metadata gets its source interface id recorded
        in ``self.unis_src``, and the EVC id is added to the related ProxyPort.

        A UNI whose ``proxy_port`` metadata points to a port number without an
        interface on its switch is logged and skipped instead of aborting the
        whole load.

        Args:
            evcs: EVC dicts keyed by EVC id.
        """
        for evc_id, evc in evcs.items():
            if not utils.has_int_enabled(evc):
                continue

            uni_ids = [evc[key]["interface_id"] for key in ("uni_a", "uni_z")]
            unis = [self.controller.get_interface_by_id(uni_id) for uni_id in uni_ids]
            for uni in unis:
                if not uni or "proxy_port" not in uni.metadata:
                    continue

                src = uni.switch.get_interface_by_port_no(uni.metadata["proxy_port"])
                if not src:
                    # Stale or wrong metadata: there is no source interface to map
                    log.warning(
                        f"proxy_port {uni.metadata['proxy_port']} of UNI {uni.id} "
                        f"not found, skipping it when loading EVC {evc_id}"
                    )
                    continue

                self.unis_src[uni.id] = src.id
                try:
                    pp = self.get_proxy_port_or_raise(uni.id, evc_id)
                except ProxyPortDestNotFound:
                    # get_proxy_port_or_raise stores the ProxyPort in srcs_pp
                    # before checking its destination, so it can still be tracked
                    pp = self.srcs_pp[src.id]
                pp.evc_ids.add(evc_id)
84
85 1
    async def handle_pp_link_down(self, link: Link) -> None:
        """Remove INT flows of EVCs whose proxy port link went down.

        Does nothing when ``settings.FALLBACK_TO_MEF_LOOP_DOWN`` is falsy, when
        neither link endpoint is a known proxy port source, or when the
        matching ProxyPort has no EVC ids. Otherwise, under the topology link
        lock, the INT-enabled EVCs with telemetry status UP that belong to the
        proxy port get their INT flows removed and their telemetry metadata
        set to DOWN with reason ``proxy_port_down``.
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        pp = self.srcs_pp.get(link.endpoint_a.id) or self.srcs_pp.get(
            link.endpoint_b.id
        )
        if not (pp and pp.evc_ids):
            return

        async with self._topo_link_lock:
            query = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "UP",
            }
            evcs = await api.get_evcs(**query)

            to_deactivate = {}
            for evc_id, evc in evcs.items():
                if evc_id in pp.evc_ids:
                    to_deactivate[evc_id] = evc
            if not to_deactivate:
                return

            log.info(
                f"Handling link_down {link}, removing INT flows falling back to "
                f"mef_eline, EVC ids: {list(to_deactivate)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            metadata = {
                "telemetry": {
                    "enabled": True,
                    "status": "DOWN",
                    "status_reason": ["proxy_port_down"],
                    "status_updated_at": updated_at,
                }
            }
            await self.remove_int_flows(to_deactivate, metadata)
123
124 1
    async def handle_pp_link_up(self, link: Link) -> None:
        """Redeploy INT flows once a proxy port link is back up.

        Does nothing when ``settings.FALLBACK_TO_MEF_LOOP_DOWN`` is falsy, when
        neither link endpoint is a known proxy port source, or when the
        matching ProxyPort has no EVC ids. Otherwise, under the topology link
        lock, it confirms the link is UP with no status reason and selects the
        INT-enabled EVCs with telemetry status DOWN that are active, not
        archived, tracked by this proxy port, and whose two proxy ports are
        both UP. Their INT flows are installed and telemetry metadata set to UP.

        ``EVCError`` raised by validation and ``FlowsNotFound`` raised by the
        installation are logged and end the handling without propagating.
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        pp = self.srcs_pp.get(link.endpoint_a.id) or self.srcs_pp.get(
            link.endpoint_b.id
        )
        if not (pp and pp.evc_ids):
            return

        async with self._topo_link_lock:
            if link.status != EntityStatus.UP or link.status_reason:
                return

            query = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "DOWN",
            }
            evcs = await api.get_evcs(**query)

            to_install = {}
            for evc_id, evc in evcs.items():
                is_active, is_archived = evc["active"], evc["archived"]
                uni_a_id = evc["uni_a"]["interface_id"]
                uni_z_id = evc["uni_z"]["interface_id"]
                if (
                    not is_active
                    or is_archived
                    or evc_id not in pp.evc_ids
                    or uni_a_id not in self.unis_src
                    or uni_z_id not in self.unis_src
                ):
                    continue

                # Both proxy ports must be known and UP to deploy again
                src_ids = (self.unis_src[uni_a_id], self.unis_src[uni_z_id])
                if all(src_id in self.srcs_pp for src_id in src_ids) and all(
                    self.srcs_pp[src_id].status == EntityStatus.UP
                    for src_id in src_ids
                ):
                    to_install[evc_id] = evc

            if not to_install:
                return

            try:
                to_install = self._validate_map_enable_evcs(to_install, force=True)
            except EVCError as exc:
                log.exception(exc)
                return

            log.info(
                f"Handling link_up {link}, deploying INT flows, "
                f"EVC ids: {list(to_install)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            metadata = {
                "telemetry": {
                    "enabled": True,
                    "status": "UP",
                    "status_reason": [],
                    "status_updated_at": updated_at,
                }
            }
            try:
                await self.install_int_flows(to_install, metadata)
            except FlowsNotFound as exc:
                log.exception(f"FlowsNotFound {str(exc)}")
195
196 1
    async def handle_pp_metadata_removed(self, intf: Interface) -> None:
        """Remove INT flows after a UNI lost its proxy_port metadata.

        Does nothing if ``proxy_port`` is still in the interface metadata, if
        the interface isn't mapped to a known ProxyPort, or if that ProxyPort
        has no EVC ids. Otherwise, under the interface metadata lock, the
        INT-enabled EVCs with telemetry status UP that belong to the proxy
        port get their INT flows removed and their telemetry metadata set to
        DOWN with reason ``proxy_port_metadata_removed``.
        """
        if "proxy_port" in intf.metadata:
            return

        try:
            src_id = self.unis_src[intf.id]
            pp = self.srcs_pp[src_id]
            if not pp.evc_ids:
                return
        except KeyError:
            # Interface not tracked as a UNI with a proxy port
            return

        async with self._intf_meta_lock:
            query = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "UP",
            }
            evcs = await api.get_evcs(**query)

            to_deactivate = {}
            for evc_id, evc in evcs.items():
                if evc_id in pp.evc_ids:
                    to_deactivate[evc_id] = evc
            if not to_deactivate:
                return

            log.info(
                f"Handling interface metadata removed on {intf}, removing INT flows "
                f"falling back to mef_eline, EVC ids: {list(to_deactivate)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            metadata = {
                "telemetry": {
                    "enabled": True,
                    "status": "DOWN",
                    "status_reason": ["proxy_port_metadata_removed"],
                    "status_updated_at": updated_at,
                }
            }
            await self.remove_int_flows(to_deactivate, metadata)
235
236 1
    async def handle_pp_metadata_added(self, intf: Interface) -> None:
        """Handle proxy port metadata added.

        If an existing ProxyPort gets its proxy_port metadata updated to a
        different source interface and has associated EVCs, INT is disabled
        and then enabled again (both forced) on those EVCs.

        Does nothing if ``proxy_port`` isn't in the interface metadata, if the
        interface isn't mapped to a known ProxyPort, if that ProxyPort has no
        EVC ids, or if the source interface didn't change. When enabling
        again fails with ``ProxyPortSameSourceIntraEVC`` or
        ``ProxyPortShared``, the error is logged and the EVCs metadata is set
        to disabled/DOWN with reason ``proxy_port_shared``.
        """
        if "proxy_port" not in intf.metadata:
            return

        try:
            src_id = self.unis_src[intf.id]
            pp = self.srcs_pp[src_id]
            if not pp.evc_ids:
                return
        except KeyError:
            # Interface not tracked as a UNI with a proxy port
            return

        port_no = intf.metadata.get("proxy_port")
        cur_source_intf = intf.switch.get_interface_by_port_no(port_no)
        if cur_source_intf == pp.source:
            return

        async with self._intf_meta_lock:
            pp.source = cur_source_intf

            query = {"metadata.telemetry.enabled": "true"}
            evcs = await api.get_evcs(**query)
            affected_evcs = {}
            for evc_id, evc in evcs.items():
                if evc_id in pp.evc_ids:
                    affected_evcs[evc_id] = evc
            if not affected_evcs:
                return

            log.info(
                f"Handling interface metadata updated on {intf}. It'll disable the "
                "EVCs to be safe, and then try to enable again with the updated "
                f" proxy port {pp}, EVC ids: {list(affected_evcs)}"
            )
            await self.disable_int(
                affected_evcs, force=True, reason="proxy_port_metadata_added"
            )
            try:
                await self.enable_int(affected_evcs, force=True)
            except (ProxyPortSameSourceIntraEVC, ProxyPortShared) as exc:
                log.error(
                    f"Validation error when updating interface {intf}"
                    f" EVC ids: {list(affected_evcs)}, exception {str(exc)}"
                )
                updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
                metadata = {
                    "telemetry": {
                        "enabled": False,
                        "status": "DOWN",
                        "status_reason": ["proxy_port_shared"],
                        "status_updated_at": updated_at,
                    }
                }
                await api.add_evcs_metadata(affected_evcs, metadata)
299
300 1
    async def disable_int(
        self, evcs: dict[str, dict], force=False, reason="disabled"
    ) -> None:
        """Disable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
        INT flows are removed, telemetry metadata is set to disabled/DOWN with
        the given reason, and the EVC ids are discarded from their proxy ports.

        The force bool option, if True, will bypass the following:

        1 - EVC not found
        2 - EVC doesn't have INT
        3 - ProxyPortNotFound or ProxyPortDestNotFound

        """
        self._validate_disable_evcs(evcs, force)
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": False,
            "status": "DOWN",
            "status_reason": [reason],
            "status_updated_at": updated_at,
        }
        await self.remove_int_flows(evcs, {"telemetry": telemetry}, force=force)

        try:
            self._discard_pps_evc_ids(evcs)
        except ProxyPortError:
            # Proxy port errors are only tolerated in force mode
            if force:
                return
            raise
331
332 1
    async def remove_int_flows(
        self, evcs: dict[str, dict], metadata: dict, force=False
    ) -> None:
        """Remove the INT flows of the EVCs and update their metadata.

        The stored flows are looked up by the INT cookie of each EVC id; the
        flow removal and the metadata update are then awaited together.
        """
        int_cookies = [
            utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs
        ]
        stored_flows = await api.get_stored_flows(int_cookies)

        flows_removal = self._remove_int_flows_by_cookies(stored_flows)
        metadata_update = api.add_evcs_metadata(evcs, metadata, force)
        await asyncio.gather(flows_removal, metadata_update)
343
344 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Enable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
        After validation, INT flows are installed with telemetry metadata set
        to enabled/UP, and the EVC ids are added to their proxy ports.

        The force bool option, if True, will bypass the following:

        1 - EVC already has INT
        2 - ProxyPort isn't UP
        Other cases won't be bypassed since at the point it won't have the data needed.

        """
        evcs = self._validate_map_enable_evcs(evcs, force)
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": True,
            "status": "UP",
            "status_reason": [],
            "status_updated_at": updated_at,
        }
        await self.install_int_flows(evcs, {"telemetry": telemetry})
        self._add_pps_evc_ids(evcs)
369
370 1
    async def redeploy_int(self, evcs: dict[str, dict]) -> None:
        """Redeploy INT on EVCs. It'll remove, install and update metadata.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
        Every EVC must already have INT enabled; enabling validation runs in
        force mode.
        """
        self._validate_has_int(evcs)
        evcs = self._validate_map_enable_evcs(evcs, force=True)
        log.info(f"Redeploying INT on EVC ids: {list(evcs.keys())}, force: True")

        int_cookies = [
            utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs
        ]
        stored_flows = await api.get_stored_flows(int_cookies)
        await self._remove_int_flows_by_cookies(stored_flows)

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": True,
            "status": "UP",
            "status_reason": [],
            "status_updated_at": updated_at,
        }
        await self.install_int_flows(evcs, {"telemetry": telemetry}, force=True)
392
393 1
    async def install_int_flows(
        self, evcs: dict[str, dict], metadata: dict, force=False
    ) -> None:
        """Install INT flows and set metadata on EVCs.

        INT flows are built from the stored flows found by the mef_eline
        cookie of each EVC id. The given metadata is applied as is to active
        EVCs whose two proxy ports are UP. Inactive EVCs get a copy with
        status DOWN and reason ``no_flows``, and active EVCs with a proxy port
        that isn't UP get a copy with status DOWN and reason
        ``proxy_port_down``.

        Raises whatever ``_validate_evcs_stored_flows`` raises when an active
        EVC has no corresponding flows.
        """
        mef_cookies = [
            utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX) for evc_id in evcs
        ]
        mef_stored_flows = await utils.get_found_stored_flows(mef_cookies)
        stored_flows = self.flow_builder.build_int_flows(evcs, mef_stored_flows)
        self._validate_evcs_stored_flows(evcs, stored_flows)

        active_evcs, inactive_evcs, pp_down_evcs = {}, {}, {}
        for evc_id, evc in evcs.items():
            if not evc["active"]:
                inactive_evcs[evc_id] = evc
                continue
            pp_statuses = (
                evc["uni_a"]["proxy_port"].status,
                evc["uni_z"]["proxy_port"].status,
            )
            if any(status != EntityStatus.UP for status in pp_statuses):
                pp_down_evcs[evc_id] = evc
            else:
                active_evcs[evc_id] = evc

        def down_metadata(status_reason: str) -> dict:
            """Return a copy of metadata marked as DOWN with the given reason."""
            data = copy.deepcopy(metadata)
            data["telemetry"]["status"] = "DOWN"
            data["telemetry"]["status_reason"] = [status_reason]
            return data

        inactive_metadata = down_metadata("no_flows")
        pp_down_metadata = down_metadata("proxy_port_down")

        await asyncio.gather(
            self._install_int_flows(stored_flows),
            api.add_evcs_metadata(inactive_evcs, inactive_metadata, force),
            api.add_evcs_metadata(pp_down_evcs, pp_down_metadata, force),
            api.add_evcs_metadata(active_evcs, metadata, force),
        )
435
436 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
        """Return the ProxyPort assigned to a UNI or raise.

        A ProxyPort is created and stored in ``self.srcs_pp`` the first time
        its source interface is seen, before its destination is checked.

        Raises:
            ProxyPortNotFound: the UNI interface, its ``proxy_port`` metadata
                or the source interface of the proxy port isn't found.
            ProxyPortDestNotFound: the proxy port has no destination.
        """
        uni = self.controller.get_interface_by_id(intf_id)
        if not uni:
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")

        if "proxy_port" not in uni.metadata:
            raise ProxyPortNotFound(
                evc_id, f"proxy_port metadata not found in {intf_id}"
            )

        port_no = uni.metadata.get("proxy_port")
        source_intf = uni.switch.get_interface_by_port_no(port_no)
        if not source_intf:
            raise ProxyPortNotFound(
                evc_id,
                f"proxy_port of {intf_id} source interface not found",
            )

        proxy_port = self.srcs_pp.get(source_intf.id)
        if not proxy_port:
            proxy_port = ProxyPort(self.controller, source_intf)
            self.srcs_pp[source_intf.id] = proxy_port

        if proxy_port.destination:
            return proxy_port

        raise ProxyPortDestNotFound(
            evc_id,
            f"proxy_port of {intf_id} isn't looped or destination interface "
            "not found",
        )
470
471 1
    def _validate_disable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> None:
        """Validate disable EVCs.

        Args:
            evcs: EVC dicts keyed by EVC id; a falsy value means not found.
            force: if True, every check is bypassed.

        Raises:
            EVCNotFound: an EVC value is falsy and force isn't set.
            EVCHasNoINT: an EVC doesn't have INT enabled and force isn't set.
        """
        if force:
            # Nothing can fail in force mode, so the EVCs, which may be
            # missing, aren't inspected at all.
            return

        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if not utils.has_int_enabled(evc):
                raise EVCHasNoINT(evc_id)
482
483 1
    def _validate_evcs_stored_flows(
        self, evcs: dict[str, dict], stored_flows: dict[int, list[dict]]
    ) -> None:
        """Ensure every active EVC has flows under its mef_eline cookie.

        Raises:
            FlowsNotFound: an active EVC has no (or empty) stored flows.
        """
        for evc_id, evc in evcs.items():
            if not evc["active"]:
                continue
            mef_cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
            if not stored_flows.get(mef_cookie):
                raise FlowsNotFound(evc_id)
492
493 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
        """Validate that an intra EVC is using different proxy ports.

        If the same proxy port is used on both UNIs, one of the sink/pop
        related matches would end up being overwritten since they'd be the
        same. Currently, an external loop will have unidirectional flows
        matching in the lower (source) port number.

        Nothing is validated when the EVC isn't intra switch or when either
        UNI has no ``proxy_port`` mapped.

        Raises:
            ProxyPortSameSourceIntraEVC: both UNIs share the same source.
        """
        pp_a = evc["uni_a"].get("proxy_port")
        pp_z = evc["uni_z"].get("proxy_port")
        is_intra_switch = utils.is_intra_switch_evc(evc)
        if not is_intra_switch or pp_a is None or pp_z is None:
            return

        if pp_a.source != pp_z.source:
            return

        raise ProxyPortSameSourceIntraEVC(
            evc["id"], "intra EVC UNIs must use different proxy ports"
        )
517
518 1
    def _validate_dedicated_proxy_port_evcs(self, evcs: dict[str, dict]):
        """Validate that each proxy port is dedicated to a single UNI.

        https://github.com/kytos-ng/telemetry_int/issues/110

        Raises:
            ProxyPortShared: a UNI of the given EVCs would use a proxy port
                source already used by another UNI, either one tracked in
                ``self.unis_src`` or one seen earlier in the same ``evcs``.
        """
        owners_by_src: dict[str, str] = {}
        for evc in evcs.values():
            uni_pps = []
            for uni_key in ("uni_a", "uni_z"):
                uni = evc[uni_key]
                pp, uni_id = uni["proxy_port"], uni["interface_id"]
                uni_pps.append((uni_id, pp))

            # Check against the UNIs already tracked by the manager
            for cur_uni_id, cur_src_id in self.unis_src.items():
                for uni_id, pp in uni_pps:
                    if uni_id == cur_uni_id or cur_src_id != pp.source.id:
                        continue
                    msg = (
                        f"UNI {uni_id} must use another dedicated proxy port. "
                        f"UNI {cur_uni_id} is using {pp}"
                    )
                    raise ProxyPortShared(evc["id"], msg)

            # Check among the EVCs of the current request as well, since
            # self.unis_src only gets updated when an EVC gets enabled
            for uni_id, pp in uni_pps:
                src_id = pp.source.id
                found = owners_by_src.get(src_id)
                if found and found != uni_id:
                    msg = (
                        f"UNI {uni_id} must use another dedicated proxy port. "
                        f"UNI {found} would use {pp}"
                    )
                    raise ProxyPortShared(evc["id"], msg)
                owners_by_src[src_id] = uni_id
546
547 1
    async def handle_failover_flows(
        self, evcs_content: dict[str, dict], event_name: str
    ) -> None:
        """Handle failover flows.

        This method generates the subset of INT flows for the EVCs that have
        INT enabled. EVCs with a non-empty 'flows' key get INT flows
        installed, and EVCs with a non-empty 'removed_flows' key get INT flows
        removed. Both keys are popped from the EVC dicts, which are also
        updated in place with 'id' and with the UNIs mapped to their proxy
        ports.

        If a given proxy port has an unexpected state INT will be
        removed falling back to mef_eline flows.

        Args:
            evcs_content: EVC dicts keyed by EVC id.
            event_name: name of the event, only used in log messages.
        """
        to_install, to_remove, to_remove_with_err = {}, {}, {}
        new_flows: dict[int, list[dict]] = defaultdict(list)
        old_flows: dict[int, list[dict]] = defaultdict(list)

        old_flows_key = "removed_flows"
        new_flows_key = "flows"

        for evc_id, evc in evcs_content.items():
            if not utils.has_int_enabled(evc):
                continue
            try:
                uni_a, uni_z = utils.get_evc_unis(evc)
                pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
                pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
                uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
                evc["id"] = evc_id
                evc["uni_a"], evc["uni_z"] = uni_a, uni_z
            except ProxyPortError as e:
                log.error(
                    f"Unexpected proxy port state: {str(e)}. "
                    f"INT will be removed on evc id {evc_id}"
                )
                to_remove_with_err[evc_id] = evc
                continue

            # Group the flows by cookie, keeping track of their switch
            for dpid, flows in evc.get(new_flows_key, {}).items():
                for flow in flows:
                    new_flows[flow["cookie"]].append({"flow": flow, "switch": dpid})

            for dpid, flows in evc.get(old_flows_key, {}).items():
                for flow in flows:
                    # set priority and table_group just so INT flows can be built
                    # the priority doesn't matter for deletion
                    flow["priority"] = 21000
                    flow["table_group"] = (
                        "evpl" if "dl_vlan" in flow.get("match", {}) else "epl"
                    )
                    old_flows[flow["cookie"]].append({"flow": flow, "switch": dpid})

            if evc.get(new_flows_key):
                to_install[evc_id] = evc
                evc.pop(new_flows_key)
            if evc.get(old_flows_key):
                to_remove[evc_id] = evc
                evc.pop(old_flows_key, None)

        if to_remove:
            log.info(
                f"Handling {event_name} flows remove on EVC ids: {list(to_remove)}"
            )
            await self._remove_int_flows(
                self.flow_builder.build_failover_old_flows(to_remove, old_flows)
            )
        if to_remove_with_err:
            log.error(
                f"Handling {event_name} proxy_port_error falling back "
                f"to mef_eline, EVC ids: {list(to_remove_with_err.keys())}"
            )
            metadata = {
                "telemetry": {
                    "enabled": True,
                    "status": "DOWN",
                    "status_reason": ["proxy_port_error"],
                    "status_updated_at": datetime.utcnow().strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            await self.remove_int_flows(to_remove_with_err, metadata, force=True)
        if to_install:
            log.info(
                f"Handling {event_name} flows install on EVC ids: {list(to_install)}"
            )
            await self._install_int_flows(
                self.flow_builder.build_int_flows(to_install, new_flows)
            )
633
634 1
    def _validate_map_enable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> dict[str, dict]:
        """Validate map enabling EVCs.

        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
        so it can be reused later during provisioning.

        Args:
            evcs: EVC dicts keyed by EVC id; a falsy value means not found.
            force: if True, bypasses the "EVC already has INT" and the
                "proxy port isn't UP" checks.

        Returns:
            The same ``evcs`` dict, with ``uni_a``/``uni_z`` of each EVC
            carrying a ``proxy_port`` entry.

        Raises:
            EVCNotFound: an EVC value is falsy.
            EVCHasINT: an EVC already has INT enabled and force isn't set.
            ProxyPortStatusNotUP: a proxy port isn't UP and force isn't set.
            ProxyPortError: any error raised while resolving or validating
                the proxy ports (not found, shared, same source on intra EVC).
        """
        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if utils.has_int_enabled(evc) and not force:
                raise EVCHasINT(evc_id)

            uni_a, uni_z = utils.get_evc_unis(evc)
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)

            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z

            for uni, pp in ((uni_a, pp_a), (uni_z, pp_z)):
                if pp.status != EntityStatus.UP and not force:
                    dest = pp.destination
                    dest_id = dest.id if dest else None
                    # Report the destination's own status; assumes destination
                    # exposes .status like source does - TODO confirm
                    dest_status = dest.status if dest else None
                    raise ProxyPortStatusNotUP(
                        evc_id,
                        f"proxy_port of {uni['interface_id']} isn't UP. "
                        f"source {pp.source.id} status {pp.source.status}, "
                        f"destination {dest_id} status {dest_status}",
                    )

            self._validate_intra_evc_different_proxy_ports(evc)
        self._validate_dedicated_proxy_port_evcs(evcs)
        return evcs
680
681 1
    def _validate_has_int(self, evcs: dict[str, dict]):
        """Ensure every given EVC has INT enabled.

        Raises:
            EVCHasNoINT: for the first EVC found without INT enabled.
        """
        for evc_id in evcs:
            if utils.has_int_enabled(evcs[evc_id]):
                continue
            raise EVCHasNoINT(evc_id)
685
686 1
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
        """Add proxy ports evc_ids.

        This is meant to be called after an EVC is enabled. Besides adding the
        EVC id to both proxy ports, it maps each UNI interface id to the
        source interface id of its proxy port in ``self.unis_src``.
        """
        for evc_id, evc in evcs.items():
            uni_a, uni_z = utils.get_evc_unis(evc)
            # Resolve both proxy ports before changing any state
            proxy_ports = [
                self.get_proxy_port_or_raise(uni["interface_id"], evc_id)
                for uni in (uni_a, uni_z)
            ]
            for proxy_port in proxy_ports:
                proxy_port.evc_ids.add(evc_id)
            for uni_key, proxy_port in zip(("uni_a", "uni_z"), proxy_ports):
                self.unis_src[evc[uni_key]["interface_id"]] = proxy_port.source.id
699
700 1
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
        """Discard proxy port evc_ids.

        This is meant to be called when an EVC is disabled. When a proxy port
        ends up with no EVC ids, the related UNI is also dropped from
        ``self.unis_src``.
        """
        for evc_id, evc in evcs.items():
            uni_a, uni_z = utils.get_evc_unis(evc)
            # Resolve both proxy ports before changing any state
            proxy_ports = [
                self.get_proxy_port_or_raise(uni["interface_id"], evc_id)
                for uni in (uni_a, uni_z)
            ]
            for proxy_port in proxy_ports:
                proxy_port.evc_ids.discard(evc_id)
            for uni_key, proxy_port in zip(("uni_a", "uni_z"), proxy_ports):
                if proxy_port.evc_ids:
                    continue
                self.unis_src.pop(evc[uni_key]["interface_id"], None)
715
716 1
    def evc_compare(
        self, stored_int_flows: dict, stored_mef_flows: dict, evcs: dict
    ) -> dict[str, list]:
        """Compare EVCs against their stored INT and mef_eline flows.

        Both stored flows dicts are keyed by cookie; they're re-keyed by the
        id derived from each cookie before comparing.

        Cases:
        - No INT enabled but has INT flows -> wrong_metadata_has_int_flows
        - INT enabled but has less flows than mef flows -> missing_some_int_flows

        Returns a dict mapping each inconsistent EVC id to its list of cases.
        """

        def by_evc_id(stored_flows: dict) -> dict:
            """Re-key stored flows from cookie to the id derived from it."""
            return {
                utils.get_id_from_cookie(cookie): flows
                for cookie, flows in stored_flows.items()
            }

        int_flows = by_evc_id(stored_int_flows)
        mef_flows = by_evc_id(stored_mef_flows)

        results = defaultdict(list)
        for evc in evcs.values():
            evc_id = evc["id"]

            if not utils.has_int_enabled(evc):
                if int_flows.get(evc_id):
                    results[evc_id].append("wrong_metadata_has_int_flows")
                continue

            evc_mef_flows = mef_flows.get(evc_id)
            if not evc_mef_flows:
                continue
            if evc_id not in int_flows or len(int_flows[evc_id]) < len(
                evc_mef_flows
            ):
                results[evc_id].append("missing_some_int_flows")
        return results
758
759 1
    async def _remove_int_flows_by_cookies(
760
        self, stored_flows: dict[int, list[dict]]
761
    ) -> dict[str, list[dict]]:
762
        """Delete int flows given a prefiltered stored_flows by cookies.
763
        You should use this type of removal when you need to remove all
764
        flows associated with a cookie, if you need to include all keys in the match
765
        to remove only a subset use `_remove_int_flows(stored_flows)` method instead.
766
767
        Removal is driven by the stored flows instead of EVC ids and dpids to also
768
        be able to handle the force mode when an EVC no longer exists. It also follows
769
        the same pattern that mef_eline currently uses.
770
        """
771 1
        switch_flows_cookies = defaultdict(set)
772 1
        for flows in stored_flows.values():
773 1
            for flow in flows:
774 1
                switch_flows_cookies[flow["switch"]].add(flow["flow"]["cookie"])
775
776 1
        switch_flows = defaultdict(list)
777 1
        for dpid, cookies in switch_flows_cookies.items():
778 1
            for cookie in cookies:
779 1
                switch_flows[dpid].append(
780
                    {
781
                        "cookie": cookie,
782
                        "cookie_mask": int(0xFFFFFFFFFFFFFFFF),
783
                        "table_id": Table.OFPTT_ALL.value,
784
                    }
785
                )
786 1
        await self._send_flows(switch_flows, "delete")
787 1
        return switch_flows
788
789 1
    async def _remove_int_flows(
790
        self, stored_flows: dict[int, list[dict]]
791
    ) -> dict[str, list[dict]]:
792
        """Delete int flows given a prefiltered stored_flows. This method is meant
793
        to be used when you need to match all the flow match keys, so, typically when
794
        you're removing just a subset of INT flows.
795
796
        Removal is driven by the stored flows instead of EVC ids and dpids to also
797
        be able to handle the force mode when an EVC no longer exists. It also follows
798
        the same pattern that mef_eline currently uses."""
799 1
        switch_flows = defaultdict(list)
800 1
        for flows in stored_flows.values():
801 1
            for flow in flows:
802 1
                switch_flows[flow["switch"]].append(flow["flow"])
803 1
        await self._send_flows(switch_flows, "delete")
804 1
        return switch_flows
805
806 1
    async def _install_int_flows(
807
        self, stored_flows: dict[int, list[dict]]
808
    ) -> dict[str, list[dict]]:
809
        """Install INT flow mods."""
810 1
        switch_flows = defaultdict(list)
811 1
        for flows in stored_flows.values():
812 1
            for flow in flows:
813 1
                switch_flows[flow["switch"]].append(flow["flow"])
814 1
        await self._send_flows(switch_flows, "install")
815 1
        return switch_flows
816
817 1
    async def _send_flows(
        self, switch_flows: dict[str, list[dict]], cmd: Literal["install", "delete"]
    ):
        """Send batched flows by dpid to flow_manager.

        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
        for settings.BATCH_INTERVAL per batch iteration.

        A ``settings.BATCH_SIZE`` lower than or equal to zero disables batching:
        all flows of a dpid are sent in a single event. The wait only happens
        between batches of the same dpid, never before its first batch.

        Each batch is published as a ``kytos.flow_manager.flows.single.{cmd}``
        event on ``self.controller.buffers.app`` with ``force`` set to True.

        Args:
            switch_flows: flows to send, grouped by dpid.
            cmd: flow_manager command used in the event name.
        """
        for dpid, flows in switch_flows.items():
            if not flows:
                # Nothing to send. This also keeps range() below from getting
                # a zero step when batching is disabled (BATCH_SIZE <= 0).
                continue

            batch_size = settings.BATCH_SIZE
            if batch_size <= 0:
                batch_size = len(flows)

            for i in range(0, len(flows), batch_size):
                # Slice into its own name: rebinding `flows` here would make
                # every later batch slice an already-truncated list, so only
                # the first batch of each dpid would ever be sent.
                batch = flows[i : i + batch_size]

                if i > 0:
                    await asyncio.sleep(settings.BATCH_INTERVAL)
                event = KytosEvent(
                    f"kytos.flow_manager.flows.single.{cmd}",
                    content={
                        "dpid": dpid,
                        "force": True,
                        "flow_dict": {"flows": batch},
                    },
                )
                await self.controller.buffers.app.aput(event)
846