1 | """Main module of kytos/pathfinder Kytos Network Application.""" |
||
2 | |||
3 | 1 | import pathlib |
|
4 | 1 | from threading import Lock |
|
5 | 1 | from typing import Generator |
|
6 | |||
7 | 1 | from kytos.core import KytosNApp, log, rest |
|
8 | 1 | from kytos.core.helpers import listen_to, load_spec, validate_openapi |
|
9 | 1 | from kytos.core.rest_api import (HTTPException, JSONResponse, Request, |
|
10 | get_json_or_400) |
||
11 | 1 | from napps.kytos.pathfinder.graph import KytosGraph |
|
12 | |||
13 | |||
class Main(KytosNApp):
    """
    Main class of kytos/pathfinder NApp.

    This class is the entry point for this napp. It keeps a ``KytosGraph``
    in sync with the topology and exposes a REST endpoint that computes
    paths between a source and a destination.
    """

    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")

    def setup(self):
        """Create a graph to handle the nodes and edges."""
        self.graph = KytosGraph()
        # Last topology object applied to the graph (None until first update).
        self._topology = None
        # Serializes graph/topology updates against path computations.
        self._lock = Lock()

    def execute(self):
        """Do nothing."""

    def shutdown(self):
        """Shutdown the napp."""

    def _filter_paths_le_cost(self, paths, max_cost):
        """Keep only the paths whose ``"cost"`` is <= ``max_cost``.

        A falsy ``max_cost`` (``None`` or ``0``) disables the filter and
        ``paths`` is returned unchanged.
        """
        if not max_cost:
            return paths
        return [path for path in paths if path["cost"] <= max_cost]

    def _non_excluded_edges(self, links: list[str]) -> list[tuple[str, str]]:
        """Exclude undesired links and return the remaining graph edges.

        ``links`` holds the ids of the links to exclude. When no topology
        has been stored yet an empty list is returned.
        """
        if not self._topology:
            return []
        excluded = self._map_endpoints_from_link_ids(links)
        # The graph may yield an edge in the opposite orientation of the
        # link's (endpoint_a, endpoint_b) pair, so test both directions,
        # just like _find_all_link_ids does.
        return [
            edge
            for edge in self.graph.graph.edges
            if edge not in excluded and edge[::-1] not in excluded
        ]

    def _map_endpoints_from_link_ids(self, link_ids: list[str]) -> dict:
        """Map ``(endpoint_a.id, endpoint_b.id)`` to its link for each id.

        Link ids that are not present in the stored topology are skipped.
        An empty dict is returned when no topology has been stored yet.
        """
        endpoints = {}
        if self._topology is None:
            return endpoints
        for link_id in link_ids:
            try:
                link = self._topology.links[link_id]
            except KeyError:
                # Unknown link id: deliberately ignored (best effort).
                continue
            endpoints[(link.endpoint_a.id, link.endpoint_b.id)] = link
        return endpoints

    def _find_all_link_ids(
        self, paths: list[dict], link_ids: list[str]
    ) -> Generator[int, None, None]:
        """Find indexes of the paths that contain all link ids.

        Each path is expected to carry its node sequence under ``"hops"``;
        consecutive hops are matched against the link endpoints in both
        orientations. Nothing is yielded when none of ``link_ids`` maps to
        a known link.
        """
        endpoints_links = self._map_endpoints_from_link_ids(link_ids)
        if not endpoints_links:
            return
        endpoint_keys = set(endpoints_links)
        for idx, path in enumerate(paths):
            hops = path["hops"]
            found_endpoints = set()
            for endpoint_a, endpoint_b in zip(hops[:-1], hops[1:]):
                if (endpoint_a, endpoint_b) in endpoints_links:
                    found_endpoints.add((endpoint_a, endpoint_b))
                if (endpoint_b, endpoint_a) in endpoints_links:
                    found_endpoints.add((endpoint_b, endpoint_a))
            if found_endpoints == endpoint_keys:
                yield idx

    @rest("v3/", methods=["POST"])
    @validate_openapi(spec)
    def shortest_path(self, request: Request) -> JSONResponse:
        """Calculate the best path between the source and destination.

        The JSON body must be an object with ``source`` and ``destination``
        and may carry ``undesired_links``, ``spf_attribute`` (default
        ``"hop"``), ``spf_max_paths`` (default ``2``), ``spf_max_path_cost``,
        ``mandatory_metrics``, ``flexible_metrics`` and
        ``minimum_flexible_hits``.

        Returns a JSON response of the form ``{"paths": [...]}``.

        Raises ``HTTPException`` 400 when the body is not an object or when
        a ``TypeError`` is raised while computing the paths.
        """
        data = get_json_or_400(request, self.controller.loop)
        if not isinstance(data, dict):
            raise HTTPException(400, detail=f"Invalid body value: {data}")

        undesired = data.get("undesired_links", [])
        spf_attr = data.get("spf_attribute", "hop")
        spf_max_paths = data.get("spf_max_paths", 2)
        spf_max_path_cost = data.get("spf_max_path_cost")
        mandatory_metrics = data.get("mandatory_metrics", {})
        flexible_metrics = data.get("flexible_metrics", {})
        minimum_hits = data.get("minimum_flexible_hits")
        log.debug(f"POST v3/ payload data: {data}")

        try:
            with self._lock:
                # Refresh the graph first so paths reflect the latest topology.
                self._get_latest_topology()
                graph = self.graph.graph
                if undesired:
                    non_excluded_edges = self._non_excluded_edges(undesired)
                    graph = graph.edge_subgraph(non_excluded_edges)

                if mandatory_metrics or flexible_metrics:
                    paths = self.graph.constrained_k_shortest_paths(
                        data["source"],
                        data["destination"],
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
                        k=spf_max_paths,
                        graph=graph,
                        minimum_hits=minimum_hits,
                        mandatory_metrics=mandatory_metrics,
                        flexible_metrics=flexible_metrics,
                    )
                else:
                    paths = self.graph.k_shortest_paths(
                        data["source"],
                        data["destination"],
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
                        k=spf_max_paths,
                        graph=graph,
                    )

                paths = self.graph.path_cost_builder(
                    paths,
                    weight=spf_attr,
                )
            log.debug(f"Found paths: {paths}")
        except TypeError as err:
            raise HTTPException(400, detail=str(err)) from err

        paths = self._filter_paths_le_cost(paths, max_cost=spf_max_path_cost)
        log.debug(f"Filtered paths: {paths}")
        return JSONResponse({"paths": paths})

    @listen_to(
        "kytos.topology.updated",
        "kytos/topology.topology_loaded",
        pool="dynamic_single"
    )
    def on_topology_updated(self, event):
        """Update the graph when the network topology is updated."""
        self.update_topology(event)

    def update_topology(self, event):
        """Apply the topology carried by ``event`` to the graph.

        Events whose content has no ``"topology"`` key are ignored.
        """
        if "topology" not in event.content:
            return
        topology = event.content["topology"]
        with self._lock:
            self._update_to_topology(topology)

    def _get_latest_topology(self):
        """Get the latest topology from the topology napp.

        Logs a warning and leaves the graph untouched when the
        ``kytos/topology`` napp is not registered in the controller.
        This method does not take ``self._lock`` itself.
        """
        try:
            topology_napp = self.controller.napps[("kytos", "topology")]
        except KeyError:
            log.warning("Failed to get topology napp for forcing topology update.")
            return
        topology = topology_napp.get_latest_topology()
        self._update_to_topology(topology)

    def _update_to_topology(
        self,
        topology
    ):
        """Store ``topology`` and rebuild the graph from it.

        Does nothing when ``topology`` is the very same object that was
        applied last time (identity check). This method does not take
        ``self._lock`` itself.
        """
        if self._topology is topology:
            return

        self._topology = topology
        self.graph.update_topology(topology)

        switches = list(topology.switches.keys())
        links = list(topology.links.keys())
        log.debug(f"Topology graph updated with switches: {switches}, links: {links}.")
181 |