Passed
Pull Request — master (#104)
by Vinicius
04:02
created

build.managers.flow_builder   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 295
Duplicated Lines 0 %

Test Coverage

Coverage 98.43%

Importance

Changes 0
Metric Value
eloc 169
dl 0
loc 295
ccs 125
cts 127
cp 0.9843
rs 9.84
c 0
b 0
f 0
wmc 32

5 Methods

Rating   Name   Duplication   Size   Complexity  
C FlowBuilder._build_int_source_flows() 0 97 11
C FlowBuilder._build_int_sink_flows() 0 105 10
A FlowBuilder.__init__() 0 3 1
A FlowBuilder.build_int_flows() 0 23 3
B FlowBuilder._build_int_hop_flows() 0 47 7
1
"""flow_builder module responsible for building and mapping flows."""
2
3 1
import copy
4 1
from collections import defaultdict
5 1
import itertools
6
7 1
from typing import Literal
8
9 1
from napps.kytos.telemetry_int import utils
10 1
from napps.kytos.telemetry_int import settings
11
12
13 1
class FlowBuilder:
14
    """FlowBuilder."""
15
16 1
    def __init__(self):
17
        """Constructor of FlowBuilder."""
18 1
        self.table_group = {"evpl": 2, "epl": 3}
19
20 1
    def build_int_flows(
21
        self,
22
        evcs: dict[str, dict],
23
        stored_flows: dict[int, list[dict]],
24
    ) -> dict[int, list[dict]]:
25
        """build INT flows.
26
27
        It'll map and create all INT flows needed, for now since each EVC
28
        is bidirectional, it'll provision bidirectionally too. In the future,
29
        if mef_eline supports unidirectional EVC, it'll follow suit accordingly.
30
        """
31 1
        flows_per_cookie: dict[int, list[dict]] = defaultdict(list)
32 1
        for evc_id, evc in evcs.items():
33 1
            for flow in itertools.chain(
34
                self._build_int_source_flows("uni_a", evc, stored_flows),
35
                self._build_int_source_flows("uni_z", evc, stored_flows),
36
                self._build_int_hop_flows(evc, stored_flows),
37
                self._build_int_sink_flows("uni_z", evc, stored_flows),
38
                self._build_int_sink_flows("uni_a", evc, stored_flows),
39
            ):
40 1
                cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
41 1
                flows_per_cookie[cookie].append(flow)
42 1
        return flows_per_cookie
43
44 1
    def _build_int_source_flows(
45
        self,
46
        uni_src_key: Literal["uni_a", "uni_z"],
47
        evc: dict,
48
        stored_flows: dict[int, list[dict]],
49
    ) -> list[dict]:
50
        """Build INT source flows.
51
52
        At the INT source, one flow becomes 3: one for UDP on table 0,
53
        one for TCP on table 0, and one on table X (2 for evpl and 3 for epl by default)
54
        On table 0, we use just new instructions: push_int and goto_table
55
        On table X, we add add_int_metadata before the original actions
56
        INT flows will have higher priority.
57
        """
58 1
        new_flows = []
59 1
        new_int_flow_tbl_0_tcp = {}
60 1
        src_uni = evc[uni_src_key]
61
62
        # Get the original flows
63 1
        dpid = src_uni["switch"]
64 1
        for flow in stored_flows[
65
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
66
        ]:
67 1
            if (
68
                flow["switch"] == dpid
69
                and flow["flow"]["match"]["in_port"] == src_uni["port_number"]
70
            ):
71 1
                new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
72 1
                break
73
74 1
        if not new_int_flow_tbl_0_tcp:
75 1
            return []
76
77 1
        utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
78 1
        utils.set_new_cookie(new_int_flow_tbl_0_tcp)
79 1
        utils.set_owner(new_int_flow_tbl_0_tcp)
80
81
        # Deepcopy to use for table X (2 or 3 by default for EVPL or EPL respectively)
82 1
        new_int_flow_tbl_x = copy.deepcopy(new_int_flow_tbl_0_tcp)
83
84
        # Prepare TCP Flow for Table 0
85 1
        new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
86 1
        new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
87 1
        utils.set_priority(new_int_flow_tbl_0_tcp)
88
89
        # The flow_manager has two outputs: instructions and actions.
90 1
        table_group = self.table_group
91 1
        new_table_id = table_group[new_int_flow_tbl_x["flow"]["table_group"]]
92 1
        instructions = [
93
            {
94
                "instruction_type": "apply_actions",
95
                "actions": [{"action_type": "push_int"}],
96
            },
97
            {"instruction_type": "goto_table", "table_id": new_table_id},
98
        ]
99 1
        new_int_flow_tbl_0_tcp["flow"]["instructions"] = instructions
100
101
        # Prepare UDP Flow for Table 0. Everything the same as TCP except the nw_proto
102 1
        new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
103 1
        new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
104
105
        # Prepare Flows for Table X - No TCP or UDP specifics
106 1
        new_int_flow_tbl_x["flow"]["table_id"] = new_table_id
107
108
        # if intra-switch EVC, then output port should be the dst UNI's source port
109 1
        if utils.is_intra_switch_evc(evc):
110 1
            dst_uni = evc["uni_z" if uni_src_key == "uni_a" else "uni_a"]
111 1
            proxy_port = dst_uni["proxy_port"]
112 1
            for instruction in new_int_flow_tbl_x["flow"]["instructions"]:
113 1
                if instruction["instruction_type"] == "apply_actions":
114 1
                    for action in instruction["actions"]:
115 1
                        if action["action_type"] == "output":
116
                            # Since this is the INT Source, we use source
117
                            # to avoid worrying about single or multi
118
                            # home physical loops.
119
                            # The choice for destination is at the INT Sink.
120 1
                            action["port"] = proxy_port.source.port_number
121
122
                    # remove set_vlan action if it exists, this is for
123
                    # avoding a redundant set_vlan since it'll be set in the egress sink
124 1
                    instruction["actions"] = utils.modify_actions(
125
                        instruction["actions"], ["set_vlan"], remove=True
126
                    )
127
128 1
        instructions = utils.add_to_apply_actions(
129
            new_int_flow_tbl_x["flow"]["instructions"],
130
            new_instruction={"action_type": "add_int_metadata"},
131
            position=0,
132
        )
133
134 1
        new_int_flow_tbl_x["flow"]["instructions"] = instructions
135
136 1
        new_flows.append(new_int_flow_tbl_0_tcp)
137 1
        new_flows.append(new_int_flow_tbl_0_udp)
138 1
        new_flows.append(new_int_flow_tbl_x)
139
140 1
        return new_flows
141
142 1
    def _build_int_hop_flows(
143
        self,
144
        evc: dict,
145
        stored_flows: dict[int, list[dict]],
146
    ) -> list[dict]:
147
        """Build INT hop flows.
148
149
        At the INT hops, one flow adds two more: one for UDP on table 0,
150
        one for TCP on table 0. On table 0, we add 'add_int_metadata'
151
        before other actions.
152
        """
153
154 1
        new_flows = []
155 1
        excluded_dpids = set([evc["uni_a"]["switch"], evc["uni_z"]["switch"]])
156
157 1
        for flow in stored_flows[
158
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
159
        ]:
160 1
            if flow["switch"] in excluded_dpids:
161 1
                continue
162 1
            if "match" not in flow["flow"] or "in_port" not in flow["flow"]["match"]:
163
                continue
164
165 1
            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
166 1
            utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
167 1
            utils.set_new_cookie(new_int_flow_tbl_0_tcp)
168 1
            utils.set_owner(new_int_flow_tbl_0_tcp)
169
170
            # Prepare TCP Flow
171 1
            new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
172 1
            new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
173 1
            utils.set_priority(new_int_flow_tbl_0_tcp)
174
175 1
            for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
176 1
                if instruction["instruction_type"] == "apply_actions":
177 1
                    instruction["actions"].insert(
178
                        0, {"action_type": "add_int_metadata"}
179
                    )
180
181
            # Prepare UDP Flow
182 1
            new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
183 1
            new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
184
185 1
            new_flows.append(new_int_flow_tbl_0_tcp)
186 1
            new_flows.append(new_int_flow_tbl_0_udp)
187
188 1
        return new_flows
189
190 1
    def _build_int_sink_flows(
191
        self,
192
        uni_dst_key: Literal["uni_a", "uni_z"],
193
        evc: dict,
194
        stored_flows: dict[int, list[dict]],
195
    ) -> list[dict]:
196
        """
197
        Build INT sink flows.
198
199
        At the INT sink, one flow becomes many:
200
        1. Before the proxy, we do add_int_metadata as an INT hop.
201
        We need to keep the set_queue
202
        2. After the proxy, we do send_report and pop_int and output
203
        We only use table 0 for #1.
204
        We use table X (2 or 3) for #2. for pop_int and output
205
        """
206 1
        new_flows = []
207 1
        dst_uni = evc[uni_dst_key]
208 1
        proxy_port = dst_uni["proxy_port"]
209 1
        dpid = dst_uni["switch"]
210
211 1
        for flow in stored_flows[
212
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
213
        ]:
214
            # Only consider this sink's dpid flows
215 1
            if flow["switch"] != dpid:
216 1
                continue
217
            # Only consider flows coming from NNI interfaces
218 1
            if flow["flow"]["match"]["in_port"] == dst_uni["port_number"]:
219 1
                continue
220
221 1
            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
222
223 1
            if not new_int_flow_tbl_0_tcp:
224
                continue
225
226 1
            utils.set_new_cookie(new_int_flow_tbl_0_tcp)
227 1
            utils.set_owner(new_int_flow_tbl_0_tcp)
228 1
            utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
229
230
            # Save for pos-proxy flows
231 1
            new_int_flow_tbl_0_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)
232 1
            new_int_flow_tbl_x_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)
233
234
            # Prepare TCP Flow for Table 0 PRE proxy
235 1
            if not utils.is_intra_switch_evc(evc):
236 1
                new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
237 1
                new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
238 1
                utils.set_priority(new_int_flow_tbl_0_tcp)
239
240
                # Add telemetry, keep set_queue, output to the proxy port.
241 1
                output_port_no = proxy_port.source.port_number
242 1
                for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
243 1
                    if instruction["instruction_type"] == "apply_actions":
244
                        # Keep set_queue
245 1
                        actions = utils.modify_actions(
246
                            instruction["actions"],
247
                            ["pop_vlan", "push_vlan", "set_vlan", "output"],
248
                            remove=True,
249
                        )
250 1
                        actions.insert(0, {"action_type": "add_int_metadata"})
251 1
                        actions.append(
252
                            {"action_type": "output", "port": output_port_no}
253
                        )
254 1
                        instruction["actions"] = actions
255
256
                # Prepare UDP Flow for Table 0
257 1
                new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
258 1
                new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
259
260 1
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_tcp))
261 1
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_udp))
262
263
            # Prepare Flows for Table 0 AFTER proxy. No difference between TCP or UDP
264 1
            in_port_no = proxy_port.destination.port_number
265
266 1
            new_int_flow_tbl_0_pos["flow"]["match"]["in_port"] = in_port_no
267 1
            utils.set_priority(new_int_flow_tbl_0_tcp)
268
269 1
            table_group = self.table_group
270 1
            new_table_id = table_group[new_int_flow_tbl_x_pos["flow"]["table_group"]]
271 1
            instructions = [
272
                {
273
                    "instruction_type": "apply_actions",
274
                    "actions": [{"action_type": "send_report"}],
275
                },
276
                {
277
                    "instruction_type": "goto_table",
278
                    "table_id": new_table_id,
279
                },
280
            ]
281 1
            new_int_flow_tbl_0_pos["flow"]["instructions"] = instructions
282
283
            # Prepare Flows for Table X POS proxy
284 1
            new_int_flow_tbl_x_pos["flow"]["match"]["in_port"] = in_port_no
285 1
            new_int_flow_tbl_x_pos["flow"]["table_id"] = new_table_id
286
287 1
            for instruction in new_int_flow_tbl_x_pos["flow"]["instructions"]:
288 1
                if instruction["instruction_type"] == "apply_actions":
289 1
                    instruction["actions"].insert(0, {"action_type": "pop_int"})
290
291 1
            new_flows.append(copy.deepcopy(new_int_flow_tbl_0_pos))
292 1
            new_flows.append(copy.deepcopy(new_int_flow_tbl_x_pos))
293
294
        return new_flows
295