Passed
Pull Request — master (#65)
by
unknown
03:19
created

KytosGraph.constrained_k_shortest_paths()   B

Complexity

Conditions 7

Size

Total Lines 50
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 7

Importance

Changes 0
Metric Value
cc 7
eloc 41
nop 8
dl 0
loc 50
ccs 22
cts 22
cp 1
crap 7
rs 7.496
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: enable=too-many-arguments,too-many-locals
4 1
from itertools import combinations, islice
5 1
import operator
6
7 1
from kytos.core import log
8 1
from kytos.core.common import EntityStatus
9
 
10 1
from .filters import EdgeFilter, ProcessEdgeAttribute, TypeCheckPreprocessor, TypeDifferentiatedProcessor, UseDefaultIfNone, UseValIfNone
11 1
from .weights import (nx_edge_data_delay, nx_edge_data_priority, nx_edge_data_weight)
12
13
14 1
try:
15 1
    import networkx as nx
16 1
    from networkx.exception import NetworkXNoPath, NodeNotFound
17
except ImportError:
18
    PACKAGE = "networkx==2.5.1"
19
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
20
21
22 1
class KytosGraph:
    """Class responsible for the graph generation.

    Wraps a ``networkx`` graph whose nodes are switch ids and interface
    ids. Edges connect a switch to each of its interfaces and connect the
    two endpoints of every link. Accepted link metadata is stored as edge
    attributes and is used to filter edges and to compute path costs.
    """

    def __init__(self):
        """Create an empty graph and register the filter/weight tables."""
        self.graph = nx.Graph()
        # Only these link metadata keys are copied onto graph edges
        # (see update_link_metadata).
        self._accepted_metadata = {
            'ownership',
            'bandwidth',
            'reliability',
            'priority',
            'utilization',
            'delay',
        }
        # Normalises the 'ownership' edge attribute: a string is split on
        # whitespace, a dict contributes its keys.
        ownership_processor = ProcessEdgeAttribute(
            'ownership',
            TypeDifferentiatedProcessor({
                str: lambda x: frozenset(x.split()),
                dict: lambda x: frozenset(x.keys()),
                type(None): None
            })
        )
        # Metric name -> edge filter. _filter_links calls each entry as
        # ``filter(value, links)`` and uses the result as the new links.
        # NOTE(review): presumably bandwidth/reliability act as lower
        # bounds and priority/utilization/delay as upper bounds on the
        # edge attribute -- confirm the argument order in EdgeFilter.
        self._filter_functions = {
            "ownership": EdgeFilter(
                operator.contains,
                UseValIfNone(ownership_processor)
            ),
            "not_ownership": EdgeFilter(
                lambda a, b: not (a & b),
                UseDefaultIfNone(ownership_processor, frozenset()),
                TypeDifferentiatedProcessor({
                    str: lambda val: frozenset(val.split(',')),
                    list: lambda val: frozenset(val)
                })
            ),
            "bandwidth": EdgeFilter(
                operator.ge,
                'bandwidth'
            ),
            "reliability": EdgeFilter(
                operator.ge,
                'reliability'
            ),
            "priority": EdgeFilter(
                operator.le,
                'priority'
            ),
            "utilization": EdgeFilter(
                operator.le,
                'utilization'
            ),
            "delay": EdgeFilter(
                operator.le,
                'delay'
            ),
        }
        # Shortest-path-first attribute name -> edge data callback.
        self.spf_edge_data_cbs = {
            "hop": nx_edge_data_weight,
            "delay": nx_edge_data_delay,
            "priority": nx_edge_data_priority,
        }

    def clear(self):
        """Remove all nodes and links registered."""
        self.graph.clear()

    def update_topology(self, topology):
        """Update all nodes and links inside the graph.

        The graph is cleared first and then rebuilt from
        ``topology.switches`` and ``topology.links``.
        """
        self.graph.clear()
        self.update_nodes(topology.switches)
        self.update_links(topology.links)

    def update_nodes(self, nodes):
        """Update all nodes inside the graph.

        For every node in ``nodes.values()`` whose status is UP, add the
        node id, and add each of its UP interfaces together with an edge
        between the node and that interface.

        Raises:
            TypeError: if a node or interface lacks an expected attribute;
                the original AttributeError is chained as the cause.
        """
        for node in nodes.values():
            try:
                if node.status != EntityStatus.UP:
                    continue
                self.graph.add_node(node.id)

                for interface in node.interfaces.values():
                    if interface.status == EntityStatus.UP:
                        self.graph.add_node(interface.id)
                        self.graph.add_edge(node.id, interface.id)

            except AttributeError as err:
                raise TypeError(
                    f"Error when updating nodes inside the graph: {str(err)}"
                ) from err

    def update_links(self, links):
        """Update all links inside the graph.

        Every link in ``links.values()`` whose status is UP becomes an edge
        between its two endpoint ids, and its metadata is copied onto it.
        """
        for link in links.values():
            if link.status == EntityStatus.UP:
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
                self.update_link_metadata(link)

    def update_link_metadata(self, link):
        """Update link metadata.

        Copy each accepted key of ``link.metadata`` onto the graph edge
        between the link endpoints; other keys are ignored.
        """
        for key, value in link.metadata.items():
            if key not in self._accepted_metadata:
                continue
            endpoint_a = link.endpoint_a.id
            endpoint_b = link.endpoint_b.id
            self.graph[endpoint_a][endpoint_b][key] = value

    def get_link_metadata(self, endpoint_a, endpoint_b):
        """Return the metadata of a link."""
        return self.graph.get_edge_data(endpoint_a, endpoint_b)

    @staticmethod
    def _remove_switch_hops(circuit):
        """Remove switch hops from a circuit hops list.

        A hop counts as a switch when splitting it on ":" yields exactly
        eight parts. ``circuit["hops"]` is updated in place.
        """
        # Slice assignment keeps the same list object while avoiding
        # removal during iteration, which would skip the hop that follows
        # each removed one.
        circuit["hops"][:] = [
            hop for hop in circuit["hops"] if len(hop.split(":")) != 8
        ]

    def _path_cost(self, path, weight="hop", default_cost=1):
        """Compute the path cost given an attribute.

        Sum the ``weight`` attribute of every consecutive edge of ``path``;
        an edge without that attribute contributes ``default_cost``.
        """
        cost = 0
        for node, nbr in nx.utils.pairwise(path):
            cost += self.graph[node][nbr].get(weight, default_cost)
        return cost

    def path_cost_builder(self, paths, weight="hop", default_weight=1):
        """Build the cost of a path given a list of paths.

        Each item of ``paths`` is either a list of hops, which is wrapped
        in a new ``{"hops", "cost"}`` dict, or a dict with a "hops" key,
        which gets its "cost" key set in place.

        Returns:
            list of dicts, one per input path, in input order.

        Raises:
            TypeError: if an item is neither a list nor a dict.
        """
        paths_acc = []
        for path in paths:
            if isinstance(path, list):
                paths_acc.append(
                    {
                        "hops": path,
                        "cost": self._path_cost(
                            path, weight=weight, default_cost=default_weight
                        ),
                    }
                )
            elif isinstance(path, dict):
                path["cost"] = self._path_cost(
                    path["hops"], weight=weight, default_cost=default_weight
                )
                paths_acc.append(path)
            else:
                raise TypeError(
                    f"type: '{type(path)}' must be be either list or dict. "
                    f"path: {path}"
                )
        return paths_acc

    def k_shortest_paths(
        self, source, destination, weight=None, k=1, graph=None
    ):
        """
        Compute up to k shortest paths and return them.

        When ``graph`` is None the instance graph is searched. An empty
        list is returned when a node is missing or no path exists.

        This procedure is based on algorithm by Jin Y. Yen [1].
        Since Yen's algorithm calls Dijkstra's up to k times, the time
        complexity will be proportional to K * Dijkstra's, average
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
        number of vertices and E number of edges.

        References
        ----------
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
           Network", Management Science, Vol. 17, No. 11, Theory Series
           (Jul., 1971), pp. 712-716.
        """
        # A graph without nodes is falsy, so ``graph or self.graph`` would
        # silently replace an empty (fully filtered) graph with the full
        # one. Only fall back when no graph was given at all.
        search_graph = self.graph if graph is None else graph
        try:
            # The generator raises lazily, so it must be consumed inside
            # the try block.
            return list(
                islice(
                    nx.shortest_simple_paths(
                        search_graph,
                        source,
                        destination,
                        weight=weight,
                    ),
                    k,
                )
            )
        except (NodeNotFound, NetworkXNoPath):
            return []

    def constrained_k_shortest_paths(
        self,
        source,
        destination,
        weight=None,
        k=1,
        graph=None,
        minimum_hits=None,
        **metrics,
    ):
        """Calculate the constrained shortest paths with flexibility.

        Edges are first filtered by ``metrics["mandatory_metrics"]``. Then
        combinations of ``metrics["flexible_metrics"]`` are tried, from all
        of them down to ``minimum_hits`` of them, and the search stops at
        the first combination size that yields any path.

        Args:
            source: id of the source node.
            destination: id of the destination node.
            weight: edge attribute passed to the shortest path search.
            k: maximum number of paths requested per combination.
            graph: graph to search; the instance graph when None.
            minimum_hits: minimum number of flexible metrics that must be
                applied; None means 0. Clamped to the range
                [0, number of flexible metrics].
            **metrics: optional "mandatory_metrics" and "flexible_metrics"
                dicts mapping metric name to the requested value.

        Returns:
            list of ``{"hops": path, "metrics": applied_metrics}`` dicts.
        """
        graph = self.graph if graph is None else graph
        mandatory_metrics = metrics.get("mandatory_metrics", {})
        flexible_metrics = metrics.get("flexible_metrics", {})
        # Materialised once because it is re-filtered for each combination.
        first_pass_links = list(
            self._filter_links(
                graph.edges(data=True), **mandatory_metrics
            )
        )
        length = len(flexible_metrics)
        if minimum_hits is None:
            minimum_hits = 0
        minimum_hits = min(length, max(0, minimum_hits))

        paths = []
        for i in range(length, minimum_hits - 1, -1):
            for combo in combinations(flexible_metrics.items(), i):
                additional = dict(combo)
                filtered_links = self._filter_links(
                    first_pass_links, **additional
                )
                # edge_subgraph only needs the endpoints of each edge.
                filtered_links = ((u, v) for u, v, _ in filtered_links)
                for path in self.k_shortest_paths(
                    source,
                    destination,
                    weight=weight,
                    k=k,
                    graph=graph.edge_subgraph(filtered_links),
                ):
                    paths.append(
                        {
                            "hops": path,
                            "metrics": {**mandatory_metrics, **additional},
                        }
                    )
                # NOTE(review): equality check -- if several combinations
                # together yield more than k paths this early return is
                # skipped and every accumulated path is returned once the
                # current combination size is exhausted. Confirm intended.
                if len(paths) == k:
                    return paths
            if paths:
                return paths
        return paths

    def _filter_links(self, links, **metrics):
        """Apply the registered filter of every given metric to ``links``.

        Metrics without a registered filter are ignored. Filters are
        chained in the iteration order of ``metrics``.

        Raises:
            TypeError: if a filter raises TypeError for the given value;
                the original error is chained as the cause.
        """
        for metric, value in metrics.items():
            filter_func = self._filter_functions.get(metric, None)
            if filter_func is not None:
                try:
                    links = filter_func(value, links)
                except TypeError as err:
                    raise TypeError(
                        f"Error in {metric} value: {value} err: {err}"
                    ) from err
        return links
265