Test Failed
Pull Request — master (#30)
by Vinicius
07:18
created

build.main.Main._find_all_link_ids()   B

Complexity

Conditions 7

Size

Total Lines 18
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 7.1782

Importance

Changes 0
Metric Value
cc 7
eloc 16
nop 3
dl 0
loc 18
rs 8
c 0
b 0
f 0
ccs 11
cts 13
cp 0.8462
crap 7.1782
1
"""Main module of kytos/pathfinder Kytos Network Application."""
2
3 1
from typing import Generator
4
from threading import Lock
5 1
6 1
from flask import jsonify, request
7 1
from kytos.core import KytosNApp, log, rest
8 1
from kytos.core.helpers import listen_to
9
from napps.kytos.pathfinder.graph import KytosGraph
10 1
11
# pylint: disable=import-error
12
from werkzeug.exceptions import BadRequest
13 1
14
15
class Main(KytosNApp):
16
    """
17
    Main class of kytos/pathfinder NApp.
18
19
    This class is the entry point for this napp.
20 1
    """
21
22 1
    def setup(self):
23 1
        """Create a graph to handle the nodes and edges."""
24 1
        self.graph = KytosGraph()
25
        self._topology = None
26 1
        self._lock = Lock()
27
28
    def execute(self):
29 1
        """Do nothing."""
30
31
    def shutdown(self):
32 1
        """Shutdown the napp."""
33
34
    def _filter_paths_le_cost(self, paths, max_cost):
35
        """Filter by paths where the cost is le <= max_cost."""
36
        if not max_cost:
37
            return paths
38
        return [path for path in paths if path["cost"] <= max_cost]
39 1
40
    def _map_endpoints_from_link_ids(self, link_ids: list[str]) -> dict:
41 1
        """Map endpoints from link ids."""
42 1
        endpoints = {}
43 1
        for link_id in link_ids:
44 1
            try:
45 1
                link = self._topology.links[link_id]
46
                endpoint_a, endpoint_b = link.endpoint_a, link.endpoint_b
47
                endpoints[(endpoint_a.id, endpoint_b.id)] = link
48
            except KeyError:
49 1
                pass
50 1
        return endpoints
51 1
52 1
    def _find_all_link_ids(
53
        self, paths: list[dict], link_ids: list[str]
54
    ) -> Generator[int, None, None]:
55 1
        """Find indexes of the paths that contain all link ids."""
56
        endpoints_links = self._map_endpoints_from_link_ids(link_ids)
57 1
        if not endpoints_links:
58
            return None
59 1
        endpoint_keys = set(endpoints_links.keys())
60 1
        for idx, path in enumerate(paths):
61 1
            head, tail, found_endpoints = path["hops"][:-1], path["hops"][1:], set()
62 1
            for endpoint_a, endpoint_b in zip(head, tail):
63 1
                if (endpoint_a, endpoint_b) in endpoints_links:
64
                    found_endpoints.add((endpoint_a, endpoint_b))
65
                if (endpoint_b, endpoint_a) in endpoints_links:
66
                    found_endpoints.add((endpoint_b, endpoint_a))
67 1
            if found_endpoints == endpoint_keys:
68 1
                yield idx
69 1
        return None
70 1
71
    def _find_any_link_ids(
72
        self, paths: list[dict], link_ids: list[str]
73
    ) -> Generator[int, None, None]:
74 1
        """Find indexes of the paths that contain any of the link ids."""
75
        endpoints_links = self._map_endpoints_from_link_ids(link_ids)
76 1
        if not endpoints_links:
77
            return None
78 1
        for idx, path in enumerate(paths):
79
            head, tail, found = path["hops"][:-1], path["hops"][1:], False
80 1
            for endpoint_a, endpoint_b in zip(head, tail):
81 1
                if any(
82 1
                    (
83
                        (endpoint_a, endpoint_b) in endpoints_links,
84 1
                        (endpoint_b, endpoint_a) in endpoints_links,
85
                    )
86 1
                ):
87 1
                    found = True
88
                    break
89
            if found:
90
                yield idx
91
        return None
92
93 1
    def _filter_paths_undesired_links(
94
        self, paths: list[dict], undesired: list[str]
95
    ) -> list[dict]:
96
        """Filter by undesired_links, it performs a logical OR."""
97
        if not undesired:
98
            return paths
99
        excluded_indexes = set(self._find_any_link_ids(paths, undesired))
100 1
        return [path for idx, path in enumerate(paths) if idx not in excluded_indexes]
101 1
102 1
    def _filter_paths_desired_links(
103 1
        self, paths: list[dict], desired: list[str]
104 1
    ) -> list[dict]:
105
        """Filter by desired_links, it performs a logical AND."""
106 1
        if not desired:
107
            return paths
108
        included_indexes = set(self._find_all_link_ids(paths, desired))
109
        return [path for idx, path in enumerate(paths) if idx in included_indexes]
110
111
    def _validate_payload(self, data):
112 1
        """Validate shortest_path v2/ POST endpoint."""
113 1
        if data.get("desired_links"):
114
            if not isinstance(data["desired_links"], list):
115
                raise BadRequest(
116
                    f"TypeError: desired_links is supposed to be a list."
117
                    f" type: {type(data['desired_links'])}"
118
                )
119 1
120 1
        if data.get("undesired_links"):
121
            if not isinstance(data["undesired_links"], list):
122
                raise BadRequest(
123
                    f"TypeError: undesired_links is supposed to be a list."
124
                    f" type: {type(data['undesired_links'])}"
125
                )
126
127
        parameter = data.get("parameter")
128
        spf_attr = data.get("spf_attribute")
129
        if not spf_attr:
130 1
            spf_attr = parameter or "hop"
131 1
        data["spf_attribute"] = spf_attr
132
133 1
        if spf_attr not in self.graph.spf_edge_data_cbs:
134 1
            raise BadRequest(
135 1
                "Invalid 'spf_attribute'. Valid values: "
136 1
                f"{', '.join(self.graph.spf_edge_data_cbs.keys())}"
137
            )
138
139 1
        try:
140
            data["spf_max_paths"] = max(int(data.get("spf_max_paths", 2)), 1)
141
        except (TypeError, ValueError):
142
            raise BadRequest(
143
                f"spf_max_paths {data.get('spf_max_pahts')} must be an int"
144
            )
145 1
146
        spf_max_path_cost = data.get("spf_max_path_cost")
147 1
        if spf_max_path_cost:
148 1
            try:
149
                spf_max_path_cost = max(int(spf_max_path_cost), 1)
150 1
                data["spf_max_path_cost"] = spf_max_path_cost
151 1
            except (TypeError, ValueError):
152
                raise BadRequest(
153 1
                    f"spf_max_path_cost {data.get('spf_max_path_cost')} must"
154 1
                    " be an int"
155
                )
156 1
157 1
        data["mandatory_metrics"] = data.get("mandatory_metrics", {})
158 1
        data["flexible_metrics"] = data.get("flexible_metrics", {})
159 1
160 1
        try:
161 1
            minimum_hits = data.get("minimum_flexible_hits")
162 1
            if minimum_hits:
163
                minimum_hits = min(
164 1
                    len(data["flexible_metrics"]), max(0, int(minimum_hits))
165 1
                )
166 1
            data["minimum_flexible_hits"] = minimum_hits
167 1
        except (TypeError, ValueError):
168
            raise BadRequest(
169
                f"minimum_hits {data.get('minimum_flexible_hits')} must be an int"
170
            )
171
172
        return data
173
174
    @rest("v2/", methods=["POST"])
175
    def shortest_path(self):
176
        """Calculate the best path between the source and destination."""
177 1
        data = request.get_json()
178
        data = self._validate_payload(data)
179
180
        desired = data.get("desired_links")
181
        undesired = data.get("undesired_links")
182
183
        spf_attr = data.get("spf_attribute")
184 1
        spf_max_paths = data.get("spf_max_paths")
185
        spf_max_path_cost = data.get("spf_max_path_cost")
186
        mandatory_metrics = data.get("mandatory_metrics")
187
        flexible_metrics = data.get("flexible_metrics")
188 1
        minimum_hits = data.get("minimum_flexible_hits")
189 1
        log.debug(f"POST v2/ payload data: {data}")
190 1
191
        try:
192 1
            with self._lock:
193 1
                if any([mandatory_metrics, flexible_metrics]):
194 1
                    paths = self.graph.constrained_k_shortest_paths(
195 1
                        data["source"],
196
                        data["destination"],
197 1
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
198 1
                        k=spf_max_paths,
199
                        minimum_hits=minimum_hits,
200
                        mandatory_metrics=mandatory_metrics,
201
                        flexible_metrics=flexible_metrics,
202 1
                    )
203
                else:
204 1
                    paths = self.graph.k_shortest_paths(
205 1
                        data["source"],
206 1
                        data["destination"],
207 1
                        weight=self.graph.spf_edge_data_cbs[spf_attr],
208 1
                        k=spf_max_paths,
209 1
                    )
210 1
211
                paths = self.graph.path_cost_builder(
212 1
                    paths,
213 1
                    weight=spf_attr,
214
                )
215
            log.debug(f"Found paths: {paths}")
216
        except TypeError as err:
217
            raise BadRequest(str(err))
218
219
        paths = self._filter_paths_le_cost(paths, max_cost=spf_max_path_cost)
220
        paths = self._filter_paths_undesired_links(paths, undesired)
221
        paths = self._filter_paths_desired_links(paths, desired)
222
        log.debug(f"Filtered paths: {paths}")
223
        return jsonify({"paths": paths})
224
225
    @listen_to("kytos.topology.updated", "kytos/topology.topology_loaded")
226
    def on_topology_updated(self, event):
227
        """Update the graph when the network topology is updated."""
228
        self.update_topology(event)
229
230
    def update_topology(self, event):
231
        """Update the graph when the network topology is updated."""
232
        if "topology" not in event.content:
233
            return
234
        topology = event.content["topology"]
235
        with self._lock:
236
            self._topology = topology
237
            self.graph.update_topology(topology)
238
        log.debug("Topology graph updated.")
239
240
    @listen_to("kytos/topology.links.metadata.(added|removed)")
241
    def on_links_metadata_changed(self, event):
242
        """Update the graph when links' metadata are added or removed."""
243
        link = event.content["link"]
244
        with self._lock:
245
            self.graph.update_link_metadata(link)
246
        metadata = event.content["metadata"]
247
        log.debug(f"Topology graph updated link id: {link.id} metadata: {metadata}")
248