Test Failed
Pull Request — master (#45)
by Vinicius
06:06
created

build.main.Main.setup()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 1
dl 0
loc 7
rs 10
c 0
b 0
f 0
ccs 5
cts 5
cp 1
crap 1
1
"""Main module of kytos/pathfinder Kytos Network Application."""
2
3 1
from threading import Lock
4 1
from typing import Generator
5
6 1
from kytos.core import KytosEvent, KytosNApp, log, rest
7 1
from kytos.core.helpers import listen_to
8 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
9 1
                                 get_json_or_400)
10
from napps.kytos.pathfinder.graph import KytosGraph
11 1
12
13
class Main(KytosNApp):
14 1
    """
15
    Main class of kytos/pathfinder NApp.
16
17
    This class is the entry point for this napp.
18
    """
19
20
    def setup(self):
21 1
        """Create a graph to handle the nodes and edges."""
22
        self.graph = KytosGraph()
23 1
        self._topology = None
24 1
        self._lock = Lock()
25 1
        self._topology_updated_at = None
26 1
        self._links_updated_at = {}
27 1
28
    def execute(self):
29 1
        """Do nothing."""
30
31
    def shutdown(self):
32 1
        """Shutdown the napp."""
33
34
    def _filter_paths_le_cost(self, paths, max_cost):
35 1
        """Filter by paths where the cost is le <= max_cost."""
36
        if not max_cost:
37 1
            return paths
38 1
        return [path for path in paths if path["cost"] <= max_cost]
39 1
40
    def _map_endpoints_from_link_ids(self, link_ids: list[str]) -> dict:
41 1
        """Map endpoints from link ids."""
42
        endpoints = {}
43 1
        for link_id in link_ids:
44 1
            try:
45 1
                link = self._topology.links[link_id]
46 1
                endpoint_a, endpoint_b = link.endpoint_a, link.endpoint_b
47 1
                endpoints[(endpoint_a.id, endpoint_b.id)] = link
48 1
            except KeyError:
49 1
                pass
50 1
        return endpoints
51 1
52
    def _find_all_link_ids(
53 1
        self, paths: list[dict], link_ids: list[str]
54
    ) -> Generator[int, None, None]:
55
        """Find indexes of the paths that contain all link ids."""
56
        endpoints_links = self._map_endpoints_from_link_ids(link_ids)
57 1
        if not endpoints_links:
58 1
            return None
59 1
        endpoint_keys = set(endpoints_links.keys())
60 1
        for idx, path in enumerate(paths):
61 1
            head, tail, found_endpoints = path["hops"][:-1], path["hops"][1:], set()
62 1
            for endpoint_a, endpoint_b in zip(head, tail):
63 1
                if (endpoint_a, endpoint_b) in endpoints_links:
64 1
                    found_endpoints.add((endpoint_a, endpoint_b))
65 1
                if (endpoint_b, endpoint_a) in endpoints_links:
66 1
                    found_endpoints.add((endpoint_b, endpoint_a))
67
            if found_endpoints == endpoint_keys:
68 1
                yield idx
69 1
        return None
70 1
71
    def _find_any_link_ids(
72 1
        self, paths: list[dict], link_ids: list[str]
73
    ) -> Generator[int, None, None]:
74
        """Find indexes of the paths that contain any of the link ids."""
75
        endpoints_links = self._map_endpoints_from_link_ids(link_ids)
76 1
        if not endpoints_links:
77 1
            return None
78 1
        for idx, path in enumerate(paths):
79 1
            head, tail, found = path["hops"][:-1], path["hops"][1:], False
80 1
            for endpoint_a, endpoint_b in zip(head, tail):
81 1
                if any(
82 1
                    (
83
                        (endpoint_a, endpoint_b) in endpoints_links,
84
                        (endpoint_b, endpoint_a) in endpoints_links,
85
                    )
86
                ):
87
                    found = True
88 1
                    break
89 1
            if found:
90 1
                yield idx
91 1
        return None
92 1
93
    def _filter_paths_undesired_links(
94 1
        self, paths: list[dict], undesired: list[str]
95
    ) -> list[dict]:
96
        """Filter by undesired_links, it performs a logical OR."""
97
        if not undesired:
98 1
            return paths
99 1
        excluded_indexes = set(self._find_any_link_ids(paths, undesired))
100 1
        return [path for idx, path in enumerate(paths) if idx not in excluded_indexes]
101 1
102
    def _filter_paths_desired_links(
103 1
        self, paths: list[dict], desired: list[str]
104
    ) -> list[dict]:
105
        """Filter by desired_links, it performs a logical AND."""
106
        if not desired:
107 1
            return paths
108 1
        included_indexes = set(self._find_all_link_ids(paths, desired))
109 1
        return [path for idx, path in enumerate(paths) if idx in included_indexes]
110 1
111
    def _validate_payload(self, data):
112 1
        """Validate shortest_path v2/ POST endpoint."""
113
        if "source" not in data or not isinstance(data["source"], str):
114 1
            raise HTTPException(
115 1
                400,
116
                detail=f"'source' is mandatory, got {data.get('source')}"
117
            )
118
        if "destination" not in data or not isinstance(data["destination"], str):
119
            raise HTTPException(
120
                400,
121 1
                detail=f"'destination' is mandatory, got {data.get('destination')}"
122
            )
123
        if data.get("desired_links"):
124
            if not isinstance(data["desired_links"], list):
125
                raise HTTPException(
126
                    400,
127
                    f"TypeError: desired_links is supposed to be a list."
128 1
                    f" type: {type(data['desired_links'])}"
129 1
                )
130 1
131 1
        if data.get("undesired_links"):
132 1
            if not isinstance(data["undesired_links"], list):
133
                raise HTTPException(
134 1
                    400,
135
                    f"TypeError: undesired_links is supposed to be a list."
136
                    f" type: {type(data['undesired_links'])}"
137
                )
138
139
        parameter = data.get("parameter")
140 1
        spf_attr = data.get("spf_attribute")
141 1
        if not spf_attr:
142
            spf_attr = parameter or "hop"
143
        data["spf_attribute"] = spf_attr
144
145
        if spf_attr not in self.graph.spf_edge_data_cbs:
146
            raise HTTPException(
147 1
                400,
148 1
                "Invalid 'spf_attribute'. Valid values: "
149
                f"{', '.join(self.graph.spf_edge_data_cbs.keys())}"
150
            )
151
152
        try:
153
            data["spf_max_paths"] = max(int(data.get("spf_max_paths", 2)), 1)
154
        except (TypeError, ValueError):
155
            raise HTTPException(
156
                400,
157
                f"spf_max_paths {data.get('spf_max_pahts')} must be an int"
158 1
            )
159 1
160
        spf_max_path_cost = data.get("spf_max_path_cost")
161 1
        if spf_max_path_cost:
162 1
            try:
163 1
                spf_max_path_cost = max(int(spf_max_path_cost), 1)
164 1
                data["spf_max_path_cost"] = spf_max_path_cost
165
            except (TypeError, ValueError):
166
                raise HTTPException(
167 1
                    400,
168
                    f"spf_max_path_cost {data.get('spf_max_path_cost')} must"
169
                    " be an int"
170
                )
171
172
        data["mandatory_metrics"] = data.get("mandatory_metrics", {})
173 1
        data["flexible_metrics"] = data.get("flexible_metrics", {})
174
175 1
        try:
176 1
            minimum_hits = data.get("minimum_flexible_hits")
177
            if minimum_hits:
178 1
                minimum_hits = min(
179 1
                    len(data["flexible_metrics"]), max(0, int(minimum_hits))
180
                )
181 1
            data["minimum_flexible_hits"] = minimum_hits
182 1
        except (TypeError, ValueError):
183
            raise HTTPException(
184 1
                400,
185 1
                f"minimum_hits {data.get('minimum_flexible_hits')} must be an int"
186 1
            )
187 1
188 1
        return data
189 1
190 1
    @rest("v2/", methods=["POST"])
191
    def shortest_path(self, request: Request) -> JSONResponse:
192 1
        """Calculate the best path between the source and destination."""
193 1
        data = get_json_or_400(request, self.controller.loop)
194 1
        if not isinstance(data, dict):
195 1
            raise HTTPException(400, detail=f"Invalid body value: {data}")
196
        data = self._validate_payload(data)
197
198
        desired = data.get("desired_links")
199
        undesired = data.get("undesired_links")
200
201
        spf_attr = data.get("spf_attribute")
202
        spf_max_paths = data.get("spf_max_paths")
203
        spf_max_path_cost = data.get("spf_max_path_cost")
204
        mandatory_metrics = data.get("mandatory_metrics")
205 1
        flexible_metrics = data.get("flexible_metrics")
206
        minimum_hits = data.get("minimum_flexible_hits")
207
        log.debug(f"POST v2/ payload data: {data}")
208
209
        try:
210
            with self._lock:
211
                if any([mandatory_metrics, flexible_metrics]):
212 1
                    paths = self.graph.constrained_k_shortest_paths(
213
                        data["source"],
214
                        data["destination"],
215
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
216 1
                        k=spf_max_paths,
217 1
                        minimum_hits=minimum_hits,
218 1
                        mandatory_metrics=mandatory_metrics,
219
                        flexible_metrics=flexible_metrics,
220 1
                    )
221 1
                else:
222 1
                    paths = self.graph.k_shortest_paths(
223 1
                        data["source"],
224 1
                        data["destination"],
225
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
226 1
                        k=spf_max_paths,
227
                    )
228
229
                paths = self.graph.path_cost_builder(
230
                    paths,
231 1
                    weight=spf_attr,
232
                )
233
            log.debug(f"Found paths: {paths}")
234
        except TypeError as err:
235 1
            raise HTTPException(400, str(err))
236
237 1
        paths = self._filter_paths_le_cost(paths, max_cost=spf_max_path_cost)
238 1
        paths = self._filter_paths_undesired_links(paths, undesired)
239 1
        paths = self._filter_paths_desired_links(paths, desired)
240 1
        log.debug(f"Filtered paths: {paths}")
241 1
        return JSONResponse({"paths": paths})
242
243
    @listen_to(
244
        "kytos.topology.updated",
245 1
        "kytos/topology.current",
246 1
        "kytos/topology.topology_loaded",
247 1
    )
248 1
    def on_topology_updated(self, event):
249 1
        """Update the graph when the network topology is updated."""
250 1
        self.update_topology(event)
251 1
252
    def update_topology(self, event):
253 1
        """Update the graph when the network topology is updated."""
254
        if "topology" not in event.content:
255 1
            return
256 1
        topology = event.content["topology"]
257 1
        with self._lock:
258 1
            if (
259
                self._topology_updated_at
260
                and self._topology_updated_at > event.timestamp
261
            ):
262 1
                return
263 1
            self._topology = topology
264 1
            self._topology_updated_at = event.timestamp
265 1
            self.graph.update_topology(topology)
266 1
        switches = list(topology.switches.keys())
267 1
        links = list(topology.links.keys())
268 1
        log.debug(f"Topology graph updated with switches: {switches}, links: {links}.")
269
270
    def update_links_metadata_changed(self, event) -> None:
271
        """Update the graph when links' metadata are added or removed."""
272 1
        link = event.content["link"]
273
        try:
274 1
            with self._lock:
275 1
                if (
276
                    link.id in self._links_updated_at
277
                    and self._links_updated_at[link.id] > event.timestamp
278
                ):
279
                    return
280
                self.graph.update_link_metadata(link)
281
                self._links_updated_at[link.id] = event.timestamp
282
            metadata = event.content["metadata"]
283
            log.debug(f"Topology graph updated link id: {link.id} metadata: {metadata}")
284
        except KeyError as exc:
285
            log.warning(
286
                f"Unexpected KeyError {str(exc)} on event {event}."
287
                " pathfinder will reconciliate the topology"
288
            )
289
            self.controller.buffers.app.put(KytosEvent(name="kytos/topology.get"))
290
291
    @listen_to("kytos/topology.links.metadata.(added|removed)")
292
    def on_links_metadata_changed(self, event):
293
        """Update the graph when links' metadata are added or removed."""
294
        self.update_links_metadata_changed(event)
295