Test Failed
Pull Request — master (#58)
by
unknown
07:38
created

KytosGraph._constrained_shortest_paths()   A

Complexity

Conditions 5

Size

Total Lines 12
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 0
Metric Value
cc 5
eloc 12
nop 4
dl 0
loc 12
rs 9.3333
c 0
b 0
f 0
ccs 0
cts 0
cp 0
crap 30
1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3 1
from itertools import combinations
4
5 1
from kytos.core import log
6 1
7 1
try:
8
    import networkx as nx
9
    from networkx.exception import NodeNotFound, NetworkXNoPath
10
except ImportError:
11
    PACKAGE = 'networkx>=2.2'
12
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
13 1
14
15
class Filter:
16 1
    '''Class responsible for removing items with disqualifying values'''
17 1
18
    def __init__(self, filter_type, filter_function):
19 1
        self._filter_type = filter_type
20
        self._filter_fun = filter_function
21 1
22
    def run(self, value, items):
23 1
        '''Filter out items. Filter chosen is picked at runtime.'''
24
        if isinstance(value, self._filter_type):
25 1
            return filter(self._filter_fun(value), items)
26 1
        else:
27 1
            raise TypeError(f"Expected type: {self._filter_type}")
28
29 1
30
class KytosGraph:
31 1
    """Class responsible for the graph generation."""
32 1
33 1
    def __init__(self):
34
        self.graph = nx.Graph()
35 1
        self._filter_fun_dict = {}
36 1
37 1
        def filterLEQ(metric):  # Lower values are better
38
            return lambda x: (lambda y: y[2].get(metric, x) <= x)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable x does not seem to be defined.
Loading history...
39
40
        def filterGEQ(metric):  # Higher values are better
41
            return lambda x: (lambda y: y[2].get(metric, x) >= x)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable x does not seem to be defined.
Loading history...
42 1
43
        def filterEEQ(metric):  # Equivalence
44 1
            return lambda x: (lambda y: y[2].get(metric, x) == x)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable x does not seem to be defined.
Loading history...
45 1
46 1
        self._filter_fun_dict["ownership"] = Filter(
47 1
            str, filterEEQ("ownership"))
48 1
        self._filter_fun_dict["bandwidth"] = Filter(
49 1
            (int, float), filterGEQ("bandwidth"))
50 1
        self._filter_fun_dict["priority"] = Filter(
51 1
            (int, float), filterGEQ("priority"))
52 1
        self._filter_fun_dict["reliability"] = Filter(
53
            (int, float), filterGEQ("reliability"))
54 1
        self._filter_fun_dict["utilization"] = Filter(
55
            (int, float), filterLEQ("utilization"))
56 1
        self._filter_fun_dict["delay"] = Filter(
57
            (int, float), filterLEQ("delay"))
58
        self._path_fun = nx.all_shortest_paths
59
60
    def set_path_fun(self, path_fun):
61
        self._path_fun = path_fun
62 1
63 1
    def clear(self):
64 1
        """Remove all nodes and links registered."""
65 1
        self.graph.clear()
66
67 1
    def update_topology(self, topology):
68
        """Update all nodes and links inside the graph."""
69
        self.graph.clear()
70 1
        self.update_nodes(topology.switches)
71 1
        self.update_links(topology.links)
72 1
73
    def update_nodes(self, nodes):
74 1
        """Update all nodes inside the graph."""
75
        for node in nodes.values():
76 1
            try:
77 1
                self.graph.add_node(node.id)
78
79
                for interface in node.interfaces.values():
80
                    self.graph.add_node(interface.id)
81
                    self.graph.add_edge(node.id, interface.id)
82 1
83
            except AttributeError:
84
                pass
85
86
    def update_links(self, links):
87
        """Update all links inside the graph."""
88
        keys = []
89
        for link in links.values():
90
            if link.is_active():
91
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
92
                for key, value in link.metadata.items():
93
                    keys.append(key)
94
                    endpoint_a = link.endpoint_a.id
95
                    endpoint_b = link.endpoint_b.id
96
                    self.graph[endpoint_a][endpoint_b][key] = value
97
98
    def get_metadata_from_link(self, endpoint_a, endpoint_b):
99
        """Return the metadata of a link."""
100
        return self.graph.edges[endpoint_a, endpoint_b]
101
102
    @staticmethod
103
    def _remove_switch_hops(circuit):
104
        """Remove switch hops from a circuit hops list."""
105
        for hop in circuit['hops']:
106
            if len(hop.split(':')) == 8:
107
                circuit['hops'].remove(hop)
108
109
    def shortest_paths(self, source, destination, parameter=None):
110
        """Calculate the shortest paths and return them."""
111
        try:
112
            paths = list(self._path_fun(self.graph,
113
                                        source, destination, parameter))
114
        except (NodeNotFound, NetworkXNoPath):
115
            return []
116
        return paths
117
118
    def constrained_flexible_paths(self, source, destination, metrics,
119
                                   flexible_metrics, flexible=None):
120
        '''Calculate the constrained shortest paths with flexibility.'''
121
        default_edge_list = list(self._filter_edges(
122
            self.graph.edges(data=True), **metrics))
123
        length = len(flexible_metrics)
124
        if flexible is None:
125
            flexible = length
126
        flexible = min(length, max(0, flexible))
127
        results = []
128
        stop = False
129
        i = 0
130
131
        while (not stop and i in range(0, flexible+1)):
132
            combos = combinations(flexible_metrics.items(), length-i)
133
            for combo in combos:
134
                temp_dict = {}
135
                for metric, value in combo:
136
                    temp_dict[metric] = value
137
                edges = self._filter_edges(default_edge_list, **temp_dict)
138
                edges = ((u, v) for u, v, d in edges)
0 ignored issues
show
introduced by
The variable v does not seem to be defined for all execution paths.
Loading history...
introduced by
The variable u does not seem to be defined for all execution paths.
Loading history...
139
                result = self._constrained_shortest_paths(
140
                    source, destination, edges)
141
                if result != []:
142
                    results.append(
143
                        {"paths": result, "metrics": {**metrics, **temp_dict}})
144
                    stop = True
145
            i = i + 1
146
        return results
147
148
    def _constrained_shortest_paths(self, source, destination, edges):
149
        paths = []
150
        try:
151
            paths = list(self._path_fun(self.graph.edge_subgraph(edges),
152
                                        source, destination))
153
        except NetworkXNoPath:
154
            pass
155
        except NodeNotFound:
156
            if source == destination:
157
                if source in self.graph.nodes:
158
                    paths = [[source]]
159
        return paths
160
161
    def _filter_edges(self, edges, **metrics):
162
        for metric, value in metrics.items():
163
            fil = self._filter_fun_dict.get(metric, None)
164
            if fil is not None:
165
                try:
166
                    edges = fil.run(value, edges)
167
                except TypeError as err:
168
                    raise TypeError(f"Error in {metric} filter: {err}")
169
        return edges
170