Test Failed
Pull Request — master (#74)
by
unknown
02:02
created

build.graph.KytosGraph.update_nodes()   B

Complexity

Conditions 6

Size

Total Lines 16
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 6

Importance

Changes 0
Metric Value
cc 6
eloc 13
nop 2
dl 0
loc 16
rs 8.6666
c 0
b 0
f 0
ccs 9
cts 9
cp 1
crap 6
1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3 1
# pylint: disable=too-many-arguments,too-many-locals
4
from itertools import combinations, islice
5 1
6 1
from kytos.core import log
7 1
from kytos.core.common import EntityStatus
8
from napps.kytos.pathfinder.utils import (filter_ge, filter_in, filter_le,
9
                                          lazy_filter, nx_edge_data_delay,
10
                                          nx_edge_data_priority,
11
                                          nx_edge_data_weight)
12
13 1
try:
14
    import networkx as nx
15
    from networkx.exception import NetworkXNoPath, NodeNotFound
16 1
except ImportError:
17 1
    PACKAGE = "networkx==2.5.1"
18
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
19 1
20
21 1
class KytosGraph:
    """Class responsible for the graph generation."""

    def __init__(self):
        """Create an empty graph and register the per-metric helpers."""
        self.graph = nx.Graph()
        # Link filters keyed by metric name. Only these keys are copied from
        # link metadata onto graph edges and accepted by ``_filter_links``.
        # NOTE(review): presumably the first ``lazy_filter`` argument is the
        # accepted value type(s) -- confirm in ``pathfinder.utils``.
        self._filter_functions = {
            "ownership": lazy_filter(str, filter_in("ownership")),
            "bandwidth": lazy_filter((int, float), filter_ge("bandwidth")),
            "reliability": lazy_filter((int, float), filter_ge("reliability")),
            "priority": lazy_filter((int, float), filter_le("priority")),
            "utilization": lazy_filter((int, float), filter_le("utilization")),
            "delay": lazy_filter((int, float), filter_le("delay")),
        }
        # Edge-data callbacks keyed by shortest-path-first attribute name.
        self.spf_edge_data_cbs = {
            "hop": nx_edge_data_weight,
            "delay": nx_edge_data_delay,
            "priority": nx_edge_data_priority,
        }

    def clear(self):
        """Remove all nodes and links registered."""
        self.graph.clear()

    def update_topology(self, topology):
        """Update all nodes and links inside the graph.

        The graph is emptied first and then rebuilt from
        ``topology.switches`` and ``topology.links``.
        """
        self.graph.clear()
        self.update_nodes(topology.switches)
        self.update_links(topology.links)

    def update_nodes(self, nodes):
        """Update all nodes inside the graph.

        Every node whose status is ``EntityStatus.UP`` is added, together
        with each of its interfaces that is also up; an edge connects the
        node to each added interface.

        Raises:
            TypeError: if a node or interface lacks an expected attribute.
                The original ``AttributeError`` is chained as the cause.
        """
        for node in nodes.values():
            try:
                if node.status != EntityStatus.UP:
                    continue
                self.graph.add_node(node.id)

                for interface in node.interfaces.values():
                    if interface.status == EntityStatus.UP:
                        self.graph.add_node(interface.id)
                        self.graph.add_edge(node.id, interface.id)

            except AttributeError as err:
                raise TypeError(
                    f"Error when updating nodes inside the graph: {str(err)}"
                ) from err

    def update_links(self, links):
        """Update all links inside the graph.

        Only links whose status is ``EntityStatus.UP`` become edges; their
        metadata is copied onto the edge right after it is created.
        """
        for link in links.values():
            if link.status == EntityStatus.UP:
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
                self.update_link_metadata(link)

    def update_link_metadata(self, link):
        """Update link metadata.

        Only metadata keys that have a registered filter function are
        stored on the edge; every other key is ignored.
        """
        for key, value in link.metadata.items():
            if key not in self._filter_functions:
                continue
            endpoint_a = link.endpoint_a.id
            endpoint_b = link.endpoint_b.id
            self.graph[endpoint_a][endpoint_b][key] = value

    def get_link_metadata(self, endpoint_a, endpoint_b):
        """Return the metadata of a link."""
        return self.graph.get_edge_data(endpoint_a, endpoint_b)

    @staticmethod
    def _remove_switch_hops(circuit):
        """Remove switch hops from a circuit hops list.

        A hop is treated as a switch when its id splits into exactly eight
        colon-separated fields. ``circuit["hops"]`` is filtered in place, so
        other references to the same list see the change.
        """
        # Rebuild through slice assignment instead of calling ``remove``
        # while iterating: removing during iteration skips the element that
        # follows each removal and leaves adjacent switch hops behind.
        circuit["hops"][:] = [
            hop for hop in circuit["hops"] if len(hop.split(":")) != 8
        ]

    def _path_cost(self, path, weight="hop", default_cost=1):
        """Compute the path cost given an attribute.

        Sums the ``weight`` attribute over every pair of consecutive nodes
        in ``path``; an edge without that attribute counts as
        ``default_cost``.
        """
        return sum(
            self.graph[node][nbr].get(weight, default_cost)
            for node, nbr in nx.utils.pairwise(path)
        )

    def path_cost_builder(self, paths, weight="hop", default_weight=1):
        """Build the cost of a path given a list of paths.

        Each entry of ``paths`` is either a list of hops, which is wrapped
        in a new ``{"hops": ..., "cost": ...}`` dict, or a dict that already
        has a ``"hops"`` key, which receives a ``"cost"`` key in place.

        Returns:
            A list with one dict per input path.

        Raises:
            TypeError: if an entry is neither a list nor a dict.
        """
        paths_acc = []
        for path in paths:
            if isinstance(path, list):
                paths_acc.append(
                    {
                        "hops": path,
                        "cost": self._path_cost(
                            path, weight=weight, default_cost=default_weight
                        ),
                    }
                )
            elif isinstance(path, dict):
                path["cost"] = self._path_cost(
                    path["hops"], weight=weight, default_cost=default_weight
                )
                paths_acc.append(path)
            else:
                raise TypeError(
                    f"type: '{type(path)}' must be either list or dict. "
                    f"path: {path}"
                )
        return paths_acc

    def k_shortest_paths(
        self, source, destination, weight=None, k=1, graph=None
    ):
        """
        Compute up to k shortest paths and return them.

        This procedure is based on algorithm by Jin Y. Yen [1].
        Since Yen's algorithm calls Dijkstra's up to k times, the time
        complexity will be proportional to K * Dijkstra's, average
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
        number of vertices and E number of edges.

        When ``graph`` is None the instance graph is searched. An empty
        list is returned when either endpoint is missing from the graph or
        no path connects them.

        References
        ----------
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
           Network", Management Science, Vol. 17, No. 11, Theory Series
           (Jul., 1971), pp. 712-716.
        """
        # Compare against None explicitly: a networkx graph without nodes is
        # falsy, so ``graph or self.graph`` would silently search the full
        # graph whenever the caller passes an empty (fully filtered) one.
        search_graph = self.graph if graph is None else graph
        try:
            return list(
                islice(
                    nx.shortest_simple_paths(
                        search_graph,
                        source,
                        destination,
                        weight=weight,
                    ),
                    k,
                )
            )
        except (NodeNotFound, NetworkXNoPath):
            return []

    def constrained_k_shortest_paths(
        self,
        source,
        destination,
        weight=None,
        k=1,
        graph=None,
        minimum_hits=None,
        **metrics,
    ):
        """Calculate the constrained shortest paths with flexibility.

        Links are first filtered by ``metrics["mandatory_metrics"]``. Then
        combinations of ``metrics["flexible_metrics"]`` are tried, from all
        of them down to ``minimum_hits`` of them, and the search stops at
        the first combination size that yields at least one path.

        Returns:
            A list of dicts with the keys ``"hops"`` (the path) and
            ``"metrics"`` (the mandatory plus the flexible metrics that the
            path satisfied).
        """
        # Explicit None check: an empty networkx graph is falsy and must not
        # be replaced by the instance graph.
        if graph is None:
            graph = self.graph
        mandatory_metrics = metrics.get("mandatory_metrics", {})
        flexible_metrics = metrics.get("flexible_metrics", {})
        # Materialized once because it is re-filtered for every combination.
        first_pass_links = list(
            self._filter_links(graph.edges(data=True), **mandatory_metrics)
        )
        length = len(flexible_metrics)
        if minimum_hits is None:
            minimum_hits = 0
        # Clamp into the range [0, number of flexible metrics].
        minimum_hits = min(length, max(0, minimum_hits))

        paths = []
        for i in range(length, minimum_hits - 1, -1):
            for combo in combinations(flexible_metrics.items(), i):
                additional = dict(combo)
                filtered_links = self._filter_links(
                    first_pass_links, **additional
                )
                # Drop the edge data; only the endpoints select the subgraph.
                filtered_links = ((u, v) for u, v, _ in filtered_links)
                for path in self.k_shortest_paths(
                    source,
                    destination,
                    weight=weight,
                    k=k,
                    graph=graph.edge_subgraph(filtered_links),
                ):
                    paths.append(
                        {
                            "hops": path,
                            "metrics": {**mandatory_metrics, **additional},
                        }
                    )
                # NOTE(review): paths accumulate across combinations, so the
                # total can step over ``k`` without ever equalling it; the
                # result may then hold more than ``k`` entries -- confirm
                # whether that is intended.
                if len(paths) == k:
                    return paths
            if paths:
                return paths
        return paths

    def _filter_links(self, links, **metrics):
        """Apply the registered filter of each given metric to ``links``.

        Metrics without a registered filter function are ignored. The
        filters are chained in the iteration order of ``metrics``.

        Returns:
            Whatever the last applied filter returned, or ``links`` itself
            when no filter was applied.

        Raises:
            TypeError: if a filter function rejects the metric value. The
                original error is chained as the cause.
        """
        for metric, value in metrics.items():
            filter_func = self._filter_functions.get(metric, None)
            if filter_func is not None:
                try:
                    links = filter_func(value, links)
                except TypeError as err:
                    raise TypeError(
                        f"Error in {metric} value: {value} err: {err}"
                    ) from err
        return links
222