Passed
Push — master ( 5413c2...c98987 )
by Vinicius
08:23 queued 06:42
created

build.managers.flow_builder._build_int_hop_flows()   B

Complexity

Conditions 7

Size

Total Lines 42
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 23.4991

Importance

Changes 0
Metric Value
cc 7
eloc 25
nop 2
dl 0
loc 42
ccs 7
cts 23
cp 0.3043
crap 23.4991
rs 7.8799
c 0
b 0
f 0
1
"""flow_builder module responsible for building and mapping flows."""
2
3 1
import copy
4 1
from collections import defaultdict
5 1
import itertools
6
7 1
from typing import Literal
8
9 1
from napps.kytos.telemetry_int import utils
10 1
from napps.kytos.telemetry_int.exceptions import FlowsNotFound
11 1
from napps.kytos.telemetry_int import settings
12
13
14 1
def build_int_flows(
    evcs: dict[str, dict], stored_flows: dict[int, list[dict]]
) -> dict[int, list[dict]]:
    """Build every INT flow for the given EVCs, grouped by cookie.

    For each EVC the source flows (uni_a, then uni_z), the hop flows and
    the sink flows (uni_z, then uni_a) are built, in that order, i.e. both
    directions of the EVC are provisioned.

    Args:
        evcs: EVC dicts keyed by EVC id.
        stored_flows: stored flows keyed by cookie; handed unchanged
            to the per-role builders.

    Returns:
        A mapping from the cookie computed by ``utils.get_cookie`` for
        the EVC id to the list of INT flows built for that EVC. An EVC that
        yields no flow adds no key.
    """
    int_flows_by_cookie: dict[int, list[dict]] = defaultdict(list)
    for evc_id, evc in evcs.items():
        # All builders run before any flow is stored, in this exact order.
        source_a = _build_int_source_flows("uni_a", evc, stored_flows)
        source_z = _build_int_source_flows("uni_z", evc, stored_flows)
        hops = _build_int_hop_flows(evc, stored_flows)
        sink_z = _build_int_sink_flows("uni_z", evc, stored_flows)
        sink_a = _build_int_sink_flows("uni_a", evc, stored_flows)

        for int_flow in (*source_a, *source_z, *hops, *sink_z, *sink_a):
            cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
            int_flows_by_cookie[cookie].append(int_flow)
    return int_flows_by_cookie
35
36
37 1
def _build_int_source_flows(
    uni_src_key: Literal["uni_a", "uni_z"],
    evc: dict,
    stored_flows: dict[int, list[dict]],
) -> list[dict]:
    """Build the INT source flows for one UNI of an EVC.

    The first stored flow on the source UNI's switch whose match in_port
    is the UNI port is turned into three flows:

    * a TCP flow and a UDP flow that keep the stored flow's table and
      whose instructions are replaced by push_int + goto_table
      ``settings.INT_TABLE``;
    * a flow on ``settings.INT_TABLE`` that keeps the original
      instructions with add_int_metadata added at position 0.

    Only the TCP/UDP flows go through ``utils.set_priority``.

    Args:
        uni_src_key: which UNI of the EVC acts as INT source.
        evc: the EVC dict.
        stored_flows: stored flows keyed by cookie.

    Returns:
        ``[tcp_flow, udp_flow, int_table_flow]``.

    Raises:
        FlowsNotFound: when no stored flow matches the source UNI.
    """
    source_uni = evc[uni_src_key]
    source_dpid = source_uni["switch"]
    evc_cookie = utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)

    # Locate the original flow; only the first match is used.
    for candidate in stored_flows[evc_cookie]:
        if candidate["switch"] != source_dpid:
            continue
        if candidate["flow"]["match"]["in_port"] != source_uni["port_number"]:
            continue
        tcp_flow = copy.deepcopy(candidate)
        break
    else:
        raise FlowsNotFound(evc["id"])

    utils.set_instructions_from_actions(tcp_flow)
    utils.set_new_cookie(tcp_flow)
    utils.set_owner(tcp_flow)

    # Snapshot taken before any TCP-specific change; it becomes the
    # INT table flow further down.
    int_table_flow = copy.deepcopy(tcp_flow)

    # TCP flow: narrow the match, adjust priority, replace instructions.
    tcp_match = tcp_flow["flow"]["match"]
    tcp_match["dl_type"] = settings.IPv4
    tcp_match["nw_proto"] = settings.TCP
    utils.set_priority(tcp_flow)
    tcp_flow["flow"]["instructions"] = [
        {
            "instruction_type": "apply_actions",
            "actions": [{"action_type": "push_int"}],
        },
        {"instruction_type": "goto_table", "table_id": settings.INT_TABLE},
    ]

    # UDP flow: identical to the TCP one except for nw_proto.
    udp_flow = copy.deepcopy(tcp_flow)
    udp_flow["flow"]["match"]["nw_proto"] = settings.UDP

    # INT table flow: no TCP/UDP specifics.
    int_table_flow["flow"]["table_id"] = settings.INT_TABLE
    utils.set_table_group(int_table_flow)

    if utils.is_intra_switch_evc(evc):
        # Intra-switch EVC: outputs are redirected to the source side of
        # the opposite UNI's proxy port. The destination side is chosen
        # at the INT sink.
        opposite_key = "uni_z" if uni_src_key == "uni_a" else "uni_a"
        proxy_port = evc[opposite_key]["proxy_port"]
        for instruction in int_table_flow["flow"]["instructions"]:
            if instruction["instruction_type"] != "apply_actions":
                continue
            for action in instruction["actions"]:
                if action["action_type"] == "output":
                    action["port"] = proxy_port.source.port_number

            # Drop set_vlan here to avoid a redundant set_vlan, since it
            # will be set at the egress sink.
            instruction["actions"] = utils.modify_actions(
                instruction["actions"], ["set_vlan"], remove=True
            )

    int_table_flow["flow"]["instructions"] = utils.add_to_apply_actions(
        int_table_flow["flow"]["instructions"],
        new_instruction={"action_type": "add_int_metadata"},
        position=0,
    )

    return [tcp_flow, udp_flow, int_table_flow]
130
131
132 1
def _build_int_hop_flows(
    evc: dict,
    stored_flows: dict[int, list[dict]],
) -> list[dict]:
    """Build the INT hop flows of an EVC.

    Every stored flow of the EVC that is not on a UNI switch and that
    matches on in_port yields two flows: a TCP one and a UDP one. Both
    have add_int_metadata inserted as the first action of each
    apply_actions instruction.

    Args:
        evc: the EVC dict; its uni_a/uni_z switches are excluded.
        stored_flows: stored flows keyed by cookie. The stored flows
            themselves are only read; all changes go to deep copies.

    Returns:
        The new hop flows, a TCP/UDP pair per eligible stored flow, in
        stored order. Empty when there is no eligible flow.
    """
    new_flows = []
    excluded_dpids = {evc["uni_a"]["switch"], evc["uni_z"]["switch"]}

    for flow in stored_flows[utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)]:
        # Source/sink switches are handled by their own builders.
        if flow["switch"] in excluded_dpids:
            continue
        # Flows without an in_port match are skipped.
        if "match" not in flow["flow"] or "in_port" not in flow["flow"]["match"]:
            continue

        new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
        utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
        # The cookie helper must receive the copy, like the source and
        # sink builders do: passing the stored flow would leave the new
        # flow's cookie as it was and hand the stored entry to the helper.
        utils.set_new_cookie(new_int_flow_tbl_0_tcp)
        utils.set_owner(new_int_flow_tbl_0_tcp)

        # Prepare TCP Flow
        new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
        new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
        utils.set_priority(new_int_flow_tbl_0_tcp)

        for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
            if instruction["instruction_type"] == "apply_actions":
                instruction["actions"].insert(0, {"action_type": "add_int_metadata"})

        # Prepare UDP Flow: same as TCP except for nw_proto
        new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
        new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP

        new_flows.append(new_int_flow_tbl_0_tcp)
        new_flows.append(new_int_flow_tbl_0_udp)

    return new_flows
174
175
176 1
def _build_int_sink_flows(
    uni_dst_key: Literal["uni_a", "uni_z"],
    evc: dict,
    stored_flows: dict[int, list[dict]],
) -> list[dict]:
    """Build the INT sink flows for one UNI of an EVC.

    Each stored flow on the sink switch whose in_port is not the UNI port
    becomes several flows:

    1. Before the proxy (skipped for intra-switch EVCs): a TCP and a UDP
       flow that add_int_metadata, drop the pop_vlan/push_vlan/set_vlan/
       output actions (set_queue is kept) and output to the proxy port's
       source side.
    2. After the proxy, matching in_port on the proxy port's destination
       side: one flow doing send_report + goto_table
       ``settings.INT_TABLE``, and one flow on that table with pop_int
       placed before the original actions.

    Args:
        uni_dst_key: which UNI of the EVC acts as INT sink.
        evc: the EVC dict.
        stored_flows: stored flows keyed by cookie.

    Returns:
        Deep copies of the built flows, in stored-flow order.

    Raises:
        FlowsNotFound: if the copy of a selected stored flow is empty.
    """
    sink_flows = []
    sink_uni = evc[uni_dst_key]
    proxy_port = sink_uni["proxy_port"]
    sink_dpid = sink_uni["switch"]
    evc_cookie = utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)

    for stored in stored_flows[evc_cookie]:
        # Keep only this sink's flows that do not come in through the UNI
        # port (i.e. flows arriving from NNI interfaces).
        if (
            stored["switch"] != sink_dpid
            or stored["flow"]["match"]["in_port"] == sink_uni["port_number"]
        ):
            continue

        pre_proxy_tcp = copy.deepcopy(stored)
        if not pre_proxy_tcp:
            raise FlowsNotFound(evc["id"])

        utils.set_new_cookie(pre_proxy_tcp)
        utils.set_owner(pre_proxy_tcp)
        utils.set_instructions_from_actions(pre_proxy_tcp)

        # Snapshots for the two post-proxy flows, taken before any
        # pre-proxy specific change.
        post_proxy_tbl_0 = copy.deepcopy(pre_proxy_tcp)
        post_proxy_tbl_2 = copy.deepcopy(pre_proxy_tcp)

        # Pre-proxy flows only exist when the EVC spans several switches.
        if not utils.is_intra_switch_evc(evc):
            tcp_match = pre_proxy_tcp["flow"]["match"]
            tcp_match["dl_type"] = settings.IPv4
            tcp_match["nw_proto"] = settings.TCP
            utils.set_priority(pre_proxy_tcp)

            # Add telemetry, keep set_queue, output to the proxy port.
            to_proxy_port_no = proxy_port.source.port_number
            for instruction in pre_proxy_tcp["flow"]["instructions"]:
                if instruction["instruction_type"] != "apply_actions":
                    continue
                kept_actions = utils.modify_actions(
                    instruction["actions"],
                    ["pop_vlan", "push_vlan", "set_vlan", "output"],
                    remove=True,
                )
                kept_actions.insert(0, {"action_type": "add_int_metadata"})
                kept_actions.append(
                    {"action_type": "output", "port": to_proxy_port_no}
                )
                instruction["actions"] = kept_actions

            # UDP twin of the TCP flow.
            pre_proxy_udp = copy.deepcopy(pre_proxy_tcp)
            pre_proxy_udp["flow"]["match"]["nw_proto"] = settings.UDP

            sink_flows.append(copy.deepcopy(pre_proxy_tcp))
            sink_flows.append(copy.deepcopy(pre_proxy_udp))

        # Post-proxy flow keeping the stored table. No TCP/UDP distinction.
        from_proxy_port_no = proxy_port.destination.port_number
        post_proxy_tbl_0["flow"]["match"]["in_port"] = from_proxy_port_no
        # NOTE(review): this adjusts the pre-proxy TCP flow, whose copy was
        # already stored above (or is never stored for intra-switch EVCs),
        # so it has no effect on the emitted flows. Possibly meant for the
        # post-proxy flow -- confirm before changing.
        utils.set_priority(pre_proxy_tcp)
        post_proxy_tbl_0["flow"]["instructions"] = [
            {
                "instruction_type": "apply_actions",
                "actions": [{"action_type": "send_report"}],
            },
            {"instruction_type": "goto_table", "table_id": settings.INT_TABLE},
        ]

        # Post-proxy flow on the INT table: pop_int before original actions.
        post_proxy_tbl_2["flow"]["match"]["in_port"] = from_proxy_port_no
        post_proxy_tbl_2["flow"]["table_id"] = settings.INT_TABLE
        utils.set_table_group(post_proxy_tbl_2)
        for instruction in post_proxy_tbl_2["flow"]["instructions"]:
            if instruction["instruction_type"] != "apply_actions":
                continue
            instruction["actions"].insert(0, {"action_type": "pop_int"})

        sink_flows.append(copy.deepcopy(post_proxy_tbl_0))
        sink_flows.append(copy.deepcopy(post_proxy_tbl_2))

    return sink_flows
272