Test Failed
Pull Request — master (#65)
by
unknown
06:49
created

KytosGraph.constrained_k_shortest_paths()   B

Complexity

Conditions 7

Size

Total Lines 50
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 41
nop 8
dl 0
loc 50
rs 7.496
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: enable=too-many-arguments,too-many-locals
4
from itertools import combinations, islice
5
import operator
6
7
from kytos.core import log
8
from kytos.core.common import EntityStatus
9
 
10
from .filters import EdgeFilter, ProcessEdgeAttribute, TypeCheckPreprocessor, TypeDifferentiatedProcessor, UseDefaultIfNone, UseValIfNone
11
from .weights import (nx_edge_data_delay, nx_edge_data_priority, nx_edge_data_weight)
12
13
14
try:
15
    import networkx as nx
16
    from networkx.exception import NetworkXNoPath, NodeNotFound
17
except ImportError:
18
    PACKAGE = "networkx==2.5.1"
19
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
20
21
22
class KytosGraph:
    """Class responsible for the graph generation."""

    def __init__(self):
        """Create an empty graph and register edge filters and SPF weights."""
        self.graph = nx.Graph()
        # Only these link metadata keys are copied onto graph edges
        # (see update_link_metadata).
        self._accepted_metadata = {
            'ownership',
            'bandwidth',
            'reliability',
            'priority',
            'utilization',
            'delay',
        }
        # Normalizes the 'ownership' edge attribute: a string is split on
        # whitespace, a dict contributes its keys.
        ownership_processor = ProcessEdgeAttribute(
            'ownership',
            TypeDifferentiatedProcessor({
                str: lambda x: frozenset(x.split()),
                dict: lambda x: frozenset(x.keys()),
                type(None): None
            })
        )
        # Maps a metric name to the filter applied by _filter_links.
        # NOTE(review): the exact calling convention of EdgeFilter lives in
        # .filters; here it is only invoked as filter(value, links).
        self._filter_functions = {
            "ownership": EdgeFilter(
                operator.contains,
                UseValIfNone(ownership_processor)
            ),
            "not_ownership": EdgeFilter(
                lambda a, b: not (a & b),
                UseDefaultIfNone(ownership_processor, frozenset()),
                TypeDifferentiatedProcessor({
                    str: lambda val: frozenset(val.split(',')),
                    list: lambda val: frozenset(val)
                })
            ),
            "bandwidth": EdgeFilter(
                operator.ge,
                'bandwidth'
            ),
            "reliability": EdgeFilter(
                operator.ge,
                'reliability'
            ),
            "priority": EdgeFilter(
                operator.le,
                'priority'
            ),
            "utilization": EdgeFilter(
                operator.le,
                'utilization'
            ),
            "delay": EdgeFilter(
                operator.le,
                'delay'
            ),
        }
        # Edge-data callbacks keyed by the SPF attribute name.
        self.spf_edge_data_cbs = {
            "hop": nx_edge_data_weight,
            "delay": nx_edge_data_delay,
            "priority": nx_edge_data_priority,
        }

    def clear(self):
        """Remove all nodes and links registered."""
        self.graph.clear()

    def update_topology(self, topology):
        """Update all nodes and links inside the graph."""
        self.graph.clear()
        self.update_nodes(topology.switches)
        self.update_links(topology.links)

    def update_nodes(self, nodes):
        """Update all nodes inside the graph.

        Every node whose status is UP is added, together with each of its
        UP interfaces and an edge between the node and that interface.

        Raises:
            TypeError: if a node or interface lacks an expected attribute.
        """
        for node in nodes.values():
            try:
                if node.status != EntityStatus.UP:
                    continue
                self.graph.add_node(node.id)

                for interface in node.interfaces.values():
                    if interface.status == EntityStatus.UP:
                        self.graph.add_node(interface.id)
                        self.graph.add_edge(node.id, interface.id)

            except AttributeError as err:
                # Chain the original error so the failing attribute access
                # stays visible in the traceback.
                raise TypeError(
                    f"Error when updating nodes inside the graph: {str(err)}"
                ) from err

    def update_links(self, links):
        """Update all links inside the graph."""
        for link in links.values():
            if link.status == EntityStatus.UP:
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
                self.update_link_metadata(link)

    def update_link_metadata(self, link):
        """Update link metadata.

        Only keys listed in ``self._accepted_metadata`` are copied onto the
        edge between the link endpoints.
        """
        for key, value in link.metadata.items():
            if key not in self._accepted_metadata:
                continue
            endpoint_a = link.endpoint_a.id
            endpoint_b = link.endpoint_b.id
            self.graph[endpoint_a][endpoint_b][key] = value

    def get_link_metadata(self, endpoint_a, endpoint_b):
        """Return the metadata of a link."""
        return self.graph.get_edge_data(endpoint_a, endpoint_b)

    @staticmethod
    def _remove_switch_hops(circuit):
        """Remove switch hops from a circuit hops list.

        A hop whose id has exactly 8 colon-separated fields is dropped.
        The list stored in ``circuit["hops"]`` is updated in place.
        """
        # Build the filtered list first and assign it through a slice:
        # removing items while iterating the same list skips the element
        # that follows each removal, leaving consecutive switch hops behind.
        circuit["hops"][:] = [
            hop for hop in circuit["hops"] if len(hop.split(":")) != 8
        ]

    def _path_cost(self, path, weight="hop", default_cost=1):
        """Compute the path cost given an attribute.

        Sums ``weight`` over every consecutive pair of nodes in ``path``;
        edges missing the attribute contribute ``default_cost``.
        """
        cost = 0
        for node, nbr in nx.utils.pairwise(path):
            cost += self.graph[node][nbr].get(weight, default_cost)
        return cost

    def path_cost_builder(self, paths, weight="hop", default_weight=1):
        """Build the cost of a path given a list of paths.

        Each item of ``paths`` is either a list of hops or a dict with a
        "hops" key. Lists are wrapped into ``{"hops": ..., "cost": ...}``;
        dicts get their "cost" key set in place.

        Raises:
            TypeError: if an item is neither a list nor a dict.
        """
        paths_acc = []
        for path in paths:
            if isinstance(path, list):
                paths_acc.append(
                    {
                        "hops": path,
                        "cost": self._path_cost(
                            path, weight=weight, default_cost=default_weight
                        ),
                    }
                )
            elif isinstance(path, dict):
                path["cost"] = self._path_cost(
                    path["hops"], weight=weight, default_cost=default_weight
                )
                paths_acc.append(path)
            else:
                raise TypeError(
                    f"type: '{type(path)}' must be be either list or dict. "
                    f"path: {path}"
                )
        return paths_acc

    def k_shortest_paths(
        self, source, destination, weight=None, k=1, graph=None
    ):
        """
        Compute up to k shortest paths and return them.

        When ``graph`` is None the instance graph is used. An unknown
        source/destination or the absence of a path yields an empty list.

        This procedure is based on algorithm by Jin Y. Yen [1].
        Since Yen's algorithm calls Dijkstra's up to k times, the time
        complexity will be proportional to K * Dijkstra's, average
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
        number of vertices and E number of edges.

        References
        ----------
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
           Network", Management Science, Vol. 17, No. 11, Theory Series
           (Jul., 1971), pp. 712-716.
        """
        # Compare against None explicitly: a graph with no nodes is falsy,
        # so "graph or self.graph" would silently swap an empty (fully
        # filtered) subgraph for the complete graph.
        target_graph = self.graph if graph is None else graph
        try:
            return list(
                islice(
                    nx.shortest_simple_paths(
                        target_graph,
                        source,
                        destination,
                        weight=weight,
                    ),
                    k,
                )
            )
        except (NodeNotFound, NetworkXNoPath):
            return []

    def constrained_k_shortest_paths(
        self,
        source,
        destination,
        weight=None,
        k=1,
        graph=None,
        minimum_hits=None,
        **metrics,
    ):
        """Calculate the constrained shortest paths with flexibility.

        ``metrics`` may carry two dicts: "mandatory_metrics", always
        applied, and "flexible_metrics", of which at least ``minimum_hits``
        must be satisfied. Combinations of flexible metrics are tried from
        the largest size down to ``minimum_hits``; the search stops at the
        first size that yields any path.

        Returns:
            A list of dicts with the keys "hops" (the path) and "metrics"
            (the mandatory plus flexible metrics that path satisfied).
        """
        # Explicit None check: an empty graph is falsy and must not be
        # replaced by the instance graph.
        graph = self.graph if graph is None else graph
        mandatory_metrics = metrics.get("mandatory_metrics", {})
        flexible_metrics = metrics.get("flexible_metrics", {})
        # Materialized once because it is re-filtered for every combination.
        first_pass_links = list(
            self._filter_links(
                graph.edges(data=True), **mandatory_metrics
            )
        )
        length = len(flexible_metrics)
        if minimum_hits is None:
            minimum_hits = 0
        # Clamp minimum_hits into [0, number of flexible metrics].
        minimum_hits = min(length, max(0, minimum_hits))

        paths = []
        for i in range(length, minimum_hits - 1, -1):
            for combo in combinations(flexible_metrics.items(), i):
                additional = dict(combo)
                filtered_links = self._filter_links(
                    first_pass_links, **additional
                )
                # edge_subgraph only needs the endpoints, not the edge data.
                filtered_links = ((u, v) for u, v, _ in filtered_links)
                for path in self.k_shortest_paths(
                    source,
                    destination,
                    weight=weight,
                    k=k,
                    graph=graph.edge_subgraph(filtered_links),
                ):
                    paths.append(
                        {
                            "hops": path,
                            "metrics": {**mandatory_metrics, **additional},
                        }
                    )
                # NOTE(review): paths accumulate across combinations of the
                # same size, so the total may exceed k without ever being
                # equal to it - confirm whether ">=" was intended.
                if len(paths) == k:
                    return paths
            if paths:
                return paths
        return paths

    def _filter_links(self, links, **metrics):
        """Apply the registered filter of every given metric to ``links``.

        Metrics without an entry in ``self._filter_functions`` are ignored.

        Raises:
            TypeError: if a filter rejects the value supplied for a metric.
        """
        for metric, value in metrics.items():
            filter_func = self._filter_functions.get(metric, None)
            if filter_func is not None:
                try:
                    links = filter_func(value, links)
                except TypeError as err:
                    raise TypeError(
                        f"Error in {metric} value: {value} err: {err}"
                    ) from err
        return links
265