build.main   A
last analyzed

Complexity

Total Complexity 33

Size/Duplication

Total Lines 181
Duplicated Lines 0 %

Test Coverage

Coverage 81.65%

Importance

Changes 0
Metric Value
eloc 133
dl 0
loc 181
ccs 89
cts 109
cp 0.8165
rs 9.76
c 0
b 0
f 0
wmc 33

12 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.shutdown() 0 2 1
A Main.execute() 0 2 1
A Main._filter_paths_le_cost() 0 5 2
B Main._find_all_link_ids() 0 18 7
A Main._non_excluded_edges() 0 11 4
A Main._map_endpoints_from_link_ids() 0 11 3
A Main.setup() 0 5 1
B Main.shortest_path() 0 56 6
A Main.on_topology_updated() 0 8 1
A Main._update_to_topology() 0 13 2
A Main.update_topology() 0 7 3
A Main._get_latest_topology() 0 9 2
1
"""Main module of kytos/pathfinder Kytos Network Application."""
2
3 1
import pathlib
4 1
from threading import Lock
5 1
from typing import Generator
6
7 1
from kytos.core import KytosNApp, log, rest
8 1
from kytos.core.helpers import listen_to, load_spec, validate_openapi
9 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
10
                                 get_json_or_400)
11 1
from napps.kytos.pathfinder.graph import KytosGraph
12
13
14 1
class Main(KytosNApp):
15
    """
16
    Main class of kytos/pathfinder NApp.
17
18
    This class is the entry point for this napp.
19
    """
20
21 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
22
23 1
    def setup(self):
        """Initialise the state this NApp works with.

        Prepares the lock that guards topology updates and path
        computation, starts with no known topology and creates the
        KytosGraph that holds the nodes and edges.
        """
        self._lock = Lock()
        self._topology = None
        self.graph = KytosGraph()
28
29 1
    def execute(self):
30
        """Do nothing."""
31
32 1
    def shutdown(self):
33
        """Shutdown the napp."""
34
35 1
    def _filter_paths_le_cost(self, paths, max_cost):
36
        """Filter by paths where the cost is le <= max_cost."""
37 1
        if not max_cost:
38 1
            return paths
39 1
        return [path for path in paths if path["cost"] <= max_cost]
40
41 1
    def _non_excluded_edges(self, links: list[str]) -> list[tuple[str, str]]:
42
        """Exlude undesired links. It'll return the remaning edges."""
43
44 1
        endpoints: list[tuple[str, str]] = []
45 1
        if not self._topology:
46 1
            return endpoints
47 1
        endpoint_ids = self._map_endpoints_from_link_ids(links)
48 1
        for edge in self.graph.graph.edges:
49 1
            if edge not in endpoint_ids:
50 1
                endpoints.append(edge)
51 1
        return endpoints
52
53 1
    def _map_endpoints_from_link_ids(self, link_ids: list[str]) -> dict:
54
        """Map endpoints from link ids."""
55 1
        endpoints = {}
56 1
        for link_id in link_ids:
57 1
            try:
58 1
                link = self._topology.links[link_id]
59 1
                endpoint_a, endpoint_b = link.endpoint_a, link.endpoint_b
60 1
                endpoints[(endpoint_a.id, endpoint_b.id)] = link
61
            except KeyError:
62
                pass
63 1
        return endpoints
64
65 1
    def _find_all_link_ids(
66
        self, paths: list[dict], link_ids: list[str]
67
    ) -> Generator[int, None, None]:
68
        """Find indexes of the paths that contain all link ids."""
69
        endpoints_links = self._map_endpoints_from_link_ids(link_ids)
70
        if not endpoints_links:
71
            return None
72
        endpoint_keys = set(endpoints_links.keys())
73
        for idx, path in enumerate(paths):
74
            head, tail, found_endpoints = path["hops"][:-1], path["hops"][1:], set()
75
            for endpoint_a, endpoint_b in zip(head, tail):
76
                if (endpoint_a, endpoint_b) in endpoints_links:
77
                    found_endpoints.add((endpoint_a, endpoint_b))
78
                if (endpoint_b, endpoint_a) in endpoints_links:
79
                    found_endpoints.add((endpoint_b, endpoint_a))
80
            if found_endpoints == endpoint_keys:
81
                yield idx
82
        return None
83
84 1
    @rest("v3/", methods=["POST"])
    @validate_openapi(spec)
    def shortest_path(self, request: Request) -> JSONResponse:
        """Calculate the best paths between the source and destination.

        The JSON body must be an object. "source" and "destination" are
        read directly; the optional keys are "undesired_links",
        "spf_attribute" (default "hop"), "spf_max_paths" (default 2),
        "spf_max_path_cost", "mandatory_metrics", "flexible_metrics" and
        "minimum_flexible_hits".

        Returns:
            A JSONResponse of the form {"paths": [...]}, already filtered
            by "spf_max_path_cost" when one was given.

        Raises:
            HTTPException: 400 when the body is not a JSON object or when
                the path computation raises TypeError.
        """
        data = get_json_or_400(request, self.controller.loop)
        if not isinstance(data, dict):
            raise HTTPException(400, detail=f"Invalid body value: {data}")

        undesired = data.get("undesired_links", [])
        spf_attr = data.get("spf_attribute", "hop")
        spf_max_paths = data.get("spf_max_paths", 2)
        spf_max_path_cost = data.get("spf_max_path_cost")
        mandatory_metrics = data.get("mandatory_metrics", {})
        flexible_metrics = data.get("flexible_metrics", {})
        minimum_hits = data.get("minimum_flexible_hits")
        # The route is registered under v3/, so the log says v3/ too.
        log.debug(f"POST v3/ payload data: {data}")

        try:
            # Hold the lock for the topology refresh and the whole path
            # computation so both work on the same graph.
            with self._lock:
                self._get_latest_topology()
                graph = self.graph.graph
                if undesired:
                    non_excluded_edges = self._non_excluded_edges(undesired)
                    graph = graph.edge_subgraph(non_excluded_edges)

                if mandatory_metrics or flexible_metrics:
                    paths = self.graph.constrained_k_shortest_paths(
                        data["source"],
                        data["destination"],
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
                        k=spf_max_paths,
                        graph=graph,
                        minimum_hits=minimum_hits,
                        mandatory_metrics=mandatory_metrics,
                        flexible_metrics=flexible_metrics,
                    )
                else:
                    paths = self.graph.k_shortest_paths(
                        data["source"],
                        data["destination"],
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
                        k=spf_max_paths,
                        graph=graph,
                    )

                paths = self.graph.path_cost_builder(
                    paths,
                    weight=spf_attr,
                )
            log.debug(f"Found paths: {paths}")
        except TypeError as err:
            # Keep the original error as the cause for easier debugging.
            raise HTTPException(400, str(err)) from err

        paths = self._filter_paths_le_cost(paths, max_cost=spf_max_path_cost)
        log.debug(f"Filtered paths: {paths}")
        return JSONResponse({"paths": paths})
140
141 1
    # NOTE(review): the first event name uses "." where the second one uses
    # "/" after "kytos"; confirm this is intentional.
    @listen_to(
        "kytos.topology.updated", "kytos/topology.topology_loaded",
        pool="dynamic_single",
    )
    def on_topology_updated(self, event):
        """Refresh the graph in response to a topology event.

        The event is simply forwarded to update_topology().
        """
        self.update_topology(event)
149
150 1
    def update_topology(self, event):
151
        """Update the graph when the network topology is updated."""
152 1
        if "topology" not in event.content:
153 1
            return
154 1
        topology = event.content["topology"]
155 1
        with self._lock:
156 1
            self._update_to_topology(topology)
157
158 1
    def _get_latest_topology(self):
159
        """Get the latest topology from the topology napp."""
160 1
        try:
161 1
            topology_napp = self.controller.napps[("kytos", "topology")]
162 1
        except KeyError:
163 1
            log.warning("Failed to get topology napp for forcing topology update.")
164 1
            return
165 1
        topology = topology_napp.get_latest_topology()
166 1
        self._update_to_topology(topology)
167
168 1
    def _update_to_topology(
169
        self,
170
        topology
171
    ):
172 1
        if self._topology is topology:
173 1
            return
174
175 1
        self._topology = topology
176 1
        self.graph.update_topology(topology)
177
178 1
        switches = list(topology.switches.keys())
179 1
        links = list(topology.links.keys())
180
        log.debug(f"Topology graph updated with switches: {switches}, links: {links}.")
181