Passed
Pull Request — master (#35)
by Vinicius
02:37
created

INTManager._validate_disable_evcs()   B

Complexity

Conditions 6

Size

Total Lines 11
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 0
Metric Value
cc 6
eloc 9
nop 3
dl 0
loc 11
ccs 0
cts 6
cp 0
crap 42
rs 8.6666
c 0
b 0
f 0
1
"""INTManager module."""
2
import asyncio
3
from collections import defaultdict
4
from datetime import datetime
5
6
from pyof.v0x04.controller2switch.table_mod import Table
7
8
from kytos.core.controller import Controller
9
from kytos.core.events import KytosEvent
10
from napps.kytos.telemetry_int import utils
11
from napps.kytos.telemetry_int import settings
12
from kytos.core import log
13
import napps.kytos.telemetry_int.kytos_api_helper as api
14
from napps.kytos.telemetry_int.managers import flow_builder
15
from kytos.core.common import EntityStatus
16
17
from napps.kytos.telemetry_int.exceptions import (
18
    EVCNotFound,
19
    EVCHasINT,
20
    EVCHasNoINT,
21
    ProxyPortStatusNotUP,
22
)
23
24
25
class INTManager:
26
27
    """INTManager encapsulates and aggregates telemetry-related functionalities."""
28
29
    def __init__(self, controller: Controller) -> None:
30
        """INTManager."""
31
        self.controller = controller
32
33
    async def disable_int(self, evcs: dict[str, dict], force=False) -> None:
34
        """Disable INT on EVCs.
35
36
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
37
38
        The force bool option, if True, will bypass the following:
39
40
        1 - EVC not found
41
        2 - EVC doesn't have INT
42
43
        """
44
        self._validate_disable_evcs(evcs, force)
45
46
        log.info(f"Disabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
47
48
        stored_flows = await api.get_stored_flows(
49
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX) for evc_id in evcs]
50
        )
51
52
        metadata = {
53
            "telemetry": {
54
                "enabled": False,
55
                "timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
56
            }
57
        }
58
        await asyncio.gather(
59
            self.remove_int_flows(stored_flows),
60
            api.add_evcs_metadata(evcs, metadata, force),
61
        )
62
63
    async def enable_int(self, evcs: dict[str, dict], force=False) -> None:
64
        """Enable INT on EVCs.
65
66
        evcs is a dict of prefetched EVCs from mef_eline based on evc_ids.
67
68
        The force bool option, if True, will bypass the following:
69
70
        1 - EVC already has INT
71
        2 - ProxyPort isn't UP
72
        Other cases won't be bypassed since at the point it won't have the data needed.
73
74
        """
75
        evcs = self._validate_map_enable_evcs(evcs, force)
76
77
        log.info(f"Enabling INT on EVC ids: {list(evcs.keys())}, force: {force}")
78
79
        stored_flows = flow_builder.build_int_flows(
80
            evcs,
81
            await utils.get_found_stored_flows(
82
                [
83
                    utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
84
                    for evc_id in evcs
85
                ]
86
            ),
87
        )
88
89
        metadata = {
90
            "telemetry": {
91
                "enabled": True,
92
                "timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S"),
93
            }
94
        }
95
        await asyncio.gather(
96
            self.install_int_flows(stored_flows), api.add_evcs_metadata(evcs, metadata)
97
        )
98
99
    def _validate_disable_evcs(
100
        self,
101
        evcs: dict[str, dict],
102
        force=False,
103
    ) -> None:
104
        """Validate disable EVCs."""
105
        for evc_id, evc in evcs.items():
106
            if not evc and not force:
107
                raise EVCNotFound(evc_id)
108
            if not utils.has_int_enabled(evc) and not force:
109
                raise EVCHasNoINT(evc_id)
110
111
    def _validate_map_enable_evcs(
112
        self,
113
        evcs: dict[str, dict],
114
        force=False,
115
    ) -> dict[str, dict]:
116
        """Validate map enabling EVCs.
117
118
        This function also maps both uni_a and uni_z dicts with their ProxyPorts, just
119
        so it can be reused later during provisioning.
120
121
        """
122
        for evc_id, evc in evcs.items():
123
            if not evc:
124
                raise EVCNotFound(evc_id)
125
            if utils.has_int_enabled(evc) and not force:
126
                raise EVCHasINT(evc_id)
127
128
            uni_a, uni_z = utils.get_evc_unis(evc)
129
            pp_a = utils.get_proxy_port_or_raise(
130
                self.controller, uni_a["interface_id"], evc_id
131
            )
132
            pp_z = utils.get_proxy_port_or_raise(
133
                self.controller, uni_z["interface_id"], evc_id
134
            )
135
136
            uni_a["proxy_port"], uni_z["proxy_port"] = pp_a, pp_z
137
            evc["uni_a"], evc["uni_z"] = uni_a, uni_z
138
139
            if pp_a.status != EntityStatus.UP and not force:
140
                dest_id = pp_a.destination.id if pp_a.destination else None
141
                dest_status = pp_a.status if pp_a.destination else None
142
                raise ProxyPortStatusNotUP(
143
                    evc_id,
144
                    f"proxy_port of {uni_a['interface_id']} isn't UP."
145
                    f"source {pp_a.source.id} status {pp_a.source.status}, "
146
                    f"destination {dest_id} status {dest_status}",
147
                )
148
            if pp_z.status != EntityStatus.UP and not force:
149
                dest_id = pp_z.destination.id if pp_z.destination else None
150
                dest_status = pp_z.status if pp_z.destination else None
151
                raise ProxyPortStatusNotUP(
152
                    evc_id,
153
                    f"proxy_port of {uni_z['interface_id']} isn't UP."
154
                    f"source {pp_z.source.id} status {pp_z.source.status}, "
155
                    f"destination {dest_id} status {dest_status}",
156
                )
157
        return evcs
158
159
    async def remove_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
160
        """Delete int flows given a prefiltered stored_flows.
161
162
        Removal is driven by the stored flows instead of EVC ids and dpids to also
163
        be able to handle the force mode when an EVC no longer exists. It also follows
164
        the same pattern that mef_eline currently uses.
165
166
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
167
        for settings.BATCH_INTERVAL per batch iteration.
168
169
        """
170
        switch_flows = defaultdict(set)
171
        for flows in stored_flows.values():
172
            for flow in flows:
173
                switch_flows[flow["switch"]].add(flow["flow"]["cookie"])
174
175
        for dpid, cookies in switch_flows.items():
176
            cookie_vals = list(cookies)
177
            batch_size = settings.BATCH_SIZE
178
            if batch_size <= 0:
179
                batch_size = len(cookie_vals)
180
181
            for i in range(0, len(cookie_vals), batch_size):
182
                if i > 0:
183
                    await asyncio.sleep(settings.BATCH_INTERVAL)
184
                flows = [
185
                    {
186
                        "cookie": cookie,
187
                        "cookie_mask": int(0xFFFFFFFFFFFFFFFF),
188
                        "table_id": Table.OFPTT_ALL.value,
189
                    }
190
                    for cookie in cookie_vals[i : i + batch_size]
191
                ]
192
                event = KytosEvent(
193
                    "kytos.flow_manager.flows.delete",
194
                    content={
195
                        "dpid": dpid,
196
                        "force": True,
197
                        "flow_dict": {"flows": flows},
198
                    },
199
                )
200
                await self.controller.buffers.app.aput(event)
201
202
    async def install_int_flows(self, stored_flows: dict[int, list[dict]]) -> None:
203
        """Install INT flow mods.
204
205
        The flows will be batched per dpid based on settings.BATCH_SIZE and will wait
206
        for settings.BATCH_INTERVAL per batch iteration.
207
        """
208
209
        switch_flows = defaultdict(list)
210
        for flows in stored_flows.values():
211
            for flow in flows:
212
                switch_flows[flow["switch"]].append(flow["flow"])
213
214
        for dpid, flows in switch_flows.items():
215
            flow_vals = list(flows)
216
            batch_size = settings.BATCH_SIZE
217
            if batch_size <= 0:
218
                batch_size = len(flow_vals)
219
220
            for i in range(0, len(flow_vals), batch_size):
221
                if i > 0:
222
                    await asyncio.sleep(settings.BATCH_INTERVAL)
223
                flows = flow_vals[i : i + batch_size]
224
                event = KytosEvent(
225
                    "kytos.flow_manager.flows.install",
226
                    content={
227
                        "dpid": dpid,
228
                        "force": True,
229
                        "flow_dict": {"flows": flows},
230
                    },
231
                )
232
                await self.controller.buffers.app.aput(event)
233