KytosGraph.constrained_k_shortest_paths()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 50
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 7

Importance

Changes 0
Metric Value
cc 7
eloc 41
nop 8
dl 0
loc 50
ccs 22
cts 22
cp 1
crap 7
rs 7.496
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent as soon as you need more or different data.

There are several approaches to avoid long parameter lists:

1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: enable=too-many-arguments,too-many-locals
4 1
from itertools import combinations, islice
5 1
import operator
6
7 1
from kytos.core import log
8 1
from kytos.core.common import EntityStatus
9
 
10 1
from .filters import EdgeFilter, ProcessEdgeAttribute, TypeCheckPreprocessor, TypeDifferentiatedProcessor, UseDefaultIfNone, UseValIfNone
11 1
from .weights import (nx_edge_data_delay, nx_edge_data_priority, nx_edge_data_weight)
12
13
14 1
import networkx as nx
15 1
from networkx.exception import NetworkXNoPath, NodeNotFound
16
17
18 1
class KytosGraph:
    """Class responsible for the graph generation.

    Wraps a ``networkx.Graph`` whose nodes are the ids of switches and
    interfaces and whose edges are switch-to-interface attachments plus
    the links between interfaces. It also offers (constrained) k-shortest
    path lookups on top of that graph.
    """

    def __init__(self):
        """Create an empty graph and register the supported link filters."""
        self.graph = nx.Graph()
        # Only these link metadata keys are copied onto graph edges by
        # update_link_metadata().
        self._accepted_metadata = {
            'ownership',
            'bandwidth',
            'reliability',
            'priority',
            'utilization',
            'delay',
        }
        # Converts the 'ownership' edge attribute to a frozenset, whatever
        # shape (str, dict, list) it was stored in.
        ownership_processor = ProcessEdgeAttribute(
            'ownership',
            TypeDifferentiatedProcessor({
                str: lambda x: frozenset(x.split()),
                dict: lambda x: frozenset(x.keys()),
                list: lambda x: frozenset(x),
                type(None): None
            })
        )
        # Maps a metric name to the callable that _filter_links() applies
        # as ``filter(value, links)``.
        self._filter_functions = {
            "ownership": EdgeFilter(
                operator.contains,
                UseValIfNone(ownership_processor)
            ),
            "not_ownership": EdgeFilter(
                lambda a, b: not (a & b),
                UseDefaultIfNone(ownership_processor, frozenset()),
                TypeDifferentiatedProcessor({
                    str: lambda val: frozenset(val.split(',')),
                    list: lambda val: frozenset(val)
                })
            ),
            "bandwidth": EdgeFilter(
                operator.ge,
                'bandwidth'
            ),
            "reliability": EdgeFilter(
                operator.ge,
                'reliability'
            ),
            "priority": EdgeFilter(
                operator.le,
                'priority'
            ),
            "utilization": EdgeFilter(
                operator.le,
                'utilization'
            ),
            "delay": EdgeFilter(
                operator.le,
                'delay'
            ),
        }
        # Edge-data callbacks imported from .weights, keyed by attribute
        # name; they are stored here but not called inside this class.
        self.spf_edge_data_cbs = {
            "hop": nx_edge_data_weight,
            "delay": nx_edge_data_delay,
            "priority": nx_edge_data_priority,
        }

    def clear(self):
        """Remove all nodes and links registered."""
        self.graph.clear()

    def update_topology(self, topology):
        """Update all nodes and links inside the graph.

        The graph is emptied first and then rebuilt from copies of
        ``topology.switches`` and ``topology.links``.
        """
        self.graph.clear()
        self.update_nodes(topology.switches.copy())
        self.update_links(topology.links.copy())

    def update_nodes(self, nodes):
        """Update all nodes inside the graph.

        Every node whose status is UP is added, together with each of its
        UP interfaces and an edge joining the node to that interface.

        Raises:
            TypeError: if a node or interface lacks an expected attribute;
                the original AttributeError is chained as the cause.
        """
        for node in nodes.values():
            try:
                if node.status != EntityStatus.UP:
                    continue
                self.graph.add_node(node.id)

                # Iterate over a copy of the interfaces mapping.
                for interface in node.interfaces.copy().values():
                    if interface.status == EntityStatus.UP:
                        self.graph.add_node(interface.id)
                        self.graph.add_edge(node.id, interface.id)

            except AttributeError as err:
                raise TypeError(
                    f"Error when updating nodes inside the graph: {str(err)}"
                ) from err

    def update_links(self, links):
        """Update all links inside the graph.

        Only links whose status is UP get an edge (and their metadata).
        """
        for link in links.values():
            if link.status == EntityStatus.UP:
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
                self.update_link_metadata(link)

    def update_link_metadata(self, link):
        """Update link metadata.

        Copies every accepted metadata key of ``link`` onto the edge
        joining its two endpoints; other keys are ignored.
        """
        for key, value in link.metadata.copy().items():
            if key not in self._accepted_metadata:
                continue
            endpoint_a = link.endpoint_a.id
            endpoint_b = link.endpoint_b.id
            self.graph[endpoint_a][endpoint_b][key] = value

    def get_link_metadata(self, endpoint_a, endpoint_b):
        """Return the metadata of a link."""
        return self.graph.get_edge_data(endpoint_a, endpoint_b)

    @staticmethod
    def _remove_switch_hops(circuit):
        """Remove switch hops from a circuit hops list.

        A hop made of exactly 8 colon-separated fields is treated as a
        switch id and dropped. ``circuit["hops"]` is updated in place, so
        other references to the same list see the change.
        """
        # Rebuild the contents with a slice assignment rather than calling
        # remove() while iterating, which skips the element following each
        # removal.
        circuit["hops"][:] = [
            hop for hop in circuit["hops"] if len(hop.split(":")) != 8
        ]

    def _path_cost(self, path, weight="hop", default_cost=1):
        """Compute the path cost given an attribute.

        Sums the ``weight`` attribute of every consecutive edge in
        ``path``; an edge without that attribute counts ``default_cost``.
        """
        cost = 0
        for node, nbr in nx.utils.pairwise(path):
            cost += self.graph[node][nbr].get(weight, default_cost)
        return cost

    def path_cost_builder(self, paths, weight="hop", default_weight=1):
        """Build the cost of a path given a list of paths.

        Each item of ``paths`` is either a list of hops, which is wrapped
        into a new ``{"hops", "cost"}`` dict, or a dict with a ``"hops"``
        key, which gets its ``"cost"`` set in place.

        Returns:
            A list of path dicts, in the same order as ``paths``.

        Raises:
            TypeError: if an item is neither a list nor a dict.
        """
        paths_acc = []
        for path in paths:
            if isinstance(path, list):
                paths_acc.append(
                    {
                        "hops": path,
                        "cost": self._path_cost(
                            path, weight=weight, default_cost=default_weight
                        ),
                    }
                )
            elif isinstance(path, dict):
                path["cost"] = self._path_cost(
                    path["hops"], weight=weight, default_cost=default_weight
                )
                paths_acc.append(path)
            else:
                raise TypeError(
                    f"type: '{type(path)}' must be be either list or dict. "
                    f"path: {path}"
                )
        return paths_acc

    def k_shortest_paths(
        self, source, destination, weight=None, k=1, graph=None
    ):
        """
        Compute up to k shortest paths and return them.

        The search runs on ``graph`` when one is given and on
        ``self.graph`` otherwise. An empty list is returned when an
        endpoint is not in the graph or no path exists.

        This procedure is based on algorithm by Jin Y. Yen [1].
        Since Yen's algorithm calls Dijkstra's up to k times, the time
        complexity will be proportional to K * Dijkstra's, average
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
        number of vertices and E number of edges.

        References
        ----------
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
           Network", Management Science, Vol. 17, No. 11, Theory Series
           (Jul., 1971), pp. 712-716.
        """
        # A graph with no nodes is falsy, so `graph or self.graph` would
        # silently replace an explicitly passed empty graph with the full
        # one; test for None instead.
        target_graph = self.graph if graph is None else graph
        try:
            return list(
                islice(
                    nx.shortest_simple_paths(
                        target_graph,
                        source,
                        destination,
                        weight=weight,
                    ),
                    k,
                )
            )
        except (NodeNotFound, NetworkXNoPath):
            return []

    def constrained_k_shortest_paths(
        self,
        source,
        destination,
        weight=None,
        k=1,
        graph=None,
        minimum_hits=None,
        **metrics,
    ):
        """Calculate the constrained shortest paths with flexibility.

        Links must satisfy every entry of ``mandatory_metrics``. The
        entries of ``flexible_metrics`` are then tried in combinations,
        from all of them down to ``minimum_hits`` of them, and the search
        stops at the first combination size that yields any path.

        Args:
            source: id of the first node of the path.
            destination: id of the last node of the path.
            weight: edge weight handed to k_shortest_paths().
            k: number of paths requested from each k_shortest_paths() call.
            graph: graph to search; defaults to ``self.graph``.
            minimum_hits: fewest flexible metrics a path must satisfy;
                None means 0, and the value is clamped to the range
                0..len(flexible_metrics).
            **metrics: the keys read are ``mandatory_metrics`` and
                ``flexible_metrics``, each a dict of metric name to value.

        Returns:
            A list of ``{"hops": path, "metrics": applied_metrics}`` dicts.
            The list may hold more than ``k`` entries when several
            combinations of the same size each contribute paths.
        """
        # See k_shortest_paths(): an empty graph must not fall back to
        # self.graph.
        graph = self.graph if graph is None else graph
        mandatory_metrics = metrics.get("mandatory_metrics", {})
        flexible_metrics = metrics.get("flexible_metrics", {})
        # Materialised once because it is re-filtered for every combination.
        first_pass_links = list(
            self._filter_links(
                graph.edges(data=True), **mandatory_metrics
            )
        )
        length = len(flexible_metrics)
        if minimum_hits is None:
            minimum_hits = 0
        minimum_hits = min(length, max(0, minimum_hits))

        paths = []
        # Try the strictest combinations (most flexible metrics) first.
        for i in range(length, minimum_hits - 1, -1):
            for combo in combinations(flexible_metrics.items(), i):
                additional = dict(combo)
                filtered_links = self._filter_links(
                    first_pass_links, **additional
                )
                # edge_subgraph() wants (u, v) pairs; drop the data dict.
                filtered_links = ((u, v) for u, v, _ in filtered_links)
                for path in self.k_shortest_paths(
                    source,
                    destination,
                    weight=weight,
                    k=k,
                    graph=graph.edge_subgraph(filtered_links),
                ):
                    paths.append(
                        {
                            "hops": path,
                            "metrics": {**mandatory_metrics, **additional},
                        }
                    )
                if len(paths) == k:
                    return paths
            if paths:
                return paths
        return paths

    def _filter_links(self, links, **metrics):
        """Apply the registered filter of each given metric to ``links``.

        Metrics without an entry in ``self._filter_functions`` are
        ignored. The filters are chained in the order the metrics were
        passed.

        Returns:
            The filtered links, or ``links`` itself when no filter applied.

        Raises:
            TypeError: if a filter rejects the metric value; the original
                error is chained as the cause.
        """
        for metric, value in metrics.items():
            filter_func = self._filter_functions.get(metric, None)
            if filter_func is not None:
                try:
                    links = filter_func(value, links)
                except TypeError as err:
                    raise TypeError(
                        f"Error in {metric} value: {value} err: {err}"
                    ) from err
        return links
262