Passed
Pull Request — master (#65)
by
unknown
02:58
created

KytosGraph.constrained_k_shortest_paths()   B

Complexity

Conditions 7

Size

Total Lines 50
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 7

Importance

Changes 0
Metric Value
cc 7
eloc 41
nop 8
dl 0
loc 50
ccs 22
cts 22
cp 1
crap 7
rs 7.496
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: enable=too-many-arguments,too-many-locals
4 1
from itertools import combinations, islice
5 1
import operator
6
7 1
from kytos.core import log
8 1
from kytos.core.common import EntityStatus
9
 
10 1
from .filters import EdgeFilter, ProcessEdgeAttribute, TypeCheckPreprocessor, TypeDifferentiatedProcessor, UseDefaultIfNone, UseValIfNone
11 1
from .weights import (nx_edge_data_delay, nx_edge_data_priority, nx_edge_data_weight)
12
13
14 1
try:
15 1
    import networkx as nx
16 1
    from networkx.exception import NetworkXNoPath, NodeNotFound
17
except ImportError:
18
    PACKAGE = "networkx==2.5.1"
19
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
20
21
22 1
class KytosGraph:
23
    """Class responsible for the graph generation."""
24
25 1
    def __init__(self):
26 1
        self.graph = nx.Graph()
27 1
        self._accepted_metadata = {
28
            'ownership',
29
            'bandwidth',
30
            'reliability',
31
            'priority',
32
            'utilization',
33
            'delay',
34
        }
35 1
        ownership_processor = ProcessEdgeAttribute(
36
            'ownership',
37
            TypeDifferentiatedProcessor({
38
                str: lambda x: frozenset(x.split()),
39
                dict: lambda x: frozenset(x.keys()),
40
                type(None): None
41
            })
42
        )
43 1
        self._filter_functions = {
44
            "ownership": EdgeFilter(
45
                operator.and_,
46
                UseValIfNone(ownership_processor),
47
                TypeCheckPreprocessor(
48
                    str,
49
                    lambda val: frozenset(val.split(','))
50
                )
51
            ),
52
            "not_ownership": EdgeFilter(
53
                lambda a, b: not (a & b),
54
                UseDefaultIfNone(ownership_processor, frozenset()),
55
                TypeCheckPreprocessor(
56
                    str,
57
                    lambda val: frozenset(val.split(','))
58
                )
59
            ),
60
            "bandwidth": EdgeFilter(
61
                operator.ge,
62
                'bandwidth',
63
                TypeCheckPreprocessor((int, float))
64
65
            ),
66
            "reliability": EdgeFilter(
67
                operator.ge,
68
                'reliability',
69
                TypeCheckPreprocessor((int, float))
70
            ),
71
            "priority": EdgeFilter(
72
                operator.le,
73
                'priority',
74
                TypeCheckPreprocessor((int, float))
75
            ),
76
            "utilization": EdgeFilter(
77
                operator.le,
78
                'utilization',
79
                TypeCheckPreprocessor((int, float))
80
            ),
81
            "delay": EdgeFilter(
82
                operator.le,
83
                'delay',
84
                TypeCheckPreprocessor((int, float))
85
            ),
86
        }
87 1
        self.spf_edge_data_cbs = {
88
            "hop": nx_edge_data_weight,
89
            "delay": nx_edge_data_delay,
90
            "priority": nx_edge_data_priority,
91
        }
92
93 1
    def clear(self):
94
        """Remove all nodes and links registered."""
95 1
        self.graph.clear()
96
97 1
    def update_topology(self, topology):
98
        """Update all nodes and links inside the graph."""
99 1
        self.graph.clear()
100 1
        self.update_nodes(topology.switches)
101 1
        self.update_links(topology.links)
102
103 1
    def update_nodes(self, nodes):
104
        """Update all nodes inside the graph."""
105 1
        for node in nodes.values():
106 1
            try:
107 1
                if node.status != EntityStatus.UP:
108 1
                    continue
109 1
                self.graph.add_node(node.id)
110
111 1
                for interface in node.interfaces.values():
112 1
                    if interface.status == EntityStatus.UP:
113 1
                        self.graph.add_node(interface.id)
114 1
                        self.graph.add_edge(node.id, interface.id)
115
116 1
            except AttributeError as err:
117 1
                raise TypeError(
118
                    f"Error when updating nodes inside the graph: {str(err)}"
119
                )
120
121 1
    def update_links(self, links):
122
        """Update all links inside the graph."""
123 1
        for link in links.values():
124 1
            if link.status == EntityStatus.UP:
125 1
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
126 1
                self.update_link_metadata(link)
127
128 1
    def update_link_metadata(self, link):
129
        """Update link metadata."""
130 1
        for key, value in link.metadata.items():
131 1
            if key not in self._accepted_metadata:
132 1
                continue
133 1
            endpoint_a = link.endpoint_a.id
134 1
            endpoint_b = link.endpoint_b.id
135 1
            self.graph[endpoint_a][endpoint_b][key] = value
136
137 1
    def get_link_metadata(self, endpoint_a, endpoint_b):
138
        """Return the metadata of a link."""
139 1
        return self.graph.get_edge_data(endpoint_a, endpoint_b)
140
141 1
    @staticmethod
142 1
    def _remove_switch_hops(circuit):
143
        """Remove switch hops from a circuit hops list."""
144 1
        for hop in circuit["hops"]:
145 1
            if len(hop.split(":")) == 8:
146 1
                circuit["hops"].remove(hop)
147
148 1
    def _path_cost(self, path, weight="hop", default_cost=1):
149
        """Compute the path cost given an attribute."""
150 1
        cost = 0
151 1
        for node, nbr in nx.utils.pairwise(path):
152 1
            cost += self.graph[node][nbr].get(weight, default_cost)
153 1
        return cost
154
155 1
    def path_cost_builder(self, paths, weight="hop", default_weight=1):
156
        """Build the cost of a path given a list of paths."""
157 1
        paths_acc = []
158 1
        for path in paths:
159 1
            if isinstance(path, list):
160 1
                paths_acc.append(
161
                    {
162
                        "hops": path,
163
                        "cost": self._path_cost(
164
                            path, weight=weight, default_cost=default_weight
165
                        ),
166
                    }
167
                )
168 1
            elif isinstance(path, dict):
169 1
                path["cost"] = self._path_cost(
170
                    path["hops"], weight=weight, default_cost=default_weight
171
                )
172 1
                paths_acc.append(path)
173
            else:
174
                raise TypeError(
175
                    f"type: '{type(path)}' must be be either list or dict. "
176
                    f"path: {path}"
177
                )
178 1
        return paths_acc
179
180 1
    def k_shortest_paths(
181
        self, source, destination, weight=None, k=1, graph=None
182
    ):
183
        """
184
        Compute up to k shortest paths and return them.
185
186
        This procedure is based on algorithm by Jin Y. Yen [1].
187
        Since Yen's algorithm calls Dijkstra's up to k times, the time
188
        complexity will be proportional to K * Dijkstra's, average
189
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
190
        number of vertices and E number of egdes.
191
192
        References
193
        ----------
194
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
195
           Network", Management Science, Vol. 17, No. 11, Theory Series
196
           (Jul., 1971), pp. 712-716.
197
        """
198 1
        try:
199 1
            return list(
200
                islice(
201
                    nx.shortest_simple_paths(
202
                        graph or self.graph,
203
                        source,
204
                        destination,
205
                        weight=weight,
206
                    ),
207
                    k,
208
                )
209
            )
210 1
        except (NodeNotFound, NetworkXNoPath):
211 1
            return []
212
213 1
    def constrained_k_shortest_paths(
214
        self,
215
        source,
216
        destination,
217
        weight=None,
218
        k=1,
219
        graph=None,
220
        minimum_hits=None,
221
        **metrics,
222
    ):
223
        """Calculate the constrained shortest paths with flexibility."""
224 1
        graph = graph or self.graph
225 1
        mandatory_metrics = metrics.get("mandatory_metrics", {})
226 1
        flexible_metrics = metrics.get("flexible_metrics", {})
227 1
        first_pass_links = list(
228
            self._filter_links(
229
                graph.edges(data=True), **mandatory_metrics
230
            )
231
        )
232 1
        length = len(flexible_metrics)
233 1
        if minimum_hits is None:
234 1
            minimum_hits = 0
235 1
        minimum_hits = min(length, max(0, minimum_hits))
236
237 1
        paths = []
238 1
        for i in range(length, minimum_hits - 1, -1):
239 1
            for combo in combinations(flexible_metrics.items(), i):
240 1
                additional = dict(combo)
241 1
                filtered_links = self._filter_links(
242
                    first_pass_links, **additional
243
                )
244 1
                filtered_links = ((u, v) for u, v, d in filtered_links)
0 ignored issues
show
introduced by
The variable v does not seem to be defined for all execution paths.
Loading history...
introduced by
The variable u does not seem to be defined for all execution paths.
Loading history...
245 1
                for path in self.k_shortest_paths(
246
                    source,
247
                    destination,
248
                    weight=weight,
249
                    k=k,
250
                    graph=graph.edge_subgraph(filtered_links),
251
                ):
252 1
                    paths.append(
253
                        {
254
                            "hops": path,
255
                            "metrics": {**mandatory_metrics, **additional},
256
                        }
257
                    )
258 1
                if len(paths) == k:
259 1
                    return paths
260 1
            if paths:
261 1
                return paths
262 1
        return paths
263
264 1
    def _filter_links(self, links, **metrics):
265 1
        for metric, value in metrics.items():
266 1
            filter_func = self._filter_functions.get(metric, None)
267 1
            if filter_func is not None:
268 1
                try:
269 1
                    links = filter_func(value, links)
270 1
                except TypeError as err:
271 1
                    raise TypeError(
272
                        f"Error in {metric} value: {value} err: {err}"
273
                    )
274
        return links
275