Passed
Pull Request — master (#35)
by Vinicius
02:31
created

build.managers.int.INTManager._install_int_flows()   B

Complexity

Conditions 7

Size

Total Lines 31
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 56

Importance

Changes 0
Metric Value
cc 7
eloc 21
nop 2
dl 0
loc 31
ccs 0
cts 16
cp 0
crap 56
rs 7.9759
c 0
b 0
f 0
1
"""INTManager module."""
2
import asyncio
from collections import defaultdict
from datetime import datetime, timezone

from kytos.core.controller import Controller
from kytos.core.events import KytosEvent
from napps.kytos.telemetry_int import utils
from napps.kytos.telemetry_int import settings
from kytos.core import log
import napps.kytos.telemetry_int.kytos_api_helper as api
from napps.kytos.telemetry_int.managers import flow_builder
from kytos.core.common import EntityStatus

from napps.kytos.telemetry_int.exceptions import (
    EVCNotFound,
    EVCHasINT,
    EVCHasNoINT,
    ProxyPortStatusNotUP,
)
22
23
24
class INTManager:
25
26
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
27
28
    def __init__(self, controller: Controller) -> None:
        """Store the controller instance the other methods rely on."""
        self.controller = controller
31
32
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Disable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.

        The force bool option, if True, will bypass the following:

        1 - EVC not found
        2 - EVC doesn't have INT

        The EVCs are validated first, then the stored flows matching each EVC
        INT cookie are fetched. Removing those flows and updating the EVCs
        metadata (telemetry disabled plus a UTC timestamp) are awaited together.

        Any exception raised by _validate_disable_evcs is propagated.

        """
        self._validate_disable_evcs(evcs, force)

        log.info(
            f"Disabling telemetry INT on EVC ids: {list(evcs.keys())}, force: {force}"
        )

        int_cookies = [
            utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs
        ]
        stored_flows = await api.get_stored_flows(int_cookies)

        # datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now"
        # renders to the very same string with this format (no offset emitted).
        now = datetime.now(timezone.utc)
        metadata = {
            "telemetry": {
                "enabled": False,
                "timestamp": now.strftime("%Y-%m-%dT%H:%M:%S"),
            }
        }
        await asyncio.gather(
            self._remove_int_flows(stored_flows),
            api.add_evcs_metadata(evcs, metadata, force),
        )
63
64
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
        """Enable INT on EVCs.

        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.

        The force bool option, if True, will bypass the following:

        1 - EVC already has INT
        2 - ProxyPort isn't UP
        Other cases won't be bypassed since at the point it won't have the data needed.

        The EVCs are validated and mapped with their proxy ports first, then the
        INT flows are built from the stored flows found for each EVC mef_eline
        cookie. Installing those flows and updating the EVCs metadata (telemetry
        enabled plus a UTC timestamp) are awaited together.

        Any exception raised by _validate_map_enable_evcs is propagated.

        """
        evcs = self._validate_map_enable_evcs(evcs, force)

        log.info(
            f"Enabling telemetry INT on EVC ids: {list(evcs.keys())}, force: {force}"
        )

        mef_cookies = [
            utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX) for evc_id in evcs
        ]
        found_stored_flows = await utils.get_found_stored_flows(mef_cookies)
        stored_flows = flow_builder.build_int_flows(evcs, found_stored_flows)

        # datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now"
        # renders to the very same string with this format (no offset emitted).
        now = datetime.now(timezone.utc)
        metadata = {
            "telemetry": {
                "enabled": True,
                "timestamp": now.strftime("%Y-%m-%dT%H:%M:%S"),
            }
        }
        await asyncio.gather(
            self._install_int_flows(stored_flows), api.add_evcs_metadata(evcs, metadata)
        )
101
102
    def _validate_disable_evcs(
103
        self,
104
        evcs: dict[str, dict],
105
        force=False,
106
    ) -> None:
107
        """Validate disable EVCs."""
108
        for evc_id, evc in evcs.items():
109
            if not evc and not force:
110
                raise EVCNotFound(evc_id)
111
            if not utils.has_int_enabled(evc) and not force:
112
                raise EVCHasNoINT(evc_id)
113
114
    def _validate_map_enable_evcs(
        self,
        evcs: dict[str, dict],
        force=False,
    ) -> dict[str, dict]:
        """Validate map enabling EVCs.

        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
        so it can be reused later during provisioning.

        Returns the same evcs dict, with each EVC "uni_a" and "uni_z" entries
        carrying a "proxy_port" key.

        Raises EVCNotFound for a falsy EVC (never bypassed), EVCHasINT when the
        EVC already has INT enabled and force is False, and ProxyPortStatusNotUP
        when a proxy port status isn't UP and force is False. Whatever
        utils.get_proxy_port_or_raise raises is propagated too.

        """
        for evc_id, evc in evcs.items():
            if not evc:
                raise EVCNotFound(evc_id)
            if utils.has_int_enabled(evc) and not force:
                raise EVCHasINT(evc_id)

            uni_a, uni_z = utils.get_evc_unis(evc)
            pp_a = utils.get_proxy_port_or_raise(
                self.controller, uni_a["interface_id"], evc_id
            )
            pp_z = utils.get_proxy_port_or_raise(
                self.controller, uni_z["interface_id"], evc_id
            )

            # The mapping is stored before the status checks so that it is
            # in place even when the checks are bypassed by force.
            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z

            # uni_a is checked before uni_z, so uni_a is the one reported when
            # both proxy ports aren't UP.
            for uni, proxy_port in ((uni_a, pp_a), (uni_z, pp_z)):
                if proxy_port.status != EntityStatus.UP and not force:
                    destination = proxy_port.destination
                    dest_id = destination.id if destination else None
                    # Report the destination's own status, not the proxy port's.
                    dest_status = destination.status if destination else None
                    raise ProxyPortStatusNotUP(
                        evc_id,
                        f"proxy_port of {uni['interface_id']} isn't UP. "
                        f"source {proxy_port.source.id} "
                        f"status {proxy_port.source.status}, "
                        f"destination {dest_id} status {dest_status}",
                    )
        return evcs
161
162
    async def _remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Request the deletion of INT flows from prefiltered stored_flows.

        The deletion is based on the stored flows rather than on EVC ids and
        dpids, so the force mode also works when an EVC no longer exists. It
        follows the same pattern that mef_eline currently uses.

        The distinct cookies are grouped per dpid and deleted in batches of
        settings.BATCH_SIZE (a single batch when it is <= 0), waiting for
        settings.BATCH_INTERVAL between the batches of a dpid. Each batch is
        published as a "kytos.flow_manager.flows.delete" event on the
        controller app buffer.

        """
        cookies_by_dpid = defaultdict(set)
        for stored in stored_flows.values():
            for entry in stored:
                cookies_by_dpid[entry["switch"]].add(entry["flow"]["cookie"])

        for dpid, cookie_set in cookies_by_dpid.items():
            pending = list(cookie_set)
            configured = settings.BATCH_SIZE
            step = configured if configured > 0 else len(pending)

            for start in range(0, len(pending), step):
                # Only pause between batches, never before the first one.
                if start > 0:
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                batch = []
                for cookie in pending[start : start + step]:
                    # Full 64-bit mask, i.e. match the cookie exactly.
                    batch.append({"cookie": cookie, "cookie_mask": 0xFFFFFFFFFFFFFFFF})

                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": batch},
                }
                event = KytosEvent("kytos.flow_manager.flows.delete", content=content)
                await self.controller.buffers.app.aput(event)
200
201
    async def _install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
        """Request the installation of INT flow mods.

        The flows are grouped per dpid and installed in batches of
        settings.BATCH_SIZE (a single batch when it is <= 0), waiting for
        settings.BATCH_INTERVAL between the batches of a dpid. Each batch is
        published as a "kytos.flow_manager.flows.install" event on the
        controller app buffer.
        """

        flows_by_dpid = defaultdict(list)
        for stored in stored_flows.values():
            for entry in stored:
                flows_by_dpid[entry["switch"]].append(entry["flow"])

        for dpid, dpid_flows in flows_by_dpid.items():
            pending = list(dpid_flows)
            configured = settings.BATCH_SIZE
            step = configured if configured > 0 else len(pending)

            for start in range(0, len(pending), step):
                # Only pause between batches, never before the first one.
                if start > 0:
                    await asyncio.sleep(settings.BATCH_INTERVAL)

                batch = pending[start : start + step]
                content = {
                    "dpid": dpid,
                    "force": True,
                    "flow_dict": {"flows": batch},
                }
                event = KytosEvent("kytos.flow_manager.flows.install", content=content)
                await self.controller.buffers.app.aput(event)
232