Passed
Push — master ( d7abc2...9d9212 )
by Vinicius
04:54 queued 13s
created

build.main.Main._find_any_link_ids()   B

Complexity

Conditions 6

Size

Total Lines 21
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 6

Importance

Changes 0
Metric Value
cc 6
eloc 17
nop 3
dl 0
loc 21
ccs 13
cts 13
cp 1
crap 6
rs 8.6166
c 0
b 0
f 0
1
"""Main module of kytos/pathfinder Kytos Network Application."""
2
3 1
from threading import Lock
4 1
from typing import Generator
5
6 1
from flask import jsonify, request
7 1
from kytos.core import KytosNApp, log, rest
8 1
from kytos.core.helpers import listen_to
9 1
from napps.kytos.pathfinder.graph import KytosGraph
10
# pylint: disable=import-error
11 1
from werkzeug.exceptions import BadRequest
12
13
14 1
class Main(KytosNApp):
15
    """
16
    Main class of kytos/pathfinder NApp.
17
18
    This class is the entry point for this napp.
19
    """
20
21 1
    def setup(self):
22
        """Create a graph to handle the nodes and edges."""
23 1
        self.graph = KytosGraph()
24 1
        self._topology = None
25 1
        self._lock = Lock()
26
27 1
    def execute(self):
28
        """Do nothing."""
29
30 1
    def shutdown(self):
31
        """Shutdown the napp."""
32
33 1
    def _filter_paths_le_cost(self, paths, max_cost):
34
        """Filter by paths where the cost is le <= max_cost."""
35 1
        if not max_cost:
36 1
            return paths
37 1
        return [path for path in paths if path["cost"] <= max_cost]
38
39 1
    def _map_endpoints_from_link_ids(self, link_ids: list[str]) -> dict:
40
        """Map endpoints from link ids."""
41 1
        endpoints = {}
42 1
        for link_id in link_ids:
43 1
            try:
44 1
                link = self._topology.links[link_id]
45 1
                endpoint_a, endpoint_b = link.endpoint_a, link.endpoint_b
46 1
                endpoints[(endpoint_a.id, endpoint_b.id)] = link
47 1
            except KeyError:
48 1
                pass
49 1
        return endpoints
50
51 1
    def _find_all_link_ids(
52
        self, paths: list[dict], link_ids: list[str]
53
    ) -> Generator[int, None, None]:
54
        """Find indexes of the paths that contain all link ids."""
55 1
        endpoints_links = self._map_endpoints_from_link_ids(link_ids)
56 1
        if not endpoints_links:
57 1
            return None
58 1
        endpoint_keys = set(endpoints_links.keys())
59 1
        for idx, path in enumerate(paths):
60 1
            head, tail, found_endpoints = path["hops"][:-1], path["hops"][1:], set()
61 1
            for endpoint_a, endpoint_b in zip(head, tail):
62 1
                if (endpoint_a, endpoint_b) in endpoints_links:
63 1
                    found_endpoints.add((endpoint_a, endpoint_b))
64 1
                if (endpoint_b, endpoint_a) in endpoints_links:
65
                    found_endpoints.add((endpoint_b, endpoint_a))
66 1
            if found_endpoints == endpoint_keys:
67 1
                yield idx
68 1
        return None
69
70 1
    def _find_any_link_ids(
71
        self, paths: list[dict], link_ids: list[str]
72
    ) -> Generator[int, None, None]:
73
        """Find indexes of the paths that contain any of the link ids."""
74 1
        endpoints_links = self._map_endpoints_from_link_ids(link_ids)
75 1
        if not endpoints_links:
76 1
            return None
77 1
        for idx, path in enumerate(paths):
78 1
            head, tail, found = path["hops"][:-1], path["hops"][1:], False
79 1
            for endpoint_a, endpoint_b in zip(head, tail):
80 1
                if any(
81
                    (
82
                        (endpoint_a, endpoint_b) in endpoints_links,
83
                        (endpoint_b, endpoint_a) in endpoints_links,
84
                    )
85
                ):
86 1
                    found = True
87 1
                    break
88 1
            if found:
89 1
                yield idx
90 1
        return None
91
92 1
    def _filter_paths_undesired_links(
93
        self, paths: list[dict], undesired: list[str]
94
    ) -> list[dict]:
95
        """Filter by undesired_links, it performs a logical OR."""
96 1
        if not undesired:
97 1
            return paths
98 1
        excluded_indexes = set(self._find_any_link_ids(paths, undesired))
99 1
        return [path for idx, path in enumerate(paths) if idx not in excluded_indexes]
100
101 1
    def _filter_paths_desired_links(
102
        self, paths: list[dict], desired: list[str]
103
    ) -> list[dict]:
104
        """Filter by desired_links, it performs a logical AND."""
105 1
        if not desired:
106 1
            return paths
107 1
        included_indexes = set(self._find_all_link_ids(paths, desired))
108 1
        return [path for idx, path in enumerate(paths) if idx in included_indexes]
109
110 1
    def _validate_payload(self, data):
111
        """Validate shortest_path v2/ POST endpoint."""
112 1
        if data.get("desired_links"):
113 1
            if not isinstance(data["desired_links"], list):
114
                raise BadRequest(
115
                    f"TypeError: desired_links is supposed to be a list."
116
                    f" type: {type(data['desired_links'])}"
117
                )
118
119 1
        if data.get("undesired_links"):
120
            if not isinstance(data["undesired_links"], list):
121
                raise BadRequest(
122
                    f"TypeError: undesired_links is supposed to be a list."
123
                    f" type: {type(data['undesired_links'])}"
124
                )
125
126 1
        parameter = data.get("parameter")
127 1
        spf_attr = data.get("spf_attribute")
128 1
        if not spf_attr:
129 1
            spf_attr = parameter or "hop"
130 1
        data["spf_attribute"] = spf_attr
131
132 1
        if spf_attr not in self.graph.spf_edge_data_cbs:
133
            raise BadRequest(
134
                "Invalid 'spf_attribute'. Valid values: "
135
                f"{', '.join(self.graph.spf_edge_data_cbs.keys())}"
136
            )
137
138 1
        try:
139 1
            data["spf_max_paths"] = max(int(data.get("spf_max_paths", 2)), 1)
140
        except (TypeError, ValueError):
141
            raise BadRequest(
142
                f"spf_max_paths {data.get('spf_max_pahts')} must be an int"
143
            )
144
145 1
        spf_max_path_cost = data.get("spf_max_path_cost")
146 1
        if spf_max_path_cost:
147
            try:
148
                spf_max_path_cost = max(int(spf_max_path_cost), 1)
149
                data["spf_max_path_cost"] = spf_max_path_cost
150
            except (TypeError, ValueError):
151
                raise BadRequest(
152
                    f"spf_max_path_cost {data.get('spf_max_path_cost')} must"
153
                    " be an int"
154
                )
155
156 1
        data["mandatory_metrics"] = data.get("mandatory_metrics", {})
157 1
        data["flexible_metrics"] = data.get("flexible_metrics", {})
158
159 1
        try:
160 1
            minimum_hits = data.get("minimum_flexible_hits")
161 1
            if minimum_hits:
162 1
                minimum_hits = min(
163
                    len(data["flexible_metrics"]), max(0, int(minimum_hits))
164
                )
165 1
            data["minimum_flexible_hits"] = minimum_hits
166
        except (TypeError, ValueError):
167
            raise BadRequest(
168
                f"minimum_hits {data.get('minimum_flexible_hits')} must be an int"
169
            )
170
171 1
        return data
172
173 1
    @rest("v2/", methods=["POST"])
174 1
    def shortest_path(self):
175
        """Calculate the best path between the source and destination."""
176 1
        data = request.get_json()
177 1
        data = self._validate_payload(data)
178
179 1
        desired = data.get("desired_links")
180 1
        undesired = data.get("undesired_links")
181
182 1
        spf_attr = data.get("spf_attribute")
183 1
        spf_max_paths = data.get("spf_max_paths")
184 1
        spf_max_path_cost = data.get("spf_max_path_cost")
185 1
        mandatory_metrics = data.get("mandatory_metrics")
186 1
        flexible_metrics = data.get("flexible_metrics")
187 1
        minimum_hits = data.get("minimum_flexible_hits")
188 1
        log.debug(f"POST v2/ payload data: {data}")
189
190 1
        try:
191 1
            with self._lock:
192 1
                if any([mandatory_metrics, flexible_metrics]):
193 1
                    paths = self.graph.constrained_k_shortest_paths(
194
                        data["source"],
195
                        data["destination"],
196
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
197
                        k=spf_max_paths,
198
                        minimum_hits=minimum_hits,
199
                        mandatory_metrics=mandatory_metrics,
200
                        flexible_metrics=flexible_metrics,
201
                    )
202
                else:
203 1
                    paths = self.graph.k_shortest_paths(
204
                        data["source"],
205
                        data["destination"],
206
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
207
                        k=spf_max_paths,
208
                    )
209
210 1
                paths = self.graph.path_cost_builder(
211
                    paths,
212
                    weight=spf_attr,
213
                )
214 1
            log.debug(f"Found paths: {paths}")
215 1
        except TypeError as err:
216 1
            raise BadRequest(str(err))
217
218 1
        paths = self._filter_paths_le_cost(paths, max_cost=spf_max_path_cost)
219 1
        paths = self._filter_paths_undesired_links(paths, undesired)
220 1
        paths = self._filter_paths_desired_links(paths, desired)
221 1
        log.debug(f"Filtered paths: {paths}")
222 1
        return jsonify({"paths": paths})
223
224 1
    @listen_to("kytos.topology.updated", "kytos/topology.topology_loaded")
225 1
    def on_topology_updated(self, event):
226
        """Update the graph when the network topology is updated."""
227
        self.update_topology(event)
228
229 1
    def update_topology(self, event):
230
        """Update the graph when the network topology is updated."""
231 1
        if "topology" not in event.content:
232 1
            return
233 1
        topology = event.content["topology"]
234 1
        with self._lock:
235 1
            self._topology = topology
236 1
            self.graph.update_topology(topology)
237 1
        log.debug("Topology graph updated.")
238
239 1
    @listen_to("kytos/topology.links.metadata.(added|removed)")
240 1
    def on_links_metadata_changed(self, event):
241
        """Update the graph when links' metadata are added or removed."""
242
        link = event.content["link"]
243
        with self._lock:
244
            self.graph.update_link_metadata(link)
245
        metadata = event.content["metadata"]
246
        log.debug(f"Topology graph updated link id: {link.id} metadata: {metadata}")
247