Test Failed
Pull Request — master (#74)
by
unknown
02:02
created

KytosGraph.constrained_k_shortest_paths()   B

Complexity

Conditions 7

Size

Total Lines 50
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 56

Importance

Changes 0
Metric Value
cc 7
eloc 41
nop 8
dl 0
loc 50
rs 7.496
c 0
b 0
f 0
ccs 0
cts 0
cp 0
crap 56

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent as you need more or different data.

There are several approaches to avoid long parameter lists:

1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3 1
# pylint: disable=too-many-arguments,too-many-locals
4
from itertools import combinations, islice
5 1
6 1
from kytos.core import log
7 1
from kytos.core.common import EntityStatus
8
from napps.kytos.pathfinder.utils import (filter_ge, filter_in, filter_le,
9
                                          lazy_filter, nx_edge_data_delay,
10
                                          nx_edge_data_priority,
11
                                          nx_edge_data_weight)
12
13 1
try:
14
    import networkx as nx
15
    from networkx.exception import NetworkXNoPath, NodeNotFound
16 1
except ImportError:
17 1
    PACKAGE = "networkx==2.5.1"
18
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
19 1
20
21 1
class KytosGraph:
    """Class responsible for the graph generation.

    Wraps a ``networkx.Graph`` whose nodes are switch and interface ids and
    whose edges are switch<->interface attachments and links. Link metadata
    listed in ``_filter_functions`` is copied onto the edges so it can be
    used for constrained path searches.
    """

    def __init__(self):
        self.graph = nx.Graph()
        # Per-metric edge filters used by constrained_k_shortest_paths().
        self._filter_functions = {
            "ownership": lazy_filter(str, filter_in("ownership")),
            "bandwidth": lazy_filter((int, float), filter_ge("bandwidth")),
            "reliability": lazy_filter((int, float), filter_ge("reliability")),
            "priority": lazy_filter((int, float), filter_le("priority")),
            "utilization": lazy_filter((int, float), filter_le("utilization")),
            "delay": lazy_filter((int, float), filter_le("delay")),
        }
        # Edge-data callbacks keyed by the shortest-path-first attribute name.
        self.spf_edge_data_cbs = {
            "hop": nx_edge_data_weight,
            "delay": nx_edge_data_delay,
            "priority": nx_edge_data_priority,
        }

    def clear(self):
        """Remove all nodes and links registered."""
        self.graph.clear()

    def update_topology(self, topology):
        """Update all nodes and links inside the graph.

        The graph is emptied first and then rebuilt from
        ``topology.switches`` and ``topology.links``.
        """
        self.graph.clear()
        self.update_nodes(topology.switches)
        self.update_links(topology.links)

    def update_nodes(self, nodes):
        """Update all nodes inside the graph.

        Only nodes whose status is ``EntityStatus.UP`` are added; for each of
        them, every interface that is UP is added as a node connected to it.

        Raises:
            TypeError: if a node or interface lacks an expected attribute.
        """
        for node in nodes.values():
            try:
                if node.status != EntityStatus.UP:
                    continue
                self.graph.add_node(node.id)

                for interface in node.interfaces.values():
                    if interface.status == EntityStatus.UP:
                        self.graph.add_node(interface.id)
                        self.graph.add_edge(node.id, interface.id)

            except AttributeError as err:
                # Chain the original error so the failing attribute is not
                # lost from the traceback.
                raise TypeError(
                    f"Error when updating nodes inside the graph: {str(err)}"
                ) from err

    def update_links(self, links):
        """Update all links inside the graph.

        Only links whose status is ``EntityStatus.UP`` become edges.
        """
        for link in links.values():
            if link.status == EntityStatus.UP:
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
                self.update_link_metadata(link)

    def update_link_metadata(self, link):
        """Update link metadata.

        Copies onto the link's edge every metadata entry whose key has a
        registered filter function; other keys are ignored.
        """
        for key, value in link.metadata.items():
            if key not in self._filter_functions:
                continue
            endpoint_a = link.endpoint_a.id
            endpoint_b = link.endpoint_b.id
            self.graph[endpoint_a][endpoint_b][key] = value

    def get_link_metadata(self, endpoint_a, endpoint_b):
        """Return the metadata of a link."""
        return self.graph.get_edge_data(endpoint_a, endpoint_b)

    @staticmethod
    def _remove_switch_hops(circuit):
        """Remove switch hops from a circuit hops list.

        A hop is treated as a switch when its id splits into exactly eight
        colon-separated fields. The list stored in ``circuit["hops"]`` is
        updated in place.
        """
        # Rebuild through a slice assignment instead of calling remove()
        # while iterating: removing during iteration skips the element that
        # follows each removed hop, so consecutive switch hops survived.
        circuit["hops"][:] = [
            hop for hop in circuit["hops"] if len(hop.split(":")) != 8
        ]

    def _path_cost(self, path, weight="hop", default_cost=1):
        """Compute the path cost given an attribute.

        Sums the ``weight`` attribute over each consecutive pair of nodes in
        ``path``; edges without that attribute count as ``default_cost``.
        """
        cost = 0
        for node, nbr in nx.utils.pairwise(path):
            cost += self.graph[node][nbr].get(weight, default_cost)
        return cost

    def path_cost_builder(self, paths, weight="hop", default_weight=1):
        """Build the cost of a path given a list of paths.

        Each item of ``paths`` is either a list of hops, which is wrapped in
        a new ``{"hops": ..., "cost": ...}`` dict, or a dict with a "hops"
        key, which gets a "cost" key added in place.

        Returns:
            A list of dicts, one per input path.

        Raises:
            TypeError: if an item is neither a list nor a dict.
        """
        paths_acc = []
        for path in paths:
            if isinstance(path, list):
                paths_acc.append(
                    {
                        "hops": path,
                        "cost": self._path_cost(
                            path, weight=weight, default_cost=default_weight
                        ),
                    }
                )
            elif isinstance(path, dict):
                path["cost"] = self._path_cost(
                    path["hops"], weight=weight, default_cost=default_weight
                )
                paths_acc.append(path)
            else:
                raise TypeError(
                    f"type: '{type(path)}' must be either list or dict. "
                    f"path: {path}"
                )
        return paths_acc

    def k_shortest_paths(
        self, source, destination, weight=None, k=1, graph=None
    ):
        """
        Compute up to k shortest paths and return them.

        This procedure is based on algorithm by Jin Y. Yen [1].
        Since Yen's algorithm calls Dijkstra's up to k times, the time
        complexity will be proportional to K * Dijkstra's, average
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
        number of vertices and E number of edges.

        When ``graph`` is None the instance graph is searched. An empty list
        is returned if either node is missing or no path exists.

        References
        ----------
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
           Network", Management Science, Vol. 17, No. 11, Theory Series
           (Jul., 1971), pp. 712-716.
        """
        # Explicit None check: a graph with no nodes is falsy, so
        # `graph or self.graph` would silently search the full graph when an
        # empty (fully filtered) subgraph is passed in.
        search_graph = self.graph if graph is None else graph
        try:
            return list(
                islice(
                    nx.shortest_simple_paths(
                        search_graph,
                        source,
                        destination,
                        weight=weight,
                    ),
                    k,
                )
            )
        except (NodeNotFound, NetworkXNoPath):
            return []

    def constrained_k_shortest_paths(
        self,
        source,
        destination,
        weight=None,
        k=1,
        graph=None,
        minimum_hits=None,
        **metrics,
    ):
        """Calculate the constrained shortest paths with flexibility.

        Edges are first filtered by ``metrics["mandatory_metrics"]``. Then
        combinations of ``metrics["flexible_metrics"]`` are tried, from all
        of them down to ``minimum_hits`` of them, and up to ``k`` shortest
        paths are searched on the edges that pass each combination.

        ``minimum_hits`` defaults to 0 and is clamped to the range
        [0, number of flexible metrics].

        Returns:
            A list of ``{"hops": path, "metrics": applied_metrics}`` dicts.
            The search stops at the first combination size that yields any
            path; the list is empty when nothing matches.

        Raises:
            TypeError: propagated from ``_filter_links`` when a metric value
                is rejected by its filter.
        """
        # See k_shortest_paths(): an empty graph is falsy, so test for None.
        graph = self.graph if graph is None else graph
        mandatory_metrics = metrics.get("mandatory_metrics", {})
        flexible_metrics = metrics.get("flexible_metrics", {})
        # Materialized once because it is re-filtered for every combination.
        first_pass_links = list(
            self._filter_links(
                graph.edges(data=True), **mandatory_metrics
            )
        )
        length = len(flexible_metrics)
        if minimum_hits is None:
            minimum_hits = 0
        minimum_hits = min(length, max(0, minimum_hits))

        paths = []
        for i in range(length, minimum_hits - 1, -1):
            for combo in combinations(flexible_metrics.items(), i):
                additional = dict(combo)
                filtered_links = self._filter_links(
                    first_pass_links, **additional
                )
                filtered_links = ((u, v) for u, v, _ in filtered_links)
                for path in self.k_shortest_paths(
                    source,
                    destination,
                    weight=weight,
                    k=k,
                    graph=graph.edge_subgraph(filtered_links),
                ):
                    paths.append(
                        {
                            "hops": path,
                            "metrics": {**mandatory_metrics, **additional},
                        }
                    )
                # NOTE(review): paths accumulate across combinations of the
                # same size, so the total can jump past k without ever being
                # equal to it; kept as-is to preserve the returned count.
                if len(paths) == k:
                    return paths
            if paths:
                return paths
        return paths

    def _filter_links(self, links, **metrics):
        """Apply the registered filter of each given metric to ``links``.

        Metrics without an entry in ``_filter_functions`` are ignored.

        Returns:
            Whatever the last applied filter returned, or ``links`` itself
            when no filter applied.

        Raises:
            TypeError: if a filter rejects the value given for its metric.
        """
        for metric, value in metrics.items():
            filter_func = self._filter_functions.get(metric, None)
            if filter_func is not None:
                try:
                    links = filter_func(value, links)
                except TypeError as err:
                    raise TypeError(
                        f"Error in {metric} value: {value} err: {err}"
                    ) from err
        return links
222