Passed
Pull Request — master (#65)
by
unknown
02:58
created

build.graph.graph.KytosGraph.__init__()   B

Complexity

Conditions 6

Size

Total Lines 66
Code Lines 52

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 6

Importance

Changes 0
Metric Value
cc 6
eloc 52
nop 1
dl 0
loc 66
ccs 6
cts 6
cp 1
crap 6
rs 7.6375
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: enable=too-many-arguments,too-many-locals
4 1
from itertools import combinations, islice
5 1
import operator
6
7 1
from kytos.core import log
8 1
from kytos.core.common import EntityStatus
9
 
10 1
from .filters import EdgeFilter, ProcessEdgeAttribute, TypeCheckPreprocessor, TypeDifferentiatedProcessor, UseDefaultIfNone, UseValIfNone
11 1
from .weights import (nx_edge_data_delay, nx_edge_data_priority, nx_edge_data_weight)
12
13
14 1
try:
15 1
    import networkx as nx
16 1
    from networkx.exception import NetworkXNoPath, NodeNotFound
17
except ImportError:
18
    PACKAGE = "networkx==2.5.1"
19
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
20
21
22 1
class KytosGraph:
    """Class responsible for the graph generation."""

    def __init__(self):
        """Create an empty graph and register the filters and SPF weights."""
        self.graph = nx.Graph()
        # Only these link metadata keys are copied onto graph edges
        # (see update_link_metadata).
        self._accepted_metadata = {
            'ownership',
            'bandwidth',
            'reliability',
            'priority',
            'utilization',
            'delay',
        }
        # Metric name -> callable(value, links) used by _filter_links.
        self._filter_functions = self._build_filter_functions()
        # Edge-data callbacks imported from .weights, keyed by attribute name.
        self.spf_edge_data_cbs = {
            "hop": nx_edge_data_weight,
            "delay": nx_edge_data_delay,
            "priority": nx_edge_data_priority,
        }

    @staticmethod
    def _build_filter_functions():
        """Build the mapping of metric name to its EdgeFilter instance."""
        ownership_processor = ProcessEdgeAttribute(
            'ownership',
            TypeDifferentiatedProcessor({
                str: lambda x: frozenset(x.split()),
                dict: lambda x: frozenset(x.keys()),
                type(None): None
            })
        )

        def split_owners(val):
            # The requested ownership value is a comma-separated string.
            return frozenset(val.split(','))

        filters = {
            "ownership": EdgeFilter(
                operator.and_,
                UseValIfNone(ownership_processor),
                TypeCheckPreprocessor(str, split_owners)
            ),
            "not_ownership": EdgeFilter(
                lambda a, b: not (a & b),
                UseDefaultIfNone(ownership_processor, frozenset()),
                TypeCheckPreprocessor(str, split_owners)
            ),
        }
        # Numeric metrics all share the same shape: a comparison operator,
        # the edge attribute name and an (int, float) type check.
        numeric_comparators = (
            ("bandwidth", operator.ge),
            ("reliability", operator.ge),
            ("priority", operator.le),
            ("utilization", operator.le),
            ("delay", operator.le),
        )
        for metric, comparator in numeric_comparators:
            filters[metric] = EdgeFilter(
                comparator,
                metric,
                TypeCheckPreprocessor((int, float))
            )
        return filters

    def clear(self):
        """Remove all nodes and links registered."""
        self.graph.clear()

    def update_topology(self, topology):
        """Update all nodes and links inside the graph."""
        self.graph.clear()
        self.update_nodes(topology.switches)
        self.update_links(topology.links)

    def update_nodes(self, nodes):
        """Update all nodes inside the graph.

        Nodes whose status is not EntityStatus.UP are skipped. For each
        remaining node, every interface that is UP is added as a node and
        connected to its owning node.

        Raises:
            TypeError: if a node or interface lacks an expected attribute.
        """
        for node in nodes.values():
            try:
                if node.status != EntityStatus.UP:
                    continue
                self.graph.add_node(node.id)

                for interface in node.interfaces.values():
                    if interface.status == EntityStatus.UP:
                        self.graph.add_node(interface.id)
                        self.graph.add_edge(node.id, interface.id)

            except AttributeError as err:
                raise TypeError(
                    f"Error when updating nodes inside the graph: {str(err)}"
                ) from err

    def update_links(self, links):
        """Update all links inside the graph.

        Only links whose status is EntityStatus.UP are added.
        """
        for link in links.values():
            if link.status == EntityStatus.UP:
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
                self.update_link_metadata(link)

    def update_link_metadata(self, link):
        """Update link metadata.

        Copy every accepted metadata entry of the link onto the graph edge
        between its two endpoints; other keys are ignored.
        """
        for key, value in link.metadata.items():
            if key not in self._accepted_metadata:
                continue
            endpoint_a = link.endpoint_a.id
            endpoint_b = link.endpoint_b.id
            self.graph[endpoint_a][endpoint_b][key] = value

    def get_link_metadata(self, endpoint_a, endpoint_b):
        """Return the metadata of a link."""
        return self.graph.get_edge_data(endpoint_a, endpoint_b)

    @staticmethod
    def _remove_switch_hops(circuit):
        """Remove switch hops from a circuit hops list.

        A hop made of exactly eight ':'-separated parts is treated as a
        switch hop. The list is rebuilt and assigned in place: removing
        items while iterating the same list would skip the element that
        follows each removal, leaving adjacent switch hops behind.
        """
        circuit["hops"][:] = [
            hop for hop in circuit["hops"] if len(hop.split(":")) != 8
        ]

    def _path_cost(self, path, weight="hop", default_cost=1):
        """Compute the path cost given an attribute.

        Edges that do not carry the attribute count as default_cost.
        """
        return sum(
            self.graph[node][nbr].get(weight, default_cost)
            for node, nbr in nx.utils.pairwise(path)
        )

    def path_cost_builder(self, paths, weight="hop", default_weight=1):
        """Build the cost of a path given a list of paths.

        Each item is either a list of hops, which is wrapped into a new
        dict with "hops" and "cost", or a dict with a "hops" key, which
        gets its "cost" set in place.

        Raises:
            TypeError: if an item is neither a list nor a dict.
        """
        paths_acc = []
        for path in paths:
            if isinstance(path, list):
                paths_acc.append(
                    {
                        "hops": path,
                        "cost": self._path_cost(
                            path, weight=weight, default_cost=default_weight
                        ),
                    }
                )
            elif isinstance(path, dict):
                path["cost"] = self._path_cost(
                    path["hops"], weight=weight, default_cost=default_weight
                )
                paths_acc.append(path)
            else:
                raise TypeError(
                    f"type: '{type(path)}' must be be either list or dict. "
                    f"path: {path}"
                )
        return paths_acc

    def k_shortest_paths(
        self, source, destination, weight=None, k=1, graph=None
    ):
        """
        Compute up to k shortest paths and return them.

        When graph is None the instance graph is searched. An empty list
        is returned if a node is missing or no path exists.

        This procedure is based on algorithm by Jin Y. Yen [1].
        Since Yen's algorithm calls Dijkstra's up to k times, the time
        complexity will be proportional to K * Dijkstra's, average
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
        number of vertices and E number of edges.

        References
        ----------
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
           Network", Management Science, Vol. 17, No. 11, Theory Series
           (Jul., 1971), pp. 712-716.
        """
        # Test against None explicitly: a graph without nodes is falsy, so
        # "graph or self.graph" would swap an empty (fully filtered)
        # subgraph for the complete topology.
        search_graph = self.graph if graph is None else graph
        try:
            return list(
                islice(
                    nx.shortest_simple_paths(
                        search_graph,
                        source,
                        destination,
                        weight=weight,
                    ),
                    k,
                )
            )
        except (NodeNotFound, NetworkXNoPath):
            return []

    def constrained_k_shortest_paths(
        self,
        source,
        destination,
        weight=None,
        k=1,
        graph=None,
        minimum_hits=None,
        **metrics,
    ):
        """Calculate the constrained shortest paths with flexibility.

        Links are first filtered by metrics["mandatory_metrics"]. Then
        combinations of metrics["flexible_metrics"] are tried, from all of
        them down to minimum_hits of them, and the paths found on each
        filtered subgraph are collected as dicts with "hops" and "metrics".
        minimum_hits is clamped to [0, number of flexible metrics].
        """
        # Same reasoning as k_shortest_paths: an empty graph is falsy.
        graph = self.graph if graph is None else graph
        mandatory_metrics = metrics.get("mandatory_metrics", {})
        flexible_metrics = metrics.get("flexible_metrics", {})
        first_pass_links = list(
            self._filter_links(
                graph.edges(data=True), **mandatory_metrics
            )
        )
        length = len(flexible_metrics)
        if minimum_hits is None:
            minimum_hits = 0
        minimum_hits = min(length, max(0, minimum_hits))

        paths = []
        for i in range(length, minimum_hits - 1, -1):
            for combo in combinations(flexible_metrics.items(), i):
                additional = dict(combo)
                filtered_links = self._filter_links(
                    first_pass_links, **additional
                )
                # edge_subgraph only needs the endpoints, not the data.
                edge_pairs = [(u, v) for u, v, _ in filtered_links]
                for path in self.k_shortest_paths(
                    source,
                    destination,
                    weight=weight,
                    k=k,
                    graph=graph.edge_subgraph(edge_pairs),
                ):
                    paths.append(
                        {
                            "hops": path,
                            "metrics": {**mandatory_metrics, **additional},
                        }
                    )
                # NOTE(review): this is an equality test, so if successive
                # combinations push the total past k the search keeps
                # accumulating -- confirm whether ">=" is intended.
                if len(paths) == k:
                    return paths
            # Stop relaxing constraints once any path has been found.
            if paths:
                return paths
        return paths

    def _filter_links(self, links, **metrics):
        """Apply the registered filter of each given metric to links.

        Metrics without a registered filter are ignored.

        Raises:
            TypeError: if a filter rejects the supplied value.
        """
        for metric, value in metrics.items():
            filter_func = self._filter_functions.get(metric, None)
            if filter_func is not None:
                try:
                    links = filter_func(value, links)
                except TypeError as err:
                    raise TypeError(
                        f"Error in {metric} value: {value} err: {err}"
                    ) from err
        return links
275