Passed
Push — master ( 03a8ab...fb285f )
by Vinicius
06:10 queued 03:58
created

FlowBuilder._build_int_hop_flows()   B

Complexity

Conditions 7

Size

Total Lines 47
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 7.004

Importance

Changes 0
Metric Value
cc 7
eloc 28
nop 3
dl 0
loc 47
ccs 22
cts 23
cp 0.9565
crap 7.004
rs 7.808
c 0
b 0
f 0
1
"""flow_builder module responsible for building and mapping flows."""
2
3 1
import copy
4 1
from collections import defaultdict
5 1
import itertools
6
7 1
from typing import Literal
8
9 1
from napps.kytos.telemetry_int import utils
10 1
from napps.kytos.telemetry_int.exceptions import FlowsNotFound
11 1
from napps.kytos.telemetry_int import settings
12
13
14 1
class FlowBuilder:
    """Build the telemetry INT flows derived from the stored flows of EVCs."""

    def __init__(self):
        """Constructor of FlowBuilder."""
        # Maps a flow's "table_group" value to the id of the table X where
        # the non table 0 INT flows are placed (2 for evpl and 3 for epl).
        self.table_group = {"evpl": 2, "epl": 3}

    def build_int_flows(
        self,
        evcs: dict[str, dict],
        stored_flows: dict[int, list[dict]],
    ) -> dict[int, list[dict]]:
        """Build INT flows.

        It'll map and create all INT flows needed, for now since each EVC
        is bidirectional, it'll provision bidirectionally too. In the future,
        if mef_eline supports unidirectional EVC, it'll follow suit accordingly.

        For each EVC the flows are built in this order: source flows of
        uni_a, source flows of uni_z, hop flows, sink flows of uni_z and
        sink flows of uni_a.

        Args:
            evcs: EVC dicts keyed by EVC id.
            stored_flows: existing flows, keyed by cookie.

        Returns:
            A ``defaultdict(list)`` mapping the cookie derived from each EVC id
            to the list of INT flows built for that EVC. An EVC that yields
            no flows gets no entry.

        Raises:
            FlowsNotFound: propagated from the INT source flows builder.
        """
        flows_per_cookie: dict[int, list[dict]] = defaultdict(list)
        for evc_id, evc in evcs.items():
            evc_flows = list(
                itertools.chain(
                    self._build_int_source_flows("uni_a", evc, stored_flows),
                    self._build_int_source_flows("uni_z", evc, stored_flows),
                    self._build_int_hop_flows(evc, stored_flows),
                    self._build_int_sink_flows("uni_z", evc, stored_flows),
                    self._build_int_sink_flows("uni_a", evc, stored_flows),
                )
            )
            if not evc_flows:
                # Skip the lookup below so that the defaultdict does not
                # get an empty entry for this EVC.
                continue
            # The cookie only depends on the EVC, compute it once per EVC
            # instead of once per flow.
            cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
            flows_per_cookie[cookie].extend(evc_flows)
        return flows_per_cookie

    def _build_int_source_flows(
        self,
        uni_src_key: Literal["uni_a", "uni_z"],
        evc: dict,
        stored_flows: dict[int, list[dict]],
    ) -> list[dict]:
        """Build INT source flows.

        At the INT source, one flow becomes 3: one for UDP on table 0,
        one for TCP on table 0, and one on table X (2 for evpl and 3 for epl by default)
        On table 0, we use just new instructions: push_int and goto_table
        On table X, we add add_int_metadata before the original actions
        INT flows will have higher priority.

        Args:
            uni_src_key: which UNI of the EVC acts as the INT source.
            evc: the EVC dict; "id", "active" and the UNI entries are read.
            stored_flows: existing flows, keyed by cookie.

        Returns:
            ``[tcp table 0 flow, udp table 0 flow, table X flow]``, or an
            empty list if no stored flow matches the source UNI and the EVC
            is not active.

        Raises:
            FlowsNotFound: if no stored flow matches the source UNI and the
                EVC is active.
        """
        src_uni = evc[uni_src_key]
        dpid = src_uni["switch"]

        # Get the original flow: the first stored flow of this EVC on the
        # source switch whose in_port is the source UNI port.
        new_int_flow_tbl_0_tcp = {}
        for flow in stored_flows[
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
        ]:
            if (
                flow["switch"] == dpid
                and flow["flow"]["match"]["in_port"] == src_uni["port_number"]
            ):
                # Deepcopy so that the stored flow is never mutated
                new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
                break

        if not new_int_flow_tbl_0_tcp:
            # Missing flows are only tolerated for an inactive EVC
            if not evc["active"]:
                return []
            raise FlowsNotFound(evc["id"])

        utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
        utils.set_new_cookie(new_int_flow_tbl_0_tcp)
        utils.set_owner(new_int_flow_tbl_0_tcp)

        # Deepcopy to use for table X (2 or 3 by default for EVPL or EPL respectively)
        new_int_flow_tbl_x = copy.deepcopy(new_int_flow_tbl_0_tcp)

        # Prepare TCP Flow for Table 0
        new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
        new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
        utils.set_priority(new_int_flow_tbl_0_tcp)

        # The flow_manager has two outputs: instructions and actions.
        new_table_id = self.table_group[new_int_flow_tbl_x["flow"]["table_group"]]
        new_int_flow_tbl_0_tcp["flow"]["instructions"] = [
            {
                "instruction_type": "apply_actions",
                "actions": [{"action_type": "push_int"}],
            },
            {"instruction_type": "goto_table", "table_id": new_table_id},
        ]

        # Prepare UDP Flow for Table 0. Everything the same as TCP except the nw_proto
        new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
        new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP

        # Prepare Flows for Table X - No TCP or UDP specifics
        new_int_flow_tbl_x["flow"]["table_id"] = new_table_id

        # if intra-switch EVC, then output port should be the dst UNI's source port
        if utils.is_intra_switch_evc(evc):
            dst_uni = evc["uni_z" if uni_src_key == "uni_a" else "uni_a"]
            proxy_port = dst_uni["proxy_port"]
            for instruction in new_int_flow_tbl_x["flow"]["instructions"]:
                if instruction["instruction_type"] != "apply_actions":
                    continue
                for action in instruction["actions"]:
                    if action["action_type"] == "output":
                        # Since this is the INT Source, we use source
                        # to avoid worrying about single or multi
                        # home physical loops.
                        # The choice for destination is at the INT Sink.
                        action["port"] = proxy_port.source.port_number

                # remove set_vlan action if it exists, this is for
                # avoiding a redundant set_vlan since it'll be set in the egress sink
                instruction["actions"] = utils.modify_actions(
                    instruction["actions"], ["set_vlan"], remove=True
                )

        # add_int_metadata goes before the original actions (position 0)
        new_int_flow_tbl_x["flow"]["instructions"] = utils.add_to_apply_actions(
            new_int_flow_tbl_x["flow"]["instructions"],
            new_instruction={"action_type": "add_int_metadata"},
            position=0,
        )

        return [new_int_flow_tbl_0_tcp, new_int_flow_tbl_0_udp, new_int_flow_tbl_x]

    def _build_int_hop_flows(
        self,
        evc: dict,
        stored_flows: dict[int, list[dict]],
    ) -> list[dict]:
        """Build INT hop flows.

        At the INT hops, one flow adds two more: one for UDP on table 0,
        one for TCP on table 0. On table 0, we add 'add_int_metadata'
        before other actions.

        Args:
            evc: the EVC dict; "id" and the switches of both UNIs are read.
            stored_flows: existing flows, keyed by cookie.

        Returns:
            A TCP flow followed by a UDP flow for every stored flow of the
            EVC that is not on a UNI switch and that matches on in_port.
        """
        new_flows = []
        # Flows on the UNI switches are not hops
        excluded_dpids = {evc["uni_a"]["switch"], evc["uni_z"]["switch"]}

        for flow in stored_flows[
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
        ]:
            if flow["switch"] in excluded_dpids:
                continue
            # Flows that don't match on in_port are left alone
            if "match" not in flow["flow"] or "in_port" not in flow["flow"]["match"]:
                continue

            # Deepcopy so that the stored flow is never mutated
            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
            utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
            utils.set_new_cookie(new_int_flow_tbl_0_tcp)
            utils.set_owner(new_int_flow_tbl_0_tcp)

            # Prepare TCP Flow
            new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
            new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
            utils.set_priority(new_int_flow_tbl_0_tcp)

            for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
                if instruction["instruction_type"] == "apply_actions":
                    instruction["actions"].insert(
                        0, {"action_type": "add_int_metadata"}
                    )

            # Prepare UDP Flow. Everything the same as TCP except the nw_proto
            new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
            new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP

            new_flows.append(new_int_flow_tbl_0_tcp)
            new_flows.append(new_int_flow_tbl_0_udp)

        return new_flows

    def _build_int_sink_flows(
        self,
        uni_dst_key: Literal["uni_a", "uni_z"],
        evc: dict,
        stored_flows: dict[int, list[dict]],
    ) -> list[dict]:
        """
        Build INT sink flows.

        At the INT sink, one flow becomes many:
        1. Before the proxy, we do add_int_metadata as an INT hop.
        We need to keep the set_queue
        2. After the proxy, we do send_report and pop_int and output
        We only use table 0 for #1.
        We use table X (2 or 3) for #2. for pop_int and output

        The pre proxy flows (#1) are only built when the EVC is not
        intra-switch.

        Args:
            uni_dst_key: which UNI of the EVC acts as the INT sink.
            evc: the EVC dict; "id" and the UNI entry are read.
            stored_flows: existing flows, keyed by cookie.

        Returns:
            For every stored flow of the EVC on the sink switch whose in_port
            is not the sink UNI port: the pre proxy TCP and UDP flows (unless
            intra-switch), then the post proxy table 0 flow and the post
            proxy table X flow. Empty if no stored flow qualifies.
        """
        new_flows = []
        dst_uni = evc[uni_dst_key]
        proxy_port = dst_uni["proxy_port"]
        dpid = dst_uni["switch"]

        for flow in stored_flows[
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
        ]:
            # Only consider this sink's dpid flows
            if flow["switch"] != dpid:
                continue
            # Only consider flows coming from NNI interfaces
            if flow["flow"]["match"]["in_port"] == dst_uni["port_number"]:
                continue

            # Deepcopy so that the stored flow is never mutated
            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)

            utils.set_new_cookie(new_int_flow_tbl_0_tcp)
            utils.set_owner(new_int_flow_tbl_0_tcp)
            utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)

            # Save for pos-proxy flows
            new_int_flow_tbl_0_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)
            new_int_flow_tbl_x_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)

            # Prepare TCP Flow for Table 0 PRE proxy
            if not utils.is_intra_switch_evc(evc):
                new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
                new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
                utils.set_priority(new_int_flow_tbl_0_tcp)

                # Add telemetry, keep set_queue, output to the proxy port.
                output_port_no = proxy_port.source.port_number
                for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
                    if instruction["instruction_type"] == "apply_actions":
                        # Keep set_queue
                        actions = utils.modify_actions(
                            instruction["actions"],
                            ["pop_vlan", "push_vlan", "set_vlan", "output"],
                            remove=True,
                        )
                        actions.insert(0, {"action_type": "add_int_metadata"})
                        actions.append(
                            {"action_type": "output", "port": output_port_no}
                        )
                        instruction["actions"] = actions

                # Prepare UDP Flow for Table 0
                new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
                new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP

                # Copies are appended, so the set_priority call further
                # below cannot change the flows already collected here.
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_tcp))
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_udp))

            # Prepare Flows for Table 0 AFTER proxy. No difference between TCP or UDP
            in_port_no = proxy_port.destination.port_number

            new_int_flow_tbl_0_pos["flow"]["match"]["in_port"] = in_port_no
            # NOTE(review): this bumps the pre proxy TCP flow, not the post
            # proxy flow being prepared here, and does not affect any
            # returned flow. Possibly meant for new_int_flow_tbl_0_pos;
            # kept as is until the intent is confirmed.
            utils.set_priority(new_int_flow_tbl_0_tcp)

            new_table_id = self.table_group[
                new_int_flow_tbl_x_pos["flow"]["table_group"]
            ]
            new_int_flow_tbl_0_pos["flow"]["instructions"] = [
                {
                    "instruction_type": "apply_actions",
                    "actions": [{"action_type": "send_report"}],
                },
                {
                    "instruction_type": "goto_table",
                    "table_id": new_table_id,
                },
            ]

            # Prepare Flows for Table X POS proxy
            new_int_flow_tbl_x_pos["flow"]["match"]["in_port"] = in_port_no
            new_int_flow_tbl_x_pos["flow"]["table_id"] = new_table_id

            # pop_int goes before the original actions
            for instruction in new_int_flow_tbl_x_pos["flow"]["instructions"]:
                if instruction["instruction_type"] == "apply_actions":
                    instruction["actions"].insert(0, {"action_type": "pop_int"})

            new_flows.append(copy.deepcopy(new_int_flow_tbl_0_pos))
            new_flows.append(copy.deepcopy(new_int_flow_tbl_x_pos))

        return new_flows
300