Test Failed
Pull Request — master (#58)
by
unknown
02:09
created

KytosGraph.constrained_flexible_paths()   B

Complexity

Conditions 7

Size

Total Lines 26
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 56

Importance

Changes 0
Metric Value
cc 7
eloc 24
nop 6
dl 0
loc 26
rs 7.904
c 0
b 0
f 0
ccs 0
cts 0
cp 0
crap 56
1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3 1
from kytos.core import log
4
5 1
try:
6 1
    import networkx as nx
7 1
    from networkx.exception import NodeNotFound, NetworkXNoPath
8
except ImportError:
9
    PACKAGE = 'networkx>=2.2'
10
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
11
12
from itertools import combinations
13 1
14
class Filter:
    """Remove items whose values disqualify them for a requested threshold.

    A filter pairs an accepted value type with a curried predicate factory:
    ``filter_function(value)`` must return a one-argument predicate that is
    then applied to every item.
    """

    def __init__(self, filter_type, filter_function):
        """Store the accepted type(s) and the predicate factory.

        ``filter_type`` is handed straight to ``isinstance``, so it may be a
        single type or a tuple of types.
        """
        self._filter_type = filter_type
        self._filter_fun = filter_function

    def run(self, value, items):
        """Return a lazy iterator over the items accepted for ``value``.

        Raises:
            TypeError: if ``value`` is not an instance of the accepted type.
        """
        if not isinstance(value, self._filter_type):
            raise TypeError(f"Expected type: {self._filter_type}")
        # The factory is called once; the resulting predicate is reused for
        # every item as the returned iterator is consumed.
        predicate = self._filter_fun(value)
        return filter(predicate, items)
25 1
26 1
class KytosGraph:
    """Class responsible for the graph generation."""

    def __init__(self):
        """Create an empty graph and register the per-metric edge filters."""
        self.graph = nx.Graph()

        # Each factory takes a metadata key and returns a curried predicate:
        # ``factory(metric)(requested)(edge) -> bool`` where ``edge`` is a
        # ``(u, v, data)`` triple. An edge that lacks the key falls back to
        # the requested value itself, so it always satisfies the comparison.
        def filter_leq(metric):  # Lower values are better
            return lambda x: (lambda y: y[2].get(metric, x) <= x)

        def filter_geq(metric):  # Higher values are better
            return lambda x: (lambda y: y[2].get(metric, x) >= x)

        def filter_eeq(metric):  # Equivalence
            return lambda x: (lambda y: y[2].get(metric, x) == x)

        numeric = (int, float)
        self._filter_fun_dict = {
            "ownership": Filter(str, filter_eeq("ownership")),
            "bandwidth": Filter(numeric, filter_geq("bandwidth")),
            "priority": Filter(numeric, filter_geq("priority")),
            "reliability": Filter(numeric, filter_geq("reliability")),
            "utilization": Filter(numeric, filter_leq("utilization")),
            "delay": Filter(numeric, filter_leq("delay")),
        }
        self._path_fun = nx.all_shortest_paths

    def set_path_fun(self, path_fun):
        """Replace the callable used to compute paths.

        It is invoked as ``path_fun(graph, source, destination, parameter)``
        by ``shortest_paths`` and as ``path_fun(graph, source, destination)``
        by the constrained search.
        """
        self._path_fun = path_fun

    def clear(self):
        """Remove all nodes and links registered."""
        self.graph.clear()

    def update_topology(self, topology):
        """Update all nodes and links inside the graph."""
        self.graph.clear()
        self.update_nodes(topology.switches)
        self.update_links(topology.links)

    def update_nodes(self, nodes):
        """Update all nodes inside the graph.

        Every node and each of its interfaces becomes a graph node, with an
        edge joining the node to each interface.
        """
        for node in nodes.values():
            try:
                self.graph.add_node(node.id)

                for interface in node.interfaces.values():
                    self.graph.add_node(interface.id)
                    self.graph.add_edge(node.id, interface.id)

            except AttributeError:
                # Deliberate best-effort: an entry that raises AttributeError
                # (presumably one lacking ``id``/``interfaces``) is skipped.
                pass

    def update_links(self, links):
        """Update all links inside the graph.

        Only links reporting ``is_active()`` are added; each metadata entry
        of the link is copied onto the corresponding edge.
        """
        for link in links.values():
            if not link.is_active():
                continue
            endpoint_a = link.endpoint_a.id
            endpoint_b = link.endpoint_b.id
            self.graph.add_edge(endpoint_a, endpoint_b)
            edge_data = self.graph[endpoint_a][endpoint_b]
            for key, value in link.metadata.items():
                edge_data[key] = value

    def get_metadata_from_link(self, endpoint_a, endpoint_b):
        """Return the metadata of a link."""
        return self.graph.edges[endpoint_a, endpoint_b]

    @staticmethod
    def _remove_switch_hops(circuit):
        """Remove switch hops from a circuit hops list.

        A hop counts as a switch when it splits into exactly eight
        colon-separated fields. The list is rebuilt and assigned through a
        slice so the same list object is updated in place; removing entries
        while iterating would skip the hop that follows each removal.
        """
        hops = circuit['hops']
        hops[:] = [hop for hop in hops if len(hop.split(':')) != 8]

    def shortest_paths(self, source, destination, parameter=None):
        """Calculate the shortest paths and return them.

        Returns an empty list when the path function raises ``NodeNotFound``
        or ``NetworkXNoPath``.
        """
        try:
            paths = list(self._path_fun(self.graph,
                                        source, destination, parameter))
        except (NodeNotFound, NetworkXNoPath):
            return []
        return paths

    def constrained_flexible_paths(self, source, destination, metrics,
                                   flexible_metrics, flexible=None):
        """Calculate the constrained shortest paths with flexibility.

        ``metrics`` are always enforced. ``flexible_metrics`` are enforced
        as far as possible: the search starts with all of them and, level by
        level, drops one more, up to ``flexible`` dropped metrics (clamped to
        ``0..len(flexible_metrics)``; ``None`` means all may be dropped).
        The search stops after the first level that yields any path.

        Returns:
            A list of ``{"paths": [...], "metrics": {...}}`` dicts, one per
            successful combination at the first successful level; ``[]`` if
            no level produced a path.
        """
        # Materialised once because it is re-filtered for every combination.
        base_edges = list(self._filter_edges(self.graph.edges(data=True),
                                             **metrics))
        total = len(flexible_metrics)
        if flexible is None:
            flexible = total
        flexible = min(total, max(0, flexible))

        results = []
        for dropped in range(flexible + 1):
            for combo in combinations(flexible_metrics.items(),
                                      total - dropped):
                relaxed = dict(combo)
                candidates = self._filter_edges(base_edges, **relaxed)
                edge_pairs = ((u, v) for u, v, _ in candidates)
                paths = self._constrained_shortest_paths(source, destination,
                                                         edge_pairs)
                if paths:
                    results.append({"paths": paths,
                                    "metrics": {**metrics, **relaxed}})
            # Every combination of the current level is tried before
            # stopping, so equally strict alternatives are all reported.
            if results:
                break
        return results

    def _constrained_shortest_paths(self, source, destination, edges):
        """Return the paths found in the subgraph induced by ``edges``.

        If an endpoint is missing from the subgraph, the trivial path
        ``[[source]]`` is still returned when source and destination are the
        same node and that node exists in the full graph.
        """
        try:
            return list(self._path_fun(self.graph.edge_subgraph(edges),
                                       source, destination))
        except NetworkXNoPath:
            return []
        except NodeNotFound:
            if source == destination and source in self.graph.nodes:
                return [[source]]
            return []

    def _filter_edges(self, edges, **metrics):
        """Apply the registered filter of every given metric to ``edges``.

        Metrics without a registered filter are ignored. Filters are chained,
        so the result is lazy whenever at least one filter applied.

        Raises:
            TypeError: if a metric value has a type its filter rejects.
        """
        for metric, value in metrics.items():
            fil = self._filter_fun_dict.get(metric)
            if fil is None:
                continue
            try:
                edges = fil.run(value, edges)
            except TypeError as err:
                raise TypeError(f"Error in {metric} filter: {err}") from err
        return edges
153