Passed
Pull Request — master (#35)
by Vinicius
02:46
created

build.managers.int   A

Complexity

Total Complexity 34

Size/Duplication

Total Lines 226
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 128
dl 0
loc 226
ccs 0
cts 83
cp 0
rs 9.68
c 0
b 0
f 0
wmc 34

7 Methods

Rating   Name   Duplication   Size   Complexity  
B INTManager._install_int_flows() 0 30 6
A INTManager.enable_int() 0 34 1
B INTManager._validate_disable_evcs() 0 11 6
D INTManager._validate_map_enable_evcs() 0 47 13
B INTManager._remove_int_flows() 0 37 6
A INTManager.__init__() 0 3 1
A INTManager.disable_int() 0 28 1
1
"""INTManager module."""
2
import asyncio
3
from collections import defaultdict
4
from datetime import datetime
5
6
7
from kytos.core.controller import Controller
8
from kytos.core.events import KytosEvent
9
from napps.kytos.telemetry_int import utils
10
from napps.kytos.telemetry_int import settings
11
from kytos.core import log
12
import napps.kytos.telemetry_int.kytos_api_helper as api
13
from napps.kytos.telemetry_int.managers import flow_builder
14
from kytos.core.common import EntityStatus
15
16
from napps.kytos.telemetry_int.exceptions import (
17
    EVCNotFound,
18
    EVCHasINT,
19
    EVCHasNoINT,
20
    ProxyPortStatusNotUP,
21
)
22
23
24
class INTManager:
25
26
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
27
28
    def __init__(self, controller: Controller) -> None:
29
        """INTManager."""
30
        self.controller = controller
31
32
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
33
        """Disable INT on EVCs.
34
35
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
36
37
        The force bool option, if True, will bypass the following:
38
39
        1 - EVC not found
40
        2 - EVC doesn't have INT
41
42
        """
43
        self._validate_disable_evcs(evcs, force)
44
45
        log.info(f"Disabling telemetry INT on EVC ids: {list(evcs.keys())}")
46
47
        stored_flows = await api.get_stored_flows(
48
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
49
        )
50
51
        metadata = {
52
            "telemetry": {
53
                "enabled": False,
54
                "timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
55
            }
56
        }
57
        await asyncio.gather(
58
            self._remove_int_flows(stored_flows),
59
            api.add_evcs_metadata(evcs, metadata, force),
60
        )
61
62
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
63
        """Enable INT on EVCs.
64
65
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
66
67
        The force bool option, if True, will bypass the following:
68
69
        1 - EVC already has INT
70
        2 - ProxyPort isn't UP
71
        Other cases won't be bypassed since at the point it won't have the data needed.
72
73
        """
74
        evcs = self._validate_map_enable_evcs(evcs, force)
75
76
        log.info(f"Enabling telemetry INT on EVC ids: {list(evcs.keys())}")
77
78
        stored_flows = flow_builder.build_int_flows(
79
            evcs,
80
            await utils.get_found_stored_flows(
81
                [
82
                    utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
83
                    for evc_id in evcs
84
                ]
85
            ),
86
        )
87
88
        metadata = {
89
            "telemetry": {
90
                "enabled": True,
91
                "timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
92
            }
93
        }
94
        await asyncio.gather(
95
            self._install_int_flows(stored_flows), api.add_evcs_metadata(evcs, metadata)
96
        )
97
98
    def _validate_disable_evcs(
99
        self,
100
        evcs: dict[str, dict],
101
        force=False,
102
    ) -> None:
103
        """Validate disable EVCs."""
104
        for evc_id, evc in evcs.items():
105
            if not evc and not force:
106
                raise EVCNotFound(evc_id)
107
            if not utils.has_int_enabled(evc) and not force:
108
                raise EVCHasNoINT(evc_id)
109
110
    def _validate_map_enable_evcs(
111
        self,
112
        evcs: dict[str, dict],
113
        force=False,
114
    ) -> dict[str, dict]:
115
        """Validate map enabling EVCs.
116
117
        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
118
        so it can be reused later during provisioning.
119
120
        """
121
        for evc_id, evc in evcs.items():
122
            if not evc:
123
                raise EVCNotFound(evc_id)
124
            if utils.has_int_enabled(evc) and not force:
125
                raise EVCHasINT(evc_id)
126
127
            uni_a, uni_z = utils.get_evc_unis(evc)
128
            pp_a = utils.get_proxy_port_or_raise(
129
                self.controller, uni_a["interface_id"], evc_id
130
            )
131
            pp_z = utils.get_proxy_port_or_raise(
132
                self.controller, uni_z["interface_id"], evc_id
133
            )
134
135
            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
136
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z
137
138
            if pp_a.status != EntityStatus.UP and not force:
139
                dest_id = pp_a.destination.id if pp_a.destination else None
140
                dest_status = pp_a.status if pp_a.destination else None
141
                raise ProxyPortStatusNotUP(
142
                    evc_id,
143
                    f"proxy_port of {uni_a['interface_id']} isn't UP."
144
                    f"source {pp_a.source.id} status {pp_a.source.status}, "
145
                    f"destination {dest_id} status {dest_status}",
146
                )
147
            if pp_z.status != EntityStatus.UP and not force:
148
                dest_id = pp_z.destination.id if pp_z.destination else None
149
                dest_status = pp_z.status if pp_z.destination else None
150
                raise ProxyPortStatusNotUP(
151
                    evc_id,
152
                    f"proxy_port of {uni_z['interface_id']} isn't UP."
153
                    f"source {pp_z.source.id} status {pp_z.source.status}, "
154
                    f"destination {dest_id} status {dest_status}",
155
                )
156
        return evcs
157
158
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
159
        """Delete int flows given a prefiltered stored_flows.
160
161
        Removal is driven by the stored flows instead of EVC ids and dpids to also
162
        be able to handle the force mode when an EVC no longer exists. It also follows
163
        the same pattern that mef_eline currently uses.
164
165
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
166
        for settings.BATCH_INTERVAL per batch iteration.
167
168
        """
169
        switch_flows = defaultdict(set)
170
        for flows in stored_flows.values():
171
            for flow in flows:
172
                switch_flows[flow["switch"]].add(flow["flow"]["cookie"])
173
174
        for dpid, cookies in switch_flows.items():
175
            cookie_vals = list(cookies)
176
            batch_size = settings.BATCH_SIZE
177
            if batch_size <= 0:
178
                batch_size = len(cookie_vals)
179
180
            for i in range(0, len(cookie_vals), batch_size):
181
                flows = [
182
                    {"cookie": cookie, "cookie_mask": int(0xFFFFFFFFFFFFFFFF)}
183
                    for cookie in cookie_vals[i : i + batch_size]
184
                ]
185
                event = KytosEvent(
186
                    "kytos.flow_manager.flows.delete",
187
                    content={
188
                        "dpid": dpid,
189
                        "force": True,
190
                        "flow_dict": {"flows": flows},
191
                    },
192
                )
193
                await self.controller.buffers.app.aput(event)
194
                await asyncio.sleep(settings.BATCH_INTERVAL)
195
196
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
197
        """Install INT flow mods.
198
199
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
200
        for settings.BATCH_INTERVAL per batch iteration.
201
        """
202
203
        switch_flows = defaultdict(list)
204
        for flows in stored_flows.values():
205
            for flow in flows:
206
                switch_flows[flow["switch"]].append(flow["flow"])
207
208
        for dpid, flows in switch_flows.items():
209
            flow_vals = list(flows)
210
            batch_size = settings.BATCH_SIZE
211
            if batch_size <= 0:
212
                batch_size = len(flow_vals)
213
214
            for i in range(0, len(flow_vals), batch_size):
215
                flows = flow_vals[i : i + batch_size]
216
                event = KytosEvent(
217
                    "kytos.flow_manager.flows.install",
218
                    content={
219
                        "dpid": dpid,
220
                        "force": True,
221
                        "flow_dict": {"flows": flows},
222
                    },
223
                )
224
                await self.controller.buffers.app.aput(event)
225
                await asyncio.sleep(settings.BATCH_INTERVAL)
226