Test Failed
Pull Request — master (#104)
by Vinicius
01:49
created

INTManager._validate_evcs_stored_flows()   A

Complexity

Conditions 4

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 6
nop 3
dl 0
loc 9
ccs 3
cts 3
cp 1
crap 4
rs 10
c 0
b 0
f 0
1
"""INTManager module."""
2
3 1
import asyncio
4 1
import copy
5 1
from collections import defaultdict
6 1
from datetime import datetime
7
from typing import Literal
8 1
9
from pyof.v0x04.controller2switch.table_mod import Table
10 1
11 1
from kytos.core.controller import Controller
12 1
from kytos.core.events import KytosEvent
13 1
from kytos.core.interface import Interface
14 1
from napps.kytos.telemetry_int import utils
15 1
from napps.kytos.telemetry_int import settings
16 1
from kytos.core import log
17 1
from kytos.core.link import Link
18 1
import napps.kytos.telemetry_int.kytos_api_helper as api
19 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
20 1
from kytos.core.common import EntityStatus
21
from napps.kytos.telemetry_int.proxy_port import ProxyPort
22 1
23
from napps.kytos.telemetry_int.exceptions import (
24
    EVCError,
25
    EVCNotFound,
26
    EVCHasINT,
27
    EVCHasNoINT,
28
    FlowsNotFound,
29
    ProxyPortError,
30
    ProxyPortStatusNotUP,
31
    ProxyPortDestNotFound,
32
    ProxyPortNotFound,
33
    ProxyPortSameSourceIntraEVC,
34
)
35
36 1
37
class INTManager:
38
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
39 1
40
    def __init__(self, controller: Controller) -> None:
        """Set up the manager state bound to the given controller."""
        self.controller = controller
        self.flow_builder = FlowBuilder()

        # Locks held while handling link events and interface metadata events
        self._topo_link_lock = asyncio.Lock()
        self._intf_meta_lock = asyncio.Lock()

        # UNI interface id -> id of its proxy port source interface
        self.unis_src: dict[str, str] = {}
        # Proxy port source interface id -> ProxyPort instance
        self.srcs_pp: dict[str, ProxyPort] = {}
51 1
52
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
        """Load UNI ids src ids and their ProxyPort instances.

        For every EVC with INT enabled, each UNI whose interface carries
        ``proxy_port`` metadata is mapped to its source interface id in
        ``self.unis_src`` and the EVC id is registered on the corresponding
        ProxyPort. A UNI whose proxy port source interface can't be found on
        the switch is skipped with a warning instead of failing the load.
        """
        for evc_id, evc in evcs.items():
            if not utils.has_int_enabled(evc):
                continue

            for uni_key in ("uni_a", "uni_z"):
                uni = self.controller.get_interface_by_id(
                    evc[uni_key]["interface_id"]
                )
                if not uni or "proxy_port" not in uni.metadata:
                    continue

                src = uni.switch.get_interface_by_port_no(
                    uni.metadata["proxy_port"]
                )
                if not src:
                    # The port number in the metadata doesn't resolve to an
                    # interface; there is nothing to map for this UNI.
                    log.warning(
                        f"proxy_port source interface of {uni.id} not found, "
                        f"EVC id: {evc_id}"
                    )
                    continue

                self.unis_src[uni.id] = src.id
                try:
                    pp = self.get_proxy_port_or_raise(uni.id, evc_id)
                except ProxyPortDestNotFound:
                    # get_proxy_port_or_raise stores the ProxyPort in srcs_pp
                    # before checking its destination, so it's available here
                    pp = self.srcs_pp[src.id]
                pp.evc_ids.add(evc_id)
83 1
84
    async def handle_pp_link_down(self, link: Link) -> None:
        """React to a link_down on a link attached to a proxy port.

        Only acts when settings.FALLBACK_TO_MEF_LOOP_DOWN is set and one of
        the link endpoints is a known proxy port source with EVC ids. The INT
        flows of the matching EVCs whose telemetry is UP are removed and
        their telemetry metadata is set to DOWN ("proxy_port_down").
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        proxy_port = self.srcs_pp.get(link.endpoint_a.id) or self.srcs_pp.get(
            link.endpoint_b.id
        )
        if not proxy_port or not proxy_port.evc_ids:
            return

        async with self._topo_link_lock:
            filters = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "UP",
            }
            int_evcs = await api.get_evcs(**filters)

            to_deactivate = {}
            for evc_id, evc in int_evcs.items():
                if evc_id in proxy_port.evc_ids:
                    to_deactivate[evc_id] = evc
            if not to_deactivate:
                return

            log.info(
                f"Handling link_down {link}, removing INT flows falling back to "
                f"mef_eline, EVC ids: {list(to_deactivate)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            telemetry = {
                "enabled": True,
                "status": "DOWN",
                "status_reason": ["proxy_port_down"],
                "status_updated_at": updated_at,
            }
            await self.remove_int_flows(to_deactivate, {"telemetry": telemetry})
122 1
123
    async def handle_pp_link_up(self, link: Link) -> None:
        """React to a link_up on a link attached to a proxy port.

        Only acts when settings.FALLBACK_TO_MEF_LOOP_DOWN is set and one of
        the link endpoints is a known proxy port source with EVC ids. EVCs
        whose telemetry is DOWN, that are active, not archived, and whose
        both proxy ports are UP get their INT flows installed again.
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        proxy_port = self.srcs_pp.get(link.endpoint_a.id) or self.srcs_pp.get(
            link.endpoint_b.id
        )
        if not proxy_port or not proxy_port.evc_ids:
            return

        async with self._topo_link_lock:
            # Re-check under the lock: the link must be UP with no status reason
            if link.status != EntityStatus.UP or link.status_reason:
                return

            filters = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "DOWN",
            }
            down_evcs = await api.get_evcs(**filters)

            to_install = {}
            for evc_id, evc in down_evcs.items():
                uni_a_id = evc["uni_a"]["interface_id"]
                uni_z_id = evc["uni_z"]["interface_id"]
                skip_conditions = (
                    not evc["active"],
                    evc["archived"],
                    evc_id not in proxy_port.evc_ids,
                    uni_a_id not in self.unis_src,
                    uni_z_id not in self.unis_src,
                )
                if any(skip_conditions):
                    continue

                src_ids = (self.unis_src[uni_a_id], self.unis_src[uni_z_id])
                if any(src_id not in self.srcs_pp for src_id in src_ids):
                    continue
                if all(
                    self.srcs_pp[src_id].status == EntityStatus.UP
                    for src_id in src_ids
                ):
                    to_install[evc_id] = evc

            if not to_install:
                return

            try:
                to_install = self._validate_map_enable_evcs(to_install, force=True)
            except EVCError as exc:
                log.exception(exc)
                return

            log.info(
                f"Handling link_up {link}, deploying INT flows, "
                f"EVC ids: {list(to_install)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            telemetry = {
                "enabled": True,
                "status": "UP",
                "status_reason": [],
                "status_updated_at": updated_at,
            }
            try:
                await self.install_int_flows(to_install, {"telemetry": telemetry})
            except FlowsNotFound as exc:
                log.exception(f"FlowsNotFound {str(exc)}")
194 1
195
    async def handle_pp_metadata_removed(self, intf: Interface) -> None:
        """React to the proxy_port metadata being removed from an interface.

        If the interface no longer has ``proxy_port`` metadata and its known
        ProxyPort has EVC ids, the INT flows of the matching EVCs whose
        telemetry is UP are removed and their telemetry metadata is set to
        DOWN ("proxy_port_metadata_removed").
        """
        if "proxy_port" in intf.metadata:
            return

        try:
            proxy_port = self.srcs_pp[self.unis_src[intf.id]]
        except KeyError:
            # The interface isn't a tracked UNI or has no tracked ProxyPort
            return
        if not proxy_port.evc_ids:
            return

        async with self._intf_meta_lock:
            filters = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "UP",
            }
            int_evcs = await api.get_evcs(**filters)

            to_deactivate = {}
            for evc_id, evc in int_evcs.items():
                if evc_id in proxy_port.evc_ids:
                    to_deactivate[evc_id] = evc
            if not to_deactivate:
                return

            log.info(
                f"Handling interface metadata removed on {intf}, removing INT flows "
                f"falling back to mef_eline, EVC ids: {list(to_deactivate)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            telemetry = {
                "enabled": True,
                "status": "DOWN",
                "status_reason": ["proxy_port_metadata_removed"],
                "status_updated_at": updated_at,
            }
            await self.remove_int_flows(to_deactivate, {"telemetry": telemetry})
234 1
235
    async def handle_pp_metadata_added(self, intf: Interface) -> None:
        """Handle proxy port metadata added.

        If an existing ProxyPort gets its proxy_port metadata updated to a
        different source interface and has associated EVCs, INT is disabled
        on those EVCs and then enabled again with the updated proxy port.
        """
        if "proxy_port" not in intf.metadata:
            return

        try:
            proxy_port = self.srcs_pp[self.unis_src[intf.id]]
        except KeyError:
            # The interface isn't a tracked UNI or has no tracked ProxyPort
            return
        if not proxy_port.evc_ids:
            return

        new_source = intf.switch.get_interface_by_port_no(
            intf.metadata.get("proxy_port")
        )
        if new_source == proxy_port.source:
            # Same source interface as before, nothing to redeploy
            return

        async with self._intf_meta_lock:
            proxy_port.source = new_source

            int_evcs = await api.get_evcs(**{"metadata.telemetry.enabled": "true"})
            affected_evcs = {}
            for evc_id, evc in int_evcs.items():
                if evc_id in proxy_port.evc_ids:
                    affected_evcs[evc_id] = evc
            if not affected_evcs:
                return

            log.info(
                f"Handling interface metadata updated on {intf}. It'll disable the "
                "EVCs to be safe, and then try to enable again with the updated "
                f" proxy port {proxy_port}, EVC ids: {list(affected_evcs)}"
            )
            await self.disable_int(affected_evcs, force=True)
            try:
                await self.enable_int(affected_evcs, force=True)
            except ProxyPortSameSourceIntraEVC as exc:
                log.error(
                    f"Validation error when updating interface {intf} proxy port "
                    f"{proxy_port} EVC ids: {list(affected_evcs)}, "
                    f"exception {str(exc)}"
                )
285 1
286
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Disable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.

        When force is True the following conditions are bypassed:

        1 - EVC not found
        2 - EVC doesn't have INT
        3 - ProxyPortNotFound or ProxyPortDestNotFound

        """
        self._validate_disable_evcs(evcs, force)
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": False,
            "status": "DOWN",
            "status_reason": ["disabled"],
            "status_updated_at": updated_at,
        }
        await self.remove_int_flows(evcs, {"telemetry": telemetry}, force=force)

        try:
            self._discard_pps_evc_ids(evcs)
        except ProxyPortError:
            # In force mode a proxy port lookup failure is tolerated
            if force:
                return
            raise
315 1
316
    async def remove_int_flows(
        self, evcs: dict[str, dict], metadata: dict, force=False
    ) -> None:
        """Remove the INT flows of the given EVCs and set their metadata.

        The stored flows are looked up by the INT cookie of each EVC id; the
        flow removal and the metadata update are awaited concurrently.
        """
        int_cookies = []
        for evc_id in evcs:
            int_cookies.append(utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX))
        stored_flows = await api.get_stored_flows(int_cookies)

        removal = self._remove_int_flows_by_cookies(stored_flows)
        metadata_update = api.add_evcs_metadata(evcs, metadata, force)
        await asyncio.gather(removal, metadata_update)
327 1
328
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Enable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.

        When force is True the following conditions are bypassed:

        1 - EVC already has INT
        2 - ProxyPort isn't UP

        Other cases won't be bypassed since at that point it won't have the
        data needed.
        """
        mapped_evcs = self._validate_map_enable_evcs(evcs, force)
        log.info(
            f"Enabling INT on EVC ids: {list(mapped_evcs.keys())}, force: {force}"
        )

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": True,
            "status": "UP",
            "status_reason": [],
            "status_updated_at": updated_at,
        }
        await self.install_int_flows(mapped_evcs, {"telemetry": telemetry})
        # Track the EVC ids on their proxy ports only after the flows went out
        self._add_pps_evc_ids(mapped_evcs)
353 1
354
    async def redeploy_int(self, evcs: dict[str, dict]) -> None:
        """Redeploy INT on EVCs. It'll remove, install and update metadata.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
        Every EVC must already have INT enabled, otherwise EVCHasNoINT is
        raised by the validation.
        """
        self._validate_has_int(evcs)
        mapped_evcs = self._validate_map_enable_evcs(evcs, force=True)
        log.info(
            f"Redeploying INT on EVC ids: {list(mapped_evcs.keys())}, force: True"
        )

        # Remove the currently stored INT flows before installing them again
        int_cookies = []
        for evc_id in mapped_evcs:
            int_cookies.append(utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX))
        stored_flows = await api.get_stored_flows(int_cookies)
        await self._remove_int_flows_by_cookies(stored_flows)

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": True,
            "status": "UP",
            "status_reason": [],
            "status_updated_at": updated_at,
        }
        await self.install_int_flows(
            mapped_evcs, {"telemetry": telemetry}, force=True
        )
376 1
377
    async def install_int_flows(
        self, evcs: dict[str, dict], metadata: dict, force=False
    ) -> None:
        """Install INT flows and set metadata on EVCs.

        The INT flows are built from the stored mef_eline flows of each EVC.
        Inactive EVCs and EVCs with a proxy port that isn't UP get a DOWN
        telemetry status ("no_flows" and "proxy_port_down" respectively),
        while the remaining EVCs receive the given metadata as is.
        """
        mef_cookies = [
            utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX) for evc_id in evcs
        ]
        mef_stored_flows = await utils.get_found_stored_flows(mef_cookies)
        stored_flows = self.flow_builder.build_int_flows(evcs, mef_stored_flows)
        self._validate_evcs_stored_flows(evcs, stored_flows)

        active_evcs, inactive_evcs, pp_down_evcs = {}, {}, {}
        for evc_id, evc in evcs.items():
            if not evc["active"]:
                inactive_evcs[evc_id] = evc
                continue
            pp_statuses = (
                evc["uni_a"]["proxy_port"].status,
                evc["uni_z"]["proxy_port"].status,
            )
            if any(status != EntityStatus.UP for status in pp_statuses):
                pp_down_evcs[evc_id] = evc
            else:
                active_evcs[evc_id] = evc

        # Derive the DOWN variants without touching the caller's metadata
        inactive_metadata = copy.deepcopy(metadata)
        inactive_metadata["telemetry"]["status"] = "DOWN"
        inactive_metadata["telemetry"]["status_reason"] = ["no_flows"]
        pp_down_metadata = copy.deepcopy(metadata)
        pp_down_metadata["telemetry"]["status"] = "DOWN"
        pp_down_metadata["telemetry"]["status_reason"] = ["proxy_port_down"]

        await asyncio.gather(
            self._install_int_flows(stored_flows),
            api.add_evcs_metadata(inactive_evcs, inactive_metadata, force),
            api.add_evcs_metadata(pp_down_evcs, pp_down_metadata, force),
            api.add_evcs_metadata(active_evcs, metadata, force),
        )
419 1
420 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
        """Return the ProxyPort assigned to a UNI or raise.

        A ProxyPort is created and cached in ``self.srcs_pp`` the first time
        its source interface is seen.

        Raises ProxyPortNotFound when the UNI, its proxy_port metadata or the
        source interface can't be found, and ProxyPortDestNotFound when the
        ProxyPort has no destination.
        """
        uni = self.controller.get_interface_by_id(intf_id)
        if not uni:
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")
        if "proxy_port" not in uni.metadata:
            raise ProxyPortNotFound(
                evc_id, f"proxy_port metadata not found in {intf_id}"
            )

        port_no = uni.metadata.get("proxy_port")
        source = uni.switch.get_interface_by_port_no(port_no)
        if not source:
            raise ProxyPortNotFound(
                evc_id,
                f"proxy_port of {intf_id} source interface not found",
            )

        proxy_port = self.srcs_pp.get(source.id)
        if not proxy_port:
            proxy_port = ProxyPort(self.controller, source)
            self.srcs_pp[source.id] = proxy_port

        if proxy_port.destination:
            return proxy_port

        raise ProxyPortDestNotFound(
            evc_id,
            f"proxy_port of {intf_id} isn't looped or destination interface "
            "not found",
        )
454
455
    def _validate_disable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> None:
        """Validate that INT can be disabled on the given EVCs.

        Raises EVCNotFound for an empty EVC and EVCHasNoINT for an EVC
        without INT enabled, unless force is set.
        """
        for evc_id, evc in evcs.items():
            if not force and not evc:
                raise EVCNotFound(evc_id)
            has_int = utils.has_int_enabled(evc)
            if not (has_int or force):
                raise EVCHasNoINT(evc_id)
466
467
    def _validate_evcs_stored_flows(
        self, evcs: dict[str, dict], stored_flows: dict[int, list[dict]]
    ) -> None:
        """Ensure every active EVC has flows stored under its mef_eline cookie.

        Raises FlowsNotFound for the first active EVC without flows.
        """
        for evc_id, evc in evcs.items():
            if not evc["active"]:
                continue
            mef_cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
            if not stored_flows.get(mef_cookie):
                raise FlowsNotFound(evc_id)
476
477
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
        """Validate that an intra EVC is using different proxy ports.

        If the same proxy port is used on both UNIs, one of the sink/pop
        related matches would end up being overwritten since they'd be the
        same. Currently, an external loop will have unidirectional flows
        matching in the lower (source) port number.

        Raises ProxyPortSameSourceIntraEVC when both UNIs of an intra switch
        EVC share the same proxy port source.
        """
        pp_a = evc["uni_a"].get("proxy_port")
        pp_z = evc["uni_z"].get("proxy_port")
        is_intra_switch = utils.is_intra_switch_evc(evc)

        # Only intra switch EVCs with both proxy ports mapped are checked
        if not is_intra_switch or pp_a is None or pp_z is None:
            return
        if pp_a.source != pp_z.source:
            return

        raise ProxyPortSameSourceIntraEVC(
            evc["id"], "intra EVC UNIs must use different proxy ports"
        )
501
502 1
    async def handle_failover_flows(
        self, evcs_content: dict[str, dict], event_name: str
    ) -> None:
        """Handle failover flows.

        This method will generate the subset of INT flows. EVCs with a
        'flows' key will have the derived INT flows installed, and EVCs with
        a 'removed_flows' key will have the derived INT flows removed. Both
        keys are popped from the EVC dicts once consumed.

        If a given proxy port has an unexpected state INT will be
        removed falling back to mef_eline flows.
        """
        to_install, to_remove, to_remove_with_err = {}, {}, {}
        new_flows: dict[int, list[dict]] = defaultdict(list)
        old_flows: dict[int, list[dict]] = defaultdict(list)

        old_flows_key = "removed_flows"
        new_flows_key = "flows"

        for evc_id, evc in evcs_content.items():
            if not utils.has_int_enabled(evc):
                continue
            try:
                uni_a, uni_z = utils.get_evc_unis(evc)
                pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
                pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
                uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
                evc["id"] = evc_id
                evc["uni_a"], evc["uni_z"] = uni_a, uni_z
            except ProxyPortError as e:
                log.error(
                    f"Unexpected proxy port state: {str(e)}. "
                    f"INT will be removed on evc id {evc_id}"
                )
                to_remove_with_err[evc_id] = evc
                continue

            for dpid, flows in evc.get(new_flows_key, {}).items():
                for flow in flows:
                    new_flows[flow["cookie"]].append({"flow": flow, "switch": dpid})

            for dpid, flows in evc.get(old_flows_key, {}).items():
                for flow in flows:
                    # set priority and table_group just so INT flows can be built
                    # the priority doesn't matter for deletion
                    flow["priority"] = 21000
                    flow["table_group"] = (
                        "evpl" if "dl_vlan" in flow.get("match", {}) else "epl"
                    )
                    old_flows[flow["cookie"]].append({"flow": flow, "switch": dpid})

            if evc.get(new_flows_key):
                to_install[evc_id] = evc
                evc.pop(new_flows_key, None)
            if evc.get(old_flows_key):
                to_remove[evc_id] = evc
                evc.pop(old_flows_key, None)

        if to_remove:
            log.info(
                f"Handling {event_name} flows remove on EVC ids: {list(to_remove)}"
            )
            await self._remove_int_flows(
                self.flow_builder.build_failover_old_flows(to_remove, old_flows)
            )
        if to_remove_with_err:
            log.error(
                f"Handling {event_name} proxy_port_error falling back "
                f"to mef_eline, EVC ids: {list(to_remove_with_err.keys())}"
            )
            metadata = {
                "telemetry": {
                    "enabled": True,
                    "status": "DOWN",
                    "status_reason": ["proxy_port_error"],
                    "status_updated_at": datetime.utcnow().strftime(
                        "%Y-%m-%dT%H:%M:%S"
                    ),
                }
            }
            await self.remove_int_flows(to_remove_with_err, metadata, force=True)
        if to_install:
            log.info(
                f"Handling {event_name} flows install on EVC ids: {list(to_install)}"
            )
            await self._install_int_flows(
                self.flow_builder.build_int_flows(to_install, new_flows)
            )
588
589
    def _validate_map_enable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> dict[str, dict]:
        """Validate map enabling EVCs.

        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
        so it can be reused later during provisioning.

        Raises EVCNotFound for an empty EVC. Unless force is set, raises
        EVCHasINT when INT is already enabled and ProxyPortStatusNotUP when a
        proxy port isn't UP. Errors from the proxy port lookup and the intra
        EVC proxy port validation are propagated regardless of force.

        Returns the same evcs dict with the UNIs mapped.
        """
        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if utils.has_int_enabled(evc) and not force:
                raise EVCHasINT(evc_id)

            uni_a, uni_z = utils.get_evc_unis(evc)
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)

            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z

            if not force:
                for uni, pp in ((uni_a, pp_a), (uni_z, pp_z)):
                    if pp.status != EntityStatus.UP:
                        # Report the destination interface's own status,
                        # next to the source one, to tell which side is down
                        dest = pp.destination
                        dest_id = dest.id if dest else None
                        dest_status = dest.status if dest else None
                        raise ProxyPortStatusNotUP(
                            evc_id,
                            f"proxy_port of {uni['interface_id']} isn't UP. "
                            f"source {pp.source.id} status {pp.source.status}, "
                            f"destination {dest_id} status {dest_status}",
                        )

            self._validate_intra_evc_different_proxy_ports(evc)
        return evcs
634
635
    def _validate_has_int(self, evcs: dict[str, dict]):
        """Raise EVCHasNoINT for the first EVC that doesn't have INT enabled."""
        for evc_id in evcs:
            if utils.has_int_enabled(evcs[evc_id]):
                continue
            raise EVCHasNoINT(evc_id)
639
640
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
        """Register each EVC id on the proxy ports of both of its UNIs.

        It also records the UNI id to proxy port source id mapping in
        ``self.unis_src``. This is meant to be called after an EVC is enabled.
        """
        for evc_id, evc in evcs.items():
            uni_a, uni_z = utils.get_evc_unis(evc)
            # Resolve both proxy ports first so a lookup failure changes nothing
            proxy_ports = (
                self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id),
                self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id),
            )
            for proxy_port in proxy_ports:
                proxy_port.evc_ids.add(evc_id)
            for uni_key, proxy_port in zip(("uni_a", "uni_z"), proxy_ports):
                self.unis_src[evc[uni_key]["interface_id"]] = proxy_port.source.id
653
654
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
        """Drop each EVC id from the proxy ports of both of its UNIs.

        This is meant to be called when an EVC is disabled.
        """
        for evc_id, evc in evcs.items():
            uni_a, uni_z = utils.get_evc_unis(evc)
            # Resolve both proxy ports first so a lookup failure changes nothing
            proxy_ports = (
                self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id),
                self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id),
            )
            for proxy_port in proxy_ports:
                proxy_port.evc_ids.discard(evc_id)
665 1
666 1
    def evc_compare(
        self, stored_int_flows: dict, stored_mef_flows: dict, evcs: dict
    ) -> dict[str, list]:
        """Compare the stored INT and mef_eline flows of each EVC.

        Returns a dict keyed by EVC id with the list of issues found:

        - No INT enabled but has INT flows -> wrong_metadata_has_int_flows
        - INT enabled but has less flows than mef flows -> missing_some_int_flows

        """
        # Re-key both flow dicts by EVC id instead of cookie
        int_flows = {}
        for cookie, flows in stored_int_flows.items():
            int_flows[utils.get_id_from_cookie(cookie)] = flows
        mef_flows = {}
        for cookie, flows in stored_mef_flows.items():
            mef_flows[utils.get_id_from_cookie(cookie)] = flows

        results = defaultdict(list)
        for evc in evcs.values():
            evc_id = evc["id"]
            evc_int_flows = int_flows.get(evc_id)
            evc_mef_flows = mef_flows.get(evc_id)

            if not utils.has_int_enabled(evc):
                if evc_int_flows:
                    results[evc_id].append("wrong_metadata_has_int_flows")
                continue

            if not evc_mef_flows:
                continue
            if evc_id not in int_flows or len(evc_int_flows) < len(evc_mef_flows):
                results[evc_id].append("missing_some_int_flows")
        return results
708
709
    async def _remove_int_flows_by_cookies(
        self, stored_flows: dict[int, list[dict]]
    ) -> dict[str, list[dict]]:
        """Delete INT flows by cookie given prefiltered stored_flows.

        Use this removal when all flows associated with a cookie must go. If
        all the match keys need to be included to remove only a subset, use
        the `_remove_int_flows(stored_flows)` method instead.

        Removal is driven by the stored flows instead of EVC ids and dpids to also
        be able to handle the force mode when an EVC no longer exists. It also follows
        the same pattern that mef_eline currently uses.

        Returns the delete flow mods that were sent, grouped by dpid.
        """
        # Collect the distinct cookies found on each switch
        cookies_by_dpid = defaultdict(set)
        for entries in stored_flows.values():
            for entry in entries:
                cookies_by_dpid[entry["switch"]].add(entry["flow"]["cookie"])

        # One exact-cookie delete across all tables per (switch, cookie)
        switch_flows = defaultdict(list)
        for dpid, cookies in cookies_by_dpid.items():
            for cookie in cookies:
                flow_mod = {
                    "cookie": cookie,
                    "cookie_mask": int(0xFFFFFFFFFFFFFFFF),
                    "table_id": Table.OFPTT_ALL.value,
                }
                switch_flows[dpid].append(flow_mod)

        await self._send_flows(switch_flows, "delete")
        return switch_flows
738
739
    async def _remove_int_flows(
        self, stored_flows: dict[int, list[dict]]
    ) -> dict[str, list[dict]]:
        """Delete INT flows given prefiltered stored_flows.

        This method is meant to be used when all the flow match keys must be
        matched, so, typically when removing just a subset of INT flows.

        Removal is driven by the stored flows instead of EVC ids and dpids to also
        be able to handle the force mode when an EVC no longer exists. It also follows
        the same pattern that mef_eline currently uses.

        Returns the flows that were sent for deletion, grouped by dpid.
        """
        switch_flows = defaultdict(list)
        for entries in stored_flows.values():
            for entry in entries:
                dpid, flow = entry["switch"], entry["flow"]
                switch_flows[dpid].append(flow)
        await self._send_flows(switch_flows, "delete")
        return switch_flows
755
756
    async def _install_int_flows(
        self, stored_flows: dict[int, list[dict]]
    ) -> dict[str, list[dict]]:
        """Install INT flow mods.

        Returns the flows that were sent for installation, grouped by dpid.
        """
        switch_flows = defaultdict(list)
        for entries in stored_flows.values():
            for entry in entries:
                dpid, flow = entry["switch"], entry["flow"]
                switch_flows[dpid].append(flow)
        await self._send_flows(switch_flows, "install")
        return switch_flows
766
767
    async def _send_flows(
        self, switch_flows: dict[str, list[dict]], cmd: Literal["install", "delete"]
    ):
        """Send batched flows by dpid to flow_manager.

        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
        for settings.BATCH_INTERVAL per batch iteration. A BATCH_SIZE lower
        than or equal to zero sends all the flows of a dpid in a single batch.
        Dpids without flows are skipped.
        """
        for dpid, flows in switch_flows.items():
            if not flows:
                # Nothing to send; also avoids a zero step in range() below
                continue

            batch_size = settings.BATCH_SIZE
            if batch_size <= 0:
                batch_size = len(flows)

            for i in range(0, len(flows), batch_size):
                if i > 0:
                    await asyncio.sleep(settings.BATCH_INTERVAL)
                # Slice into a separate name: rebinding `flows` here would
                # make every later batch slice the already-truncated list
                batch = flows[i : i + batch_size]
                event = KytosEvent(
                    f"kytos.flow_manager.flows.{cmd}",
                    content={
                        "dpid": dpid,
                        "force": True,
                        "flow_dict": {"flows": batch},
                    },
                )
                await self.controller.buffers.app.aput(event)
793