Passed
Push — master ( 124bde...9bde40 )
by Vinicius
08:06 queued 05:24
created

INTManager.handle_pp_metadata_added()   B

Complexity

Conditions 8

Size

Total Lines 50
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 9.4049

Importance

Changes 0
Metric Value
cc 8
eloc 31
nop 2
dl 0
loc 50
ccs 18
cts 25
cp 0.72
crap 9.4049
rs 7.2693
c 0
b 0
f 0
1
"""INTManager module."""
2
3 1
import asyncio
4 1
import copy
5 1
from collections import defaultdict
6 1
from datetime import datetime
7
8 1
from pyof.v0x04.controller2switch.table_mod import Table
9
10 1
from kytos.core.controller import Controller
11 1
from kytos.core.events import KytosEvent
12 1
from kytos.core.interface import Interface
13 1
from napps.kytos.telemetry_int import utils
14 1
from napps.kytos.telemetry_int import settings
15 1
from kytos.core import log
16 1
from kytos.core.link import Link
17 1
import napps.kytos.telemetry_int.kytos_api_helper as api
18 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
19 1
from kytos.core.common import EntityStatus
20 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
21
22 1
from napps.kytos.telemetry_int.exceptions import (
23
    EVCError,
24
    EVCNotFound,
25
    EVCHasINT,
26
    EVCHasNoINT,
27
    FlowsNotFound,
28
    ProxyPortError,
29
    ProxyPortStatusNotUP,
30
    ProxyPortDestNotFound,
31
    ProxyPortNotFound,
32
    ProxyPortSameSourceIntraEVC,
33
)
34
35
36 1
class INTManager:
37
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
38
39 1
    def __init__(self, controller: Controller) -> None:
40
        """INTManager."""
41 1
        self.controller = controller
42 1
        self.flow_builder = FlowBuilder()
43 1
        self._topo_link_lock = asyncio.Lock()
44 1
        self._intf_meta_lock = asyncio.Lock()
45
46
        # Keep track between each uni intf id and its src intf id port
47 1
        self.unis_src: dict[str, str] = {}
48
        # Keep track between src intf id and its ProxyPort instance
49 1
        self.srcs_pp: dict[str, ProxyPort] = {}
50
51 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
52
        """Load UNI ids src ids and their ProxyPort instances."""
53 1
        for evc_id, evc in evcs.items():
54 1
            if not utils.has_int_enabled(evc):
55 1
                continue
56
57 1
            uni_a_id = evc["uni_a"]["interface_id"]
58 1
            uni_z_id = evc["uni_z"]["interface_id"]
59 1
            uni_a = self.controller.get_interface_by_id(uni_a_id)
60 1
            uni_z = self.controller.get_interface_by_id(uni_z_id)
61 1
            if uni_a and "proxy_port" in uni_a.metadata:
62 1
                src_a = uni_a.switch.get_interface_by_port_no(
63
                    uni_a.metadata["proxy_port"]
64
                )
65 1
                self.unis_src[uni_a.id] = src_a.id
66 1
                try:
67 1
                    pp = self.get_proxy_port_or_raise(uni_a.id, evc_id)
68
                except ProxyPortDestNotFound:
69
                    pp = self.srcs_pp[src_a.id]
70 1
                pp.evc_ids.add(evc_id)
71
72 1
            if uni_z and "proxy_port" in uni_z.metadata:
73 1
                src_z = uni_z.switch.get_interface_by_port_no(
74
                    uni_z.metadata["proxy_port"]
75
                )
76 1
                self.unis_src[uni_z.id] = src_z.id
77 1
                try:
78 1
                    pp = self.get_proxy_port_or_raise(uni_z.id, evc_id)
79
                except ProxyPortDestNotFound:
80
                    pp = self.srcs_pp[src_z.id]
81 1
                pp.evc_ids.add(evc_id)
82
83 1
    async def handle_pp_link_down(self, link: Link) -> None:
84
        """Handle proxy_port link_down."""
85 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
86
            return
87 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
88 1
        if not pp:
89
            pp = self.srcs_pp.get(link.endpoint_b.id)
90 1
        if not pp or not pp.evc_ids:
91
            return
92
93 1
        async with self._topo_link_lock:
94 1
            evcs = await api.get_evcs(
95
                **{
96
                    "metadata.telemetry.enabled": "true",
97
                    "metadata.telemetry.status": "UP",
98
                }
99
            )
100 1
            to_deactivate = {
101
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
102
            }
103 1
            if not to_deactivate:
104
                return
105
106 1
            log.info(
107
                f"Handling link_down {link}, removing INT flows falling back to "
108
                f"mef_eline, EVC ids: {list(to_deactivate)}"
109
            )
110 1
            metadata = {
111
                "telemetry": {
112
                    "enabled": True,
113
                    "status": "DOWN",
114
                    "status_reason": ["proxy_port_down"],
115
                    "status_updated_at": datetime.utcnow().strftime(
116
                        "%Y-%m-%dT%H:%M:%S"
117
                    ),
118
                }
119
            }
120 1
            await self.remove_int_flows(to_deactivate, metadata)
121
122 1
    async def handle_pp_link_up(self, link: Link) -> None:
123
        """Handle proxy_port link_up."""
124 1
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
125
            return
126 1
        pp = self.srcs_pp.get(link.endpoint_a.id)
127 1
        if not pp:
128
            pp = self.srcs_pp.get(link.endpoint_b.id)
129 1
        if not pp or not pp.evc_ids:
130
            return
131
132 1
        async with self._topo_link_lock:
133 1
            if link.status != EntityStatus.UP or link.status_reason:
134
                return
135 1
            evcs = await api.get_evcs(
136
                **{
137
                    "metadata.telemetry.enabled": "true",
138
                    "metadata.telemetry.status": "DOWN",
139
                }
140
            )
141
142 1
            to_install = {}
143 1
            for evc_id, evc in evcs.items():
144 1
                if any(
145
                    (
146
                        not evc["active"],
147
                        evc["archived"],
148
                        evc_id not in pp.evc_ids,
149
                        evc["uni_a"]["interface_id"] not in self.unis_src,
150
                        evc["uni_z"]["interface_id"] not in self.unis_src,
151
                    )
152
                ):
153
                    continue
154
155 1
                src_a_id = self.unis_src[evc["uni_a"]["interface_id"]]
156 1
                src_z_id = self.unis_src[evc["uni_z"]["interface_id"]]
157 1
                if (
158
                    src_a_id in self.srcs_pp
159
                    and src_z_id in self.srcs_pp
160
                    and self.srcs_pp[src_a_id].status == EntityStatus.UP
161
                    and self.srcs_pp[src_z_id].status == EntityStatus.UP
162
                ):
163 1
                    to_install[evc_id] = evc
164
165 1
            if not to_install:
166
                return
167
168 1
            try:
169 1
                to_install = self._validate_map_enable_evcs(to_install, force=True)
170
            except EVCError as exc:
171
                log.exception(exc)
172
                return
173
174 1
            log.info(
175
                f"Handling link_up {link}, deploying INT flows, "
176
                f"EVC ids: {list(to_install)}"
177
            )
178 1
            metadata = {
179
                "telemetry": {
180
                    "enabled": True,
181
                    "status": "UP",
182
                    "status_reason": [],
183
                    "status_updated_at": datetime.utcnow().strftime(
184
                        "%Y-%m-%dT%H:%M:%S"
185
                    ),
186
                }
187
            }
188 1
            try:
189 1
                await self.install_int_flows(to_install, metadata)
190
            except FlowsNotFound as exc:
191
                log.exception(f"FlowsNotFound {str(exc)}")
192
                return
193
194 1
    async def handle_pp_metadata_removed(self, intf: Interface) -> None:
195
        """Handle proxy port metadata removed."""
196 1
        if "proxy_port" in intf.metadata:
197
            return
198 1
        try:
199 1
            pp = self.srcs_pp[self.unis_src[intf.id]]
200 1
            if not pp.evc_ids:
201
                return
202
        except KeyError:
203
            return
204
205 1
        async with self._intf_meta_lock:
206 1
            evcs = await api.get_evcs(
207
                **{
208
                    "metadata.telemetry.enabled": "true",
209
                    "metadata.telemetry.status": "UP",
210
                }
211
            )
212 1
            to_deactivate = {
213
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
214
            }
215 1
            if not to_deactivate:
216
                return
217
218 1
            log.info(
219
                f"Handling interface metadata removed on {intf}, removing INT flows "
220
                f"falling back to mef_eline, EVC ids: {list(to_deactivate)}"
221
            )
222 1
            metadata = {
223
                "telemetry": {
224
                    "enabled": True,
225
                    "status": "DOWN",
226
                    "status_reason": ["proxy_port_metadata_removed"],
227
                    "status_updated_at": datetime.utcnow().strftime(
228
                        "%Y-%m-%dT%H:%M:%S"
229
                    ),
230
                }
231
            }
232 1
            await self.remove_int_flows(to_deactivate, metadata)
233
234 1
    async def handle_pp_metadata_added(self, intf: Interface) -> None:
235
        """Handle proxy port metadata added.
236
237
        If an existing ProxyPort gets its proxy_port meadata updated
238
        and has associated EVCs then it'll remove and install the flows accordingly.
239
240
        """
241 1
        if "proxy_port" not in intf.metadata:
242
            return
243 1
        try:
244 1
            pp = self.srcs_pp[self.unis_src[intf.id]]
245 1
            if not pp.evc_ids:
246
                return
247
        except KeyError:
248
            return
249
250 1
        cur_source_intf = intf.switch.get_interface_by_port_no(
251
            intf.metadata.get("proxy_port")
252
        )
253 1
        if cur_source_intf == pp.source:
254 1
            return
255
256 1
        async with self._intf_meta_lock:
257 1
            pp.source = cur_source_intf
258
259 1
            evcs = await api.get_evcs(
260
                **{
261
                    "metadata.telemetry.enabled": "true",
262
                }
263
            )
264 1
            affected_evcs = {
265
                evc_id: evc for evc_id, evc in evcs.items() if evc_id in pp.evc_ids
266
            }
267 1
            if not affected_evcs:
268 1
                return
269
270 1
            log.info(
271
                f"Handling interface metadata updated on {intf}. It'll disable the "
272
                "EVCs to be safe, and then try to enable again with the updated "
273
                f" proxy port {pp}, EVC ids: {list(affected_evcs)}"
274
            )
275 1
            await self.disable_int(affected_evcs, force=True)
276 1
            try:
277 1
                await self.enable_int(affected_evcs, force=True)
278
            except ProxyPortSameSourceIntraEVC as exc:
279
                msg = (
280
                    f"Validation error when updating interface {intf} proxy port {pp}"
281
                    f" EVC ids: {list(affected_evcs)}, exception {str(exc)}"
282
                )
283
                log.error(msg)
284
285 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
286
        """Disable INT on EVCs.
287
288
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
289
290
        The force bool option, if True, will bypass the following:
291
292
        1 - EVC not found
293
        2 - EVC doesn't have INT
294
        3 - ProxyPortNotFound or ProxyPortDestNotFound
295
296
        """
297 1
        self._validate_disable_evcs(evcs, force)
298 1
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
299
300 1
        metadata = {
301
            "telemetry": {
302
                "enabled": False,
303
                "status": "DOWN",
304
                "status_reason": ["disabled"],
305
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
306
            }
307
        }
308 1
        await self.remove_int_flows(evcs, metadata, force=force)
309 1
        try:
310 1
            self._discard_pps_evc_ids(evcs)
311
        except ProxyPortError:
312
            if not force:
313
                raise
314
315 1
    async def remove_int_flows(
316
        self, evcs: dict[str, dict], metadata: dict, force=False
317
    ) -> None:
318
        """Remove INT flows and set metadata on EVCs."""
319 1
        stored_flows = await api.get_stored_flows(
320
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
321
        )
322 1
        await asyncio.gather(
323
            self._remove_int_flows(stored_flows),
324
            api.add_evcs_metadata(evcs, metadata, force),
325
        )
326
327 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
328
        """Enable INT on EVCs.
329
330
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
331
332
        The force bool option, if True, will bypass the following:
333
334
        1 - EVC already has INT
335
        2 - ProxyPort isn't UP
336
        Other cases won't be bypassed since at the point it won't have the data needed.
337
338
        """
339 1
        evcs = self._validate_map_enable_evcs(evcs, force)
340 1
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
341
342 1
        metadata = {
343
            "telemetry": {
344
                "enabled": True,
345
                "status": "UP",
346
                "status_reason": [],
347
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
348
            }
349
        }
350 1
        await self.install_int_flows(evcs, metadata)
351 1
        self._add_pps_evc_ids(evcs)
352
353 1
    async def redeploy_int(self, evcs: dict[str, dict]) -> None:
354
        """Redeploy INT on EVCs. It'll remove, install and update metadata.
355
356
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
357
        """
358 1
        evcs = self._validate_map_enable_evcs(evcs, force=True)
359 1
        log.info(f"Redeploying INT on EVC ids: {list(evcs.keys())}, force: True")
360
361 1
        stored_flows = await api.get_stored_flows(
362
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
363
        )
364 1
        await self._remove_int_flows(stored_flows)
365 1
        metadata = {
366
            "telemetry": {
367
                "enabled": True,
368
                "status": "UP",
369
                "status_reason": [],
370
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
371
            }
372
        }
373 1
        await self.install_int_flows(evcs, metadata, force=True)
374
375 1
    async def install_int_flows(
376
        self, evcs: dict[str, dict], metadata: dict, force=False
377
    ) -> None:
378
        """Install INT flows and set metadata on EVCs."""
379 1
        stored_flows = self.flow_builder.build_int_flows(
380
            evcs,
381
            await utils.get_found_stored_flows(
382
                [
383
                    utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
384
                    for evc_id in evcs
385
                ]
386
            ),
387
        )
388
389 1
        active_evcs, inactive_evcs, pp_down_evcs = {}, {}, {}
390 1
        for evc_id, evc in evcs.items():
391 1
            if not evc["active"]:
392
                inactive_evcs[evc_id] = evc
393
                continue
394 1
            if any((
395
                evc["uni_a"]["proxy_port"].status != EntityStatus.UP,
396
                evc["uni_z"]["proxy_port"].status != EntityStatus.UP,
397
            )):
398 1
                pp_down_evcs[evc_id] = evc
399 1
                continue
400
            active_evcs[evc_id] = evc
401
402 1
        inactive_metadata = copy.deepcopy(metadata)
403 1
        inactive_metadata["telemetry"]["status"] = "DOWN"
404 1
        pp_down_metadata = copy.deepcopy(inactive_metadata)
405 1
        inactive_metadata["telemetry"]["status_reason"] = ["no_flows"]
406 1
        pp_down_metadata["telemetry"]["status_reason"] = ["proxy_port_down"]
407
408 1
        await asyncio.gather(
409
            self._install_int_flows(stored_flows),
410
            api.add_evcs_metadata(inactive_evcs, inactive_metadata, force),
411
            api.add_evcs_metadata(pp_down_evcs, pp_down_metadata, force),
412
            api.add_evcs_metadata(active_evcs, metadata, force),
413
        )
414
415 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
416
        """Return a ProxyPort assigned to a UNI or raise."""
417
418 1
        interface = self.controller.get_interface_by_id(intf_id)
419 1
        if not interface:
420 1
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")
421
422 1
        if "proxy_port" not in interface.metadata:
423 1
            raise ProxyPortNotFound(
424
                evc_id, f"proxy_port metadata not found in {intf_id}"
425
            )
426
427 1
        source_intf = interface.switch.get_interface_by_port_no(
428
            interface.metadata.get("proxy_port")
429
        )
430 1
        if not source_intf:
431
            raise ProxyPortNotFound(
432
                evc_id,
433
                f"proxy_port of {intf_id} source interface not found",
434
            )
435
436 1
        pp = self.srcs_pp.get(source_intf.id)
437 1
        if not pp:
438 1
            pp = ProxyPort(self.controller, source_intf)
439 1
            self.srcs_pp[source_intf.id] = pp
440
441 1
        if not pp.destination:
442 1
            raise ProxyPortDestNotFound(
443
                evc_id,
444
                f"proxy_port of {intf_id} isn't looped or destination interface "
445
                "not found",
446
            )
447
448 1
        return pp
449
450 1
    def _validate_disable_evcs(
451
        self,
452
        evcs: dict[str, dict],
453
        force=False,
454
    ) -> None:
455
        """Validate disable EVCs."""
456 1
        for evc_id, evc in evcs.items():
457
            if not evc and not force:
458
                raise EVCNotFound(evc_id)
459
            if not utils.has_int_enabled(evc) and not force:
460
                raise EVCHasNoINT(evc_id)
461
462 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
463
        """Validate that an intra EVC is using different proxy ports.
464
465
        If the same proxy port is used on both UNIs, of one the sink/pop related matches
466
        would ended up being overwritten since they'd be the same. Currently, an
467
        external loop will have unidirectional flows matching in the lower (source)
468
        port number.
469
        """
470 1
        pp_a = evc["uni_a"].get("proxy_port")
471 1
        pp_z = evc["uni_z"].get("proxy_port")
472 1
        if any(
473
            (
474
                not utils.is_intra_switch_evc(evc),
475
                pp_a is None,
476
                pp_z is None,
477
            )
478
        ):
479 1
            return
480 1
        if pp_a.source != pp_z.source:
481 1
            return
482
483 1
        raise ProxyPortSameSourceIntraEVC(
484
            evc["id"], "intra EVC UNIs must use different proxy ports"
485
        )
486
487 1
    def _validate_map_enable_evcs(
488
        self,
489
        evcs: dict[str, dict],
490
        force=False,
491
    ) -> dict[str, dict]:
492
        """Validate map enabling EVCs.
493
494
        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
495
        so it can be reused later during provisioning.
496
497
        """
498 1
        for evc_id, evc in evcs.items():
499 1
            if not evc:
500
                raise EVCNotFound(evc_id)
501 1
            if utils.has_int_enabled(evc) and not force:
502
                raise EVCHasINT(evc_id)
503
504 1
            uni_a, uni_z = utils.get_evc_unis(evc)
505 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
506 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
507
508 1
            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
509 1
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z
510
511 1
            if pp_a.status != EntityStatus.UP and not force:
512
                dest_id = pp_a.destination.id if pp_a.destination else None
513
                dest_status = pp_a.status if pp_a.destination else None
514
                raise ProxyPortStatusNotUP(
515
                    evc_id,
516
                    f"proxy_port of {uni_a['interface_id']} isn't UP. "
517
                    f"source {pp_a.source.id} status {pp_a.source.status}, "
518
                    f"destination {dest_id} status {dest_status}",
519
                )
520 1
            if pp_z.status != EntityStatus.UP and not force:
521
                dest_id = pp_z.destination.id if pp_z.destination else None
522
                dest_status = pp_z.status if pp_z.destination else None
523
                raise ProxyPortStatusNotUP(
524
                    evc_id,
525
                    f"proxy_port of {uni_z['interface_id']} isn't UP."
526
                    f"source {pp_z.source.id} status {pp_z.source.status}, "
527
                    f"destination {dest_id} status {dest_status}",
528
                )
529
530 1
            self._validate_intra_evc_different_proxy_ports(evc)
531 1
        return evcs
532
533 1
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
534
        """Add proxy ports evc_ids.
535
536
        This is meant to be called after an EVC is enabled.
537
        """
538 1
        for evc_id, evc in evcs.items():
539 1
            uni_a, uni_z = utils.get_evc_unis(evc)
540 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
541 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
542 1
            pp_a.evc_ids.add(evc_id)
543 1
            pp_z.evc_ids.add(evc_id)
544 1
            self.unis_src[evc["uni_a"]["interface_id"]] = pp_a.source.id
545 1
            self.unis_src[evc["uni_z"]["interface_id"]] = pp_z.source.id
546
547 1
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
548
        """Discard proxy port evc_ids.
549
550
        This is meant to be called when an EVC is disabled.
551
        """
552 1
        for evc_id, evc in evcs.items():
553 1
            uni_a, uni_z = utils.get_evc_unis(evc)
554 1
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
555 1
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)
556 1
            pp_a.evc_ids.discard(evc_id)
557 1
            pp_z.evc_ids.discard(evc_id)
558
559 1
    def evc_compare(
560
        self, stored_int_flows: dict, stored_mef_flows: dict, evcs: dict
561
    ) -> dict[str, list]:
562
        """EVC compare.
563
564
        Cases:
565
        - No INT enabled but has INT flows -> wrong_metadata_has_int_flows
566
        - INT enabled but has less flows than mef flows -> missing_some_int_flows
567
568
        """
569 1
        int_flows = {
570
            utils.get_id_from_cookie(k): v for k, v in stored_int_flows.items()
571
        }
572 1
        mef_flows = {
573
            utils.get_id_from_cookie(k): v for k, v in stored_mef_flows.items()
574
        }
575
576 1
        results = defaultdict(list)
577 1
        for evc in evcs.values():
578 1
            evc_id = evc["id"]
579
580 1
            if (
581
                not utils.has_int_enabled(evc)
582
                and evc_id in int_flows
583
                and int_flows[evc_id]
584
            ):
585 1
                results[evc_id].append("wrong_metadata_has_int_flows")
586
587 1
            if (
588
                utils.has_int_enabled(evc)
589
                and evc_id in mef_flows
590
                and mef_flows[evc_id]
591
                and (
592
                    evc_id not in int_flows
593
                    or (
594
                        evc_id in int_flows
595
                        and len(int_flows[evc_id]) < len(mef_flows[evc_id])
596
                    )
597
                )
598
            ):
599 1
                results[evc_id].append("missing_some_int_flows")
600 1
        return results
601
602 1
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
603
        """Delete int flows given a prefiltered stored_flows.
604
605
        Removal is driven by the stored flows instead of EVC ids and dpids to also
606
        be able to handle the force mode when an EVC no longer exists. It also follows
607
        the same pattern that mef_eline currently uses.
608
609
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
610
        for settings.BATCH_INTERVAL per batch iteration.
611
612
        """
613 1
        switch_flows = defaultdict(set)
614 1
        for flows in stored_flows.values():
615 1
            for flow in flows:
616 1
                switch_flows[flow["switch"]].add(flow["flow"]["cookie"])
617
618 1
        for dpid, cookies in switch_flows.items():
619 1
            cookie_vals = list(cookies)
620 1
            batch_size = settings.BATCH_SIZE
621 1
            if batch_size <= 0:
622
                batch_size = len(cookie_vals)
623
624 1
            for i in range(0, len(cookie_vals), batch_size):
625 1
                if i > 0:
626
                    await asyncio.sleep(settings.BATCH_INTERVAL)
627 1
                flows = [
628
                    {
629
                        "cookie": cookie,
630
                        "cookie_mask": int(0xFFFFFFFFFFFFFFFF),
631
                        "table_id": Table.OFPTT_ALL.value,
632
                    }
633
                    for cookie in cookie_vals[i : i + batch_size]
634
                ]
635 1
                event = KytosEvent(
636
                    "kytos.flow_manager.flows.delete",
637
                    content={
638
                        "dpid": dpid,
639
                        "force": True,
640
                        "flow_dict": {"flows": flows},
641
                    },
642
                )
643 1
                await self.controller.buffers.app.aput(event)
644
645 1
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
646
        """Install INT flow mods.
647
648
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
649
        for settings.BATCH_INTERVAL per batch iteration.
650
        """
651
652 1
        switch_flows = defaultdict(list)
653 1
        for flows in stored_flows.values():
654 1
            for flow in flows:
655 1
                switch_flows[flow["switch"]].append(flow["flow"])
656
657 1
        for dpid, flows in switch_flows.items():
658 1
            flow_vals = list(flows)
659 1
            batch_size = settings.BATCH_SIZE
660 1
            if batch_size <= 0:
661
                batch_size = len(flow_vals)
662
663 1
            for i in range(0, len(flow_vals), batch_size):
664 1
                if i > 0:
665
                    await asyncio.sleep(settings.BATCH_INTERVAL)
666 1
                flows = flow_vals[i : i + batch_size]
667 1
                event = KytosEvent(
668
                    "kytos.flow_manager.flows.install",
669
                    content={
670
                        "dpid": dpid,
671
                        "force": True,
672
                        "flow_dict": {"flows": flows},
673
                    },
674
                )
675
                await self.controller.buffers.app.aput(event)
676