Test Failed
Pull Request — master (#65)
by
unknown
06:49
created

build.graph.graph.KytosGraph._remove_switch_hops()   A

Complexity

Conditions 3

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 5
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: enable=too-many-arguments,too-many-locals
4
from itertools import combinations, islice
5
import operator
6
7
from kytos.core import log
8
from kytos.core.common import EntityStatus
9
 
10
from .filters import EdgeFilter, ProcessEdgeAttribute, TypeCheckPreprocessor, TypeDifferentiatedProcessor, UseDefaultIfNone, UseValIfNone
11
from .weights import (nx_edge_data_delay, nx_edge_data_priority, nx_edge_data_weight)
12
13
14
try:
15
    import networkx as nx
16
    from networkx.exception import NetworkXNoPath, NodeNotFound
17
except ImportError:
18
    PACKAGE = "networkx==2.5.1"
19
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
20
21
22
class KytosGraph:
23
    """Class responsible for the graph generation."""
24
25
    def __init__(self):
        """Create an empty graph and register edge filters and SPF weights."""
        self.graph = nx.Graph()

        # Only these link metadata keys are copied onto graph edges
        # (see update_link_metadata).
        self._accepted_metadata = {
            "ownership",
            "bandwidth",
            "reliability",
            "priority",
            "utilization",
            "delay",
        }

        # Normalises an edge's 'ownership' attribute: a string is split on
        # whitespace, a dict contributes its keys, both as frozensets.
        ownership_attr = ProcessEdgeAttribute(
            "ownership",
            TypeDifferentiatedProcessor(
                {
                    str: lambda x: frozenset(x.split()),
                    dict: lambda x: frozenset(x.keys()),
                    type(None): None,
                }
            ),
        )

        edge_filters = {}
        edge_filters["ownership"] = EdgeFilter(
            operator.contains,
            UseValIfNone(ownership_attr),
        )
        edge_filters["not_ownership"] = EdgeFilter(
            lambda a, b: not (a & b),
            UseDefaultIfNone(ownership_attr, frozenset()),
            TypeDifferentiatedProcessor(
                {
                    str: lambda val: frozenset(val.split(",")),
                    list: lambda val: frozenset(val),
                }
            ),
        )

        # Plain threshold metrics: each one pairs a comparison operator with
        # the edge attribute of the same name.
        # NOTE(review): operand order is decided by EdgeFilter - confirm there.
        threshold_comparators = (
            ("bandwidth", operator.ge),
            ("reliability", operator.ge),
            ("priority", operator.le),
            ("utilization", operator.le),
            ("delay", operator.le),
        )
        for attribute, comparator in threshold_comparators:
            edge_filters[attribute] = EdgeFilter(comparator, attribute)
        self._filter_functions = edge_filters

        # Callbacks keyed by SPF attribute name.
        self.spf_edge_data_cbs = dict(
            hop=nx_edge_data_weight,
            delay=nx_edge_data_delay,
            priority=nx_edge_data_priority,
        )
82
83
    def clear(self):
84
        """Remove all nodes and links registered."""
85
        self.graph.clear()
86
87
    def update_topology(self, topology):
88
        """Update all nodes and links inside the graph."""
89
        self.graph.clear()
90
        self.update_nodes(topology.switches)
91
        self.update_links(topology.links)
92
93
    def update_nodes(self, nodes):
94
        """Update all nodes inside the graph."""
95
        for node in nodes.values():
96
            try:
97
                if node.status != EntityStatus.UP:
98
                    continue
99
                self.graph.add_node(node.id)
100
101
                for interface in node.interfaces.values():
102
                    if interface.status == EntityStatus.UP:
103
                        self.graph.add_node(interface.id)
104
                        self.graph.add_edge(node.id, interface.id)
105
106
            except AttributeError as err:
107
                raise TypeError(
108
                    f"Error when updating nodes inside the graph: {str(err)}"
109
                )
110
111
    def update_links(self, links):
112
        """Update all links inside the graph."""
113
        for link in links.values():
114
            if link.status == EntityStatus.UP:
115
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
116
                self.update_link_metadata(link)
117
118
    def update_link_metadata(self, link):
119
        """Update link metadata."""
120
        for key, value in link.metadata.items():
121
            if key not in self._accepted_metadata:
122
                continue
123
            endpoint_a = link.endpoint_a.id
124
            endpoint_b = link.endpoint_b.id
125
            self.graph[endpoint_a][endpoint_b][key] = value
126
127
    def get_link_metadata(self, endpoint_a, endpoint_b):
128
        """Return the metadata of a link."""
129
        return self.graph.get_edge_data(endpoint_a, endpoint_b)
130
131
    @staticmethod
132
    def _remove_switch_hops(circuit):
133
        """Remove switch hops from a circuit hops list."""
134
        for hop in circuit["hops"]:
135
            if len(hop.split(":")) == 8:
136
                circuit["hops"].remove(hop)
137
138
    def _path_cost(self, path, weight="hop", default_cost=1):
        """Sum the ``weight`` attribute over the consecutive edges of a path.

        An edge that lacks the attribute contributes ``default_cost``.
        """
        return sum(
            self.graph[tail][head].get(weight, default_cost)
            for tail, head in nx.utils.pairwise(path)
        )
144
145
    def path_cost_builder(self, paths, weight="hop", default_weight=1):
146
        """Build the cost of a path given a list of paths."""
147
        paths_acc = []
148
        for path in paths:
149
            if isinstance(path, list):
150
                paths_acc.append(
151
                    {
152
                        "hops": path,
153
                        "cost": self._path_cost(
154
                            path, weight=weight, default_cost=default_weight
155
                        ),
156
                    }
157
                )
158
            elif isinstance(path, dict):
159
                path["cost"] = self._path_cost(
160
                    path["hops"], weight=weight, default_cost=default_weight
161
                )
162
                paths_acc.append(path)
163
            else:
164
                raise TypeError(
165
                    f"type: '{type(path)}' must be be either list or dict. "
166
                    f"path: {path}"
167
                )
168
        return paths_acc
169
170
    def k_shortest_paths(
        self, source, destination, weight=None, k=1, graph=None
    ):
        """
        Compute up to k shortest paths and return them.

        The search runs on ``graph`` when one is given, otherwise on
        ``self.graph``. An empty list is returned when an endpoint is not
        in the graph or when no path exists.

        This procedure is based on algorithm by Jin Y. Yen [1].
        Since Yen's algorithm calls Dijkstra's up to k times, the time
        complexity will be proportional to K * Dijkstra's, average
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
        number of vertices and E number of edges.

        References
        ----------
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
           Network", Management Science, Vol. 17, No. 11, Theory Series
           (Jul., 1971), pp. 712-716.
        """
        # Compare with None explicitly: an empty networkx graph (such as an
        # edge subgraph that kept no edge) is falsy, so "graph or self.graph"
        # would silently search the full graph instead.
        search_graph = self.graph if graph is None else graph
        try:
            return list(
                islice(
                    nx.shortest_simple_paths(
                        search_graph,
                        source,
                        destination,
                        weight=weight,
                    ),
                    k,
                )
            )
        except (NodeNotFound, NetworkXNoPath):
            return []
202
203
    def constrained_k_shortest_paths(
204
        self,
205
        source,
206
        destination,
207
        weight=None,
208
        k=1,
209
        graph=None,
210
        minimum_hits=None,
211
        **metrics,
212
    ):
213
        """Calculate the constrained shortest paths with flexibility."""
214
        graph = graph or self.graph
215
        mandatory_metrics = metrics.get("mandatory_metrics", {})
216
        flexible_metrics = metrics.get("flexible_metrics", {})
217
        first_pass_links = list(
218
            self._filter_links(
219
                graph.edges(data=True), **mandatory_metrics
220
            )
221
        )
222
        length = len(flexible_metrics)
223
        if minimum_hits is None:
224
            minimum_hits = 0
225
        minimum_hits = min(length, max(0, minimum_hits))
226
227
        paths = []
228
        for i in range(length, minimum_hits - 1, -1):
229
            for combo in combinations(flexible_metrics.items(), i):
230
                additional = dict(combo)
231
                filtered_links = self._filter_links(
232
                    first_pass_links, **additional
233
                )
234
                filtered_links = ((u, v) for u, v, d in filtered_links)
0 ignored issues
show
introduced by
The variable u does not seem to be defined for all execution paths.
Loading history...
introduced by
The variable v does not seem to be defined for all execution paths.
Loading history...
235
                for path in self.k_shortest_paths(
236
                    source,
237
                    destination,
238
                    weight=weight,
239
                    k=k,
240
                    graph=graph.edge_subgraph(filtered_links),
241
                ):
242
                    paths.append(
243
                        {
244
                            "hops": path,
245
                            "metrics": {**mandatory_metrics, **additional},
246
                        }
247
                    )
248
                if len(paths) == k:
249
                    return paths
250
            if paths:
251
                return paths
252
        return paths
253
254
    def _filter_links(self, links, **metrics):
255
        for metric, value in metrics.items():
256
            filter_func = self._filter_functions.get(metric, None)
257
            if filter_func is not None:
258
                try:
259
                    links = filter_func(value, links)
260
                except TypeError as err:
261
                    raise TypeError(
262
                        f"Error in {metric} value: {value} err: {err}"
263
                    )
264
        return links
265