Test Failed
Pull Request — master (#214)
by
unknown
03:41
created

build.models.evc.EVCDeploy._prepare_push_flow()   B

Complexity

Conditions 7

Size

Total Lines 56
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 28
CRAP Score 7

Importance

Changes 0
Metric Value
eloc 30
dl 0
loc 56
ccs 28
cts 28
cp 1
rs 7.76
c 0
b 0
f 0
cc 7
nop 8
crap 7

How to fix   Long Method    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Classes used in the main application."""  # pylint: disable=too-many-lines
2 1
from collections import OrderedDict
3 1
from datetime import datetime
4 1
from threading import Lock
5 1
from uuid import uuid4
6
7 1
import requests
8 1
from glom import glom
9
10 1
from kytos.core import log
11 1
from kytos.core.common import EntityStatus, GenericEntity
12 1
from kytos.core.exceptions import KytosNoTagAvailableError
13 1
from kytos.core.helpers import get_time, now
14 1
from kytos.core.interface import UNI
15 1
from napps.kytos.mef_eline import controllers, settings
16 1
from napps.kytos.mef_eline.exceptions import FlowModException, InvalidPath
17 1
from napps.kytos.mef_eline.utils import (compare_endpoint_trace, emit_event,
18
                                         notify_link_available_tags)
19
20 1
from .path import DynamicPathManager, Path
21
22
23 1
class EVCBase(GenericEntity):
    """Class to represent a circuit."""

    # Attributes that update() refuses to change.
    read_only_attributes = [
        "creation_time",
        "active",
        "current_path",
        "failover_path",
        "_id",
        "archived",
    ]
    # Attributes whose new value update() reports back as "redeploy".
    attributes_requiring_redeploy = [
        "primary_path",
        "backup_path",
        "dynamic_backup_path",
        "queue_id",
        "sb_priority",
        "primary_constraints",
        "secondary_constraints"
    ]
    required_attributes = ["name", "uni_a", "uni_z"]

    def __init__(self, controller, **kwargs):
        """Create an EVC instance with the provided parameters.

        Args:
            id(str): EVC identifier. When it is missing or None an ID will be
                     generated. Only the first 14 characters will be used.
            name: represents an EVC name.(Required)
            uni_a (UNI): Endpoint A for User Network Interface.(Required)
            uni_z (UNI): Endpoint Z for User Network Interface.(Required)
            start_date(datetime|str): Date when the EVC was registered.
                                      Default is now().
            end_date(datetime|str): Final date that the EVC will be finished.
                                    Default is None.
            bandwidth(int): Bandwidth used by EVC instance. Default is 0.
            primary_links(list): Primary links used by evc. Default is []
            backup_links(list): Backups links used by evc. Default is []
            current_path(list): Circuit being used at the moment if this is an
                                active circuit. Default is [].
            failover_path(list): Path being used to provide EVC protection via
                                failover during link failures. Default is [].
            primary_path(list): primary circuit offered to user IF one or more
                                links were provided. Default is [].
            backup_path(list): backup circuit offered to the user IF one or
                               more links were provided. Default is [].
            dynamic_backup_path(bool): Enable computer backup path dynamically.
                                       Default is False.
            creation_time(datetime|str): datetime when the circuit should be
                                         activated. default is now().
            enabled(Boolean): attribute to indicate the administrative state;
                              default is False.
            active(Boolean): attribute to indicate the operational state;
                             default is False.
            archived(Boolean): indicate the EVC has been deleted and is
                               archived; default is False.
            owner(str): The EVC owner. Default is None.
            sb_priority(int): Service level provided in the request.
                              Default is None.
            service_level(int): Service level provided. The higher the better.
                                Default is 0.

        Raises:
            ValueError: raised when object attributes are invalid.

        """
        self._validate(**kwargs)
        super().__init__()

        # required attributes
        # An explicit ``id=None`` must behave like a missing id; slicing None
        # would otherwise raise TypeError.
        evc_id = kwargs.get("id")
        if evc_id is None:
            evc_id = uuid4().hex
        self._id = evc_id[:14]
        self.uni_a = kwargs.get("uni_a")
        self.uni_z = kwargs.get("uni_z")
        self.name = kwargs.get("name")

        # optional attributes
        self.start_date = get_time(kwargs.get("start_date")) or now()
        self.end_date = get_time(kwargs.get("end_date")) or None
        self.queue_id = kwargs.get("queue_id", None)

        self.bandwidth = kwargs.get("bandwidth", 0)
        self.primary_links = Path(kwargs.get("primary_links", []))
        self.backup_links = Path(kwargs.get("backup_links", []))
        self.current_path = Path(kwargs.get("current_path", []))
        self.failover_path = Path(kwargs.get("failover_path", []))
        self.primary_path = Path(kwargs.get("primary_path", []))
        self.backup_path = Path(kwargs.get("backup_path", []))
        self.dynamic_backup_path = kwargs.get("dynamic_backup_path", False)
        self.primary_constraints = kwargs.get("primary_constraints", {})
        self.secondary_constraints = kwargs.get("secondary_constraints", {})
        self.creation_time = get_time(kwargs.get("creation_time")) or now()
        self.owner = kwargs.get("owner", None)
        # "priority" is accepted as a fallback key for sb_priority.
        self.sb_priority = kwargs.get("sb_priority", None) or kwargs.get(
            "priority", None
        )
        self.service_level = kwargs.get("service_level", 0)
        self.circuit_scheduler = kwargs.get("circuit_scheduler", [])

        self.current_links_cache = set()
        self.primary_links_cache = set()
        self.backup_links_cache = set()

        self.lock = Lock()

        self.archived = kwargs.get("archived", False)

        self.metadata = kwargs.get("metadata", {})

        self._controller = controller
        self._mongo_controller = controllers.ELineController()

        if kwargs.get("active", False):
            self.activate()
        else:
            self.deactivate()

        if kwargs.get("enabled", False):
            self.enable()
        else:
            self.disable()

        # datetime of user request for a EVC (or datetime when object was
        # created)
        self.request_time = kwargs.get("request_time", now())
        # dict with the user original request (input)
        self._requested = kwargs

    def sync(self):
        """Sync this EVC in the MongoDB."""
        self._mongo_controller.upsert_evc(self.as_dict())

    def update(self, **kwargs):
        """Update evc attributes.

        All attributes are validated first, so nothing is modified when any
        of them is rejected. Read-only attributes, unknown attributes and
        invalid primary/backup paths are rejected.

        Returns:
            tuple: ``(enable, redeploy)``. ``enable`` is the value given for
            "enable"/"enabled"; ``redeploy`` is the last value assigned to an
            attribute listed in ``attributes_requiring_redeploy``. Each is
            None when no such attribute was given.

        Raises:
            ValueError: message with error detail.

        """
        enable, redeploy = (None, None)
        uni_a = kwargs.get("uni_a") or self.uni_a
        uni_z = kwargs.get("uni_z") or self.uni_z
        for attribute, value in kwargs.items():
            if attribute in self.read_only_attributes:
                raise ValueError(f"{attribute} can't be updated.")
            if not hasattr(self, attribute):
                raise ValueError(f'The attribute "{attribute}" is invalid.')
            if attribute in ("primary_path", "backup_path"):
                try:
                    value.is_valid(
                        uni_a.interface.switch, uni_z.interface.switch
                    )
                except InvalidPath as exception:
                    # Chain the cause so the original traceback is kept.
                    raise ValueError(
                        f"{attribute} is not a valid path: {exception}"
                    ) from exception
        for attribute, value in kwargs.items():
            if attribute in ("enable", "enabled"):
                if value:
                    self.enable()
                else:
                    self.disable()
                enable = value
            else:
                setattr(self, attribute, value)
                if attribute in self.attributes_requiring_redeploy:
                    redeploy = value
        self.sync()
        return enable, redeploy

    def __repr__(self):
        """Repr method."""
        return f"EVC({self._id}, {self.name})"

    def _validate(self, **kwargs):
        """Do Basic validations.

        Verify required attributes: name, uni_a, uni_z
        Verify if the attributes uni_a and uni_z are valid.

        Raises:
            ValueError: message with error detail.

        """
        for attribute in self.required_attributes:

            if attribute not in kwargs:
                raise ValueError(f"{attribute} is required.")

            if "uni" in attribute:
                uni = kwargs.get(attribute)
                if not isinstance(uni, UNI):
                    raise ValueError(f"{attribute} is an invalid UNI.")

                if not uni.is_valid():
                    tag = uni.user_tag.value
                    message = f"VLAN tag {tag} is not available in {attribute}"
                    raise ValueError(message)

    def __eq__(self, other):
        """Compare two EVCs by name, UNIs, owner and bandwidth."""
        if not isinstance(other, EVC):
            return False

        attrs_to_compare = ["name", "uni_a", "uni_z", "owner", "bandwidth"]
        for attribute in attrs_to_compare:
            if getattr(other, attribute) != getattr(self, attribute):
                return False
        return True

    def shares_uni(self, other):
        """Check if two EVCs share an UNI."""
        own_unis = (self.uni_a, self.uni_z)
        return other.uni_a in own_unis or other.uni_z in own_unis

    def as_dict(self):
        """Return a dictionary representing an EVC object.

        Datetime values are rendered as ``%Y-%m-%dT%H:%M:%S`` strings; paths,
        UNIs and circuit schedulers are converted with their own as_dict().
        """
        evc_dict = {
            "id": self.id,
            "name": self.name,
            "uni_a": self.uni_a.as_dict(),
            "uni_z": self.uni_z.as_dict(),
        }

        time_fmt = "%Y-%m-%dT%H:%M:%S"

        evc_dict["start_date"] = self.start_date
        if isinstance(self.start_date, datetime):
            evc_dict["start_date"] = self.start_date.strftime(time_fmt)

        evc_dict["end_date"] = self.end_date
        if isinstance(self.end_date, datetime):
            evc_dict["end_date"] = self.end_date.strftime(time_fmt)

        evc_dict["queue_id"] = self.queue_id
        evc_dict["bandwidth"] = self.bandwidth
        evc_dict["primary_links"] = self.primary_links.as_dict()
        evc_dict["backup_links"] = self.backup_links.as_dict()
        evc_dict["current_path"] = self.current_path.as_dict()
        evc_dict["failover_path"] = self.failover_path.as_dict()
        evc_dict["primary_path"] = self.primary_path.as_dict()
        evc_dict["backup_path"] = self.backup_path.as_dict()
        evc_dict["dynamic_backup_path"] = self.dynamic_backup_path
        evc_dict["metadata"] = self.metadata

        evc_dict["request_time"] = self.request_time
        if isinstance(self.request_time, datetime):
            evc_dict["request_time"] = self.request_time.strftime(time_fmt)

        time = self.creation_time.strftime(time_fmt)
        evc_dict["creation_time"] = time

        evc_dict["owner"] = self.owner
        evc_dict["circuit_scheduler"] = [
            sc.as_dict() for sc in self.circuit_scheduler
        ]

        evc_dict["active"] = self.is_active()
        evc_dict["enabled"] = self.is_enabled()
        evc_dict["archived"] = self.archived
        evc_dict["sb_priority"] = self.sb_priority
        evc_dict["service_level"] = self.service_level
        evc_dict["primary_constraints"] = self.primary_constraints
        evc_dict["secondary_constraints"] = self.secondary_constraints

        return evc_dict

    @property
    def id(self):  # pylint: disable=invalid-name
        """Return this EVC's ID."""
        return self._id

    def archive(self):
        """Archive this EVC on deletion."""
        self.archived = True
307
308
309
# pylint: disable=fixme, too-many-public-methods
310 1
class EVCDeploy(EVCBase):
311
    """Class to handle the deploy procedures."""
312
313 1
    def create(self):
314
        """Create a EVC."""
315
316 1
    def discover_new_paths(self):
        """Ask DynamicPathManager for the best paths for this circuit.

        The EVC's primary_constraints are forwarded as keyword arguments.
        """
        constraints = self.primary_constraints
        return DynamicPathManager.get_best_paths(self, **constraints)
320
321 1
    def get_failover_path_candidates(self):
        """Return paths disjoint from current_path to protect this EVC."""
        # Only dynamic disjoint paths are offered for now. Offering the
        # static primary/backup paths as well would require proper handling
        # of link_up events and failover paths.
        active_path = self.current_path
        return DynamicPathManager.get_disjoint_paths(self, active_path)
331
332 1
    def change_path(self):
333
        """Change EVC path."""
334
335 1
    def reprovision(self):
336
        """Force the EVC (re-)provisioning."""
337
338 1
    def is_affected_by_link(self, link):
339
        """Return True if this EVC has the given link on its current path."""
340 1
        return link in self.current_path
341
342 1
    def link_affected_by_interface(self, interface):
343
        """Return True if this EVC has the given link on its current path."""
344
        return self.current_path.link_affected_by_interface(interface)
345
346 1
    def is_backup_path_affected_by_link(self, link):
347
        """Return True if the backup path of this EVC uses the given link."""
348 1
        return link in self.backup_path
349
350
    # pylint: disable=invalid-name
351 1
    def is_primary_path_affected_by_link(self, link):
352
        """Return True if the primary path of this EVC uses the given link."""
353 1
        return link in self.primary_path
354
355 1
    def is_failover_path_affected_by_link(self, link):
356
        """Return True if this EVC has the given link on its failover path."""
357 1
        return link in self.failover_path
358
359 1
    def is_eligible_for_failover_path(self):
360
        """Verify if this EVC is eligible for failover path (EP029)"""
361
        # In the future this function can be augmented to consider
362
        # primary/backup, primary/dynamic, and other path combinations
363 1
        return (
364
            self.dynamic_backup_path and
365
            not self.primary_path and not self.backup_path
366
        )
367
368 1
    def is_using_primary_path(self):
369
        """Verify if the current deployed path is self.primary_path."""
370 1
        return self.primary_path and (self.current_path == self.primary_path)
371
372 1
    def is_using_backup_path(self):
373
        """Verify if the current deployed path is self.backup_path."""
374 1
        return self.backup_path and (self.current_path == self.backup_path)
375
376 1
    def is_using_dynamic_path(self):
377
        """Verify if the current deployed path is a dynamic path."""
378 1
        if (
379
            self.current_path
380
            and not self.is_using_primary_path()
381
            and not self.is_using_backup_path()
382
            and self.current_path.status == EntityStatus.UP
383
        ):
384
            return True
385 1
        return False
386
387 1
    def deploy_to_backup_path(self):
388
        """Deploy the backup path into the datapaths of this circuit.
389
390
        If the backup_path attribute is valid and up, this method will try to
391
        deploy this backup_path.
392
393
        If everything fails and dynamic_backup_path is True, then tries to
394
        deploy a dynamic path.
395
        """
396
        # TODO: Remove flows from current (cookies)
397 1
        if self.is_using_backup_path():
398
            # TODO: Log to say that cannot move backup to backup
399
            return True
400
401 1
        success = False
402 1
        if self.backup_path.status is EntityStatus.UP:
403 1
            success = self.deploy_to_path(self.backup_path)
404
405 1
        if success:
406 1
            return True
407
408 1
        if (
409
            self.dynamic_backup_path
410
            or self.uni_a.interface.switch == self.uni_z.interface.switch
411
        ):
412 1
            return self.deploy_to_path()
413
414
        return False
415
416 1
    def deploy_to_primary_path(self):
417
        """Deploy the primary path into the datapaths of this circuit.
418
419
        If the primary_path attribute is valid and up, this method will try to
420
        deploy this primary_path.
421
        """
422
        # TODO: Remove flows from current (cookies)
423 1
        if self.is_using_primary_path():
424
            # TODO: Log to say that cannot move primary to primary
425
            return True
426
427 1
        if self.primary_path.status is EntityStatus.UP:
428 1
            return self.deploy_to_path(self.primary_path)
429 1
        return False
430
431 1
    def deploy(self):
432
        """Deploy EVC to best path.
433
434
        Best path can be the primary path, if available. If not, the backup
435
        path, and, if it is also not available, a dynamic path.
436
        """
437 1
        if self.archived:
438 1
            return False
439 1
        self.enable()
440 1
        success = self.deploy_to_primary_path()
441 1
        if not success:
442 1
            success = self.deploy_to_backup_path()
443
444 1
        if success:
445 1
            emit_event(self._controller, "deployed", evc_id=self.id)
446 1
        return success
447
448 1
    @staticmethod
    def get_path_status(path):
        """Check for the current status of a path.

        An empty path is DISABLED. Otherwise the status of the first link
        that is not UP is returned, or UP when every link is UP.
        """
        if path:
            for link_status in (link.status for link in path):
                if link_status is not EntityStatus.UP:
                    return link_status
            return EntityStatus.UP
        return EntityStatus.DISABLED
461
462
    #    def discover_new_path(self):
463
    #        # TODO: discover a new path to satisfy this circuit and deploy
464
465 1
    def remove(self):
        """Remove the flows of this EVC and disable it.

        Flows of the current path and of the failover path are removed, the
        EVC is disabled and synced, and an "undeployed" event is emitted.
        """
        # Tear down the data plane first: current path, then failover path.
        self.remove_current_flows()
        self.remove_path_flows(self.failover_path)

        # Record the new administrative state before announcing it.
        self.disable()
        self.sync()
        emit_event(self._controller, "undeployed", evc_id=self.id)
472
473 1
    def remove_failover_flows(self, exclude_uni_switches=True,
474
                              force=True, sync=True) -> None:
475
        """Remove failover_flows.
476
477
        By default, it'll exclude UNI switches, if mef_eline has already
478
        called remove_current_flows before then this minimizes the number
479
        of FlowMods and IO.
480
        """
481 1
        if not self.failover_path:
482 1
            return
483 1
        switches, cookie, excluded = OrderedDict(), self.get_cookie(), set()
484 1
        links = set()
485 1
        if exclude_uni_switches:
486 1
            excluded.add(self.uni_a.interface.switch.id)
487 1
            excluded.add(self.uni_z.interface.switch.id)
488 1
        for link in self.failover_path:
489 1
            if link.endpoint_a.switch.id not in excluded:
490 1
                switches[link.endpoint_a.switch.id] = link.endpoint_a.switch
491 1
                links.add(link)
492 1
            if link.endpoint_b.switch.id not in excluded:
493 1
                switches[link.endpoint_b.switch.id] = link.endpoint_b.switch
494 1
                links.add(link)
495 1
        for switch in switches.values():
496 1
            try:
497 1
                self._send_flow_mods(
498
                    switch.id,
499
                    [
500
                        {
501
                            "cookie": cookie,
502
                            "cookie_mask": int(0xffffffffffffffff),
503
                        }
504
                    ],
505
                    "delete",
506
                    force=force,
507
                )
508
            except FlowModException as err:
509
                log.error(
510
                    f"Error removing flows from switch {switch.id} for"
511
                    f"EVC {self}: {err}"
512
                )
513 1
        for link in links:
514 1
            link.make_tag_available(link.get_metadata("s_vlan"))
515 1
            link.remove_metadata("s_vlan")
516 1
            notify_link_available_tags(self._controller, link)
517 1
        self.failover_path = Path([])
518 1
        if sync:
519 1
            self.sync()
520
521 1
    def remove_current_flows(self, current_path=None, force=True):
        """Remove all flows from current path.

        Args:
            current_path: path whose switches are cleaned; a falsy value
                means self.current_path.
            force(bool): forwarded to _send_flow_mods.

        Flows are deleted by cookie on both UNI switches and on every switch
        of the path. FlowModException errors are logged per switch. After
        that the VLANs of the path are made available, current_path is
        reset, and the EVC is deactivated and synced.
        """
        switches = set()

        switches.add(self.uni_a.interface.switch)
        switches.add(self.uni_z.interface.switch)
        if not current_path:
            current_path = self.current_path
        for link in current_path:
            switches.add(link.endpoint_a.switch)
            switches.add(link.endpoint_b.switch)

        match = {
            "cookie": self.get_cookie(),
            "cookie_mask": int(0xffffffffffffffff)
        }

        for switch in switches:
            try:
                self._send_flow_mods(switch.id, [match], 'delete', force=force)
            except FlowModException as err:
                # The original message lacked a space ("...forEVC ...").
                log.error(
                    f"Error removing flows from switch {switch.id} for "
                    f"EVC {self}: {err}"
                )

        current_path.make_vlans_available()
        for link in current_path:
            notify_link_available_tags(self._controller, link)
        self.current_path = Path([])
        self.deactivate()
        self.sync()
553
554 1
    def remove_path_flows(self, path=None, force=True):
555
        """Remove all flows from path."""
556 1
        if not path:
557 1
            return
558
559 1
        dpid_flows_match = {}
560 1
        for dpid, flows in self._prepare_nni_flows(path).items():
561 1
            dpid_flows_match.setdefault(dpid, [])
562 1
            for flow in flows:
563 1
                dpid_flows_match[dpid].append({
564
                    "cookie": flow["cookie"],
565
                    "match": flow["match"],
566
                    "cookie_mask": int(0xffffffffffffffff)
567
                })
568 1
        for dpid, flows in self._prepare_uni_flows(path, skip_in=True).items():
569 1
            dpid_flows_match.setdefault(dpid, [])
570 1
            for flow in flows:
571 1
                dpid_flows_match[dpid].append({
572
                    "cookie": flow["cookie"],
573
                    "match": flow["match"],
574
                    "cookie_mask": int(0xffffffffffffffff)
575
                })
576
577 1
        for dpid, flows in dpid_flows_match.items():
578 1
            try:
579 1
                self._send_flow_mods(dpid, flows, 'delete', force=force)
580 1
            except FlowModException as err:
581 1
                log.error(
582
                    "Error removing failover flows: "
583
                    f"dpid={dpid} evc={self} error={err}"
584
                )
585
586 1
        path.make_vlans_available()
587 1
        for link in path:
588 1
            notify_link_available_tags(self._controller, link)
589
590 1
    @staticmethod
591 1
    def links_zipped(path=None):
592
        """Return an iterator which yields pairs of links in order."""
593 1
        if not path:
594
            return []
595 1
        return zip(path[:-1], path[1:])
596
597 1
    def should_deploy(self, path=None):
598
        """Verify if the circuit should be deployed."""
599 1
        if not path:
600 1
            log.debug("Path is empty.")
601 1
            return False
602
603 1
        if not self.is_enabled():
604 1
            log.debug(f"{self} is disabled.")
605 1
            return False
606
607 1
        if not self.is_active():
608 1
            log.debug(f"{self} will be deployed.")
609 1
            return True
610
611 1
        return False
612
613 1
    def deploy_to_path(self, path=None):  # pylint: disable=too-many-branches
        """Install the flows for this circuit.

        Procedures to deploy:

        0. Remove current flows installed
        1. Decide if will deploy "path" or discover a new path
        2. Choose vlan
        3. Install NNI flows
        4. Install UNI flows
        5. Activate
        6. Update current_path

        Returns:
            True when the flows were installed, False when no path was
            available or the flow installation raised FlowModException.
        """
        def reserve_vlans(candidate):
            """Choose VLANs on the candidate and notify for each link."""
            candidate.choose_vlans()
            for hop in candidate:
                notify_link_available_tags(self._controller, hop)

        self.remove_current_flows()

        use_path = None
        if self.should_deploy(path):
            # The requested path is used unless it has no tag available.
            try:
                reserve_vlans(path)
            except KytosNoTagAvailableError:
                pass
            else:
                use_path = path
        else:
            # Take the first discovered path whose VLANs can be reserved.
            for candidate in self.discover_new_paths():
                if candidate is None:
                    continue
                try:
                    reserve_vlans(candidate)
                except KytosNoTagAvailableError:
                    continue
                use_path = candidate
                break

        try:
            if use_path:
                self._install_nni_flows(use_path)
                self._install_uni_flows(use_path)
            elif self.uni_a.interface.switch == self.uni_z.interface.switch:
                # Intra-switch EVC: no links, UNIs are connected directly.
                use_path = Path()
                self._install_direct_uni_flows()
            else:
                log.warning(
                    f"{self} was not deployed. No available path was found."
                )
                return False
        except FlowModException as err:
            log.error(
                f"Error deploying EVC {self} when calling flow_manager: {err}"
            )
            self.remove_current_flows(use_path)
            return False

        self.activate()
        self.current_path = use_path
        self.sync()
        log.info(f"{self} was deployed.")
        return True
674
675 1
    def setup_failover_path(self):
676
        """Install flows for the failover path of this EVC.
677
678
        Procedures to deploy:
679
680
        0. Remove flows currently installed for failover_path (if any)
681
        1. Discover a disjoint path from current_path
682
        2. Choose vlans
683
        3. Install NNI flows
684
        4. Install UNI egress flows
685
        5. Update failover_path
686
        """
687
        # Intra-switch EVCs have no failover_path
688 1
        if self.uni_a.interface.switch == self.uni_z.interface.switch:
689 1
            return False
690
691
        # For not only setup failover path for totally dynamic EVCs
692 1
        if not self.is_eligible_for_failover_path():
693 1
            return False
694
695 1
        reason = ""
696 1
        self.remove_path_flows(self.failover_path)
697 1
        for use_path in self.get_failover_path_candidates():
698 1
            if not use_path:
699 1
                continue
700 1
            try:
701 1
                use_path.choose_vlans()
702 1
                for link in use_path:
703 1
                    notify_link_available_tags(self._controller, link)
704 1
                break
705 1
            except KytosNoTagAvailableError:
706 1
                pass
707
        else:
708 1
            use_path = Path([])
709 1
            reason = "No available path was found"
710
711 1
        try:
712 1
            if use_path:
713 1
                self._install_nni_flows(use_path)
714 1
                self._install_uni_flows(use_path, skip_in=True)
715 1
        except FlowModException as err:
716 1
            reason = "Error deploying failover path"
717 1
            log.error(
718
                f"{reason} for {self}. FlowManager error: {err}"
719
            )
720 1
            self.remove_path_flows(use_path)
721 1
            use_path = Path([])
722
723 1
        self.failover_path = use_path
724 1
        self.sync()
725
726 1
        if not use_path:
727 1
            log.warning(
728
                f"Failover path for {self} was not deployed: {reason}"
729
            )
730 1
            return False
731 1
        log.info(f"Failover path for {self} was deployed.")
732 1
        return True
733
734 1
    def get_failover_flows(self):
735
        """Return the flows needed to make the failover path active, i.e. the
736
        flows for ingress forwarding.
737
738
        Return:
739
            dict: A dict of flows indexed by the switch_id will be returned, or
740
                an empty dict if no failover_path is available.
741
        """
742 1
        if not self.failover_path:
743 1
            return {}
744 1
        return self._prepare_uni_flows(self.failover_path, skip_out=True)
745
746 1
    def _prepare_direct_uni_flows(self):
747
        """Prepare flows connecting two UNIs for intra-switch EVC."""
748 1
        a_priority = self.uni_a.sb_priority
749 1
        if a_priority is None:
750
            if self.uni_a.user_tag:
751 1
                a_priority = settings.EVPL_SB_PRIORITY
752
            else:
753
                a_priority = settings.EPL_SB_PRIORITY
754 1
        vlan_a = self.uni_a.user_tag.value if self.uni_a.user_tag else None
755
756
        z_priority = self.uni_z.sb_priority
757
        if z_priority is None:
758 1
            if self.uni_z.user_tag:
759 1
                z_priority = settings.EVPL_SB_PRIORITY
760 1
            else:
761 1
                z_priority = settings.EPL_SB_PRIORITY
762
        vlan_z = self.uni_z.user_tag.value if self.uni_z.user_tag else None
763
764 1
        flow_mod_az = self._prepare_flow_mod(
765
            self.uni_a.interface, self.uni_z.interface,
766
            a_priority, self.queue_id
767 1
        )
768 1
        flow_mod_za = self._prepare_flow_mod(
769 1
            self.uni_z.interface, self.uni_a.interface,
770 1
            z_priority, self.queue_id
771
        )
772
773 1
        if vlan_a and vlan_z:
774 1
            flow_mod_az["match"]["dl_vlan"] = vlan_a
775 1
            flow_mod_za["match"]["dl_vlan"] = vlan_z
776 1
            flow_mod_az["actions"].insert(
777
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
778
            )
779 1
            flow_mod_za["actions"].insert(
780
                0, {"action_type": "set_vlan", "vlan_id": vlan_a}
781
            )
782
        elif vlan_a:
783 1
            flow_mod_az["match"]["dl_vlan"] = vlan_a
784
            flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
785
            flow_mod_za["actions"].insert(
786
                0, {"action_type": "set_vlan", "vlan_id": vlan_a}
787
            )
788
        elif vlan_z:
789 1
            flow_mod_za["match"]["dl_vlan"] = vlan_z
790 1
            flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
791
            flow_mod_az["actions"].insert(
792 1
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
793
            )
794 1
        return (
795 1
            self.uni_a.interface.switch.id, [flow_mod_az, flow_mod_za]
796 1
        )
797 1
798
    def _install_direct_uni_flows(self):
799 1
        """Install flows connecting two UNIs.
800
801 1
        This case happens when the circuit is between UNIs in the
802
        same switch.
803
        """
804
        (dpid, flows) = self._prepare_direct_uni_flows()
805
        self._send_flow_mods(dpid, flows)
806
807
    def _prepare_nni_flows(self, path=None):
808
        """Prepare NNI flows."""
809
        nni_flows = OrderedDict()
810
        for incoming, outcoming in self.links_zipped(path):
811
            in_vlan = incoming.get_metadata("s_vlan").value
812 1
            out_vlan = outcoming.get_metadata("s_vlan").value
813
814
            flows = []
815
            # Flow for one direction
816
            flows.append(
817
                self._prepare_nni_flow(
818
                    incoming.endpoint_b,
819
                    outcoming.endpoint_a,
820
                    in_vlan,
821 1
                    out_vlan,
822 1
                    queue_id=self.queue_id,
823
                )
824 1
            )
825
826 1
            # Flow for the other direction
827 1
            flows.append(
828
                self._prepare_nni_flow(
829 1
                    outcoming.endpoint_a,
830
                    incoming.endpoint_b,
831 1
                    out_vlan,
832 1
                    in_vlan,
833 1
                    queue_id=self.queue_id,
834 1
                )
835
            )
836
            nni_flows[incoming.endpoint_b.switch.id] = flows
837 1
        return nni_flows
838 1
839
    def _install_nni_flows(self, path=None):
840 1
        """Install NNI flows."""
841 1
        for dpid, flows in self._prepare_nni_flows(path).items():
842
            self._send_flow_mods(dpid, flows)
843
844 1
    def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
845
        """Prepare flows to install UNIs."""
846
        uni_flows = {}
847 1
        if not path:
848 1
            log.info("install uni flows without path.")
849
            return uni_flows
850
851
        # Determine VLANs
852
        in_vlan_a = self.uni_a.user_tag.value if self.uni_a.user_tag else None
853
        out_vlan_a = path[0].get_metadata("s_vlan").value
854
855
        in_vlan_z = self.uni_z.user_tag.value if self.uni_z.user_tag else None
856 1
        out_vlan_z = path[-1].get_metadata("s_vlan").value
857
858
        # Flows for the first UNI
859 1
        flows_a = []
860 1
861
        # Flow for one direction, pushing the service tag
862
        if not skip_in:
863
            push_flow = self._prepare_push_flow(
864
                self.uni_a.interface,
865
                path[0].endpoint_a,
866 1
                in_vlan_a,
867
                out_vlan_a,
868 1
                in_vlan_z,
869
                self.uni_a.sb_priority,
870
                queue_id=self.queue_id,
871 1
            )
872
            flows_a.append(push_flow)
873
874 1
        # Flow for the other direction, popping the service tag
875 1
        if not skip_out:
876
            pop_flow = self._prepare_pop_flow(
877
                path[0].endpoint_a,
878
                self.uni_a.interface,
879
                out_vlan_a,
880
                queue_id=self.queue_id,
881
            )
882
            flows_a.append(pop_flow)
883 1
884
        uni_flows[self.uni_a.interface.switch.id] = flows_a
885
886 1
        # Flows for the second UNI
887 1
        flows_z = []
888
889
        # Flow for one direction, pushing the service tag
890
        if not skip_in:
891
            push_flow = self._prepare_push_flow(
892
                self.uni_z.interface,
893 1
                path[-1].endpoint_b,
894
                in_vlan_z,
895 1
                out_vlan_z,
896
                in_vlan_a,
897 1
                self.uni_z.sb_priority,
898
                queue_id=self.queue_id,
899 1
            )
900
            flows_z.append(push_flow)
901 1
902
        # Flow for the other direction, popping the service tag
903 1
        if not skip_out:
904 1
            pop_flow = self._prepare_pop_flow(
905
                path[-1].endpoint_b,
906 1
                self.uni_z.interface,
907 1
                out_vlan_z,
908
                queue_id=self.queue_id,
909
            )
910
            flows_z.append(pop_flow)
911
912
        uni_flows[self.uni_z.interface.switch.id] = flows_z
913
914
        return uni_flows
915
916
    def _install_uni_flows(self, path=None, skip_in=False, skip_out=False):
917
        """Install UNI flows."""
918 1
        uni_flows = self._prepare_uni_flows(path, skip_in, skip_out)
919
920 1
        for (dpid, flows) in uni_flows.items():
921 1
            self._send_flow_mods(dpid, flows)
922 1
923 1
    @staticmethod
    def _send_flow_mods(dpid, flow_mods, command='flows', force=False):
        """Send a flow_mod list to a specific switch.

        The request is a POST to ``{settings.MANAGER_URL}/{command}/{dpid}``.

        Args:
            dpid(str): The target of flows (i.e. Switch.id).
            flow_mods(list): List of flow_mod dictionaries.
            command(str): By default is 'flows'. To remove a flow is 'remove'.
            force(bool): True to send via consistency check in case of errors

        Raises:
            FlowModException: If the response status code is 400 or above;
                the exception carries the response text.
        """
        endpoint = f"{settings.MANAGER_URL}/{command}/{dpid}"

        data = {"flows": flow_mods, "force": force}
        response = requests.post(endpoint, json=data)
        if response.status_code >= 400:
            raise FlowModException(str(response.text))
941
942
    def get_cookie(self):
        """Return the cookie integer from evc id.

        The id is parsed as hexadecimal and ``settings.COOKIE_PREFIX``,
        shifted left by 56 bits, is added to it.
        """
        return int(self.id, 16) + (settings.COOKIE_PREFIX << 56)
945 1
946
    @staticmethod
    def get_id_from_cookie(cookie):
        """Return the evc id given a cookie value.

        The prefix (``settings.COOKIE_PREFIX << 56``) is subtracted and the
        remainder is rendered as lowercase hexadecimal, left-padded with
        zeros to 14 characters.
        """
        evc_id = cookie - (settings.COOKIE_PREFIX << 56)
        return f"{evc_id:x}".zfill(14)
951
952
    def _prepare_flow_mod(self, in_interface, out_interface,
953 1
                          priority, queue_id=None):
954
        """Prepare a common flow mod."""
955 1
        default_actions = [
956
            {"action_type": "output", "port": out_interface.port_number}
957 1
        ]
958 1
        if queue_id is not None:
959
            default_actions.append(
960
                {"action_type": "set_queue", "queue_id": queue_id}
961 1
            )
962
963 1
        flow_mod = {
964 1
            "match": {"in_port": in_interface.port_number},
965
            "cookie": self.get_cookie(),
966 1
            "actions": default_actions,
967
        }
968 1
        flow_mod["priority"] = priority
969
        return flow_mod
970
971
    def _prepare_nni_flow(self, *args, queue_id=None):
972
        """Create NNI flows."""
973
        in_interface, out_interface, in_vlan, out_vlan = args
974
        sb_priority = self.sb_priority
975
        if sb_priority is None:
976
            sb_priority = settings.EVPL_SB_PRIORITY
977
        flow_mod = self._prepare_flow_mod(
978
            in_interface, out_interface, sb_priority, queue_id
979
        )
980
        flow_mod["match"]["dl_vlan"] = in_vlan
981
982
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
983 1
        flow_mod["actions"].insert(0, new_action)
984
985 1
        return flow_mod
986
987
    # pylint: disable=too-many-arguments
988
    def _prepare_push_flow(self,
989
                           in_interface,
990 1
                           out_interface,
991 1
                           in_vlan,
992
                           out_vlan,
993 1
                           new_c_vlan,
994 1
                           sp_priority,
995
                           queue_id=None):
996 1
        """Prepare push flow.
997
998 1
        Arguments:
999 1
            in_interface(str): Interface input.
1000
            out_interface(str): Interface output.
1001 1
            in_vlan(str): Vlan input.
1002 1
            out_vlan(str): Vlan output.
1003 1
            new_c_vlan(str): New client vlan.
1004
1005
        Return:
1006 1
            dict: An python dictionary representing a FlowMod
1007 1
1008 1
        """
1009
        # assign all arguments
1010
        if sp_priority is None:
1011 1
            if in_vlan:
1012 1
                sp_priority = settings.EVPL_SB_PRIORITY
1013 1
            else:
1014
                sp_priority = settings.EPL_SB_PRIORITY
1015 1
        flow_mod = self._prepare_flow_mod(
1016
            in_interface, out_interface, sp_priority, queue_id
1017
        )
1018
1019
        # the service tag must be always pushed
1020 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1021
        flow_mod["actions"].insert(0, new_action)
1022
1023 1
        new_action = {"action_type": "push_vlan", "tag_type": "s"}
1024 1
        flow_mod["actions"].insert(0, new_action)
1025 1
1026 1
        if in_vlan:
1027
            # if in_vlan is set, it must be included in the match
1028 1
            flow_mod["match"]["dl_vlan"] = in_vlan
1029 1
        if new_c_vlan:
1030
            # new_in_vlan is set, so an action to set it is necessary
1031 1
            new_action = {"action_type": "set_vlan", "vlan_id": new_c_vlan}
1032 1
            flow_mod["actions"].insert(0, new_action)
1033
            if not in_vlan:
1034
                # new_in_vlan is set, but in_vlan is not, so there was no
1035
                # vlan set; then it is set now
1036
                new_action = {"action_type": "push_vlan", "tag_type": "c"}
1037
                flow_mod["actions"].insert(0, new_action)
1038
        elif in_vlan:
1039
            # in_vlan is set, but new_in_vlan is not, so the existing vlan
1040 1
            # must be removed
1041 1
            new_action = {"action_type": "pop_vlan"}
1042
            flow_mod["actions"].insert(0, new_action)
1043
        return flow_mod
1044
1045 1
    def _prepare_pop_flow(
1046 1
        self, in_interface, out_interface, out_vlan, queue_id=None
1047 1
    ):
1048 1
        # pylint: disable=too-many-arguments
1049 1
        """Prepare pop flow."""
1050
        sb_priority = self.sb_priority
1051 1
        if sb_priority is None:
1052
            sb_priority = settings.EVPL_SB_PRIORITY
1053 1
        flow_mod = self._prepare_flow_mod(
1054 1
            in_interface, out_interface, sb_priority, queue_id
1055 1
        )
1056 1
        flow_mod["match"]["dl_vlan"] = out_vlan
1057 1
        new_action = {"action_type": "pop_vlan"}
1058 1
        flow_mod["actions"].insert(0, new_action)
1059 1
        return flow_mod
1060 1
1061
    @staticmethod
    def run_sdntrace(uni):
        """Run SDN trace on control plane starting from EVC UNIs.

        A PUT is sent to ``{settings.SDN_TRACE_CP_URL}/trace`` with the dpid
        and port of the UNI; when the UNI has a user tag, the request also
        carries dl_type 0x8100 and the tag value as dl_vlan.

        Return:
            list: The 'result' entry of the JSON response, or an empty list
                when that entry is missing or the status code is 400 or
                above (the latter is also logged as an error).
        """
        endpoint = f"{settings.SDN_TRACE_CP_URL}/trace"
        data_uni = {
            "trace": {
                "switch": {
                    "dpid": uni.interface.switch.dpid,
                    "in_port": uni.interface.port_number,
                }
            }
        }
        if uni.user_tag:
            data_uni["trace"]["eth"] = {
                "dl_type": 0x8100,
                "dl_vlan": uni.user_tag.value,
            }
        response = requests.put(endpoint, json=data_uni)
        if response.status_code >= 400:
            log.error(f"Failed to run sdntrace-cp: {response.text}")
            return []
        return response.json().get('result', [])
1083
1084
    def check_traces(self):
        """Check if current_path is deployed comparing with SDN traces.

        A trace is run from each UNI and must contain one entry more than
        the number of links in current_path. Then, for every link,
        endpoint_b is compared with the matching entry of the trace started
        at uni_a, and endpoint_a with the matching entry of the trace
        started at uni_z (walked backwards), both using the link s_vlan.

        Return:
            bool: True if both traces agree with current_path, otherwise
                False (a warning is logged with the offending trace).
        """
        trace_a = self.run_sdntrace(self.uni_a)
        if len(trace_a) != len(self.current_path) + 1:
            log.warning(f"Invalid trace from uni_a: {trace_a}")
            return False
        trace_z = self.run_sdntrace(self.uni_z)
        if len(trace_z) != len(self.current_path) + 1:
            log.warning(f"Invalid trace from uni_z: {trace_z}")
            return False

        # hop_a walks trace_a forward and hop_z walks trace_z backward, both
        # skipping the first entry of their trace.
        for link, hop_a, hop_z in zip(self.current_path,
                                      trace_a[1:],
                                      trace_z[:0:-1]):
            s_vlan = glom(link.metadata, 's_vlan.value')
            # hop_z belongs to the trace started at uni_z, so a mismatch
            # here is reported against trace_z.
            if compare_endpoint_trace(
               link.endpoint_a, s_vlan, hop_z) is False:
                log.warning(f"Invalid trace from uni_z: {trace_z}")
                return False
            # hop_a belongs to the trace started at uni_a.
            if compare_endpoint_trace(
               link.endpoint_b, s_vlan, hop_a) is False:
                log.warning(f"Invalid trace from uni_a: {trace_a}")
                return False

        return True
1110
1111 1
1112 1
class LinkProtection(EVCDeploy):
    """Class to handle link protection."""

    def is_affected_by_link(self, link=None):
        """Verify if the current path is affected by link down event."""
        return self.current_path.is_affected_by_link(link)

    def is_using_primary_path(self):
        """Verify if the current deployed path is self.primary_path."""
        return self.current_path == self.primary_path

    def is_using_backup_path(self):
        """Verify if the current deployed path is self.backup_path."""
        return self.current_path == self.backup_path

    def is_using_dynamic_path(self):
        """Verify if the current deployed path is dynamic.

        That is: there is a current path, it is neither the primary nor the
        backup path, and its status is UP.
        """
        return bool(
            self.current_path
            and not self.is_using_primary_path()
            and not self.is_using_backup_path()
            and self.current_path.status is EntityStatus.UP
        )

    def deploy_to(self, path_name=None, path=None):
        """Create a deploy to path.

        Args:
            path_name(str): Name of the path; only used in the debug log.
            path(Path): Path to be deployed.

        Returns:
            bool: True if path already is the current path; the result of
                deploy_to_path if the status of path is UP; False otherwise,
                including when no path is given.
        """
        if self.current_path == path:
            log.debug(f"{path_name} is equal to current_path.")
            return True

        # The default path=None has no status to inspect; treat it as
        # "nothing to deploy" instead of raising AttributeError.
        if path is not None and path.status is EntityStatus.UP:
            return self.deploy_to_path(path)

        return False

    def handle_link_up(self, link):
        """Handle circuit when link up.

        Args:
            link(Link): Link affected by link.up event.

        Returns:
            bool: Always True, whether or not a redeploy happened.
        """
        if self.is_using_primary_path():
            return True

        success = False
        if self.primary_path.is_affected_by_link(link):
            success = self.deploy_to_primary_path()

        if success:
            return True

        # We tried to deploy(primary_path) without success.
        # And in this case is up by some how. Nothing to do.
        if self.is_using_backup_path() or self.is_using_dynamic_path():
            return True

        # In this case, probably the circuit is not being used and
        # we can move to backup
        if self.backup_path.is_affected_by_link(link):
            success = self.deploy_to_backup_path()

        # In this case, the circuit is not being used and we should
        # try a dynamic path
        if not success and self.dynamic_backup_path:
            success = self.deploy_to_path()

        # The event is only emitted for a backup/dynamic redeploy; a
        # successful move to the primary path returned earlier without it.
        if success:
            emit_event(self._controller, "redeployed_link_up", evc_id=self.id)

        return True

    def handle_link_down(self):
        """Handle circuit when link down.

        The alternative static path is tried first (backup when using the
        primary, primary when using the backup), then a dynamic path if
        dynamic_backup_path is set. If everything fails the circuit is
        deactivated, its current_path is emptied and it is synced.

        Returns:
            bool: True if the re-deploy was successful, otherwise False.

        """
        success = False
        if self.is_using_primary_path():
            success = self.deploy_to_backup_path()
        elif self.is_using_backup_path():
            success = self.deploy_to_primary_path()

        if not success and self.dynamic_backup_path:
            success = self.deploy_to_path()

        if success:
            log.debug(f"{self} deployed after link down.")
        else:
            self.deactivate()
            self.current_path = Path([])
            self.sync()
            log.debug(f"Failed to re-deploy {self} after link down.")

        return success
1212
1213
1214
class EVC(LinkProtection):
1215
    """Class that represents a E-Line Virtual Connection."""
1216