Test Failed
Pull Request — master (#68)
by Arturo
03:50 queued 01:43
created

build.graph.KytosGraph.__init__()   B

Complexity

Conditions 7

Size

Total Lines 32
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 17
CRAP Score 7.0572

Importance

Changes 0
Metric Value
cc 7
eloc 22
nop 1
dl 0
loc 32
rs 7.952
c 0
b 0
f 0
ccs 17
cts 19
cp 0.8947
crap 7.0572
1
"""Module Graph of kytos/pathfinder Kytos Network Application."""
2
3 1
from itertools import combinations
4
5 1
from kytos.core import log
6 1
7 1
try:
8
    import networkx as nx
9
    from networkx.exception import NodeNotFound, NetworkXNoPath
10
except ImportError:
11
    PACKAGE = 'networkx>=2.2'
12
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
13 1
try:
14
    from exactdelaypathfinder.core import ExactDelayPathfinder
15
except ImportError:
16 1
    PACKAGE = 'exactdelaypathfinder>=0.1.0'
17 1
    log.error(f"Package {PACKAGE} not found. Please 'pip install {PACKAGE}'")
18
19 1
20
class Filter:
    """Class responsible for removing items with disqualifying values.

    A filter pairs the type(s) accepted for a requested value with a
    factory that turns that value into a predicate over items.
    """

    def __init__(self, filter_type, filter_function):
        """Store the accepted value type(s) and the predicate factory.

        Args:
            filter_type: Type, or tuple of types, checked with
                ``isinstance`` against the value given to :meth:`run`.
            filter_function: Callable that receives the requested value
                and returns a one-argument predicate applied to each item.
        """
        self._filter_type = filter_type
        self._filter_function = filter_function

    def run(self, value, items):
        """Filter out items.

        Args:
            value: Requested value; must be an instance of the configured
                filter type.
            items: Iterable of items to test.

        Returns:
            A lazy ``filter`` iterator over the items accepted by the
            predicate built from ``value``.

        Raises:
            TypeError: If ``value`` is not of the configured type.
        """
        if isinstance(value, self._filter_type):
            return filter(self._filter_function(value), items)

        raise TypeError(f"Expected type: {self._filter_type}")
33 1
34
35 1
class KytosGraph:
    """Class responsible for the graph generation."""

    def __init__(self):
        """Create an empty graph and register the known link filters.

        Every registered filter accepts an edge that does not carry the
        metric at all: the lookup falls back to the requested value, so
        the comparison is the value against itself.
        """
        self.graph = nx.Graph()

        # Each factory maps a metric name to ``value -> predicate(edge)``,
        # where ``edge`` is a ``(u, v, data)`` tuple.
        def filter_leq(metric):  # Lower values are better
            return lambda limit: (
                lambda edge: edge[2].get(metric, limit) <= limit)

        def filter_geq(metric):  # Higher values are better
            return lambda limit: (
                lambda edge: edge[2].get(metric, limit) >= limit)

        def filter_eeq(metric):  # Equivalence
            return lambda wanted: (
                lambda edge: edge[2].get(metric, wanted) == wanted)

        numeric = (int, float)
        self._filter_functions = {
            "ownership": Filter(str, filter_eeq("ownership")),
            "bandwidth": Filter(numeric, filter_geq("bandwidth")),
            "priority": Filter(numeric, filter_geq("priority")),
            "reliability": Filter(numeric, filter_geq("reliability")),
            "utilization": Filter(numeric, filter_leq("utilization")),
            "delay": Filter(numeric, filter_leq("delay")),
        }

        self._path_function = nx.all_shortest_paths

    def clear(self):
        """Remove all nodes and links registered."""
        self.graph.clear()

    def update_topology(self, topology):
        """Update all nodes and links inside the graph."""
        self.graph.clear()
        self.update_nodes(topology.switches)
        self.update_links(topology.links)

    def update_nodes(self, nodes):
        """Update all nodes inside the graph.

        Each node and each of its interfaces becomes a graph node, and
        every interface is connected to the node that owns it.

        Raises:
            TypeError: If a node lacks the attributes read here.
        """
        for node in nodes.values():
            try:
                self.graph.add_node(node.id)

                for interface in node.interfaces.values():
                    self.graph.add_node(interface.id)
                    self.graph.add_edge(node.id, interface.id)

            except AttributeError as err:
                # Keep the original AttributeError as the explicit cause.
                raise TypeError(
                    "Problems encountered updating nodes inside the graph"
                ) from err

    def update_links(self, links):
        """Update all links inside the graph.

        Only links reporting ``is_active()`` are added; their metadata
        items are copied onto the corresponding edge.
        """
        for link in links.values():
            if not link.is_active():
                continue
            endpoint_a = link.endpoint_a.id
            endpoint_b = link.endpoint_b.id
            self.graph.add_edge(endpoint_a, endpoint_b)
            for key, value in link.metadata.items():
                self.graph[endpoint_a][endpoint_b][key] = value

    def get_link_metadata(self, endpoint_a, endpoint_b):
        """Return the metadata of a link."""
        return self.graph.get_edge_data(endpoint_a, endpoint_b)

    @staticmethod
    def _remove_switch_hops(circuit):
        """Remove switch hops from a circuit hops list.

        A hop made of exactly 8 colon-separated fields is treated as a
        switch hop. The list stored in ``circuit['hops']`` is updated in
        place, so other references to it observe the change.
        """
        # Rebuild through slice assignment instead of calling remove()
        # while iterating: removing during iteration skips the element
        # that follows each removed hop.
        circuit['hops'][:] = [hop for hop in circuit['hops']
                              if len(hop.split(':')) != 8]

    def shortest_paths(self, source, destination, parameter=None):
        """Calculate the shortest paths and return them.

        Returns an empty list when an endpoint is not in the graph or no
        path connects them.
        """
        try:
            # list() forces the evaluation inside the try block.
            paths = list(nx.shortest_simple_paths(self.graph,
                                                  source, destination,
                                                  parameter))
        except (NodeNotFound, NetworkXNoPath):
            return []
        return paths

    def exact_path(self, total_delay, source, destination):
        """Obtain paths with total delays equal or close to the user's requirements.

        This function utilizes the ExactDelayPathfinder
        library developed by the AmLight team at FIU.
        """
        pathfinder = ExactDelayPathfinder()
        return pathfinder.search(self.graph, total_delay,
                                 source, destination)

    def constrained_flexible_paths(self, source, destination,
                                   minimum_hits=None, **metrics):
        """Calculate the constrained shortest paths with flexibility.

        Args:
            source: Source node of the paths.
            destination: Destination node of the paths.
            minimum_hits: Smallest number of flexible metrics combined
                in one attempt; clamped to ``[0, len(flexible)]`` and
                defaulting to all of them.
            **metrics: ``base`` holds metrics every link must satisfy;
                ``flexible`` holds metrics applied in combinations.

        Returns:
            A list of ``{"paths": ..., "metrics": ...}`` dicts, one per
            metric combination that produced at least one path.
        """
        base = metrics.get("base", {})
        flexible = metrics.get("flexible", {})
        # Materialised because it is re-filtered for every combination.
        first_pass_links = list(self._filter_links(self.graph.edges(data=True),
                                                   **base))
        length = len(flexible)
        if minimum_hits is None:
            minimum_hits = length
        minimum_hits = min(length, max(0, minimum_hits))
        results = []
        paths = []
        hits = minimum_hits
        # NOTE(review): the loop condition looks only at the paths of the
        # LAST combination tried, so whether larger combinations are also
        # attempted depends on combination order - confirm this is
        # intended (``not results`` may have been meant).
        while not paths and hits <= length:
            for combo in combinations(flexible.items(), hits):
                additional = dict(combo)
                paths = self._constrained_shortest_paths(
                    source, destination,
                    self._filter_links(first_pass_links,
                                       metadata=False, **additional))
                if paths:
                    results.append(
                        {"paths": paths, "metrics": {**base, **additional}})
            hits += 1
        return results

    def _constrained_shortest_paths(self, source, destination, links):
        """Return the paths found using only the given links.

        Returns an empty list when no path exists. When an endpoint is
        missing from the restricted subgraph, the single-node path
        ``[[source]]`` is returned only if source and destination are the
        same node and that node exists in the full graph.
        """
        paths = []
        try:
            paths = list(self._path_function(self.graph.edge_subgraph(links),
                                             source, destination))
        except NetworkXNoPath:
            pass
        except NodeNotFound:
            if source == destination and source in self.graph.nodes:
                paths = [[source]]
        return paths

    def _filter_links(self, links, metadata=True, **metrics):
        """Apply every registered filter named in ``metrics`` to ``links``.

        Metrics without a registered filter are ignored. With
        ``metadata=False`` the ``(u, v, data)`` items are reduced to
        ``(u, v)`` pairs.

        Raises:
            TypeError: If a metric value has the wrong type.
        """
        for metric, value in metrics.items():
            filter_ = self._filter_functions.get(metric, None)
            if filter_ is not None:
                try:
                    links = filter_.run(value, links)
                except TypeError as err:
                    raise TypeError(
                        f"Error in {metric} value: {err}") from err
        if not metadata:
            links = ((u, v) for u, v, d in links)
        return links

    def get_nodes(self):
        """Return the nodes view of the underlying graph."""
        return self.graph.nodes

    def get_edges(self):
        """Return the edges view of the underlying graph."""
        return self.graph.edges
194