Passed
Pull Request — master (#65)
by
unknown
03:45
created

build.graph.graph.KytosGraph.__init__()   B

Complexity

Conditions 6

Size

Total Lines 56
Code Lines 44

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 6

Importance

Changes 0
Metric Value
cc 6
eloc 44
nop 1
dl 0
loc 56
ccs 6
cts 6
cp 1
crap 6
rs 7.8906
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: enable=too-many-arguments,too-many-locals
4 1
from itertools import combinations, islice
5 1
import operator
6
7 1
from kytos.core import log
8 1
from kytos.core.common import EntityStatus
9
 
10 1
from .filters import EdgeFilter, ProcessEdgeAttribute, TypeCheckPreprocessor, TypeDifferentiatedProcessor, UseDefaultIfNone, UseValIfNone
11 1
from .weights import (nx_edge_data_delay, nx_edge_data_priority, nx_edge_data_weight)
12
13
14 1
try:
15 1
    import networkx as nx
16 1
    from networkx.exception import NetworkXNoPath, NodeNotFound
17
except ImportError:
18
    PACKAGE = "networkx==2.5.1"
19
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
20
21
22 1
class KytosGraph:
23
    """Class responsible for the graph generation."""
24
25 1
    def __init__(self):
26 1
        self.graph = nx.Graph()
27 1
        self._accepted_metadata = {
28
            'ownership',
29
            'bandwidth',
30
            'reliability',
31
            'priority',
32
            'utilization',
33
            'delay',
34
        }
35 1
        ownership_processor = ProcessEdgeAttribute(
36
            'ownership',
37
            TypeDifferentiatedProcessor({
38
                str: lambda x: frozenset(x.split()),
39
                dict: lambda x: frozenset(x.keys()),
40
                type(None): None
41
            })
42
        )
43 1
        self._filter_functions = {
44
            "ownership": EdgeFilter(
45
                operator.contains,
46
                UseValIfNone(ownership_processor)
47
            ),
48
            "not_ownership": EdgeFilter(
49
                lambda a, b: not (a & b),
50
                UseDefaultIfNone(ownership_processor, frozenset()),
51
                TypeDifferentiatedProcessor({
52
                    str: lambda val: frozenset(val.split(',')),
53
                    list: lambda val: frozenset(val)
54
                })
55
            ),
56
            "bandwidth": EdgeFilter(
57
                operator.ge,
58
                'bandwidth'
59
            ),
60
            "reliability": EdgeFilter(
61
                operator.ge,
62
                'reliability'
63
            ),
64
            "priority": EdgeFilter(
65
                operator.le,
66
                'priority'
67
            ),
68
            "utilization": EdgeFilter(
69
                operator.le,
70
                'utilization'
71
            ),
72
            "delay": EdgeFilter(
73
                operator.le,
74
                'delay'
75
            ),
76
        }
77 1
        self.spf_edge_data_cbs = {
78
            "hop": nx_edge_data_weight,
79
            "delay": nx_edge_data_delay,
80
            "priority": nx_edge_data_priority,
81
        }
82
83 1
    def clear(self):
84
        """Remove all nodes and links registered."""
85 1
        self.graph.clear()
86
87 1
    def update_topology(self, topology):
88
        """Update all nodes and links inside the graph."""
89 1
        self.graph.clear()
90 1
        self.update_nodes(topology.switches)
91 1
        self.update_links(topology.links)
92
93 1
    def update_nodes(self, nodes):
94
        """Update all nodes inside the graph."""
95 1
        for node in nodes.values():
96 1
            try:
97 1
                if node.status != EntityStatus.UP:
98 1
                    continue
99 1
                self.graph.add_node(node.id)
100
101 1
                for interface in node.interfaces.values():
102 1
                    if interface.status == EntityStatus.UP:
103 1
                        self.graph.add_node(interface.id)
104 1
                        self.graph.add_edge(node.id, interface.id)
105
106 1
            except AttributeError as err:
107 1
                raise TypeError(
108
                    f"Error when updating nodes inside the graph: {str(err)}"
109
                )
110
111 1
    def update_links(self, links):
112
        """Update all links inside the graph."""
113 1
        for link in links.values():
114 1
            if link.status == EntityStatus.UP:
115 1
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
116 1
                self.update_link_metadata(link)
117
118 1
    def update_link_metadata(self, link):
119
        """Update link metadata."""
120 1
        for key, value in link.metadata.items():
121 1
            if key not in self._accepted_metadata:
122 1
                continue
123 1
            endpoint_a = link.endpoint_a.id
124 1
            endpoint_b = link.endpoint_b.id
125 1
            self.graph[endpoint_a][endpoint_b][key] = value
126
127 1
    def get_link_metadata(self, endpoint_a, endpoint_b):
128
        """Return the metadata of a link."""
129 1
        return self.graph.get_edge_data(endpoint_a, endpoint_b)
130
131 1
    @staticmethod
132 1
    def _remove_switch_hops(circuit):
133
        """Remove switch hops from a circuit hops list."""
134 1
        for hop in circuit["hops"]:
135 1
            if len(hop.split(":")) == 8:
136 1
                circuit["hops"].remove(hop)
137
138 1
    def _path_cost(self, path, weight="hop", default_cost=1):
139
        """Compute the path cost given an attribute."""
140 1
        cost = 0
141 1
        for node, nbr in nx.utils.pairwise(path):
142 1
            cost += self.graph[node][nbr].get(weight, default_cost)
143 1
        return cost
144
145 1
    def path_cost_builder(self, paths, weight="hop", default_weight=1):
146
        """Build the cost of a path given a list of paths."""
147 1
        paths_acc = []
148 1
        for path in paths:
149 1
            if isinstance(path, list):
150 1
                paths_acc.append(
151
                    {
152
                        "hops": path,
153
                        "cost": self._path_cost(
154
                            path, weight=weight, default_cost=default_weight
155
                        ),
156
                    }
157
                )
158 1
            elif isinstance(path, dict):
159 1
                path["cost"] = self._path_cost(
160
                    path["hops"], weight=weight, default_cost=default_weight
161
                )
162 1
                paths_acc.append(path)
163
            else:
164
                raise TypeError(
165
                    f"type: '{type(path)}' must be be either list or dict. "
166
                    f"path: {path}"
167
                )
168 1
        return paths_acc
169
170 1
    def k_shortest_paths(
171
        self, source, destination, weight=None, k=1, graph=None
172
    ):
173
        """
174
        Compute up to k shortest paths and return them.
175
176
        This procedure is based on algorithm by Jin Y. Yen [1].
177
        Since Yen's algorithm calls Dijkstra's up to k times, the time
178
        complexity will be proportional to K * Dijkstra's, average
179
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
180
        number of vertices and E number of egdes.
181
182
        References
183
        ----------
184
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
185
           Network", Management Science, Vol. 17, No. 11, Theory Series
186
           (Jul., 1971), pp. 712-716.
187
        """
188 1
        try:
189 1
            return list(
190
                islice(
191
                    nx.shortest_simple_paths(
192
                        graph or self.graph,
193
                        source,
194
                        destination,
195
                        weight=weight,
196
                    ),
197
                    k,
198
                )
199
            )
200 1
        except (NodeNotFound, NetworkXNoPath):
201 1
            return []
202
203 1
    def constrained_k_shortest_paths(
204
        self,
205
        source,
206
        destination,
207
        weight=None,
208
        k=1,
209
        graph=None,
210
        minimum_hits=None,
211
        **metrics,
212
    ):
213
        """Calculate the constrained shortest paths with flexibility."""
214 1
        graph = graph or self.graph
215 1
        mandatory_metrics = metrics.get("mandatory_metrics", {})
216 1
        flexible_metrics = metrics.get("flexible_metrics", {})
217 1
        first_pass_links = list(
218
            self._filter_links(
219
                graph.edges(data=True), **mandatory_metrics
220
            )
221
        )
222 1
        length = len(flexible_metrics)
223 1
        if minimum_hits is None:
224 1
            minimum_hits = 0
225 1
        minimum_hits = min(length, max(0, minimum_hits))
226
227 1
        paths = []
228 1
        for i in range(length, minimum_hits - 1, -1):
229 1
            for combo in combinations(flexible_metrics.items(), i):
230 1
                additional = dict(combo)
231 1
                filtered_links = self._filter_links(
232
                    first_pass_links, **additional
233
                )
234 1
                filtered_links = ((u, v) for u, v, d in filtered_links)
0 ignored issues
show
introduced by
The variable u does not seem to be defined for all execution paths.
Loading history...
introduced by
The variable v does not seem to be defined for all execution paths.
Loading history...
235 1
                for path in self.k_shortest_paths(
236
                    source,
237
                    destination,
238
                    weight=weight,
239
                    k=k,
240
                    graph=graph.edge_subgraph(filtered_links),
241
                ):
242 1
                    paths.append(
243
                        {
244
                            "hops": path,
245
                            "metrics": {**mandatory_metrics, **additional},
246
                        }
247
                    )
248 1
                if len(paths) == k:
249 1
                    return paths
250 1
            if paths:
251 1
                return paths
252 1
        return paths
253
254 1
    def _filter_links(self, links, **metrics):
255 1
        for metric, value in metrics.items():
256 1
            filter_func = self._filter_functions.get(metric, None)
257 1
            if filter_func is not None:
258 1
                try:
259 1
                    links = filter_func(value, links)
260
                except TypeError as err:
261
                    raise TypeError(
262
                        f"Error in {metric} value: {value} err: {err}"
263
                    )
264
        return links
265