Passed
Push — master ( 4bf6f4...81fe6d )
by Vinicius
02:40 queued 17s
created

build.managers.int.INTManager._send_flows()   B

Complexity

Conditions 6

Size

Total Lines 29
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 6.4425

Importance

Changes 0
Metric Value
cc 6
eloc 19
nop 3
dl 0
loc 29
ccs 10
cts 13
cp 0.7692
crap 6.4425
rs 8.5166
c 0
b 0
f 0
1
"""INTManager module."""
2
3 1
import asyncio
4 1
import copy
5 1
from collections import defaultdict
6 1
from datetime import datetime
7 1
from typing import Literal
8
9 1
from pyof.v0x04.controller2switch.table_mod import Table
10
11 1
from kytos.core.controller import Controller
12 1
from kytos.core.events import KytosEvent
13 1
from kytos.core.interface import Interface
14 1
from napps.kytos.telemetry_int import utils
15 1
from napps.kytos.telemetry_int import settings
16 1
from kytos.core import log
17 1
from kytos.core.link import Link
18 1
import napps.kytos.telemetry_int.kytos_api_helper as api
19 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
20 1
from kytos.core.common import EntityStatus
21 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
22
23 1
from napps.kytos.telemetry_int.exceptions import (
24
    EVCError,
25
    EVCNotFound,
26
    EVCHasINT,
27
    EVCHasNoINT,
28
    FlowsNotFound,
29
    ProxyPortError,
30
    ProxyPortStatusNotUP,
31
    ProxyPortDestNotFound,
32
    ProxyPortNotFound,
33
    ProxyPortSameSourceIntraEVC,
34
)
35
36
37 1
class INTManager:
38
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
39
40 1
    def __init__(self, controller: Controller) -> None:
41
        """INTManager."""
42 1
        self.controller = controller
43 1
        self.flow_builder = FlowBuilder()
44 1
        self._topo_link_lock = asyncio.Lock()
45 1
        self._intf_meta_lock = asyncio.Lock()
46
47
        # Keep track between each uni intf id and its src intf id port
48 1
        self.unis_src: dict[str, str] = {}
49
        # Keep track between src intf id and its ProxyPort instance
50 1
        self.srcs_pp: dict[str, ProxyPort] = {}
51
52 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
53
        """Load UNI ids src ids and their ProxyPort instances."""
54 1
        for evc_id, evc in evcs.items():
55 1
            if not utils.has_int_enabled(evc):
56 1
                continue
57
58 1
            uni_a_id = evc["uni_a"]["interface_id"]
59 1
            uni_z_id = evc["uni_z"]["interface_id"]
60 1
            uni_a = self.controller.get_interface_by_id(uni_a_id)
61 1
            uni_z = self.controller.get_interface_by_id(uni_z_id)
62 1
            if uni_a and "proxy_port" in uni_a.metadata:
63 1
                src_a = uni_a.switch.get_interface_by_port_no(
64
                    uni_a.metadata["proxy_port"]
65
                )
66 1
                self.unis_src[uni_a.id] = src_a.id
67 1
                try:
68 1
                    pp = self.get_proxy_port_or_raise(uni_a.id, evc_id)
69
                except ProxyPortDestNotFound:
70
                    pp = self.srcs_pp[src_a.id]
71 1
                pp.evc_ids.add(evc_id)
72
73 1
            if uni_z and "proxy_port" in uni_z.metadata:
74 1
                src_z = uni_z.switch.get_interface_by_port_no(
75
                    uni_z.metadata["proxy_port"]
76
                )
77 1
                self.unis_src[uni_z.id] = src_z.id
78 1
                try:
79 1
                    pp = self.get_proxy_port_or_raise(uni_z.id, evc_id)
80
                except ProxyPortDestNotFound:
81
                    pp = self.srcs_pp[src_z.id]
82 1
                pp.evc_ids.add(evc_id)
83
84 1
    async def handle_pp_link_down(self, link: Link) -> None:
85
        """Handle proxy_port link_down."""
86 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
87
            return
88 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
89 1
        if not pp:
90
            pp = self.srcs_pp.get(link.endpoint_b.id)
91 1
        if not pp or not pp.evc_ids:
92
            return
93
94 1
        async with self._topo_link_lock:
95 1
            evcs = await api.get_evcs(
96
                **{
97
                    "metadata.telemetry.enabled": "true",
98
                    "metadata.telemetry.status": "UP",
99
                }
100
            )
101 1
            to_deactivate = {
102
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
103
            }
104 1
            if not to_deactivate:
105
                return
106
107 1
            log.info(
108
                f"Handling link_down {link}, removing INT flows falling back to "
109
                f"mef_eline, EVC ids: {list(to_deactivate)}"
110
            )
111 1
            metadata = {
112
                "telemetry": {
113
                    "enabled": True,
114
                    "status": "DOWN",
115
                    "status_reason": ["proxy_port_down"],
116
                    "status_updated_at": datetime.utcnow().strftime(
117
                        "%Y-%m-%dT%H:%M:%S"
118
                    ),
119
                }
120
            }
121 1
            await self.remove_int_flows(to_deactivate, metadata)
122
123 1
    async def handle_pp_link_up(self, link: Link) -> None:
124
        """Handle proxy_port link_up."""
125 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
126
            return
127 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
128 1
        if not pp:
129
            pp = self.srcs_pp.get(link.endpoint_b.id)
130 1
        if not pp or not pp.evc_ids:
131
            return
132
133 1
        async with self._topo_link_lock:
134 1
            if link.status != EntityStatus.UP or link.status_reason:
135
                return
136 1
            evcs = await api.get_evcs(
137
                **{
138
                    "metadata.telemetry.enabled": "true",
139
                    "metadata.telemetry.status": "DOWN",
140
                }
141
            )
142
143 1
            to_install = {}
144 1
            for evc_id, evc in evcs.items():
145 1
                if any(
146
                    (
147
                        not evc["active"],
148
                        evc["archived"],
149
                        evc_id not in pp.evc_ids,
150
                        evc["uni_a"]["interface_id"] not in self.unis_src,
151
                        evc["uni_z"]["interface_id"] not in self.unis_src,
152
                    )
153
                ):
154
                    continue
155
156 1
                src_a_id = self.unis_src[evc["uni_a"]["interface_id"]]
157 1
                src_z_id = self.unis_src[evc["uni_z"]["interface_id"]]
158 1
                if (
159
                    src_a_id in self.srcs_pp
160
                    and src_z_id in self.srcs_pp
161
                    and self.srcs_pp[src_a_id].status == EntityStatus.UP
162
                    and self.srcs_pp[src_z_id].status == EntityStatus.UP
163
                ):
164 1
                    to_install[evc_id] = evc
165
166 1
            if not to_install:
167
                return
168
169 1
            try:
170 1
                to_install = self._validate_map_enable_evcs(to_install, force=True)
171
            except EVCError as exc:
172
                log.exception(exc)
173
                return
174
175 1
            log.info(
176
                f"Handling link_up {link}, deploying INT flows, "
177
                f"EVC ids: {list(to_install)}"
178
            )
179 1
            metadata = {
180
                "telemetry": {
181
                    "enabled": True,
182
                    "status": "UP",
183
                    "status_reason": [],
184
                    "status_updated_at": datetime.utcnow().strftime(
185
                        "%Y-%m-%dT%H:%M:%S"
186
                    ),
187
                }
188
            }
189 1
            try:
190 1
                await self.install_int_flows(to_install, metadata)
191
            except FlowsNotFound as exc:
192
                log.exception(f"FlowsNotFound {str(exc)}")
193
                return
194
195 1
    async def handle_pp_metadata_removed(self, intf: Interface) -> None:
196
        """Handle proxy port metadata removed."""
197 1
        if "proxy_port" in intf.metadata:
198
            return
199 1
        try:
200 1
            pp = self.srcs_pp[self.unis_src[intf.id]]
201 1
            if not pp.evc_ids:
202
                return
203
        except KeyError:
204
            return
205
206 1
        async with self._intf_meta_lock:
207 1
            evcs = await api.get_evcs(
208
                **{
209
                    "metadata.telemetry.enabled": "true",
210
                    "metadata.telemetry.status": "UP",
211
                }
212
            )
213 1
            to_deactivate = {
214
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
215
            }
216 1
            if not to_deactivate:
217
                return
218
219 1
            log.info(
220
                f"Handling interface metadata removed on {intf}, removing INT flows "
221
                f"falling back to mef_eline, EVC ids: {list(to_deactivate)}"
222
            )
223 1
            metadata = {
224
                "telemetry": {
225
                    "enabled": True,
226
                    "status": "DOWN",
227
                    "status_reason": ["proxy_port_metadata_removed"],
228
                    "status_updated_at": datetime.utcnow().strftime(
229
                        "%Y-%m-%dT%H:%M:%S"
230
                    ),
231
                }
232
            }
233 1
            await self.remove_int_flows(to_deactivate, metadata)
234
235 1
    async def handle_pp_metadata_added(self, intf: Interface) -> None:
236
        """Handle proxy port metadata added.
237
238
        If an existing ProxyPort gets its proxy_port meadata updated
239
        and has associated EVCs then it'll remove and install the flows accordingly.
240
241
        """
242 1
        if "proxy_port" not in intf.metadata:
243
            return
244 1
        try:
245 1
            pp = self.srcs_pp[self.unis_src[intf.id]]
246 1
            if not pp.evc_ids:
247
                return
248
        except KeyError:
249
            return
250
251 1
        cur_source_intf = intf.switch.get_interface_by_port_no(
252
            intf.metadata.get("proxy_port")
253
        )
254 1
        if cur_source_intf == pp.source:
255 1
            return
256
257 1
        async with self._intf_meta_lock:
258 1
            pp.source = cur_source_intf
259
260 1
            evcs = await api.get_evcs(
261
                **{
262
                    "metadata.telemetry.enabled": "true",
263
                }
264
            )
265 1
            affected_evcs = {
266
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
267
            }
268 1
            if not affected_evcs:
269 1
                return
270
271 1
            log.info(
272
                f"Handling interface metadata updated on {intf}. It'll disable the "
273
                "EVCs to be safe, and then try to enable again with the updated "
274
                f" proxy port {pp}, EVC ids: {list(affected_evcs)}"
275
            )
276 1
            await self.disable_int(affected_evcs, force=True)
277 1
            try:
278 1
                await self.enable_int(affected_evcs, force=True)
279
            except ProxyPortSameSourceIntraEVC as exc:
280
                msg = (
281
                    f"Validation error when updating interface {intf} proxy port {pp}"
282
                    f" EVC ids: {list(affected_evcs)}, exception {str(exc)}"
283
                )
284
                log.error(msg)
285
286 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
287
        """Disable INT on EVCs.
288
289
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
290
291
        The force bool option, if True, will bypass the following:
292
293
        1 - EVC not found
294
        2 - EVC doesn't have INT
295
        3 - ProxyPortNotFound or ProxyPortDestNotFound
296
297
        """
298 1
        self._validate_disable_evcs(evcs, force)
299 1
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
300
301 1
        metadata = {
302
            "telemetry": {
303
                "enabled": False,
304
                "status": "DOWN",
305
                "status_reason": ["disabled"],
306
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
307
            }
308
        }
309 1
        await self.remove_int_flows(evcs, metadata, force=force)
310 1
        try:
311 1
            self._discard_pps_evc_ids(evcs)
312
        except ProxyPortError:
313
            if not force:
314
                raise
315
316 1
    async def remove_int_flows(
317
        self, evcs: dict[str, dict], metadata: dict, force=False
318
    ) -> None:
319
        """Remove INT flows and set metadata on EVCs."""
320 1
        stored_flows = await api.get_stored_flows(
321
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
322
        )
323 1
        await asyncio.gather(
324
            self._remove_int_flows_by_cookies(stored_flows),
325
            api.add_evcs_metadata(evcs, metadata, force),
326
        )
327
328 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
329
        """Enable INT on EVCs.
330
331
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
332
333
        The force bool option, if True, will bypass the following:
334
335
        1 - EVC already has INT
336
        2 - ProxyPort isn't UP
337
        Other cases won't be bypassed since at the point it won't have the data needed.
338
339
        """
340 1
        evcs = self._validate_map_enable_evcs(evcs, force)
341 1
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
342
343 1
        metadata = {
344
            "telemetry": {
345
                "enabled": True,
346
                "status": "UP",
347
                "status_reason": [],
348
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
349
            }
350
        }
351 1
        await self.install_int_flows(evcs, metadata)
352 1
        self._add_pps_evc_ids(evcs)
353
354 1
    async def redeploy_int(self, evcs: dict[str, dict]) -> None:
355
        """Redeploy INT on EVCs. It'll remove, install and update metadata.
356
357
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
358
        """
359 1
        self._validate_has_int(evcs)
360 1
        evcs = self._validate_map_enable_evcs(evcs, force=True)
361 1
        log.info(f"Redeploying INT on EVC ids: {list(evcs.keys())}, force: True")
362
363 1
        stored_flows = await api.get_stored_flows(
364
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
365
        )
366 1
        await self._remove_int_flows_by_cookies(stored_flows)
367 1
        metadata = {
368
            "telemetry": {
369
                "enabled": True,
370
                "status": "UP",
371
                "status_reason": [],
372
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
373
            }
374
        }
375 1
        await self.install_int_flows(evcs, metadata, force=True)
376
377 1
    async def install_int_flows(
378
        self, evcs: dict[str, dict], metadata: dict, force=False
379
    ) -> None:
380
        """Install INT flows and set metadata on EVCs."""
381 1
        stored_flows = self.flow_builder.build_int_flows(
382
            evcs,
383
            await utils.get_found_stored_flows(
384
                [
385
                    utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
386
                    for evc_id in evcs
387
                ]
388
            ),
389
        )
390 1
        self._validate_evcs_stored_flows(evcs, stored_flows)
391
392 1
        active_evcs, inactive_evcs, pp_down_evcs = {}, {}, {}
393 1
        for evc_id, evc in evcs.items():
394 1
            if not evc["active"]:
395
                inactive_evcs[evc_id] = evc
396
                continue
397 1
            if any(
398
                (
399
                    evc["uni_a"]["proxy_port"].status != EntityStatus.UP,
400
                    evc["uni_z"]["proxy_port"].status != EntityStatus.UP,
401
                )
402
            ):
403 1
                pp_down_evcs[evc_id] = evc
404 1
                continue
405
            active_evcs[evc_id] = evc
406
407 1
        inactive_metadata = copy.deepcopy(metadata)
408 1
        inactive_metadata["telemetry"]["status"] = "DOWN"
409 1
        pp_down_metadata = copy.deepcopy(inactive_metadata)
410 1
        inactive_metadata["telemetry"]["status_reason"] = ["no_flows"]
411 1
        pp_down_metadata["telemetry"]["status_reason"] = ["proxy_port_down"]
412
413 1
        await asyncio.gather(
414
            self._install_int_flows(stored_flows),
415
            api.add_evcs_metadata(inactive_evcs, inactive_metadata, force),
416
            api.add_evcs_metadata(pp_down_evcs, pp_down_metadata, force),
417
            api.add_evcs_metadata(active_evcs, metadata, force),
418
        )
419
420 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
421
        """Return a ProxyPort assigned to a UNI or raise."""
422
423 1
        interface = self.controller.get_interface_by_id(intf_id)
424 1
        if not interface:
425 1
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")
426
427 1
        if "proxy_port" not in interface.metadata:
428 1
            raise ProxyPortNotFound(
429
                evc_id, f"proxy_port metadata not found in {intf_id}"
430
            )
431
432 1
        source_intf = interface.switch.get_interface_by_port_no(
433
            interface.metadata.get("proxy_port")
434
        )
435 1
        if not source_intf:
436
            raise ProxyPortNotFound(
437
                evc_id,
438
                f"proxy_port of {intf_id} source interface not found",
439
            )
440
441 1
        pp = self.srcs_pp.get(source_intf.id)
442 1
        if not pp:
443 1
            pp = ProxyPort(self.controller, source_intf)
444 1
            self.srcs_pp[source_intf.id] = pp
445
446 1
        if not pp.destination:
447 1
            raise ProxyPortDestNotFound(
448
                evc_id,
449
                f"proxy_port of {intf_id} isn't looped or destination interface "
450
                "not found",
451
            )
452
453 1
        return pp
454
455 1
    def _validate_disable_evcs(
456
        self,
457
        evcs: dict[str, dict],
458
        force=False,
459
    ) -> None:
460
        """Validate disable EVCs."""
461 1
        for evc_id, evc in evcs.items():
462
            if not evc and not force:
463
                raise EVCNotFound(evc_id)
464
            if not utils.has_int_enabled(evc) and not force:
465
                raise EVCHasNoINT(evc_id)
466
467 1
    def _validate_evcs_stored_flows(
468
        self, evcs: dict[str, dict], stored_flows: dict[int, list[dict]]
469
    ) -> None:
470
        """Validate that each active EVC has corresponding flows."""
471 1
        for evc_id, evc in evcs.items():
472 1
            if evc["active"] and not stored_flows.get(
473
                utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
474
            ):
475 1
                raise FlowsNotFound(evc_id)
476
477 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
478
        """Validate that an intra EVC is using different proxy ports.
479
480
        If the same proxy port is used on both UNIs, of one the sink/pop related matches
481
        would ended up being overwritten since they'd be the same. Currently, an
482
        external loop will have unidirectional flows matching in the lower (source)
483
        port number.
484
        """
485 1
        pp_a = evc["uni_a"].get("proxy_port")
486 1
        pp_z = evc["uni_z"].get("proxy_port")
487 1
        if any(
488
            (
489
                not utils.is_intra_switch_evc(evc),
490
                pp_a is None,
491
                pp_z is None,
492
            )
493
        ):
494 1
            return
495 1
        if pp_a.source != pp_z.source:
496 1
            return
497
498 1
        raise ProxyPortSameSourceIntraEVC(
499
            evc["id"], "intra EVC UNIs must use different proxy ports"
500
        )
501
502 1
    async def handle_failover_flows(
503
        self, evcs_content: dict[str, dict], event_name: str
504
    ) -> None:
505
        """Handle failover flows. This method will generate the subset
506
        of INT flows. EVCs with 'flows' key will be installed, and
507
        'old_flows' will be removed.
508
509
        If a given proxy port has an unexpected state INT will be
510
        removed falling back to mef_eline flows.
511
        """
512 1
        to_install, to_remove, to_remove_with_err = {}, {}, {}
513 1
        new_flows: dict[int, list[dict]] = defaultdict(list)
514 1
        old_flows: dict[int, list[dict]] = defaultdict(list)
515
516 1
        old_flows_key = "removed_flows"
517 1
        new_flows_key = "flows"
518
519 1
        for evc_id, evc in evcs_content.items():
520 1
            if not utils.has_int_enabled(evc):
521
                continue
522 1
            try:
523 1
                uni_a, uni_z = utils.get_evc_unis(evc)
524 1
                pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
525 1
                pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
526 1
                uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
527 1
                evc["id"] = evc_id
528 1
                evc["uni_a"], evc["uni_z"] = uni_a, uni_z
529
            except ProxyPortError as e:
530
                log.error(
531
                    f"Unexpected proxy port state: {str(e)}."
532
                    f"INT will be removed on evc id {evc_id}"
533
                )
534
                to_remove_with_err[evc_id] = evc
535
                continue
536
537 1
            for dpid, flows in evc.get(new_flows_key, {}).items():
538 1
                for flow in flows:
539 1
                    new_flows[flow["cookie"]].append({"flow": flow, "switch": dpid})
540
541 1
            for dpid, flows in evc.get(old_flows_key, {}).items():
542 1
                for flow in flows:
543
                    # set priority and table_group just so INT flows can be built
544
                    # the priority doesn't matter for deletion
545 1
                    flow["priority"] = 21000
546 1
                    flow["table_group"] = (
547
                        "evpl" if "dl_vlan" in flow.get("match", {}) else "epl"
548
                    )
549 1
                    old_flows[flow["cookie"]].append({"flow": flow, "switch": dpid})
550
551 1
            if evc.get(new_flows_key):
552 1
                to_install[evc_id] = evc
553 1
                evc.pop(new_flows_key)
554 1
            if evc.get(old_flows_key):
555 1
                to_remove[evc_id] = evc
556 1
                evc.pop(old_flows_key, None)
557
558 1
        if to_remove:
559 1
            log.info(
560
                f"Handling {event_name} flows remove on EVC ids: {to_remove.keys()}"
561
            )
562 1
            await self._remove_int_flows(
563
                self.flow_builder.build_failover_old_flows(to_remove, old_flows)
564
            )
565 1
        if to_remove_with_err:
566
            log.error(
567
                f"Handling {event_name} proxy_port_error falling back "
568
                f"to mef_eline, EVC ids: {list(to_remove_with_err.keys())}"
569
            )
570
            metadata = {
571
                "telemetry": {
572
                    "enabled": True,
573
                    "status": "DOWN",
574
                    "status_reason": ["proxy_port_error"],
575
                    "status_updated_at": datetime.utcnow().strftime(
576
                        "%Y-%m-%dT%H:%M:%S"
577
                    ),
578
                }
579
            }
580
            await self.remove_int_flows(to_remove_with_err, metadata, force=True)
581 1
        if to_install:
582 1
            log.info(
583
                f"Handling {event_name} flows install on EVC ids: {to_install.keys()}"
584
            )
585 1
            await self._install_int_flows(
586
                self.flow_builder.build_int_flows(to_install, new_flows)
587
            )
588
589 1
    def _validate_map_enable_evcs(
590
        self,
591
        evcs: dict[str, dict],
592
        force=False,
593
    ) -> dict[str, dict]:
594
        """Validate map enabling EVCs.
595
596
        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
597
        so it can be reused later during provisioning.
598
599
        """
600 1
        for evc_id, evc in evcs.items():
601 1
            if not evc:
602
                raise EVCNotFound(evc_id)
603 1
            if utils.has_int_enabled(evc) and not force:
604
                raise EVCHasINT(evc_id)
605
606 1
            uni_a, uni_z = utils.get_evc_unis(evc)
607 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
608 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
609
610 1
            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
611 1
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z
612
613 1
            if pp_a.status != EntityStatus.UP and not force:
614
                dest_id = pp_a.destination.id if pp_a.destination else None
615
                dest_status = pp_a.status if pp_a.destination else None
616
                raise ProxyPortStatusNotUP(
617
                    evc_id,
618
                    f"proxy_port of {uni_a['interface_id']} isn't UP. "
619
                    f"source {pp_a.source.id} status {pp_a.source.status}, "
620
                    f"destination {dest_id} status {dest_status}",
621
                )
622 1
            if pp_z.status != EntityStatus.UP and not force:
623
                dest_id = pp_z.destination.id if pp_z.destination else None
624
                dest_status = pp_z.status if pp_z.destination else None
625
                raise ProxyPortStatusNotUP(
626
                    evc_id,
627
                    f"proxy_port of {uni_z['interface_id']} isn't UP."
628
                    f"source {pp_z.source.id} status {pp_z.source.status}, "
629
                    f"destination {dest_id} status {dest_status}",
630
                )
631
632 1
            self._validate_intra_evc_different_proxy_ports(evc)
633 1
        return evcs
634
635 1
    def _validate_has_int(self, evcs: dict[str, dict]):
636 1
        for evc_id, evc in evcs.items():
637 1
            if not utils.has_int_enabled(evc):
638 1
                raise EVCHasNoINT(evc_id)
639
640 1
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
641
        """Add proxy ports evc_ids.
642
643
        This is meant to be called after an EVC is enabled.
644
        """
645 1
        for evc_id, evc in evcs.items():
646 1
            uni_a, uni_z = utils.get_evc_unis(evc)
647 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
648 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
649 1
            pp_a.evc_ids.add(evc_id)
650 1
            pp_z.evc_ids.add(evc_id)
651 1
            self.unis_src[evc["uni_a"]["interface_id"]] = pp_a.source.id
652 1
            self.unis_src[evc["uni_z"]["interface_id"]] = pp_z.source.id
653
654 1
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
655
        """Discard proxy port evc_ids.
656
657
        This is meant to be called when an EVC is disabled.
658
        """
659 1
        for evc_id, evc in evcs.items():
660 1
            uni_a, uni_z = utils.get_evc_unis(evc)
661 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
662 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
663 1
            pp_a.evc_ids.discard(evc_id)
664 1
            pp_z.evc_ids.discard(evc_id)
665
666 1
    def evc_compare(
667
        self, stored_int_flows: dict, stored_mef_flows: dict, evcs: dict
668
    ) -> dict[str, list]:
669
        """EVC compare.
670
671
        Cases:
672
        - No INT enabled but has INT flows -> wrong_metadata_has_int_flows
673
        - INT enabled but has less flows than mef flows -> missing_some_int_flows
674
675
        """
676 1
        int_flows = {
677
            utils.get_id_from_cookie(k): v for k, v in stored_int_flows.items()
678
        }
679 1
        mef_flows = {
680
            utils.get_id_from_cookie(k): v for k, v in stored_mef_flows.items()
681
        }
682
683 1
        results = defaultdict(list)
684 1
        for evc in evcs.values():
685 1
            evc_id = evc["id"]
686
687 1
            if (
688
                not utils.has_int_enabled(evc)
689
                and evc_id in int_flows
690
                and int_flows[evc_id]
691
            ):
692 1
                results[evc_id].append("wrong_metadata_has_int_flows")
693
694 1
            if (
695
                utils.has_int_enabled(evc)
696
                and evc_id in mef_flows
697
                and mef_flows[evc_id]
698
                and (
699
                    evc_id not in int_flows
700
                    or (
701
                        evc_id in int_flows
702
                        and len(int_flows[evc_id]) < len(mef_flows[evc_id])
703
                    )
704
                )
705
            ):
706 1
                results[evc_id].append("missing_some_int_flows")
707 1
        return results
708
709 1
    async def _remove_int_flows_by_cookies(
710
        self, stored_flows: dict[int, list[dict]]
711
    ) -> dict[str, list[dict]]:
712
        """Delete int flows given a prefiltered stored_flows by cookies.
713
        You should use this type of removal when you need to remove all
714
        flows associated with a cookie, if you need to include all keys in the match
715
        to remove only a subset use `_remove_int_flows(stored_flows)` method instead.
716
717
        Removal is driven by the stored flows instead of EVC ids and dpids to also
718
        be able to handle the force mode when an EVC no longer exists. It also follows
719
        the same pattern that mef_eline currently uses.
720
        """
721 1
        switch_flows_cookies = defaultdict(set)
722 1
        for flows in stored_flows.values():
723 1
            for flow in flows:
724 1
                switch_flows_cookies[flow["switch"]].add(flow["flow"]["cookie"])
725
726 1
        switch_flows = defaultdict(list)
727 1
        for dpid, cookies in switch_flows_cookies.items():
728 1
            for cookie in cookies:
729 1
                switch_flows[dpid].append(
730
                    {
731
                        "cookie": cookie,
732
                        "cookie_mask": int(0xFFFFFFFFFFFFFFFF),
733
                        "table_id": Table.OFPTT_ALL.value,
734
                    }
735
                )
736 1
        await self._send_flows(switch_flows, "delete")
737 1
        return switch_flows
738
739 1
    async def _remove_int_flows(
740
        self, stored_flows: dict[int, list[dict]]
741
    ) -> dict[str, list[dict]]:
742
        """Delete int flows given a prefiltered stored_flows. This method is meant
743
        to be used when you need to match all the flow match keys, so, typically when
744
        you're removing just a subset of INT flows.
745
746
        Removal is driven by the stored flows instead of EVC ids and dpids to also
747
        be able to handle the force mode when an EVC no longer exists. It also follows
748
        the same pattern that mef_eline currently uses."""
749 1
        switch_flows = defaultdict(list)
750 1
        for flows in stored_flows.values():
751 1
            for flow in flows:
752 1
                switch_flows[flow["switch"]].append(flow["flow"])
753 1
        await self._send_flows(switch_flows, "delete")
754 1
        return switch_flows
755
756 1
    async def _install_int_flows(
757
        self, stored_flows: dict[int, list[dict]]
758
    ) -> dict[str, list[dict]]:
759
        """Install INT flow mods."""
760 1
        switch_flows = defaultdict(list)
761 1
        for flows in stored_flows.values():
762 1
            for flow in flows:
763 1
                switch_flows[flow["switch"]].append(flow["flow"])
764 1
        await self._send_flows(switch_flows, "install")
765 1
        return switch_flows
766
767 1
    async def _send_flows(
768
        self, switch_flows: dict[str, list[dict]], cmd: Literal["install", "delete"]
769
    ):
770
        """Send batched flows by dpid to flow_manager.
771
772
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
773
        for settings.BATCH_INTERVAL per batch iteration.
774
        """
775 1
        for dpid, flows in switch_flows.items():
776 1
            batch_size = settings.BATCH_SIZE
777 1
            if batch_size <= 0:
778
                batch_size = len(flows)
779
780 1
            for i in range(0, len(flows), batch_size):
781 1
                flows = flows[i : i + batch_size]
782 1
                if not flows:
783
                    continue
784
785 1
                if i > 0:
786
                    await asyncio.sleep(settings.BATCH_INTERVAL)
787 1
                event = KytosEvent(
788
                    f"kytos.flow_manager.flows.single.{cmd}",
789
                    content={
790
                        "dpid": dpid,
791
                        "force": True,
792
                        "flow_dict": {"flows": flows},
793
                    },
794
                )
795
                await self.controller.buffers.app.aput(event)
796