Passed
Push — master ( 9bde40...c0a17f )
by Vinicius
02:56 queued 18s
created

build.managers.int.INTManager.evc_compare()   C

Complexity

Conditions 11

Size

Total Lines 42
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 11

Importance

Changes 0
Metric Value
cc 11
eloc 23
nop 4
dl 0
loc 42
ccs 11
cts 11
cp 1
crap 11
rs 5.4
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like build.managers.int.INTManager.evc_compare() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""INTManager module."""
2
3 1
import asyncio
4 1
import copy
5 1
from collections import defaultdict
6 1
from datetime import datetime
7
8 1
from pyof.v0x04.controller2switch.table_mod import Table
9
10 1
from kytos.core.controller import Controller
11 1
from kytos.core.events import KytosEvent
12 1
from kytos.core.interface import Interface
13 1
from napps.kytos.telemetry_int import utils
14 1
from napps.kytos.telemetry_int import settings
15 1
from kytos.core import log
16 1
from kytos.core.link import Link
17 1
import napps.kytos.telemetry_int.kytos_api_helper as api
18 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
19 1
from kytos.core.common import EntityStatus
20 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
21
22 1
from napps.kytos.telemetry_int.exceptions import (
23
    EVCError,
24
    EVCNotFound,
25
    EVCHasINT,
26
    EVCHasNoINT,
27
    FlowsNotFound,
28
    ProxyPortError,
29
    ProxyPortStatusNotUP,
30
    ProxyPortDestNotFound,
31
    ProxyPortNotFound,
32
    ProxyPortSameSourceIntraEVC,
33
)
34
35
36 1
class INTManager:
37
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
38
39 1
    def __init__(self, controller: Controller) -> None:
        """Set up the manager state for the given controller.

        Besides keeping a reference to the controller, this creates the flow
        builder, the two lookup tables used to find proxy ports, and the
        asyncio locks taken by the link and interface metadata handlers.
        """
        self.controller = controller
        self.flow_builder = FlowBuilder()

        # UNI interface id -> id of the source interface of its proxy port
        self.unis_src: dict[str, str] = {}
        # Source interface id -> ProxyPort instance built on that source
        self.srcs_pp: dict[str, ProxyPort] = {}

        # One lock per handler family: proxy port link up/down handlers and
        # interface metadata added/removed handlers.
        self._topo_link_lock = asyncio.Lock()
        self._intf_meta_lock = asyncio.Lock()
50
51 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
        """Populate the UNI -> source and source -> ProxyPort lookup tables.

        Only EVCs that have INT enabled are considered. For each of their UNIs
        that the controller knows about and that carries "proxy_port" metadata,
        the UNI id is mapped to the id of the proxy port source interface and
        the EVC id is registered on the matching ProxyPort.
        """
        for evc_id, evc in evcs.items():
            if not utils.has_int_enabled(evc):
                continue

            # Resolve both UNIs before touching any lookup table.
            uni_ids = [evc[key]["interface_id"] for key in ("uni_a", "uni_z")]
            unis = [self.controller.get_interface_by_id(uid) for uid in uni_ids]

            for uni in unis:
                if not uni or "proxy_port" not in uni.metadata:
                    continue

                source = uni.switch.get_interface_by_port_no(
                    uni.metadata["proxy_port"]
                )
                self.unis_src[uni.id] = source.id
                try:
                    proxy_port = self.get_proxy_port_or_raise(uni.id, evc_id)
                except ProxyPortDestNotFound:
                    # get_proxy_port_or_raise stores the ProxyPort under its
                    # source id before complaining about the destination, so
                    # it can still be picked up from the table.
                    proxy_port = self.srcs_pp[source.id]
                proxy_port.evc_ids.add(evc_id)
82
83 1
    async def handle_pp_link_down(self, link: Link) -> None:
        """React to a link going down on a proxy port.

        Does nothing unless settings.FALLBACK_TO_MEF_LOOP_DOWN is set and one of
        the link endpoints is the source of a ProxyPort that has EVCs attached.
        Otherwise the telemetry-enabled EVCs currently UP that belong to that
        ProxyPort get their INT flows removed and their telemetry metadata set
        to DOWN with the "proxy_port_down" reason.
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        pp = self.srcs_pp.get(link.endpoint_a.id) or self.srcs_pp.get(
            link.endpoint_b.id
        )
        if not (pp and pp.evc_ids):
            return

        async with self._topo_link_lock:
            filters = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "UP",
            }
            evcs = await api.get_evcs(**filters)

            to_deactivate = {}
            for evc_id, evc in evcs.items():
                if evc_id in pp.evc_ids:
                    to_deactivate[evc_id] = evc
            if not to_deactivate:
                return

            log.info(
                f"Handling link_down {link}, removing INT flows falling back to "
                f"mef_eline, EVC ids: {list(to_deactivate)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            telemetry = {
                "enabled": True,
                "status": "DOWN",
                "status_reason": ["proxy_port_down"],
                "status_updated_at": updated_at,
            }
            await self.remove_int_flows(to_deactivate, {"telemetry": telemetry})
121
122 1
    async def handle_pp_link_up(self, link: Link) -> None:
        """React to a link coming up on a proxy port.

        Does nothing unless settings.FALLBACK_TO_MEF_LOOP_DOWN is set and one of
        the link endpoints is the source of a ProxyPort that has EVCs attached.
        Otherwise, once the link is UP with no status reason, the
        telemetry-enabled EVCs currently DOWN that belong to that ProxyPort and
        whose two proxy ports are UP get their INT flows installed again.

        Validation errors (EVCError) and FlowsNotFound are logged and swallowed.
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        pp = self.srcs_pp.get(link.endpoint_a.id) or self.srcs_pp.get(
            link.endpoint_b.id
        )
        if not (pp and pp.evc_ids):
            return

        async with self._topo_link_lock:
            if link.status != EntityStatus.UP or link.status_reason:
                return

            filters = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "DOWN",
            }
            evcs = await api.get_evcs(**filters)

            to_install = {}
            for evc_id, evc in evcs.items():
                # Read every field up front so a malformed EVC fails the same
                # way regardless of which check would have rejected it.
                inactive = not evc["active"]
                archived = evc["archived"]
                uni_a_id = evc["uni_a"]["interface_id"]
                uni_z_id = evc["uni_z"]["interface_id"]
                if (
                    inactive
                    or archived
                    or evc_id not in pp.evc_ids
                    or uni_a_id not in self.unis_src
                    or uni_z_id not in self.unis_src
                ):
                    continue

                sources = (self.unis_src[uni_a_id], self.unis_src[uni_z_id])
                # Both proxy ports must be known and UP to deploy again.
                if all(src in self.srcs_pp for src in sources) and all(
                    self.srcs_pp[src].status == EntityStatus.UP for src in sources
                ):
                    to_install[evc_id] = evc

            if not to_install:
                return

            try:
                to_install = self._validate_map_enable_evcs(to_install, force=True)
            except EVCError as exc:
                log.exception(exc)
                return

            log.info(
                f"Handling link_up {link}, deploying INT flows, "
                f"EVC ids: {list(to_install)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            telemetry = {
                "enabled": True,
                "status": "UP",
                "status_reason": [],
                "status_updated_at": updated_at,
            }
            try:
                await self.install_int_flows(to_install, {"telemetry": telemetry})
            except FlowsNotFound as exc:
                log.exception(f"FlowsNotFound {str(exc)}")
193
194 1
    async def handle_pp_metadata_removed(self, intf: Interface) -> None:
        """React to the "proxy_port" metadata being removed from an interface.

        Ignored when the interface still has "proxy_port" metadata, when it is
        not a tracked UNI, or when its ProxyPort has no EVCs. Otherwise the
        telemetry-enabled EVCs currently UP that belong to that ProxyPort get
        their INT flows removed and their telemetry metadata set to DOWN with
        the "proxy_port_metadata_removed" reason.
        """
        if "proxy_port" in intf.metadata:
            return

        try:
            pp = self.srcs_pp[self.unis_src[intf.id]]
        except KeyError:
            # Either the UNI or its source interface is not being tracked.
            return
        if not pp.evc_ids:
            return

        async with self._intf_meta_lock:
            filters = {
                "metadata.telemetry.enabled": "true",
                "metadata.telemetry.status": "UP",
            }
            evcs = await api.get_evcs(**filters)

            to_deactivate = {}
            for evc_id, evc in evcs.items():
                if evc_id in pp.evc_ids:
                    to_deactivate[evc_id] = evc
            if not to_deactivate:
                return

            log.info(
                f"Handling interface metadata removed on {intf}, removing INT flows "
                f"falling back to mef_eline, EVC ids: {list(to_deactivate)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            telemetry = {
                "enabled": True,
                "status": "DOWN",
                "status_reason": ["proxy_port_metadata_removed"],
                "status_updated_at": updated_at,
            }
            await self.remove_int_flows(to_deactivate, {"telemetry": telemetry})
233
234 1
    async def handle_pp_metadata_added(self, intf: Interface) -> None:
        """React to "proxy_port" metadata being added or updated on an interface.

        Ignored when the interface has no "proxy_port" metadata, when it is not
        a tracked UNI, when its ProxyPort has no EVCs, or when the metadata
        still points to the current source interface. Otherwise the ProxyPort
        source is switched to the new interface and the telemetry-enabled EVCs
        that belong to it are disabled and then enabled again, both with force.

        A ProxyPortSameSourceIntraEVC raised while enabling is logged as an
        error and not propagated.
        """
        if "proxy_port" not in intf.metadata:
            return

        try:
            pp = self.srcs_pp[self.unis_src[intf.id]]
        except KeyError:
            # Either the UNI or its source interface is not being tracked.
            return
        if not pp.evc_ids:
            return

        new_source = intf.switch.get_interface_by_port_no(
            intf.metadata["proxy_port"]
        )
        if new_source == pp.source:
            return

        async with self._intf_meta_lock:
            pp.source = new_source

            evcs = await api.get_evcs(**{"metadata.telemetry.enabled": "true"})
            affected_evcs = {}
            for evc_id, evc in evcs.items():
                if evc_id in pp.evc_ids:
                    affected_evcs[evc_id] = evc
            if not affected_evcs:
                return

            log.info(
                f"Handling interface metadata updated on {intf}. It'll disable the "
                "EVCs to be safe, and then try to enable again with the updated "
                f" proxy port {pp}, EVC ids: {list(affected_evcs)}"
            )
            await self.disable_int(affected_evcs, force=True)
            try:
                await self.enable_int(affected_evcs, force=True)
            except ProxyPortSameSourceIntraEVC as exc:
                log.error(
                    f"Validation error when updating interface {intf} proxy port {pp}"
                    f" EVC ids: {list(affected_evcs)}, exception {str(exc)}"
                )
284
285 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Disable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.

        The EVCs are validated, their INT flows are removed, their telemetry
        metadata is set to disabled/DOWN, and finally their ids are discarded
        from the related proxy ports.

        The force bool option, if True, will bypass the following:

        1 - EVC not found
        2 - EVC doesn't have INT
        3 - ProxyPortNotFound or ProxyPortDestNotFound (any ProxyPortError
            raised while discarding the EVC ids from the proxy ports)
        """
        self._validate_disable_evcs(evcs, force)
        log.info(f"Disabling INT on EVC ids: {list(evcs)}, force: {force}")

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": False,
            "status": "DOWN",
            "status_reason": ["disabled"],
            "status_updated_at": updated_at,
        }
        await self.remove_int_flows(evcs, {"telemetry": telemetry}, force=force)

        try:
            self._discard_pps_evc_ids(evcs)
        except ProxyPortError:
            if force:
                return
            raise
314
315 1
    async def remove_int_flows(
        self, evcs: dict[str, dict], metadata: dict, force=False
    ) -> None:
        """Remove the INT flows of the EVCs and update the EVCs metadata.

        The stored flows are looked up by the INT cookie of each EVC id; the
        flow removal and the metadata update are then awaited concurrently.
        """
        int_cookies = [
            utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs
        ]
        stored_flows = await api.get_stored_flows(int_cookies)

        removal = self._remove_int_flows(stored_flows)
        metadata_update = api.add_evcs_metadata(evcs, metadata, force)
        await asyncio.gather(removal, metadata_update)
326
327 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Enable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.

        The EVCs are validated and mapped to their proxy ports, the INT flows
        are installed with telemetry metadata set to enabled/UP, and then the
        EVC ids are registered on the related proxy ports.

        The force bool option, if True, will bypass the following:

        1 - EVC already has INT
        2 - ProxyPort isn't UP

        Other cases won't be bypassed since at that point it won't have the
        data needed.
        """
        evcs = self._validate_map_enable_evcs(evcs, force)
        log.info(f"Enabling INT on EVC ids: {list(evcs)}, force: {force}")

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": True,
            "status": "UP",
            "status_reason": [],
            "status_updated_at": updated_at,
        }
        await self.install_int_flows(evcs, {"telemetry": telemetry})
        self._add_pps_evc_ids(evcs)
352
353 1
    async def redeploy_int(self, evcs: dict[str, dict]) -> None:
        """Redeploy INT on EVCs: remove the flows, install them, update metadata.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.

        Every EVC must already have INT enabled, otherwise EVCHasNoINT is
        raised. The enabling validation always runs with force=True.
        """
        self._validate_has_int(evcs)
        evcs = self._validate_map_enable_evcs(evcs, force=True)
        log.info(f"Redeploying INT on EVC ids: {list(evcs)}, force: True")

        int_cookies = [
            utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs
        ]
        stored_flows = await api.get_stored_flows(int_cookies)
        await self._remove_int_flows(stored_flows)

        # The timestamp is taken after the old flows have been removed.
        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": True,
            "status": "UP",
            "status_reason": [],
            "status_updated_at": updated_at,
        }
        await self.install_int_flows(evcs, {"telemetry": telemetry}, force=True)
375
376 1
    async def install_int_flows(
        self, evcs: dict[str, dict], metadata: dict, force=False
    ) -> None:
        """Install the INT flows of the EVCs and update the EVCs metadata.

        The INT flows are built from the mef_eline stored flows found for each
        EVC id. The given metadata is applied as is to active EVCs whose two
        proxy ports are UP. Inactive EVCs get a copy with status DOWN and
        reason "no_flows"; active EVCs with a proxy port that is not UP get a
        copy with status DOWN and reason "proxy_port_down".

        Each EVC is expected to carry a "proxy_port" entry on both UNIs, as set
        by _validate_map_enable_evcs.
        """
        mef_cookies = [
            utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX) for evc_id in evcs
        ]
        mef_stored_flows = await utils.get_found_stored_flows(mef_cookies)
        stored_flows = self.flow_builder.build_int_flows(evcs, mef_stored_flows)

        active_evcs, inactive_evcs, pp_down_evcs = {}, {}, {}
        for evc_id, evc in evcs.items():
            if not evc["active"]:
                bucket = inactive_evcs
            else:
                status_a = evc["uni_a"]["proxy_port"].status
                status_z = evc["uni_z"]["proxy_port"].status
                both_up = status_a == EntityStatus.UP and status_z == EntityStatus.UP
                bucket = active_evcs if both_up else pp_down_evcs
            bucket[evc_id] = evc

        def down_metadata(reason: str) -> dict:
            """Return a copy of metadata with status DOWN and the given reason."""
            down = copy.deepcopy(metadata)
            down["telemetry"]["status"] = "DOWN"
            down["telemetry"]["status_reason"] = [reason]
            return down

        inactive_metadata = down_metadata("no_flows")
        pp_down_metadata = down_metadata("proxy_port_down")

        await asyncio.gather(
            self._install_int_flows(stored_flows),
            api.add_evcs_metadata(inactive_evcs, inactive_metadata, force),
            api.add_evcs_metadata(pp_down_evcs, pp_down_metadata, force),
            api.add_evcs_metadata(active_evcs, metadata, force),
        )
415
416 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
        """Return the ProxyPort assigned to a UNI, creating it on first use.

        Raises ProxyPortNotFound when the UNI interface is unknown, has no
        "proxy_port" metadata, or the source interface it points to does not
        exist. Raises ProxyPortDestNotFound when the proxy port has no
        destination; in that case the ProxyPort has already been stored in
        srcs_pp under its source interface id.
        """
        uni = self.controller.get_interface_by_id(intf_id)
        if not uni:
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")

        if "proxy_port" not in uni.metadata:
            raise ProxyPortNotFound(
                evc_id, f"proxy_port metadata not found in {intf_id}"
            )

        source = uni.switch.get_interface_by_port_no(uni.metadata["proxy_port"])
        if not source:
            raise ProxyPortNotFound(
                evc_id,
                f"proxy_port of {intf_id} source interface not found",
            )

        proxy_port = self.srcs_pp.get(source.id)
        if not proxy_port:
            proxy_port = ProxyPort(self.controller, source)
            self.srcs_pp[source.id] = proxy_port

        if proxy_port.destination:
            return proxy_port

        raise ProxyPortDestNotFound(
            evc_id,
            f"proxy_port of {intf_id} isn't looped or destination interface "
            "not found",
        )
450
451 1
    def _validate_disable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> None:
        """Validate that INT can be disabled on the given EVCs.

        Raises EVCNotFound when an EVC entry is empty and EVCHasNoINT when an
        EVC does not have INT enabled. With force=True both checks are
        bypassed, so nothing is inspected and nothing is raised.
        """
        if force:
            # Forced disabling skips validation altogether. Returning here also
            # avoids handing a missing/empty EVC to utils.has_int_enabled,
            # which the previous ordering of the conditions still did.
            return

        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if not utils.has_int_enabled(evc):
                raise EVCHasNoINT(evc_id)
462
463 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
        """Validate that an intra-switch EVC uses different proxy ports.

        If the same proxy port were used on both UNIs, one of the sink/pop
        related matches would end up being overwritten since they'd be the
        same. Currently, an external loop will have unidirectional flows
        matching on the lower (source) port number.

        Nothing is checked when the EVC isn't intra-switch or when either UNI
        has no "proxy_port" entry. Raises ProxyPortSameSourceIntraEVC when both
        proxy ports share the same source.
        """
        pp_a = evc["uni_a"].get("proxy_port")
        pp_z = evc["uni_z"].get("proxy_port")

        if not utils.is_intra_switch_evc(evc) or pp_a is None or pp_z is None:
            return

        if pp_a.source == pp_z.source:
            raise ProxyPortSameSourceIntraEVC(
                evc["id"], "intra EVC UNIs must use different proxy ports"
            )
487
488 1
    def _validate_map_enable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> dict[str, dict]:
        """Validate that INT can be enabled on the EVCs and map their proxy ports.

        Each EVC is updated in place: both its uni_a and uni_z dicts get a
        "proxy_port" entry holding their ProxyPort, just so it can be reused
        later during provisioning. The same evcs dict is returned.

        Raises:
            EVCNotFound: an EVC entry is empty.
            EVCHasINT: an EVC already has INT enabled and force is False.
            ProxyPortStatusNotUP: a proxy port isn't UP and force is False.
            ProxyPortSameSourceIntraEVC: an intra-switch EVC uses the same
                proxy port source on both UNIs.

        Whatever get_proxy_port_or_raise raises is propagated too.
        """
        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if utils.has_int_enabled(evc) and not force:
                raise EVCHasINT(evc_id)

            uni_a, uni_z = utils.get_evc_unis(evc)
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)

            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z

            if not force:
                # Same check for both ends, uni_a first.
                for uni, pp in ((uni_a, pp_a), (uni_z, pp_z)):
                    if pp.status == EntityStatus.UP:
                        continue
                    dest = pp.destination
                    dest_id = dest.id if dest else None
                    # Report the destination interface's own status; the
                    # proxy port status was being reported here by mistake.
                    dest_status = dest.status if dest else None
                    raise ProxyPortStatusNotUP(
                        evc_id,
                        f"proxy_port of {uni['interface_id']} isn't UP. "
                        f"source {pp.source.id} status {pp.source.status}, "
                        f"destination {dest_id} status {dest_status}",
                    )

            self._validate_intra_evc_different_proxy_ports(evc)
        return evcs
533
534 1
    def _validate_has_int(self, evcs: dict[str, dict]):
        """Raise EVCHasNoINT for the first EVC that doesn't have INT enabled."""
        for evc_id, evc in evcs.items():
            if utils.has_int_enabled(evc):
                continue
            raise EVCHasNoINT(evc_id)
538
539 1
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
        """Register each EVC id on the proxy ports of its two UNIs.

        This is meant to be called after an EVC is enabled. It also records,
        for each UNI, the id of the source interface of its proxy port.
        """
        for evc_id, evc in evcs.items():
            uni_a, uni_z = utils.get_evc_unis(evc)
            # Resolve both proxy ports first so that nothing is updated if
            # either lookup raises.
            proxy_ports = {
                "uni_a": self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id),
                "uni_z": self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id),
            }
            for proxy_port in proxy_ports.values():
                proxy_port.evc_ids.add(evc_id)
            for uni_key, proxy_port in proxy_ports.items():
                self.unis_src[evc[uni_key]["interface_id"]] = proxy_port.source.id
552
553 1
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
        """Drop each EVC id from the proxy ports of its two UNIs.

        This is meant to be called when an EVC is disabled.
        """
        for evc_id, evc in evcs.items():
            uni_a, uni_z = utils.get_evc_unis(evc)
            # Resolve both proxy ports first so that nothing is updated if
            # either lookup raises.
            proxy_ports = (
                self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id),
                self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id),
            )
            for proxy_port in proxy_ports:
                proxy_port.evc_ids.discard(evc_id)
564
565 1
    def evc_compare(
        self, stored_int_flows: dict, stored_mef_flows: dict, evcs: dict
    ) -> dict[str, list]:
        """Compare EVCs metadata against their stored INT and mef_eline flows.

        Both stored flow dicts are keyed by cookie and get re-keyed by the EVC
        id derived from the cookie. The result maps an EVC id to the list of
        inconsistencies found for it; consistent EVCs aren't included.

        Cases:
        - No INT enabled but has INT flows -> wrong_metadata_has_int_flows
        - INT enabled but has fewer flows than mef flows -> missing_some_int_flows
        """
        int_flows = {
            utils.get_id_from_cookie(cookie): flows
            for cookie, flows in stored_int_flows.items()
        }
        mef_flows = {
            utils.get_id_from_cookie(cookie): flows
            for cookie, flows in stored_mef_flows.items()
        }

        results = defaultdict(list)
        for evc in evcs.values():
            evc_id = evc["id"]
            evc_int_flows = int_flows.get(evc_id)
            evc_mef_flows = mef_flows.get(evc_id)

            if not utils.has_int_enabled(evc):
                if evc_int_flows:
                    results[evc_id].append("wrong_metadata_has_int_flows")
                continue

            if not evc_mef_flows:
                continue
            if evc_id not in int_flows or len(evc_int_flows) < len(evc_mef_flows):
                results[evc_id].append("missing_some_int_flows")
        return results
607
608 1
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Delete INT flows given a prefiltered stored_flows.

        Removal is driven by the stored flows instead of EVC ids and dpids to
        also be able to handle the force mode when an EVC no longer exists. It
        also follows the same pattern that mef_eline currently uses.

        The distinct cookies are grouped per dpid and deleted through
        "kytos.flow_manager.flows.delete" events, in batches of
        settings.BATCH_SIZE, waiting settings.BATCH_INTERVAL between batches.
        A non-positive batch size means a single batch per dpid.
        """
        cookies_by_dpid = defaultdict(set)
        for flow_list in stored_flows.values():
            for stored in flow_list:
                cookies_by_dpid[stored["switch"]].add(stored["flow"]["cookie"])

        for dpid, cookie_set in cookies_by_dpid.items():
            cookies = list(cookie_set)
            batch_size = settings.BATCH_SIZE
            step = batch_size if batch_size > 0 else len(cookies)

            for start in range(0, len(cookies), step):
                if start:
                    # Pause between batches, never before the first one.
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                batch = []
                for cookie in cookies[start : start + step]:
                    # Exact cookie match, across all tables.
                    batch.append(
                        {
                            "cookie": cookie,
                            "cookie_mask": 0xFFFFFFFFFFFFFFFF,
                            "table_id": Table.OFPTT_ALL.value,
                        }
                    )
                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": batch},
                }
                event = KytosEvent("kytos.flow_manager.flows.delete", content=content)
                await self.controller.buffers.app.aput(event)
650
651 1
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Install INT flow mods.

        The flows are grouped per dpid and sent through
        "kytos.flow_manager.flows.install" events, in batches of
        settings.BATCH_SIZE, waiting settings.BATCH_INTERVAL between batches.
        A non-positive batch size means a single batch per dpid.
        """
        flows_by_dpid = defaultdict(list)
        for flow_list in stored_flows.values():
            for stored in flow_list:
                flows_by_dpid[stored["switch"]].append(stored["flow"])

        for dpid, dpid_flows in flows_by_dpid.items():
            pending = list(dpid_flows)
            batch_size = settings.BATCH_SIZE
            step = batch_size if batch_size > 0 else len(pending)

            for start in range(0, len(pending), step):
                if start:
                    # Pause between batches, never before the first one.
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": pending[start : start + step]},
                }
                event = KytosEvent("kytos.flow_manager.flows.install", content=content)
                await self.controller.buffers.app.aput(event)
682