Test Failed
Pull Request — master (#35)
by Vinicius
06:08
created

build.managers.flow_builder.build_int_flows()   A

Complexity

Conditions 3

Size

Total Lines 21
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 13
nop 2
dl 0
loc 21
rs 9.75
c 0
b 0
f 0
1
"""flow_builder module responsible for building and mapping flows."""
2
3
import copy
4
from collections import defaultdict
5
import itertools
6
7
from typing import Literal
8
9
from napps.kytos.telemetry_int import utils
10
from napps.kytos.telemetry_int.exceptions import FlowsNotFound
11
from napps.kytos.telemetry_int import settings
12
13
14
def build_int_flows(
    evcs: dict[str, dict], stored_flows: dict[int, list[dict]]
) -> dict[int, list[dict]]:
    """Build the INT flows of every EVC, grouped by cookie.

    For each EVC the INT source flows (for both UNIs), the INT hop flows
    and the INT sink flows (for both UNIs) are built, in that order. Since
    each EVC is bidirectional, the INT flows are provisioned
    bidirectionally too. In the future, if mef_eline supports
    unidirectional EVCs, it'll follow suit accordingly.

    Args:
        evcs: EVCs keyed by EVC id.
        stored_flows: stored flows, handed as is to the source, hop and
            sink builders.

    Returns:
        A mapping from the cookie that utils.get_cookie derives from the
        EVC id and settings.MEF_COOKIE_PREFIX to the list of INT flows
        built for that EVC. EVCs that yield no flow get no entry.
    """
    flows_per_cookie: dict[int, list[dict]] = defaultdict(list)
    for evc_id, evc in evcs.items():
        evc_flows = list(
            itertools.chain(
                _build_int_source_flows("uni_a", evc, stored_flows),
                _build_int_source_flows("uni_z", evc, stored_flows),
                _build_int_hop_flows(evc, stored_flows),
                _build_int_sink_flows("uni_z", evc, stored_flows),
                _build_int_sink_flows("uni_a", evc, stored_flows),
            )
        )
        if not evc_flows:
            # Do not create an empty entry for an EVC without flows.
            continue
        # The cookie only depends on the EVC, so derive it once per EVC
        # rather than once per flow.
        cookie = utils.get_cookie(evc_id, settings.MEF_COOKIE_PREFIX)
        flows_per_cookie[cookie].extend(evc_flows)
    return flows_per_cookie
35
36
37
def _build_int_source_flows(
    uni_src_key: Literal["uni_a", "uni_z"],
    evc: dict,
    stored_flows: dict[int, list[dict]],
) -> list[dict]:
    """Derive the three INT source flows for one UNI of an EVC.

    The first stored flow of the EVC that sits on the UNI's switch and
    matches the UNI's port number is used as the template for:

    - a table 0 TCP flow and a table 0 UDP flow, whose instructions are
      replaced by push_int followed by a goto_table to settings.INT_TABLE;
    - a flow on settings.INT_TABLE that keeps the template's actions,
      preceded by add_int_metadata.

    INT flows will have higher priority.

    Raises FlowsNotFound when no stored flow matches the UNI.
    """
    uni = evc[uni_src_key]
    proxy_port = uni["proxy_port"]
    switch_id = uni["switch"]

    # Locate the original flow entering through the UNI
    evc_cookie = utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)
    template = next(
        (
            candidate
            for candidate in stored_flows[evc_cookie]
            if candidate["switch"] == switch_id
            and candidate["flow"]["match"]["in_port"] == uni["port_number"]
        ),
        None,
    )
    if not template:
        raise FlowsNotFound(evc["id"])

    tcp_flow = copy.deepcopy(template)
    utils.set_instructions_from_actions(tcp_flow)
    utils.set_new_cookie(tcp_flow)
    utils.set_owner(tcp_flow)

    # Snapshot for the INT table, taken before any table 0 specifics
    int_table_flow = copy.deepcopy(tcp_flow)

    # Table 0, TCP
    tcp_match = tcp_flow["flow"]["match"]
    tcp_match["dl_type"] = settings.IPv4
    tcp_match["nw_proto"] = settings.TCP
    utils.set_priority(tcp_flow)

    # The flow_manager has two outputs: instructions and actions.
    tcp_flow["flow"]["instructions"] = [
        {
            "instruction_type": "apply_actions",
            "actions": [{"action_type": "push_int"}],
        },
        {"instruction_type": "goto_table", "table_id": settings.INT_TABLE},
    ]

    # Table 0, UDP: identical to the TCP flow except for nw_proto
    udp_flow = copy.deepcopy(tcp_flow)
    udp_flow["flow"]["match"]["nw_proto"] = settings.UDP

    # INT table flow: no TCP or UDP specifics
    int_table_flow["flow"]["table_id"] = settings.INT_TABLE
    utils.set_table_group(int_table_flow)

    if utils.is_intra_switch_evc(evc):
        # Intra-switch EVC: the output port should be the proxy.
        for instruction in int_table_flow["flow"]["instructions"]:
            if instruction["instruction_type"] != "apply_actions":
                continue
            for action in instruction["actions"]:
                if action["action_type"] != "output":
                    continue
                # Since this is the INT Source, we use source to avoid
                # worrying about single or multi home physical loops.
                # The choice for destination is at the INT Sink.
                action["port"] = proxy_port.source.port_number

    int_table_flow["flow"]["instructions"] = utils.add_to_apply_actions(
        int_table_flow["flow"]["instructions"],
        new_instruction={"action_type": "add_int_metadata"},
        position=0,
    )

    return [tcp_flow, udp_flow, int_table_flow]
123
124
125
def _build_int_hop_flows(
    evc: dict,
    stored_flows: dict[int, list[dict]],
) -> list[dict]:
    """Build INT hop flows.

    At the INT hops, one flow adds two more: one for UDP on table 0,
    one for TCP on table 0. On table 0, we add 'add_int_metadata'
    before other actions.

    Stored flows that sit on either UNI switch, or that have no
    "in_port" match, are skipped.

    Args:
        evc: the EVC; its "id", "uni_a" and "uni_z" entries are read.
        stored_flows: stored flows keyed by cookie. The stored flows are
            only read: every change is applied to a deep copy.

    Returns:
        The TCP and UDP INT flows, in that order, for every hop flow.
    """
    new_flows = []
    excluded_dpids = {evc["uni_a"]["switch"], evc["uni_z"]["switch"]}

    for flow in stored_flows[utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)]:
        if flow["switch"] in excluded_dpids:
            continue
        if "match" not in flow["flow"] or "in_port" not in flow["flow"]["match"]:
            continue

        new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
        utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)
        # Apply the new cookie to the copy, as _build_int_source_flows
        # does: the derived flow is the one that needs it, and the stored
        # flow handed in by the caller must be left untouched.
        utils.set_new_cookie(new_int_flow_tbl_0_tcp)
        utils.set_owner(new_int_flow_tbl_0_tcp)

        # Prepare TCP Flow
        new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
        new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
        utils.set_priority(new_int_flow_tbl_0_tcp)

        for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
            if instruction["instruction_type"] == "apply_actions":
                instruction["actions"].insert(0, {"action_type": "add_int_metadata"})

        # Prepare UDP Flow. Everything the same as TCP except the nw_proto
        new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
        new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP

        new_flows.append(new_int_flow_tbl_0_tcp)
        new_flows.append(new_int_flow_tbl_0_udp)

    return new_flows
167
168
169
def _build_int_sink_flows(
    uni_dst_key: Literal["uni_a", "uni_z"],
    evc: dict,
    stored_flows: dict[int, list[dict]],
) -> list[dict]:
    """
    Build INT sink flows.

    At the INT sink, one flow becomes many:
    1. Before the proxy, we do add_int_metadata as an INT hop.
    We need to keep the set_queue
    2. After the proxy, we do send_report and pop_int and output
    We only use table 0 for #1.
    We use table 2 for #2. for pop_int and output

    Only the stored flows on the sink's switch whose in_port is not the
    UNI port are considered. For an intra-switch EVC the flows of #1 are
    not built. When no stored flow qualifies, an empty list is returned;
    nothing is raised.

    Args:
        uni_dst_key: which UNI of the EVC acts as the INT sink.
        evc: the EVC; its "id" and the selected UNI entry are read.
        stored_flows: stored flows keyed by cookie. The stored flows are
            only read: every change is applied to a deep copy.

    Returns:
        For every qualifying stored flow: the pre-proxy TCP and UDP flows
        (inter-switch EVCs only), then the post-proxy table 0 flow and
        the post-proxy settings.INT_TABLE flow.
    """
    new_flows = []
    dst_uni = evc[uni_dst_key]
    proxy_port = dst_uni["proxy_port"]
    dpid = dst_uni["switch"]

    for flow in stored_flows[utils.get_cookie(evc["id"], settings.MEF_COOKIE_PREFIX)]:
        # Only consider this sink's dpid flows
        if flow["switch"] != dpid:
            continue
        # Only consider flows coming from NNI interfaces
        if flow["flow"]["match"]["in_port"] == dst_uni["port_number"]:
            continue

        new_int_flow_tbl_0_tcp = copy.deepcopy(flow)

        # Apply the new cookie to the copy, as _build_int_source_flows
        # does: the derived flows are the ones that need it, and the
        # stored flow handed in by the caller must be left untouched.
        utils.set_new_cookie(new_int_flow_tbl_0_tcp)
        utils.set_owner(new_int_flow_tbl_0_tcp)
        utils.set_instructions_from_actions(new_int_flow_tbl_0_tcp)

        # Save for pos-proxy flows
        new_int_flow_tbl_0_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)
        new_int_flow_tbl_2_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)

        # Prepare TCP Flow for Table 0 PRE proxy
        if not utils.is_intra_switch_evc(evc):
            new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
            new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
            utils.set_priority(new_int_flow_tbl_0_tcp)

            # Add telemetry, keep set_queue, output to the proxy port.
            output_port_no = proxy_port.source.port_number
            for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
                if instruction["instruction_type"] == "apply_actions":
                    # Keep set_queue
                    actions = utils.modify_actions(
                        instruction["actions"],
                        ["pop_vlan", "push_vlan", "set_vlan", "output"],
                        remove=True,
                    )
                    actions.insert(0, {"action_type": "add_int_metadata"})
                    actions.append({"action_type": "output", "port": output_port_no})
                    instruction["actions"] = actions

            # Prepare UDP Flow for Table 0
            new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
            new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP

            # Neither flow is touched again, so they can be handed out
            # without another copy.
            new_flows.append(new_int_flow_tbl_0_tcp)
            new_flows.append(new_int_flow_tbl_0_udp)

        # Prepare Flows for Table 0 AFTER proxy. No difference between TCP or UDP
        in_port_no = proxy_port.destination.port_number

        new_int_flow_tbl_0_pos["flow"]["match"]["in_port"] = in_port_no
        # Set the priority of the post-proxy flow itself: the pre-proxy
        # TCP flow already had its priority set above, and for an
        # intra-switch EVC it is not emitted at all.
        utils.set_priority(new_int_flow_tbl_0_pos)

        instructions = [
            {
                "instruction_type": "apply_actions",
                "actions": [{"action_type": "send_report"}],
            },
            {"instruction_type": "goto_table", "table_id": settings.INT_TABLE},
        ]
        new_int_flow_tbl_0_pos["flow"]["instructions"] = instructions

        # Prepare Flows for Table 2 POS proxy
        new_int_flow_tbl_2_pos["flow"]["match"]["in_port"] = in_port_no
        new_int_flow_tbl_2_pos["flow"]["table_id"] = settings.INT_TABLE
        utils.set_table_group(new_int_flow_tbl_2_pos)

        for instruction in new_int_flow_tbl_2_pos["flow"]["instructions"]:
            if instruction["instruction_type"] == "apply_actions":
                instruction["actions"].insert(0, {"action_type": "pop_int"})

        new_flows.append(new_int_flow_tbl_0_pos)
        new_flows.append(new_int_flow_tbl_2_pos)

    return new_flows
265