Passed
Push — master ( c8f996...742f38 )
by Vinicius
02:09 queued 13s
created

build.main.Main.on_links_metadata_changed()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 4.3145

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 2
dl 0
loc 8
rs 10
c 0
b 0
f 0
ccs 1
cts 6
cp 0.1666
crap 4.3145
1
"""Main module of kytos/pathfinder Kytos Network Application."""
2
3 1
from threading import Lock
4
5 1
from flask import jsonify, request
6 1
from kytos.core import KytosNApp, log, rest
7 1
from kytos.core.helpers import listen_to
8 1
from napps.kytos.pathfinder.graph import KytosGraph
9
# pylint: disable=import-error,no-self-use
10 1
from werkzeug.exceptions import BadRequest
11
12
13 1
class Main(KytosNApp):
14
    """
15
    Main class of kytos/pathfinder NApp.
16
17
    This class is the entry point for this napp.
18
    """
19
20 1
    def setup(self):
        """Initialise the state this NApp works with.

        Creates the lock that the other methods of this class hold while
        reading or updating the graph, clears the cached topology and builds
        an empty ``KytosGraph``.
        """
        self._lock = Lock()
        # No topology is known until ``update_topology`` stores one.
        self._topology = None
        self.graph = KytosGraph()
25
26 1
    def execute(self):
        """Intentionally empty: there is nothing to run here."""
28
29 1
    def shutdown(self):
        """Intentionally empty: no teardown is performed on shutdown."""
31
32 1
    def _filter_paths(self, paths, desired, undesired):
33
        """
34
        Apply filters to the paths list.
35
36
        Make sure that each path in the list has all the desired links and none
37
        of the undesired ones.
38
        """
39 1
        filtered_paths = []
40
41 1 View Code Duplication
        if desired:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
42 1
            for link_id in desired:
43 1
                try:
44 1
                    endpoint_a = self._topology.links[link_id].endpoint_a.id
45 1
                    endpoint_b = self._topology.links[link_id].endpoint_b.id
46
                except KeyError:
47
                    return []
48
49 1
                for path in paths:
50 1
                    head = path["hops"][:-1]
51 1
                    tail = path["hops"][1:]
52 1
                    if ((endpoint_a, endpoint_b) in zip(head, tail)) or (
53
                        (endpoint_b, endpoint_a) in zip(head, tail)
54
                    ):
55 1
                        filtered_paths.append(path)
56
        else:
57 1
            filtered_paths = paths
58
59 1 View Code Duplication
        if undesired:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
60 1
            for link_id in undesired:
61 1
                try:
62 1
                    endpoint_a = self._topology.links[link_id].endpoint_a.id
63 1
                    endpoint_b = self._topology.links[link_id].endpoint_b.id
64
                except KeyError:
65
                    continue
66
67 1
                for path in paths:
68 1
                    head = path["hops"][:-1]
69 1
                    tail = path["hops"][1:]
70 1
                    if ((endpoint_a, endpoint_b) in zip(head, tail)) or (
71
                        (endpoint_b, endpoint_a) in zip(head, tail)
72
                    ):
73
74 1
                        filtered_paths.remove(path)
75
76 1
        return filtered_paths
77
78 1
    def _filter_paths_le_cost(self, paths, max_cost):
79
        """Filter by paths where the cost is le <= max_cost."""
80 1
        if not max_cost:
81 1
            return paths
82 1
        return [path for path in paths if path["cost"] <= max_cost]
83
84 1
    def _validate_payload(self, data):
85
        """Validate shortest_path v2/ POST endpoint."""
86 1
        if data.get("desired_links"):
87 1
            if not isinstance(data["desired_links"], list):
88
                raise BadRequest(
89
                    f"TypeError: desired_links is supposed to be a list."
90
                    f" type: {type(data['desired_links'])}"
91
                )
92
93 1
        if data.get("undesired_links"):
94
            if not isinstance(data["undesired_links"], list):
95
                raise BadRequest(
96
                    f"TypeError: undesired_links is supposed to be a list."
97
                    f" type: {type(data['undesired_links'])}"
98
                )
99
100 1
        parameter = data.get("parameter")
101 1
        spf_attr = data.get("spf_attribute")
102 1
        if not spf_attr:
103 1
            spf_attr = parameter or "hop"
104 1
        data["spf_attribute"] = spf_attr
105
106 1
        if spf_attr not in self.graph.spf_edge_data_cbs:
107
            raise BadRequest(
108
                "Invalid 'spf_attribute'. Valid values: "
109
                f"{', '.join(self.graph.spf_edge_data_cbs.keys())}"
110
            )
111
112 1
        try:
113 1
            data["spf_max_paths"] = max(int(data.get("spf_max_paths", 2)), 1)
114
        except (TypeError, ValueError):
115
            raise BadRequest(
116
                f"spf_max_paths {data.get('spf_max_pahts')} must be an int"
117
            )
118
119 1
        spf_max_path_cost = data.get("spf_max_path_cost")
120 1
        if spf_max_path_cost:
121
            try:
122
                spf_max_path_cost = max(int(spf_max_path_cost), 1)
123
                data["spf_max_path_cost"] = spf_max_path_cost
124
            except (TypeError, ValueError):
125
                raise BadRequest(
126
                    f"spf_max_path_cost {data.get('spf_max_path_cost')} must"
127
                    " be an int"
128
                )
129
130 1
        data["mandatory_metrics"] = data.get("mandatory_metrics", {})
131 1
        data["flexible_metrics"] = data.get("flexible_metrics", {})
132
133 1
        try:
134 1
            minimum_hits = data.get("minimum_flexible_hits")
135 1
            if minimum_hits:
136 1
                minimum_hits = min(
137
                    len(data["flexible_metrics"]), max(0, int(minimum_hits))
138
                )
139 1
            data["minimum_flexible_hits"] = minimum_hits
140
        except (TypeError, ValueError):
141
            raise BadRequest(
142
                f"minimum_hits {data.get('minimum_flexible_hits')} must be an int"
143
            )
144
145 1
        return data
146
147 1
    @rest("v2/", methods=["POST"])
    def shortest_path(self):
        """Compute the paths between the requested source and destination.

        The JSON body is validated with ``_validate_payload``. While holding
        the lock, the graph is asked for up to ``spf_max_paths`` paths (the
        constrained search is used when mandatory or flexible metrics are
        given) and their costs are built. The result is then filtered by
        desired/undesired links and by maximum cost and returned as
        ``{"paths": [...]}``.

        Raises:
            BadRequest: if the payload is invalid or the graph computation
                raises a ``TypeError``.
        """
        data = self._validate_payload(request.get_json())

        desired = data.get("desired_links")
        undesired = data.get("undesired_links")
        spf_attr = data.get("spf_attribute")
        spf_max_paths = data.get("spf_max_paths")
        spf_max_path_cost = data.get("spf_max_path_cost")
        mandatory_metrics = data.get("mandatory_metrics")
        flexible_metrics = data.get("flexible_metrics")
        minimum_hits = data.get("minimum_flexible_hits")
        log.debug(f"POST v2/ payload data: {data}")

        try:
            with self._lock:
                source = data["source"]
                destination = data["destination"]
                weight_cb = self.graph.spf_edge_data_cbs[spf_attr]
                if mandatory_metrics or flexible_metrics:
                    paths = self.graph.constrained_k_shortest_paths(
                        source,
                        destination,
                        weight=weight_cb,
                        k=spf_max_paths,
                        minimum_hits=minimum_hits,
                        mandatory_metrics=mandatory_metrics,
                        flexible_metrics=flexible_metrics,
                    )
                else:
                    paths = self.graph.k_shortest_paths(
                        source,
                        destination,
                        weight=weight_cb,
                        k=spf_max_paths,
                    )
                paths = self.graph.path_cost_builder(paths, weight=spf_attr)
            log.debug(f"Found paths: {paths}")
        except TypeError as err:
            raise BadRequest(str(err))

        paths = self._filter_paths(paths, desired, undesired)
        paths = self._filter_paths_le_cost(paths, max_cost=spf_max_path_cost)
        log.debug(f"Filtered paths: {paths}")
        return jsonify({"paths": paths})
196
197 1
    @listen_to("kytos.topology.updated")
    def update_topology(self, event):
        """Store the new topology and rebuild the graph from it.

        Events whose content has no ``"topology"`` entry are ignored. The
        cached topology and the graph are replaced together while holding
        the lock.
        """
        content = event.content
        if "topology" not in content:
            return

        new_topology = content["topology"]
        with self._lock:
            self._topology = new_topology
            self.graph.update_topology(new_topology)
        log.debug("Topology graph updated.")
207
208 1
    @listen_to("kytos/topology.links.metadata.(added|removed)")
    def on_links_metadata_changed(self, event):
        """Refresh a link's metadata in the graph after it was added or removed.

        The link is read from ``event.content["link"]`` and passed to the
        graph while holding the lock; the event's ``"metadata"`` entry is
        only used for the debug log.
        """
        content = event.content
        link = content["link"]
        with self._lock:
            self.graph.update_link_metadata(link)

        # Read after the graph update, as the original code does.
        metadata = content["metadata"]
        log.debug(f"Topology graph updated link id: {link.id} metadata: {metadata}")
216