Passed
Pull Request — master (#65)
by
unknown
06:59 queued 04:09
created

build.graph.graph   B

Complexity

Total Complexity 43

Size/Duplication

Total Lines 267
Duplicated Lines 0 %

Test Coverage

Coverage 96.23%

Importance

Changes 0
Metric Value
eloc 184
dl 0
loc 267
ccs 102
cts 106
cp 0.9623
rs 8.96
c 0
b 0
f 0
wmc 43

13 Methods

Rating   Name   Duplication   Size   Complexity  
A KytosGraph.update_topology() 0 5 1
A KytosGraph.path_cost_builder() 0 24 4
A KytosGraph.update_link_metadata() 0 8 3
A KytosGraph.k_shortest_paths() 0 32 2
A KytosGraph._path_cost() 0 6 2
B KytosGraph.update_nodes() 0 16 6
A KytosGraph._remove_switch_hops() 0 6 3
A KytosGraph.clear() 0 3 1
A KytosGraph.update_links() 0 6 3
B KytosGraph.__init__() 0 58 6
A KytosGraph.get_link_metadata() 0 3 1
B KytosGraph.constrained_k_shortest_paths() 0 50 7
A KytosGraph._filter_links() 0 11 4

How to fix   Complexity   

Complexity

Complex classes like build.graph.graph often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: enable=too-many-arguments,too-many-locals
4 1
from itertools import combinations, islice
5 1
import operator
6
7 1
from kytos.core import log
8 1
from kytos.core.common import EntityStatus
9
 
10 1
from .filters import EdgeFilter, TypeCheckPreprocessor                                       
11 1
from .weights import (nx_edge_data_delay, nx_edge_data_priority, nx_edge_data_weight)
12
13
14 1
try:
15 1
    import networkx as nx
16 1
    from networkx.exception import NetworkXNoPath, NodeNotFound
17
except ImportError:
18
    PACKAGE = "networkx==2.5.1"
19
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
20
21
22 1
class KytosGraph:
    """Class responsible for the graph generation.

    Wraps a ``networkx.Graph`` (``self.graph``) that is populated from
    topology switches, interfaces and links, and offers (constrained)
    k-shortest-path searches on top of it.
    """

    def __init__(self):
        """Create an empty graph and register filters and weight callbacks."""
        self.graph = nx.Graph()
        # Only these link metadata keys are copied onto graph edges by
        # update_link_metadata(); any other key is ignored.
        self._accepted_metadata = {
            'ownership',
            'bandwidth',
            'reliability',
            'priority',
            'utilization',
            'delay',
        }
        # Metric name -> link filter, looked up by _filter_links() and called
        # as ``filter(value, links)``.
        # NOTE(review): EdgeFilter and TypeCheckPreprocessor come from
        # .filters; the arguments look like (comparison, edge attribute name
        # or getter, value preprocessor) -- confirm against that module.
        self._filter_functions = {
            "ownership": EdgeFilter(
                operator.and_,
                # Falls back to the requested value itself when the edge has
                # no 'ownership' keys (an empty frozenset is falsy).
                lambda edge, val: (
                    frozenset(edge[2].get('ownership', {}).keys()) or val
                ),
                TypeCheckPreprocessor(
                    str,
                    lambda val: frozenset(val.split(','))
                )
            ),
            "not_ownership": EdgeFilter(
                lambda a, b: not (a & b),
                lambda edge, _: frozenset(edge[2].get('ownership', {}).keys()),
                TypeCheckPreprocessor(
                    str,
                    lambda val: frozenset(val.split(','))
                )
            ),
            "bandwidth": EdgeFilter(
                operator.ge,
                'bandwidth',
                TypeCheckPreprocessor((int, float))
            ),
            "reliability": EdgeFilter(
                operator.ge,
                'reliability',
                TypeCheckPreprocessor((int, float))
            ),
            "priority": EdgeFilter(
                operator.le,
                'priority',
                TypeCheckPreprocessor((int, float))
            ),
            "utilization": EdgeFilter(
                operator.le,
                'utilization',
                TypeCheckPreprocessor((int, float))
            ),
            "delay": EdgeFilter(
                operator.le,
                'delay',
                TypeCheckPreprocessor((int, float))
            ),
        }
        # Shortest-path-first attribute name -> edge data callback imported
        # from .weights.
        self.spf_edge_data_cbs = {
            "hop": nx_edge_data_weight,
            "delay": nx_edge_data_delay,
            "priority": nx_edge_data_priority,
        }

    def clear(self):
        """Remove all nodes and links registered."""
        self.graph.clear()

    def update_topology(self, topology):
        """Update all nodes and links inside the graph.

        The graph is cleared first and then rebuilt from
        ``topology.switches`` and ``topology.links``.
        """
        self.graph.clear()
        self.update_nodes(topology.switches)
        self.update_links(topology.links)

    def update_nodes(self, nodes):
        """Update all nodes inside the graph.

        ``nodes`` is a mapping; each value must expose ``status``, ``id`` and
        an ``interfaces`` mapping. Nodes and interfaces whose status is not
        ``EntityStatus.UP`` are skipped. Every added interface is connected
        to its node by an edge.

        Raises:
            TypeError: if a node or interface lacks an expected attribute.
        """
        for node in nodes.values():
            try:
                if node.status != EntityStatus.UP:
                    continue
                self.graph.add_node(node.id)

                for interface in node.interfaces.values():
                    if interface.status == EntityStatus.UP:
                        self.graph.add_node(interface.id)
                        self.graph.add_edge(node.id, interface.id)

            except AttributeError as err:
                # Chain the original error so the traceback keeps its cause.
                raise TypeError(
                    f"Error when updating nodes inside the graph: {str(err)}"
                ) from err

    def update_links(self, links):
        """Update all links inside the graph.

        ``links`` is a mapping; for each value whose status is
        ``EntityStatus.UP`` an edge between its two endpoint ids is added and
        its accepted metadata is copied onto that edge.
        """
        for link in links.values():
            if link.status == EntityStatus.UP:
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
                self.update_link_metadata(link)

    def update_link_metadata(self, link):
        """Update link metadata.

        Copies every ``link.metadata`` entry whose key is in
        ``self._accepted_metadata`` onto the graph edge between the link
        endpoints.
        """
        for key, value in link.metadata.items():
            if key not in self._accepted_metadata:
                continue
            endpoint_a = link.endpoint_a.id
            endpoint_b = link.endpoint_b.id
            self.graph[endpoint_a][endpoint_b][key] = value

    def get_link_metadata(self, endpoint_a, endpoint_b):
        """Return the metadata of a link."""
        return self.graph.get_edge_data(endpoint_a, endpoint_b)

    @staticmethod
    def _remove_switch_hops(circuit):
        """Remove switch hops from a circuit hops list.

        A hop is dropped when splitting it on ":" yields exactly 8 parts.
        ``circuit["hops"]`` is updated in place (same list object).
        """
        # Rebuild through slice assignment instead of calling remove() while
        # iterating: removing during iteration skips the element that follows
        # each removed hop, leaving consecutive switch hops behind.
        circuit["hops"][:] = [
            hop for hop in circuit["hops"] if len(hop.split(":")) != 8
        ]

    def _path_cost(self, path, weight="hop", default_cost=1):
        """Compute the path cost given an attribute.

        Sums the ``weight`` attribute of every edge between consecutive
        nodes of ``path``; edges without that attribute count as
        ``default_cost``.
        """
        cost = 0
        for node, nbr in nx.utils.pairwise(path):
            cost += self.graph[node][nbr].get(weight, default_cost)
        return cost

    def path_cost_builder(self, paths, weight="hop", default_weight=1):
        """Build the cost of a path given a list of paths.

        Each item of ``paths`` is either a list of hops, which is wrapped in
        a new ``{"hops", "cost"}`` dict, or a dict with a ``"hops"`` key,
        which gets its ``"cost"`` key set in place.

        Returns:
            list of dicts, one per input path, in the same order.

        Raises:
            TypeError: if an item is neither a list nor a dict.
        """
        paths_acc = []
        for path in paths:
            if isinstance(path, list):
                paths_acc.append(
                    {
                        "hops": path,
                        "cost": self._path_cost(
                            path, weight=weight, default_cost=default_weight
                        ),
                    }
                )
            elif isinstance(path, dict):
                path["cost"] = self._path_cost(
                    path["hops"], weight=weight, default_cost=default_weight
                )
                paths_acc.append(path)
            else:
                raise TypeError(
                    f"type: '{type(path)}' must be be either list or dict. "
                    f"path: {path}"
                )
        return paths_acc

    def k_shortest_paths(
        self, source, destination, weight=None, k=1, graph=None
    ):
        """
        Compute up to k shortest paths and return them.

        When ``graph`` is None the search runs on ``self.graph``. An empty
        list is returned if either node is missing or no path exists.

        This procedure is based on algorithm by Jin Y. Yen [1].
        Since Yen's algorithm calls Dijkstra's up to k times, the time
        complexity will be proportional to K * Dijkstra's, average
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
        number of vertices and E number of edges.

        References
        ----------
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
           Network", Management Science, Vol. 17, No. 11, Theory Series
           (Jul., 1971), pp. 712-716.
        """
        # Compare against None explicitly: a graph with no nodes is falsy,
        # so ``graph or self.graph`` would silently replace an empty
        # (fully filtered) subgraph with the complete graph.
        target_graph = self.graph if graph is None else graph
        try:
            return list(
                islice(
                    nx.shortest_simple_paths(
                        target_graph,
                        source,
                        destination,
                        weight=weight,
                    ),
                    k,
                )
            )
        except (NodeNotFound, NetworkXNoPath):
            return []

    def constrained_k_shortest_paths(
        self,
        source,
        destination,
        weight=None,
        k=1,
        graph=None,
        minimum_hits=None,
        **metrics,
    ):
        """Calculate the constrained shortest paths with flexibility.

        ``metrics`` may carry two dicts: ``mandatory_metrics`` (always
        applied) and ``flexible_metrics``. Links are first filtered by the
        mandatory metrics. Then, starting with all flexible metrics and going
        down to ``minimum_hits`` of them, every combination of that size is
        applied as an extra filter and up to ``k`` shortest paths are
        searched on the resulting edge subgraph.

        ``minimum_hits`` defaults to 0 and is clamped to the range
        ``[0, len(flexible_metrics)]``.

        Returns:
            list of ``{"hops": path, "metrics": applied_metrics}`` dicts. The
            search stops as soon as exactly ``k`` paths were collected, or
            at the end of the first combination size that produced any path.
        """
        # Explicit None check: an empty graph is falsy and must not be
        # swapped for self.graph.
        graph = self.graph if graph is None else graph
        mandatory_metrics = metrics.get("mandatory_metrics", {})
        flexible_metrics = metrics.get("flexible_metrics", {})
        # Materialized once because it is re-filtered for every combination.
        first_pass_links = list(
            self._filter_links(
                graph.edges(data=True), **mandatory_metrics
            )
        )
        length = len(flexible_metrics)
        if minimum_hits is None:
            minimum_hits = 0
        minimum_hits = min(length, max(0, minimum_hits))

        paths = []
        for i in range(length, minimum_hits - 1, -1):
            for combo in combinations(flexible_metrics.items(), i):
                additional = dict(combo)
                filtered_links = self._filter_links(
                    first_pass_links, **additional
                )
                # edge_subgraph only needs the (u, v) endpoints.
                filtered_links = ((u, v) for u, v, _ in filtered_links)
                for path in self.k_shortest_paths(
                    source,
                    destination,
                    weight=weight,
                    k=k,
                    graph=graph.edge_subgraph(filtered_links),
                ):
                    paths.append(
                        {
                            "hops": path,
                            "metrics": {**mandatory_metrics, **additional},
                        }
                    )
                if len(paths) == k:
                    return paths
            if paths:
                return paths
        return paths

    def _filter_links(self, links, **metrics):
        """Apply the registered filter of each known metric to ``links``.

        Metrics without an entry in ``self._filter_functions`` are ignored.
        Filters are chained: the output of one is the input of the next.

        Raises:
            TypeError: if a filter rejects the given value with a TypeError.
        """
        for metric, value in metrics.items():
            filter_func = self._filter_functions.get(metric, None)
            if filter_func is not None:
                try:
                    links = filter_func(value, links)
                except TypeError as err:
                    # Chain the original error so the traceback keeps its
                    # cause.
                    raise TypeError(
                        f"Error in {metric} value: {value} err: {err}"
                    ) from err
        return links
267