1 | """Module Graph of kytos/pathfinder Kytos Network Application.""" |
||
2 | |||
3 | from kytos.core import log |
||
4 | |||
5 | try: |
||
6 | import networkx as nx |
||
7 | from networkx.exception import NodeNotFound, NetworkXNoPath |
||
8 | except ImportError: |
||
9 | PACKAGE = 'networkx>=2.2' |
||
10 | log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'") |
||
11 | |||
12 | from itertools import combinations |
||
13 | |||
class Filter:
    """Type-checked factory of edge predicates.

    Pairs an accepted value type with a function that turns a requested
    value into a predicate, so that a bad request is rejected before any
    item is filtered.
    """

    def __init__(self, filter_type, filter_function):
        """Store the accepted type(s) and the predicate factory.

        Args:
            filter_type: A type, or tuple of types, as accepted by the
                second argument of ``isinstance``.
            filter_function: Callable taking the requested value and
                returning a one-argument predicate applied to each item.
        """
        self._filter_type = filter_type
        self._filter_fun = filter_function

    def run(self, value, items):
        """Lazily filter ``items`` with the predicate built for ``value``.

        Args:
            value: Requested value; must be an instance of the type(s)
                given at construction.
            items: Iterable of items to filter.

        Returns:
            A ``filter`` iterator over the items accepted by the predicate.

        Raises:
            TypeError: If ``value`` is not an instance of the accepted
                type(s).
        """
        # Guard clause: the type check is eager, the filtering itself is lazy.
        if not isinstance(value, self._filter_type):
            raise TypeError(f"Expected type: {self._filter_type}")
        predicate = self._filter_fun(value)
        return filter(predicate, items)
||
25 | |||
26 | |||
27 | |||
class KytosGraph:
    """Class responsible for the graph generation."""

    def __init__(self):
        """Create an empty graph and register the built-in link filters.

        Each factory below takes a metric name and returns a function of
        the requested value ``x``; that function returns a predicate over
        one edge tuple ``y`` whose third item (``y[2]``) is the edge
        attribute dict.  An edge that does not carry the metric falls back
        to ``x`` itself and therefore always passes the comparison.
        """
        self.graph = nx.Graph()

        def filter_leq(metric):  # Lower values are better
            return lambda x: (lambda y: y[2].get(metric, x) <= x)

        def filter_geq(metric):  # Higher values are better
            return lambda x: (lambda y: y[2].get(metric, x) >= x)

        def filter_eeq(metric):  # Equivalence
            return lambda x: (lambda y: y[2].get(metric, x) == x)

        numeric = (int, float)
        self._filter_fun_dict = {
            "ownership": Filter(str, filter_eeq("ownership")),
            "bandwidth": Filter(numeric, filter_geq("bandwidth")),
            "priority": Filter(numeric, filter_geq("priority")),
            "reliability": Filter(numeric, filter_geq("reliability")),
            "utilization": Filter(numeric, filter_leq("utilization")),
            "delay": Filter(numeric, filter_leq("delay")),
        }
        self._path_fun = nx.all_shortest_paths

    def set_path_fun(self, path_fun):
        """Replace the function used to compute paths.

        ``path_fun`` is called as ``path_fun(graph, source, destination)``
        by the constrained search and with one extra positional argument
        by ``shortest_paths``.
        """
        self._path_fun = path_fun

    def clear(self):
        """Remove all nodes and links registered."""
        self.graph.clear()

    def update_topology(self, topology):
        """Update all nodes and links inside the graph."""
        self.graph.clear()
        self.update_nodes(topology.switches)
        self.update_links(topology.links)

    def update_nodes(self, nodes):
        """Update all nodes inside the graph.

        Every node and each of its interfaces becomes a graph node, and
        each interface is connected to its owning node.
        """
        for node in nodes.values():
            try:
                self.graph.add_node(node.id)

                for interface in node.interfaces.values():
                    self.graph.add_node(interface.id)
                    self.graph.add_edge(node.id, interface.id)

            except AttributeError:
                # Deliberate best effort: an entry lacking ``id`` or
                # ``interfaces`` is skipped instead of aborting the update.
                pass

    def update_links(self, links):
        """Update all links inside the graph.

        Only links reporting ``is_active()`` are added; every metadata
        item of such a link is copied onto the corresponding edge.
        """
        for link in links.values():
            if not link.is_active():
                continue
            # Resolve the endpoint ids once per link, not once per key.
            endpoint_a = link.endpoint_a.id
            endpoint_b = link.endpoint_b.id
            self.graph.add_edge(endpoint_a, endpoint_b)
            for key, value in link.metadata.items():
                self.graph[endpoint_a][endpoint_b][key] = value

    def get_metadata_from_link(self, endpoint_a, endpoint_b):
        """Return the metadata of a link."""
        return self.graph.edges[endpoint_a, endpoint_b]

    @staticmethod
    def _remove_switch_hops(circuit):
        """Remove switch hops from a circuit hops list.

        A hop is dropped when its id has exactly eight ':'-separated
        fields (presumably a switch id, while interface ids carry an extra
        field -- confirm against the id format).  The list stored under
        ``circuit['hops']`` is updated in place.
        """
        # Rebuild through slice assignment instead of calling ``remove``
        # while iterating, which skipped the hop following each removal.
        circuit['hops'][:] = [
            hop for hop in circuit['hops'] if len(hop.split(':')) != 8
        ]

    def shortest_paths(self, source, destination, parameter=None):
        """Calculate the shortest paths and return them.

        ``parameter`` is forwarded as the fourth positional argument of
        the path function.  An empty list is returned when a node is
        unknown or no path exists.
        """
        try:
            paths = list(self._path_fun(self.graph,
                                        source, destination, parameter))
        except (NodeNotFound, NetworkXNoPath):
            return []
        return paths

    def constrained_flexible_paths(self, source, destination, metrics,
                                   flexible_metrics, flexible=None):
        """Calculate paths that satisfy mandatory and optional metrics.

        Edges are first restricted by ``metrics``.  Then subsets of
        ``flexible_metrics`` are tried, starting with all of them and
        dropping one more per round, up to ``flexible`` dropped metrics
        (clamped to ``0..len(flexible_metrics)``; ``None`` means all may
        be dropped).  The search ends after the first round that yields
        at least one path.

        Returns:
            A list of dicts, one per successful combination, with the
            keys ``"paths"`` and ``"metrics"`` (the merged mandatory and
            flexible metrics that were applied).
        """
        default_edge_list = self.graph.edges(data=True)
        default_edge_list = self._filter_edges(default_edge_list, **metrics)
        # Materialise once: the list is re-filtered for every combination.
        default_edge_list = list(default_edge_list)

        length = len(flexible_metrics)
        if flexible is None:
            flexible = length
        flexible = max(0, flexible)
        flexible = min(length, flexible)

        results = []
        for dropped in range(flexible + 1):
            for combo in combinations(flexible_metrics.items(),
                                      length - dropped):
                applied = dict(combo)
                filtered = self._filter_edges(default_edge_list, **applied)
                edges = ((node_a, node_b) for node_a, node_b, _ in filtered)
                found = self._constrained_shortest_paths(source,
                                                         destination, edges)
                if found:
                    results.append({"paths": found,
                                    "metrics": {**metrics, **applied}})
            # Finish the current round, but do not relax any further once
            # some combination has produced a path.
            if results:
                break
        return results

    def _constrained_shortest_paths(self, source, destination, edges):
        """Return the paths found in the subgraph induced by ``edges``.

        Returns an empty list when no path exists.  When an endpoint is
        missing from the subgraph, the trivial path ``[[source]]`` is
        returned if source and destination are the same node of the full
        graph, otherwise an empty list.
        """
        paths = []
        try:
            paths = list(self._path_fun(self.graph.edge_subgraph(edges),
                                        source, destination))
        except NetworkXNoPath:
            pass
        except NodeNotFound:
            if source == destination and source in self.graph.nodes:
                paths = [[source]]
        return paths

    def _filter_edges(self, edges, **metrics):
        """Apply the registered filter of every given metric to ``edges``.

        Metrics without a registered filter are ignored.

        Raises:
            TypeError: If a value has the wrong type for its metric.
        """
        for metric, value in metrics.items():
            fil = self._filter_fun_dict.get(metric, None)
            if fil is not None:
                try:
                    edges = fil.run(value, edges)
                except TypeError as err:
                    raise TypeError(
                        f"Error in {metric} filter: {err}") from err
        return edges
||
158 |