Passed
Push — master ( bdb3e4...e1fc2e )
by Aldo
03:14
created

build.main.Main._update_to_topology()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 10
nop 2
dl 0
loc 13
ccs 8
cts 8
cp 1
crap 2
rs 9.9
c 0
b 0
f 0
1
"""Main module of kytos/pathfinder Kytos Network Application."""
2
3 1
import pathlib
4 1
from threading import Lock
5 1
from typing import Generator
6
7 1
from kytos.core import KytosNApp, log, rest
8 1
from kytos.core.helpers import load_spec, validate_openapi
9 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
10
                                 get_json_or_400)
11 1
from kytos.core.retry import before_sleep
12 1
from napps.kytos.pathfinder.graph import KytosGraph
13 1
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
14
                      wait_fixed)
15
16 1
from .exceptions import LinkNotFound
17
18
19 1
class Main(KytosNApp):
20
    """
21
    Main class of kytos/pathfinder NApp.
22
23
    This class is the entry point for this napp.
24
    """
25
26 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
27
28 1
    def setup(self):
29
        """Create a graph to handle the nodes and edges."""
30 1
        self.graph = KytosGraph()
31 1
        self._topology = None
32 1
        self._lock = Lock()
33
34 1
    def execute(self):
35
        """Do nothing."""
36
37 1
    def shutdown(self):
38
        """Shutdown the napp."""
39
40 1
    def _filter_paths_le_cost(self, paths, max_cost):
41
        """Filter by paths where the cost is le <= max_cost."""
42 1
        if not max_cost:
43 1
            return paths
44 1
        return [path for path in paths if path["cost"] <= max_cost]
45
46 1
    def _non_excluded_edges(self, links: list[str]) -> list[tuple[str, str]]:
47
        """Exlude undesired links. It'll return the remaning edges."""
48
49 1
        endpoints: list[tuple[str, str]] = []
50 1
        if not self._topology:
51 1
            return endpoints
52 1
        endpoint_ids = self._map_endpoints_from_link_ids(links)
53 1
        for edge in self.graph.graph.edges:
54 1
            if edge not in endpoint_ids:
55 1
                endpoints.append(edge)
56 1
        return endpoints
57
58 1
    def _map_endpoints_from_link_ids(self, link_ids: list[str]) -> dict:
59
        """Map endpoints from link ids."""
60 1
        endpoints = {}
61 1
        for link_id in link_ids:
62 1
            try:
63 1
                link = self._topology.links[link_id]
64 1
                endpoint_a, endpoint_b = link.endpoint_a, link.endpoint_b
65 1
                endpoints[(endpoint_a.id, endpoint_b.id)] = link
66
            except KeyError:
67
                pass
68 1
        return endpoints
69
70 1
    def _find_all_link_ids(
71
        self, paths: list[dict], link_ids: list[str]
72
    ) -> Generator[int, None, None]:
73
        """Find indexes of the paths that contain all link ids."""
74
        endpoints_links = self._map_endpoints_from_link_ids(link_ids)
75
        if not endpoints_links:
76
            return None
77
        endpoint_keys = set(endpoints_links.keys())
78
        for idx, path in enumerate(paths):
79
            head, tail, found_endpoints = path["hops"][:-1], path["hops"][1:], set()
80
            for endpoint_a, endpoint_b in zip(head, tail):
81
                if (endpoint_a, endpoint_b) in endpoints_links:
82
                    found_endpoints.add((endpoint_a, endpoint_b))
83
                if (endpoint_b, endpoint_a) in endpoints_links:
84
                    found_endpoints.add((endpoint_b, endpoint_a))
85
            if found_endpoints == endpoint_keys:
86
                yield idx
87
        return None
88
89 1
    @rest("v3/", methods=["POST"])
90 1
    @validate_openapi(spec)
91 1
    def shortest_path(self, request: Request) -> JSONResponse:
92
        """Calculate the best path between the source and destination."""
93 1
        data = get_json_or_400(request, self.controller.loop)
94 1
        if not isinstance(data, dict):
95
            raise HTTPException(400, detail=f"Invalid body value: {data}")
96
97 1
        undesired = data.get("undesired_links", [])
98 1
        spf_attr = data.get("spf_attribute", "hop")
99 1
        spf_max_paths = data.get("spf_max_paths", 2)
100 1
        spf_max_path_cost = data.get("spf_max_path_cost")
101 1
        mandatory_metrics = data.get("mandatory_metrics", {})
102 1
        flexible_metrics = data.get("flexible_metrics", {})
103 1
        minimum_hits = data.get("minimum_flexible_hits")
104 1
        log.debug(f"POST v2/ payload data: {data}")
105
106 1
        try:
107 1
            with self._lock:
108 1
                self._get_latest_topology()
109 1
                graph = self.graph.graph
110 1
                if undesired:
111
                    non_excluded_edges = self._non_excluded_edges(undesired)
112
                    graph = graph.edge_subgraph(non_excluded_edges)
113
114 1
                if any([mandatory_metrics, flexible_metrics]):
115 1
                    paths = self.graph.constrained_k_shortest_paths(
116
                        data["source"],
117
                        data["destination"],
118
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
119
                        k=spf_max_paths,
120
                        graph=graph,
121
                        minimum_hits=minimum_hits,
122
                        mandatory_metrics=mandatory_metrics,
123
                        flexible_metrics=flexible_metrics,
124
                    )
125
                else:
126 1
                    paths = self.graph.k_shortest_paths(
127
                        data["source"],
128
                        data["destination"],
129
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
130
                        k=spf_max_paths,
131
                        graph=graph,
132
                    )
133
134 1
                paths = self.graph.path_cost_builder(
135
                    paths,
136
                    weight=spf_attr,
137
                )
138 1
            log.debug(f"Found paths: {paths}")
139 1
        except TypeError as err:
140 1
            raise HTTPException(400, str(err))
141 1
        except LinkNotFound as err:
142 1
            log.error(f"Link {err.link} not found in pathfinder graph.")
143 1
            raise HTTPException(409, str(err))
144
145 1
        paths = self._filter_paths_le_cost(paths, max_cost=spf_max_path_cost)
146 1
        log.debug(f"Filtered paths: {paths}")
147 1
        return JSONResponse({"paths": paths})
148
149 1
    @retry(
150
        stop=stop_after_attempt(3),
151
        wait=wait_fixed(1),
152
        retry=retry_if_exception_type(LinkNotFound),
153
        before_sleep=before_sleep,
154
        reraise=True,
155
    )
156 1
    def _get_latest_topology(self):
157
        """Get the latest topology from the topology napp."""
158 1
        try:
159 1
            topology_napp = self.controller.napps[("kytos", "topology")]
160 1
        except KeyError:
161 1
            log.warning("Failed to get topology napp for forcing topology update.")
162 1
            return
163 1
        topology = topology_napp.get_latest_topology()
164 1
        self._update_to_topology(topology)
165
166 1
    def _update_to_topology(
167
        self,
168
        topology
169
    ):
170 1
        if self._topology is topology:
171 1
            return
172
173 1
        self.graph.update_topology(topology)
174 1
        self._topology = topology
175
176 1
        switches = list(topology.switches.keys())
177 1
        links = list(topology.links.keys())
178
        log.debug(f"Topology graph updated with switches: {switches}, links: {links}.")
179