Passed
Pull Request — master (#65)
by
unknown
06:59 queued 04:09
created

KytosGraph.constrained_k_shortest_paths()   B

Complexity

Conditions 7

Size

Total Lines 50
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 7

Importance

Changes 0
Metric Value
cc 7
eloc 41
nop 8
dl 0
loc 50
ccs 22
cts 22
cp 1
crap 7
rs 7.496
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: enable=too-many-arguments,too-many-locals
4 1
from itertools import combinations, islice
5 1
import operator
6
7 1
from kytos.core import log
8 1
from kytos.core.common import EntityStatus
9
 
10 1
from .filters import EdgeFilter, TypeCheckPreprocessor                                       
11 1
from .weights import (nx_edge_data_delay, nx_edge_data_priority, nx_edge_data_weight)
12
13
14 1
try:
15 1
    import networkx as nx
16 1
    from networkx.exception import NetworkXNoPath, NodeNotFound
17
except ImportError:
18
    PACKAGE = "networkx==2.5.1"
19
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
20
21
22 1
class KytosGraph:
23
    """Class responsible for the graph generation."""
24
25 1
    def __init__(self):
26 1
        self.graph = nx.Graph()
27 1
        self._accepted_metadata = {
28
            'ownership',
29
            'bandwidth',
30
            'reliability',
31
            'priority',
32
            'utilization',
33
            'delay',
34
        }
35 1
        self._filter_functions = {
36
            "ownership": EdgeFilter(
37
                operator.and_,
38
                lambda edge, val: frozenset(edge[2].get('ownership', {}).keys()) or val,
39
                TypeCheckPreprocessor(
40
                    str,
41
                    lambda val: frozenset(val.split(','))
42
                )
43
            ),
44
            "not_ownership": EdgeFilter(
45
                lambda a, b: not (a & b),
46
                lambda edge, _: frozenset(edge[2].get('ownership', {}).keys()),
47
                TypeCheckPreprocessor(
48
                    str,
49
                    lambda val: frozenset(val.split(','))
50
                )
51
            ),
52
            "bandwidth": EdgeFilter(
53
                operator.ge,
54
                'bandwidth',
55
                TypeCheckPreprocessor((int, float))
56
57
            ),
58
            "reliability": EdgeFilter(
59
                operator.ge,
60
                'reliability',
61
                TypeCheckPreprocessor((int, float))
62
            ),
63
            "priority": EdgeFilter(
64
                operator.le,
65
                'priority',
66
                TypeCheckPreprocessor((int, float))
67
            ),
68
            "utilization": EdgeFilter(
69
                operator.le,
70
                'utilization',
71
                TypeCheckPreprocessor((int, float))
72
            ),
73
            "delay": EdgeFilter(
74
                operator.le,
75
                'delay',
76
                TypeCheckPreprocessor((int, float))
77
            ),
78
        }
79 1
        self.spf_edge_data_cbs = {
80
            "hop": nx_edge_data_weight,
81
            "delay": nx_edge_data_delay,
82
            "priority": nx_edge_data_priority,
83
        }
84
85 1
    def clear(self):
86
        """Remove all nodes and links registered."""
87 1
        self.graph.clear()
88
89 1
    def update_topology(self, topology):
90
        """Update all nodes and links inside the graph."""
91 1
        self.graph.clear()
92 1
        self.update_nodes(topology.switches)
93 1
        self.update_links(topology.links)
94
95 1
    def update_nodes(self, nodes):
96
        """Update all nodes inside the graph."""
97 1
        for node in nodes.values():
98 1
            try:
99 1
                if node.status != EntityStatus.UP:
100 1
                    continue
101 1
                self.graph.add_node(node.id)
102
103 1
                for interface in node.interfaces.values():
104 1
                    if interface.status == EntityStatus.UP:
105 1
                        self.graph.add_node(interface.id)
106 1
                        self.graph.add_edge(node.id, interface.id)
107
108 1
            except AttributeError as err:
109 1
                raise TypeError(
110
                    f"Error when updating nodes inside the graph: {str(err)}"
111
                )
112
113 1
    def update_links(self, links):
114
        """Update all links inside the graph."""
115 1
        for link in links.values():
116 1
            if link.status == EntityStatus.UP:
117 1
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
118 1
                self.update_link_metadata(link)
119
120 1
    def update_link_metadata(self, link):
121
        """Update link metadata."""
122 1
        for key, value in link.metadata.items():
123 1
            if key not in self._accepted_metadata:
124 1
                continue
125 1
            endpoint_a = link.endpoint_a.id
126 1
            endpoint_b = link.endpoint_b.id
127 1
            self.graph[endpoint_a][endpoint_b][key] = value
128
129 1
    def get_link_metadata(self, endpoint_a, endpoint_b):
130
        """Return the metadata of a link."""
131 1
        return self.graph.get_edge_data(endpoint_a, endpoint_b)
132
133 1
    @staticmethod
134 1
    def _remove_switch_hops(circuit):
135
        """Remove switch hops from a circuit hops list."""
136 1
        for hop in circuit["hops"]:
137 1
            if len(hop.split(":")) == 8:
138 1
                circuit["hops"].remove(hop)
139
140 1
    def _path_cost(self, path, weight="hop", default_cost=1):
141
        """Compute the path cost given an attribute."""
142 1
        cost = 0
143 1
        for node, nbr in nx.utils.pairwise(path):
144 1
            cost += self.graph[node][nbr].get(weight, default_cost)
145 1
        return cost
146
147 1
    def path_cost_builder(self, paths, weight="hop", default_weight=1):
148
        """Build the cost of a path given a list of paths."""
149 1
        paths_acc = []
150 1
        for path in paths:
151 1
            if isinstance(path, list):
152 1
                paths_acc.append(
153
                    {
154
                        "hops": path,
155
                        "cost": self._path_cost(
156
                            path, weight=weight, default_cost=default_weight
157
                        ),
158
                    }
159
                )
160 1
            elif isinstance(path, dict):
161 1
                path["cost"] = self._path_cost(
162
                    path["hops"], weight=weight, default_cost=default_weight
163
                )
164 1
                paths_acc.append(path)
165
            else:
166
                raise TypeError(
167
                    f"type: '{type(path)}' must be be either list or dict. "
168
                    f"path: {path}"
169
                )
170 1
        return paths_acc
171
172 1
    def k_shortest_paths(
173
        self, source, destination, weight=None, k=1, graph=None
174
    ):
175
        """
176
        Compute up to k shortest paths and return them.
177
178
        This procedure is based on algorithm by Jin Y. Yen [1].
179
        Since Yen's algorithm calls Dijkstra's up to k times, the time
180
        complexity will be proportional to K * Dijkstra's, average
181
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
182
        number of vertices and E number of egdes.
183
184
        References
185
        ----------
186
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
187
           Network", Management Science, Vol. 17, No. 11, Theory Series
188
           (Jul., 1971), pp. 712-716.
189
        """
190 1
        try:
191 1
            return list(
192
                islice(
193
                    nx.shortest_simple_paths(
194
                        graph or self.graph,
195
                        source,
196
                        destination,
197
                        weight=weight,
198
                    ),
199
                    k,
200
                )
201
            )
202 1
        except (NodeNotFound, NetworkXNoPath):
203 1
            return []
204
205 1
    def constrained_k_shortest_paths(
206
        self,
207
        source,
208
        destination,
209
        weight=None,
210
        k=1,
211
        graph=None,
212
        minimum_hits=None,
213
        **metrics,
214
    ):
215
        """Calculate the constrained shortest paths with flexibility."""
216 1
        graph = graph or self.graph
217 1
        mandatory_metrics = metrics.get("mandatory_metrics", {})
218 1
        flexible_metrics = metrics.get("flexible_metrics", {})
219 1
        first_pass_links = list(
220
            self._filter_links(
221
                graph.edges(data=True), **mandatory_metrics
222
            )
223
        )
224 1
        length = len(flexible_metrics)
225 1
        if minimum_hits is None:
226 1
            minimum_hits = 0
227 1
        minimum_hits = min(length, max(0, minimum_hits))
228
229 1
        paths = []
230 1
        for i in range(length, minimum_hits - 1, -1):
231 1
            for combo in combinations(flexible_metrics.items(), i):
232 1
                additional = dict(combo)
233 1
                filtered_links = self._filter_links(
234
                    first_pass_links, **additional
235
                )
236 1
                filtered_links = ((u, v) for u, v, d in filtered_links)
0 ignored issues
show
introduced by
The variable u does not seem to be defined for all execution paths.
Loading history...
introduced by
The variable v does not seem to be defined for all execution paths.
Loading history...
237 1
                for path in self.k_shortest_paths(
238
                    source,
239
                    destination,
240
                    weight=weight,
241
                    k=k,
242
                    graph=graph.edge_subgraph(filtered_links),
243
                ):
244 1
                    paths.append(
245
                        {
246
                            "hops": path,
247
                            "metrics": {**mandatory_metrics, **additional},
248
                        }
249
                    )
250 1
                if len(paths) == k:
251 1
                    return paths
252 1
            if paths:
253 1
                return paths
254 1
        return paths
255
256 1
    def _filter_links(self, links, **metrics):
257 1
        for metric, value in metrics.items():
258 1
            filter_func = self._filter_functions.get(metric, None)
259 1
            if filter_func is not None:
260 1
                try:
261 1
                    links = filter_func(value, links)
262 1
                except TypeError as err:
263 1
                    raise TypeError(
264
                        f"Error in {metric} value: {value} err: {err}"
265
                    )
266
        return links
267