Passed
Pull Request — master (#5)
by Vinicius
04:05
created

build.graph.KytosGraph.update_link_metadata()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 2
dl 0
loc 8
rs 10
c 0
b 0
f 0
ccs 7
cts 7
cp 1
crap 3
1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: disable=too-many-arguments,too-many-locals
4 1
from itertools import combinations, islice
5
6 1
from kytos.core import log
7 1
from napps.kytos.pathfinder.utils import (filter_ge, filter_in, filter_le,
8
                                          lazy_filter, nx_edge_data_delay,
9
                                          nx_edge_data_priority,
10
                                          nx_edge_data_weight)
11
12 1
try:
13 1
    import networkx as nx
14 1
    from networkx.exception import NetworkXNoPath, NodeNotFound
15
except ImportError:
16
    PACKAGE = "networkx==2.5.1"
17
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
18
19
20 1
class KytosGraph:
    """Class responsible for the graph generation."""

    def __init__(self):
        """Create an empty graph and register filters and cost callbacks."""
        self.graph = nx.Graph()
        # Link metadata keys that may be used as path constraints. Only
        # these keys are copied onto graph edges by update_link_metadata().
        # Each value is a filter callable invoked as
        # ``filter(value, links)`` (see _filter_links()).
        self._filter_functions = {
            "ownership": lazy_filter(str, filter_in("ownership")),
            "bandwidth": lazy_filter((int, float), filter_ge("bandwidth")),
            "reliability": lazy_filter((int, float), filter_ge("reliability")),
            "priority": lazy_filter((int, float), filter_le("priority")),
            "utilization": lazy_filter((int, float), filter_le("utilization")),
            "delay": lazy_filter((int, float), filter_le("delay")),
        }
        # Edge-data callbacks keyed by shortest-path-first attribute name.
        self.spf_edge_data_cbs = {
            "hop": nx_edge_data_weight,
            "delay": nx_edge_data_delay,
            "priority": nx_edge_data_priority,
        }

    def clear(self):
        """Remove all nodes and links registered."""
        self.graph.clear()

    def update_topology(self, topology):
        """Update all nodes and links inside the graph.

        The graph is cleared first and then rebuilt from
        ``topology.switches`` and ``topology.links``.
        """
        self.graph.clear()
        self.update_nodes(topology.switches)
        self.update_links(topology.links)

    def update_nodes(self, nodes):
        """Update all nodes inside the graph.

        Every value of ``nodes`` is added as a graph node, and each of its
        interfaces is added as a node connected to it by an edge.

        Raises:
            TypeError: if a node (or one of its interfaces) lacks an
                expected attribute; the original AttributeError is chained
                as the cause.
        """
        for node in nodes.values():
            try:
                self.graph.add_node(node.id)

                for interface in node.interfaces.values():
                    self.graph.add_node(interface.id)
                    self.graph.add_edge(node.id, interface.id)

            except AttributeError as err:
                raise TypeError(
                    f"Error when updating nodes inside the graph: {str(err)}"
                ) from err

    def update_links(self, links):
        """Update all links inside the graph.

        Only links for which ``is_active()`` is true are added; their
        metadata is copied onto the new edge.
        """
        for link in links.values():
            if link.is_active():
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
                self.update_link_metadata(link)

    def update_link_metadata(self, link):
        """Update link metadata.

        Copies onto the edge between the link endpoints every metadata
        item whose key is a registered filter attribute; other keys are
        ignored.
        """
        relevant = {
            key: value
            for key, value in link.metadata.items()
            if key in self._filter_functions
        }
        # Only look the edge up when there is something to store, so a
        # link without relevant metadata never touches the graph.
        if relevant:
            edge_data = self.graph[link.endpoint_a.id][link.endpoint_b.id]
            edge_data.update(relevant)

    def get_link_metadata(self, endpoint_a, endpoint_b):
        """Return the metadata of a link."""
        return self.graph.get_edge_data(endpoint_a, endpoint_b)

    @staticmethod
    def _remove_switch_hops(circuit):
        """Remove switch hops from a circuit hops list.

        A hop is treated as a switch when its id has exactly 8
        colon-separated fields. ``circuit["hops"]` is filtered in place.
        """
        # Slice assignment keeps the same list object while avoiding
        # removal during iteration, which would skip the element that
        # follows every removed hop.
        circuit["hops"][:] = [
            hop for hop in circuit["hops"] if len(hop.split(":")) != 8
        ]

    def _path_cost(self, path, weight="hop", default_cost=1):
        """Compute the path cost given an attribute.

        Sums the ``weight`` attribute of every consecutive edge of
        ``path``, using ``default_cost`` for edges that lack it.
        """
        cost = 0
        for node, nbr in nx.utils.pairwise(path):
            cost += self.graph[node][nbr].get(weight, default_cost)
        return cost

    def path_cost_builder(self, paths, weight="hop", default_weight=1):
        """Build the cost of a path given a list of paths.

        Each item of ``paths`` is either a list of hops or a dict with a
        ``"hops"`` key. Lists are wrapped into a new dict; dicts are
        updated in place with a ``"cost"`` key.

        Returns:
            list of dicts, each holding at least "hops" and "cost".

        Raises:
            TypeError: if an item is neither a list nor a dict.
        """
        paths_acc = []
        for path in paths:
            if isinstance(path, list):
                paths_acc.append(
                    {
                        "hops": path,
                        "cost": self._path_cost(
                            path, weight=weight, default_cost=default_weight
                        ),
                    }
                )
            elif isinstance(path, dict):
                path["cost"] = self._path_cost(
                    path["hops"], weight=weight, default_cost=default_weight
                )
                paths_acc.append(path)
            else:
                raise TypeError(
                    f"type: '{type(path)}' must be be either list or dict. "
                    f"path: {path}"
                )
        return paths_acc

    def k_shortest_paths(
        self, source, destination, weight=None, k=1, graph=None
    ):
        """
        Compute up to k shortest paths and return them.

        The search runs on ``graph`` when one is given, otherwise on the
        graph held by this instance. An empty list is returned when an
        endpoint is not in the graph or when no path exists.

        This procedure is based on algorithm by Jin Y. Yen [1].
        Since Yen's algorithm calls Dijkstra's up to k times, the time
        complexity will be proportional to K * Dijkstra's, average
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
        number of vertices and E number of edges.

        References
        ----------
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
           Network", Management Science, Vol. 17, No. 11, Theory Series
           (Jul., 1971), pp. 712-716.
        """
        # A graph with no nodes is falsy, so ``graph or self.graph`` would
        # silently search the full graph when an empty (fully filtered)
        # subgraph is passed. Compare against None instead.
        target_graph = self.graph if graph is None else graph
        try:
            return list(
                islice(
                    nx.shortest_simple_paths(
                        target_graph,
                        source,
                        destination,
                        weight=weight,
                    ),
                    k,
                )
            )
        except (NodeNotFound, NetworkXNoPath):
            return []

    def constrained_k_shortest_paths(
        self,
        source,
        destination,
        weight=None,
        k=1,
        minimum_hits=None,
        **metrics,
    ):
        """Calculate the constrained shortest paths with flexibility.

        ``metrics`` may carry two dicts: ``mandatory_metrics``, always
        applied to the links, and ``flexible_metrics``, of which at least
        ``minimum_hits`` must be satisfied. Combinations of flexible
        metrics are tried from the largest size down to ``minimum_hits``;
        the search stops at the first size that yields any path.

        Returns:
            list of dicts with "hops" (the path) and "metrics" (the
            mandatory plus flexible metrics that the path satisfied).
        """
        mandatory_metrics = metrics.get("mandatory_metrics", {})
        flexible_metrics = metrics.get("flexible_metrics", {})
        # Materialized once because it is re-filtered for every combination.
        first_pass_links = list(
            self._filter_links(
                self.graph.edges(data=True), **mandatory_metrics
            )
        )
        length = len(flexible_metrics)
        if minimum_hits is None:
            minimum_hits = 0
        # Clamp into [0, number of flexible metrics].
        minimum_hits = min(length, max(0, minimum_hits))

        paths = []
        for i in range(length, minimum_hits - 1, -1):
            for combo in combinations(flexible_metrics.items(), i):
                additional = dict(combo)
                filtered_links = self._filter_links(
                    first_pass_links, **additional
                )
                # edge_subgraph() only needs the endpoints of each link.
                filtered_links = ((u, v) for u, v, _ in filtered_links)
                for path in self.k_shortest_paths(
                    source,
                    destination,
                    weight=weight,
                    k=k,
                    graph=self.graph.edge_subgraph(filtered_links),
                ):
                    paths.append(
                        {
                            "hops": path,
                            "metrics": {**mandatory_metrics, **additional},
                        }
                    )
                # NOTE(review): paths accumulate across combinations of
                # the same size, so more than k entries may be returned
                # when this exact-match check is skipped - confirm intent.
                if len(paths) == k:
                    return paths
            if paths:
                return paths
        return paths

    def _filter_links(self, links, **metrics):
        """Apply the registered filter of every given metric to ``links``.

        Metrics without a registered filter are ignored. The result of the
        last applied filter is returned, or ``links`` itself when no
        filter was applied.

        Raises:
            TypeError: if a filter rejects the given value; the original
                error is chained as the cause.
        """
        for metric, value in metrics.items():
            filter_func = self._filter_functions.get(metric, None)
            if filter_func is not None:
                try:
                    links = filter_func(value, links)
                except TypeError as err:
                    raise TypeError(
                        f"Error in {metric} value: {value} err: {err}"
                    ) from err
        return links
216