Test Failed
Pull Request — master (#104)
by Vinicius
05:33
created

FlowBuilder._build_int_sink_flows()   C

Complexity

Conditions 10

Size

Total Lines 105
Code Lines 61

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 45
CRAP Score 10.0244

Importance

Changes 0
Metric Value
cc 10
eloc 61
nop 4
dl 0
loc 105
ccs 45
cts 48
cp 0.9375
crap 10.0244
rs 5.4763
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex methods like build.managers.flow_builder.FlowBuilder._build_int_sink_flows() often do a lot of different things. To break such a method (and the class that owns it) down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""flow_builder module responsible for building and mapping flows."""
2
3 1
import copy
4 1
from collections import defaultdict
5 1
import itertools
6
7 1
from typing import Literal
8
9 1
from napps.kytos.telemetry_int import utils
10 1
from napps.kytos.telemetry_int import settings
11 1
12
13
class FlowBuilder:
    """Build the INT flows that are derived from each EVC's stored flows."""

    def __init__(self) -> None:
        """Constructor of FlowBuilder."""
        # Maps a flow's "table_group" value to the table id used as "table X"
        # by the builders below (2 for evpl and 3 for epl).
        self.table_group = {"evpl": 2, "epl": 3}

    @staticmethod
    def _evc_stored_flows(
        evc: dict,
        stored_flows: dict[int, list[dict]],
    ) -> list[dict]:
        """Return the stored flows indexed by the cookie of this EVC.

        The cookie is derived from ``evc["id"]`` and
        ``settings.MEF_COOKIE_PREFIX``. Plain indexing is used on purpose so a
        missing cookie behaves exactly as the mapping passed in dictates
        (``KeyError`` for a ``dict``, a new entry for a ``defaultdict``).
        """
        return stored_flows[utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)]

    def build_int_flows(
        self,
        evcs: dict[str, dict],
        stored_flows: dict[int, list[dict]],
    ) -> dict[int, list[dict]]:
        """Build INT flows.

        It'll map and create all INT flows needed, for now since each EVC
        is bidirectional, it'll provision bidirectionally too. In the future,
        if mef_eline supports unidirectional EVC, it'll follow suit accordingly.

        Args:
            evcs: EVCs indexed by EVC id.
            stored_flows: original flows indexed by cookie.

        Returns:
            The new INT flows grouped by the cookie computed from the EVC id
            and ``settings.MEF_COOKIE_PREFIX``. EVCs that yield no INT flow do
            not get an entry.
        """
        flows_per_cookie: dict[int, list[dict]] = defaultdict(list)
        for evc_id, evc in evcs.items():
            int_flows = itertools.chain(
                self._build_int_source_flows("uni_a", evc, stored_flows),
                self._build_int_source_flows("uni_z", evc, stored_flows),
                self._build_int_hop_flows(evc, stored_flows),
                self._build_int_sink_flows("uni_z", evc, stored_flows),
                self._build_int_sink_flows("uni_a", evc, stored_flows),
            )
            # The cookie only depends on the EVC, so compute it once per EVC
            # instead of once per flow.
            cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
            for flow in int_flows:
                # Append (rather than extend) so that the defaultdict entry is
                # only created when there is at least one flow.
                flows_per_cookie[cookie].append(flow)
        return flows_per_cookie

    def _build_int_source_flows(
        self,
        uni_src_key: Literal["uni_a", "uni_z"],
        evc: dict,
        stored_flows: dict[int, list[dict]],
    ) -> list[dict]:
        """Build INT source flows.

        At the INT source, one flow becomes 3: one for UDP on table 0,
        one for TCP on table 0, and one on table X (2 for evpl and 3 for epl
        by default).
        On table 0, we use just new instructions: push_int and goto_table.
        On table X, we add add_int_metadata before the original actions.
        INT flows will have higher priority.

        Args:
            uni_src_key: which UNI of the EVC acts as the INT source.
            evc: the EVC being provisioned.
            stored_flows: original flows indexed by cookie.

        Returns:
            ``[tcp_table_0, udp_table_0, table_x]``, or an empty list when no
            stored flow matches the source UNI's switch and port.
        """
        src_uni = evc[uni_src_key]
        dpid = src_uni["switch"]

        # Get the original flow: the first one entering through the source UNI.
        original_flow = next(
            (
                flow
                for flow in self._evc_stored_flows(evc, stored_flows)
                if flow["switch"] == dpid
                and flow["flow"]["match"]["in_port"] == src_uni["port_number"]
            ),
            None,
        )
        if original_flow is None:
            return []

        tcp_flow = copy.deepcopy(original_flow)
        utils.set_instructions_from_actions(tcp_flow)
        utils.set_new_cookie(tcp_flow)
        utils.set_owner(tcp_flow)

        # Deepcopy to use for table X (2 or 3 by default for EVPL or EPL
        # respectively), taken before any TCP specific change is applied.
        table_x_flow = copy.deepcopy(tcp_flow)

        # Prepare TCP flow for table 0.
        tcp_flow["flow"]["match"]["dl_type"] = settings.IPv4
        tcp_flow["flow"]["match"]["nw_proto"] = settings.TCP
        utils.set_priority(tcp_flow)

        # The flow_manager has two outputs: instructions and actions.
        new_table_id = self.table_group[table_x_flow["flow"]["table_group"]]
        tcp_flow["flow"]["instructions"] = [
            {
                "instruction_type": "apply_actions",
                "actions": [{"action_type": "push_int"}],
            },
            {"instruction_type": "goto_table", "table_id": new_table_id},
        ]

        # Prepare UDP flow for table 0. Same as TCP except for the nw_proto.
        udp_flow = copy.deepcopy(tcp_flow)
        udp_flow["flow"]["match"]["nw_proto"] = settings.UDP

        # Prepare flow for table X - no TCP or UDP specifics.
        table_x_flow["flow"]["table_id"] = new_table_id

        # If intra-switch EVC, then the output port should be the source port
        # of the destination UNI's proxy port.
        if utils.is_intra_switch_evc(evc):
            dst_uni = evc["uni_z" if uni_src_key == "uni_a" else "uni_a"]
            proxy_port = dst_uni["proxy_port"]
            for instruction in table_x_flow["flow"]["instructions"]:
                if instruction["instruction_type"] != "apply_actions":
                    continue
                for action in instruction["actions"]:
                    if action["action_type"] == "output":
                        # Since this is the INT source, we use source
                        # to avoid worrying about single or multi
                        # home physical loops.
                        # The choice for destination is at the INT sink.
                        action["port"] = proxy_port.source.port_number

                # Remove set_vlan action if it exists, this is for avoiding a
                # redundant set_vlan since it'll be set in the egress sink.
                instruction["actions"] = utils.modify_actions(
                    instruction["actions"], ["set_vlan"], remove=True
                )

        table_x_flow["flow"]["instructions"] = utils.add_to_apply_actions(
            table_x_flow["flow"]["instructions"],
            new_instruction={"action_type": "add_int_metadata"},
            position=0,
        )

        return [tcp_flow, udp_flow, table_x_flow]

    def _build_int_hop_flows(
        self,
        evc: dict,
        stored_flows: dict[int, list[dict]],
    ) -> list[dict]:
        """Build INT hop flows.

        At the INT hops, one flow adds two more: one for UDP on table 0,
        one for TCP on table 0. On table 0, we add 'add_int_metadata'
        before other actions.

        Args:
            evc: the EVC being provisioned.
            stored_flows: original flows indexed by cookie.

        Returns:
            A TCP and a UDP flow for every stored flow that is not on one of
            the UNI switches and that matches on ``in_port``.
        """
        new_flows = []
        # Flows on the UNI switches are handled by the source/sink builders.
        excluded_dpids = {evc["uni_a"]["switch"], evc["uni_z"]["switch"]}

        for flow in self._evc_stored_flows(evc, stored_flows):
            if flow["switch"] in excluded_dpids:
                continue
            if "match" not in flow["flow"] or "in_port" not in flow["flow"]["match"]:
                continue

            tcp_flow = copy.deepcopy(flow)
            utils.set_instructions_from_actions(tcp_flow)
            utils.set_new_cookie(tcp_flow)
            utils.set_owner(tcp_flow)

            # Prepare TCP flow.
            tcp_flow["flow"]["match"]["dl_type"] = settings.IPv4
            tcp_flow["flow"]["match"]["nw_proto"] = settings.TCP
            utils.set_priority(tcp_flow)

            for instruction in tcp_flow["flow"]["instructions"]:
                if instruction["instruction_type"] == "apply_actions":
                    instruction["actions"].insert(
                        0, {"action_type": "add_int_metadata"}
                    )

            # Prepare UDP flow. Same as TCP except for the nw_proto.
            udp_flow = copy.deepcopy(tcp_flow)
            udp_flow["flow"]["match"]["nw_proto"] = settings.UDP

            new_flows.append(tcp_flow)
            new_flows.append(udp_flow)

        return new_flows

    def _build_int_sink_flows(
        self,
        uni_dst_key: Literal["uni_a", "uni_z"],
        evc: dict,
        stored_flows: dict[int, list[dict]],
    ) -> list[dict]:
        """Build INT sink flows.

        At the INT sink, one flow becomes many:
        1. Before the proxy, we do add_int_metadata as an INT hop.
        We need to keep the set_queue.
        2. After the proxy, we do send_report and pop_int and output.
        We only use table 0 for #1.
        We use table X (2 or 3) for #2, for pop_int and output.

        Args:
            uni_dst_key: which UNI of the EVC acts as the INT sink.
            evc: the EVC being provisioned.
            stored_flows: original flows indexed by cookie.

        Returns:
            For every stored flow on the sink switch that does not come in
            through the sink UNI port: the TCP and UDP pre-proxy flows (skipped
            for intra-switch EVCs) followed by the table 0 and table X
            post-proxy flows.
        """
        new_flows = []
        dst_uni = evc[uni_dst_key]
        proxy_port = dst_uni["proxy_port"]
        dpid = dst_uni["switch"]

        for flow in self._evc_stored_flows(evc, stored_flows):
            # Only consider this sink's dpid flows.
            if flow["switch"] != dpid:
                continue
            # Only consider flows coming from NNI interfaces.
            if flow["flow"]["match"]["in_port"] == dst_uni["port_number"]:
                continue

            pre_proxy_flow = copy.deepcopy(flow)
            utils.set_new_cookie(pre_proxy_flow)
            utils.set_owner(pre_proxy_flow)
            utils.set_instructions_from_actions(pre_proxy_flow)

            # Save for pos-proxy flows, before any pre-proxy change is applied.
            tbl_0_pos_flow = copy.deepcopy(pre_proxy_flow)
            tbl_x_pos_flow = copy.deepcopy(pre_proxy_flow)

            if not utils.is_intra_switch_evc(evc):
                new_flows.extend(
                    self._build_sink_pre_proxy_flows(pre_proxy_flow, proxy_port)
                )

            in_port_no = proxy_port.destination.port_number

            # NOTE(review): this raises the priority of the PRE proxy flow,
            # whose emitted copies were already taken above, so no returned
            # flow is affected by it. It may have been meant for the table 0
            # pos-proxy flow; kept as is to preserve the emitted priorities -
            # confirm the intent before changing it.
            utils.set_priority(pre_proxy_flow)

            new_flows.extend(
                self._build_sink_pos_proxy_flows(
                    tbl_0_pos_flow, tbl_x_pos_flow, in_port_no
                )
            )

        return new_flows

    @staticmethod
    def _build_sink_pre_proxy_flows(tcp_flow: dict, proxy_port) -> list[dict]:
        """Turn a sink flow into its TCP and UDP table 0 PRE proxy flows.

        ``tcp_flow`` is modified in place; the returned TCP flow is a deep copy
        so that later changes made by the caller to ``tcp_flow`` do not leak
        into the flows that are returned.
        """
        tcp_flow["flow"]["match"]["dl_type"] = settings.IPv4
        tcp_flow["flow"]["match"]["nw_proto"] = settings.TCP
        utils.set_priority(tcp_flow)

        # Add telemetry, keep set_queue, output to the proxy port.
        output_port_no = proxy_port.source.port_number
        for instruction in tcp_flow["flow"]["instructions"]:
            if instruction["instruction_type"] != "apply_actions":
                continue
            # Keep set_queue: only VLAN and output actions are removed.
            actions = utils.modify_actions(
                instruction["actions"],
                ["pop_vlan", "push_vlan", "set_vlan", "output"],
                remove=True,
            )
            actions.insert(0, {"action_type": "add_int_metadata"})
            actions.append({"action_type": "output", "port": output_port_no})
            instruction["actions"] = actions

        # Prepare UDP flow for table 0. Same as TCP except for the nw_proto.
        udp_flow = copy.deepcopy(tcp_flow)
        udp_flow["flow"]["match"]["nw_proto"] = settings.UDP

        return [copy.deepcopy(tcp_flow), udp_flow]

    def _build_sink_pos_proxy_flows(
        self,
        tbl_0_pos_flow: dict,
        tbl_x_pos_flow: dict,
        in_port_no: int,
    ) -> list[dict]:
        """Finish the table 0 and table X flows used AFTER the proxy.

        There is no difference between TCP and UDP here. Both flows are
        modified in place and returned as ``[table_0_flow, table_x_flow]``.
        """
        new_table_id = self.table_group[tbl_x_pos_flow["flow"]["table_group"]]

        # Table 0: send the report and hand the packet over to table X.
        tbl_0_pos_flow["flow"]["match"]["in_port"] = in_port_no
        tbl_0_pos_flow["flow"]["instructions"] = [
            {
                "instruction_type": "apply_actions",
                "actions": [{"action_type": "send_report"}],
            },
            {
                "instruction_type": "goto_table",
                "table_id": new_table_id,
            },
        ]

        # Table X: pop_int before the original actions.
        tbl_x_pos_flow["flow"]["match"]["in_port"] = in_port_no
        tbl_x_pos_flow["flow"]["table_id"] = new_table_id
        for instruction in tbl_x_pos_flow["flow"]["instructions"]:
            if instruction["instruction_type"] == "apply_actions":
                instruction["actions"].insert(0, {"action_type": "pop_int"})

        return [tbl_0_pos_flow, tbl_x_pos_flow]
295