Passed
Pull Request — master (#65)
by
unknown
03:18
created

build.graph.graph.KytosGraph.update_nodes()   B

Complexity

Conditions 6

Size

Total Lines 16
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 6

Importance

Changes 0
Metric Value
cc 6
eloc 13
nop 2
dl 0
loc 16
ccs 12
cts 12
cp 1
crap 6
rs 8.6666
c 0
b 0
f 0
1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
# pylint: enable=too-many-arguments,too-many-locals
4 1
from itertools import combinations, islice
5 1
import operator
6
7 1
from kytos.core import log
8 1
from kytos.core.common import EntityStatus
9
 
10 1
from .filters import EdgeFilter, ProcessEdgeAttribute, TypeCheckPreprocessor, TypeDifferentiatedProcessor, UseDefaultIfNone, UseValIfNone
11 1
from .weights import (nx_edge_data_delay, nx_edge_data_priority, nx_edge_data_weight)
12
13
14 1
try:
15 1
    import networkx as nx
16 1
    from networkx.exception import NetworkXNoPath, NodeNotFound
17
except ImportError:
18
    PACKAGE = "networkx==2.5.1"
19
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
20
21
22 1
class KytosGraph:
23
    """Class responsible for the graph generation."""
24
25 1
    def __init__(self):
26 1
        self.graph = nx.Graph()
27 1
        self._accepted_metadata = {
28
            'ownership',
29
            'bandwidth',
30
            'reliability',
31
            'priority',
32
            'utilization',
33
            'delay',
34
        }
35 1
        ownership_processor = ProcessEdgeAttribute(
36
            'ownership',
37
            TypeDifferentiatedProcessor({
38
                str: lambda x: frozenset(x.split()),
39
                dict: lambda x: frozenset(x.keys()),
40
                type(None): None
41
            })
42
        )
43 1
        self._filter_functions = {
44
            "ownership": EdgeFilter(
45
                operator.contains,
46
                UseValIfNone(ownership_processor),
47
                TypeCheckPreprocessor(
48
                    str
49
                )
50
            ),
51
            "not_ownership": EdgeFilter(
52
                lambda a, b: not (a & b),
53
                UseDefaultIfNone(ownership_processor, frozenset()),
54
                TypeDifferentiatedProcessor({
55
                    str: lambda val: frozenset(val.split(',')),
56
                    list: lambda val: frozenset(val)
57
                })
58
            ),
59
            "bandwidth": EdgeFilter(
60
                operator.ge,
61
                'bandwidth',
62
                TypeCheckPreprocessor((int, float))
63
64
            ),
65
            "reliability": EdgeFilter(
66
                operator.ge,
67
                'reliability',
68
                TypeCheckPreprocessor((int, float))
69
            ),
70
            "priority": EdgeFilter(
71
                operator.le,
72
                'priority',
73
                TypeCheckPreprocessor((int, float))
74
            ),
75
            "utilization": EdgeFilter(
76
                operator.le,
77
                'utilization',
78
                TypeCheckPreprocessor((int, float))
79
            ),
80
            "delay": EdgeFilter(
81
                operator.le,
82
                'delay',
83
                TypeCheckPreprocessor((int, float))
84
            ),
85
        }
86 1
        self.spf_edge_data_cbs = {
87
            "hop": nx_edge_data_weight,
88
            "delay": nx_edge_data_delay,
89
            "priority": nx_edge_data_priority,
90
        }
91
92 1
    def clear(self):
93
        """Remove all nodes and links registered."""
94 1
        self.graph.clear()
95
96 1
    def update_topology(self, topology):
97
        """Update all nodes and links inside the graph."""
98 1
        self.graph.clear()
99 1
        self.update_nodes(topology.switches)
100 1
        self.update_links(topology.links)
101
102 1
    def update_nodes(self, nodes):
103
        """Update all nodes inside the graph."""
104 1
        for node in nodes.values():
105 1
            try:
106 1
                if node.status != EntityStatus.UP:
107 1
                    continue
108 1
                self.graph.add_node(node.id)
109
110 1
                for interface in node.interfaces.values():
111 1
                    if interface.status == EntityStatus.UP:
112 1
                        self.graph.add_node(interface.id)
113 1
                        self.graph.add_edge(node.id, interface.id)
114
115 1
            except AttributeError as err:
116 1
                raise TypeError(
117
                    f"Error when updating nodes inside the graph: {str(err)}"
118
                )
119
120 1
    def update_links(self, links):
121
        """Update all links inside the graph."""
122 1
        for link in links.values():
123 1
            if link.status == EntityStatus.UP:
124 1
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
125 1
                self.update_link_metadata(link)
126
127 1
    def update_link_metadata(self, link):
128
        """Update link metadata."""
129 1
        for key, value in link.metadata.items():
130 1
            if key not in self._accepted_metadata:
131 1
                continue
132 1
            endpoint_a = link.endpoint_a.id
133 1
            endpoint_b = link.endpoint_b.id
134 1
            self.graph[endpoint_a][endpoint_b][key] = value
135
136 1
    def get_link_metadata(self, endpoint_a, endpoint_b):
137
        """Return the metadata of a link."""
138 1
        return self.graph.get_edge_data(endpoint_a, endpoint_b)
139
140 1
    @staticmethod
141 1
    def _remove_switch_hops(circuit):
142
        """Remove switch hops from a circuit hops list."""
143 1
        for hop in circuit["hops"]:
144 1
            if len(hop.split(":")) == 8:
145 1
                circuit["hops"].remove(hop)
146
147 1
    def _path_cost(self, path, weight="hop", default_cost=1):
148
        """Compute the path cost given an attribute."""
149 1
        cost = 0
150 1
        for node, nbr in nx.utils.pairwise(path):
151 1
            cost += self.graph[node][nbr].get(weight, default_cost)
152 1
        return cost
153
154 1
    def path_cost_builder(self, paths, weight="hop", default_weight=1):
155
        """Build the cost of a path given a list of paths."""
156 1
        paths_acc = []
157 1
        for path in paths:
158 1
            if isinstance(path, list):
159 1
                paths_acc.append(
160
                    {
161
                        "hops": path,
162
                        "cost": self._path_cost(
163
                            path, weight=weight, default_cost=default_weight
164
                        ),
165
                    }
166
                )
167 1
            elif isinstance(path, dict):
168 1
                path["cost"] = self._path_cost(
169
                    path["hops"], weight=weight, default_cost=default_weight
170
                )
171 1
                paths_acc.append(path)
172
            else:
173
                raise TypeError(
174
                    f"type: '{type(path)}' must be be either list or dict. "
175
                    f"path: {path}"
176
                )
177 1
        return paths_acc
178
179 1
    def k_shortest_paths(
180
        self, source, destination, weight=None, k=1, graph=None
181
    ):
182
        """
183
        Compute up to k shortest paths and return them.
184
185
        This procedure is based on algorithm by Jin Y. Yen [1].
186
        Since Yen's algorithm calls Dijkstra's up to k times, the time
187
        complexity will be proportional to K * Dijkstra's, average
188
        O(K(|V| + |E|)logV), assuming it's using a heap, where V is the
189
        number of vertices and E number of egdes.
190
191
        References
192
        ----------
193
        .. [1] Jin Y. Yen, "Finding the K Shortest Loopless Paths in a
194
           Network", Management Science, Vol. 17, No. 11, Theory Series
195
           (Jul., 1971), pp. 712-716.
196
        """
197 1
        try:
198 1
            return list(
199
                islice(
200
                    nx.shortest_simple_paths(
201
                        graph or self.graph,
202
                        source,
203
                        destination,
204
                        weight=weight,
205
                    ),
206
                    k,
207
                )
208
            )
209 1
        except (NodeNotFound, NetworkXNoPath):
210 1
            return []
211
212 1
    def constrained_k_shortest_paths(
213
        self,
214
        source,
215
        destination,
216
        weight=None,
217
        k=1,
218
        graph=None,
219
        minimum_hits=None,
220
        **metrics,
221
    ):
222
        """Calculate the constrained shortest paths with flexibility."""
223 1
        graph = graph or self.graph
224 1
        mandatory_metrics = metrics.get("mandatory_metrics", {})
225 1
        flexible_metrics = metrics.get("flexible_metrics", {})
226 1
        first_pass_links = list(
227
            self._filter_links(
228
                graph.edges(data=True), **mandatory_metrics
229
            )
230
        )
231 1
        length = len(flexible_metrics)
232 1
        if minimum_hits is None:
233 1
            minimum_hits = 0
234 1
        minimum_hits = min(length, max(0, minimum_hits))
235
236 1
        paths = []
237 1
        for i in range(length, minimum_hits - 1, -1):
238 1
            for combo in combinations(flexible_metrics.items(), i):
239 1
                additional = dict(combo)
240 1
                filtered_links = self._filter_links(
241
                    first_pass_links, **additional
242
                )
243 1
                filtered_links = ((u, v) for u, v, d in filtered_links)
0 ignored issues
show
introduced by
The variable u does not seem to be defined for all execution paths.
Loading history...
introduced by
The variable v does not seem to be defined for all execution paths.
Loading history...
244 1
                for path in self.k_shortest_paths(
245
                    source,
246
                    destination,
247
                    weight=weight,
248
                    k=k,
249
                    graph=graph.edge_subgraph(filtered_links),
250
                ):
251 1
                    paths.append(
252
                        {
253
                            "hops": path,
254
                            "metrics": {**mandatory_metrics, **additional},
255
                        }
256
                    )
257 1
                if len(paths) == k:
258 1
                    return paths
259 1
            if paths:
260 1
                return paths
261 1
        return paths
262
263 1
    def _filter_links(self, links, **metrics):
264 1
        for metric, value in metrics.items():
265 1
            filter_func = self._filter_functions.get(metric, None)
266 1
            if filter_func is not None:
267 1
                try:
268 1
                    links = filter_func(value, links)
269 1
                except TypeError as err:
270 1
                    raise TypeError(
271
                        f"Error in {metric} value: {value} err: {err}"
272
                    )
273
        return links
274