Passed
Push — master ( 5413c2...c98987 )
by Vinicius
08:23 queued 06:42
created

build.managers.int   A

Complexity

Total Complexity 36

Size/Duplication

Total Lines 237
Duplicated Lines 0 %

Test Coverage

Coverage 54.65%

Importance

Changes 0
Metric Value
eloc 137
dl 0
loc 237
ccs 47
cts 86
cp 0.5465
rs 9.52
c 0
b 0
f 0
wmc 36

7 Methods

Rating   Name   Duplication   Size   Complexity  
A INTManager.enable_int() 0 36 1
B INTManager._validate_disable_evcs() 0 11 6
B INTManager.remove_int_flows() 0 42 7
B INTManager.install_int_flows() 0 31 7
A INTManager.__init__() 0 3 1
A INTManager.disable_int() 0 30 1
D INTManager._validate_map_enable_evcs() 0 47 13
1
"""INTManager module."""
2 1
import asyncio
3 1
from collections import defaultdict
4 1
from datetime import datetime
5
6 1
from pyof.v0x04.controller2switch.table_mod import Table
7
8 1
from kytos.core.controller import Controller
9 1
from kytos.core.events import KytosEvent
10 1
from napps.kytos.telemetry_int import utils
11 1
from napps.kytos.telemetry_int import settings
12 1
from kytos.core import log
13 1
import napps.kytos.telemetry_int.kytos_api_helper as api
14 1
from napps.kytos.telemetry_int.managers import flow_builder
15 1
from kytos.core.common import EntityStatus
16
17 1
from napps.kytos.telemetry_int.exceptions import (
18
    EVCNotFound,
19
    EVCHasINT,
20
    EVCHasNoINT,
21
    ProxyPortStatusNotUP,
22
)
23
24
25 1
class INTManager:
    """INTManager encapsulates and aggregates telemetry-related functionalities.

    It validates EVCs, builds the telemetry metadata, and publishes the
    flow_manager events that install or delete INT flows.
    """

    def __init__(self, controller: Controller) -> None:
        """Store the controller used for proxy port lookups and event buffers."""
        self.controller = controller

    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Disable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.

        The force bool option, if True, will bypass the following:

        1 - EVC not found
        2 - EVC doesn't have INT

        Raises:
            EVCNotFound: an EVC entry is empty and force is False.
            EVCHasNoINT: an EVC doesn't have INT enabled and force is False.
        """
        self._validate_disable_evcs(evcs, force)
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        int_cookies = [
            utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs
        ]
        stored_flows = await api.get_stored_flows(int_cookies)

        metadata = self._telemetry_metadata(False, "DOWN", ["disabled"])
        # Flow removal and the metadata update are independent, run them together.
        await asyncio.gather(
            self.remove_int_flows(stored_flows),
            api.add_evcs_metadata(evcs, metadata, force),
        )

    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Enable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.

        The force bool option, if True, will bypass the following:

        1 - EVC already has INT
        2 - ProxyPort isn't UP
        Other cases won't be bypassed since at the point it won't have the data needed.

        Raises:
            EVCNotFound: an EVC entry is empty (never bypassed).
            EVCHasINT: an EVC already has INT enabled and force is False.
            ProxyPortStatusNotUP: a proxy port isn't UP and force is False.
        """
        evcs = self._validate_map_enable_evcs(evcs, force)
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")

        mef_cookies = [
            utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX) for evc_id in evcs
        ]
        stored_flows = flow_builder.build_int_flows(
            evcs,
            await utils.get_found_stored_flows(mef_cookies),
        )

        metadata = self._telemetry_metadata(True, "UP", [])
        # force isn't forwarded here: a missing EVC is never bypassed on enable.
        await asyncio.gather(
            self.install_int_flows(stored_flows), api.add_evcs_metadata(evcs, metadata)
        )

    @staticmethod
    def _telemetry_metadata(
        enabled: bool, status: str, status_reason: list[str]
    ) -> dict[str, dict]:
        """Build the "telemetry" metadata payload stamped with the current UTC time."""
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; kept
        # as is so the emitted timestamp format stays unchanged.
        return {
            "telemetry": {
                "enabled": enabled,
                "status": status,
                "status_reason": status_reason,
                "status_updated_at": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
            }
        }

    def _validate_disable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> None:
        """Validate disable EVCs.

        Raises:
            EVCNotFound: an EVC entry is empty and force is False.
            EVCHasNoINT: an EVC doesn't have INT enabled and force is False.
        """
        if force:
            # force bypasses every check below, so don't even probe entries
            # that may be empty because the EVC no longer exists.
            return
        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if not utils.has_int_enabled(evc):
                raise EVCHasNoINT(evc_id)

    def _validate_map_enable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> dict[str, dict]:
        """Validate map enabling EVCs.

        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
        so it can be reused later during provisioning.

        Returns:
            The same evcs dict, with each EVC's "uni_a" and "uni_z" carrying a
            "proxy_port" entry.

        Raises:
            EVCNotFound: an EVC entry is empty (never bypassed).
            EVCHasINT: an EVC already has INT enabled and force is False.
            ProxyPortStatusNotUP: a proxy port isn't UP and force is False.
        """
        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if utils.has_int_enabled(evc) and not force:
                raise EVCHasINT(evc_id)

            uni_a, uni_z = utils.get_evc_unis(evc)
            pp_a = utils.get_proxy_port_or_raise(
                self.controller, uni_a["interface_id"], evc_id
            )
            pp_z = utils.get_proxy_port_or_raise(
                self.controller, uni_z["interface_id"], evc_id
            )

            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z

            if not force:
                # uni_a is checked before uni_z, so its error takes precedence.
                for uni, proxy_port in ((uni_a, pp_a), (uni_z, pp_z)):
                    self._raise_if_proxy_port_not_up(
                        evc_id, uni["interface_id"], proxy_port
                    )
        return evcs

    @staticmethod
    def _raise_if_proxy_port_not_up(evc_id: str, interface_id: str, proxy_port) -> None:
        """Raise ProxyPortStatusNotUP describing both ends if proxy_port isn't UP."""
        if proxy_port.status != EntityStatus.UP:
            destination = proxy_port.destination
            dest_id = destination.id if destination else None
            # Report the destination's own status, not the proxy port's.
            dest_status = destination.status if destination else None
            raise ProxyPortStatusNotUP(
                evc_id,
                f"proxy_port of {interface_id} isn't UP. "
                f"source {proxy_port.source.id} status {proxy_port.source.status}, "
                f"destination {dest_id} status {dest_status}",
            )

    async def _send_flows_in_batches(
        self, event_name: str, dpid: str, flows: list[dict]
    ) -> None:
        """Publish flows of a dpid as event_name events, batched per settings."""
        if not flows:
            return
        batch_size = settings.BATCH_SIZE
        if batch_size <= 0:
            # A non-positive BATCH_SIZE means everything goes in a single batch.
            batch_size = len(flows)

        for start in range(0, len(flows), batch_size):
            if start > 0:
                # Only wait in between batches, never before the first one.
                await asyncio.sleep(settings.BATCH_INTERVAL)
            event = KytosEvent(
                event_name,
                content={
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": flows[start : start + batch_size]},
                },
            )
            await self.controller.buffers.app.aput(event)

    async def remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Delete int flows given a prefiltered stored_flows.

        Removal is driven by the stored flows instead of EVC ids and dpids to also
        be able to handle the force mode when an EVC no longer exists. It also follows
        the same pattern that mef_eline currently uses.

        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
        for settings.BATCH_INTERVAL per batch iteration.

        """
        # A set per switch so each cookie is deleted only once per dpid.
        switch_cookies = defaultdict(set)
        for flows in stored_flows.values():
            for flow in flows:
                switch_cookies[flow["switch"]].add(flow["flow"]["cookie"])

        for dpid, cookies in switch_cookies.items():
            delete_flows = [
                {
                    "cookie": cookie,
                    "cookie_mask": 0xFFFFFFFFFFFFFFFF,
                    "table_id": Table.OFPTT_ALL.value,
                }
                for cookie in cookies
            ]
            await self._send_flows_in_batches(
                "kytos.flow_manager.flows.delete", dpid, delete_flows
            )

    async def install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Install INT flow mods.

        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
        for settings.BATCH_INTERVAL per batch iteration.
        """
        switch_flows = defaultdict(list)
        for flows in stored_flows.values():
            for flow in flows:
                switch_flows[flow["switch"]].append(flow["flow"])

        for dpid, dpid_flows in switch_flows.items():
            await self._send_flows_in_batches(
                "kytos.flow_manager.flows.install", dpid, dpid_flows
            )
237