Passed
Pull Request — master (#64)
by Vinicius
07:12 queued 04:20
created

FlowBuilder.build_int_flows()   A

Complexity

Conditions 3

Size

Total Lines 23
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 15
nop 3
dl 0
loc 23
ccs 7
cts 7
cp 1
crap 3
rs 9.65
c 0
b 0
f 0
1
"""flow_builder module responsible for building and mapping flows."""
2
3 1
import copy
4 1
from collections import defaultdict
5 1
import itertools
6
7 1
from typing import Literal
8
9 1
from napps.kytos.telemetry_int import utils
10 1
from napps.kytos.telemetry_int.exceptions import FlowsNotFound
11 1
from napps.kytos.telemetry_int import settings
12
13
14 1
class FlowBuilder:
15
    """FlowBuilder."""
16
17 1
    def __init__(self):
18
        """Constructor of FlowBuilder."""
19 1
        self.table_group = {"evpl": 2, "epl": 3}
20
21 1
    def build_int_flows(
22
        self,
23
        evcs: dict[str, dict],
24
        stored_flows: dict[int, list[dict]],
25
    ) -> dict[int, list[dict]]:
26
        """build INT flows.
27
28
        It'll map and create all INT flows needed, for now since each EVC
29
        is bidirectional, it'll provision bidirectionally too. In the future,
30
        if mef_eline supports unidirectional EVC, it'll follow suit accordingly.
31
        """
32 1
        flows_per_cookie: dict[int, list[dict]] = defaultdict(list)
33 1
        for evc_id, evc in evcs.items():
34 1
            for flow in itertools.chain(
35
                self._build_int_source_flows("uni_a", evc, stored_flows),
36
                self._build_int_source_flows("uni_z", evc, stored_flows),
37
                self._build_int_hop_flows(evc, stored_flows),
38
                self._build_int_sink_flows("uni_z", evc, stored_flows),
39
                self._build_int_sink_flows("uni_a", evc, stored_flows),
40
            ):
41 1
                cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
42 1
                flows_per_cookie[cookie].append(flow)
43 1
        return flows_per_cookie
44
45 1
    def _build_int_source_flows(
46
        self,
47
        uni_src_key: Literal["uni_a", "uni_z"],
48
        evc: dict,
49
        stored_flows: dict[int, list[dict]],
50
    ) -> list[dict]:
51
        """Build INT source flows.
52
53
        At the INT source, one flow becomes 3: one for UDP on table 0,
54
        one for TCP on table 0, and one on table X (2 for evpl and 3 for epl by default)
55
        On table 0, we use just new instructions: push_int and goto_table
56
        On table X, we add add_int_metadata before the original actions
57
        INT flows will have higher priority.
58
        """
59 1
        new_flows = []
60 1
        new_int_flow_tbl_0_tcp = {}
61 1
        src_uni = evc[uni_src_key]
62
63
        # Get the original flows
64 1
        dpid = src_uni["switch"]
65 1
        for flow in stored_flows[
66
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
67
        ]:
68 1
            if (
69
                flow["switch"] == dpid
70
                and flow["flow"]["match"]["in_port"] == src_uni["port_number"]
71
            ):
72 1
                new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
73 1
                break
74
75 1
        if not new_int_flow_tbl_0_tcp:
76
            raise FlowsNotFound(evc["id"])
77
78 1
        utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
79 1
        utils.set_new_cookie(new_int_flow_tbl_0_tcp)
80 1
        utils.set_owner(new_int_flow_tbl_0_tcp)
81
82
        # Deepcopy to use for table X (2 or 3 by default for EVPL or EPL respectively)
83 1
        new_int_flow_tbl_x = copy.deepcopy(new_int_flow_tbl_0_tcp)
84
85
        # Prepare TCP Flow for Table 0
86 1
        new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
87 1
        new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
88 1
        utils.set_priority(new_int_flow_tbl_0_tcp)
89
90
        # The flow_manager has two outputs: instructions and actions.
91 1
        table_group = self.table_group
92 1
        new_table_id = table_group[new_int_flow_tbl_x["flow"]["table_group"]]
93 1
        instructions = [
94
            {
95
                "instruction_type": "apply_actions",
96
                "actions": [{"action_type": "push_int"}],
97
            },
98
            {"instruction_type": "goto_table", "table_id": new_table_id},
99
        ]
100 1
        new_int_flow_tbl_0_tcp["flow"]["instructions"] = instructions
101
102
        # Prepare UDP Flow for Table 0. Everything the same as TCP except the nw_proto
103 1
        new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
104 1
        new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
105
106
        # Prepare Flows for Table X - No TCP or UDP specifics
107 1
        new_int_flow_tbl_x["flow"]["table_id"] = new_table_id
108
109
        # if intra-switch EVC, then output port should be the dst UNI's source port
110 1
        if utils.is_intra_switch_evc(evc):
111 1
            dst_uni = evc["uni_z" if uni_src_key == "uni_a" else "uni_a"]
112 1
            proxy_port = dst_uni["proxy_port"]
113 1
            for instruction in new_int_flow_tbl_x["flow"]["instructions"]:
114 1
                if instruction["instruction_type"] == "apply_actions":
115 1
                    for action in instruction["actions"]:
116 1
                        if action["action_type"] == "output":
117
                            # Since this is the INT Source, we use source
118
                            # to avoid worrying about single or multi
119
                            # home physical loops.
120
                            # The choice for destination is at the INT Sink.
121 1
                            action["port"] = proxy_port.source.port_number
122
123
                    # remove set_vlan action if it exists, this is for
124
                    # avoding a redundant set_vlan since it'll be set in the egress sink
125 1
                    instruction["actions"] = utils.modify_actions(
126
                        instruction["actions"], ["set_vlan"], remove=True
127
                    )
128
129 1
        instructions = utils.add_to_apply_actions(
130
            new_int_flow_tbl_x["flow"]["instructions"],
131
            new_instruction={"action_type": "add_int_metadata"},
132
            position=0,
133
        )
134
135 1
        new_int_flow_tbl_x["flow"]["instructions"] = instructions
136
137 1
        new_flows.append(new_int_flow_tbl_0_tcp)
138 1
        new_flows.append(new_int_flow_tbl_0_udp)
139 1
        new_flows.append(new_int_flow_tbl_x)
140
141 1
        return new_flows
142
143 1
    def _build_int_hop_flows(
144
        self,
145
        evc: dict,
146
        stored_flows: dict[int, list[dict]],
147
    ) -> list[dict]:
148
        """Build INT hop flows.
149
150
        At the INT hops, one flow adds two more: one for UDP on table 0,
151
        one for TCP on table 0. On table 0, we add 'add_int_metadata'
152
        before other actions.
153
        """
154
155 1
        new_flows = []
156 1
        excluded_dpids = set([evc["uni_a"]["switch"], evc["uni_z"]["switch"]])
157
158 1
        for flow in stored_flows[
159
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
160
        ]:
161 1
            if flow["switch"] in excluded_dpids:
162 1
                continue
163 1
            if "match" not in flow["flow"] or "in_port" not in flow["flow"]["match"]:
164
                continue
165
166 1
            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
167 1
            utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
168 1
            utils.set_new_cookie(new_int_flow_tbl_0_tcp)
169 1
            utils.set_owner(new_int_flow_tbl_0_tcp)
170
171
            # Prepare TCP Flow
172 1
            new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
173 1
            new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
174 1
            utils.set_priority(new_int_flow_tbl_0_tcp)
175
176 1
            for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
177 1
                if instruction["instruction_type"] == "apply_actions":
178 1
                    instruction["actions"].insert(
179
                        0, {"action_type": "add_int_metadata"}
180
                    )
181
182
            # Prepare UDP Flow
183 1
            new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
184 1
            new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
185
186 1
            new_flows.append(new_int_flow_tbl_0_tcp)
187 1
            new_flows.append(new_int_flow_tbl_0_udp)
188
189 1
        return new_flows
190
191 1
    def _build_int_sink_flows(
192
        self,
193
        uni_dst_key: Literal["uni_a", "uni_z"],
194
        evc: dict,
195
        stored_flows: dict[int, list[dict]],
196
    ) -> list[dict]:
197
        """
198
        Build INT sink flows.
199
200
        At the INT sink, one flow becomes many:
201
        1. Before the proxy, we do add_int_metadata as an INT hop.
202
        We need to keep the set_queue
203
        2. After the proxy, we do send_report and pop_int and output
204
        We only use table 0 for #1.
205
        We use table X (2 or 3) for #2. for pop_int and output
206
        """
207 1
        new_flows = []
208 1
        dst_uni = evc[uni_dst_key]
209 1
        proxy_port = dst_uni["proxy_port"]
210 1
        dpid = dst_uni["switch"]
211
212 1
        for flow in stored_flows[
213
            utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
214
        ]:
215
            # Only consider this sink's dpid flows
216 1
            if flow["switch"] != dpid:
217 1
                continue
218
            # Only consider flows coming from NNI interfaces
219 1
            if flow["flow"]["match"]["in_port"] == dst_uni["port_number"]:
220 1
                continue
221
222 1
            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
223
224 1
            if not new_int_flow_tbl_0_tcp:
225
                raise FlowsNotFound(evc["id"])
226
227 1
            utils.set_new_cookie(new_int_flow_tbl_0_tcp)
228 1
            utils.set_owner(new_int_flow_tbl_0_tcp)
229 1
            utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
230
231
            # Save for pos-proxy flows
232 1
            new_int_flow_tbl_0_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)
233 1
            new_int_flow_tbl_x_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)
234
235
            # Prepare TCP Flow for Table 0 PRE proxy
236 1
            if not utils.is_intra_switch_evc(evc):
237 1
                new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
238 1
                new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
239 1
                utils.set_priority(new_int_flow_tbl_0_tcp)
240
241
                # Add telemetry, keep set_queue, output to the proxy port.
242 1
                output_port_no = proxy_port.source.port_number
243 1
                for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
244 1
                    if instruction["instruction_type"] == "apply_actions":
245
                        # Keep set_queue
246 1
                        actions = utils.modify_actions(
247
                            instruction["actions"],
248
                            ["pop_vlan", "push_vlan", "set_vlan", "output"],
249
                            remove=True,
250
                        )
251 1
                        actions.insert(0, {"action_type": "add_int_metadata"})
252 1
                        actions.append(
253
                            {"action_type": "output", "port": output_port_no}
254
                        )
255 1
                        instruction["actions"] = actions
256
257
                # Prepare UDP Flow for Table 0
258 1
                new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
259 1
                new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
260
261 1
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_tcp))
262 1
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_udp))
263
264
            # Prepare Flows for Table 0 AFTER proxy. No difference between TCP or UDP
265 1
            in_port_no = proxy_port.destination.port_number
266
267 1
            new_int_flow_tbl_0_pos["flow"]["match"]["in_port"] = in_port_no
268 1
            utils.set_priority(new_int_flow_tbl_0_tcp)
269
270 1
            table_group = self.table_group
271 1
            new_table_id = table_group[new_int_flow_tbl_x_pos["flow"]["table_group"]]
272 1
            instructions = [
273
                {
274
                    "instruction_type": "apply_actions",
275
                    "actions": [{"action_type": "send_report"}],
276
                },
277
                {
278
                    "instruction_type": "goto_table",
279
                    "table_id": new_table_id,
280
                },
281
            ]
282 1
            new_int_flow_tbl_0_pos["flow"]["instructions"] = instructions
283
284
            # Prepare Flows for Table X POS proxy
285 1
            new_int_flow_tbl_x_pos["flow"]["match"]["in_port"] = in_port_no
286 1
            new_int_flow_tbl_x_pos["flow"]["table_id"] = new_table_id
287
288 1
            for instruction in new_int_flow_tbl_x_pos["flow"]["instructions"]:
289 1
                if instruction["instruction_type"] == "apply_actions":
290 1
                    instruction["actions"].insert(0, {"action_type": "pop_int"})
291
292 1
            new_flows.append(copy.deepcopy(new_int_flow_tbl_0_pos))
293 1
            new_flows.append(copy.deepcopy(new_int_flow_tbl_x_pos))
294
295
        return new_flows
296