Passed
Pull Request — master (#65)
by
unknown
03:39
created

KytosGraph.constrained_k_shortest_paths()   B

Complexity

Conditions 7

Size

Total Lines 50
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 7

Importance

Changes 0
Metric Value
cc 7
eloc 41
nop 8
dl 0
loc 50
ccs 22
cts 22
cp 1
crap 7
rs 7.496
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameters also often become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: enable=too-many-arguments,too-many-locals
4 1
from itertools import combinations, islice
5 1
import operator
6
7 1
from kytos.core import log
8 1
from kytos.core.common import EntityStatus
9
 
10 1
from .filters import EdgeFilter, ProcessEdgeAttribute, TypeCheckPreprocessor, TypeDifferentiatedProcessor, UseDefaultIfNone, UseValIfNone
11 1
from .weights import (nx_edge_data_delay, nx_edge_data_priority, nx_edge_data_weight)
12
13
14 1
try:
15 1
    import networkx as nx
16 1
    from networkx.exception import NetworkXNoPath, NodeNotFound
17
except ImportError:
18
    PACKAGE = "networkx==2.5.1"
19
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
20
21
22 1
class KytosGraph:
    """Class responsible for the graph generation.

    Keeps a networkx graph whose nodes are switch and interface ids and
    whose edges are switch-to-interface attachments and links, and offers
    (optionally constrained) k-shortest-path searches over it.
    """

    def __init__(self):
        """Create an empty graph and register the link metadata filters."""
        self.graph = nx.Graph()
        # Only these link metadata keys are copied onto graph edges
        # (see update_link_metadata).
        self._accepted_metadata = {
            'ownership',
            'bandwidth',
            'reliability',
            'priority',
            'utilization',
            'delay',
        }
        ownership_processor = ProcessEdgeAttribute(
            'ownership',
            TypeDifferentiatedProcessor({
                str: lambda x: frozenset(x.split()),
                dict: lambda x: frozenset(x.keys()),
                type(None): None
            })
        )
        # Constraint name -> filter callable; _filter_links invokes each
        # one as ``filter(value, links)``.
        self._filter_functions = {
            "ownership": EdgeFilter(
                operator.contains,
                UseValIfNone(ownership_processor),
                TypeCheckPreprocessor(
                    str
                )
            ),
            "not_ownership": EdgeFilter(
                lambda a, b: not (a & b),
                UseDefaultIfNone(ownership_processor, frozenset()),
                TypeCheckPreprocessor(
                    str,
                    lambda val: frozenset(val.split(','))
                )
            ),
            "bandwidth": EdgeFilter(
                operator.ge,
                'bandwidth',
                TypeCheckPreprocessor((int, float))

            ),
            "reliability": EdgeFilter(
                operator.ge,
                'reliability',
                TypeCheckPreprocessor((int, float))
            ),
            "priority": EdgeFilter(
                operator.le,
                'priority',
                TypeCheckPreprocessor((int, float))
            ),
            "utilization": EdgeFilter(
                operator.le,
                'utilization',
                TypeCheckPreprocessor((int, float))
            ),
            "delay": EdgeFilter(
                operator.le,
                'delay',
                TypeCheckPreprocessor((int, float))
            ),
        }
        # Edge-data callbacks keyed by attribute name; they are only
        # registered here and are not called inside this class.
        self.spf_edge_data_cbs = {
            "hop": nx_edge_data_weight,
            "delay": nx_edge_data_delay,
            "priority": nx_edge_data_priority,
        }

    def clear(self):
        """Remove all nodes and links registered."""
        self.graph.clear()

    def update_topology(self, topology):
        """Rebuild the graph from ``topology.switches`` and ``.links``."""
        self.graph.clear()
        self.update_nodes(topology.switches)
        self.update_links(topology.links)

    def update_nodes(self, nodes):
        """Add every UP node and its UP interfaces to the graph.

        ``nodes`` is a mapping whose values expose ``status``, ``id`` and
        ``interfaces`` (itself a mapping of objects with ``status`` and
        ``id``). Each UP interface is connected to its node by an edge.

        Raises:
            TypeError: if a node or interface lacks an expected attribute.
        """
        for node in nodes.values():
            try:
                if node.status != EntityStatus.UP:
                    continue
                self.graph.add_node(node.id)

                for interface in node.interfaces.values():
                    if interface.status == EntityStatus.UP:
                        self.graph.add_node(interface.id)
                        self.graph.add_edge(node.id, interface.id)

            except AttributeError as err:
                # Chain the cause explicitly so the traceback shows which
                # attribute was missing.
                raise TypeError(
                    f"Error when updating nodes inside the graph: {str(err)}"
                ) from err

    def update_links(self, links):
        """Add an edge (plus accepted metadata) for every UP link."""
        for link in links.values():
            if link.status == EntityStatus.UP:
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
                self.update_link_metadata(link)

    def update_link_metadata(self, link):
        """Copy the accepted metadata of ``link`` onto its graph edge."""
        for key, value in link.metadata.items():
            if key not in self._accepted_metadata:
                continue
            endpoint_a = link.endpoint_a.id
            endpoint_b = link.endpoint_b.id
            self.graph[endpoint_a][endpoint_b][key] = value

    def get_link_metadata(self, endpoint_a, endpoint_b):
        """Return the metadata of a link."""
        return self.graph.get_edge_data(endpoint_a, endpoint_b)

    @staticmethod
    def _remove_switch_hops(circuit):
        """Remove switch hops from a circuit hops list, in place.

        A hop is treated as a switch hop when its id splits into exactly
        eight ``:``-separated fields. The list object stored in
        ``circuit["hops"]`` is kept and mutated, so existing references to
        it observe the change.
        """
        # Rebuild through slice assignment instead of calling remove()
        # while iterating, which skipped the element following each
        # removed hop.
        circuit["hops"][:] = [
            hop for hop in circuit["hops"] if len(hop.split(":")) != 8
        ]

    def _path_cost(self, path, weight="hop", default_cost=1):
        """Compute the path cost given an attribute.

        Sums the ``weight`` attribute of every consecutive edge of
        ``path``; edges without that attribute count as ``default_cost``.
        """
        cost = 0
        for node, nbr in nx.utils.pairwise(path):
            cost += self.graph[node][nbr].get(weight, default_cost)
        return cost

    def path_cost_builder(self, paths, weight="hop", default_weight=1):
        """Build the cost of a path given a list of paths.

        Each item of ``paths`` is either a list of hops, which is wrapped
        into a new ``{"hops", "cost"}`` dict, or a dict with a ``"hops"``
        key, which gets its ``"cost"`` key set in place.

        Returns:
            A list with one dict per input path.

        Raises:
            TypeError: if an item is neither a list nor a dict.
        """
        paths_acc = []
        for path in paths:
            if isinstance(path, list):
                paths_acc.append(
                    {
                        "hops": path,
                        "cost": self._path_cost(
                            path, weight=weight, default_cost=default_weight
                        ),
                    }
                )
            elif isinstance(path, dict):
                path["cost"] = self._path_cost(
                    path["hops"], weight=weight, default_cost=default_weight
                )
                paths_acc.append(path)
            else:
                raise TypeError(
                    f"type: '{type(path)}' must be be either list or dict. "
                    f"path: {path}"
                )
        return paths_acc

    def k_shortest_paths(
        self, source, destination, weight=None, k=1, graph=None
    ):
        """
        Compute up to k shortest paths and return them.

        The search runs on ``graph`` when one is given, otherwise on
        ``self.graph``. An empty list is returned when a node is missing
        from the graph or when no path exists.

        This procedure is based on algorithm by Jin Y. Yen [1].
        Since Yen's algorithm calls Dijkstra's up to k times, the time
        complexity will be proportional to K * Dijkstra's, average
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
        number of vertices and E number of edges.

        References
        ----------
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
           Network", Management Science, Vol. 17, No. 11, Theory Series
           (Jul., 1971), pp. 712-716.
        """
        # An empty networkx graph (or subgraph view) is falsy because its
        # len() is 0, so ``graph or self.graph`` would silently search the
        # full graph. Only fall back when no graph was passed at all.
        if graph is None:
            graph = self.graph
        try:
            return list(
                islice(
                    nx.shortest_simple_paths(
                        graph,
                        source,
                        destination,
                        weight=weight,
                    ),
                    k,
                )
            )
        except (NodeNotFound, NetworkXNoPath):
            return []

    def constrained_k_shortest_paths(
        self,
        source,
        destination,
        weight=None,
        k=1,
        graph=None,
        minimum_hits=None,
        **metrics,
    ):
        """Calculate the constrained shortest paths with flexibility.

        ``metrics`` may carry two dicts: ``mandatory_metrics``, which every
        link must satisfy, and ``flexible_metrics``, of which at least
        ``minimum_hits`` must be satisfied. ``minimum_hits`` defaults to 0
        and is clamped to ``[0, len(flexible_metrics)]``.

        Combinations of flexible metrics are tried from the largest size
        down to ``minimum_hits``. The search returns as soon as exactly
        ``k`` paths have been collected, or after finishing the first
        combination size that produced any path; a single size can
        therefore contribute more than ``k`` paths in total.

        Returns:
            A list of ``{"hops": path, "metrics": applied_metrics}`` dicts,
            empty when nothing satisfies the constraints.
        """
        # Explicit None check: an empty graph is falsy (see
        # k_shortest_paths) and must not be swapped for self.graph.
        if graph is None:
            graph = self.graph
        mandatory_metrics = metrics.get("mandatory_metrics", {})
        flexible_metrics = metrics.get("flexible_metrics", {})
        # Materialized once because it is re-filtered for every combination.
        first_pass_links = list(
            self._filter_links(
                graph.edges(data=True), **mandatory_metrics
            )
        )
        length = len(flexible_metrics)
        if minimum_hits is None:
            minimum_hits = 0
        minimum_hits = min(length, max(0, minimum_hits))

        paths = []
        for i in range(length, minimum_hits - 1, -1):
            for combo in combinations(flexible_metrics.items(), i):
                additional = dict(combo)
                filtered_links = self._filter_links(
                    first_pass_links, **additional
                )
                # edge_subgraph only needs the endpoints, not the data.
                filtered_links = ((u, v) for u, v, _ in filtered_links)
                for path in self.k_shortest_paths(
                    source,
                    destination,
                    weight=weight,
                    k=k,
                    graph=graph.edge_subgraph(filtered_links),
                ):
                    paths.append(
                        {
                            "hops": path,
                            "metrics": {**mandatory_metrics, **additional},
                        }
                    )
                if len(paths) == k:
                    return paths
            if paths:
                return paths
        return paths

    def _filter_links(self, links, **metrics):
        """Apply the registered filter of each metric to ``links``.

        Filters are chained in the order the metrics are given, each one
        being called as ``filter(value, links)``. Metrics without a
        registered filter are ignored.

        Returns:
            Whatever the last applied filter returned, or ``links`` itself
            when no filter applied.

        Raises:
            TypeError: if a filter rejects ``value``; the original error is
                chained as the cause.
        """
        for metric, value in metrics.items():
            filter_func = self._filter_functions.get(metric, None)
            if filter_func is not None:
                try:
                    links = filter_func(value, links)
                except TypeError as err:
                    raise TypeError(
                        f"Error in {metric} value: {value} err: {err}"
                    ) from err
        return links
274