Passed
Push — master ( 68b174...92444a )
by Vinicius
05:28 queued 03:26
created

INTManager.handle_pp_metadata_added()   B

Complexity

Conditions 8

Size

Total Lines 50
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 9.4049

Importance

Changes 0
Metric Value
cc 8
eloc 31
nop 2
dl 0
loc 50
ccs 18
cts 25
cp 0.72
crap 9.4049
rs 7.2693
c 0
b 0
f 0
1
"""INTManager module."""
2 1
import asyncio
3 1
from collections import defaultdict
4 1
from datetime import datetime
5
6 1
from pyof.v0x04.controller2switch.table_mod import Table
7
8 1
from kytos.core.controller import Controller
9 1
from kytos.core.events import KytosEvent
10 1
from kytos.core.interface import Interface
11 1
from napps.kytos.telemetry_int import utils
12 1
from napps.kytos.telemetry_int import settings
13 1
from kytos.core import log
14 1
from kytos.core.link import Link
15 1
import napps.kytos.telemetry_int.kytos_api_helper as api
16 1
from napps.kytos.telemetry_int.managers.flow_builder import FlowBuilder
17 1
from kytos.core.common import EntityStatus
18 1
from napps.kytos.telemetry_int.proxy_port import ProxyPort
19
20 1
from napps.kytos.telemetry_int.exceptions import (
21
    EVCError,
22
    EVCNotFound,
23
    EVCHasINT,
24
    EVCHasNoINT,
25
    FlowsNotFound,
26
    ProxyPortError,
27
    ProxyPortStatusNotUP,
28
    ProxyPortDestNotFound,
29
    ProxyPortNotFound,
30
    ProxyPortSameSourceIntraEVC,
31
)
32
33
34 1
class INTManager:
35
36
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
37
38 1
    def __init__(self, controller: Controller) -> None:
        """Set up the manager state for the given controller."""
        self.controller = controller
        self.flow_builder = FlowBuilder()

        # UNI interface id -> id of the proxy port source interface it uses
        self.unis_src: dict[str, str] = {}
        # Proxy port source interface id -> its ProxyPort instance
        self.srcs_pp: dict[str, ProxyPort] = {}

        # One lock for the link up/down handlers and another one for the
        # interface metadata handlers
        self._topo_link_lock = asyncio.Lock()
        self._intf_meta_lock = asyncio.Lock()
49
50 1
    def load_uni_src_proxy_ports(self, evcs: dict[str, dict]) -> None:
        """Load UNI ids, their source interface ids and ProxyPort instances.

        For every EVC that has INT enabled, each UNI carrying a ``proxy_port``
        metadata entry is mapped to its proxy port source interface id in
        ``self.unis_src`` and the EVC id is recorded on the matching ProxyPort.

        A UNI whose ``proxy_port`` metadata points to a port number that does
        not exist on its switch is logged and skipped, so that a single stale
        entry does not abort loading the remaining EVCs.

        Args:
            evcs: EVCs keyed by EVC id.
        """
        for evc_id, evc in evcs.items():
            if not utils.has_int_enabled(evc):
                continue

            # Resolve both UNIs up front, then apply the same steps to each one.
            unis = [
                self.controller.get_interface_by_id(evc[uni_key]["interface_id"])
                for uni_key in ("uni_a", "uni_z")
            ]
            for uni in unis:
                if not uni or "proxy_port" not in uni.metadata:
                    continue

                port_no = uni.metadata["proxy_port"]
                src = uni.switch.get_interface_by_port_no(port_no)
                if not src:
                    # The port lookup can miss; dereferencing src.id would
                    # otherwise raise AttributeError.
                    log.error(
                        f"EVC {evc_id}: proxy_port {port_no} of UNI {uni.id} "
                        "source interface not found, skipping it"
                    )
                    continue

                self.unis_src[uni.id] = src.id
                try:
                    pp = self.get_proxy_port_or_raise(uni.id, evc_id)
                except ProxyPortDestNotFound:
                    # get_proxy_port_or_raise stores the ProxyPort in srcs_pp
                    # before it checks the destination, so it's available here.
                    pp = self.srcs_pp[src.id]
                pp.evc_ids.add(evc_id)
81
82 1
    async def handle_pp_link_down(self, link: Link) -> None:
        """React to a link down event on a proxy port loop.

        Nothing is done unless settings.FALLBACK_TO_MEF_LOOP_DOWN is set and one
        of the link endpoints is a known proxy port source that has EVC ids.
        Otherwise, the EVCs of that proxy port returned by the query for
        telemetry enabled and status UP get their INT flows removed and their
        telemetry metadata set to DOWN with the "proxy_port_down" reason.
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        # Either endpoint of the link may be the proxy port source.
        pp = self.srcs_pp.get(link.endpoint_a.id) or self.srcs_pp.get(
            link.endpoint_b.id
        )
        if not (pp and pp.evc_ids):
            return

        query = {
            "metadata.telemetry.enabled": "true",
            "metadata.telemetry.status": "UP",
        }
        async with self._topo_link_lock:
            up_evcs = await api.get_evcs(**query)

            to_deactivate = {}
            for evc_id, evc in up_evcs.items():
                if evc_id in pp.evc_ids:
                    to_deactivate[evc_id] = evc
            if not to_deactivate:
                return

            log.info(
                f"Handling link_down {link}, removing INT flows falling back to "
                f"mef_eline, EVC ids: {list(to_deactivate)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            telemetry = {
                "enabled": True,
                "status": "DOWN",
                "status_reason": ["proxy_port_down"],
                "status_updated_at": updated_at,
            }
            await self.remove_int_flows(to_deactivate, {"telemetry": telemetry})
120
121 1
    async def handle_pp_link_up(self, link: Link) -> None:
        """React to a link up event on a proxy port loop.

        Nothing is done unless settings.FALLBACK_TO_MEF_LOOP_DOWN is set and one
        of the link endpoints is a known proxy port source that has EVC ids.
        Otherwise, the EVCs returned by the query for telemetry enabled and
        status DOWN are filtered, validated and get their INT flows installed
        again with the telemetry metadata set to UP.
        """
        if not settings.FALLBACK_TO_MEF_LOOP_DOWN:
            return

        # Either endpoint of the link may be the proxy port source.
        pp = self.srcs_pp.get(link.endpoint_a.id) or self.srcs_pp.get(
            link.endpoint_b.id
        )
        if not (pp and pp.evc_ids):
            return

        query = {
            "metadata.telemetry.enabled": "true",
            "metadata.telemetry.status": "DOWN",
        }
        async with self._topo_link_lock:
            # The link must be UP without any pending status reason.
            if link.status != EntityStatus.UP or link.status_reason:
                return
            down_evcs = await api.get_evcs(**query)

            to_install = {}
            for evc_id, evc in down_evcs.items():
                # Every criterion is computed before testing any of them.
                inactive = not evc["active"]
                archived = evc["archived"]
                unrelated = evc_id not in pp.evc_ids
                uni_a_id = evc["uni_a"]["interface_id"]
                uni_a_unknown = uni_a_id not in self.unis_src
                uni_z_id = evc["uni_z"]["interface_id"]
                uni_z_unknown = uni_z_id not in self.unis_src
                if (
                    inactive
                    or archived
                    or unrelated
                    or uni_a_unknown
                    or uni_z_unknown
                ):
                    continue

                src_a_id = self.unis_src[uni_a_id]
                src_z_id = self.unis_src[uni_z_id]
                if not (src_a_id in self.srcs_pp and src_z_id in self.srcs_pp):
                    continue
                # Only EVCs with both proxy ports UP are candidates.
                both_up = (
                    self.srcs_pp[src_a_id].status == EntityStatus.UP
                    and self.srcs_pp[src_z_id].status == EntityStatus.UP
                )
                if both_up:
                    to_install[evc_id] = evc

            if not to_install:
                return

            try:
                to_install = self._validate_map_enable_evcs(to_install, force=True)
            except EVCError as exc:
                log.exception(exc)
                return

            log.info(
                f"Handling link_up {link}, deploying INT flows, "
                f"EVC ids: {list(to_install)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            telemetry = {
                "enabled": True,
                "status": "UP",
                "status_reason": [],
                "status_updated_at": updated_at,
            }
            try:
                await self.install_int_flows(to_install, {"telemetry": telemetry})
            except FlowsNotFound as exc:
                log.exception(f"FlowsNotFound {str(exc)}")
192
193 1
    async def handle_pp_metadata_removed(self, intf: Interface) -> None:
        """React to the proxy_port metadata being removed from an interface.

        Nothing is done if the interface still has the proxy_port metadata, if
        it isn't tracked as a UNI with a proxy port or if its proxy port has no
        EVC ids. Otherwise, the EVCs of that proxy port returned by the query
        for telemetry enabled and status UP get their INT flows removed and
        their telemetry metadata set to DOWN with the
        "proxy_port_metadata_removed" reason.
        """
        if "proxy_port" in intf.metadata:
            return
        try:
            pp = self.srcs_pp[self.unis_src[intf.id]]
            has_evcs = bool(pp.evc_ids)
        except KeyError:
            # Unknown UNI or unknown proxy port source
            return
        if not has_evcs:
            return

        query = {
            "metadata.telemetry.enabled": "true",
            "metadata.telemetry.status": "UP",
        }
        async with self._intf_meta_lock:
            up_evcs = await api.get_evcs(**query)

            to_deactivate = {}
            for evc_id, evc in up_evcs.items():
                if evc_id in pp.evc_ids:
                    to_deactivate[evc_id] = evc
            if not to_deactivate:
                return

            log.info(
                f"Handling interface metadata removed on {intf}, removing INT flows "
                f"falling back to mef_eline, EVC ids: {list(to_deactivate)}"
            )
            updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
            telemetry = {
                "enabled": True,
                "status": "DOWN",
                "status_reason": ["proxy_port_metadata_removed"],
                "status_updated_at": updated_at,
            }
            await self.remove_int_flows(to_deactivate, {"telemetry": telemetry})
232
233 1
    async def handle_pp_metadata_added(self, intf: Interface) -> None:
        """Handle proxy port metadata added.

        If an existing ProxyPort gets its proxy_port metadata updated to a
        different source interface and has associated EVCs, then INT is
        disabled and enabled again (both forced) on the EVCs of that proxy port
        that have telemetry enabled. A ProxyPortSameSourceIntraEVC raised while
        enabling again is only logged.
        """
        if "proxy_port" not in intf.metadata:
            return
        try:
            pp = self.srcs_pp[self.unis_src[intf.id]]
            has_evcs = bool(pp.evc_ids)
        except KeyError:
            # Unknown UNI or unknown proxy port source
            return
        if not has_evcs:
            return

        new_source = intf.switch.get_interface_by_port_no(
            intf.metadata.get("proxy_port")
        )
        if new_source == pp.source:
            # The metadata still points to the same source interface.
            return

        query = {"metadata.telemetry.enabled": "true"}
        async with self._intf_meta_lock:
            pp.source = new_source

            enabled_evcs = await api.get_evcs(**query)
            affected_evcs = {}
            for evc_id, evc in enabled_evcs.items():
                if evc_id in pp.evc_ids:
                    affected_evcs[evc_id] = evc
            if not affected_evcs:
                return

            log.info(
                f"Handling interface metadata updated on {intf}. It'll disable the "
                "EVCs to be safe, and then try to enable again with the updated "
                f" proxy port {pp}, EVC ids: {list(affected_evcs)}"
            )
            await self.disable_int(affected_evcs, force=True)
            try:
                await self.enable_int(affected_evcs, force=True)
            except ProxyPortSameSourceIntraEVC as exc:
                log.error(
                    f"Validation error when updating interface {intf} proxy port {pp}"
                    f" EVC ids: {list(affected_evcs)}, exception {str(exc)}"
                )
283
284 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Disable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.

        It validates the EVCs, removes their INT flows while setting the
        telemetry metadata as disabled, and then discards the EVC ids from the
        proxy ports.

        The force bool option, if True, will bypass the following:

        1 - EVC not found
        2 - EVC doesn't have INT
        3 - ProxyPortNotFound or ProxyPortDestNotFound

        """
        self._validate_disable_evcs(evcs, force)
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": False,
            "status": "DOWN",
            "status_reason": ["disabled"],
            "status_updated_at": updated_at,
        }
        await self.remove_int_flows(evcs, {"telemetry": telemetry}, force=force)

        try:
            self._discard_pps_evc_ids(evcs)
        except ProxyPortError:
            # A proxy port error is only tolerated when forcing.
            if force:
                return
            raise
313
314 1
    async def remove_int_flows(
        self, evcs: dict[str, dict], metadata: dict, force=False
    ) -> None:
        """Remove the INT flows of the EVCs and set the given metadata on them.

        The stored flows are looked up by the INT cookie of each EVC id, then
        the flow removal and the metadata update are awaited together.
        """
        int_cookies = [
            utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs
        ]
        stored_flows = await api.get_stored_flows(int_cookies)

        flows_removal = self._remove_int_flows(stored_flows)
        metadata_update = api.add_evcs_metadata(evcs, metadata, force)
        await asyncio.gather(flows_removal, metadata_update)
325
326 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Enable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.

        It validates and maps the EVCs to their proxy ports, installs the INT
        flows while setting the telemetry metadata as enabled and UP, and then
        records the EVC ids on the proxy ports.

        The force bool option, if True, will bypass the following:

        1 - EVC already has INT
        2 - ProxyPort isn't UP
        Other cases won't be bypassed since at the point it won't have the data needed.

        """
        evcs = self._validate_map_enable_evcs(evcs, force)
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": True,
            "status": "UP",
            "status_reason": [],
            "status_updated_at": updated_at,
        }
        await self.install_int_flows(evcs, {"telemetry": telemetry})
        self._add_pps_evc_ids(evcs)
351
352 1
    async def redeploy_int(self, evcs: dict[str, dict]) -> None:
        """Redeploy INT on EVCs. It'll only delete and install the flows again.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids. The
        EVCs are validated and mapped in force mode, and no metadata is set.
        """
        evcs = self._validate_map_enable_evcs(evcs, force=True)
        log.info(f"Redeploying INT on EVC ids: {list(evcs.keys())}, force: True")

        # Delete the INT flows that are currently stored
        int_cookies = [
            utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs
        ]
        stored_flows = await api.get_stored_flows(int_cookies)
        await self._remove_int_flows(stored_flows)

        # Build the INT flows again out of the stored mef flows and install them
        mef_cookies = [
            utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX) for evc_id in evcs
        ]
        mef_stored_flows = await utils.get_found_stored_flows(mef_cookies)
        found_stored_flows = self.flow_builder.build_int_flows(evcs, mef_stored_flows)
        await self._install_int_flows(found_stored_flows)
375
376 1
    async def install_int_flows(self, evcs: dict[str, dict], metadata: dict) -> None:
        """Install the INT flows of the EVCs and set the given metadata on them.

        The INT flows are built out of the stored flows found by the mef cookie
        of each EVC id, then the flow installation and the metadata update are
        awaited together.
        """
        mef_cookies = [
            utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX) for evc_id in evcs
        ]
        mef_stored_flows = await utils.get_found_stored_flows(mef_cookies)
        stored_flows = self.flow_builder.build_int_flows(evcs, mef_stored_flows)

        flows_install = self._install_int_flows(stored_flows)
        metadata_update = api.add_evcs_metadata(evcs, metadata)
        await asyncio.gather(flows_install, metadata_update)
391
392 1
    def get_proxy_port_or_raise(self, intf_id: str, evc_id: str) -> ProxyPort:
        """Return the ProxyPort assigned to a UNI or raise.

        A ProxyPort that isn't tracked yet for the source interface is created
        and stored in ``self.srcs_pp``, which happens before the destination is
        checked.

        Raises:
            ProxyPortNotFound: if the UNI interface, its proxy_port metadata or
                the proxy port source interface isn't found.
            ProxyPortDestNotFound: if the proxy port has no destination.
        """
        uni = self.controller.get_interface_by_id(intf_id)
        if not uni:
            raise ProxyPortNotFound(evc_id, f"UNI interface {intf_id} not found")
        if "proxy_port" not in uni.metadata:
            raise ProxyPortNotFound(
                evc_id, f"proxy_port metadata not found in {intf_id}"
            )

        port_no = uni.metadata.get("proxy_port")
        source_intf = uni.switch.get_interface_by_port_no(port_no)
        if not source_intf:
            raise ProxyPortNotFound(
                evc_id,
                f"proxy_port of {intf_id} source interface not found",
            )

        src_id = source_intf.id
        pp = self.srcs_pp.get(src_id)
        if not pp:
            pp = ProxyPort(self.controller, source_intf)
            self.srcs_pp[src_id] = pp

        if pp.destination:
            return pp
        raise ProxyPortDestNotFound(
            evc_id,
            f"proxy_port of {intf_id} isn't looped or destination interface "
            "not found",
        )
426
427 1
    def _validate_disable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> None:
        """Validate the EVCs that are about to have INT disabled.

        Raises:
            EVCNotFound: if an EVC value is falsy and force isn't set.
            EVCHasNoINT: if an EVC doesn't have INT enabled and force isn't set.
        """
        for evc_id, evc in evcs.items():
            if not (evc or force):
                raise EVCNotFound(evc_id)
            # has_int_enabled is evaluated before force is taken into account
            if not (utils.has_int_enabled(evc) or force):
                raise EVCHasNoINT(evc_id)
438
439 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
        """Validate that an intra EVC is using different proxy ports.

        If the same proxy port is used on both UNIs, one of the sink/pop related
        matches would end up being overwritten since they'd be the same.
        Currently, an external loop will have unidirectional flows matching in
        the lower (source) port number.

        Raises:
            ProxyPortSameSourceIntraEVC: if it's an intra switch EVC, both UNIs
                have a proxy port mapped and their sources are the same.
        """
        pp_a = evc["uni_a"].get("proxy_port")
        pp_z = evc["uni_z"].get("proxy_port")

        is_intra_switch = utils.is_intra_switch_evc(evc)
        if not is_intra_switch or pp_a is None or pp_z is None:
            # There's nothing to compare
            return
        if pp_a.source != pp_z.source:
            return

        raise ProxyPortSameSourceIntraEVC(
            evc["id"], "intra EVC UNIs must use different proxy ports"
        )
463
464 1
    def _validate_map_enable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> dict[str, dict]:
        """Validate map enabling EVCs.

        This function also maps both uni_a and uni_z dicts with their ProxyPorts
        (under the "proxy_port" key), just so it can be reused later during
        provisioning.

        Args:
            evcs: EVCs keyed by EVC id, which are updated in place.
            force: if True, an EVC that already has INT and a proxy port that
                isn't UP are both tolerated.

        Returns:
            The same evcs dict with the proxy ports mapped on the UNIs.

        Raises:
            EVCNotFound: if an EVC value is falsy.
            EVCHasINT: if an EVC already has INT enabled and force isn't set.
            ProxyPortNotFound, ProxyPortDestNotFound: if a UNI proxy port can't
                be resolved by get_proxy_port_or_raise.
            ProxyPortStatusNotUP: if a proxy port isn't UP and force isn't set.
            ProxyPortSameSourceIntraEVC: if an intra switch EVC would use the
                same proxy port source on both UNIs.
        """
        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if utils.has_int_enabled(evc) and not force:
                raise EVCHasINT(evc_id)

            uni_a, uni_z = utils.get_evc_unis(evc)
            pp_a = self.get_proxy_port_or_raise(uni_a["interface_id"], evc_id)
            pp_z = self.get_proxy_port_or_raise(uni_z["interface_id"], evc_id)

            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z

            # Same status check for both UNIs, uni_a is checked first.
            for uni, pp in ((uni_a, pp_a), (uni_z, pp_z)):
                if pp.status != EntityStatus.UP and not force:
                    dest = pp.destination
                    dest_id = dest.id if dest else None
                    # Report the destination interface status itself, not the
                    # proxy port status again.
                    dest_status = dest.status if dest else None
                    raise ProxyPortStatusNotUP(
                        evc_id,
                        f"proxy_port of {uni['interface_id']} isn't UP. "
                        f"source {pp.source.id} status {pp.source.status}, "
                        f"destination {dest_id} status {dest_status}",
                    )

            self._validate_intra_evc_different_proxy_ports(evc)
        return evcs
509
510 1
    def _add_pps_evc_ids(self, evcs: dict[str, dict]):
        """Add the EVC ids to their proxy ports.

        This is meant to be called after an EVC is enabled. It also maps each
        UNI interface id to its proxy port source interface id in
        ``self.unis_src``.
        """
        for evc_id, evc in evcs.items():
            unis = utils.get_evc_unis(evc)
            # Both proxy ports are resolved before anything is updated.
            pp_a, pp_z = [
                self.get_proxy_port_or_raise(uni["interface_id"], evc_id)
                for uni in unis
            ]
            for pp in (pp_a, pp_z):
                pp.evc_ids.add(evc_id)
            self.unis_src[evc["uni_a"]["interface_id"]] = pp_a.source.id
            self.unis_src[evc["uni_z"]["interface_id"]] = pp_z.source.id
523
524 1
    def _discard_pps_evc_ids(self, evcs: dict[str, dict]) -> None:
        """Discard the EVC ids from their proxy ports.

        This is meant to be called when an EVC is disabled.
        """
        for evc_id, evc in evcs.items():
            unis = utils.get_evc_unis(evc)
            # Both proxy ports are resolved before anything is updated.
            pp_a, pp_z = [
                self.get_proxy_port_or_raise(uni["interface_id"], evc_id)
                for uni in unis
            ]
            for pp in (pp_a, pp_z):
                pp.evc_ids.discard(evc_id)
535
536 1
    def evc_compare(
        self, stored_int_flows: dict, stored_mef_flows: dict, evcs: dict
    ) -> dict[str, list]:
        """Compare the stored INT and mef flows of the EVCs.

        Both stored flows dicts are keyed by cookie and get keyed again by the
        id extracted from the cookie. The result maps an EVC id to the list of
        inconsistencies found, which can be:

        - wrong_metadata_has_int_flows: INT isn't enabled but it has INT flows
        - missing_some_int_flows: INT is enabled and it has mef flows, but it
          has no INT flows entry or fewer INT flows than mef flows

        """
        int_flows = {}
        for cookie, flows in stored_int_flows.items():
            int_flows[utils.get_id_from_cookie(cookie)] = flows
        mef_flows = {}
        for cookie, flows in stored_mef_flows.items():
            mef_flows[utils.get_id_from_cookie(cookie)] = flows

        results = defaultdict(list)
        for evc in evcs.values():
            evc_id = evc["id"]
            evc_int_flows = int_flows.get(evc_id)
            evc_mef_flows = mef_flows.get(evc_id)

            if not utils.has_int_enabled(evc) and evc_int_flows:
                results[evc_id].append("wrong_metadata_has_int_flows")

            if (
                utils.has_int_enabled(evc)
                and evc_mef_flows
                and (
                    evc_id not in int_flows
                    or len(evc_int_flows) < len(evc_mef_flows)
                )
            ):
                results[evc_id].append("missing_some_int_flows")
        return results
578
579 1
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Delete INT flows given a prefiltered stored_flows.

        Removal is driven by the stored flows instead of EVC ids and dpids to
        also be able to handle the force mode when an EVC no longer exists. It
        also follows the same pattern that mef_eline currently uses.

        The unique cookies are grouped per dpid and deleted in batches of
        settings.BATCH_SIZE (a single batch if it isn't positive), waiting for
        settings.BATCH_INTERVAL before every batch after the first one of a
        dpid. Each batch is published as a "kytos.flow_manager.flows.delete"
        event.

        """
        cookies_by_dpid = defaultdict(set)
        for flow in (f for flows in stored_flows.values() for f in flows):
            cookies_by_dpid[flow["switch"]].add(flow["flow"]["cookie"])

        for dpid, cookies in cookies_by_dpid.items():
            cookie_vals = list(cookies)
            total = len(cookie_vals)
            configured_size = settings.BATCH_SIZE
            batch_size = configured_size if configured_size > 0 else total

            for start in range(0, total, batch_size):
                if start:
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                batch = cookie_vals[start : start + batch_size]
                # Exact cookie match on all tables
                flows = [
                    {
                        "cookie": cookie,
                        "cookie_mask": 0xFFFFFFFFFFFFFFFF,
                        "table_id": Table.OFPTT_ALL.value,
                    }
                    for cookie in batch
                ]
                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": flows},
                }
                event = KytosEvent("kytos.flow_manager.flows.delete", content=content)
                await self.controller.buffers.app.aput(event)
621
622 1
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Install INT flow mods.

        The flows are grouped per dpid and installed in batches of
        settings.BATCH_SIZE (a single batch if it isn't positive), waiting for
        settings.BATCH_INTERVAL before every batch after the first one of a
        dpid. Each batch is published as a "kytos.flow_manager.flows.install"
        event.
        """
        flows_by_dpid = defaultdict(list)
        for flow in (f for flows in stored_flows.values() for f in flows):
            flows_by_dpid[flow["switch"]].append(flow["flow"])

        for dpid, dpid_flows in flows_by_dpid.items():
            flow_vals = list(dpid_flows)
            total = len(flow_vals)
            configured_size = settings.BATCH_SIZE
            batch_size = configured_size if configured_size > 0 else total

            for start in range(0, total, batch_size):
                if start:
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": flow_vals[start : start + batch_size]},
                }
                event = KytosEvent("kytos.flow_manager.flows.install", content=content)
                await self.controller.buffers.app.aput(event)
653