Passed
Pull Request — master (#465)
by Vinicius
04:23
created

build.models.path   F

Complexity

Total Complexity 60

Size/Duplication

Total Lines 293
Duplicated Lines 0 %

Test Coverage

Coverage 95.57%

Importance

Changes 0
Metric Value
eloc 189
dl 0
loc 293
ccs 151
cts 158
cp 0.9557
rs 3.6
c 0
b 0
f 0
wmc 60

How to fix   Complexity   

Complexity

Complex classes like build.models.path often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Classes related to paths"""
2 1
import requests
3
4 1
from kytos.core import log
5 1
from kytos.core.common import EntityStatus, GenericEntity
6 1
from kytos.core.interface import TAG
7 1
from kytos.core.link import Link
8 1
from napps.kytos.mef_eline import settings
9 1
from napps.kytos.mef_eline.exceptions import InvalidPath
10
11
12 1
class Path(list, GenericEntity):
    """Class to represent a Path: a list whose items are links."""

    def __eq__(self, other=None):
        """Compare paths.

        Returns False when ``other`` is falsy or is not a ``Path``;
        otherwise the comparison is delegated to the parent class.

        Note that an empty ``Path`` is falsy, so an empty path never
        compares equal to anything, not even to another empty path.
        """
        if not other or not isinstance(other, Path):
            return False
        return super().__eq__(other)

    def is_affected_by_link(self, link=None):
        """Verify if the current path is affected by link.

        Returns False when no link is given, otherwise whether the link
        is one of the items of this path.
        """
        if not link:
            return False
        return link in self

    def link_affected_by_interface(self, interface=None):
        """Return the link using this interface, if any, or None otherwise."""
        if not interface:
            return None
        for link in self:
            if interface in (link.endpoint_a, link.endpoint_b):
                return link
        return None

    def choose_vlans(self, controller):
        """Choose the VLANs to be used for the circuit.

        For every link, the next available tag is requested from the link
        and stored in the link metadata under the ``"s_vlan"`` key.
        """
        for link in self:
            tag_value = link.get_next_available_tag(controller, link.id)
            tag = TAG('vlan', tag_value)
            link.add_metadata("s_vlan", tag)

    def make_vlans_available(self, controller):
        """Make the VLANs used in a path available when undeployed.

        For every link, the tag stored under the ``"s_vlan"`` metadata key
        is handed back to the link and the metadata entry is removed.
        Conflicts reported by the link for either endpoint are logged as
        errors.
        """
        for link in self:
            # NOTE(review): assumes every link still carries "s_vlan"
            # metadata; a missing entry would fail on tag.value below.
            tag = link.get_metadata("s_vlan")
            conflict_a, conflict_b = link.make_tags_available(
                controller, tag.value, link.id, tag.tag_type,
                check_order=False
            )
            if conflict_a:
                log.error(f"Tags {conflict_a} was already available in "
                          f"{link.endpoint_a.id}")
            if conflict_b:
                log.error(f"Tags {conflict_b} was already available in "
                          f"{link.endpoint_b.id}")
            link.remove_metadata("s_vlan")

    def is_valid(self, switch_a, switch_z, is_scheduled=False):
        """Check if this is a valid path.

        The links are walked in order starting from ``switch_a``: each
        link must lead from the previously reached switch to exactly one
        new switch that has not been visited yet, and the last switch
        reached must be ``switch_z``.

        Unless ``is_scheduled`` is True, both endpoints of every link
        must also reference that same link.

        Returns:
            True when the path is empty or every check passes.

        Raises:
            InvalidPath: when any of the checks above fails.
        """
        if not self:
            return True
        # Two independent sets: ``visited`` grows in place while
        # ``previous`` is rebound on every hop.
        previous = {switch_a}
        visited = {switch_a}
        for link in self:
            current = {link.endpoint_a.switch, link.endpoint_b.switch} \
                      - previous
            if len(current) != 1:
                raise InvalidPath(
                    f"Previous switch {previous} is not connected to "
                    f"current link with switches {current}."
                )
            if current & visited:
                raise InvalidPath(
                    f"Loop detected in path, switch {current} was visited"
                    f" more than once."
                )
            if is_scheduled is False and (
                link.endpoint_a.link is None
                or link.endpoint_a.link != link
                or link.endpoint_b.link is None
                or link.endpoint_b.link != link
            ):
                raise InvalidPath(f"Link {link} is not available.")
            previous = current
            visited |= current
        if previous & {switch_z}:
            return True
        raise InvalidPath("Last link does not contain uni_z switch")

    @property
    def status(self) -> EntityStatus:
        """Check for the status of a path.

        The link referenced by each endpoint is checked, instead of the
        link stored in the path, to have the same object ref as topology.

        Returns:
            ``EntityStatus.DISABLED`` when the path is empty,
            ``EntityStatus.DOWN`` when the endpoints of a link do not
            reference the same link (or reference none), the status of
            the first link that is not UP, or ``EntityStatus.UP`` when
            every link is UP.
        """
        if not self:
            return EntityStatus.DISABLED

        for path_link in self:
            link = path_link.endpoint_a.link
            if not link or link != path_link.endpoint_b.link:
                return EntityStatus.DOWN
            if (status := link.status) != EntityStatus.UP:
                return status
        return EntityStatus.UP

    def as_dict(self):
        """Return a list with the ``as_dict()`` of every truthy link."""
        return [link.as_dict() for link in self if link]
112
113
114 1
class DynamicPathManager:
    """Class to handle and create paths."""

    # Shared by every classmethod; must be set through set_controller()
    # before create_path() is used.
    controller = None

    @classmethod
    def set_controller(cls, controller=None):
        """Set the controller used to look up interfaces for new paths."""
        cls.controller = controller

    @staticmethod
    def get_paths(circuit, max_paths=2, **kwargs) -> list[dict]:
        """Get valid paths for the circuit from the Pathfinder.

        Parameters:
        -----------

        circuit : EVC
            Provides the source (uni_a) and destination (uni_z) interfaces.

        max_paths : int
            Sent to the Pathfinder as ``spf_max_paths``.

        kwargs :
            Extra entries merged into the request payload. A falsy or
            missing ``spf_attribute`` falls back to
            ``settings.SPF_ATTRIBUTE``.

        Returns:
        --------
        paths : list[dict]
            The ``"paths"`` entry of the reply, or an empty list when the
            request does not return an OK status or the entry is missing.
        """
        endpoint = settings.PATHFINDER_URL
        spf_attribute = kwargs.get("spf_attribute") or settings.SPF_ATTRIBUTE
        request_data = {
            "source": circuit.uni_a.interface.id,
            "destination": circuit.uni_z.interface.id,
            "spf_max_paths": max_paths,
        }
        request_data.update(kwargs)
        # Set after merging kwargs so that a falsy "spf_attribute" passed
        # by the caller cannot overwrite the fallback computed above.
        request_data["spf_attribute"] = spf_attribute
        # NOTE(review): no timeout is passed, so this call can block for
        # as long as the Pathfinder takes to answer.
        api_reply = requests.post(endpoint, json=request_data)

        if api_reply.status_code != getattr(requests.codes, "ok"):
            log.error(
                "Failed to get paths at %s. Returned %s. Payload %s. EVC %s",
                endpoint,
                api_reply.text,
                request_data,
                circuit,
            )
            return []
        reply_data = api_reply.json()
        return reply_data.get("paths", [])

    @staticmethod
    def _clear_path(path):
        """Remove switches from a path, returning only interfaces."""
        # Entries of up to 23 characters are dropped; presumably that is
        # the length of a switch id, interface ids being longer -- TODO
        # confirm against the Pathfinder reply format.
        return [endpoint for endpoint in path if len(endpoint) > 23]

    @classmethod
    def get_best_path(cls, circuit):
        """Return the best path available for a circuit, if exists.

        The first path returned by the Pathfinder is used; None is
        returned when there is no path at all.
        """
        paths = cls.get_paths(circuit)
        if paths:
            # Reuse the reply already fetched instead of asking the
            # Pathfinder again (the second reply could even be empty).
            return cls.create_path(paths[0]["hops"])
        return None

    @classmethod
    def get_best_paths(cls, circuit, **kwargs):
        """Return the best paths available for a circuit, if they exist.

        This is a generator: one path is built per entry returned by
        ``get_paths``, with ``kwargs`` forwarded to it. Entries that
        ``create_path`` cannot build are yielded as None.
        """
        for path in cls.get_paths(circuit, **kwargs):
            yield cls.create_path(path["hops"])

    @classmethod
    def get_disjoint_paths(
        cls, circuit, unwanted_path, cutoff=settings.DISJOINT_PATH_CUTOFF
    ):
        """Computes the maximum disjoint paths from the unwanted_path for a EVC

        Maximum disjoint paths from the unwanted_path are the paths from the
        source node to the target node that share the minimum number of links
        and switches contained in unwanted_path. In other words, unwanted_path
        is the path we want to avoid: we want the maximum possible disjoint
        path from it. The disjointness of a path in regards to unwanted_path
        is calculated by the complementary percentage of shared links and
        switches between them. As an example, if the unwanted_path has 3
        links and 2 switches, a given path P1 has 1 link shared with
        unwanted_path, and a given path P2 has 2 links and 1 switch shared
        with unwanted_path, then the disjointness of P1 is 0.8 and the
        disjointness of P2 is 0.4. In this example, P1 is preferable over P2
        because it offers a better disjoint path. When two paths have the same
        disjointness they are ordered by 'cost' attributed as returned from
        Pathfinder. When the disjointness of a path is equal to 0 (i.e., it
        shares all the links with unwanted_path), that particular path is not
        considered a candidate.

        Parameters:
        -----------

        circuit : EVC
            The EVC providing source node (uni_a) and target node (uni_z)

        unwanted_path : Path
            The Path which we want to avoid.

        cutoff: int
            Maximum number of paths to consider when calculating the disjoint
            paths (number of paths to request from pathfinder)

        Returns:
        --------
        paths : generator
            Generator of unwanted_path disjoint paths. If unwanted_path is
            not provided or empty, the generator yields nothing.
        """
        # Nothing to avoid: finish before querying the Pathfinder.
        if not unwanted_path:
            return

        unwanted_links = [
            (link.endpoint_a.id, link.endpoint_b.id) for link in unwanted_path
        ]
        unwanted_switches = set()
        for link in unwanted_path:
            unwanted_switches.add(link.endpoint_a.switch.id)
            unwanted_switches.add(link.endpoint_b.switch.id)
        # The UNI switches are shared by every candidate path, so they do
        # not count towards the disjointness.
        unwanted_switches.discard(circuit.uni_a.interface.switch.id)
        unwanted_switches.discard(circuit.uni_z.interface.switch.id)

        length_unwanted = len(unwanted_links) + len(unwanted_switches)

        paths = cls.get_paths(circuit, max_paths=cutoff,
                              **circuit.secondary_constraints)
        for path in paths:
            links_n, switches_n = cls.get_shared_components(
                path, unwanted_links, unwanted_switches
            )
            shared_components = links_n + switches_n
            path["disjointness"] = 1 - shared_components / length_unwanted
        # Most disjoint first; ties are broken by the lowest cost.
        paths = sorted(paths, key=lambda x: (-x['disjointness'], x['cost']))
        for path in paths:
            if path["disjointness"] == 0:
                continue
            yield cls.create_path(path["hops"])

    @staticmethod
    def get_shared_components(
        path: dict,
        unwanted_links: list[tuple[str, str]],
        unwanted_switches: set[str]
    ) -> tuple[int, int]:
        """Return the number of shared links
        and switches found in path.

        ``path`` is a mapping with a ``"hops"`` sequence. A link is shared
        when its two endpoint ids appear as consecutive hops, in either
        order. A switch is shared when its id appears among the hops; each
        switch is counted once.
        """
        hops = path["hops"]
        # Built once so that every link lookup is a constant-time test.
        hop_pairs = set(zip(hops[:-1], hops[1:]))
        shared_links = sum(
            1
            for endpoint_a, endpoint_b in unwanted_links
            if (endpoint_a, endpoint_b) in hop_pairs
            or (endpoint_b, endpoint_a) in hop_pairs
        )
        shared_switches = len(unwanted_switches.intersection(hops))
        return shared_links, shared_switches

    @classmethod
    def create_path(cls, path):
        """Return the path containing only the interfaces.

        Switch entries are removed from ``path`` and the remaining
        entries, except the first and the last one, are paired up in
        order; each pair becomes a ``Link`` between the two interfaces
        looked up in the controller.

        Returns:
            A ``Path`` with the links, or None when the number of
            remaining entries is odd or an interface is not found.
        """
        new_path = Path()
        clean_path = cls._clear_path(path)

        if len(clean_path) % 2:
            return None

        # Entries 1-2, 3-4, ... are paired; the first and last entries
        # are left out (presumably the UNI interfaces -- TODO confirm).
        for link in zip(clean_path[1:-1:2], clean_path[2::2]):
            interface_a = cls.controller.get_interface_by_id(link[0])
            interface_b = cls.controller.get_interface_by_id(link[1])
            if interface_a is None or interface_b is None:
                return None
            new_path.append(Link(interface_a, interface_b))

        return new_path
281