Passed
Pull Request — master (#104)
by Vinicius
04:02
created

FlowBuilder._build_int_sink_flows()   C

Complexity

Conditions 10

Size

Total Lines 105
Code Lines 61

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 47
CRAP Score 10.0008

Importance

Changes 0
Metric Value
cc 10
eloc 61
nop 4
dl 0
loc 105
ccs 47
cts 48
cp 0.9792
crap 10.0008
rs 5.4763
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.managers.flow_builder.FlowBuilder._build_int_sink_flows() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""flow_builder module responsible for building and mapping flows."""
2
3 1
import copy
4 1
from collections import defaultdict
5 1
import itertools
6
7 1
from typing import Literal
8
9 1
from napps.kytos.telemetry_int import utils
10 1
from napps.kytos.telemetry_int import settings
11
12
13 1
class FlowBuilder:
14
    """FlowBuilder."""
15
16 1
    def __init__(self):
17
        """Constructor of FlowBuilder."""
18 1
        self.table_group = {"evpl": 2, "epl": 3}
19
20 1
    def build_int_flows(
21
        self,
22
        evcs: dict[str, dict],
23
        stored_flows: dict[int, list[dict]],
24
    ) -> dict[int, list[dict]]:
25
        """build INT flows.
26
27
        It'll map and create all INT flows needed, for now since each EVC
28
        is bidirectional, it'll provision bidirectionally too. In the future,
29
        if mef_eline supports unidirectional EVC, it'll follow suit accordingly.
30
        """
31 1
        flows_per_cookie: dict[int, list[dict]] = defaultdict(list)
32 1
        for evc_id, evc in evcs.items():
33 1
            for flow in itertools.chain(
34
                self._build_int_source_flows("uni_a", evc, stored_flows),
35
                self._build_int_source_flows("uni_z", evc, stored_flows),
36
                self._build_int_hop_flows(evc, stored_flows),
37
                self._build_int_sink_flows("uni_z", evc, stored_flows),
38
                self._build_int_sink_flows("uni_a", evc, stored_flows),
39
            ):
40 1
                cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
41 1
                flows_per_cookie[cookie].append(flow)
42 1
        return flows_per_cookie
43
44 1
    def _build_int_source_flows(
45
        self,
46
        uni_src_key: Literal["uni_a", "uni_z"],
47
        evc: dict,
48
        stored_flows: dict[int, list[dict]],
49
    ) -> list[dict]:
50
        """Build INT source flows.
51
52
        At the INT source, one flow becomes 3: one for UDP on table 0,
53
        one for TCP on table 0, and one on table X (2 for evpl and 3 for epl by default)
54
        On table 0, we use just new instructions: push_int and goto_table
55
        On table X, we add add_int_metadata before the original actions
56
        INT flows will have higher priority.
57
        """
58 1
        new_flows = []
59 1
        new_int_flow_tbl_0_tcp = {}
60 1
        src_uni = evc[uni_src_key]
61
62
        # Get the original flows
63 1
        dpid = src_uni["switch"]
64 1
        for flow in stored_flows[
65
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
66
        ]:
67 1
            if (
68
                flow["switch"] == dpid
69
                and flow["flow"]["match"]["in_port"] == src_uni["port_number"]
70
            ):
71 1
                new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
72 1
                break
73
74 1
        if not new_int_flow_tbl_0_tcp:
75 1
            return []
76
77 1
        utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
78 1
        utils.set_new_cookie(new_int_flow_tbl_0_tcp)
79 1
        utils.set_owner(new_int_flow_tbl_0_tcp)
80
81
        # Deepcopy to use for table X (2 or 3 by default for EVPL or EPL respectively)
82 1
        new_int_flow_tbl_x = copy.deepcopy(new_int_flow_tbl_0_tcp)
83
84
        # Prepare TCP Flow for Table 0
85 1
        new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
86 1
        new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
87 1
        utils.set_priority(new_int_flow_tbl_0_tcp)
88
89
        # The flow_manager has two outputs: instructions and actions.
90 1
        table_group = self.table_group
91 1
        new_table_id = table_group[new_int_flow_tbl_x["flow"]["table_group"]]
92 1
        instructions = [
93
            {
94
                "instruction_type": "apply_actions",
95
                "actions": [{"action_type": "push_int"}],
96
            },
97
            {"instruction_type": "goto_table", "table_id": new_table_id},
98
        ]
99 1
        new_int_flow_tbl_0_tcp["flow"]["instructions"] = instructions
100
101
        # Prepare UDP Flow for Table 0. Everything the same as TCP except the nw_proto
102 1
        new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
103 1
        new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
104
105
        # Prepare Flows for Table X - No TCP or UDP specifics
106 1
        new_int_flow_tbl_x["flow"]["table_id"] = new_table_id
107
108
        # if intra-switch EVC, then output port should be the dst UNI's source port
109 1
        if utils.is_intra_switch_evc(evc):
110 1
            dst_uni = evc["uni_z" if uni_src_key == "uni_a" else "uni_a"]
111 1
            proxy_port = dst_uni["proxy_port"]
112 1
            for instruction in new_int_flow_tbl_x["flow"]["instructions"]:
113 1
                if instruction["instruction_type"] == "apply_actions":
114 1
                    for action in instruction["actions"]:
115 1
                        if action["action_type"] == "output":
116
                            # Since this is the INT Source, we use source
117
                            # to avoid worrying about single or multi
118
                            # home physical loops.
119
                            # The choice for destination is at the INT Sink.
120 1
                            action["port"] = proxy_port.source.port_number
121
122
                    # remove set_vlan action if it exists, this is for
123
                    # avoding a redundant set_vlan since it'll be set in the egress sink
124 1
                    instruction["actions"] = utils.modify_actions(
125
                        instruction["actions"], ["set_vlan"], remove=True
126
                    )
127
128 1
        instructions = utils.add_to_apply_actions(
129
            new_int_flow_tbl_x["flow"]["instructions"],
130
            new_instruction={"action_type": "add_int_metadata"},
131
            position=0,
132
        )
133
134 1
        new_int_flow_tbl_x["flow"]["instructions"] = instructions
135
136 1
        new_flows.append(new_int_flow_tbl_0_tcp)
137 1
        new_flows.append(new_int_flow_tbl_0_udp)
138 1
        new_flows.append(new_int_flow_tbl_x)
139
140 1
        return new_flows
141
142 1
    def _build_int_hop_flows(
143
        self,
144
        evc: dict,
145
        stored_flows: dict[int, list[dict]],
146
    ) -> list[dict]:
147
        """Build INT hop flows.
148
149
        At the INT hops, one flow adds two more: one for UDP on table 0,
150
        one for TCP on table 0. On table 0, we add 'add_int_metadata'
151
        before other actions.
152
        """
153
154 1
        new_flows = []
155 1
        excluded_dpids = set([evc["uni_a"]["switch"], evc["uni_z"]["switch"]])
156
157 1
        for flow in stored_flows[
158
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
159
        ]:
160 1
            if flow["switch"] in excluded_dpids:
161 1
                continue
162 1
            if "match" not in flow["flow"] or "in_port" not in flow["flow"]["match"]:
163
                continue
164
165 1
            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
166 1
            utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
167 1
            utils.set_new_cookie(new_int_flow_tbl_0_tcp)
168 1
            utils.set_owner(new_int_flow_tbl_0_tcp)
169
170
            # Prepare TCP Flow
171 1
            new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
172 1
            new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
173 1
            utils.set_priority(new_int_flow_tbl_0_tcp)
174
175 1
            for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
176 1
                if instruction["instruction_type"] == "apply_actions":
177 1
                    instruction["actions"].insert(
178
                        0, {"action_type": "add_int_metadata"}
179
                    )
180
181
            # Prepare UDP Flow
182 1
            new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
183 1
            new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
184
185 1
            new_flows.append(new_int_flow_tbl_0_tcp)
186 1
            new_flows.append(new_int_flow_tbl_0_udp)
187
188 1
        return new_flows
189
190 1
    def _build_int_sink_flows(
191
        self,
192
        uni_dst_key: Literal["uni_a", "uni_z"],
193
        evc: dict,
194
        stored_flows: dict[int, list[dict]],
195
    ) -> list[dict]:
196
        """
197
        Build INT sink flows.
198
199
        At the INT sink, one flow becomes many:
200
        1. Before the proxy, we do add_int_metadata as an INT hop.
201
        We need to keep the set_queue
202
        2. After the proxy, we do send_report and pop_int and output
203
        We only use table 0 for #1.
204
        We use table X (2 or 3) for #2. for pop_int and output
205
        """
206 1
        new_flows = []
207 1
        dst_uni = evc[uni_dst_key]
208 1
        proxy_port = dst_uni["proxy_port"]
209 1
        dpid = dst_uni["switch"]
210
211 1
        for flow in stored_flows[
212
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
213
        ]:
214
            # Only consider this sink's dpid flows
215 1
            if flow["switch"] != dpid:
216 1
                continue
217
            # Only consider flows coming from NNI interfaces
218 1
            if flow["flow"]["match"]["in_port"] == dst_uni["port_number"]:
219 1
                continue
220
221 1
            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
222
223 1
            if not new_int_flow_tbl_0_tcp:
224
                continue
225
226 1
            utils.set_new_cookie(new_int_flow_tbl_0_tcp)
227 1
            utils.set_owner(new_int_flow_tbl_0_tcp)
228 1
            utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
229
230
            # Save for pos-proxy flows
231 1
            new_int_flow_tbl_0_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)
232 1
            new_int_flow_tbl_x_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)
233
234
            # Prepare TCP Flow for Table 0 PRE proxy
235 1
            if not utils.is_intra_switch_evc(evc):
236 1
                new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
237 1
                new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
238 1
                utils.set_priority(new_int_flow_tbl_0_tcp)
239
240
                # Add telemetry, keep set_queue, output to the proxy port.
241 1
                output_port_no = proxy_port.source.port_number
242 1
                for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
243 1
                    if instruction["instruction_type"] == "apply_actions":
244
                        # Keep set_queue
245 1
                        actions = utils.modify_actions(
246
                            instruction["actions"],
247
                            ["pop_vlan", "push_vlan", "set_vlan", "output"],
248
                            remove=True,
249
                        )
250 1
                        actions.insert(0, {"action_type": "add_int_metadata"})
251 1
                        actions.append(
252
                            {"action_type": "output", "port": output_port_no}
253
                        )
254 1
                        instruction["actions"] = actions
255
256
                # Prepare UDP Flow for Table 0
257 1
                new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
258 1
                new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
259
260 1
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_tcp))
261 1
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_udp))
262
263
            # Prepare Flows for Table 0 AFTER proxy. No difference between TCP or UDP
264 1
            in_port_no = proxy_port.destination.port_number
265
266 1
            new_int_flow_tbl_0_pos["flow"]["match"]["in_port"] = in_port_no
267 1
            utils.set_priority(new_int_flow_tbl_0_tcp)
268
269 1
            table_group = self.table_group
270 1
            new_table_id = table_group[new_int_flow_tbl_x_pos["flow"]["table_group"]]
271 1
            instructions = [
272
                {
273
                    "instruction_type": "apply_actions",
274
                    "actions": [{"action_type": "send_report"}],
275
                },
276
                {
277
                    "instruction_type": "goto_table",
278
                    "table_id": new_table_id,
279
                },
280
            ]
281 1
            new_int_flow_tbl_0_pos["flow"]["instructions"] = instructions
282
283
            # Prepare Flows for Table X POS proxy
284 1
            new_int_flow_tbl_x_pos["flow"]["match"]["in_port"] = in_port_no
285 1
            new_int_flow_tbl_x_pos["flow"]["table_id"] = new_table_id
286
287 1
            for instruction in new_int_flow_tbl_x_pos["flow"]["instructions"]:
288 1
                if instruction["instruction_type"] == "apply_actions":
289 1
                    instruction["actions"].insert(0, {"action_type": "pop_int"})
290
291 1
            new_flows.append(copy.deepcopy(new_int_flow_tbl_0_pos))
292 1
            new_flows.append(copy.deepcopy(new_int_flow_tbl_x_pos))
293
294
        return new_flows
295