Passed
Push — master ( 4bf6f4...81fe6d )
by Vinicius
02:40 queued 17s
created

build.managers.flow_builder.FlowBuilder.build_int_flows()   A

Complexity

Conditions 3

Size

Total Lines 23
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 15
nop 3
dl 0
loc 23
ccs 7
cts 7
cp 1
crap 3
rs 9.65
c 0
b 0
f 0
1
"""flow_builder module responsible for building and mapping flows."""
2
3 1
import copy
4 1
from collections import defaultdict
5 1
import itertools
6
7 1
from typing import Literal
8
9 1
from napps.kytos.telemetry_int import utils
10 1
from napps.kytos.telemetry_int import settings
11
12
13 1
class FlowBuilder:
    """Build INT (telemetry) flows out of the flows already stored for EVCs.

    Every builder reads the stored flows of an EVC (looked up by the cookie
    derived from the EVC id), deep-copies the relevant ones and rewrites the
    copies into INT source, hop and sink flows.
    """

    def __init__(self):
        """Constructor of FlowBuilder."""
        # Maps a flow's "table_group" value to the id of "table X", i.e. the
        # table that source/sink flows on table 0 send packets to (goto_table).
        self.table_group = {"evpl": 2, "epl": 3}

    def build_int_flows(
        self,
        evcs: dict[str, dict],
        stored_flows: dict[int, list[dict]],
    ) -> dict[int, list[dict]]:
        """Build INT flows.

        It'll map and create all INT flows needed, for now since each EVC
        is bidirectional, it'll provision bidirectionally too. In the future,
        if mef_eline supports unidirectional EVC, it'll follow suit accordingly.

        Args:
            evcs: EVCs keyed by EVC id.
            stored_flows: existing flows keyed by cookie.

        Returns:
            The INT flows grouped by the cookie computed from each EVC id.
            EVCs that produced no INT flow have no entry.
        """
        flows_per_cookie: dict[int, list[dict]] = defaultdict(list)
        for evc_id, evc in evcs.items():
            evc_flows = list(
                itertools.chain(
                    self._build_int_source_flows("uni_a", evc, stored_flows),
                    self._build_int_source_flows("uni_z", evc, stored_flows),
                    self._build_int_hop_flows(evc, stored_flows),
                    self._build_int_sink_flows("uni_z", evc, stored_flows),
                    self._build_int_sink_flows("uni_a", evc, stored_flows),
                )
            )
            # Skip before touching the defaultdict so that EVCs without any
            # INT flow do not end up with an empty list in the result.
            if not evc_flows:
                continue
            # The cookie only depends on the EVC, so compute it once per EVC
            # instead of once per flow.
            cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
            flows_per_cookie[cookie].extend(evc_flows)
        return flows_per_cookie

    def build_failover_old_flows(
        self, evcs: dict[str, dict], old_flows: dict[int, list[dict]]
    ) -> dict[int, list[dict]]:
        """Build (old path) failover related to remove flows.

        If sink NNIs svlan are different, it'll regenerate the rest of sink loop flows.
        Otherwise, it'll just remove the same received flows except with int cookie
        value the deletion uses flow mod OFPFC_DELETE, so no need to include the
        additional INT keys in the match like nw_proto for deletion.

        Args:
            evcs: EVCs keyed by EVC id; each one needs "uni_a", "uni_z" and
                "current_path".
            old_flows: old path flows keyed by cookie. Flows that are reused
                as-is get their cookie overwritten IN PLACE with the INT
                cookie, so the caller's dicts are mutated.

        Returns:
            Flows to be removed, keyed by the cookie computed from the EVC id.
            Every EVC gets an entry, possibly an empty list.
        """
        removed_flows = defaultdict(list)
        for evc_id, evc in evcs.items():
            dpid_a, dpid_z = evc["uni_a"]["switch"], evc["uni_z"]["switch"]

            cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
            # Same cookie with its most significant byte replaced by the
            # INT prefix.
            int_cookie = (settings.INT_COOKIE_PREFIX << 56) | (
                cookie & 0xFFFFFFFFFFFFFF
            )

            # First truthy svlan found on the current path for each sink.
            cur_sink_a_svlan, cur_sink_z_svlan = None, None
            for link in evc["current_path"]:
                if cur_sink_a_svlan is None and (
                    svlan := utils.get_svlan_dpid_link(link, dpid_a)
                ):
                    cur_sink_a_svlan = svlan
                if cur_sink_z_svlan is None and (
                    svlan := utils.get_svlan_dpid_link(link, dpid_z)
                ):
                    cur_sink_z_svlan = svlan
                if cur_sink_a_svlan is not None and cur_sink_z_svlan is not None:
                    break

            sink_a_flows: list[dict] = []
            sink_z_flows: list[dict] = []
            for flow in old_flows[cookie]:
                # The A side is checked first; a flow is only considered for
                # the Z side when it was not consumed by the A side.
                if not sink_a_flows and flow["switch"] == dpid_a:
                    sink_a_flows = self._build_failover_old_sink_flows(
                        "uni_a", evc, old_flows, flow, cur_sink_a_svlan, int_cookie
                    )
                elif not sink_z_flows and flow["switch"] == dpid_z:
                    sink_z_flows = self._build_failover_old_sink_flows(
                        "uni_z", evc, old_flows, flow, cur_sink_z_svlan, int_cookie
                    )
                if sink_a_flows and sink_z_flows:
                    break

            hop_flows = self._build_int_hop_flows(evc, old_flows)
            removed_flows[cookie] = list(
                itertools.chain(sink_a_flows, hop_flows, sink_z_flows)
            )
        return removed_flows

    def _build_failover_old_sink_flows(
        self,
        uni_dst_key: Literal["uni_a", "uni_z"],
        evc: dict,
        old_flows: dict[int, list[dict]],
        flow: dict,
        cur_sink_svlan,
        int_cookie: int,
    ) -> list[dict]:
        """Build the old-path flows to remove at one sink for a given flow.

        When the flow's dl_vlan differs from the (truthy) svlan currently in
        use at this sink, the sink flows are regenerated from old_flows.
        Otherwise the received flow itself is returned, with its cookie
        overwritten in place by int_cookie.
        """
        if flow["flow"]["match"]["dl_vlan"] != cur_sink_svlan and cur_sink_svlan:
            return self._build_int_sink_flows(uni_dst_key, evc, old_flows)
        flow["flow"]["cookie"] = int_cookie
        return [flow]

    def _build_int_source_flows(
        self,
        uni_src_key: Literal["uni_a", "uni_z"],
        evc: dict,
        stored_flows: dict[int, list[dict]],
    ) -> list[dict]:
        """Build INT source flows.

        At the INT source, one flow becomes 3: one for UDP on table 0,
        one for TCP on table 0, and one on table X (2 for evpl and 3 for epl by default)
        On table 0, we use just new instructions: push_int and goto_table
        On table X, we add add_int_metadata before the original actions
        INT flows will have higher priority.

        Args:
            uni_src_key: which UNI of the EVC acts as the INT source.
            evc: EVC dict; needs "id" and both UNIs.
            stored_flows: existing flows keyed by cookie.

        Returns:
            [table 0 TCP flow, table 0 UDP flow, table X flow], or an empty
            list when no stored flow matches the source UNI switch and port.
        """
        src_uni = evc[uni_src_key]

        # Get the original flow: the first one on the source UNI's switch
        # whose in_port is the UNI port.
        dpid = src_uni["switch"]
        for flow in stored_flows[
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
        ]:
            if (
                flow["switch"] == dpid
                and flow["flow"]["match"]["in_port"] == src_uni["port_number"]
            ):
                new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
                break
        else:
            return []

        utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
        utils.set_new_cookie(new_int_flow_tbl_0_tcp)
        utils.set_owner(new_int_flow_tbl_0_tcp)

        # Deepcopy to use for table X (2 or 3 by default for EVPL or EPL respectively)
        # Taken before the TCP match fields and set_priority are applied.
        new_int_flow_tbl_x = copy.deepcopy(new_int_flow_tbl_0_tcp)

        # Prepare TCP Flow for Table 0
        new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
        new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
        utils.set_priority(new_int_flow_tbl_0_tcp)

        # The flow_manager has two outputs: instructions and actions.
        new_table_id = self.table_group[new_int_flow_tbl_x["flow"]["table_group"]]
        new_int_flow_tbl_0_tcp["flow"]["instructions"] = [
            {
                "instruction_type": "apply_actions",
                "actions": [{"action_type": "push_int"}],
            },
            {"instruction_type": "goto_table", "table_id": new_table_id},
        ]

        # Prepare UDP Flow for Table 0. Everything the same as TCP except the nw_proto
        new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
        new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP

        # Prepare Flows for Table X - No TCP or UDP specifics
        new_int_flow_tbl_x["flow"]["table_id"] = new_table_id

        # if intra-switch EVC, then output port should be the dst UNI's source port
        if utils.is_intra_switch_evc(evc):
            dst_uni = evc["uni_z" if uni_src_key == "uni_a" else "uni_a"]
            proxy_port = dst_uni["proxy_port"]
            for instruction in new_int_flow_tbl_x["flow"]["instructions"]:
                if instruction["instruction_type"] != "apply_actions":
                    continue
                for action in instruction["actions"]:
                    if action["action_type"] == "output":
                        # Since this is the INT Source, we use source
                        # to avoid worrying about single or multi
                        # home physical loops.
                        # The choice for destination is at the INT Sink.
                        action["port"] = proxy_port.source.port_number

                # remove set_vlan action if it exists, this is for
                # avoiding a redundant set_vlan since it'll be set in the egress sink
                instruction["actions"] = utils.modify_actions(
                    instruction["actions"], ["set_vlan"], remove=True
                )

        new_int_flow_tbl_x["flow"]["instructions"] = utils.add_to_apply_actions(
            new_int_flow_tbl_x["flow"]["instructions"],
            new_instruction={"action_type": "add_int_metadata"},
            position=0,
        )

        return [new_int_flow_tbl_0_tcp, new_int_flow_tbl_0_udp, new_int_flow_tbl_x]

    def _build_int_hop_flows(
        self,
        evc: dict,
        stored_flows: dict[int, list[dict]],
    ) -> list[dict]:
        """Build INT hop flows.

        At the INT hops, one flow adds two more: one for UDP on table 0,
        one for TCP on table 0. On table 0, we add 'add_int_metadata'
        before other actions.

        Args:
            evc: EVC dict; needs "id" and both UNIs.
            stored_flows: existing flows keyed by cookie.

        Returns:
            A TCP flow followed by a UDP flow for every stored flow that is
            not on a UNI switch and has an in_port match.
        """
        new_flows = []
        # Flows on the UNI switches are handled by the source/sink builders.
        excluded_dpids = {evc["uni_a"]["switch"], evc["uni_z"]["switch"]}

        for flow in stored_flows[
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
        ]:
            if flow["switch"] in excluded_dpids:
                continue
            if "match" not in flow["flow"] or "in_port" not in flow["flow"]["match"]:
                continue

            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
            utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
            utils.set_new_cookie(new_int_flow_tbl_0_tcp)
            utils.set_owner(new_int_flow_tbl_0_tcp)

            # Prepare TCP Flow
            new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
            new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
            utils.set_priority(new_int_flow_tbl_0_tcp)

            for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
                if instruction["instruction_type"] == "apply_actions":
                    instruction["actions"].insert(
                        0, {"action_type": "add_int_metadata"}
                    )

            # Prepare UDP Flow
            new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
            new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP

            new_flows.append(new_int_flow_tbl_0_tcp)
            new_flows.append(new_int_flow_tbl_0_udp)

        return new_flows

    def _build_int_sink_flows(
        self,
        uni_dst_key: Literal["uni_a", "uni_z"],
        evc: dict,
        stored_flows: dict[int, list[dict]],
    ) -> list[dict]:
        """
        Build INT sink flows.

        At the INT sink, one flow becomes many:
        1. Before the proxy, we do add_int_metadata as an INT hop.
        We need to keep the set_queue
        2. After the proxy, we do send_report and pop_int and output
        We only use table 0 for #1.
        We use table X (2 or 3) for #2. for pop_int and output

        Args:
            uni_dst_key: which UNI of the EVC acts as the INT sink.
            evc: EVC dict; needs "id" and the sink UNI with "switch",
                "port_number" and "proxy_port".
            stored_flows: existing flows keyed by cookie.

        Returns:
            For every stored flow on the sink switch whose in_port is not the
            UNI port: the pre-proxy TCP and UDP flows (skipped for
            intra-switch EVCs), then the post-proxy table 0 and table X flows.
        """
        new_flows = []
        dst_uni = evc[uni_dst_key]
        proxy_port = dst_uni["proxy_port"]
        dpid = dst_uni["switch"]

        for flow in stored_flows[
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
        ]:
            # Only consider this sink's dpid flows
            if flow["switch"] != dpid:
                continue
            # Only consider flows coming from NNI interfaces
            if flow["flow"]["match"]["in_port"] == dst_uni["port_number"]:
                continue

            # The copy cannot be empty here: the flow was already subscripted
            # above, so no emptiness check is needed.
            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)

            utils.set_new_cookie(new_int_flow_tbl_0_tcp)
            utils.set_owner(new_int_flow_tbl_0_tcp)
            utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)

            # Save for pos-proxy flows
            new_int_flow_tbl_0_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)
            new_int_flow_tbl_x_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)

            # Prepare TCP Flow for Table 0 PRE proxy
            if not utils.is_intra_switch_evc(evc):
                new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
                new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
                utils.set_priority(new_int_flow_tbl_0_tcp)

                # Add telemetry, keep set_queue, output to the proxy port.
                output_port_no = proxy_port.source.port_number
                for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
                    if instruction["instruction_type"] != "apply_actions":
                        continue
                    # Keep set_queue
                    actions = utils.modify_actions(
                        instruction["actions"],
                        ["pop_vlan", "push_vlan", "set_vlan", "output"],
                        remove=True,
                    )
                    actions.insert(0, {"action_type": "add_int_metadata"})
                    actions.append({"action_type": "output", "port": output_port_no})
                    instruction["actions"] = actions

                # Prepare UDP Flow for Table 0
                new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
                new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP

                # Appended as copies, so later changes to the working dicts
                # do not leak into the returned flows.
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_tcp))
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_udp))

            # Prepare Flows for Table 0 AFTER proxy. No difference between TCP or UDP
            in_port_no = proxy_port.destination.port_number

            new_int_flow_tbl_0_pos["flow"]["match"]["in_port"] = in_port_no
            # NOTE(review): this targets the PRE-proxy TCP dict, whose copies
            # were already appended above (or which is unused for intra-switch
            # EVCs), so it does not change any returned flow. Confirm whether
            # new_int_flow_tbl_0_pos was the intended target. Kept as-is to
            # preserve the current priorities of the post-proxy flows.
            utils.set_priority(new_int_flow_tbl_0_tcp)

            new_table_id = self.table_group[
                new_int_flow_tbl_x_pos["flow"]["table_group"]
            ]
            new_int_flow_tbl_0_pos["flow"]["instructions"] = [
                {
                    "instruction_type": "apply_actions",
                    "actions": [{"action_type": "send_report"}],
                },
                {
                    "instruction_type": "goto_table",
                    "table_id": new_table_id,
                },
            ]

            # Prepare Flows for Table X POS proxy
            new_int_flow_tbl_x_pos["flow"]["match"]["in_port"] = in_port_no
            new_int_flow_tbl_x_pos["flow"]["table_id"] = new_table_id

            for instruction in new_int_flow_tbl_x_pos["flow"]["instructions"]:
                if instruction["instruction_type"] == "apply_actions":
                    instruction["actions"].insert(0, {"action_type": "pop_int"})

            new_flows.append(copy.deepcopy(new_int_flow_tbl_0_pos))
            new_flows.append(copy.deepcopy(new_int_flow_tbl_x_pos))

        return new_flows
360