Passed
Pull Request — master (#330)
by
unknown
07:02
created

build.models.path.Path.is_valid()   D

Complexity

Conditions 12

Size

Total Lines 27
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 13.3431

Importance

Changes 0
Metric Value
cc 12
eloc 24
nop 4
dl 0
loc 27
ccs 15
cts 19
cp 0.7895
crap 13.3431
rs 4.8
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex methods like build.models.path.Path.is_valid() often do a lot of different things, and so do the classes that contain them. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Classes related to paths"""
2 1
import requests
3
4 1
from kytos.core import log
5 1
from kytos.core.common import EntityStatus, GenericEntity
6 1
from kytos.core.link import Link
7 1
from napps.kytos.mef_eline import settings
8 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
9
10
11 1
class Path(list, GenericEntity):
    """Class to represent a Path.

    A Path is a list whose items are links; every method below iterates
    over ``self`` and treats each item as a link object.
    """

    def __eq__(self, other=None):
        """Compare paths.

        Returns False when ``other`` is falsy or is not a ``Path``. Note
        that an empty ``Path`` is falsy, so an empty path never compares
        equal to anything (not even to another empty path). Otherwise the
        comparison is delegated to the parent classes.
        """
        if not other or not isinstance(other, Path):
            return False
        return super().__eq__(other)

    def is_affected_by_link(self, link=None):
        """Verify if the current path is affected by link.

        Returns True only when ``link`` is truthy and contained in the path.
        """
        if not link:
            return False
        return link in self

    def link_affected_by_interface(self, interface=None):
        """Return the link using this interface, if any, or None otherwise.

        The first link having ``interface`` as one of its two endpoints is
        returned.
        """
        if not interface:
            return None
        for link in self:
            if interface in (link.endpoint_a, link.endpoint_b):
                return link
        return None

    def choose_vlans(self):
        """Choose the VLANs to be used for the circuit.

        For every link, the next available tag is taken and stored in the
        link metadata under the ``s_vlan`` key.
        """
        for link in self:
            tag = link.get_next_available_tag()
            link.add_metadata("s_vlan", tag)

    def make_vlans_available(self):
        """Make the VLANs used in a path available when undeployed.

        For every link, the tag stored under ``s_vlan`` is handed back to
        the link and the metadata entry is removed.
        """
        for link in self:
            link.make_tag_available(link.get_metadata("s_vlan"))
            link.remove_metadata("s_vlan")

    @staticmethod
    def _raise_if_switch_disabled(link):
        """Raise DisabledSwitch if either endpoint's switch is disabled."""
        # endpoint_a is checked before endpoint_b, so when both switches are
        # disabled the error reports endpoint_a's switch.
        for endpoint in (link.endpoint_a, link.endpoint_b):
            if endpoint.switch.status == EntityStatus.DISABLED:
                raise DisabledSwitch(f"{endpoint.switch.dpid} is disabled")

    @staticmethod
    def _is_link_available(link):
        """Return True if both endpoints currently point back to ``link``."""
        for endpoint in (link.endpoint_a, link.endpoint_b):
            # The ``is None`` test must come first: it keeps ``None`` from
            # ever being compared against a link object.
            if endpoint.link is None or endpoint.link != link:
                return False
        return True

    def is_valid(self, switch_a, switch_z, is_scheduled=False):
        """Check if this is a valid path.

        Walks the links in order, starting from ``switch_a``, and checks
        that each link starts on the switch where the previous one ended,
        and that the last link ends on ``switch_z``.

        Parameters:
            switch_a: switch the first link must start from.
            switch_z: switch the last link must end on.
            is_scheduled: when exactly ``False`` (the default), every link
                must also be referenced by both of its endpoints; any other
                value skips that availability check.

        Returns:
            True when the path is empty or passes every check.

        Raises:
            DisabledSwitch: a link endpoint belongs to a disabled switch.
            InvalidPath: the links are not contiguous, a link is not
                available, or the path does not end on ``switch_z``.
        """
        if not self:
            return True
        previous = switch_a
        for link in self:
            self._raise_if_switch_disabled(link)
            if link.endpoint_a.switch != previous:
                raise InvalidPath(
                    f"{link.endpoint_a} switch is different from previous."
                )
            if is_scheduled is False and not self._is_link_available(link):
                raise InvalidPath(f"Link {link} is not available.")
            previous = link.endpoint_b.switch
        if previous == switch_z:
            return True
        raise InvalidPath("Last endpoint is different from uni_z")

    @property
    def status(self):
        """Check for the status of a path.

        The link list is fetched from ``{settings.TOPOLOGY_URL}/links`` and
        each link of the path is looked up in the reply by its id.

        Returns:
            EntityStatus.DISABLED if the path is empty, or as soon as a
            link is missing from the reply or has ``enabled`` set to False.
            EntityStatus.DOWN if no link is disabled but at least one has
            ``active`` set to False.
            EntityStatus.UP otherwise.
            None if the HTTP request does not return an OK status code.
        """
        if not self:
            return EntityStatus.DISABLED

        endpoint = f"{settings.TOPOLOGY_URL}/links"
        # NOTE(review): no timeout is passed, so this call can block for as
        # long as the server keeps the connection open.
        api_reply = requests.get(endpoint)
        if api_reply.status_code != getattr(requests.codes, "ok"):
            log.error(
                "Failed to get links at %s. Returned %s",
                endpoint,
                api_reply.status_code,
            )
            return None
        links = api_reply.json()["links"]
        return_status = EntityStatus.UP
        for path_link in self:
            try:
                link = links[path_link.id]
            except KeyError:
                return EntityStatus.DISABLED
            if link["enabled"] is False:
                return EntityStatus.DISABLED
            if link["active"] is False:
                # Keep scanning: a later disabled link still takes
                # precedence over DOWN.
                return_status = EntityStatus.DOWN
        return return_status

    def as_dict(self):
        """Return a list with the ``as_dict()`` of every truthy link."""
        return [link.as_dict() for link in self if link]
109
110
111 1
class DynamicPathManager:
    """Class to handle and create paths."""

    # Shared by all class methods; set through set_controller().
    controller = None

    @classmethod
    def set_controller(cls, controller=None):
        """Set the controller used to discover new paths."""
        cls.controller = controller

    @staticmethod
    def get_paths(circuit, max_paths=2, **kwargs):
        """Get the paths for the circuit from the Pathfinder.

        Posts the ids of the circuit's ``uni_a`` and ``uni_z`` interfaces
        to ``settings.PATHFINDER_URL``. Extra keyword arguments are merged
        into the request body and may override the default keys.

        Returns:
            The value of the ``paths`` key of the JSON reply (None if the
            key is absent), or None when the reply status is not OK.
        """
        endpoint = settings.PATHFINDER_URL
        request_data = {
            "source": circuit.uni_a.interface.id,
            "destination": circuit.uni_z.interface.id,
            "spf_max_paths": max_paths,
        }
        request_data.update(kwargs)
        api_reply = requests.post(endpoint, json=request_data)

        if api_reply.status_code != getattr(requests.codes, "ok"):
            log.error(
                "Failed to get paths at %s. Returned %s",
                endpoint,
                api_reply.text,
            )
            return None
        reply_data = api_reply.json()
        return reply_data.get("paths")

    @staticmethod
    def _clear_path(path):
        """Remove switches from a path, returning only interfaces."""
        # Entries of 23 characters or fewer are dropped -- presumably 23 is
        # the length of a switch DPID string; confirm against Pathfinder.
        return [endpoint for endpoint in path if len(endpoint) > 23]

    @classmethod
    def get_best_path(cls, circuit):
        """Return the best path available for a circuit, if exists.

        Returns None when the Pathfinder returns no paths. The result may
        also be None when create_path() cannot build the path.
        """
        paths = cls.get_paths(circuit)
        if paths:
            # Reuse the reply already fetched instead of asking the
            # Pathfinder a second time.
            return cls.create_path(paths[0]["hops"])
        return None

    @classmethod
    def get_best_paths(cls, circuit, **kwargs):
        """Yield the best paths available for a circuit, if they exist.

        Nothing is yielded when the Pathfinder request fails or returns no
        paths. A yielded item is None when create_path() cannot build it.
        """
        # get_paths() returns None on failure; treat that as "no paths".
        for path in cls.get_paths(circuit, **kwargs) or []:
            yield cls.create_path(path["hops"])

    @classmethod
    def get_disjoint_paths(
        cls, circuit, unwanted_path, cutoff=settings.DISJOINT_PATH_CUTOFF
    ):
        """Compute the maximum disjoint paths from the unwanted_path for an EVC.

        Maximum disjoint paths from the unwanted_path are the paths from the
        source node to the target node that share the minimum number of links
        contained in unwanted_path. In other words, unwanted_path is the path
        we want to avoid: we want the maximum possible disjoint path from it.
        The disjointness of a path in regards to unwanted_path is calculated
        by the complementary percentage of shared links between them. As an
        example, if the unwanted_path has 3 links, a given path P1 has 1 link
        shared with unwanted_path, and a given path P2 has 2 links shared with
        unwanted_path, then the disjointness of P1 is 0.67 and the disjointness
        of P2 is 0.33. In this example, P1 is preferable over P2 because it
        offers a better disjoint path. When two paths have the same
        disjointness they are ordered by the 'cost' attribute as returned from
        Pathfinder. When the disjointness of a path is equal to 0 (i.e., it
        shares all the links with unwanted_path), that particular path is not
        considered a candidate.

        Parameters:
        -----------

        circuit : EVC
            The EVC providing source node (uni_a) and target node (uni_z)

        unwanted_path : Path
            The Path which we want to avoid.

        cutoff: int
            Maximum number of paths to consider when calculating the disjoint
            paths (number of paths to request from pathfinder)

        Returns:
        --------
        paths : generator
            Generator of unwanted_path disjoint paths, most disjoint first.
            Nothing is yielded if unwanted_path is empty or if the
            Pathfinder returns no paths. A yielded item is None when
            create_path() cannot build it.
        """
        unwanted_links = [
            (link.endpoint_a.id, link.endpoint_b.id) for link in unwanted_path
        ]
        if not unwanted_links:
            return

        # get_paths() returns None on failure; treat that as "no paths".
        paths = cls.get_paths(
            circuit, max_paths=cutoff, **circuit.secondary_constraints
        ) or []
        for path in paths:
            hops = path["hops"]
            # Consecutive hop pairs of this path, built once so that each
            # unwanted link is a constant-time membership test.
            path_edges = set(zip(hops[:-1], hops[1:]))
            shared_edges = sum(
                1
                for endpoint_a, endpoint_b in unwanted_links
                # A link is shared regardless of the direction it is
                # traversed in.
                if (endpoint_a, endpoint_b) in path_edges
                or (endpoint_b, endpoint_a) in path_edges
            )
            path["disjointness"] = 1 - shared_edges / len(unwanted_links)
        paths = sorted(paths, key=lambda x: (-x["disjointness"], x["cost"]))
        for path in paths:
            if path["disjointness"] == 0:
                continue
            yield cls.create_path(path["hops"])

    @classmethod
    def create_path(cls, path):
        """Return the path containing only the interfaces.

        Switch entries are stripped from ``path`` and the remaining
        interface ids are paired into links, resolving each id through the
        controller.

        Returns:
            A Path of Link objects, or None when the number of interfaces
            is odd or the controller does not know one of the interfaces.
        """
        new_path = Path()
        clean_path = cls._clear_path(path)

        if len(clean_path) % 2:
            return None

        # The first and last interfaces are skipped (presumably the UNIs --
        # confirm); the rest are paired as (1, 2), (3, 4), ...
        for link in zip(clean_path[1:-1:2], clean_path[2::2]):
            interface_a = cls.controller.get_interface_by_id(link[0])
            interface_b = cls.controller.get_interface_by_id(link[1])
            if interface_a is None or interface_b is None:
                return None
            new_path.append(Link(interface_a, interface_b))

        return new_path
245