Passed
Pull Request — master (#104)
by Vinicius
04:02
created

build.managers.int.INTManager._install_int_flows()   B

Complexity

Conditions 7

Size

Total Lines 31
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 7.0957

Importance

Changes 0
Metric Value
cc 7
eloc 21
nop 2
dl 0
loc 31
ccs 14
cts 16
cp 0.875
crap 7.0957
rs 7.9759
c 0
b 0
f 0
1
"""INTManager module."""
2
3 1
import asyncio
4 1
import copy
5 1
import itertools
6 1
from collections import defaultdict
7 1
from datetime import datetime
8 1
from typing import Literal
9
10 1
from pyof.v0x04.controller2switch.table_mod import Table
11
12 1
from kytos.core.controller import Controller
13 1
from kytos.core.events import KytosEvent
14 1
from kytos.core.interface import Interface
15 1
from napps.kytos.telemetry_int import utils
16 1
from napps.kytos.telemetry_int import settings
17 1
from kytos.core import log
18 1
from kytos.core.link import Link
19 1
import napps.kytos.telemetry_int.kytos_api_helper as api
20 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
21 1
from kytos.core.common import EntityStatus
22 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
23
24 1
from napps.kytos.telemetry_int.exceptions import (
25
    EVCError,
26
    EVCNotFound,
27
    EVCHasINT,
28
    EVCHasNoINT,
29
    FlowsNotFound,
30
    ProxyPortError,
31
    ProxyPortStatusNotUP,
32
    ProxyPortDestNotFound,
33
    ProxyPortNotFound,
34
    ProxyPortSameSourceIntraEVC,
35
)
36
37
38 1
class INTManager:
39
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
40
41 1
    def __init__(self, controller: Controller) -> None:
42
        """INTManager."""
43 1
        self.controller = controller
44 1
        self.flow_builder = FlowBuilder()
45 1
        self._topo_link_lock = asyncio.Lock()
46 1
        self._intf_meta_lock = asyncio.Lock()
47
48
        # Keep track between each uni intf id and its src intf id port
49 1
        self.unis_src: dict[str, str] = {}
50
        # Keep track between src intf id and its ProxyPort instance
51 1
        self.srcs_pp: dict[str, ProxyPort] = {}
52
53 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
54
        """Load UNI ids src ids and their ProxyPort instances."""
55 1
        for evc_id, evc in evcs.items():
56 1
            if not utils.has_int_enabled(evc):
57 1
                continue
58
59 1
            uni_a_id = evc["uni_a"]["interface_id"]
60 1
            uni_z_id = evc["uni_z"]["interface_id"]
61 1
            uni_a = self.controller.get_interface_by_id(uni_a_id)
62 1
            uni_z = self.controller.get_interface_by_id(uni_z_id)
63 1
            if uni_a and "proxy_port" in uni_a.metadata:
64 1
                src_a = uni_a.switch.get_interface_by_port_no(
65
                    uni_a.metadata["proxy_port"]
66
                )
67 1
                self.unis_src[uni_a.id] = src_a.id
68 1
                try:
69 1
                    pp = self.get_proxy_port_or_raise(uni_a.id, evc_id)
70
                except ProxyPortDestNotFound:
71
                    pp = self.srcs_pp[src_a.id]
72 1
                pp.evc_ids.add(evc_id)
73
74 1
            if uni_z and "proxy_port" in uni_z.metadata:
75 1
                src_z = uni_z.switch.get_interface_by_port_no(
76
                    uni_z.metadata["proxy_port"]
77
                )
78 1
                self.unis_src[uni_z.id] = src_z.id
79 1
                try:
80 1
                    pp = self.get_proxy_port_or_raise(uni_z.id, evc_id)
81
                except ProxyPortDestNotFound:
82
                    pp = self.srcs_pp[src_z.id]
83 1
                pp.evc_ids.add(evc_id)
84
85 1
    async def handle_pp_link_down(self, link: Link) -> None:
86
        """Handle proxy_port link_down."""
87 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
88
            return
89 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
90 1
        if not pp:
91
            pp = self.srcs_pp.get(link.endpoint_b.id)
92 1
        if not pp or not pp.evc_ids:
93
            return
94
95 1
        async with self._topo_link_lock:
96 1
            evcs = await api.get_evcs(
97
                **{
98
                    "metadata.telemetry.enabled": "true",
99
                    "metadata.telemetry.status": "UP",
100
                }
101
            )
102 1
            to_deactivate = {
103
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
104
            }
105 1
            if not to_deactivate:
106
                return
107
108 1
            log.info(
109
                f"Handling link_down {link}, removing INT flows falling back to "
110
                f"mef_eline, EVC ids: {list(to_deactivate)}"
111
            )
112 1
            metadata = {
113
                "telemetry": {
114
                    "enabled": True,
115
                    "status": "DOWN",
116
                    "status_reason": ["proxy_port_down"],
117
                    "status_updated_at": datetime.utcnow().strftime(
118
                        "%Y-%m-%dT%H:%M:%S"
119
                    ),
120
                }
121
            }
122 1
            await self.remove_int_flows(to_deactivate, metadata)
123
124 1
    async def handle_pp_link_up(self, link: Link) -> None:
125
        """Handle proxy_port link_up."""
126 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
127
            return
128 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
129 1
        if not pp:
130
            pp = self.srcs_pp.get(link.endpoint_b.id)
131 1
        if not pp or not pp.evc_ids:
132
            return
133
134 1
        async with self._topo_link_lock:
135 1
            if link.status != EntityStatus.UP or link.status_reason:
136
                return
137 1
            evcs = await api.get_evcs(
138
                **{
139
                    "metadata.telemetry.enabled": "true",
140
                    "metadata.telemetry.status": "DOWN",
141
                }
142
            )
143
144 1
            to_install = {}
145 1
            for evc_id, evc in evcs.items():
146 1
                if any(
147
                    (
148
                        not evc["active"],
149
                        evc["archived"],
150
                        evc_id not in pp.evc_ids,
151
                        evc["uni_a"]["interface_id"] not in self.unis_src,
152
                        evc["uni_z"]["interface_id"] not in self.unis_src,
153
                    )
154
                ):
155
                    continue
156
157 1
                src_a_id = self.unis_src[evc["uni_a"]["interface_id"]]
158 1
                src_z_id = self.unis_src[evc["uni_z"]["interface_id"]]
159 1
                if (
160
                    src_a_id in self.srcs_pp
161
                    and src_z_id in self.srcs_pp
162
                    and self.srcs_pp[src_a_id].status == EntityStatus.UP
163
                    and self.srcs_pp[src_z_id].status == EntityStatus.UP
164
                ):
165 1
                    to_install[evc_id] = evc
166
167 1
            if not to_install:
168
                return
169
170 1
            try:
171 1
                to_install = self._validate_map_enable_evcs(to_install, force=True)
172
            except EVCError as exc:
173
                log.exception(exc)
174
                return
175
176 1
            log.info(
177
                f"Handling link_up {link}, deploying INT flows, "
178
                f"EVC ids: {list(to_install)}"
179
            )
180 1
            metadata = {
181
                "telemetry": {
182
                    "enabled": True,
183
                    "status": "UP",
184
                    "status_reason": [],
185
                    "status_updated_at": datetime.utcnow().strftime(
186
                        "%Y-%m-%dT%H:%M:%S"
187
                    ),
188
                }
189
            }
190 1
            try:
191 1
                await self.install_int_flows(to_install, metadata)
192
            except FlowsNotFound as exc:
193
                log.exception(f"FlowsNotFound {str(exc)}")
194
                return
195
196 1
    async def handle_pp_metadata_removed(self, intf: Interface) -> None:
197
        """Handle proxy port metadata removed."""
198 1
        if "proxy_port" in intf.metadata:
199
            return
200 1
        try:
201 1
            pp = self.srcs_pp[self.unis_src[intf.id]]
202 1
            if not pp.evc_ids:
203
                return
204
        except KeyError:
205
            return
206
207 1
        async with self._intf_meta_lock:
208 1
            evcs = await api.get_evcs(
209
                **{
210
                    "metadata.telemetry.enabled": "true",
211
                    "metadata.telemetry.status": "UP",
212
                }
213
            )
214 1
            to_deactivate = {
215
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
216
            }
217 1
            if not to_deactivate:
218
                return
219
220 1
            log.info(
221
                f"Handling interface metadata removed on {intf}, removing INT flows "
222
                f"falling back to mef_eline, EVC ids: {list(to_deactivate)}"
223
            )
224 1
            metadata = {
225
                "telemetry": {
226
                    "enabled": True,
227
                    "status": "DOWN",
228
                    "status_reason": ["proxy_port_metadata_removed"],
229
                    "status_updated_at": datetime.utcnow().strftime(
230
                        "%Y-%m-%dT%H:%M:%S"
231
                    ),
232
                }
233
            }
234 1
            await self.remove_int_flows(to_deactivate, metadata)
235
236 1
    async def handle_pp_metadata_added(self, intf: Interface) -> None:
237
        """Handle proxy port metadata added.
238
239
        If an existing ProxyPort gets its proxy_port meadata updated
240
        and has associated EVCs then it'll remove and install the flows accordingly.
241
242
        """
243 1
        if "proxy_port" not in intf.metadata:
244
            return
245 1
        try:
246 1
            pp = self.srcs_pp[self.unis_src[intf.id]]
247 1
            if not pp.evc_ids:
248
                return
249
        except KeyError:
250
            return
251
252 1
        cur_source_intf = intf.switch.get_interface_by_port_no(
253
            intf.metadata.get("proxy_port")
254
        )
255 1
        if cur_source_intf == pp.source:
256 1
            return
257
258 1
        async with self._intf_meta_lock:
259 1
            pp.source = cur_source_intf
260
261 1
            evcs = await api.get_evcs(
262
                **{
263
                    "metadata.telemetry.enabled": "true",
264
                }
265
            )
266 1
            affected_evcs = {
267
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
268
            }
269 1
            if not affected_evcs:
270 1
                return
271
272 1
            log.info(
273
                f"Handling interface metadata updated on {intf}. It'll disable the "
274
                "EVCs to be safe, and then try to enable again with the updated "
275
                f" proxy port {pp}, EVC ids: {list(affected_evcs)}"
276
            )
277 1
            await self.disable_int(affected_evcs, force=True)
278 1
            try:
279 1
                await self.enable_int(affected_evcs, force=True)
280
            except ProxyPortSameSourceIntraEVC as exc:
281
                msg = (
282
                    f"Validation error when updating interface {intf} proxy port {pp}"
283
                    f" EVC ids: {list(affected_evcs)}, exception {str(exc)}"
284
                )
285
                log.error(msg)
286
287 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
288
        """Disable INT on EVCs.
289
290
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
291
292
        The force bool option, if True, will bypass the following:
293
294
        1 - EVC not found
295
        2 - EVC doesn't have INT
296
        3 - ProxyPortNotFound or ProxyPortDestNotFound
297
298
        """
299 1
        self._validate_disable_evcs(evcs, force)
300 1
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
301
302 1
        metadata = {
303
            "telemetry": {
304
                "enabled": False,
305
                "status": "DOWN",
306
                "status_reason": ["disabled"],
307
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
308
            }
309
        }
310 1
        await self.remove_int_flows(evcs, metadata, force=force)
311 1
        try:
312 1
            self._discard_pps_evc_ids(evcs)
313
        except ProxyPortError:
314
            if not force:
315
                raise
316
317 1
    async def remove_int_flows(
318
        self, evcs: dict[str, dict], metadata: dict, force=False
319
    ) -> None:
320
        """Remove INT flows and set metadata on EVCs."""
321 1
        stored_flows = await api.get_stored_flows(
322
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
323
        )
324 1
        await asyncio.gather(
325
            self._remove_int_flows_by_cookies(stored_flows),
326
            api.add_evcs_metadata(evcs, metadata, force),
327
        )
328
329 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
330
        """Enable INT on EVCs.
331
332
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
333
334
        The force bool option, if True, will bypass the following:
335
336
        1 - EVC already has INT
337
        2 - ProxyPort isn't UP
338
        Other cases won't be bypassed since at the point it won't have the data needed.
339
340
        """
341 1
        evcs = self._validate_map_enable_evcs(evcs, force)
342 1
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
343
344 1
        metadata = {
345
            "telemetry": {
346
                "enabled": True,
347
                "status": "UP",
348
                "status_reason": [],
349
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
350
            }
351
        }
352 1
        await self.install_int_flows(evcs, metadata)
353 1
        self._add_pps_evc_ids(evcs)
354
355 1
    async def redeploy_int(self, evcs: dict[str, dict]) -> None:
356
        """Redeploy INT on EVCs. It'll remove, install and update metadata.
357
358
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
359
        """
360 1
        self._validate_has_int(evcs)
361 1
        evcs = self._validate_map_enable_evcs(evcs, force=True)
362 1
        log.info(f"Redeploying INT on EVC ids: {list(evcs.keys())}, force: True")
363
364 1
        stored_flows = await api.get_stored_flows(
365
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
366
        )
367 1
        await self._remove_int_flows_by_cookies(stored_flows)
368 1
        metadata = {
369
            "telemetry": {
370
                "enabled": True,
371
                "status": "UP",
372
                "status_reason": [],
373
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
374
            }
375
        }
376 1
        await self.install_int_flows(evcs, metadata, force=True)
377
378 1
    async def install_int_flows(
379
        self, evcs: dict[str, dict], metadata: dict, force=False
380
    ) -> None:
381
        """Install INT flows and set metadata on EVCs."""
382 1
        stored_flows = self.flow_builder.build_int_flows(
383
            evcs,
384
            await utils.get_found_stored_flows(
385
                [
386
                    utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
387
                    for evc_id in evcs
388
                ]
389
            ),
390
        )
391 1
        self._validate_evcs_stored_flows(evcs, stored_flows)
392
393 1
        active_evcs, inactive_evcs, pp_down_evcs = {}, {}, {}
394 1
        for evc_id, evc in evcs.items():
395 1
            if not evc["active"]:
396
                inactive_evcs[evc_id] = evc
397
                continue
398 1
            if any(
399
                (
400
                    evc["uni_a"]["proxy_port"].status != EntityStatus.UP,
401
                    evc["uni_z"]["proxy_port"].status != EntityStatus.UP,
402
                )
403
            ):
404 1
                pp_down_evcs[evc_id] = evc
405 1
                continue
406
            active_evcs[evc_id] = evc
407
408 1
        inactive_metadata = copy.deepcopy(metadata)
409 1
        inactive_metadata["telemetry"]["status"] = "DOWN"
410 1
        pp_down_metadata = copy.deepcopy(inactive_metadata)
411 1
        inactive_metadata["telemetry"]["status_reason"] = ["no_flows"]
412 1
        pp_down_metadata["telemetry"]["status_reason"] = ["proxy_port_down"]
413
414 1
        await asyncio.gather(
415
            self._install_int_flows(stored_flows),
416
            api.add_evcs_metadata(inactive_evcs, inactive_metadata, force),
417
            api.add_evcs_metadata(pp_down_evcs, pp_down_metadata, force),
418
            api.add_evcs_metadata(active_evcs, metadata, force),
419
        )
420
421 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
422
        """Return a ProxyPort assigned to a UNI or raise."""
423
424 1
        interface = self.controller.get_interface_by_id(intf_id)
425 1
        if not interface:
426 1
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")
427
428 1
        if "proxy_port" not in interface.metadata:
429 1
            raise ProxyPortNotFound(
430
                evc_id, f"proxy_port metadata not found in {intf_id}"
431
            )
432
433 1
        source_intf = interface.switch.get_interface_by_port_no(
434
            interface.metadata.get("proxy_port")
435
        )
436 1
        if not source_intf:
437
            raise ProxyPortNotFound(
438
                evc_id,
439
                f"proxy_port of {intf_id} source interface not found",
440
            )
441
442 1
        pp = self.srcs_pp.get(source_intf.id)
443 1
        if not pp:
444 1
            pp = ProxyPort(self.controller, source_intf)
445 1
            self.srcs_pp[source_intf.id] = pp
446
447 1
        if not pp.destination:
448 1
            raise ProxyPortDestNotFound(
449
                evc_id,
450
                f"proxy_port of {intf_id} isn't looped or destination interface "
451
                "not found",
452
            )
453
454 1
        return pp
455
456 1
    def _validate_disable_evcs(
457
        self,
458
        evcs: dict[str, dict],
459
        force=False,
460
    ) -> None:
461
        """Validate disable EVCs."""
462 1
        for evc_id, evc in evcs.items():
463
            if not evc and not force:
464
                raise EVCNotFound(evc_id)
465
            if not utils.has_int_enabled(evc) and not force:
466
                raise EVCHasNoINT(evc_id)
467
468 1
    def _validate_evcs_stored_flows(
469
        self, evcs: dict[str, dict], stored_flows: dict[int, list[dict]]
470
    ) -> None:
471
        """Validate that each active EVC has corresponding flows."""
472 1
        for evc_id, evc in evcs.items():
473 1
            if evc["active"] and not stored_flows.get(
474
                utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
475
            ):
476 1
                raise FlowsNotFound(evc_id)
477
478 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
479
        """Validate that an intra EVC is using different proxy ports.
480
481
        If the same proxy port is used on both UNIs, of one the sink/pop related matches
482
        would ended up being overwritten since they'd be the same. Currently, an
483
        external loop will have unidirectional flows matching in the lower (source)
484
        port number.
485
        """
486 1
        pp_a = evc["uni_a"].get("proxy_port")
487 1
        pp_z = evc["uni_z"].get("proxy_port")
488 1
        if any(
489
            (
490
                not utils.is_intra_switch_evc(evc),
491
                pp_a is None,
492
                pp_z is None,
493
            )
494
        ):
495 1
            return
496 1
        if pp_a.source != pp_z.source:
497 1
            return
498
499 1
        raise ProxyPortSameSourceIntraEVC(
500
            evc["id"], "intra EVC UNIs must use different proxy ports"
501
        )
502
503 1
    async def handle_failover_flows(
504
        self, evcs_content: dict[str, dict], event_name: str
505
    ) -> None:
506
        """Handle failover flows. This method will generate the subset
507
        of INT flows. EVCs with 'flows' key will be installed, and
508
        'old_flows' will be removed.
509
510
        If a given proxy port has an unexpected state INT will be
511
        removed falling back to mef_eline flows.
512
        """
513
        to_install, to_remove, to_remove_with_err = {}, {}, {}
514
        new_flows: dict[int, list[dict]] = defaultdict(list)
515
        old_flows: dict[int, list[dict]] = defaultdict(list)
516
517
        old_flows_key = "removed_flows"
518
        new_flows_key = "flows"
519
520
        for evc_id, evc in evcs_content.items():
521
            if not utils.has_int_enabled(evc):
522
                continue
523
            try:
524
                uni_a, uni_z = utils.get_evc_unis(evc)
525
                pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
526
                pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
527
                uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
528
                evc["id"] = evc_id
529
                evc["uni_a"], evc["uni_z"] = uni_a, uni_z
530
            except ProxyPortError as e:
531
                log.error(
532
                    f"Unexpected proxy port state: {str(e)}."
533
                    f"INT will be removed on evc id {evc_id}"
534
                )
535
                to_remove_with_err[evc_id] = evc
536
                continue
537
538
            for dpid, flows in evc.get(new_flows_key, {}).items():
539
                for flow in flows:
540
                    new_flows[flow["cookie"]].append({"flow": flow, "switch": dpid})
541
542
            for dpid, flows in evc.get(old_flows_key, {}).items():
543
                for flow in flows:
544
                    # set priority and table_group just so INT flows can be built
545
                    # the priority doesn't matter for deletion
546
                    flow["priority"] = 21000
547
                    flow["table_group"] = (
548
                        "evpl" if "dl_vlan" in flow.get("match", {}) else "epl"
549
                    )
550
                    old_flows[flow["cookie"]].append({"flow": flow, "switch": dpid})
551
552
            if evc.get(new_flows_key):
553
                to_install[evc_id] = evc
554
                evc.pop(new_flows_key)
555
            if evc.get(old_flows_key):
556
                to_remove[evc_id] = evc
557
                evc.pop(old_flows_key, None)
558
559
        if to_remove:
560
            log.info(
561
                f"Handling {event_name} flows remove on EVC ids: {to_remove.keys()}"
562
            )
563
            await self._remove_int_flows(
564
                self._build_failover_old_flows(to_remove, old_flows)
565
            )
566
        if to_remove_with_err:
567
            log.error(
568
                f"Handling {event_name} proxy_port_error falling back "
569
                f"to mef_eline, EVC ids: {list(to_remove_with_err.keys())}"
570
            )
571
            metadata = {
572
                "telemetry": {
573
                    "enabled": True,
574
                    "status": "DOWN",
575
                    "status_reason": ["proxy_port_error"],
576
                    "status_updated_at": datetime.utcnow().strftime(
577
                        "%Y-%m-%dT%H:%M:%S"
578
                    ),
579
                }
580
            }
581
            await self.remove_int_flows(to_remove_with_err, metadata, force=True)
582
        if to_install:
583
            log.info(
584
                f"Handling {event_name} flows install on EVC ids: {to_install.keys()}"
585
            )
586
            await self._install_int_flows(
587
                self.flow_builder.build_int_flows(to_install, new_flows)
588
            )
589
590 1
    def _build_failover_old_flows(
591
        self, evcs: dict[str, list[dict]], old_flows: dict[int, list[dict]]
592
    ) -> dict[int, list[dict]]:
593
        """Build (old path) failover related to remove flows.
594
595
        If sink nnis svlan are different it'll regenerate the rest of sink loop flows,
596
        otherwise, it'll just remove the same received flows except with int cookie
597
        value the deletion uses flow mod OFPFC_DELETE, so no need to include the
598
        additional INT keys in the match like nw_proto for deletion.
599
        """
600
601
        removed_flows = defaultdict(list)
602
        for evc_id, evc in evcs.items():
603
            dpid_a, dpid_z = evc["uni_a"]["switch"], evc["uni_z"]["switch"]
604
605
            cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
606
            int_cookie = settings.INT_COOKIE_PREFIX << 56 | (cookie & 0xFFFFFFFFFFFFFF)
607
            cur_sink_a_svlan, cur_sink_z_svlan = None, None
608
            sink_a_flows: list[dict] = []
609
            sink_z_flows: list[dict] = []
610
611
            for link in evc["current_path"]:
612
                if cur_sink_a_svlan is None and (
613
                    svlan := utils.get_svlan_dpid_link(link, dpid_a)
614
                ):
615
                    cur_sink_a_svlan = svlan
616
                if cur_sink_z_svlan is None and (
617
                    svlan := utils.get_svlan_dpid_link(link, dpid_z)
618
                ):
619
                    cur_sink_z_svlan = svlan
620
                if cur_sink_a_svlan is not None and cur_sink_z_svlan is not None:
621
                    break
622
623
            log.info(
624
                f"bar sink_a_svlan {cur_sink_a_svlan}, sink_z_svlan {cur_sink_z_svlan}"
625
            )
626
627
            for flow in old_flows[cookie]:
628
                if not sink_a_flows and flow["switch"] == dpid_a:
629
                    if (
630
                        flow["flow"]["match"]["dl_vlan"] != cur_sink_a_svlan
631
                        and cur_sink_a_svlan
632
                    ):
633
                        sink_a_flows = self.flow_builder._build_int_sink_flows(
634
                            "uni_a", evc, old_flows
635
                        )
636
                    else:
637
                        flow["flow"]["cookie"] = int_cookie
638
                        sink_a_flows = [flow]
639
                elif not sink_z_flows and flow["switch"] == dpid_z:
640
                    if (
641
                        flow["flow"]["match"]["dl_vlan"] != cur_sink_z_svlan
642
                        and cur_sink_z_svlan
643
                    ):
644
                        sink_z_flows = self.flow_builder._build_int_sink_flows(
645
                            "uni_z", evc, old_flows
646
                        )
647
                    else:
648
                        flow["flow"]["cookie"] = int_cookie
649
                        sink_z_flows = [flow]
650
                if sink_a_flows and sink_z_flows:
651
                    break
652
653
            hop_flows = self.flow_builder._build_int_hop_flows(evc, old_flows)
654
            removed_flows[cookie] = list(
655
                itertools.chain(sink_a_flows, hop_flows, sink_z_flows)
656
            )
657
        return removed_flows
658
659 1
    def _validate_map_enable_evcs(
660
        self,
661
        evcs: dict[str, dict],
662
        force=False,
663
    ) -> dict[str, dict]:
664
        """Validate map enabling EVCs.
665
666
        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
667
        so it can be reused later during provisioning.
668
669
        """
670 1
        for evc_id, evc in evcs.items():
671 1
            if not evc:
672
                raise EVCNotFound(evc_id)
673 1
            if utils.has_int_enabled(evc) and not force:
674
                raise EVCHasINT(evc_id)
675
676 1
            uni_a, uni_z = utils.get_evc_unis(evc)
677 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
678 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
679
680 1
            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
681 1
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z
682
683 1
            if pp_a.status != EntityStatus.UP and not force:
684
                dest_id = pp_a.destination.id if pp_a.destination else None
685
                dest_status = pp_a.status if pp_a.destination else None
686
                raise ProxyPortStatusNotUP(
687
                    evc_id,
688
                    f"proxy_port of {uni_a['interface_id']} isn't UP. "
689
                    f"source {pp_a.source.id} status {pp_a.source.status}, "
690
                    f"destination {dest_id} status {dest_status}",
691
                )
692 1
            if pp_z.status != EntityStatus.UP and not force:
693
                dest_id = pp_z.destination.id if pp_z.destination else None
694
                dest_status = pp_z.status if pp_z.destination else None
695
                raise ProxyPortStatusNotUP(
696
                    evc_id,
697
                    f"proxy_port of {uni_z['interface_id']} isn't UP."
698
                    f"source {pp_z.source.id} status {pp_z.source.status}, "
699
                    f"destination {dest_id} status {dest_status}",
700
                )
701
702 1
            self._validate_intra_evc_different_proxy_ports(evc)
703 1
        return evcs
704
705 1
    def _validate_has_int(self, evcs: dict[str, dict]):
706 1
        for evc_id, evc in evcs.items():
707 1
            if not utils.has_int_enabled(evc):
708 1
                raise EVCHasNoINT(evc_id)
709
710 1
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
711
        """Add proxy ports evc_ids.
712
713
        This is meant to be called after an EVC is enabled.
714
        """
715 1
        for evc_id, evc in evcs.items():
716 1
            uni_a, uni_z = utils.get_evc_unis(evc)
717 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
718 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
719 1
            pp_a.evc_ids.add(evc_id)
720 1
            pp_z.evc_ids.add(evc_id)
721 1
            self.unis_src[evc["uni_a"]["interface_id"]] = pp_a.source.id
722 1
            self.unis_src[evc["uni_z"]["interface_id"]] = pp_z.source.id
723
724 1
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
725
        """Discard proxy port evc_ids.
726
727
        This is meant to be called when an EVC is disabled.
728
        """
729 1
        for evc_id, evc in evcs.items():
730 1
            uni_a, uni_z = utils.get_evc_unis(evc)
731 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
732 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
733 1
            pp_a.evc_ids.discard(evc_id)
734 1
            pp_z.evc_ids.discard(evc_id)
735
736 1
    def evc_compare(
737
        self, stored_int_flows: dict, stored_mef_flows: dict, evcs: dict
738
    ) -> dict[str, list]:
739
        """EVC compare.
740
741
        Cases:
742
        - No INT enabled but has INT flows -> wrong_metadata_has_int_flows
743
        - INT enabled but has less flows than mef flows -> missing_some_int_flows
744
745
        """
746 1
        int_flows = {
747
            utils.get_id_from_cookie(k): v for k, v in stored_int_flows.items()
748
        }
749 1
        mef_flows = {
750
            utils.get_id_from_cookie(k): v for k, v in stored_mef_flows.items()
751
        }
752
753 1
        results = defaultdict(list)
754 1
        for evc in evcs.values():
755 1
            evc_id = evc["id"]
756
757 1
            if (
758
                not utils.has_int_enabled(evc)
759
                and evc_id in int_flows
760
                and int_flows[evc_id]
761
            ):
762 1
                results[evc_id].append("wrong_metadata_has_int_flows")
763
764 1
            if (
765
                utils.has_int_enabled(evc)
766
                and evc_id in mef_flows
767
                and mef_flows[evc_id]
768
                and (
769
                    evc_id not in int_flows
770
                    or (
771
                        evc_id in int_flows
772
                        and len(int_flows[evc_id]) < len(mef_flows[evc_id])
773
                    )
774
                )
775
            ):
776 1
                results[evc_id].append("missing_some_int_flows")
777 1
        return results
778
779 1
    async def _remove_int_flows_by_cookies(
780
        self, stored_flows: dict[int, list[dict]]
781
    ) -> None:
782
        """Delete int flows given a prefiltered stored_flows by cookies.
783
        You should use this type of removal when you need to remove all
784
        flows associated with a cookie, if you need to include all keys in the match
785
        to remove only a subset use `_remove_int_flows(stored_flows)` method instead.
786
787
        Removal is driven by the stored flows instead of EVC ids and dpids to also
788
        be able to handle the force mode when an EVC no longer exists. It also follows
789
        the same pattern that mef_eline currently uses.
790
        """
791 1
        switch_flows_cookies = defaultdict(set)
792 1
        for flows in stored_flows.values():
793 1
            for flow in flows:
794 1
                switch_flows_cookies[flow["switch"]].add(flow["flow"]["cookie"])
795
796 1
        switch_flows = defaultdict(list)
797 1
        for dpid, cookies in switch_flows_cookies.items():
798 1
            for cookie in cookies:
799 1
                switch_flows[dpid].append(
800
                    {
801
                        "cookie": cookie,
802
                        "cookie_mask": int(0xFFFFFFFFFFFFFFFF),
803
                        "table_id": Table.OFPTT_ALL.value,
804
                    }
805
                )
806 1
        await self._send_flows(switch_flows, "delete")
807
808 1
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
809
        """Delete int flows given a prefiltered stored_flows. This method is meant
810
        to be used when you need to match all the flow match keys, so, typically when
811
        you're removing just a subset of INT flows.
812
813
        Removal is driven by the stored flows instead of EVC ids and dpids to also
814
        be able to handle the force mode when an EVC no longer exists. It also follows
815
        the same pattern that mef_eline currently uses."""
816
        switch_flows = defaultdict(list)
817
        for flows in stored_flows.values():
818
            for flow in flows:
819
                switch_flows[flow["switch"]].append(flow["flow"])
820
        await self._send_flows(switch_flows, "delete")
821
822 1
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
823
        """Install INT flow mods."""
824 1
        switch_flows = defaultdict(list)
825 1
        for flows in stored_flows.values():
826 1
            for flow in flows:
827 1
                switch_flows[flow["switch"]].append(flow["flow"])
828 1
        await self._send_flows(switch_flows, "install")
829
830 1
    async def _send_flows(
831
        self, switch_flows: dict[str, list[dict]], cmd: Literal["install", "delete"]
832
    ):
833
        """Send batched flows by dpid to flow_manager.
834
835
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
836
        for settings.BATCH_INTERVAL per batch iteration.
837
        """
838 1
        for dpid, flows in switch_flows.items():
839 1
            batch_size = settings.BATCH_SIZE
840 1
            if batch_size <= 0:
841
                batch_size = len(flows)
842
843 1
            for i in range(0, len(flows), batch_size):
844 1
                if i > 0:
845
                    await asyncio.sleep(settings.BATCH_INTERVAL)
846 1
                flows = flows[i : i + batch_size]
847 1
                event = KytosEvent(
848
                    f"kytos.flow_manager.flows.{cmd}",
849
                    content={
850
                        "dpid": dpid,
851
                        "force": True,
852
                        "flow_dict": {"flows": flows},
853
                    },
854
                )
855
                await self.controller.buffers.app.aput(event)
856