Passed
Pull Request — master (#65)
by
unknown
03:10
created

build.graph.graph   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 266
Duplicated Lines 0 %

Test Coverage

Coverage 94.39%

Importance

Changes 0
Metric Value
eloc 183
dl 0
loc 266
ccs 101
cts 107
cp 0.9439
rs 8.8798
c 0
b 0
f 0
wmc 44

13 Methods

Rating   Name   Duplication   Size   Complexity  
A KytosGraph.update_topology() 0 5 1
A KytosGraph.path_cost_builder() 0 24 4
A KytosGraph.update_link_metadata() 0 8 3
A KytosGraph.k_shortest_paths() 0 32 2
A KytosGraph._path_cost() 0 6 2
B KytosGraph.update_nodes() 0 16 6
A KytosGraph._remove_switch_hops() 0 6 3
A KytosGraph.clear() 0 3 1
A KytosGraph.update_links() 0 6 3
B KytosGraph.__init__() 0 57 7
A KytosGraph.get_link_metadata() 0 3 1
B KytosGraph.constrained_k_shortest_paths() 0 50 7
A KytosGraph._filter_links() 0 11 4

How to fix   Complexity   

Complexity

Complex classes like build.graph.graph often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: enable=too-many-arguments,too-many-locals
4 1
from itertools import combinations, islice
5 1
import operator
6
7 1
from kytos.core import log
8 1
from kytos.core.common import EntityStatus
9
 
10 1
from .filters import EdgeFilter, ProcessEdgeAttribute, TypeCheckPreprocessor, TypeDifferentiatedProcessor, UseDefaultIfNone, UseValIfNone
11 1
from .weights import (nx_edge_data_delay, nx_edge_data_priority, nx_edge_data_weight)
12
13
14 1
try:
15 1
    import networkx as nx
16 1
    from networkx.exception import NetworkXNoPath, NodeNotFound
17
except ImportError:
18
    PACKAGE = "networkx==2.5.1"
19
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
20
21
22 1
class KytosGraph:
23
    """Class responsible for the graph generation."""
24
25 1
    def __init__(self):
26 1
        self.graph = nx.Graph()
27 1
        self._accepted_metadata = {
28
            'ownership',
29
            'bandwidth',
30
            'reliability',
31
            'priority',
32
            'utilization',
33
            'delay',
34
        }
35 1
        ownership_processor = ProcessEdgeAttribute(
36
            'ownership',
37
            TypeDifferentiatedProcessor({
38
                str: lambda x: frozenset(x.split()),
39
                dict: lambda x: frozenset(x.keys()),
40
                list: lambda x: frozenset(x),
41
                type(None): None
42
            })
43
        )
44 1
        self._filter_functions = {
45
            "ownership": EdgeFilter(
46
                operator.contains,
47
                UseValIfNone(ownership_processor)
48
            ),
49
            "not_ownership": EdgeFilter(
50
                lambda a, b: not (a & b),
51
                UseDefaultIfNone(ownership_processor, frozenset()),
52
                TypeDifferentiatedProcessor({
53
                    str: lambda val: frozenset(val.split(',')),
54
                    list: lambda val: frozenset(val)
55
                })
56
            ),
57
            "bandwidth": EdgeFilter(
58
                operator.ge,
59
                'bandwidth'
60
            ),
61
            "reliability": EdgeFilter(
62
                operator.ge,
63
                'reliability'
64
            ),
65
            "priority": EdgeFilter(
66
                operator.le,
67
                'priority'
68
            ),
69
            "utilization": EdgeFilter(
70
                operator.le,
71
                'utilization'
72
            ),
73
            "delay": EdgeFilter(
74
                operator.le,
75
                'delay'
76
            ),
77
        }
78 1
        self.spf_edge_data_cbs = {
79
            "hop": nx_edge_data_weight,
80
            "delay": nx_edge_data_delay,
81
            "priority": nx_edge_data_priority,
82
        }
83
84 1
    def clear(self):
85
        """Remove all nodes and links registered."""
86 1
        self.graph.clear()
87
88 1
    def update_topology(self, topology):
89
        """Update all nodes and links inside the graph."""
90 1
        self.graph.clear()
91 1
        self.update_nodes(topology.switches)
92 1
        self.update_links(topology.links)
93
94 1
    def update_nodes(self, nodes):
95
        """Update all nodes inside the graph."""
96 1
        for node in nodes.values():
97 1
            try:
98 1
                if node.status != EntityStatus.UP:
99 1
                    continue
100 1
                self.graph.add_node(node.id)
101
102 1
                for interface in node.interfaces.values():
103 1
                    if interface.status == EntityStatus.UP:
104 1
                        self.graph.add_node(interface.id)
105 1
                        self.graph.add_edge(node.id, interface.id)
106
107 1
            except AttributeError as err:
108 1
                raise TypeError(
109
                    f"Error when updating nodes inside the graph: {str(err)}"
110
                )
111
112 1
    def update_links(self, links):
113
        """Update all links inside the graph."""
114 1
        for link in links.values():
115 1
            if link.status == EntityStatus.UP:
116 1
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
117 1
                self.update_link_metadata(link)
118
119 1
    def update_link_metadata(self, link):
120
        """Update link metadata."""
121 1
        for key, value in link.metadata.items():
122 1
            if key not in self._accepted_metadata:
123 1
                continue
124 1
            endpoint_a = link.endpoint_a.id
125 1
            endpoint_b = link.endpoint_b.id
126 1
            self.graph[endpoint_a][endpoint_b][key] = value
127
128 1
    def get_link_metadata(self, endpoint_a, endpoint_b):
129
        """Return the metadata of a link."""
130 1
        return self.graph.get_edge_data(endpoint_a, endpoint_b)
131
132 1
    @staticmethod
133 1
    def _remove_switch_hops(circuit):
134
        """Remove switch hops from a circuit hops list."""
135 1
        for hop in circuit["hops"]:
136 1
            if len(hop.split(":")) == 8:
137 1
                circuit["hops"].remove(hop)
138
139 1
    def _path_cost(self, path, weight="hop", default_cost=1):
140
        """Compute the path cost given an attribute."""
141 1
        cost = 0
142 1
        for node, nbr in nx.utils.pairwise(path):
143 1
            cost += self.graph[node][nbr].get(weight, default_cost)
144 1
        return cost
145
146 1
    def path_cost_builder(self, paths, weight="hop", default_weight=1):
147
        """Build the cost of a path given a list of paths."""
148 1
        paths_acc = []
149 1
        for path in paths:
150 1
            if isinstance(path, list):
151 1
                paths_acc.append(
152
                    {
153
                        "hops": path,
154
                        "cost": self._path_cost(
155
                            path, weight=weight, default_cost=default_weight
156
                        ),
157
                    }
158
                )
159 1
            elif isinstance(path, dict):
160 1
                path["cost"] = self._path_cost(
161
                    path["hops"], weight=weight, default_cost=default_weight
162
                )
163 1
                paths_acc.append(path)
164
            else:
165
                raise TypeError(
166
                    f"type: '{type(path)}' must be be either list or dict. "
167
                    f"path: {path}"
168
                )
169 1
        return paths_acc
170
171 1
    def k_shortest_paths(
172
        self, source, destination, weight=None, k=1, graph=None
173
    ):
174
        """
175
        Compute up to k shortest paths and return them.
176
177
        This procedure is based on algorithm by Jin Y. Yen [1].
178
        Since Yen's algorithm calls Dijkstra's up to k times, the time
179
        complexity will be proportional to K * Dijkstra's, average
180
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
181
        number of vertices and E number of egdes.
182
183
        References
184
        ----------
185
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
186
           Network", Management Science, Vol. 17, No. 11, Theory Series
187
           (Jul., 1971), pp. 712-716.
188
        """
189 1
        try:
190 1
            return list(
191
                islice(
192
                    nx.shortest_simple_paths(
193
                        graph or self.graph,
194
                        source,
195
                        destination,
196
                        weight=weight,
197
                    ),
198
                    k,
199
                )
200
            )
201 1
        except (NodeNotFound, NetworkXNoPath):
202 1
            return []
203
204 1
    def constrained_k_shortest_paths(
205
        self,
206
        source,
207
        destination,
208
        weight=None,
209
        k=1,
210
        graph=None,
211
        minimum_hits=None,
212
        **metrics,
213
    ):
214
        """Calculate the constrained shortest paths with flexibility."""
215 1
        graph = graph or self.graph
216 1
        mandatory_metrics = metrics.get("mandatory_metrics", {})
217 1
        flexible_metrics = metrics.get("flexible_metrics", {})
218 1
        first_pass_links = list(
219
            self._filter_links(
220
                graph.edges(data=True), **mandatory_metrics
221
            )
222
        )
223 1
        length = len(flexible_metrics)
224 1
        if minimum_hits is None:
225 1
            minimum_hits = 0
226 1
        minimum_hits = min(length, max(0, minimum_hits))
227
228 1
        paths = []
229 1
        for i in range(length, minimum_hits - 1, -1):
230 1
            for combo in combinations(flexible_metrics.items(), i):
231 1
                additional = dict(combo)
232 1
                filtered_links = self._filter_links(
233
                    first_pass_links, **additional
234
                )
235 1
                filtered_links = ((u, v) for u, v, d in filtered_links)
0 ignored issues
show
introduced by
The variable v does not seem to be defined for all execution paths.
Loading history...
introduced by
The variable u does not seem to be defined for all execution paths.
Loading history...
236 1
                for path in self.k_shortest_paths(
237
                    source,
238
                    destination,
239
                    weight=weight,
240
                    k=k,
241
                    graph=graph.edge_subgraph(filtered_links),
242
                ):
243 1
                    paths.append(
244
                        {
245
                            "hops": path,
246
                            "metrics": {**mandatory_metrics, **additional},
247
                        }
248
                    )
249 1
                if len(paths) == k:
250 1
                    return paths
251 1
            if paths:
252 1
                return paths
253 1
        return paths
254
255 1
    def _filter_links(self, links, **metrics):
256 1
        for metric, value in metrics.items():
257 1
            filter_func = self._filter_functions.get(metric, None)
258 1
            if filter_func is not None:
259 1
                try:
260 1
                    links = filter_func(value, links)
261
                except TypeError as err:
262
                    raise TypeError(
263
                        f"Error in {metric} value: {value} err: {err}"
264
                    )
265
        return links
266