Test Failed
Pull Request — master (#53)
by
unknown
02:06
created

graph.py (5 issues)

1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3
from kytos.core import log
4
5
try:
6
    import networkx as nx
7
    from networkx.exception import NodeNotFound, NetworkXNoPath
8
except ImportError:
9
    PACKAGE = 'networkx>=2.2'
10
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
11
12
from itertools import combinations
13
14
class Filter:
15
    def __init__(self, filter_type, filter_function):
16
        self._filter_type = filter_type
17
        self._filter_fun = filter_function
18
19
    def run(self,value, items):
20
        if isinstance(value, self._filter_type):
21
            fun0 = self._filter_fun(value)
22
            return filter(fun0, items)
23
        else:
24
            raise TypeError(f"Expected type: {self._filter_type}")
25
26
27
28
class KytosGraph:
29
    """Class responsible for the graph generation."""
30
31
    def __init__(self):
32
        self.graph = nx.Graph()
33
        self._filter_fun_dict = {}
34
        def filterLEQ(metric):# Lower values are better
35
            return lambda x: (lambda y: y[2].get(metric,x) <= x)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable x does not seem to be defined.
Loading history...
36
        def filterGEQ(metric):# Higher values  are better
37
            return lambda x: (lambda y: y[2].get(metric,x) >= x)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable x does not seem to be defined.
Loading history...
38
        def filterEEQ(metric):# Equivalence
39
            return lambda x: (lambda y: y[2].get(metric,x) == x)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable x does not seem to be defined.
Loading history...
40
41
42
        self._filter_fun_dict["ownership"] = Filter(str,filterEEQ("ownership"))
43
        self._filter_fun_dict["bandwidth"] = Filter((int,float),filterGEQ("bandwidth"))
44
        self._filter_fun_dict["priority"] = Filter((int,float),filterGEQ("priority"))
45
        self._filter_fun_dict["reliability"] = Filter((int,float),filterGEQ("reliability"))
46
        self._filter_fun_dict["utilization"] = Filter((int,float),filterLEQ("utilization"))
47
        self._filter_fun_dict["delay"] = Filter((int,float),filterLEQ("delay"))
48
        self._path_fun = nx.all_shortest_paths
49
        
50
51
    def set_path_fun(self, path_fun):
52
        self._path_fun = path_fun
53
54
    def clear(self):
55
        """Remove all nodes and links registered."""
56
        self.graph.clear()
57
58
    def update_topology(self, topology):
59
        """Update all nodes and links inside the graph."""
60
        self.graph.clear()
61
        self.update_nodes(topology.switches)
62
        self.update_links(topology.links)
63
64
    def update_nodes(self, nodes):
65
        """Update all nodes inside the graph."""
66
        for node in nodes.values():
67
            try:
68
                self.graph.add_node(node.id)
69
70
                for interface in node.interfaces.values():
71
                    self.graph.add_node(interface.id)
72
                    self.graph.add_edge(node.id, interface.id)
73
74
            except AttributeError:
75
                pass
76
77
    def update_links(self, links):
78
        """Update all links inside the graph."""
79
        keys = []
80
        for link in links.values():
81
            if link.is_active():
82
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
83
                for key, value in link.metadata.items():
84
                    keys.append(key)
85
                    endpoint_a = link.endpoint_a.id
86
                    endpoint_b = link.endpoint_b.id
87
                    self.graph[endpoint_a][endpoint_b][key] = value
88
89
    def get_metadata_from_link(self, endpoint_a, endpoint_b):
90
        """Return the metadata of a link."""
91
        return self.graph.edges[endpoint_a, endpoint_b]
92
93
    @staticmethod
94
    def _remove_switch_hops(circuit):
95
        """Remove switch hops from a circuit hops list."""
96
        for hop in circuit['hops']:
97
            if len(hop.split(':')) == 8:
98
                circuit['hops'].remove(hop)
99
100
    def shortest_paths(self, source, destination, parameter=None):
101
        """Calculate the shortest paths and return them."""
102
        try:
103
            paths = list(self._path_fun(self.graph,
104
                                        source, destination, parameter))
105
        except (NodeNotFound, NetworkXNoPath):
106
            return []
107
        return paths
108
109
    def constrained_flexible_paths(self, source, destination, metrics, flexible_metrics, flexible = None):
110
        default_edge_list = self.graph.edges(data=True)
111
        default_edge_list = self._filter_edges(default_edge_list,**metrics)
112
        default_edge_list = list(default_edge_list)
113
        length = len(flexible_metrics)
114
        if flexible is None:
115
            flexible = length
116
        flexible = max(0,flexible)
117
        flexible = min(length,flexible)
118
        results = []
119
        stop = False
120
        for i in range(0,flexible+1):
121
            if stop:
122
                break
123
            y = combinations(flexible_metrics.items(),length-i)
124
            for x in y:
125
                tempDict = {}
126
                for k,v in x:
127
                    tempDict[k] = v
128
                edges = self._filter_edges(default_edge_list,**tempDict)
129
                edges = ((u,v) for u,v,d in edges)
0 ignored issues
show
The variable v does not seem to be defined in case the for loop on line 120 is not entered. Are you sure this can never be the case?
Loading history...
The variable u does not seem to be defined for all execution paths.
Loading history...
130
                res0 = self._constrained_shortest_paths(source,destination,edges)
131
                if res0 != []:
132
                    results.append({"paths":res0, "metrics":{**metrics, **tempDict}})
133
                    stop = True
134
        return results
135
136
    def _constrained_shortest_paths(self, source, destination, edges):
137
        paths = []
138
        try:
139
            paths = list(self._path_fun(self.graph.edge_subgraph(edges),
140
                                        source, destination))
141
        except NetworkXNoPath:
142
            pass
143
        except NodeNotFound:
144
            if source == destination:
145
                if source in self.graph.nodes:
146
                    paths = [[source]]
147
        return paths
148
149
    def _filter_edges(self, edges, **metrics):
150
        for metric, value in metrics.items():
151
            fil = self._filter_fun_dict.get(metric, None)
152
            if fil != None:
153
                try:
154
                    edges = fil.run(value,edges)
155
                except TypeError as err:
156
                    raise TypeError(f"Error in {metric} filter: {err}")
157
        return edges
158