Test Failed
Pull Request — master (#68)
by Arturo
02:03
created

build.graph.KytosGraph.__init__()   B

Complexity

Conditions 7

Size

Total Lines 32
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 7.0572

Importance

Changes 0
Metric Value
cc 7
eloc 22
nop 1
dl 0
loc 32
rs 7.952
c 0
b 0
f 0
ccs 17
cts 19
cp 0.8947
crap 7.0572
1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3 1
from itertools import combinations
4
5 1
from kytos.core import log
6 1
7 1
try:
8
    import networkx as nx
9
    from networkx.exception import NodeNotFound, NetworkXNoPath
10
except ImportError:
11
    PACKAGE = 'networkx>=2.2'
12
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
13 1
try:
14
    from exactdelaypathfinder.core import ExactDelayPathfinder
15
except ImportError:
16 1
    PACKAGE = 'exactdelaypathfinder>=0.1.0'
17 1
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
18
19 1
20
class Filter:
21 1
    """Class responsible for removing items with disqualifying values."""
22
23 1
    def __init__(self, filter_type, filter_function):
24
        self._filter_type = filter_type
25 1
        self._filter_function = filter_function
26 1
27 1
    def run(self, value, items):
28
        """Filter out items."""
29 1
        if isinstance(value, self._filter_type):
30
            return filter(self._filter_function(value), items)
31 1
32 1
        raise TypeError(f"Expected type: {self._filter_type}")
33 1
34
35 1
class KytosGraph:
36 1
    """Class responsible for the graph generation."""
37 1
38
    def __init__(self):
39
        self.graph = nx.Graph()
40
        self._filter_functions = {}
41
42 1
        def filter_leq(metric):  # Lower values are better
43
            return lambda x: (lambda y: y[2].get(metric, x) <= x)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable x does not seem to be defined.
Loading history...
44 1
45 1
        def filter_geq(metric):  # Higher values are better
46 1
            return lambda x: (lambda y: y[2].get(metric, x) >= x)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable x does not seem to be defined.
Loading history...
47 1
48 1
        def filter_eeq(metric):  # Equivalence
49 1
            return lambda x: (lambda y: y[2].get(metric, x) == x)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable x does not seem to be defined.
Loading history...
50 1
51 1
        self._filter_functions["ownership"] = Filter(
52 1
            str, filter_eeq("ownership"))
53
54 1
        self._filter_functions["bandwidth"] = Filter(
55
            (int, float), filter_geq("bandwidth"))
56 1
57
        self._filter_functions["priority"] = Filter(
58
            (int, float), filter_geq("priority"))
59
60
        self._filter_functions["reliability"] = Filter(
61
            (int, float), filter_geq("reliability"))
62 1
63 1
        self._filter_functions["utilization"] = Filter(
64 1
            (int, float), filter_leq("utilization"))
65 1
66
        self._filter_functions["delay"] = Filter(
67 1
            (int, float), filter_leq("delay"))
68
69
        self._path_function = nx.all_shortest_paths
70 1
71 1
    def clear(self):
72 1
        """
73
        Remove all nodes and links registered.
74 1
        """
75
        self.graph.clear()
76 1
77 1
    def update_topology(self, topology):
78
        """Update all nodes and links inside the graph."""
79
        self.graph.clear()
80
        self.update_nodes(topology.switches)
81
        self.update_links(topology.links)
82 1
83
    def update_nodes(self, nodes):
84
        """Update all nodes inside the graph."""
85
        for node in nodes.values():
86
            try:
87
                self.graph.add_node(node.id)
88
89
                for interface in node.interfaces.values():
90
                    self.graph.add_node(interface.id)
91
                    self.graph.add_edge(node.id, interface.id)
92
93
            except AttributeError:
94
                raise TypeError("Problems encountered updating nodes inside the graph")
95
96
    def update_links(self, links):
97
        """Update all links inside the graph."""
98
        keys = []
99
        for link in links.values():
100
            if link.is_active():
101
                self.graph.add_edge(link.endpoint_a.id, link.endpoint_b.id)
102
                for key, value in link.metadata.items():
103
                    keys.append(key) if key not in keys else keys
104
                    endpoint_a = link.endpoint_a.id
105
                    endpoint_b = link.endpoint_b.id
106
                    self.graph[endpoint_a][endpoint_b][key] = value
107
108
        # self._set_default_metadata(keys)  # It creates errors during the path construction
109
110
    # def _set_default_metadata(self, keys):
111
    #     """Set metadata to all links.
112
    #
113
    #     Set the value to zero for inexistent metadata in a link to make those
114
    #     irrelevant in pathfinding.
115
    #     """
116
    #     for key in keys:
117
    #         for endpoint_a, endpoint_b in self.graph.edges:
118
    #             if key not in self.graph[endpoint_a][endpoint_b]:
119
    #                 self.graph[endpoint_a][endpoint_b][key] = 0
120
121
    def get_link_metadata(self, endpoint_a, endpoint_b):
122
        """Return the metadata of a link."""
123
        return self.graph.get_edge_data(endpoint_a, endpoint_b)
124
125
    @staticmethod
126
    def _remove_switch_hops(circuit):
127
        """Remove switch hops from a circuit hops list."""
128
        for hop in circuit['hops']:
129
            if len(hop.split(':')) == 8:
130
                circuit['hops'].remove(hop)
131
132
    def shortest_paths(self, source, destination, parameter=None):
133
        """Calculate the shortest paths and return them."""
134
        try:
135
            paths = list(nx.shortest_simple_paths(self.graph,
136
                                                  source, destination,
137
                                                  parameter))
138
        except (NodeNotFound, NetworkXNoPath):
139
            return []
140
        return paths
141
142
    def exact_path(self, total_delay, source, destination):
143
        """Obtain paths with total delays equal or close to the user's requirements.
144
145
        This function utilizes the ExactDelayPathfinder
146
        library developed by the AmLight team at FIU.
147
        """
148
        pathfinder = ExactDelayPathfinder()
149
        result = pathfinder.search(self.graph, total_delay, source, destination)
150
        return result
151
152
    def constrained_flexible_paths(self, source, destination,
153
                                   minimum_hits=None, **metrics):
154
        """Calculate the constrained shortest paths with flexibility."""
155
        base = metrics.get("base", {})
156
        flexible = metrics.get("flexible", {})
157
        first_pass_links = list(self._filter_links(self.graph.edges(data=True),
158
                                                   **base))
159
        length = len(flexible)
160
        if minimum_hits is None:
161
            minimum_hits = length
162
        minimum_hits = min(length, max(0, minimum_hits))
163
        results = []
164
        paths = []
165
        i = 0
166
        while paths == [] and i in range(0, minimum_hits + 1):
167
            for combo in combinations(flexible.items(), length - i):
168
                additional = dict(combo)
169
                paths = self._constrained_shortest_paths(
170
                    source, destination,
171
                    self._filter_links(first_pass_links,
172
                                       metadata=False, **additional))
173
                if paths:
174
                    results.append(
175
                        {"paths": paths, "metrics": {**base, **additional}})
176
            i = i + 1
177
        return results
178
179
    def _constrained_shortest_paths(self, source, destination, links):
180
        paths = []
181
        try:
182
            paths = list(self._path_function(self.graph.edge_subgraph(links),
183
                                             source, destination))
184
        except NetworkXNoPath:
185
            pass
186
        except NodeNotFound:
187
            if source == destination:
188
                if source in self.graph.nodes:
189
                    paths = [[source]]
190
        return paths
191
192
    def _filter_links(self, links, metadata=True, **metrics):
193
        for metric, value in metrics.items():
194
            filter_ = self._filter_functions.get(metric, None)
195
            if filter_ is not None:
196
                try:
197
                    links = filter_.run(value, links)
198
                except TypeError as err:
199
                    raise TypeError(f"Error in {metric} value: {err}")
200
        if not metadata:
201
            links = ((u, v) for u, v, d in links)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable v does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable u does not seem to be defined.
Loading history...
202
        return links
203
204
    def get_nodes(self):
205
        return self.graph.nodes
206
207
    def get_edges(self):
208
        return self.graph.edges
209