Passed
Pull Request — master (#65)
by
unknown
03:10
created

KytosGraph.constrained_k_shortest_paths()   B

Complexity

Conditions 7

Size

Total Lines 50
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 7

Importance

Changes 0
Metric Value
cc 7
eloc 41
nop 8
dl 0
loc 50
ccs 22
cts 22
cp 1
crap 7
rs 7.496
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: enable=too-many-arguments,too-many-locals
4 1
from itertools import combinations, islice
5 1
import operator
6
7 1
from kytos.core import log
8 1
from kytos.core.common import EntityStatus
9
 
10 1
from .filters import EdgeFilter, ProcessEdgeAttribute, TypeCheckPreprocessor, TypeDifferentiatedProcessor, UseDefaultIfNone, UseValIfNone
11 1
from .weights import (nx_edge_data_delay, nx_edge_data_priority, nx_edge_data_weight)
12
13
14 1
try:
15 1
    import networkx as nx
16 1
    from networkx.exception import NetworkXNoPath, NodeNotFound
17
except ImportError:
18
    PACKAGE = "networkx==2.5.1"
19
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
20
21
22 1
class KytosGraph:
23
    """Class responsible for the graph generation."""
24
25 1
    def __init__(self):
26 1
        self.graph = nx.Graph()
27 1
        self._accepted_metadata = {
28
            'ownership',
29
            'bandwidth',
30
            'reliability',
31
            'priority',
32
            'utilization',
33
            'delay',
34
        }
35 1
        ownership_processor = ProcessEdgeAttribute(
36
            'ownership',
37
            TypeDifferentiatedProcessor({
38
                str: lambda x: frozenset(x.split()),
39
                dict: lambda x: frozenset(x.keys()),
40
                list: lambda x: frozenset(x),
41
                type(None): None
42
            })
43
        )
44 1
        self._filter_functions = {
45
            "ownership": EdgeFilter(
46
                operator.contains,
47
                UseValIfNone(ownership_processor)
48
            ),
49
            "not_ownership": EdgeFilter(
50
                lambda a, b: not (a & b),
51
                UseDefaultIfNone(ownership_processor, frozenset()),
52
                TypeDifferentiatedProcessor({
53
                    str: lambda val: frozenset(val.split(',')),
54
                    list: lambda val: frozenset(val)
55
                })
56
            ),
57
            "bandwidth": EdgeFilter(
58
                operator.ge,
59
                'bandwidth'
60
            ),
61
            "reliability": EdgeFilter(
62
                operator.ge,
63
                'reliability'
64
            ),
65
            "priority": EdgeFilter(
66
                operator.le,
67
                'priority'
68
            ),
69
            "utilization": EdgeFilter(
70
                operator.le,
71
                'utilization'
72
            ),
73
            "delay": EdgeFilter(
74
                operator.le,
75
                'delay'
76
            ),
77
        }
78 1
        self.spf_edge_data_cbs = {
79
            "hop": nx_edge_data_weight,
80
            "delay": nx_edge_data_delay,
81
            "priority": nx_edge_data_priority,
82
        }
83
84 1
    def clear(self):
85
        """Remove all nodes and links registered."""
86 1
        self.graph.clear()
87
88 1
    def update_topology(self, topology):
89
        """Update all nodes and links inside the graph."""
90 1
        self.graph.clear()
91 1
        self.update_nodes(topology.switches)
92 1
        self.update_links(topology.links)
93
94 1
    def update_nodes(self, nodes):
95
        """Update all nodes inside the graph."""
96 1
        for node in nodes.values():
97 1
            try:
98 1
                if node.status != EntityStatus.UP:
99 1
                    continue
100 1
                self.graph.add_node(node.id)
101
102 1
                for interface in node.interfaces.values():
103 1
                    if interface.status == EntityStatus.UP:
104 1
                        self.graph.add_node(interface.id)
105 1
                        self.graph.add_edge(node.id, interface.id)
106
107 1
            except AttributeError as err:
108 1
                raise TypeError(
109
                    f"Error when updating nodes inside the graph: {str(err)}"
110
                )
111
112 1
    def update_links(self, links):
113
        """Update all links inside the graph."""
114 1
        for link in links.values():
115 1
            if link.status == EntityStatus.UP:
116 1
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
117 1
                self.update_link_metadata(link)
118
119 1
    def update_link_metadata(self, link):
120
        """Update link metadata."""
121 1
        for key, value in link.metadata.items():
122 1
            if key not in self._accepted_metadata:
123 1
                continue
124 1
            endpoint_a = link.endpoint_a.id
125 1
            endpoint_b = link.endpoint_b.id
126 1
            self.graph[endpoint_a][endpoint_b][key] = value
127
128 1
    def get_link_metadata(self, endpoint_a, endpoint_b):
129
        """Return the metadata of a link."""
130 1
        return self.graph.get_edge_data(endpoint_a, endpoint_b)
131
132 1
    @staticmethod
133 1
    def _remove_switch_hops(circuit):
134
        """Remove switch hops from a circuit hops list."""
135 1
        for hop in circuit["hops"]:
136 1
            if len(hop.split(":")) == 8:
137 1
                circuit["hops"].remove(hop)
138
139 1
    def _path_cost(self, path, weight="hop", default_cost=1):
140
        """Compute the path cost given an attribute."""
141 1
        cost = 0
142 1
        for node, nbr in nx.utils.pairwise(path):
143 1
            cost += self.graph[node][nbr].get(weight, default_cost)
144 1
        return cost
145
146 1
    def path_cost_builder(self, paths, weight="hop", default_weight=1):
147
        """Build the cost of a path given a list of paths."""
148 1
        paths_acc = []
149 1
        for path in paths:
150 1
            if isinstance(path, list):
151 1
                paths_acc.append(
152
                    {
153
                        "hops": path,
154
                        "cost": self._path_cost(
155
                            path, weight=weight, default_cost=default_weight
156
                        ),
157
                    }
158
                )
159 1
            elif isinstance(path, dict):
160 1
                path["cost"] = self._path_cost(
161
                    path["hops"], weight=weight, default_cost=default_weight
162
                )
163 1
                paths_acc.append(path)
164
            else:
165
                raise TypeError(
166
                    f"type: '{type(path)}' must be be either list or dict. "
167
                    f"path: {path}"
168
                )
169 1
        return paths_acc
170
171 1
    def k_shortest_paths(
172
        self, source, destination, weight=None, k=1, graph=None
173
    ):
174
        """
175
        Compute up to k shortest paths and return them.
176
177
        This procedure is based on algorithm by Jin Y. Yen [1].
178
        Since Yen's algorithm calls Dijkstra's up to k times, the time
179
        complexity will be proportional to K * Dijkstra's, average
180
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
181
        number of vertices and E number of egdes.
182
183
        References
184
        ----------
185
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
186
           Network", Management Science, Vol. 17, No. 11, Theory Series
187
           (Jul., 1971), pp. 712-716.
188
        """
189 1
        try:
190 1
            return list(
191
                islice(
192
                    nx.shortest_simple_paths(
193
                        graph or self.graph,
194
                        source,
195
                        destination,
196
                        weight=weight,
197
                    ),
198
                    k,
199
                )
200
            )
201 1
        except (NodeNotFound, NetworkXNoPath):
202 1
            return []
203
204 1
    def constrained_k_shortest_paths(
205
        self,
206
        source,
207
        destination,
208
        weight=None,
209
        k=1,
210
        graph=None,
211
        minimum_hits=None,
212
        **metrics,
213
    ):
214
        """Calculate the constrained shortest paths with flexibility."""
215 1
        graph = graph or self.graph
216 1
        mandatory_metrics = metrics.get("mandatory_metrics", {})
217 1
        flexible_metrics = metrics.get("flexible_metrics", {})
218 1
        first_pass_links = list(
219
            self._filter_links(
220
                graph.edges(data=True), **mandatory_metrics
221
            )
222
        )
223 1
        length = len(flexible_metrics)
224 1
        if minimum_hits is None:
225 1
            minimum_hits = 0
226 1
        minimum_hits = min(length, max(0, minimum_hits))
227
228 1
        paths = []
229 1
        for i in range(length, minimum_hits - 1, -1):
230 1
            for combo in combinations(flexible_metrics.items(), i):
231 1
                additional = dict(combo)
232 1
                filtered_links = self._filter_links(
233
                    first_pass_links, **additional
234
                )
235 1
                filtered_links = ((u, v) for u, v, d in filtered_links)
0 ignored issues
show
introduced by
The variable v does not seem to be defined for all execution paths.
Loading history...
introduced by
The variable u does not seem to be defined for all execution paths.
Loading history...
236 1
                for path in self.k_shortest_paths(
237
                    source,
238
                    destination,
239
                    weight=weight,
240
                    k=k,
241
                    graph=graph.edge_subgraph(filtered_links),
242
                ):
243 1
                    paths.append(
244
                        {
245
                            "hops": path,
246
                            "metrics": {**mandatory_metrics, **additional},
247
                        }
248
                    )
249 1
                if len(paths) == k:
250 1
                    return paths
251 1
            if paths:
252 1
                return paths
253 1
        return paths
254
255 1
    def _filter_links(self, links, **metrics):
256 1
        for metric, value in metrics.items():
257 1
            filter_func = self._filter_functions.get(metric, None)
258 1
            if filter_func is not None:
259 1
                try:
260 1
                    links = filter_func(value, links)
261
                except TypeError as err:
262
                    raise TypeError(
263
                        f"Error in {metric} value: {value} err: {err}"
264
                    )
265
        return links
266