Passed
Push — master ( 7f62ad...2ec8a9 )
by Vinicius
02:07 queued 13s
created

build.main.Main.remove_int_flows()   A

Complexity

Conditions 3

Size

Total Lines 27
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
cc 3
eloc 20
nop 2
dl 0
loc 27
ccs 0
cts 9
cp 0
crap 12
rs 9.4
c 0
b 0
f 0
1
"""Main module of kytos/telemetry Network Application.
2
3
Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits
4
5
"""
6
import copy
7
import itertools
8
9
from napps.kytos.telemetry_int import settings
10
11
from kytos.core import KytosNApp, rest
12
from kytos.core.events import KytosEvent
13
from kytos.core.rest_api import HTTPException, JSONResponse, Request, get_json_or_400
14
15
from .exceptions import ErrorBase, EvcHasNoINT, FlowsNotFound, NoProxyPortsAvailable
16
from .kytos_api_helper import get_evc, get_evc_flows, get_evcs
17
from .proxy_port import ProxyPort
18
from .utils import (
19
    add_to_apply_actions,
20
    get_cookie,
21
    get_cookie_telemetry,
22
    get_evc_unis,
23
    get_evc_with_telemetry,
24
    get_path_hop_interface_ids,
25
    get_proxy_port,
26
    has_int_enabled,
27
    is_intra_switch_evc,
28
    modify_actions,
29
    push_flows,
30
    set_instructions_from_actions,
31
    set_new_cookie,
32
    set_priority,
33
    set_telemetry_false_for_evc,
34
    set_telemetry_true_for_evc,
35
)
36
37
# pylint: disable=fixme
38
39
40
class Main(KytosNApp):
41
    """Main class of kytos/telemetry NApp.
42
43
    This class is the entry point for this NApp.
44
    """
45
46
    def setup(self):
47
        """Replace the '__init__' method for the KytosNApp subclass.
48
49
        The setup method is automatically called by the controller when your
50
        application is loaded.
51
52
        So, if you have any setup routine, insert it here.
53
        """
54
55
        # TODO: only loads after all other napps are loaded.
56
57
    def execute(self):
58
        """Run after the setup method execution.
59
60
        You can also use this method in loop mode if you add to the above setup
61
        method a line like the following example:
62
63
            self.execute_as_loop(30)  # 30-second interval.
64
        """
65
66
    def shutdown(self):
67
        """Run when your NApp is unloaded.
68
69
        If you have some cleanup procedure, insert it here.
70
        """
71
72
    @staticmethod
73
    def enable_int_source(
74
        source_uni: dict, evc: dict, proxy_port: ProxyPort
75
    ) -> list[dict]:
76
        """At the INT source, one flow becomes 3: one for UDP on table 0,
77
        one for TCP on table 0, and one on table 2
78
        On table 0, we use just new instructions: push_int and goto_table
79
        On table 2, we add add_int_metadata before the original actions
80
        INT flows will have higher priority. We don't delete the old flows.
81
        """
82
        new_flows = []
83
        new_int_flow_tbl_0_tcp = {}
84
85
        # Get the original flows
86
        dpid = source_uni["switch"]
87
        for flow in get_evc_flows(get_cookie(evc["id"]), dpid).get(dpid, []):
88
            if flow["flow"]["match"]["in_port"] == source_uni["port_number"]:
89
                new_int_flow_tbl_0_tcp = flow
90
                break
91
92
        if not new_int_flow_tbl_0_tcp:
93
            raise FlowsNotFound(evc["id"])
94
95
        set_instructions_from_actions(new_int_flow_tbl_0_tcp)
96
        set_new_cookie(new_int_flow_tbl_0_tcp)
97
98
        # Deepcopy to use for table 2 later
99
        new_int_flow_tbl_2 = copy.deepcopy(new_int_flow_tbl_0_tcp)
100
101
        # Prepare TCP Flow for Table 0
102
        new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
103
        new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
104
        # TODO: Create an exception for when the priority has reached max value
105
        set_priority(new_int_flow_tbl_0_tcp)
106
107
        # The flow_manager has two outputs: instructions and actions.
108
        instructions = [
109
            {
110
                "instruction_type": "apply_actions",
111
                "actions": [{"action_type": "push_int"}],
112
            },
113
            {"instruction_type": "goto_table", "table_id": settings.INT_TABLE},
114
        ]
115
        new_int_flow_tbl_0_tcp["flow"]["instructions"] = instructions
116
117
        # Prepare UDP Flow for Table 0. Everything the same as TCP except the nw_proto
118
        new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
119
        new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
120
121
        # Prepare Flows for Table 2 - No TCP or UDP specifics
122
        new_int_flow_tbl_2["flow"]["table_id"] = settings.INT_TABLE
123
124
        # if intra-switch EVC, then output port should be the proxy
125
        if is_intra_switch_evc(evc):
126
            for instruction in new_int_flow_tbl_2["flow"]["instructions"]:
127
                if instruction["instruction_type"] == "apply_actions":
128
                    for action in instruction["actions"]:
129
                        if action["action_type"] == "output":
130
                            # Since this is the INT Source, we use source
131
                            # to avoid worrying about single or multi
132
                            # home physical loops.
133
                            # The choice for destination is at the INT Sink.
134
                            action["port"] = proxy_port.source.port_number
135
136
        instructions = add_to_apply_actions(
137
            new_int_flow_tbl_2["flow"]["instructions"],
138
            new_instruction={"action_type": "add_int_metadata"},
139
            position=0,
140
        )
141
142
        new_int_flow_tbl_2["flow"]["instructions"] = instructions
143
144
        new_flows.append(new_int_flow_tbl_0_tcp)
145
        new_flows.append(new_int_flow_tbl_0_udp)
146
        new_flows.append(new_int_flow_tbl_2)
147
148
        return new_flows
149
150
    @staticmethod
151
    def enable_int_hop(
152
        evc: dict, source_uni: dict, destination_uni: dict
153
    ) -> list[dict]:
154
        """At the INT hops, one flow adds two more: one for UDP on table 0,
155
        one for TCP on table 0. On table 0, we add 'add_int_metadata'
156
        before other actions. We use source and destination to create the
157
        unidirectional support for telemetry.
158
        """
159
160
        new_flows = []
161
        dpid_ports, dpids = set(), set()
162
        intf_ids = get_path_hop_interface_ids(evc, source_uni, destination_uni)
163
        for interface_id in intf_ids:
164
            intf_split = interface_id.split(":")
165
            switch, port_number = ":".join(intf_split[:-1]), int(intf_split[-1])
166
            dpid_ports.add((switch, port_number))
167
            dpids.add(switch)
168
169
        for flow in itertools.chain(
170
            *get_evc_flows(get_cookie(evc["id"]), *dpids).values()
171
        ):
172
            if "match" not in flow["flow"] or "in_port" not in flow["flow"]["match"]:
173
                continue
174
            if (flow["switch"], flow["flow"]["match"]["in_port"]) not in dpid_ports:
175
                continue
176
177
            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
178
            set_instructions_from_actions(new_int_flow_tbl_0_tcp)
179
            set_new_cookie(flow)
180
181
            # Prepare TCP Flow
182
            new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
183
            new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
184
            set_priority(new_int_flow_tbl_0_tcp)
185
186
            for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
187
                if instruction["instruction_type"] == "apply_actions":
188
                    instruction["actions"].insert(
189
                        0, {"action_type": "add_int_metadata"}
190
                    )
191
192
            # Prepare UDP Flow
193
            new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
194
            new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
195
196
            new_flows.append(new_int_flow_tbl_0_tcp)
197
            new_flows.append(new_int_flow_tbl_0_udp)
198
199
        return new_flows
200
201
    @staticmethod
202
    def enable_int_sink(destination_uni: dict, evc: dict, proxy_port: ProxyPort):
203
        """At the INT sink, one flow becomes many:
204
        1. Before the proxy, we do add_int_metadata as an INT hop.
205
        We need to keep the set_queue
206
        2. After the proxy, we do send_report and pop_int and output
207
        We only use table 0 for #1.
208
        We use table 2 for #2. for pop_int and output
209
        """
210
        new_flows = []
211
        dpid = destination_uni["switch"]
212
        for flow in get_evc_flows(get_cookie(evc["id"]), dpid).get(dpid, []):
213
            # Only consider flows coming from NNI interfaces
214
            if flow["flow"]["match"]["in_port"] == destination_uni["port_number"]:
215
                continue
216
217
            new_int_flow_tbl_0_tcp = copy.deepcopy(flow)
218
            set_new_cookie(flow)
219
220
            if not new_int_flow_tbl_0_tcp:
221
                raise FlowsNotFound(evc["id"])
222
223
            set_instructions_from_actions(new_int_flow_tbl_0_tcp)
224
            # Save for pos-proxy flows
225
            new_int_flow_tbl_0_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)
226
            new_int_flow_tbl_2_pos = copy.deepcopy(new_int_flow_tbl_0_tcp)
227
228
            # Prepare TCP Flow for Table 0 PRE proxy
229
            if not is_intra_switch_evc(evc):
230
                new_int_flow_tbl_0_tcp["flow"]["match"]["dl_type"] = settings.IPv4
231
                new_int_flow_tbl_0_tcp["flow"]["match"]["nw_proto"] = settings.TCP
232
                set_priority(new_int_flow_tbl_0_tcp)
233
234
                # Add telemetry, keep set_queue, output to the proxy port.
235
                output_port_no = proxy_port.source.port_number
236
                for instruction in new_int_flow_tbl_0_tcp["flow"]["instructions"]:
237
                    if instruction["instruction_type"] == "apply_actions":
238
                        # Keep set_queue
239
                        actions = modify_actions(
240
                            instruction["actions"],
241
                            ["pop_vlan", "push_vlan", "set_vlan", "output"],
242
                            remove=True,
243
                        )
244
                        actions.insert(0, {"action_type": "add_int_metadata"})
245
                        actions.append(
246
                            {"action_type": "output", "port": output_port_no}
247
                        )
248
                        instruction["actions"] = actions
249
250
                # Prepare UDP Flow for Table 0
251
                new_int_flow_tbl_0_udp = copy.deepcopy(new_int_flow_tbl_0_tcp)
252
                new_int_flow_tbl_0_udp["flow"]["match"]["nw_proto"] = settings.UDP
253
254
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_tcp))
255
                new_flows.append(copy.deepcopy(new_int_flow_tbl_0_udp))
256
                del instruction  # pylint: disable=W0631
0 ignored issues
show
introduced by
The variable instruction does not seem to be defined for all execution paths.
Loading history...
257
258
            # Prepare Flows for Table 0 AFTER proxy. No difference between TCP or UDP
259
            in_port_no = proxy_port.destination.port_number
260
261
            new_int_flow_tbl_0_pos["flow"]["match"]["in_port"] = in_port_no
262
            set_priority(new_int_flow_tbl_0_tcp)
263
264
            instructions = [
265
                {
266
                    "instruction_type": "apply_actions",
267
                    "actions": [{"action_type": "send_report"}],
268
                },
269
                {"instruction_type": "goto_table", "table_id": settings.INT_TABLE},
270
            ]
271
            new_int_flow_tbl_0_pos["flow"]["instructions"] = instructions
272
273
            # Prepare Flows for Table 2 POS proxy
274
            new_int_flow_tbl_2_pos["flow"]["match"]["in_port"] = in_port_no
275
            new_int_flow_tbl_2_pos["flow"]["table_id"] = settings.INT_TABLE
276
277
            for instruction in new_int_flow_tbl_2_pos["flow"]["instructions"]:
278
                if instruction["instruction_type"] == "apply_actions":
279
                    instruction["actions"].insert(0, {"action_type": "pop_int"})
280
281
            new_flows.append(copy.deepcopy(new_int_flow_tbl_0_pos))
282
            new_flows.append(copy.deepcopy(new_int_flow_tbl_2_pos))
283
284
        return new_flows
285
286
    def provision_int_unidirectional(
287
        self, evc: dict, source_uni: dict, destination_uni: dict, proxy_port: ProxyPort
288
    ) -> bool:
289
        """Create INT flows from source to destination."""
290
291
        # Create flows for the first switch (INT Source)
292
        new_flows = self.enable_int_source(source_uni, evc, proxy_port)
293
294
        # Create flows the INT hops
295
        new_flows += list(self.enable_int_hop(evc, source_uni, destination_uni))
296
297
        # # Create flows for the last switch (INT Sink)
298
        new_flows += list(self.enable_int_sink(destination_uni, evc, proxy_port))
299
300
        return push_flows(new_flows)
301
302
    # pylint: disable=too-many-branches
303
    def provision_int(self, evc: dict) -> str:
304
        """Create telemetry flows for an EVC."""
305
306
        # TODO refactor to always ensure it has proxy port based on EP031 augmented
307
308
        # Get the EVC endpoints
309
        evc_id = evc["id"]
310
        uni_a, uni_z = get_evc_unis(evc)
311
312
        # Check if there are proxy ports on the endpoints' switches
313
        uni_a_proxy_port = get_proxy_port(self.controller, uni_a["interface_id"])
314
        uni_z_proxy_port = get_proxy_port(self.controller, uni_z["interface_id"])
315
316
        # INT is enabled per direction.
317
        # It's possible and acceptable to have INT just in one direction.
318
319
        # Direction uni_z -> uni_a
320
        if uni_a_proxy_port:
321
            self.provision_int_unidirectional(evc, uni_z, uni_a, uni_a_proxy_port)
322
            # change EVC metadata "telemetry": {"enabled": true } via API
323
324
        # Direction uni_a -> uni_z
325
        if uni_z_proxy_port:
326
            self.provision_int_unidirectional(evc, uni_a, uni_z, uni_z_proxy_port)
327
328
        # Change EVC metadata "telemetry": {"enabled": true } via API
329
        if uni_a_proxy_port and uni_z_proxy_port:
330
            if not set_telemetry_true_for_evc(evc_id, "bidirectional"):
331
                raise ErrorBase(
332
                    evc_id, "failed to add telemetry bidirectional metadata"
333
                )
334
            msg = f"INT enabled for EVC ID {evc_id} on both directions"
335
336
        elif uni_a_proxy_port or uni_z_proxy_port:
337
            if not set_telemetry_true_for_evc(evc_id, "unidirectional"):
338
                raise ErrorBase(
339
                    evc_id, "failed to add telemetry unidirectional metadata"
340
                )
341
342
            msg = f"INT enabled for EVC ID {evc_id} on direction "
343
            if uni_z_proxy_port:
344
                msg += (
345
                    f"{evc['uni_a']['interface_id']} -> {evc['uni_z']['interface_id']}"
346
                )
347
            else:
348
                msg += (
349
                    f"{evc['uni_z']['interface_id']} -> {evc['uni_a']['interface_id']}"
350
                )
351
352
        else:
353
            raise NoProxyPortsAvailable(evc_id)
354
355
        return msg
356
357
    # pylint: enable=too-many-branches
358
359
    def decommission_int(self, evc: dict) -> str:
360
        """Remove all INT flows for an EVC"""
361
362
        evc_id = evc["id"]
363
        self.remove_int_flows(evc)
364
365
        # Update mef_eline.
366
        if not set_telemetry_false_for_evc(evc_id):
367
            raise ErrorBase(evc_id, "failed to disable telemetry metadata")
368
369
        return f"EVC ID {evc_id} is no longer INT-enabled."
370
371
    def remove_int_flows(self, evc: dict) -> None:
372
        """Delete int flows of a given EVC."""
373
        cookie = get_cookie_telemetry(evc["id"])
374
        dpids = set()
375
        for path_item in itertools.chain(
376
            evc.get("current_path", []),
377
            evc.get("failover_path", []),
378
            evc.get("primary_path", []),
379
            evc.get("backup_path", []),
380
        ):
381
            dpids.add(path_item["endpoint_a"]["switch"])
382
            dpids.add(path_item["endpoint_b"]["switch"])
383
384
        for dpid in dpids:
385
            event = KytosEvent(
386
                "kytos.flow_manager.flows.delete",
387
                content={
388
                    "dpid": dpid,
389
                    "flow_dict": {
390
                        "force": True,
391
                        "flows": [
392
                            {"cookie": cookie, "cookie_mask": int(0xFFFFFFFFFFFFFFFF)}
393
                        ],
394
                    },
395
                },
396
            )
397
            self.controller.buffers.app.put(event)
398
399
    # REST methods
400
401 View Code Duplication
    @rest("v1/evc/enable", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
402
    def enable_telemetry(self, request: Request) -> JSONResponse:
403
        """REST to enable/create INT flows for one or more EVC_IDs.
404
                  evcs are provided via POST as a list
405
        Args:
406
            {"evc_ids": [list of evc_ids] }
407
408
        Returns:
409
            200 and outcomes for each evc listed.
410
        """
411
412
        try:
413
            content = get_json_or_400(request, self.controller.loop)
414
            evc_ids = content["evc_ids"]
415
        except (TypeError, KeyError):
416
            raise HTTPException(400, detail=f"Invalid payload: {content}")
417
418
        status = {}
419
        evcs = get_evcs() if len(evc_ids) != 1 else get_evc(evc_ids[0])
420
421
        # TODO extract this and cover proxy port validations too
422
        for evc_id in evc_ids:
423
            if evc_id not in evcs:
424
                raise HTTPException(404, detail=f"EVC {evc_id} doesn't exist")
425
            if has_int_enabled(evcs[evc_id]):
426
                raise HTTPException(400, detail=f"EVC {evc_id} already has INT enabled")
427
428
        if not evc_ids:
429
            # Enable telemetry for ALL non INT EVCs.
430
            evcs = {k: v for k, v in evcs.items() if not has_int_enabled(v)}
431
        else:
432
            evcs = {evc_id: evcs[evc_id] for evc_id in evc_ids}
433
434
        # Process each EVC individually
435
        # TODO dispatch in batch and update metadata in bulk shortly after
436
        for evc_id, evc in evcs.items():
437
            try:
438
                status[evc_id] = self.provision_int(evc)
439
            except ErrorBase as err_msg:
440
                status[evc_id] = err_msg.message
441
442
        return JSONResponse(status)
443
444 View Code Duplication
    @rest("v1/evc/disable", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
445
    def disable_telemetry(self, request: Request) -> JSONResponse:
446
        """REST to disable/remove INT flows for an EVC_ID
447
        Args:
448
            {"evc_ids": [list of evc_ids] }
449
        Returns:
450
            200 if successful
451
            400 is otherwise
452
        """
453
        try:
454
            content = get_json_or_400(request, self.controller.loop)
455
            evc_ids = content["evc_ids"]
456
        except (TypeError, KeyError):
457
            raise HTTPException(400, detail=f"Invalid payload: {content}")
458
459
        status = {}
460
461
        evcs = get_evcs() if len(evc_ids) != 1 else get_evc(evc_ids[0])
462
463
        # TODO extract this and cover proxy port validations too
464
        for evc_id in evc_ids:
465
            if evc_id not in evcs:
466
                raise HTTPException(404, detail=f"EVC {evc_id} doesn't exist")
467
            if not has_int_enabled(evcs[evc_id]):
468
                raise HTTPException(
469
                    400, detail=f"EVC {evc_id} doesn't have INT enabled"
470
                )
471
472
        if not evc_ids:
473
            # Enable telemetry for ALL INT EVCs.
474
            evcs = {k: v for k, v in evcs.items() if has_int_enabled(v)}
475
        else:
476
            evcs = {evc_id: evcs[evc_id] for evc_id in evc_ids}
477
478
        # Process each EVC individually
479
        # TODO dispatch in batch and update metadata in bulk shortly after
480
        for evc_id, evc in evcs.items():
481
            try:
482
                status[evc_id] = self.decommission_int(evc)
483
            except EvcHasNoINT as err_msg:
484
                # Ignore since it is not an issue.
485
                status[evc_id] = err_msg.message
486
            except ErrorBase as err_msg:
487
                # Rollback INT configuration. This error will lead to inconsistency.
488
                # Critical
489
                status[evc_id] = err_msg.message
490
491
        return JSONResponse(status)
492
493
    @rest("v1/evc")
494
    def get_evcs(self, _request: Request) -> JSONResponse:
495
        """REST to return the list of EVCs with INT enabled"""
496
        return JSONResponse(get_evc_with_telemetry())
497
498
    @rest("v1/sync")
499
    def sync_flows(self, _request: Request) -> JSONResponse:
500
        """Endpoint to force the telemetry napp to search for INT flows and delete them
501
        accordingly to the evc metadata."""
502
503
        # TODO
504
        # for evc_id in get_evcs_ids():
505
        return JSONResponse("TBD")
506
507
    @rest("v1/evc/update")
508
    def update_evc(self, _request: Request) -> JSONResponse:
509
        """If an EVC changed from unidirectional to bidirectional telemetry,
510
        make the change."""
511
        return JSONResponse({})
512
513
    # Event-driven methods: future
514
    def listen_for_new_evcs(self):
515
        """Change newly created EVC to INT-enabled EVC based on the metadata field
516
        (future)"""
517
        pass
518
519
    def listen_for_evc_change(self):
520
        """Change newly created EVC to INT-enabled EVC based on the
521
        metadata field (future)"""
522
        pass
523
524
    def listen_for_path_changes(self):
525
        """Change EVC's new path to INT-enabled EVC based on the metadata field
526
        when there is a path change. (future)"""
527
        pass
528
529
    def listen_for_evcs_removed(self):
530
        """Remove all INT flows belonging the just removed EVC (future)"""
531
        pass
532
533
    def listen_for_topology_changes(self):
534
        """If the topology changes, make sure it is not the loop ports.
535
        If so, update proxy ports"""
536
        # TODO:
537
        # self.proxy_ports = create_proxy_ports(self.proxy_ports)
538
        pass
539
540
    def listen_for_evc_metadata_changes(self):
541
        """If the proxy port changes, the flows have to be reconfigured."""
542
        pass
543