Passed
Push — master ( 728489...ff5136 )
by Vinicius
05:39 queued 03:04
created

build.models.path.Path.is_valid()   C

Complexity

Conditions 11

Size

Total Lines 30
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 11.1967

Importance

Changes 0
Metric Value
cc 11
eloc 24
nop 4
dl 0
loc 30
ccs 15
cts 17
cp 0.8824
crap 11.1967
rs 5.4
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like build.models.path.Path.is_valid() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Classes related to paths"""
2 1
import requests
3
4 1
from kytos.core import log
5 1
from kytos.core.common import EntityStatus, GenericEntity
6 1
from kytos.core.link import Link
7 1
from napps.kytos.mef_eline import settings
8 1
from napps.kytos.mef_eline.exceptions import InvalidPath
9
10
11 1
class Path(list, GenericEntity):
12
    """Class to represent a Path."""
13
14 1
    def __eq__(self, other=None):
15
        """Compare paths."""
16 1
        if not other or not isinstance(other, Path):
17 1
            return False
18 1
        return super().__eq__(other)
19
20 1
    def is_affected_by_link(self, link=None):
21
        """Verify if the current path is affected by link."""
22 1
        if not link:
23 1
            return False
24 1
        return link in self
25
26 1
    def link_affected_by_interface(self, interface=None):
27
        """Return the link using this interface, if any, or None otherwise."""
28 1
        if not interface:
29 1
            return None
30 1
        for link in self:
31 1
            if interface in (link.endpoint_a, link.endpoint_b):
32 1
                return link
33
        return None
34
35 1
    def choose_vlans(self):
36
        """Choose the VLANs to be used for the circuit."""
37 1
        for link in self:
38 1
            tag = link.get_next_available_tag()
39 1
            link.add_metadata("s_vlan", tag)
40
41 1
    def make_vlans_available(self):
42
        """Make the VLANs used in a path available when undeployed."""
43 1
        for link in self:
44 1
            link.make_tag_available(link.get_metadata("s_vlan"))
45 1
            link.remove_metadata("s_vlan")
46
47 1
    def is_valid(self, switch_a, switch_z, is_scheduled=False):
48
        """Check if this is a valid path."""
49 1
        if not self:
50 1
            return True
51 1
        previous = visited = {switch_a}
52 1
        for link in self:
53 1
            current = {link.endpoint_a.switch, link.endpoint_b.switch} \
54
                      - previous
55 1
            if len(current) != 1:
56 1
                raise InvalidPath(
57
                    f"Previous switch {previous} is not connected to "
58
                    f"current link with switches {current}."
59
                )
60 1
            if current & visited:
61 1
                raise InvalidPath(
62
                    f"Loop detected in path, switch {current} was visited"
63
                    f" more than once."
64
                )
65 1
            if is_scheduled is False and (
66
                link.endpoint_a.link is None
67
                or link.endpoint_a.link != link
68
                or link.endpoint_b.link is None
69
                or link.endpoint_b.link != link
70
            ):
71
                raise InvalidPath(f"Link {link} is not available.")
72 1
            previous = current
73 1
            visited |= current
74 1
        if previous & {switch_z}:
75 1
            return True
76
        raise InvalidPath("Last link does not contain uni_z switch")
77
78 1
    @property
79 1
    def status(self):
80
        """Check for the  status of a path.
81
82
        If any link in this path is down, the path is considered down.
83
        """
84 1
        if not self:
85 1
            return EntityStatus.DISABLED
86
87 1
        endpoint = f"{settings.TOPOLOGY_URL}/links"
88 1
        api_reply = requests.get(endpoint)
89 1
        if api_reply.status_code != getattr(requests.codes, "ok"):
90
            log.error(
91
                "Failed to get links at %s. Returned %s",
92
                endpoint,
93
                api_reply.status_code,
94
            )
95
            return None
96 1
        links = api_reply.json()["links"]
97 1
        return_status = EntityStatus.UP
98 1
        for path_link in self:
99 1
            try:
100 1
                link = links[path_link.id]
101
            except KeyError:
102
                return EntityStatus.DISABLED
103 1
            if link["enabled"] is False:
104 1
                return EntityStatus.DISABLED
105 1
            if link["active"] is False:
106 1
                return_status = EntityStatus.DOWN
107 1
        return return_status
108
109 1
    def as_dict(self):
110
        """Return list comprehension of links as_dict."""
111 1
        return [link.as_dict() for link in self if link]
112
113
114 1
class DynamicPathManager:
115
    """Class to handle and create paths."""
116
117 1
    controller = None
118
119 1
    @classmethod
120 1
    def set_controller(cls, controller=None):
121
        """Set the controller to discovery news paths."""
122 1
        cls.controller = controller
123
124 1
    @staticmethod
125 1
    def get_paths(circuit, max_paths=2, **kwargs):
126
        """Get a valid path for the circuit from the Pathfinder."""
127 1
        endpoint = settings.PATHFINDER_URL
128 1
        request_data = {
129
            "source": circuit.uni_a.interface.id,
130
            "destination": circuit.uni_z.interface.id,
131
            "spf_max_paths": max_paths,
132
        }
133 1
        request_data.update(kwargs)
134 1
        api_reply = requests.post(endpoint, json=request_data)
135
136 1
        if api_reply.status_code != getattr(requests.codes, "ok"):
137 1
            log.error(
138
                "Failed to get paths at %s. Returned %s",
139
                endpoint,
140
                api_reply.text,
141
            )
142 1
            return None
143 1
        reply_data = api_reply.json()
144 1
        return reply_data.get("paths")
145
146 1
    @staticmethod
147 1
    def _clear_path(path):
148
        """Remove switches from a path, returning only interfaces."""
149 1
        return [endpoint for endpoint in path if len(endpoint) > 23]
150
151 1
    @classmethod
152 1
    def get_best_path(cls, circuit):
153
        """Return the best path available for a circuit, if exists."""
154 1
        paths = cls.get_paths(circuit)
155 1
        if paths:
156 1
            return cls.create_path(cls.get_paths(circuit)[0]["hops"])
157 1
        return None
158
159 1
    @classmethod
160 1
    def get_best_paths(cls, circuit, **kwargs):
161
        """Return the best paths available for a circuit, if they exist."""
162 1
        for path in cls.get_paths(circuit, **kwargs):
163 1
            yield cls.create_path(path["hops"])
164
165 1
    @classmethod
166 1
    def get_disjoint_paths(
167
        cls, circuit, unwanted_path, cutoff=settings.DISJOINT_PATH_CUTOFF
168
    ):
169
        """Computes the maximum disjoint paths from the unwanted_path for a EVC
170
171
        Maximum disjoint paths from the unwanted_path are the paths from the
172
        source node to the target node that share the minimum number os links
173
        contained in unwanted_path. In other words, unwanted_path is the path
174
        we want to avoid: we want the maximum possible disjoint path from it.
175
        The disjointness of a path in regards to unwanted_path is calculated
176
        by the complementary percentage of shared links between them. As an
177
        example, if the unwanted_path has 3 links, a given path P1 has 1 link
178
        shared with unwanted_path, and a given path P2 has 2 links shared with
179
        unwanted_path, then the disjointness of P1 is 0.67 and the disjointness
180
        of P2 is 0.33. In this example, P1 is preferable over P2 because it
181
        offers a better disjoint path. When two paths have the same
182
        disjointness they are ordered by 'cost' attributed as returned from
183
        Pathfinder. When the disjointness of a path is equal to 0 (i.e., it
184
        shares all the links with unwanted_path), that particular path is not
185
        considered a candidate.
186
187
        Parameters:
188
        -----------
189
190
        circuit : EVC
191
            The EVC providing source node (uni_a) and target node (uni_z)
192
193
        unwanted_path : Path
194
            The Path which we want to avoid.
195
196
        cutoff: int
197
            Maximum number of paths to consider when calculating the disjoint
198
            paths (number of paths to request from pathfinder)
199
200
        Returns:
201
        --------
202
        paths : generator
203
            Generator of unwanted_path disjoint paths. If unwanted_path is
204
            not provided or empty, we return an empty list.
205
        """
206 1
        unwanted_links = [
207
            (link.endpoint_a.id, link.endpoint_b.id) for link in unwanted_path
208
        ]
209 1
        if not unwanted_links:
210 1
            return None
211
212 1
        paths = cls.get_paths(circuit, max_paths=cutoff,
213
                              **circuit.secondary_constraints)
214 1
        for path in paths:
215 1
            head = path["hops"][:-1]
216 1
            tail = path["hops"][1:]
217 1
            shared_edges = 0
218 1
            for (endpoint_a, endpoint_b) in unwanted_links:
219 1
                if ((endpoint_a, endpoint_b) in zip(head, tail)) or (
220
                    (endpoint_b, endpoint_a) in zip(head, tail)
221
                ):
222 1
                    shared_edges += 1
223 1
            path["disjointness"] = 1 - shared_edges / len(unwanted_links)
224 1
        paths = sorted(paths, key=lambda x: (-x['disjointness'], x['cost']))
225 1
        for path in paths:
226 1
            if path["disjointness"] == 0:
227 1
                continue
228 1
            yield cls.create_path(path["hops"])
229 1
        return None
230
231 1
    @classmethod
232 1
    def create_path(cls, path):
233
        """Return the path containing only the interfaces."""
234 1
        new_path = Path()
235 1
        clean_path = cls._clear_path(path)
236
237 1
        if len(clean_path) % 2:
238 1
            return None
239
240 1
        for link in zip(clean_path[1:-1:2], clean_path[2::2]):
241 1
            interface_a = cls.controller.get_interface_by_id(link[0])
242 1
            interface_b = cls.controller.get_interface_by_id(link[1])
243 1
            if interface_a is None or interface_b is None:
244 1
                return None
245 1
            new_path.append(Link(interface_a, interface_b))
246
247
        return new_path
248