Test Failed
Pull Request — master (#74)
by
unknown
02:02
created

build.main.Main.shortest_path()   B

Complexity

Conditions 6

Size

Total Lines 55
Code Lines 45

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 6

Importance

Changes 0
Metric Value
cc 6
eloc 45
nop 2
dl 0
loc 55
rs 7.8666
c 0
b 0
f 0
ccs 10
cts 10
cp 1
crap 6

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
"""Main module of kytos/pathfinder Kytos Network Application."""
2
3 1
import pathlib
4 1
from threading import Lock
5 1
from typing import Generator
6
7
from kytos.core import KytosEvent, KytosNApp, log, rest
8 1
from kytos.core.helpers import listen_to, load_spec, validate_openapi
9
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
10
                                 get_json_or_400)
11
from napps.kytos.pathfinder.graph import KytosGraph
12
13 1
14
class Main(KytosNApp):
15
    """
16
    Main class of kytos/pathfinder NApp.
17
18
    This class is the entry point for this napp.
19 1
    """
20
21 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
22 1
23
    def setup(self):
24 1
        """Create a graph to handle the nodes and edges."""
25
        self.graph = KytosGraph()
26
        self._topology = None
27 1
        self._lock = Lock()
28
        self._topology_updated_at = None
29
        self._links_updated_at = {}
30 1
31
    def execute(self):
32
        """Do nothing."""
33
34
    def shutdown(self):
35
        """Shutdown the napp."""
36 1
37
    def _filter_paths_le_cost(self, paths, max_cost):
38 1
        """Filter by paths where the cost is le <= max_cost."""
39 1
        if not max_cost:
40 1
            return paths
41 1
        return [path for path in paths if path["cost"] <= max_cost]
42 1
43
    def _non_excluded_edges(self, links: list[str]) -> list[tuple[str, str]]:
44
        """Exlude undesired links. It'll return the remaning edges."""
45
46 1
        endpoints: list[tuple[str, str]] = []
47 1
        if not self._topology:
48 1
            return endpoints
49 1
        endpoint_ids = self._map_endpoints_from_link_ids(links)
50
        for edge in self.graph.graph.edges:
51 1
            if edge not in endpoint_ids:
52
                endpoints.append(edge)
53 1
        return endpoints
54
55 1
    def _map_endpoints_from_link_ids(self, link_ids: list[str]) -> dict:
56 1
        """Map endpoints from link ids."""
57 1
        endpoints = {}
58 1
        for link_id in link_ids:
59 1
            try:
60
                link = self._topology.links[link_id]
61
                endpoint_a, endpoint_b = link.endpoint_a, link.endpoint_b
62
                endpoints[(endpoint_a.id, endpoint_b.id)] = link
63 1
            except KeyError:
64 1
                pass
65 1
        return endpoints
66 1
67
    def _find_all_link_ids(
68
        self, paths: list[dict], link_ids: list[str]
69 1
    ) -> Generator[int, None, None]:
70
        """Find indexes of the paths that contain all link ids."""
71 1
        endpoints_links = self._map_endpoints_from_link_ids(link_ids)
72
        if not endpoints_links:
73 1
            return None
74
        endpoint_keys = set(endpoints_links.keys())
75
        for idx, path in enumerate(paths):
76 1
            head, tail, found_endpoints = path["hops"][:-1], path["hops"][1:], set()
77
            for endpoint_a, endpoint_b in zip(head, tail):
78 1
                if (endpoint_a, endpoint_b) in endpoints_links:
79 1
                    found_endpoints.add((endpoint_a, endpoint_b))
80 1
                if (endpoint_b, endpoint_a) in endpoints_links:
81
                    found_endpoints.add((endpoint_b, endpoint_a))
82 1
            if found_endpoints == endpoint_keys:
83 1
                yield idx
84
        return None
85
86
    @rest("v3/", methods=["POST"])
87 1
    @validate_openapi(spec)
88
    def shortest_path(self, request: Request) -> JSONResponse:
89 1
        """Calculate the best path between the source and destination."""
90 1
        data = get_json_or_400(request, self.controller.loop)
91
        if not isinstance(data, dict):
92 1
            raise HTTPException(400, detail=f"Invalid body value: {data}")
93
94
        undesired = data.get("undesired_links", [])
95
        spf_attr = data.get("spf_attribute", "hop")
96
        spf_max_paths = data.get("spf_max_paths", 2)
97
        spf_max_path_cost = data.get("spf_max_path_cost")
98 1
        mandatory_metrics = data.get("mandatory_metrics", {})
99 1
        flexible_metrics = data.get("flexible_metrics", {})
100 1
        minimum_hits = data.get("minimum_flexible_hits")
101 1
        log.debug(f"POST v2/ payload data: {data}")
102 1
103 1
        try:
104
            with self._lock:
105
                graph = self.graph.graph
106
                if undesired:
107
                    non_excluded_edges = self._non_excluded_edges(undesired)
108
                    graph = graph.edge_subgraph(non_excluded_edges)
109
110
                if any([mandatory_metrics, flexible_metrics]):
111
                    paths = self.graph.constrained_k_shortest_paths(
112
                        data["source"],
113
                        data["destination"],
114
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
115
                        k=spf_max_paths,
116
                        graph=graph,
117
                        minimum_hits=minimum_hits,
118
                        mandatory_metrics=mandatory_metrics,
119
                        flexible_metrics=flexible_metrics,
120
                    )
121
                else:
122
                    paths = self.graph.k_shortest_paths(
123
                        data["source"],
124
                        data["destination"],
125
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
126
                        k=spf_max_paths,
127
                        graph=graph,
128
                    )
129
130
                paths = self.graph.path_cost_builder(
131
                    paths,
132
                    weight=spf_attr,
133
                )
134
            log.debug(f"Found paths: {paths}")
135
        except TypeError as err:
136
            raise HTTPException(400, str(err))
137
138
        paths = self._filter_paths_le_cost(paths, max_cost=spf_max_path_cost)
139
        log.debug(f"Filtered paths: {paths}")
140
        return JSONResponse({"paths": paths})
141
142
    @listen_to(
143
        "kytos.topology.updated",
144
        "kytos/topology.current",
145
        "kytos/topology.topology_loaded",
146
    )
147
    def on_topology_updated(self, event):
148
        """Update the graph when the network topology is updated."""
149
        self.update_topology(event)
150
151
    def update_topology(self, event):
152
        """Update the graph when the network topology is updated."""
153
        if "topology" not in event.content:
154
            return
155
        topology = event.content["topology"]
156
        with self._lock:
157
            if (
158
                self._topology_updated_at
159
                and self._topology_updated_at > event.timestamp
160
            ):
161
                return
162
            self._topology = topology
163
            self._topology_updated_at = event.timestamp
164
            self.graph.update_topology(topology)
165
        switches = list(topology.switches.keys())
166
        links = list(topology.links.keys())
167
        log.debug(f"Topology graph updated with switches: {switches}, links: {links}.")
168
169
    def update_links_metadata_changed(self, event) -> None:
170
        """Update the graph when links' metadata are added or removed."""
171
        link = event.content["link"]
172
        try:
173
            with self._lock:
174
                if (
175
                    link.id in self._links_updated_at
176
                    and self._links_updated_at[link.id] > event.timestamp
177
                ):
178
                    return
179
                self.graph.update_link_metadata(link)
180
                self._links_updated_at[link.id] = event.timestamp
181
            metadata = event.content["metadata"]
182
            log.debug(f"Topology graph updated link id: {link.id} metadata: {metadata}")
183
        except KeyError as exc:
184
            log.warning(
185
                f"Unexpected KeyError {str(exc)} on event {event}."
186
                " pathfinder will reconciliate the topology"
187
            )
188
            self.controller.buffers.app.put(KytosEvent(name="kytos/topology.get"))
189
190
    @listen_to("kytos/topology.links.metadata.(added|removed)")
191
    def on_links_metadata_changed(self, event):
192
        """Update the graph when links' metadata are added or removed."""
193
        self.update_links_metadata_changed(event)
194