Passed
Push — master ( 03a8ab...fb285f )
by Vinicius
06:10 queued 03:58
created

build.managers.int   F

Complexity

Total Complexity 112

Size/Duplication

Total Lines 662
Duplicated Lines 0 %

Test Coverage

Coverage 81.75%

Importance

Changes 0
Metric Value
eloc 423
dl 0
loc 662
ccs 233
cts 285
cp 0.8175
rs 2
c 0
b 0
f 0
wmc 112

20 Methods

Rating   Name   Duplication   Size   Complexity  
A INTManager.enable_int() 0 25 1
C INTManager.load_uni_src_proxy_ports() 0 31 9
B INTManager._install_int_flows() 0 31 7
F INTManager.handle_pp_link_up() 0 71 17
B INTManager._validate_disable_evcs() 0 11 6
A INTManager._add_pps_evc_ids() 0 13 2
D INTManager._validate_map_enable_evcs() 0 45 13
B INTManager.get_proxy_port_or_raise() 0 34 6
A INTManager.remove_int_flows() 0 10 1
A INTManager._discard_pps_evc_ids() 0 11 2
C INTManager.evc_compare() 0 42 11
A INTManager.redeploy_int() 0 21 1
A INTManager.install_int_flows() 0 24 1
B INTManager.handle_pp_metadata_added() 0 50 8
B INTManager.handle_pp_link_down() 0 38 7
A INTManager.__init__() 0 11 1
B INTManager._remove_int_flows() 0 42 7
A INTManager.disable_int() 0 29 3
A INTManager._validate_intra_evc_different_proxy_ports() 0 23 3
B INTManager.handle_pp_metadata_removed() 0 39 6

How to fix   Complexity   

Complexity

Complex classes like build.managers.int often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""INTManager module."""
2
3 1
import asyncio
4 1
import copy
5 1
from collections import defaultdict
6 1
from datetime import datetime
7
8 1
from pyof.v0x04.controller2switch.table_mod import Table
9
10 1
from kytos.core.controller import Controller
11 1
from kytos.core.events import KytosEvent
12 1
from kytos.core.interface import Interface
13 1
from napps.kytos.telemetry_int import utils
14 1
from napps.kytos.telemetry_int import settings
15 1
from kytos.core import log
16 1
from kytos.core.link import Link
17 1
import napps.kytos.telemetry_int.kytos_api_helper as api
18 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
19 1
from kytos.core.common import EntityStatus
20 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
21
22 1
from napps.kytos.telemetry_int.exceptions import (
23
    EVCError,
24
    EVCNotFound,
25
    EVCHasINT,
26
    EVCHasNoINT,
27
    FlowsNotFound,
28
    ProxyPortError,
29
    ProxyPortStatusNotUP,
30
    ProxyPortDestNotFound,
31
    ProxyPortNotFound,
32
    ProxyPortSameSourceIntraEVC,
33
)
34
35
36 1
class INTManager:
37
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
38
39 1
    def __init__(self, controller: Controller) -> None:
40
        """INTManager."""
41 1
        self.controller = controller
42 1
        self.flow_builder = FlowBuilder()
43 1
        self._topo_link_lock = asyncio.Lock()
44 1
        self._intf_meta_lock = asyncio.Lock()
45
46
        # Keep track between each uni intf id and its src intf id port
47 1
        self.unis_src: dict[str, str] = {}
48
        # Keep track between src intf id and its ProxyPort instance
49 1
        self.srcs_pp: dict[str, ProxyPort] = {}
50
51 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
52
        """Load UNI ids src ids and their ProxyPort instances."""
53 1
        for evc_id, evc in evcs.items():
54 1
            if not utils.has_int_enabled(evc):
55 1
                continue
56
57 1
            uni_a_id = evc["uni_a"]["interface_id"]
58 1
            uni_z_id = evc["uni_z"]["interface_id"]
59 1
            uni_a = self.controller.get_interface_by_id(uni_a_id)
60 1
            uni_z = self.controller.get_interface_by_id(uni_z_id)
61 1
            if uni_a and "proxy_port" in uni_a.metadata:
62 1
                src_a = uni_a.switch.get_interface_by_port_no(
63
                    uni_a.metadata["proxy_port"]
64
                )
65 1
                self.unis_src[uni_a.id] = src_a.id
66 1
                try:
67 1
                    pp = self.get_proxy_port_or_raise(uni_a.id, evc_id)
68
                except ProxyPortDestNotFound:
69
                    pp = self.srcs_pp[src_a.id]
70 1
                pp.evc_ids.add(evc_id)
71
72 1
            if uni_z and "proxy_port" in uni_z.metadata:
73 1
                src_z = uni_z.switch.get_interface_by_port_no(
74
                    uni_z.metadata["proxy_port"]
75
                )
76 1
                self.unis_src[uni_z.id] = src_z.id
77 1
                try:
78 1
                    pp = self.get_proxy_port_or_raise(uni_z.id, evc_id)
79
                except ProxyPortDestNotFound:
80
                    pp = self.srcs_pp[src_z.id]
81 1
                pp.evc_ids.add(evc_id)
82
83 1
    async def handle_pp_link_down(self, link: Link) -> None:
84
        """Handle proxy_port link_down."""
85 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
86
            return
87 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
88 1
        if not pp:
89
            pp = self.srcs_pp.get(link.endpoint_b.id)
90 1
        if not pp or not pp.evc_ids:
91
            return
92
93 1
        async with self._topo_link_lock:
94 1
            evcs = await api.get_evcs(
95
                **{
96
                    "metadata.telemetry.enabled": "true",
97
                    "metadata.telemetry.status": "UP",
98
                }
99
            )
100 1
            to_deactivate = {
101
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
102
            }
103 1
            if not to_deactivate:
104
                return
105
106 1
            log.info(
107
                f"Handling link_down {link}, removing INT flows falling back to "
108
                f"mef_eline, EVC ids: {list(to_deactivate)}"
109
            )
110 1
            metadata = {
111
                "telemetry": {
112
                    "enabled": True,
113
                    "status": "DOWN",
114
                    "status_reason": ["proxy_port_down"],
115
                    "status_updated_at": datetime.utcnow().strftime(
116
                        "%Y-%m-%dT%H:%M:%S"
117
                    ),
118
                }
119
            }
120 1
            await self.remove_int_flows(to_deactivate, metadata)
121
122 1
    async def handle_pp_link_up(self, link: Link) -> None:
123
        """Handle proxy_port link_up."""
124 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
125
            return
126 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
127 1
        if not pp:
128
            pp = self.srcs_pp.get(link.endpoint_b.id)
129 1
        if not pp or not pp.evc_ids:
130
            return
131
132 1
        async with self._topo_link_lock:
133 1
            if link.status != EntityStatus.UP or link.status_reason:
134
                return
135 1
            evcs = await api.get_evcs(
136
                **{
137
                    "metadata.telemetry.enabled": "true",
138
                    "metadata.telemetry.status": "DOWN",
139
                }
140
            )
141
142 1
            to_install = {}
143 1
            for evc_id, evc in evcs.items():
144 1
                if any(
145
                    (
146
                        not evc["active"],
147
                        evc["archived"],
148
                        evc_id not in pp.evc_ids,
149
                        evc["uni_a"]["interface_id"] not in self.unis_src,
150
                        evc["uni_z"]["interface_id"] not in self.unis_src,
151
                    )
152
                ):
153
                    continue
154
155 1
                src_a_id = self.unis_src[evc["uni_a"]["interface_id"]]
156 1
                src_z_id = self.unis_src[evc["uni_z"]["interface_id"]]
157 1
                if (
158
                    src_a_id in self.srcs_pp
159
                    and src_z_id in self.srcs_pp
160
                    and self.srcs_pp[src_a_id].status == EntityStatus.UP
161
                    and self.srcs_pp[src_z_id].status == EntityStatus.UP
162
                ):
163 1
                    to_install[evc_id] = evc
164
165 1
            if not to_install:
166
                return
167
168 1
            try:
169 1
                to_install = self._validate_map_enable_evcs(to_install, force=True)
170
            except EVCError as exc:
171
                log.exception(exc)
172
                return
173
174 1
            log.info(
175
                f"Handling link_up {link}, deploying INT flows, "
176
                f"EVC ids: {list(to_install)}"
177
            )
178 1
            metadata = {
179
                "telemetry": {
180
                    "enabled": True,
181
                    "status": "UP",
182
                    "status_reason": [],
183
                    "status_updated_at": datetime.utcnow().strftime(
184
                        "%Y-%m-%dT%H:%M:%S"
185
                    ),
186
                }
187
            }
188 1
            try:
189 1
                await self.install_int_flows(to_install, metadata)
190
            except FlowsNotFound as exc:
191
                log.exception(f"FlowsNotFound {str(exc)}")
192
                return
193
194 1
    async def handle_pp_metadata_removed(self, intf: Interface) -> None:
195
        """Handle proxy port metadata removed."""
196 1
        if "proxy_port" in intf.metadata:
197
            return
198 1
        try:
199 1
            pp = self.srcs_pp[self.unis_src[intf.id]]
200 1
            if not pp.evc_ids:
201
                return
202
        except KeyError:
203
            return
204
205 1
        async with self._intf_meta_lock:
206 1
            evcs = await api.get_evcs(
207
                **{
208
                    "metadata.telemetry.enabled": "true",
209
                    "metadata.telemetry.status": "UP",
210
                }
211
            )
212 1
            to_deactivate = {
213
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
214
            }
215 1
            if not to_deactivate:
216
                return
217
218 1
            log.info(
219
                f"Handling interface metadata removed on {intf}, removing INT flows "
220
                f"falling back to mef_eline, EVC ids: {list(to_deactivate)}"
221
            )
222 1
            metadata = {
223
                "telemetry": {
224
                    "enabled": True,
225
                    "status": "DOWN",
226
                    "status_reason": ["proxy_port_metadata_removed"],
227
                    "status_updated_at": datetime.utcnow().strftime(
228
                        "%Y-%m-%dT%H:%M:%S"
229
                    ),
230
                }
231
            }
232 1
            await self.remove_int_flows(to_deactivate, metadata)
233
234 1
    async def handle_pp_metadata_added(self, intf: Interface) -> None:
235
        """Handle proxy port metadata added.
236
237
        If an existing ProxyPort gets its proxy_port meadata updated
238
        and has associated EVCs then it'll remove and install the flows accordingly.
239
240
        """
241 1
        if "proxy_port" not in intf.metadata:
242
            return
243 1
        try:
244 1
            pp = self.srcs_pp[self.unis_src[intf.id]]
245 1
            if not pp.evc_ids:
246
                return
247
        except KeyError:
248
            return
249
250 1
        cur_source_intf = intf.switch.get_interface_by_port_no(
251
            intf.metadata.get("proxy_port")
252
        )
253 1
        if cur_source_intf == pp.source:
254 1
            return
255
256 1
        async with self._intf_meta_lock:
257 1
            pp.source = cur_source_intf
258
259 1
            evcs = await api.get_evcs(
260
                **{
261
                    "metadata.telemetry.enabled": "true",
262
                }
263
            )
264 1
            affected_evcs = {
265
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
266
            }
267 1
            if not affected_evcs:
268 1
                return
269
270 1
            log.info(
271
                f"Handling interface metadata updated on {intf}. It'll disable the "
272
                "EVCs to be safe, and then try to enable again with the updated "
273
                f" proxy port {pp}, EVC ids: {list(affected_evcs)}"
274
            )
275 1
            await self.disable_int(affected_evcs, force=True)
276 1
            try:
277 1
                await self.enable_int(affected_evcs, force=True)
278
            except ProxyPortSameSourceIntraEVC as exc:
279
                msg = (
280
                    f"Validation error when updating interface {intf} proxy port {pp}"
281
                    f" EVC ids: {list(affected_evcs)}, exception {str(exc)}"
282
                )
283
                log.error(msg)
284
285 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
286
        """Disable INT on EVCs.
287
288
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
289
290
        The force bool option, if True, will bypass the following:
291
292
        1 - EVC not found
293
        2 - EVC doesn't have INT
294
        3 - ProxyPortNotFound or ProxyPortDestNotFound
295
296
        """
297 1
        self._validate_disable_evcs(evcs, force)
298 1
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
299
300 1
        metadata = {
301
            "telemetry": {
302
                "enabled": False,
303
                "status": "DOWN",
304
                "status_reason": ["disabled"],
305
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
306
            }
307
        }
308 1
        await self.remove_int_flows(evcs, metadata, force=force)
309 1
        try:
310 1
            self._discard_pps_evc_ids(evcs)
311
        except ProxyPortError:
312
            if not force:
313
                raise
314
315 1
    async def remove_int_flows(
316
        self, evcs: dict[str, dict], metadata: dict, force=False
317
    ) -> None:
318
        """Remove INT flows and set metadata on EVCs."""
319 1
        stored_flows = await api.get_stored_flows(
320
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
321
        )
322 1
        await asyncio.gather(
323
            self._remove_int_flows(stored_flows),
324
            api.add_evcs_metadata(evcs, metadata, force),
325
        )
326
327 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
328
        """Enable INT on EVCs.
329
330
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
331
332
        The force bool option, if True, will bypass the following:
333
334
        1 - EVC already has INT
335
        2 - ProxyPort isn't UP
336
        Other cases won't be bypassed since at the point it won't have the data needed.
337
338
        """
339 1
        evcs = self._validate_map_enable_evcs(evcs, force)
340 1
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
341
342 1
        metadata = {
343
            "telemetry": {
344
                "enabled": True,
345
                "status": "UP",
346
                "status_reason": [],
347
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
348
            }
349
        }
350 1
        await self.install_int_flows(evcs, metadata)
351 1
        self._add_pps_evc_ids(evcs)
352
353 1
    async def redeploy_int(self, evcs: dict[str, dict]) -> None:
354
        """Redeploy INT on EVCs. It'll remove, install and update metadata.
355
356
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
357
        """
358 1
        evcs = self._validate_map_enable_evcs(evcs, force=True)
359 1
        log.info(f"Redeploying INT on EVC ids: {list(evcs.keys())}, force: True")
360
361 1
        stored_flows = await api.get_stored_flows(
362
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
363
        )
364 1
        await self._remove_int_flows(stored_flows)
365 1
        metadata = {
366
            "telemetry": {
367
                "enabled": True,
368
                "status": "UP",
369
                "status_reason": [],
370
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
371
            }
372
        }
373 1
        await self.install_int_flows(evcs, metadata, force=True)
374
375 1
    async def install_int_flows(
376
        self, evcs: dict[str, dict], metadata: dict, force=False
377
    ) -> None:
378
        """Install INT flows and set metadata on EVCs."""
379 1
        stored_flows = self.flow_builder.build_int_flows(
380
            evcs,
381
            await utils.get_found_stored_flows(
382
                [
383
                    utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
384
                    for evc_id in evcs
385
                ]
386
            ),
387
        )
388
389 1
        active_evcs = {k: v for k, v in evcs.items() if v["active"]}
390 1
        inactive_evcs = {k: v for k, v in evcs.items() if not v["active"]}
391 1
        inactive_metadata = copy.deepcopy(metadata)
392 1
        inactive_metadata["telemetry"]["status"] = "DOWN"
393 1
        inactive_metadata["telemetry"]["status_reason"] = ["no_flows"]
394
395 1
        await asyncio.gather(
396
            self._install_int_flows(stored_flows),
397
            api.add_evcs_metadata(inactive_evcs, inactive_metadata, force),
398
            api.add_evcs_metadata(active_evcs, metadata, force),
399
        )
400
401 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
402
        """Return a ProxyPort assigned to a UNI or raise."""
403
404 1
        interface = self.controller.get_interface_by_id(intf_id)
405 1
        if not interface:
406 1
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")
407
408 1
        if "proxy_port" not in interface.metadata:
409 1
            raise ProxyPortNotFound(
410
                evc_id, f"proxy_port metadata not found in {intf_id}"
411
            )
412
413 1
        source_intf = interface.switch.get_interface_by_port_no(
414
            interface.metadata.get("proxy_port")
415
        )
416 1
        if not source_intf:
417
            raise ProxyPortNotFound(
418
                evc_id,
419
                f"proxy_port of {intf_id} source interface not found",
420
            )
421
422 1
        pp = self.srcs_pp.get(source_intf.id)
423 1
        if not pp:
424 1
            pp = ProxyPort(self.controller, source_intf)
425 1
            self.srcs_pp[source_intf.id] = pp
426
427 1
        if not pp.destination:
428 1
            raise ProxyPortDestNotFound(
429
                evc_id,
430
                f"proxy_port of {intf_id} isn't looped or destination interface "
431
                "not found",
432
            )
433
434 1
        return pp
435
436 1
    def _validate_disable_evcs(
437
        self,
438
        evcs: dict[str, dict],
439
        force=False,
440
    ) -> None:
441
        """Validate disable EVCs."""
442 1
        for evc_id, evc in evcs.items():
443
            if not evc and not force:
444
                raise EVCNotFound(evc_id)
445
            if not utils.has_int_enabled(evc) and not force:
446
                raise EVCHasNoINT(evc_id)
447
448 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
449
        """Validate that an intra EVC is using different proxy ports.
450
451
        If the same proxy port is used on both UNIs, of one the sink/pop related matches
452
        would ended up being overwritten since they'd be the same. Currently, an
453
        external loop will have unidirectional flows matching in the lower (source)
454
        port number.
455
        """
456 1
        pp_a = evc["uni_a"].get("proxy_port")
457 1
        pp_z = evc["uni_z"].get("proxy_port")
458 1
        if any(
459
            (
460
                not utils.is_intra_switch_evc(evc),
461
                pp_a is None,
462
                pp_z is None,
463
            )
464
        ):
465 1
            return
466 1
        if pp_a.source != pp_z.source:
467 1
            return
468
469 1
        raise ProxyPortSameSourceIntraEVC(
470
            evc["id"], "intra EVC UNIs must use different proxy ports"
471
        )
472
473 1
    def _validate_map_enable_evcs(
474
        self,
475
        evcs: dict[str, dict],
476
        force=False,
477
    ) -> dict[str, dict]:
478
        """Validate map enabling EVCs.
479
480
        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
481
        so it can be reused later during provisioning.
482
483
        """
484 1
        for evc_id, evc in evcs.items():
485 1
            if not evc:
486
                raise EVCNotFound(evc_id)
487 1
            if utils.has_int_enabled(evc) and not force:
488
                raise EVCHasINT(evc_id)
489
490 1
            uni_a, uni_z = utils.get_evc_unis(evc)
491 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
492 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
493
494 1
            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
495 1
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z
496
497 1
            if pp_a.status != EntityStatus.UP and not force:
498
                dest_id = pp_a.destination.id if pp_a.destination else None
499
                dest_status = pp_a.status if pp_a.destination else None
500
                raise ProxyPortStatusNotUP(
501
                    evc_id,
502
                    f"proxy_port of {uni_a['interface_id']} isn't UP. "
503
                    f"source {pp_a.source.id} status {pp_a.source.status}, "
504
                    f"destination {dest_id} status {dest_status}",
505
                )
506 1
            if pp_z.status != EntityStatus.UP and not force:
507
                dest_id = pp_z.destination.id if pp_z.destination else None
508
                dest_status = pp_z.status if pp_z.destination else None
509
                raise ProxyPortStatusNotUP(
510
                    evc_id,
511
                    f"proxy_port of {uni_z['interface_id']} isn't UP."
512
                    f"source {pp_z.source.id} status {pp_z.source.status}, "
513
                    f"destination {dest_id} status {dest_status}",
514
                )
515
516 1
            self._validate_intra_evc_different_proxy_ports(evc)
517 1
        return evcs
518
519 1
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
520
        """Add proxy ports evc_ids.
521
522
        This is meant to be called after an EVC is enabled.
523
        """
524 1
        for evc_id, evc in evcs.items():
525 1
            uni_a, uni_z = utils.get_evc_unis(evc)
526 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
527 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
528 1
            pp_a.evc_ids.add(evc_id)
529 1
            pp_z.evc_ids.add(evc_id)
530 1
            self.unis_src[evc["uni_a"]["interface_id"]] = pp_a.source.id
531 1
            self.unis_src[evc["uni_z"]["interface_id"]] = pp_z.source.id
532
533 1
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
534
        """Discard proxy port evc_ids.
535
536
        This is meant to be called when an EVC is disabled.
537
        """
538 1
        for evc_id, evc in evcs.items():
539 1
            uni_a, uni_z = utils.get_evc_unis(evc)
540 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
541 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
542 1
            pp_a.evc_ids.discard(evc_id)
543 1
            pp_z.evc_ids.discard(evc_id)
544
545 1
    def evc_compare(
546
        self, stored_int_flows: dict, stored_mef_flows: dict, evcs: dict
547
    ) -> dict[str, list]:
548
        """EVC compare.
549
550
        Cases:
551
        - No INT enabled but has INT flows -> wrong_metadata_has_int_flows
552
        - INT enabled but has less flows than mef flows -> missing_some_int_flows
553
554
        """
555 1
        int_flows = {
556
            utils.get_id_from_cookie(k): v for k, v in stored_int_flows.items()
557
        }
558 1
        mef_flows = {
559
            utils.get_id_from_cookie(k): v for k, v in stored_mef_flows.items()
560
        }
561
562 1
        results = defaultdict(list)
563 1
        for evc in evcs.values():
564 1
            evc_id = evc["id"]
565
566 1
            if (
567
                not utils.has_int_enabled(evc)
568
                and evc_id in int_flows
569
                and int_flows[evc_id]
570
            ):
571 1
                results[evc_id].append("wrong_metadata_has_int_flows")
572
573 1
            if (
574
                utils.has_int_enabled(evc)
575
                and evc_id in mef_flows
576
                and mef_flows[evc_id]
577
                and (
578
                    evc_id not in int_flows
579
                    or (
580
                        evc_id in int_flows
581
                        and len(int_flows[evc_id]) < len(mef_flows[evc_id])
582
                    )
583
                )
584
            ):
585 1
                results[evc_id].append("missing_some_int_flows")
586 1
        return results
587
588 1
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
589
        """Delete int flows given a prefiltered stored_flows.
590
591
        Removal is driven by the stored flows instead of EVC ids and dpids to also
592
        be able to handle the force mode when an EVC no longer exists. It also follows
593
        the same pattern that mef_eline currently uses.
594
595
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
596
        for settings.BATCH_INTERVAL per batch iteration.
597
598
        """
599 1
        switch_flows = defaultdict(set)
600 1
        for flows in stored_flows.values():
601 1
            for flow in flows:
602 1
                switch_flows[flow["switch"]].add(flow["flow"]["cookie"])
603
604 1
        for dpid, cookies in switch_flows.items():
605 1
            cookie_vals = list(cookies)
606 1
            batch_size = settings.BATCH_SIZE
607 1
            if batch_size <= 0:
608
                batch_size = len(cookie_vals)
609
610 1
            for i in range(0, len(cookie_vals), batch_size):
611 1
                if i > 0:
612
                    await asyncio.sleep(settings.BATCH_INTERVAL)
613 1
                flows = [
614
                    {
615
                        "cookie": cookie,
616
                        "cookie_mask": int(0xFFFFFFFFFFFFFFFF),
617
                        "table_id": Table.OFPTT_ALL.value,
618
                    }
619
                    for cookie in cookie_vals[i : i + batch_size]
620
                ]
621 1
                event = KytosEvent(
622
                    "kytos.flow_manager.flows.delete",
623
                    content={
624
                        "dpid": dpid,
625
                        "force": True,
626
                        "flow_dict": {"flows": flows},
627
                    },
628
                )
629 1
                await self.controller.buffers.app.aput(event)
630
631 1
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
632
        """Install INT flow mods.
633
634
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
635
        for settings.BATCH_INTERVAL per batch iteration.
636
        """
637
638 1
        switch_flows = defaultdict(list)
639 1
        for flows in stored_flows.values():
640 1
            for flow in flows:
641 1
                switch_flows[flow["switch"]].append(flow["flow"])
642
643 1
        for dpid, flows in switch_flows.items():
644 1
            flow_vals = list(flows)
645 1
            batch_size = settings.BATCH_SIZE
646 1
            if batch_size <= 0:
647
                batch_size = len(flow_vals)
648
649 1
            for i in range(0, len(flow_vals), batch_size):
650 1
                if i > 0:
651
                    await asyncio.sleep(settings.BATCH_INTERVAL)
652 1
                flows = flow_vals[i : i + batch_size]
653 1
                event = KytosEvent(
654
                    "kytos.flow_manager.flows.install",
655
                    content={
656
                        "dpid": dpid,
657
                        "force": True,
658
                        "flow_dict": {"flows": flows},
659
                    },
660
                )
661
                await self.controller.buffers.app.aput(event)
662