Passed
Push — master ( 6bcbff...8461fc )
by Vinicius
05:03 queued 03:26
created

build.managers.int.INTManager.install_int_flows()   B

Complexity

Conditions 7

Size

Total Lines 31
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 27.6718

Importance

Changes 0
Metric Value
cc 7
eloc 21
nop 2
dl 0
loc 31
ccs 4
cts 16
cp 0.25
crap 27.6718
rs 7.9759
c 0
b 0
f 0
1
"""INTManager module."""
2 1
import asyncio
3 1
from collections import defaultdict
4 1
from datetime import datetime
5
6 1
from pyof.v0x04.controller2switch.table_mod import Table
7
8 1
from kytos.core.controller import Controller
9 1
from kytos.core.events import KytosEvent
10 1
from napps.kytos.telemetry_int import utils
11 1
from napps.kytos.telemetry_int import settings
12 1
from kytos.core import log
13 1
import napps.kytos.telemetry_int.kytos_api_helper as api
14 1
from napps.kytos.telemetry_int.managers import flow_builder
15 1
from kytos.core.common import EntityStatus
16
17 1
from napps.kytos.telemetry_int.exceptions import (
18
    EVCNotFound,
19
    EVCHasINT,
20
    EVCHasNoINT,
21
    ProxyPortStatusNotUP,
22
    ProxyPortSameSourceIntraEVC
23
)
24
25
26 1
class INTManager:
27
28
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
29
30 1
    def __init__(self, controller: Controller) -> None:
        """Bind this manager to a Kytos controller.

        The controller is kept on the instance; the other methods use it to
        resolve proxy ports and to publish flow events on its app buffer.
        """
        self.controller = controller
33
34 1
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Turn INT off for the given EVCs.

        evcs holds EVCs prefetched from mef_eline, keyed by EVC id.

        With force set to True, validation no longer fails when:

        1 - an EVC was not found
        2 - an EVC doesn't have INT

        The stored flows looked up by each EVC's INT cookie are removed while
        the EVCs' telemetry metadata is updated to disabled/DOWN; both run
        concurrently.
        """
        self._validate_disable_evcs(evcs, force)
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        int_cookies = []
        for evc_id in evcs:
            int_cookies.append(utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX))
        stored_flows = await api.get_stored_flows(int_cookies)

        # The timestamp is taken only after the stored flows have been fetched.
        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": False,
            "status": "DOWN",
            "status_reason": ["disabled"],
            "status_updated_at": updated_at,
        }
        metadata = {"telemetry": telemetry}

        flows_removal = self.remove_int_flows(stored_flows)
        metadata_update = api.add_evcs_metadata(evcs, metadata, force)
        await asyncio.gather(flows_removal, metadata_update)
65
66 1
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Turn INT on for the given EVCs.

        evcs holds EVCs prefetched from mef_eline, keyed by EVC id.

        With force set to True, validation no longer fails when:

        1 - an EVC already has INT
        2 - a ProxyPort isn't UP

        Nothing else is bypassed, since at that point the data needed to
        provision INT wouldn't be available.

        The INT flows are built from the stored flows found by each EVC's
        mef_eline cookie; installing them and updating the EVCs' telemetry
        metadata to enabled/UP run concurrently.
        """
        evcs = self._validate_map_enable_evcs(evcs, force)
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        mef_cookies = [
            utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX) for evc_id in evcs
        ]
        found_flows = await utils.get_found_stored_flows(mef_cookies)
        stored_flows = flow_builder.build_int_flows(evcs, found_flows)

        # The timestamp is taken only after the INT flows have been built.
        updated_at = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
        telemetry = {
            "enabled": True,
            "status": "UP",
            "status_reason": [],
            "status_updated_at": updated_at,
        }
        metadata = {"telemetry": telemetry}

        flows_install = self.install_int_flows(stored_flows)
        metadata_update = api.add_evcs_metadata(evcs, metadata)
        await asyncio.gather(flows_install, metadata_update)
103
104 1
    def _validate_disable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> None:
        """Check that INT can be disabled on every given EVC.

        Raises EVCNotFound for an empty EVC value and EVCHasNoINT for an EVC
        without INT enabled. Neither is raised when force is True.
        """
        for evc_id, evc in evcs.items():
            if not (evc or force):
                raise EVCNotFound(evc_id)
            if not (utils.has_int_enabled(evc) or force):
                raise EVCHasNoINT(evc_id)
115
116 1
    def _validate_intra_evc_different_proxy_ports(self, evc: dict) -> None:
        """Ensure an intra-switch EVC uses a distinct proxy port per UNI.

        If the same proxy port is used on both UNIs, one of the sink/pop related
        matches would end up being overwritten since they'd be the same.
        Currently, an external loop will have unidirectional flows matching in
        the lower (source) port number.

        Nothing is validated when the EVC isn't intra-switch or when either UNI
        has no proxy port set. ProxyPortSameSourceIntraEVC is raised when both
        proxy ports have the same source.
        """
        pp_a = evc["uni_a"].get("proxy_port")
        pp_z = evc["uni_z"].get("proxy_port")
        is_intra_switch = utils.is_intra_switch_evc(evc)
        if not is_intra_switch or pp_a is None or pp_z is None:
            return

        sources_differ = pp_a.source != pp_z.source
        if not sources_differ:
            raise ProxyPortSameSourceIntraEVC(
                evc["id"], "intra EVC UNIs must use different proxy ports"
            )
140
141 1
    def _validate_map_enable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> dict[str, dict]:
        """Validate the EVCs INT is being enabled on and map their ProxyPorts.

        Each EVC's uni_a and uni_z dicts get a "proxy_port" entry holding the
        resolved ProxyPort, just so it can be reused later during provisioning.
        The same evcs dict, updated in place, is returned.

        With force set to True, an EVC that already has INT or a ProxyPort that
        isn't UP doesn't fail the validation. An empty (not found) EVC always
        does.

        Raises:
            EVCNotFound: an EVC value is empty.
            EVCHasINT: an EVC already has INT enabled and force is False.
            ProxyPortStatusNotUP: a UNI's proxy port isn't UP and force is False.
            ProxyPortSameSourceIntraEVC: an intra-switch EVC uses proxy ports
                with the same source on both UNIs.
        """
        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if utils.has_int_enabled(evc) and not force:
                raise EVCHasINT(evc_id)

            uni_a, uni_z = utils.get_evc_unis(evc)
            pp_a = utils.get_proxy_port_or_raise(
                self.controller, uni_a["interface_id"], evc_id
            )
            pp_z = utils.get_proxy_port_or_raise(
                self.controller, uni_z["interface_id"], evc_id
            )

            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z

            # uni_a is checked before uni_z, so its error is reported first.
            for uni, proxy_port in ((uni_a, pp_a), (uni_z, pp_z)):
                if proxy_port.status != EntityStatus.UP and not force:
                    destination = proxy_port.destination
                    dest_id = destination.id if destination else None
                    # Report the destination's own status, not the proxy
                    # port's aggregated one, to match the message wording.
                    # NOTE(review): assumes the destination exposes `status`
                    # like the source does - confirm.
                    dest_status = destination.status if destination else None
                    raise ProxyPortStatusNotUP(
                        evc_id,
                        f"proxy_port of {uni['interface_id']} isn't UP. "
                        f"source {proxy_port.source.id} "
                        f"status {proxy_port.source.status}, "
                        f"destination {dest_id} status {dest_status}",
                    )

            self._validate_intra_evc_different_proxy_ports(evc)
        return evcs
190
191 1
    async def remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Publish delete events for the INT flows of a prefiltered stored_flows.

        Removal is driven by the stored flows rather than by EVC ids and dpids,
        so the force mode can also be handled when an EVC no longer exists. It
        also follows the same pattern that mef_eline currently uses.

        Cookies are deduplicated per dpid and sent in batches of
        settings.BATCH_SIZE (a single batch when it isn't positive), sleeping
        settings.BATCH_INTERVAL before every batch of a dpid except its first.
        """
        cookies_by_dpid = defaultdict(set)
        for entries in stored_flows.values():
            for entry in entries:
                cookies_by_dpid[entry["switch"]].add(entry["flow"]["cookie"])

        for dpid, cookie_set in cookies_by_dpid.items():
            pending = list(cookie_set)
            total = len(pending)
            step = settings.BATCH_SIZE
            if step <= 0:
                step = total

            for start in range(0, total, step):
                if start > 0:
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                batch = []
                for cookie in pending[start : start + step]:
                    batch.append(
                        {
                            "cookie": cookie,
                            "cookie_mask": 0xFFFFFFFFFFFFFFFF,
                            "table_id": Table.OFPTT_ALL.value,
                        }
                    )

                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": batch},
                }
                event = KytosEvent("kytos.flow_manager.flows.delete", content=content)
                await self.controller.buffers.app.aput(event)
233
234 1
    async def install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Publish install events for INT flow mods.

        The flows are grouped per dpid and sent in batches of
        settings.BATCH_SIZE (a single batch when it isn't positive), sleeping
        settings.BATCH_INTERVAL before every batch of a dpid except its first.
        """
        flows_by_dpid = defaultdict(list)
        for entries in stored_flows.values():
            for entry in entries:
                flows_by_dpid[entry["switch"]].append(entry["flow"])

        for dpid, pending in flows_by_dpid.items():
            total = len(pending)
            step = settings.BATCH_SIZE
            if step <= 0:
                step = total

            for start in range(0, total, step):
                if start > 0:
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                # Slicing yields a new list, so each event owns its batch.
                batch = pending[start : start + step]
                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": batch},
                }
                event = KytosEvent("kytos.flow_manager.flows.install", content=content)
                await self.controller.buffers.app.aput(event)
265