Test Failed
Pull Request — master (#104)
by Vinicius
01:49
created

build.managers.flow_builder.FlowBuilder._build_int_sink_flows()   D

Complexity

Conditions 11

Size

Total Lines 107
Code Lines 63

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 47
CRAP Score 11.0261

Importance

Changes 0
Metric Value
cc 11
eloc 63
nop 4
dl 0
loc 107
ccs 47
cts 50
cp 0.94
crap 11.0261
rs 4.9909
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.managers.flow_builder.FlowBuilder._build_int_sink_flows() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""flow_builder module responsible for building and mapping flows."""
2
3 1
import copy
4 1
from collections import defaultdict
5 1
import itertools
6
7 1
from typing import Literal
8
9 1
from napps.kytos.telemetry_int import utils
10 1
from napps.kytos.telemetry_int import settings
11 1
12
13
class FlowBuilder:
14 1
    """FlowBuilder."""
15
16
    def __init__(self):
17 1
        """Constructor of FlowBuilder."""
18
        self.table_group = {"evpl": 2, "epl": 3}
19 1
20
    def build_int_flows(
21 1
        self,
22
        evcs: dict[str, dict],
23
        stored_flows: dict[int, list[dict]],
24
    ) -> dict[int, list[dict]]:
25
        """build INT flows.
26
27
        It'll map and create all INT flows needed, for now since each EVC
28
        is bidirectional, it'll provision bidirectionally too. In the future,
29
        if mef_eline supports unidirectional EVC, it'll follow suit accordingly.
30
        """
31
        flows_per_cookie: dict[int, list[dict]] = defaultdict(list)
32 1
        for evc_id, evc in evcs.items():
33 1
            for flow in itertools.chain(
34 1
                self._build_int_source_flows("uni_a", evc, stored_flows),
35
                self._build_int_source_flows("uni_z", evc, stored_flows),
36
                self._build_int_hop_flows(evc, stored_flows),
37
                self._build_int_sink_flows("uni_z", evc, stored_flows),
38
                self._build_int_sink_flows("uni_a", evc, stored_flows),
39
            ):
40
                cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
41 1
                flows_per_cookie[cookie].append(flow)
42 1
        return flows_per_cookie
43 1
44
    def build_failover_old_flows(
45 1
        self, evcs: dict[str, dict], old_flows: dict[int, list[dict]]
46
    ) -> dict[int, list[dict]]:
47
        """Build (old path) failover related to remove flows.
48
49
        If sink NNIs svlan are different, it'll regenerate the rest of sink loop flows.
50
        Otherwise, it'll just remove the same received flows except with int cookie
51
        value the deletion uses flow mod OFPFC_DELETE, so no need to include the
52
        additional INT keys in the match like nw_proto for deletion.
53
        """
54
55
        removed_flows = defaultdict(list)
56
        for evc_id, evc in evcs.items():
57
            dpid_a, dpid_z = evc["uni_a"]["switch"], evc["uni_z"]["switch"]
58
59 1
            cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
60 1
            int_cookie = settings.INT_COOKIE_PREFIX << 56 | (cookie & 0xFFFFFFFFFFFFFF)
61 1
            cur_sink_a_svlan, cur_sink_z_svlan = None, None
62
            sink_a_flows: list[dict] = []
63
            sink_z_flows: list[dict] = []
64 1
65 1
            for link in evc["current_path"]:
66
                if cur_sink_a_svlan is None and (
67
                    svlan := utils.get_svlan_dpid_link(link, dpid_a)
68 1
                ):
69
                    cur_sink_a_svlan = svlan
70
                if cur_sink_z_svlan is None and (
71
                    svlan := utils.get_svlan_dpid_link(link, dpid_z)
72 1
                ):
73 1
                    cur_sink_z_svlan = svlan
74
                if cur_sink_a_svlan is not None and cur_sink_z_svlan is not None:
75 1
                    break
76 1
77 1
            for flow in old_flows[cookie]:
78 1
                if not sink_a_flows and flow["switch"] == dpid_a:
79
                    if (
80 1
                        flow["flow"]["match"]["dl_vlan"] != cur_sink_a_svlan
81 1
                        and cur_sink_a_svlan
82 1
                    ):
83
                        sink_a_flows = self._build_int_sink_flows(
84
                            "uni_a", evc, old_flows
85 1
                        )
86
                    else:
87
                        flow["flow"]["cookie"] = int_cookie
88 1
                        sink_a_flows = [flow]
89 1
                elif not sink_z_flows and flow["switch"] == dpid_z:
90 1
                    if (
91
                        flow["flow"]["match"]["dl_vlan"] != cur_sink_z_svlan
92
                        and cur_sink_z_svlan
93 1
                    ):
94 1
                        sink_z_flows = self._build_int_sink_flows(
95 1
                            "uni_z", evc, old_flows
96
                        )
97
                    else:
98
                        flow["flow"]["cookie"] = int_cookie
99
                        sink_z_flows = [flow]
100
                if sink_a_flows and sink_z_flows:
101
                    break
102 1
103
            hop_flows = self._build_int_hop_flows(evc, old_flows)
104
            removed_flows[cookie] = list(
105 1
                itertools.chain(sink_a_flows, hop_flows, sink_z_flows)
106 1
            )
107
        return removed_flows
108
109 1
    def _build_int_source_flows(
110
        self,
111
        uni_src_key: Literal["uni_a", "uni_z"],
112 1
        evc: dict,
113 1
        stored_flows: dict[int, list[dict]],
114 1
    ) -> list[dict]:
115 1
        """Build INT source flows.
116 1
117 1
        At the INT source, one flow becomes 3: one for UDP on table 0,
118 1
        one for TCP on table 0, and one on table X (2 for evpl and 3 for epl by default)
119
        On table 0, we use just new instructions: push_int and goto_table
120
        On table X, we add add_int_metadata before the original actions
121
        INT flows will have higher priority.
122
        """
123 1
        new_flows = []
124
        new_int_flow_tbl_0_tcp = {}
125
        src_uni = evc[uni_src_key]
126
127 1
        # Get the original flows
128
        dpid = src_uni["switch"]
129
        for flow in stored_flows[
130
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
131 1
        ]:
132
            if (
133
                flow["switch"] == dpid
134
                and flow["flow"]["match"]["in_port"] == src_uni["port_number"]
135
            ):
136
                new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
137 1
                break
138
139 1
        if not new_int_flow_tbl_0_tcp:
140 1
            return []
141 1
142
        utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
143 1
        utils.set_new_cookie(new_int_flow_tbl_0_tcp)
144
        utils.set_owner(new_int_flow_tbl_0_tcp)
145 1
146
        # Deepcopy to use for table X (2 or 3 by default for EVPL or EPL respectively)
147
        new_int_flow_tbl_x = copy.deepcopy(new_int_flow_tbl_0_tcp)
148
149
        # Prepare TCP Flow for Table 0
150
        new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
151
        new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
152
        utils.set_priority(new_int_flow_tbl_0_tcp)
153
154
        # The flow_manager has two outputs: instructions and actions.
155
        table_group = self.table_group
156
        new_table_id = table_group[new_int_flow_tbl_x["flow"]["table_group"]]
157 1
        instructions = [
158 1
            {
159
                "instruction_type": "apply_actions",
160 1
                "actions": [{"action_type": "push_int"}],
161
            },
162
            {"instruction_type": "goto_table", "table_id": new_table_id},
163 1
        ]
164 1
        new_int_flow_tbl_0_tcp["flow"]["instructions"] = instructions
165 1
166
        # Prepare UDP Flow for Table 0. Everything the same as TCP except the nw_proto
167
        new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
168 1
        new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
169 1
170 1
        # Prepare Flows for Table X - No TCP or UDP specifics
171 1
        new_int_flow_tbl_x["flow"]["table_id"] = new_table_id
172
173
        # if intra-switch EVC, then output port should be the dst UNI's source port
174 1
        if utils.is_intra_switch_evc(evc):
175 1
            dst_uni = evc["uni_z" if uni_src_key == "uni_a" else "uni_a"]
176 1
            proxy_port = dst_uni["proxy_port"]
177
            for instruction in new_int_flow_tbl_x["flow"]["instructions"]:
178 1
                if instruction["instruction_type"] == "apply_actions":
179 1
                    for action in instruction["actions"]:
180 1
                        if action["action_type"] == "output":
181
                            # Since this is the INT Source, we use source
182
                            # to avoid worrying about single or multi
183
                            # home physical loops.
184
                            # The choice for destination is at the INT Sink.
185 1
                            action["port"] = proxy_port.source.port_number
186 1
187
                    # remove set_vlan action if it exists, this is for
188 1
                    # avoding a redundant set_vlan since it'll be set in the egress sink
189 1
                    instruction["actions"] = utils.modify_actions(
190
                        instruction["actions"], ["set_vlan"], remove=True
191 1
                    )
192
193 1
        instructions = utils.add_to_apply_actions(
194
            new_int_flow_tbl_x["flow"]["instructions"],
195
            new_instruction={"action_type": "add_int_metadata"},
196
            position=0,
197
        )
198
199
        new_int_flow_tbl_x["flow"]["instructions"] = instructions
200
201
        new_flows.append(new_int_flow_tbl_0_tcp)
202
        new_flows.append(new_int_flow_tbl_0_udp)
203
        new_flows.append(new_int_flow_tbl_x)
204
205
        return new_flows
206
207
    def _build_int_hop_flows(
208
        self,
209 1
        evc: dict,
210 1
        stored_flows: dict[int, list[dict]],
211 1
    ) -> list[dict]:
212 1
        """Build INT hop flows.
213
214 1
        At the INT hops, one flow adds two more: one for UDP on table 0,
215
        one for TCP on table 0. On table 0, we add 'add_int_metadata'
216
        before other actions.
217
        """
218 1
219 1
        new_flows = []
220
        excluded_dpids = set([evc["uni_a"]["switch"], evc["uni_z"]["switch"]])
221 1
222 1
        for flow in stored_flows[
223
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
224 1
        ]:
225
            if flow["switch"] in excluded_dpids:
226 1
                continue
227
            if "match" not in flow["flow"] or "in_port" not in flow["flow"]["match"]:
228
                continue
229
230
            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
231 1
            utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
232 1
            utils.set_new_cookie(new_int_flow_tbl_0_tcp)
233 1
            utils.set_owner(new_int_flow_tbl_0_tcp)
234
235
            # Prepare TCP Flow
236 1
            new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
237 1
            new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
238
            utils.set_priority(new_int_flow_tbl_0_tcp)
239
240 1
            for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
241 1
                if instruction["instruction_type"] == "apply_actions":
242 1
                    instruction["actions"].insert(
243 1
                        0, {"action_type": "add_int_metadata"}
244
                    )
245
246 1
            # Prepare UDP Flow
247 1
            new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
248 1
            new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
249
250 1
            new_flows.append(new_int_flow_tbl_0_tcp)
251
            new_flows.append(new_int_flow_tbl_0_udp)
252
253
        return new_flows
254
255 1
    def _build_int_sink_flows(
256 1
        self,
257
        uni_dst_key: Literal["uni_a", "uni_z"],
258
        evc: dict,
259 1
        stored_flows: dict[int, list[dict]],
260
    ) -> list[dict]:
261
        """
262 1
        Build INT sink flows.
263 1
264
        At the INT sink, one flow becomes many:
265 1
        1. Before the proxy, we do add_int_metadata as an INT hop.
266 1
        We need to keep the set_queue
267
        2. After the proxy, we do send_report and pop_int and output
268
        We only use table 0 for #1.
269 1
        We use table X (2 or 3) for #2. for pop_int and output
270
        """
271 1
        new_flows = []
272 1
        dst_uni = evc[uni_dst_key]
273
        proxy_port = dst_uni["proxy_port"]
274 1
        dpid = dst_uni["switch"]
275 1
276 1
        for flow in stored_flows[
277
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
278
        ]:
279
            # Only consider this sink's dpid flows
280
            if flow["switch"] != dpid:
281
                continue
282
            # Only consider flows coming from NNI interfaces
283
            if flow["flow"]["match"]["in_port"] == dst_uni["port_number"]:
284
                continue
285
286 1
            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
287
288
            if not new_int_flow_tbl_0_tcp:
289 1
                continue
290 1
291
            utils.set_new_cookie(new_int_flow_tbl_0_tcp)
292 1
            utils.set_owner(new_int_flow_tbl_0_tcp)
293 1
            utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
294 1
295
            # Save for pos-proxy flows
296 1
            new_int_flow_tbl_0_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)
297 1
            new_int_flow_tbl_x_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)
298
299 1
            # Prepare TCP Flow for Table 0 PRE proxy
300
            if not utils.is_intra_switch_evc(evc):
301
                new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
302
                new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
303
                utils.set_priority(new_int_flow_tbl_0_tcp)
304
305
                # Add telemetry, keep set_queue, output to the proxy port.
306
                output_port_no = proxy_port.source.port_number
307
                for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
308
                    if instruction["instruction_type"] == "apply_actions":
309
                        # Keep set_queue
310
                        actions = utils.modify_actions(
311
                            instruction["actions"],
312
                            ["pop_vlan", "push_vlan", "set_vlan", "output"],
313
                            remove=True,
314
                        )
315
                        actions.insert(0, {"action_type": "add_int_metadata"})
316
                        actions.append(
317
                            {"action_type": "output", "port": output_port_no}
318
                        )
319
                        instruction["actions"] = actions
320
321
                # Prepare UDP Flow for Table 0
322
                new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
323
                new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
324
325
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_tcp))
326
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_udp))
327
328
            # Prepare Flows for Table 0 AFTER proxy. No difference between TCP or UDP
329
            in_port_no = proxy_port.destination.port_number
330
331
            new_int_flow_tbl_0_pos["flow"]["match"]["in_port"] = in_port_no
332
            utils.set_priority(new_int_flow_tbl_0_tcp)
333
334
            table_group = self.table_group
335
            new_table_id = table_group[new_int_flow_tbl_x_pos["flow"]["table_group"]]
336
            instructions = [
337
                {
338
                    "instruction_type": "apply_actions",
339
                    "actions": [{"action_type": "send_report"}],
340
                },
341
                {
342
                    "instruction_type": "goto_table",
343
                    "table_id": new_table_id,
344
                },
345
            ]
346
            new_int_flow_tbl_0_pos["flow"]["instructions"] = instructions
347
348
            # Prepare Flows for Table X POS proxy
349
            new_int_flow_tbl_x_pos["flow"]["match"]["in_port"] = in_port_no
350
            new_int_flow_tbl_x_pos["flow"]["table_id"] = new_table_id
351
352
            for instruction in new_int_flow_tbl_x_pos["flow"]["instructions"]:
353
                if instruction["instruction_type"] == "apply_actions":
354
                    instruction["actions"].insert(0, {"action_type": "pop_int"})
355
356
            new_flows.append(copy.deepcopy(new_int_flow_tbl_0_pos))
357
            new_flows.append(copy.deepcopy(new_int_flow_tbl_x_pos))
358
359
        return new_flows
360