Test Failed
Pull Request — master (#104)
by Vinicius
05:33
created

build.managers.int.INTManager._validate_map_enable_evcs()   D

Complexity

Conditions 13

Size

Total Lines 45
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 22.3466

Importance

Changes 0
Metric Value
cc 13
eloc 28
nop 3
dl 0
loc 45
ccs 13
cts 21
cp 0.619
crap 22.3466
rs 4.2
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like build.managers.int.INTManager._validate_map_enable_evcs() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""INTManager module."""
2
3 1
import asyncio
4 1
import copy
5 1
import itertools
6 1
from collections import defaultdict
7
from datetime import datetime
8 1
from typing import Literal
9
10 1
from pyof.v0x04.controller2switch.table_mod import Table
11 1
12 1
from kytos.core.controller import Controller
13 1
from kytos.core.events import KytosEvent
14 1
from kytos.core.interface import Interface
15 1
from napps.kytos.telemetry_int import utils
16 1
from napps.kytos.telemetry_int import settings
17 1
from kytos.core import log
18 1
from kytos.core.link import Link
19 1
import napps.kytos.telemetry_int.kytos_api_helper as api
20 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
21
from kytos.core.common import EntityStatus
22 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
23
24
from napps.kytos.telemetry_int.exceptions import (
25
    EVCError,
26
    EVCNotFound,
27
    EVCHasINT,
28
    EVCHasNoINT,
29
    FlowsNotFound,
30
    ProxyPortError,
31
    ProxyPortStatusNotUP,
32
    ProxyPortDestNotFound,
33
    ProxyPortNotFound,
34
    ProxyPortSameSourceIntraEVC,
35
)
36 1
37
38
class INTManager:
39 1
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
40
41 1
    def __init__(self, controller: Controller) -> None:
42 1
        """INTManager."""
43 1
        self.controller = controller
44 1
        self.flow_builder = FlowBuilder()
45
        self._topo_link_lock = asyncio.Lock()
46
        self._intf_meta_lock = asyncio.Lock()
47 1
48
        # Keep track between each uni intf id and its src intf id port
49 1
        self.unis_src: dict[str, str] = {}
50
        # Keep track between src intf id and its ProxyPort instance
51 1
        self.srcs_pp: dict[str, ProxyPort] = {}
52
53 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
54 1
        """Load UNI ids src ids and their ProxyPort instances."""
55 1
        for evc_id, evc in evcs.items():
56
            if not utils.has_int_enabled(evc):
57 1
                continue
58 1
59 1
            uni_a_id = evc["uni_a"]["interface_id"]
60 1
            uni_z_id = evc["uni_z"]["interface_id"]
61 1
            uni_a = self.controller.get_interface_by_id(uni_a_id)
62 1
            uni_z = self.controller.get_interface_by_id(uni_z_id)
63
            if uni_a and "proxy_port" in uni_a.metadata:
64
                src_a = uni_a.switch.get_interface_by_port_no(
65 1
                    uni_a.metadata["proxy_port"]
66 1
                )
67 1
                self.unis_src[uni_a.id] = src_a.id
68
                try:
69
                    pp = self.get_proxy_port_or_raise(uni_a.id, evc_id)
70 1
                except ProxyPortDestNotFound:
71
                    pp = self.srcs_pp[src_a.id]
72 1
                pp.evc_ids.add(evc_id)
73 1
74
            if uni_z and "proxy_port" in uni_z.metadata:
75
                src_z = uni_z.switch.get_interface_by_port_no(
76 1
                    uni_z.metadata["proxy_port"]
77 1
                )
78 1
                self.unis_src[uni_z.id] = src_z.id
79
                try:
80
                    pp = self.get_proxy_port_or_raise(uni_z.id, evc_id)
81 1
                except ProxyPortDestNotFound:
82
                    pp = self.srcs_pp[src_z.id]
83 1
                pp.evc_ids.add(evc_id)
84
85 1
    async def handle_pp_link_down(self, link: Link) -> None:
86
        """Handle proxy_port link_down."""
87 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
88 1
            return
89
        pp = self.srcs_pp.get(link.endpoint_a.id)
90 1
        if not pp:
91
            pp = self.srcs_pp.get(link.endpoint_b.id)
92
        if not pp or not pp.evc_ids:
93 1
            return
94 1
95
        async with self._topo_link_lock:
96
            evcs = await api.get_evcs(
97
                **{
98
                    "metadata.telemetry.enabled": "true",
99
                    "metadata.telemetry.status": "UP",
100 1
                }
101
            )
102
            to_deactivate = {
103 1
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
104
            }
105
            if not to_deactivate:
106 1
                return
107
108
            log.info(
109
                f"Handling link_down {link}, removing INT flows falling back to "
110 1
                f"mef_eline, EVC ids: {list(to_deactivate)}"
111
            )
112
            metadata = {
113
                "telemetry": {
114
                    "enabled": True,
115
                    "status": "DOWN",
116
                    "status_reason": ["proxy_port_down"],
117
                    "status_updated_at": datetime.utcnow().strftime(
118
                        "%Y-%m-%dT%H:%M:%S"
119
                    ),
120 1
                }
121
            }
122 1
            await self.remove_int_flows(to_deactivate, metadata)
123
124 1
    async def handle_pp_link_up(self, link: Link) -> None:
125
        """Handle proxy_port link_up."""
126 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
127 1
            return
128
        pp = self.srcs_pp.get(link.endpoint_a.id)
129 1
        if not pp:
130
            pp = self.srcs_pp.get(link.endpoint_b.id)
131
        if not pp or not pp.evc_ids:
132 1
            return
133 1
134
        async with self._topo_link_lock:
135 1
            if link.status != EntityStatus.UP or link.status_reason:
136
                return
137
            evcs = await api.get_evcs(
138
                **{
139
                    "metadata.telemetry.enabled": "true",
140
                    "metadata.telemetry.status": "DOWN",
141
                }
142 1
            )
143 1
144 1
            to_install = {}
145
            for evc_id, evc in evcs.items():
146
                if any(
147
                    (
148
                        not evc["active"],
149
                        evc["archived"],
150
                        evc_id not in pp.evc_ids,
151
                        evc["uni_a"]["interface_id"] not in self.unis_src,
152
                        evc["uni_z"]["interface_id"] not in self.unis_src,
153
                    )
154
                ):
155 1
                    continue
156 1
157 1
                src_a_id = self.unis_src[evc["uni_a"]["interface_id"]]
158
                src_z_id = self.unis_src[evc["uni_z"]["interface_id"]]
159
                if (
160
                    src_a_id in self.srcs_pp
161
                    and src_z_id in self.srcs_pp
162
                    and self.srcs_pp[src_a_id].status == EntityStatus.UP
163 1
                    and self.srcs_pp[src_z_id].status == EntityStatus.UP
164
                ):
165 1
                    to_install[evc_id] = evc
166
167
            if not to_install:
168 1
                return
169 1
170
            try:
171
                to_install = self._validate_map_enable_evcs(to_install, force=True)
172
            except EVCError as exc:
173
                log.exception(exc)
174 1
                return
175
176
            log.info(
177
                f"Handling link_up {link}, deploying INT flows, "
178 1
                f"EVC ids: {list(to_install)}"
179
            )
180
            metadata = {
181
                "telemetry": {
182
                    "enabled": True,
183
                    "status": "UP",
184
                    "status_reason": [],
185
                    "status_updated_at": datetime.utcnow().strftime(
186
                        "%Y-%m-%dT%H:%M:%S"
187
                    ),
188 1
                }
189 1
            }
190
            try:
191
                await self.install_int_flows(to_install, metadata)
192
            except FlowsNotFound as exc:
193
                log.exception(f"FlowsNotFound {str(exc)}")
194 1
                return
195
196 1
    async def handle_pp_metadata_removed(self, intf: Interface) -> None:
197
        """Handle proxy port metadata removed."""
198 1
        if "proxy_port" in intf.metadata:
199 1
            return
200 1
        try:
201
            pp = self.srcs_pp[self.unis_src[intf.id]]
202
            if not pp.evc_ids:
203
                return
204
        except KeyError:
205 1
            return
206 1
207
        async with self._intf_meta_lock:
208
            evcs = await api.get_evcs(
209
                **{
210
                    "metadata.telemetry.enabled": "true",
211
                    "metadata.telemetry.status": "UP",
212 1
                }
213
            )
214
            to_deactivate = {
215 1
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
216
            }
217
            if not to_deactivate:
218 1
                return
219
220
            log.info(
221
                f"Handling interface metadata removed on {intf}, removing INT flows "
222 1
                f"falling back to mef_eline, EVC ids: {list(to_deactivate)}"
223
            )
224
            metadata = {
225
                "telemetry": {
226
                    "enabled": True,
227
                    "status": "DOWN",
228
                    "status_reason": ["proxy_port_metadata_removed"],
229
                    "status_updated_at": datetime.utcnow().strftime(
230
                        "%Y-%m-%dT%H:%M:%S"
231
                    ),
232 1
                }
233
            }
234 1
            await self.remove_int_flows(to_deactivate, metadata)
235
236
    async def handle_pp_metadata_added(self, intf: Interface) -> None:
237
        """Handle proxy port metadata added.
238
239
        If an existing ProxyPort gets its proxy_port meadata updated
240
        and has associated EVCs then it'll remove and install the flows accordingly.
241 1
242
        """
243 1
        if "proxy_port" not in intf.metadata:
244 1
            return
245 1
        try:
246
            pp = self.srcs_pp[self.unis_src[intf.id]]
247
            if not pp.evc_ids:
248
                return
249
        except KeyError:
250 1
            return
251
252
        cur_source_intf = intf.switch.get_interface_by_port_no(
253 1
            intf.metadata.get("proxy_port")
254 1
        )
255
        if cur_source_intf == pp.source:
256 1
            return
257 1
258
        async with self._intf_meta_lock:
259 1
            pp.source = cur_source_intf
260
261
            evcs = await api.get_evcs(
262
                **{
263
                    "metadata.telemetry.enabled": "true",
264 1
                }
265
            )
266
            affected_evcs = {
267 1
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
268 1
            }
269
            if not affected_evcs:
270 1
                return
271
272
            log.info(
273
                f"Handling interface metadata updated on {intf}. It'll disable the "
274
                "EVCs to be safe, and then try to enable again with the updated "
275 1
                f" proxy port {pp}, EVC ids: {list(affected_evcs)}"
276 1
            )
277 1
            await self.disable_int(affected_evcs, force=True)
278
            try:
279
                await self.enable_int(affected_evcs, force=True)
280
            except ProxyPortSameSourceIntraEVC as exc:
281
                msg = (
282
                    f"Validation error when updating interface {intf} proxy port {pp}"
283
                    f" EVC ids: {list(affected_evcs)}, exception {str(exc)}"
284
                )
285 1
                log.error(msg)
286
287
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
288
        """Disable INT on EVCs.
289
290
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
291
292
        The force bool option, if True, will bypass the following:
293
294
        1 - EVC not found
295
        2 - EVC doesn't have INT
296
        3 - ProxyPortNotFound or ProxyPortDestNotFound
297 1
298 1
        """
299
        self._validate_disable_evcs(evcs, force)
300 1
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
301
302
        metadata = {
303
            "telemetry": {
304
                "enabled": False,
305
                "status": "DOWN",
306
                "status_reason": ["disabled"],
307
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
308 1
            }
309 1
        }
310 1
        await self.remove_int_flows(evcs, metadata, force=force)
311
        try:
312
            self._discard_pps_evc_ids(evcs)
313
        except ProxyPortError:
314
            if not force:
315 1
                raise
316
317
    async def remove_int_flows(
318
        self, evcs: dict[str, dict], metadata: dict, force=False
319 1
    ) -> None:
320
        """Remove INT flows and set metadata on EVCs."""
321
        stored_flows = await api.get_stored_flows(
322 1
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
323
        )
324
        await asyncio.gather(
325
            self._remove_int_flows_by_cookies(stored_flows),
326
            api.add_evcs_metadata(evcs, metadata, force),
327 1
        )
328
329
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
330
        """Enable INT on EVCs.
331
332
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
333
334
        The force bool option, if True, will bypass the following:
335
336
        1 - EVC already has INT
337
        2 - ProxyPort isn't UP
338
        Other cases won't be bypassed since at the point it won't have the data needed.
339 1
340 1
        """
341
        evcs = self._validate_map_enable_evcs(evcs, force)
342 1
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
343
344
        metadata = {
345
            "telemetry": {
346
                "enabled": True,
347
                "status": "UP",
348
                "status_reason": [],
349
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
350 1
            }
351 1
        }
352
        await self.install_int_flows(evcs, metadata)
353 1
        self._add_pps_evc_ids(evcs)
354
355
    async def redeploy_int(self, evcs: dict[str, dict]) -> None:
356
        """Redeploy INT on EVCs. It'll remove, install and update metadata.
357
358 1
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
359 1
        """
360 1
        self._validate_has_int(evcs)
361
        evcs = self._validate_map_enable_evcs(evcs, force=True)
362 1
        log.info(f"Redeploying INT on EVC ids: {list(evcs.keys())}, force: True")
363
364
        stored_flows = await api.get_stored_flows(
365 1
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
366 1
        )
367
        await self._remove_int_flows_by_cookies(stored_flows)
368
        metadata = {
369
            "telemetry": {
370
                "enabled": True,
371
                "status": "UP",
372
                "status_reason": [],
373
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
374 1
            }
375
        }
376 1
        await self.install_int_flows(evcs, metadata, force=True)
377
378
    async def install_int_flows(
379
        self, evcs: dict[str, dict], metadata: dict, force=False
380 1
    ) -> None:
381
        """Install INT flows and set metadata on EVCs."""
382
        stored_flows = self.flow_builder.build_int_flows(
383
            evcs,
384
            await utils.get_found_stored_flows(
385
                [
386
                    utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
387
                    for evc_id in evcs
388
                ]
389
            ),
390 1
        )
391 1
        self._validate_evcs_stored_flows(evcs, stored_flows)
392 1
393
        active_evcs, inactive_evcs, pp_down_evcs = {}, {}, {}
394
        for evc_id, evc in evcs.items():
395 1
            if not evc["active"]:
396
                inactive_evcs[evc_id] = evc
397
                continue
398
            if any(
399 1
                (
400 1
                    evc["uni_a"]["proxy_port"].status != EntityStatus.UP,
401
                    evc["uni_z"]["proxy_port"].status != EntityStatus.UP,
402
                )
403 1
            ):
404 1
                pp_down_evcs[evc_id] = evc
405 1
                continue
406 1
            active_evcs[evc_id] = evc
407 1
408
        inactive_metadata = copy.deepcopy(metadata)
409 1
        inactive_metadata["telemetry"]["status"] = "DOWN"
410
        pp_down_metadata = copy.deepcopy(inactive_metadata)
411
        inactive_metadata["telemetry"]["status_reason"] = ["no_flows"]
412
        pp_down_metadata["telemetry"]["status_reason"] = ["proxy_port_down"]
413
414
        await asyncio.gather(
415
            self._install_int_flows(stored_flows),
416 1
            api.add_evcs_metadata(inactive_evcs, inactive_metadata, force),
417
            api.add_evcs_metadata(pp_down_evcs, pp_down_metadata, force),
418
            api.add_evcs_metadata(active_evcs, metadata, force),
419 1
        )
420 1
421 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
422
        """Return a ProxyPort assigned to a UNI or raise."""
423 1
424 1
        interface = self.controller.get_interface_by_id(intf_id)
425
        if not interface:
426
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")
427
428 1
        if "proxy_port" not in interface.metadata:
429
            raise ProxyPortNotFound(
430
                evc_id, f"proxy_port metadata not found in {intf_id}"
431 1
            )
432
433
        source_intf = interface.switch.get_interface_by_port_no(
434
            interface.metadata.get("proxy_port")
435
        )
436
        if not source_intf:
437 1
            raise ProxyPortNotFound(
438 1
                evc_id,
439 1
                f"proxy_port of {intf_id} source interface not found",
440 1
            )
441
442 1
        pp = self.srcs_pp.get(source_intf.id)
443 1
        if not pp:
444
            pp = ProxyPort(self.controller, source_intf)
445
            self.srcs_pp[source_intf.id] = pp
446
447
        if not pp.destination:
448
            raise ProxyPortDestNotFound(
449 1
                evc_id,
450
                f"proxy_port of {intf_id} isn't looped or destination interface "
451 1
                "not found",
452
            )
453
454
        return pp
455
456
    def _validate_disable_evcs(
457 1
        self,
458
        evcs: dict[str, dict],
459
        force=False,
460
    ) -> None:
461
        """Validate disable EVCs."""
462
        for evc_id, evc in evcs.items():
463 1
            if not evc and not force:
464
                raise EVCNotFound(evc_id)
465
            if not utils.has_int_enabled(evc) and not force:
466
                raise EVCHasNoINT(evc_id)
467
468
    def _validate_evcs_stored_flows(
469
        self, evcs: dict[str, dict], stored_flows: dict[int, list[dict]]
470
    ) -> None:
471 1
        """Validate that each active EVC has corresponding flows."""
472 1
        for evc_id, evc in evcs.items():
473 1
            if evc["active"] and not stored_flows.get(
474
                utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
475
            ):
476
                raise FlowsNotFound(evc_id)
477
478
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
479
        """Validate that an intra EVC is using different proxy ports.
480 1
481 1
        If the same proxy port is used on both UNIs, of one the sink/pop related matches
482 1
        would ended up being overwritten since they'd be the same. Currently, an
483
        external loop will have unidirectional flows matching in the lower (source)
484 1
        port number.
485
        """
486
        pp_a = evc["uni_a"].get("proxy_port")
487
        pp_z = evc["uni_z"].get("proxy_port")
488 1
        if any(
489
            (
490
                not utils.is_intra_switch_evc(evc),
491
                pp_a is None,
492
                pp_z is None,
493
            )
494
        ):
495
            return
496
        if pp_a.source != pp_z.source:
497
            return
498
499 1
        raise ProxyPortSameSourceIntraEVC(
500 1
            evc["id"], "intra EVC UNIs must use different proxy ports"
501
        )
502 1
503
    async def handle_failover_flows(
504
        self, evcs_content: dict[str, dict], event_name: str
505 1
    ) -> None:
506 1
        """Handle failover flows. This method will generate the subset
507 1
        of INT flows. EVCs with 'flows' key will be installed, and
508
        'old_flows' will be removed.
509 1
510 1
        If a given proxy port has an unexpected state INT will be
511
        removed falling back to mef_eline flows.
512 1
        """
513
        to_install, to_remove, to_remove_with_err = {}, {}, {}
514
        new_flows: dict[int, list[dict]] = defaultdict(list)
515
        old_flows: dict[int, list[dict]] = defaultdict(list)
516
517
        old_flows_key = "removed_flows"
518
        new_flows_key = "flows"
519
520
        for evc_id, evc in evcs_content.items():
521 1
            if not utils.has_int_enabled(evc):
522
                continue
523
            try:
524
                uni_a, uni_z = utils.get_evc_unis(evc)
525
                pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
526
                pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
527
                uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
528
                evc["id"] = evc_id
529
                evc["uni_a"], evc["uni_z"] = uni_a, uni_z
530
            except ProxyPortError as e:
531 1
                log.error(
532 1
                    f"Unexpected proxy port state: {str(e)}."
533
                    f"INT will be removed on evc id {evc_id}"
534 1
                )
535 1
                to_remove_with_err[evc_id] = evc
536 1
                continue
537 1
538
            for dpid, flows in evc.get(new_flows_key, {}).items():
539 1
                for flow in flows:
540
                    new_flows[flow["cookie"]].append({"flow": flow, "switch": dpid})
541
542
            for dpid, flows in evc.get(old_flows_key, {}).items():
543
                for flow in flows:
544 1
                    # set priority and table_group just so INT flows can be built
545 1
                    # the priority doesn't matter for deletion
546 1
                    flow["priority"] = 21000
547 1
                    flow["table_group"] = (
548 1
                        "evpl" if "dl_vlan" in flow.get("match", {}) else "epl"
549 1
                    )
550 1
                    old_flows[flow["cookie"]].append({"flow": flow, "switch": dpid})
551 1
552
            if evc.get(new_flows_key):
553 1
                to_install[evc_id] = evc
554
                evc.pop(new_flows_key)
555
            if evc.get(old_flows_key):
556
                to_remove[evc_id] = evc
557
                evc.pop(old_flows_key, None)
558 1
559 1
        if to_remove:
560 1
            log.info(
561 1
                f"Handling {event_name} flows remove on EVC ids: {to_remove.keys()}"
562 1
            )
563 1
            await self._remove_int_flows(
564
                self._build_failover_old_flows(to_remove, old_flows)
565 1
            )
566
        if to_remove_with_err:
567
            log.error(
568
                f"Handling {event_name} proxy_port_error falling back "
569
                f"to mef_eline, EVC ids: {list(to_remove_with_err.keys())}"
570
            )
571
            metadata = {
572
                "telemetry": {
573
                    "enabled": True,
574
                    "status": "DOWN",
575 1
                    "status_reason": ["proxy_port_error"],
576
                    "status_updated_at": datetime.utcnow().strftime(
577
                        "%Y-%m-%dT%H:%M:%S"
578 1
                    ),
579
                }
580
            }
581
            await self.remove_int_flows(to_remove_with_err, metadata, force=True)
582 1
        if to_install:
583 1
            log.info(
584 1
                f"Handling {event_name} flows install on EVC ids: {to_install.keys()}"
585
            )
586 1
            await self._install_int_flows(
587
                self.flow_builder.build_int_flows(to_install, new_flows)
588
            )
589
590
    def _build_failover_old_flows(
591 1
        self, evcs: dict[str, list[dict]], old_flows: dict[int, list[dict]]
592
    ) -> dict[int, list[dict]]:
593 1
        """Build (old path) failover related to remove flows.
594
595
        If sink nnis svlan are different it'll regenerate the rest of sink loop flows,
596
        otherwise, it'll just remove the same received flows except with int cookie
597
        value the deletion uses flow mod OFPFC_DELETE, so no need to include the
598
        additional INT keys in the match like nw_proto for deletion.
599
        """
600
601
        removed_flows = defaultdict(list)
602
        for evc_id, evc in evcs.items():
603
            dpid_a, dpid_z = evc["uni_a"]["switch"], evc["uni_z"]["switch"]
604
605 1
            cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
606 1
            int_cookie = settings.INT_COOKIE_PREFIX << 56 | (cookie & 0xFFFFFFFFFFFFFF)
607
            cur_sink_a_svlan, cur_sink_z_svlan = None, None
608 1
            sink_a_flows: list[dict] = []
609
            sink_z_flows: list[dict] = []
610
611
            for link in evc["current_path"]:
612
                if cur_sink_a_svlan is None and (
613
                    svlan := utils.get_svlan_dpid_link(link, dpid_a)
614
                ):
615
                    cur_sink_a_svlan = svlan
616
                if cur_sink_z_svlan is None and (
617
                    svlan := utils.get_svlan_dpid_link(link, dpid_z)
618
                ):
619 1
                    cur_sink_z_svlan = svlan
620 1
                if cur_sink_a_svlan is not None and cur_sink_z_svlan is not None:
621 1
                    break
622 1
623
            log.info(
624 1
                f"bar sink_a_svlan {cur_sink_a_svlan}, sink_z_svlan {cur_sink_z_svlan}"
625 1
            )
626 1
627 1
            for flow in old_flows[cookie]:
628
                if not sink_a_flows and flow["switch"] == dpid_a:
629
                    if (
630 1
                        flow["flow"]["match"]["dl_vlan"] != cur_sink_a_svlan
631 1
                        and cur_sink_a_svlan
632
                    ):
633 1
                        sink_a_flows = self.flow_builder._build_int_sink_flows(
634
                            "uni_a", evc, old_flows
635
                        )
636
                    else:
637
                        flow["flow"]["cookie"] = int_cookie
638
                        sink_a_flows = [flow]
639
                elif not sink_z_flows and flow["switch"] == dpid_z:
640
                    if (
641 1
                        flow["flow"]["match"]["dl_vlan"] != cur_sink_z_svlan
642
                        and cur_sink_z_svlan
643
                    ):
644
                        sink_z_flows = self.flow_builder._build_int_sink_flows(
645
                            "uni_z", evc, old_flows
646
                        )
647
                    else:
648
                        flow["flow"]["cookie"] = int_cookie
649 1
                        sink_z_flows = [flow]
650
                if sink_a_flows and sink_z_flows:
651 1
                    break
652
653
            hop_flows = self.flow_builder._build_int_hop_flows(evc, old_flows)
654
            removed_flows[cookie] = list(
655
                itertools.chain(sink_a_flows, hop_flows, sink_z_flows)
656
            )
657
        return removed_flows
658 1
659 1
    def _validate_map_enable_evcs(
660 1
        self,
661 1
        evcs: dict[str, dict],
662
        force=False,
663 1
    ) -> dict[str, dict]:
664 1
        """Validate map enabling EVCs.
665 1
666 1
        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
667
        so it can be reused later during provisioning.
668
669 1
        """
670 1
        for evc_id, evc in evcs.items():
671
            if not evc:
672 1
                raise EVCNotFound(evc_id)
673 1
            if utils.has_int_enabled(evc) and not force:
674
                raise EVCHasINT(evc_id)
675
676
            uni_a, uni_z = utils.get_evc_unis(evc)
677
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
678
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
679
680
            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
681 1
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z
682
683
            if pp_a.status != EntityStatus.UP and not force:
684
                dest_id = pp_a.destination.id if pp_a.destination else None
685
                dest_status = pp_a.status if pp_a.destination else None
686
                raise ProxyPortStatusNotUP(
687
                    evc_id,
688
                    f"proxy_port of {uni_a['interface_id']} isn't UP. "
689
                    f"source {pp_a.source.id} status {pp_a.source.status}, "
690
                    f"destination {dest_id} status {dest_status}",
691
                )
692
            if pp_z.status != EntityStatus.UP and not force:
693
                dest_id = pp_z.destination.id if pp_z.destination else None
694
                dest_status = pp_z.status if pp_z.destination else None
695
                raise ProxyPortStatusNotUP(
696
                    evc_id,
697
                    f"proxy_port of {uni_z['interface_id']} isn't UP."
698
                    f"source {pp_z.source.id} status {pp_z.source.status}, "
699
                    f"destination {dest_id} status {dest_status}",
700
                )
701
702
            self._validate_intra_evc_different_proxy_ports(evc)
703
        return evcs
704
705
    def _validate_has_int(self, evcs: dict[str, dict]):
706
        for evc_id, evc in evcs.items():
707
            if not utils.has_int_enabled(evc):
708
                raise EVCHasNoINT(evc_id)
709
710
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
711
        """Add proxy ports evc_ids.
712
713
        This is meant to be called after an EVC is enabled.
714
        """
715
        for evc_id, evc in evcs.items():
716
            uni_a, uni_z = utils.get_evc_unis(evc)
717
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
718
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
719
            pp_a.evc_ids.add(evc_id)
720
            pp_z.evc_ids.add(evc_id)
721
            self.unis_src[evc["uni_a"]["interface_id"]] = pp_a.source.id
722
            self.unis_src[evc["uni_z"]["interface_id"]] = pp_z.source.id
723
724
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
725
        """Discard proxy port evc_ids.
726
727
        This is meant to be called when an EVC is disabled.
728
        """
729
        for evc_id, evc in evcs.items():
730
            uni_a, uni_z = utils.get_evc_unis(evc)
731
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
732
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
733
            pp_a.evc_ids.discard(evc_id)
734
            pp_z.evc_ids.discard(evc_id)
735
736
    def evc_compare(
737
        self, stored_int_flows: dict, stored_mef_flows: dict, evcs: dict
738
    ) -> dict[str, list]:
739
        """EVC compare.
740
741
        Cases:
742
        - No INT enabled but has INT flows -> wrong_metadata_has_int_flows
743
        - INT enabled but has less flows than mef flows -> missing_some_int_flows
744
745
        """
746
        int_flows = {
747
            utils.get_id_from_cookie(k): v for k, v in stored_int_flows.items()
748
        }
749
        mef_flows = {
750
            utils.get_id_from_cookie(k): v for k, v in stored_mef_flows.items()
751
        }
752
753
        results = defaultdict(list)
754
        for evc in evcs.values():
755
            evc_id = evc["id"]
756
757
            if (
758
                not utils.has_int_enabled(evc)
759
                and evc_id in int_flows
760
                and int_flows[evc_id]
761
            ):
762
                results[evc_id].append("wrong_metadata_has_int_flows")
763
764
            if (
765
                utils.has_int_enabled(evc)
766
                and evc_id in mef_flows
767
                and mef_flows[evc_id]
768
                and (
769
                    evc_id not in int_flows
770
                    or (
771
                        evc_id in int_flows
772
                        and len(int_flows[evc_id]) < len(mef_flows[evc_id])
773
                    )
774
                )
775
            ):
776
                results[evc_id].append("missing_some_int_flows")
777
        return results
778
779
    async def _remove_int_flows_by_cookies(
780
        self, stored_flows: dict[int, list[dict]]
781
    ) -> None:
782
        """Delete int flows given a prefiltered stored_flows by cookies.
783
        You should use this type of removal when you need to remove all
784
        flows associated with a cookie, if you need to include all keys in the match
785
        to remove only a subset use `_remove_int_flows(stored_flows)` method instead.
786
787
        Removal is driven by the stored flows instead of EVC ids and dpids to also
788
        be able to handle the force mode when an EVC no longer exists. It also follows
789
        the same pattern that mef_eline currently uses.
790
        """
791
        switch_flows_cookies = defaultdict(set)
792
        for flows in stored_flows.values():
793
            for flow in flows:
794
                switch_flows_cookies[flow["switch"]].add(flow["flow"]["cookie"])
795
796
        switch_flows = defaultdict(list)
797
        for dpid, cookies in switch_flows_cookies.items():
798
            for cookie in cookies:
799
                switch_flows[dpid].append(
800
                    {
801
                        "cookie": cookie,
802
                        "cookie_mask": int(0xFFFFFFFFFFFFFFFF),
803
                        "table_id": Table.OFPTT_ALL.value,
804
                    }
805
                )
806
        await self._send_flows(switch_flows, "delete")
807
808
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
809
        """Delete int flows given a prefiltered stored_flows. This method is meant
810
        to be used when you need to match all the flow match keys, so, typically when
811
        you're removing just a subset of INT flows.
812
813
        Removal is driven by the stored flows instead of EVC ids and dpids to also
814
        be able to handle the force mode when an EVC no longer exists. It also follows
815
        the same pattern that mef_eline currently uses."""
816
        switch_flows = defaultdict(list)
817
        for flows in stored_flows.values():
818
            for flow in flows:
819
                switch_flows[flow["switch"]].append(flow["flow"])
820
        await self._send_flows(switch_flows, "delete")
821
822
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
823
        """Install INT flow mods."""
824
        switch_flows = defaultdict(list)
825
        for flows in stored_flows.values():
826
            for flow in flows:
827
                switch_flows[flow["switch"]].append(flow["flow"])
828
        await self._send_flows(switch_flows, "install")
829
830
    async def _send_flows(
831
        self, switch_flows: dict[str, list[dict]], cmd: Literal["install", "delete"]
832
    ):
833
        """Send batched flows by dpid to flow_manager.
834
835
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
836
        for settings.BATCH_INTERVAL per batch iteration.
837
        """
838
        for dpid, flows in switch_flows.items():
839
            batch_size = settings.BATCH_SIZE
840
            if batch_size <= 0:
841
                batch_size = len(flows)
842
843
            for i in range(0, len(flows), batch_size):
844
                if i > 0:
845
                    await asyncio.sleep(settings.BATCH_INTERVAL)
846
                flows = flows[i : i + batch_size]
847
                event = KytosEvent(
848
                    f"kytos.flow_manager.flows.{cmd}",
849
                    content={
850
                        "dpid": dpid,
851
                        "force": True,
852
                        "flow_dict": {"flows": flows},
853
                    },
854
                )
855
                await self.controller.buffers.app.aput(event)
856