Passed
Pull Request — master (#438)
by Italo Valcy
03:55
created

build.models.evc.EVCDeploy.check_trace()   D

Complexity

Conditions 12

Size

Total Lines 72
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 12.3775

Importance

Changes 0
Metric Value
cc 12
eloc 55
nop 9
dl 0
loc 72
ccs 25
cts 29
cp 0.8621
crap 12.3775
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.models.evc.EVCDeploy.check_trace() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Classes used in the main application."""  # pylint: disable=too-many-lines
2 1
import traceback
3 1
from collections import OrderedDict
4 1
from copy import deepcopy
5 1
from datetime import datetime
6 1
from operator import eq, ne
7 1
from threading import Lock
8 1
from typing import Union
9 1
from uuid import uuid4
10
11 1
import requests
12 1
from glom import glom
13 1
from requests.exceptions import Timeout
14
15 1
from kytos.core import log
16 1
from kytos.core.common import EntityStatus, GenericEntity
17 1
from kytos.core.exceptions import KytosNoTagAvailableError, KytosTagError
18 1
from kytos.core.helpers import get_time, now
19 1
from kytos.core.interface import UNI, Interface, TAGRange
20 1
from kytos.core.link import Link
21 1
from kytos.core.tag_ranges import range_difference
22 1
from napps.kytos.mef_eline import controllers, settings
23 1
from napps.kytos.mef_eline.exceptions import (DuplicatedNoTagUNI,
24
                                              FlowModException, InvalidPath)
25 1
from napps.kytos.mef_eline.utils import (check_disabled_component,
26
                                         compare_endpoint_trace,
27
                                         compare_uni_out_trace, emit_event,
28
                                         make_uni_list, map_dl_vlan,
29
                                         map_evc_event_content)
30
31 1
from .path import DynamicPathManager, Path
32
33
34 1
class EVCBase(GenericEntity):
35
    """Class to represent a circuit."""
36
37 1
    read_only_attributes = [
38
        "creation_time",
39
        "active",
40
        "current_path",
41
        "failover_path",
42
        "_id",
43
        "archived",
44
    ]
45 1
    attributes_requiring_redeploy = [
46
        "primary_path",
47
        "backup_path",
48
        "dynamic_backup_path",
49
        "queue_id",
50
        "sb_priority",
51
        "primary_constraints",
52
        "secondary_constraints",
53
        "uni_a",
54
        "uni_z",
55
    ]
56 1
    required_attributes = ["name", "uni_a", "uni_z"]
57
58 1
    def __init__(self, controller, **kwargs):
59
        """Create an EVC instance with the provided parameters.
60
61
        Args:
62
            id(str): EVC identifier. Whether it's None an ID will be genereted.
63
                     Only the first 14 bytes passed will be used.
64
            name: represents an EVC name.(Required)
65
            uni_a (UNI): Endpoint A for User Network Interface.(Required)
66
            uni_z (UNI): Endpoint Z for User Network Interface.(Required)
67
            start_date(datetime|str): Date when the EVC was registred.
68
                                      Default is now().
69
            end_date(datetime|str): Final date that the EVC will be fineshed.
70
                                    Default is None.
71
            bandwidth(int): Bandwidth used by EVC instance. Default is 0.
72
            primary_links(list): Primary links used by evc. Default is []
73
            backup_links(list): Backups links used by evc. Default is []
74
            current_path(list): Circuit being used at the moment if this is an
75
                                active circuit. Default is [].
76
            failover_path(list): Path being used to provide EVC protection via
77
                                failover during link failures. Default is [].
78
            primary_path(list): primary circuit offered to user IF one or more
79
                                links were provided. Default is [].
80
            backup_path(list): backup circuit offered to the user IF one or
81
                               more links were provided. Default is [].
82
            dynamic_backup_path(bool): Enable computer backup path dynamically.
83
                                       Dafault is False.
84
            creation_time(datetime|str): datetime when the circuit should be
85
                                         activated. default is now().
86
            enabled(Boolean): attribute to indicate the administrative state;
87
                              default is False.
88
            active(Boolean): attribute to indicate the operational state;
89
                             default is False.
90
            archived(Boolean): indicate the EVC has been deleted and is
91
                               archived; default is False.
92
            owner(str): The EVC owner. Default is None.
93
            sb_priority(int): Service level provided in the request.
94
                              Default is None.
95
            service_level(int): Service level provided. The higher the better.
96
                                Default is 0.
97
98
        Raises:
99
            ValueError: raised when object attributes are invalid.
100
101
        """
102 1
        self._controller = controller
103 1
        self._validate(**kwargs)
104 1
        super().__init__()
105
106
        # required attributes
107 1
        self._id = kwargs.get("id", uuid4().hex)[:14]
108 1
        self.uni_a: UNI = kwargs.get("uni_a")
109 1
        self.uni_z: UNI = kwargs.get("uni_z")
110 1
        self.name = kwargs.get("name")
111
112
        # optional attributes
113 1
        self.start_date = get_time(kwargs.get("start_date")) or now()
114 1
        self.end_date = get_time(kwargs.get("end_date")) or None
115 1
        self.queue_id = kwargs.get("queue_id", -1)
116
117 1
        self.bandwidth = kwargs.get("bandwidth", 0)
118 1
        self.primary_links = Path(kwargs.get("primary_links", []))
119 1
        self.backup_links = Path(kwargs.get("backup_links", []))
120 1
        self.current_path = Path(kwargs.get("current_path", []))
121 1
        self.failover_path = Path(kwargs.get("failover_path", []))
122 1
        self.primary_path = Path(kwargs.get("primary_path", []))
123 1
        self.backup_path = Path(kwargs.get("backup_path", []))
124 1
        self.dynamic_backup_path = kwargs.get("dynamic_backup_path", False)
125 1
        self.primary_constraints = kwargs.get("primary_constraints", {})
126 1
        self.secondary_constraints = kwargs.get("secondary_constraints", {})
127 1
        self.creation_time = get_time(kwargs.get("creation_time")) or now()
128 1
        self.owner = kwargs.get("owner", None)
129 1
        self.sb_priority = kwargs.get("sb_priority", None) or kwargs.get(
130
            "priority", None
131
        )
132 1
        self.service_level = kwargs.get("service_level", 0)
133 1
        self.circuit_scheduler = kwargs.get("circuit_scheduler", [])
134 1
        self.flow_removed_at = get_time(kwargs.get("flow_removed_at")) or None
135 1
        self.updated_at = get_time(kwargs.get("updated_at")) or now()
136 1
        self.execution_rounds = kwargs.get("execution_rounds", 0)
137
138 1
        self.current_links_cache = set()
139 1
        self.primary_links_cache = set()
140 1
        self.backup_links_cache = set()
141 1
        self.affected_by_link_at = get_time("0001-01-01T00:00:00")
142 1
        self.old_path = Path([])
143
144 1
        self.lock = Lock()
145
146 1
        self.archived = kwargs.get("archived", False)
147
148 1
        self.metadata = kwargs.get("metadata", {})
149
150 1
        self._mongo_controller = controllers.ELineController()
151
152 1
        if kwargs.get("active", False):
153 1
            self.activate()
154
        else:
155 1
            self.deactivate()
156
157 1
        if kwargs.get("enabled", False):
158 1
            self.enable()
159
        else:
160 1
            self.disable()
161
162
        # datetime of user request for a EVC (or datetime when object was
163
        # created)
164 1
        self.request_time = kwargs.get("request_time", now())
165
        # dict with the user original request (input)
166 1
        self._requested = kwargs
167
168
        # Special cases: No tag, any, untagged
169 1
        self.special_cases = {None, "4096/4096", 0}
170 1
        self.table_group = kwargs.get("table_group")
171
172 1
    def sync(self, keys: set = None):
173
        """Sync this EVC in the MongoDB."""
174 1
        self.updated_at = now()
175 1
        if keys:
176 1
            self._mongo_controller.update_evc(self.as_dict(keys))
177 1
            return
178 1
        self._mongo_controller.upsert_evc(self.as_dict())
179
180 1
    def _get_unis_use_tags(self, **kwargs) -> tuple[UNI, UNI]:
181
        """Obtain both UNIs (uni_a, uni_z).
182
        If a UNI is changing, verify tags"""
183 1
        uni_a = kwargs.get("uni_a", None)
184 1
        uni_a_flag = False
185 1
        if uni_a and uni_a != self.uni_a:
186 1
            uni_a_flag = True
187 1
            self._use_uni_vlan(uni_a, uni_dif=self.uni_a)
188
189 1
        uni_z = kwargs.get("uni_z", None)
190 1
        if uni_z and uni_z != self.uni_z:
191 1
            try:
192 1
                self._use_uni_vlan(uni_z, uni_dif=self.uni_z)
193 1
                self.make_uni_vlan_available(self.uni_z, uni_dif=uni_z)
194 1
            except KytosTagError as err:
195 1
                if uni_a_flag:
196 1
                    self.make_uni_vlan_available(uni_a, uni_dif=self.uni_a)
197 1
                raise err
198
        else:
199 1
            uni_z = self.uni_z
200
201 1
        if uni_a_flag:
202 1
            self.make_uni_vlan_available(self.uni_a, uni_dif=uni_a)
203
        else:
204 1
            uni_a = self.uni_a
205 1
        return uni_a, uni_z
206
207 1
    def update(self, **kwargs):
208
        """Update evc attributes.
209
210
        This method will raises an error trying to change the following
211
        attributes: [creation_time, active, current_path, failover_path,
212
        _id, archived]
213
        [name, uni_a and uni_z]
214
215
        Returns:
216
            the values for enable and a redeploy attribute, if exists and None
217
            otherwise
218
        Raises:
219
            ValueError: message with error detail.
220
221
        """
222 1
        enable, redeploy = (None, None)
223 1
        if not self._tag_lists_equal(**kwargs):
224 1
            raise ValueError(
225
                "UNI_A and UNI_Z tag lists should be the same."
226
            )
227 1
        uni_a, uni_z = self._get_unis_use_tags(**kwargs)
228 1
        check_disabled_component(uni_a, uni_z)
229 1
        self._validate_has_primary_or_dynamic(
230
            primary_path=kwargs.get("primary_path"),
231
            dynamic_backup_path=kwargs.get("dynamic_backup_path"),
232
            uni_a=uni_a,
233
            uni_z=uni_z,
234
        )
235 1
        for attribute, value in kwargs.items():
236 1
            if attribute in self.read_only_attributes:
237 1
                raise ValueError(f"{attribute} can't be updated.")
238 1
            if not hasattr(self, attribute):
239 1
                raise ValueError(f'The attribute "{attribute}" is invalid.')
240 1
            if attribute in ("primary_path", "backup_path"):
241 1
                try:
242 1
                    value.is_valid(
243
                        uni_a.interface.switch, uni_z.interface.switch
244
                    )
245 1
                except InvalidPath as exception:
246 1
                    raise ValueError(  # pylint: disable=raise-missing-from
247
                        f"{attribute} is not a " f"valid path: {exception}"
248
                    )
249 1
        for attribute, value in kwargs.items():
250 1
            if attribute in ("enable", "enabled"):
251 1
                if value:
252 1
                    self.enable()
253
                else:
254 1
                    self.disable()
255 1
                enable = value
256
            else:
257 1
                setattr(self, attribute, value)
258 1
                if attribute in self.attributes_requiring_redeploy:
259 1
                    redeploy = True
260 1
        self.sync(set(kwargs.keys()))
261 1
        return enable, redeploy
262
263 1
    def set_flow_removed_at(self):
264
        """Update flow_removed_at attribute."""
265
        self.flow_removed_at = now()
266
267 1
    def has_recent_removed_flow(self, setting=settings):
268
        """Check if any flow has been removed from the evc"""
269
        if self.flow_removed_at is None:
270
            return False
271
        res_seconds = (now() - self.flow_removed_at).seconds
272
        return res_seconds < setting.TIME_RECENT_DELETED_FLOWS
273
274 1
    def is_recent_updated(self, setting=settings):
275
        """Check if the evc has been updated recently"""
276
        res_seconds = (now() - self.updated_at).seconds
277
        return res_seconds < setting.TIME_RECENT_UPDATED
278
279 1
    def __repr__(self):
280
        """Repr method."""
281 1
        return f"EVC({self._id}, {self.name})"
282
283 1
    def _validate(self, **kwargs):
284
        """Do Basic validations.
285
286
        Verify required attributes: name, uni_a, uni_z
287
288
        Raises:
289
            ValueError: message with error detail.
290
291
        """
292 1
        for attribute in self.required_attributes:
293
294 1
            if attribute not in kwargs:
295 1
                raise ValueError(f"{attribute} is required.")
296
297 1
            if "uni" in attribute:
298 1
                uni = kwargs.get(attribute)
299 1
                if not isinstance(uni, UNI):
300
                    raise ValueError(f"{attribute} is an invalid UNI.")
301
302 1
    def _tag_lists_equal(self, **kwargs):
303
        """Verify that tag lists are the same."""
304 1
        uni_a = kwargs.get("uni_a") or self.uni_a
305 1
        uni_z = kwargs.get("uni_z") or self.uni_z
306 1
        uni_a_list = uni_z_list = False
307 1
        if (uni_a.user_tag and isinstance(uni_a.user_tag, TAGRange)):
308 1
            uni_a_list = True
309 1
        if (uni_z.user_tag and isinstance(uni_z.user_tag, TAGRange)):
310 1
            uni_z_list = True
311 1
        if uni_a_list and uni_z_list:
312 1
            return uni_a.user_tag.value == uni_z.user_tag.value
313 1
        return uni_a_list == uni_z_list
314
315 1
    def _validate_has_primary_or_dynamic(
316
        self,
317
        primary_path=None,
318
        dynamic_backup_path=None,
319
        uni_a=None,
320
        uni_z=None,
321
    ) -> None:
322
        """Validate that it must have a primary path or allow dynamic paths."""
323 1
        primary_path = (
324
            primary_path
325
            if primary_path is not None
326
            else self.primary_path
327
        )
328 1
        dynamic_backup_path = (
329
            dynamic_backup_path
330
            if dynamic_backup_path is not None
331
            else self.dynamic_backup_path
332
        )
333 1
        uni_a = uni_a if uni_a is not None else self.uni_a
334 1
        uni_z = uni_z if uni_z is not None else self.uni_z
335 1
        if (
336
            not primary_path
337
            and not dynamic_backup_path
338
            and uni_a and uni_z
339
            and uni_a.interface.switch != uni_z.interface.switch
340
        ):
341 1
            msg = "The EVC must have a primary path or allow dynamic paths."
342 1
            raise ValueError(msg)
343
344 1
    def __eq__(self, other):
345
        """Override the default implementation."""
346 1
        if not isinstance(other, EVC):
347
            return False
348
349 1
        attrs_to_compare = ["name", "uni_a", "uni_z", "owner", "bandwidth"]
350 1
        for attribute in attrs_to_compare:
351 1
            if getattr(other, attribute) != getattr(self, attribute):
352 1
                return False
353 1
        return True
354
355 1
    def is_intra_switch(self):
356
        """Check if the UNIs are in the same switch."""
357 1
        return self.uni_a.interface.switch == self.uni_z.interface.switch
358
359 1
    def check_no_tag_duplicate(self, other_uni: UNI):
360
        """Check if a no tag UNI is duplicated."""
361 1
        if other_uni in (self.uni_a, self.uni_z):
362 1
            msg = f"UNI with interface {other_uni.interface.id} is"\
363
                  f" duplicated with EVC {self.id}."
364 1
            raise DuplicatedNoTagUNI(msg)
365
366 1
    def as_dict(self, keys: set = None):
367
        """Return a dictionary representing an EVC object.
368
            keys: Only fields on this variable will be
369
                  returned in the dictionary"""
370 1
        evc_dict = {
371
            "id": self.id,
372
            "name": self.name,
373
            "uni_a": self.uni_a.as_dict(),
374
            "uni_z": self.uni_z.as_dict(),
375
        }
376
377 1
        time_fmt = "%Y-%m-%dT%H:%M:%S"
378
379 1
        evc_dict["start_date"] = self.start_date
380 1
        if isinstance(self.start_date, datetime):
381 1
            evc_dict["start_date"] = self.start_date.strftime(time_fmt)
382
383 1
        evc_dict["end_date"] = self.end_date
384 1
        if isinstance(self.end_date, datetime):
385 1
            evc_dict["end_date"] = self.end_date.strftime(time_fmt)
386
387 1
        evc_dict["queue_id"] = self.queue_id
388 1
        evc_dict["bandwidth"] = self.bandwidth
389 1
        evc_dict["primary_links"] = self.primary_links.as_dict()
390 1
        evc_dict["backup_links"] = self.backup_links.as_dict()
391 1
        evc_dict["current_path"] = self.current_path.as_dict()
392 1
        evc_dict["failover_path"] = self.failover_path.as_dict()
393 1
        evc_dict["primary_path"] = self.primary_path.as_dict()
394 1
        evc_dict["backup_path"] = self.backup_path.as_dict()
395 1
        evc_dict["dynamic_backup_path"] = self.dynamic_backup_path
396 1
        evc_dict["metadata"] = self.metadata
397
398 1
        evc_dict["request_time"] = self.request_time
399 1
        if isinstance(self.request_time, datetime):
400 1
            evc_dict["request_time"] = self.request_time.strftime(time_fmt)
401
402 1
        time = self.creation_time.strftime(time_fmt)
403 1
        evc_dict["creation_time"] = time
404
405 1
        evc_dict["owner"] = self.owner
406 1
        evc_dict["circuit_scheduler"] = [
407
            sc.as_dict() for sc in self.circuit_scheduler
408
        ]
409
410 1
        evc_dict["active"] = self.is_active()
411 1
        evc_dict["enabled"] = self.is_enabled()
412 1
        evc_dict["archived"] = self.archived
413 1
        evc_dict["sb_priority"] = self.sb_priority
414 1
        evc_dict["service_level"] = self.service_level
415 1
        evc_dict["primary_constraints"] = self.primary_constraints
416 1
        evc_dict["secondary_constraints"] = self.secondary_constraints
417 1
        evc_dict["flow_removed_at"] = self.flow_removed_at
418 1
        evc_dict["updated_at"] = self.updated_at
419
420 1
        if keys:
421 1
            selected = {}
422 1
            for key in keys:
423 1
                if key == "enable":
424
                    selected["enabled"] = evc_dict["enabled"]
425
                    continue
426 1
                selected[key] = evc_dict[key]
427 1
            selected["id"] = evc_dict["id"]
428 1
            return selected
429 1
        return evc_dict
430
431 1
    @property
432 1
    def id(self):  # pylint: disable=invalid-name
433
        """Return this EVC's ID."""
434 1
        return self._id
435
436 1
    def archive(self):
437
        """Archive this EVC on deletion."""
438 1
        self.archived = True
439
440 1
    def _use_uni_vlan(
441
        self,
442
        uni: UNI,
443
        uni_dif: Union[None, UNI] = None
444
    ):
445
        """Use tags from UNI"""
446 1
        if uni.user_tag is None:
447 1
            return
448 1
        tag = uni.user_tag.value
449 1
        if not tag:
450
            return
451 1
        tag_type = uni.user_tag.tag_type
452 1
        if (uni_dif and isinstance(tag, list) and
453
                isinstance(uni_dif.user_tag.value, list)):
454 1
            tag = range_difference(tag, uni_dif.user_tag.value)
455 1
            if not tag:
456 1
                return
457 1
        uni.interface.use_tags(
458
            self._controller, tag, tag_type, use_lock=True, check_order=False
459
        )
460
461 1
    def make_uni_vlan_available(
462
        self,
463
        uni: UNI,
464
        uni_dif: Union[None, UNI] = None,
465
    ):
466
        """Make available tag from UNI"""
467 1
        if uni.user_tag is None:
468 1
            return
469 1
        tag = uni.user_tag.value
470 1
        if not tag:
471 1
            return
472 1
        tag_type = uni.user_tag.tag_type
473 1
        if (uni_dif and isinstance(tag, list) and
474
                isinstance(uni_dif.user_tag.value, list)):
475 1
            tag = range_difference(tag, uni_dif.user_tag.value)
476 1
            if not tag:
477
                return
478 1
        try:
479 1
            conflict = uni.interface.make_tags_available(
480
                self._controller, tag, tag_type, use_lock=True,
481
                check_order=False
482
            )
483 1
        except KytosTagError as err:
484 1
            log.error(f"Error in circuit {self._id}: {err}")
485 1
            return
486 1
        if conflict:
487 1
            intf = uni.interface.id
488 1
            log.warning(f"Tags {conflict} was already available in {intf}")
489
490 1
    def remove_uni_tags(self):
491
        """Remove both UNI usage of a tag"""
492 1
        self.make_uni_vlan_available(self.uni_a)
493 1
        self.make_uni_vlan_available(self.uni_z)
494
495
496
# pylint: disable=fixme, too-many-public-methods
497 1
class EVCDeploy(EVCBase):
498
    """Class to handle the deploy procedures."""
499
500 1
    def create(self):
501
        """Create a EVC."""
502
503 1
    def discover_new_paths(self):
504
        """Discover new paths to satisfy this circuit and deploy it."""
505
        return DynamicPathManager.get_best_paths(self,
506
                                                 **self.primary_constraints)
507
508 1
    def get_failover_path_candidates(self):
509
        """Get failover paths to satisfy this EVC."""
510
        # in the future we can return primary/backup paths as well
511
        # we just have to properly handle link_up and failover paths
512
        # if (
513
        #     self.is_using_primary_path() and
514
        #     self.backup_path.status is EntityStatus.UP
515
        # ):
516
        #     yield self.backup_path
517 1
        return DynamicPathManager.get_disjoint_paths(self, self.current_path)
518
519 1
    def change_path(self):
520
        """Change EVC path."""
521
522 1
    def reprovision(self):
523
        """Force the EVC (re-)provisioning."""
524
525 1
    def is_affected_by_link(self, link):
526
        """Return True if this EVC has the given link on its current path."""
527 1
        return link in self.current_path
528
529 1
    def link_affected_by_interface(self, interface):
530
        """Return True if this EVC has the given link on its current path."""
531
        return self.current_path.link_affected_by_interface(interface)
532
533 1
    def is_backup_path_affected_by_link(self, link):
534
        """Return True if the backup path of this EVC uses the given link."""
535 1
        return link in self.backup_path
536
537
    # pylint: disable=invalid-name
538 1
    def is_primary_path_affected_by_link(self, link):
539
        """Return True if the primary path of this EVC uses the given link."""
540 1
        return link in self.primary_path
541
542 1
    def is_failover_path_affected_by_link(self, link):
543
        """Return True if this EVC has the given link on its failover path."""
544 1
        return link in self.failover_path
545
546 1
    def is_eligible_for_failover_path(self):
547
        """Verify if this EVC is eligible for failover path (EP029)"""
548
        # In the future this function can be augmented to consider
549
        # primary/backup, primary/dynamic, and other path combinations
550 1
        return (
551
            self.dynamic_backup_path and
552
            not self.primary_path and not self.backup_path
553
        )
554
555 1
    def is_using_primary_path(self):
556
        """Verify if the current deployed path is self.primary_path."""
557 1
        return self.primary_path and (self.current_path == self.primary_path)
558
559 1
    def is_using_backup_path(self):
560
        """Verify if the current deployed path is self.backup_path."""
561 1
        return self.backup_path and (self.current_path == self.backup_path)
562
563 1
    def is_using_dynamic_path(self):
564
        """Verify if the current deployed path is a dynamic path."""
565 1
        if (
566
            self.current_path
567
            and not self.is_using_primary_path()
568
            and not self.is_using_backup_path()
569
            and self.current_path.status == EntityStatus.UP
570
        ):
571
            return True
572 1
        return False
573
574 1
    def deploy_to_backup_path(self):
575
        """Deploy the backup path into the datapaths of this circuit.
576
577
        If the backup_path attribute is valid and up, this method will try to
578
        deploy this backup_path.
579
580
        If everything fails and dynamic_backup_path is True, then tries to
581
        deploy a dynamic path.
582
        """
583
        # TODO: Remove flows from current (cookies)
584 1
        if self.is_using_backup_path():
585
            # TODO: Log to say that cannot move backup to backup
586
            return True
587
588 1
        success = False
589 1
        if self.backup_path.status is EntityStatus.UP:
590 1
            success = self.deploy_to_path(self.backup_path)
591
592 1
        if success:
593 1
            return True
594
595 1
        if self.dynamic_backup_path or self.is_intra_switch():
596 1
            return self.deploy_to_path()
597
598
        return False
599
600 1
    def deploy_to_primary_path(self):
601
        """Deploy the primary path into the datapaths of this circuit.
602
603
        If the primary_path attribute is valid and up, this method will try to
604
        deploy this primary_path.
605
        """
606
        # TODO: Remove flows from current (cookies)
607 1
        if self.is_using_primary_path():
608
            # TODO: Log to say that cannot move primary to primary
609
            return True
610
611 1
        if self.primary_path.status is EntityStatus.UP:
612 1
            return self.deploy_to_path(self.primary_path)
613 1
        return False
614
615 1
    def deploy(self):
616
        """Deploy EVC to best path.
617
618
        Best path can be the primary path, if available. If not, the backup
619
        path, and, if it is also not available, a dynamic path.
620
        """
621 1
        if self.archived:
622 1
            return False
623 1
        self.enable()
624 1
        success = self.deploy_to_primary_path()
625 1
        if not success:
626 1
            success = self.deploy_to_backup_path()
627
628 1
        if success:
629 1
            emit_event(self._controller, "deployed",
630
                       content=map_evc_event_content(self))
631 1
        return success
632
633 1
    @staticmethod
634 1
    def get_path_status(path):
635
        """Check for the current status of a path.
636
637
        If any link in this path is down, the path is considered down.
638
        """
639 1
        if not path:
640 1
            return EntityStatus.DISABLED
641
642 1
        for link in path:
643 1
            if link.status is not EntityStatus.UP:
644 1
                return link.status
645 1
        return EntityStatus.UP
646
647
    #    def discover_new_path(self):
648
    #        # TODO: discover a new path to satisfy this circuit and deploy
649
650 1
    def remove(self):
651
        """Remove EVC path and disable it."""
652 1
        self.remove_current_flows(sync=False)
653 1
        self.remove_failover_flows(sync=False)
654 1
        self.disable()
655 1
        self.sync()
656 1
        emit_event(self._controller, "undeployed",
657
                   content=map_evc_event_content(self))
658
659 1
    def remove_failover_flows(self, exclude_uni_switches=True,
660
                              force=True, sync=True) -> None:
661
        """Remove failover_flows.
662
663
        By default, it'll exclude UNI switches, if mef_eline has already
664
        called remove_current_flows before then this minimizes the number
665
        of FlowMods and IO.
666
        """
667 1
        if not self.failover_path:
668 1
            return
669 1
        switches, cookie, excluded = OrderedDict(), self.get_cookie(), set()
670 1
        links = set()
671 1
        if exclude_uni_switches:
672 1
            excluded.add(self.uni_a.interface.switch.id)
673 1
            excluded.add(self.uni_z.interface.switch.id)
674 1
        for link in self.failover_path:
675 1
            if link.endpoint_a.switch.id not in excluded:
676 1
                switches[link.endpoint_a.switch.id] = link.endpoint_a.switch
677 1
                links.add(link)
678 1
            if link.endpoint_b.switch.id not in excluded:
679 1
                switches[link.endpoint_b.switch.id] = link.endpoint_b.switch
680 1
                links.add(link)
681 1
        for switch in switches.values():
682 1
            try:
683 1
                self._send_flow_mods(
684
                    switch.id,
685
                    [
686
                        {
687
                            "cookie": cookie,
688
                            "cookie_mask": int(0xffffffffffffffff),
689
                        }
690
                    ],
691
                    "delete",
692
                    force=force,
693
                )
694
            except FlowModException as err:
695
                log.error(
696
                    f"Error removing flows from switch {switch.id} for"
697
                    f"EVC {self}: {err}"
698
                )
699 1
        try:
700 1
            self.failover_path.make_vlans_available(self._controller)
701
        except KytosTagError as err:
702
            log.error(f"Error when removing failover flows: {err}")
703 1
        self.failover_path = Path([])
704 1
        if sync:
705 1
            self.sync()
706
707 1
    def remove_current_flows(self, current_path=None, force=True,
708
                             sync=True):
709
        """Remove all flows from current path."""
710 1
        switches = set()
711
712 1
        switches.add(self.uni_a.interface.switch)
713 1
        switches.add(self.uni_z.interface.switch)
714 1
        if not current_path:
715 1
            current_path = self.current_path
716 1
        for link in current_path:
717 1
            switches.add(link.endpoint_a.switch)
718 1
            switches.add(link.endpoint_b.switch)
719
720 1
        match = {
721
            "cookie": self.get_cookie(),
722
            "cookie_mask": int(0xffffffffffffffff)
723
        }
724
725 1
        for switch in switches:
726 1
            try:
727 1
                self._send_flow_mods(switch.id, [match], 'delete', force=force)
728 1
            except FlowModException as err:
729 1
                log.error(
730
                    f"Error removing flows from switch {switch.id} for"
731
                    f"EVC {self}: {err}"
732
                )
733 1
        try:
734 1
            current_path.make_vlans_available(self._controller)
735
        except KytosTagError as err:
736
            log.error(f"Error when removing current path flows: {err}")
737 1
        self.current_path = Path([])
738 1
        self.deactivate()
739 1
        if sync:
740 1
            self.sync()
741
742 1
    def remove_path_flows(self, path=None, force=True):
743
        """Remove all flows from path."""
744 1
        if not path:
745 1
            return
746
747 1
        dpid_flows_match = {}
748
749 1
        try:
750 1
            nni_flows = self._prepare_nni_flows(path)
751
        # pylint: disable=broad-except
752
        except Exception:
753
            err = traceback.format_exc().replace("\n", ", ")
754
            log.error(f"Fail to remove NNI failover flows for {self}: {err}")
755
            nni_flows = {}
756
757 1
        for dpid, flows in nni_flows.items():
758 1
            dpid_flows_match.setdefault(dpid, [])
759 1
            for flow in flows:
760 1
                dpid_flows_match[dpid].append({
761
                    "cookie": flow["cookie"],
762
                    "match": flow["match"],
763
                    "cookie_mask": int(0xffffffffffffffff)
764
                })
765
766 1
        try:
767 1
            uni_flows = self._prepare_uni_flows(path, skip_in=True)
768
        # pylint: disable=broad-except
769
        except Exception:
770
            err = traceback.format_exc().replace("\n", ", ")
771
            log.error(f"Fail to remove UNI failover flows for {self}: {err}")
772
            uni_flows = {}
773
774 1
        for dpid, flows in uni_flows.items():
775 1
            dpid_flows_match.setdefault(dpid, [])
776 1
            for flow in flows:
777 1
                dpid_flows_match[dpid].append({
778
                    "cookie": flow["cookie"],
779
                    "match": flow["match"],
780
                    "cookie_mask": int(0xffffffffffffffff)
781
                })
782
783 1
        for dpid, flows in dpid_flows_match.items():
784 1
            try:
785 1
                self._send_flow_mods(dpid, flows, 'delete', force=force)
786 1
            except FlowModException as err:
787 1
                log.error(
788
                    "Error removing failover flows: "
789
                    f"dpid={dpid} evc={self} error={err}"
790
                )
791 1
        try:
792 1
            path.make_vlans_available(self._controller)
793
        except KytosTagError as err:
794
            log.error(f"Error when removing path flows: {err}")
795
796 1
    @staticmethod
797 1
    def links_zipped(path=None):
798
        """Return an iterator which yields pairs of links in order."""
799 1
        if not path:
800
            return []
801 1
        return zip(path[:-1], path[1:])
802
803 1
    def should_deploy(self, path=None):
804
        """Verify if the circuit should be deployed."""
805 1
        if not path:
806 1
            log.debug("Path is empty.")
807 1
            return False
808
809 1
        if not self.is_enabled():
810 1
            log.debug(f"{self} is disabled.")
811 1
            return False
812
813 1
        if not self.is_active():
814 1
            log.debug(f"{self} will be deployed.")
815 1
            return True
816
817 1
        return False
818
819 1
    def deploy_to_path(self, path=None):  # pylint: disable=too-many-branches
820
        """Install the flows for this circuit.
821
822
        Procedures to deploy:
823
824
        0. Remove current flows installed
825
        1. Decide if will deploy "path" or discover a new path
826
        2. Choose vlan
827
        3. Install NNI flows
828
        4. Install UNI flows
829
        5. Activate
830
        6. Update current_path
831
        7. Update links caches(primary, current, backup)
832
833
        """
834 1
        self.remove_current_flows()
835 1
        use_path = path
836 1
        if self.should_deploy(use_path):
837 1
            try:
838 1
                use_path.choose_vlans(self._controller)
839 1
            except KytosNoTagAvailableError:
840 1
                use_path = None
841
        else:
842 1
            for use_path in self.discover_new_paths():
843 1
                if use_path is None:
844
                    continue
845 1
                try:
846 1
                    use_path.choose_vlans(self._controller)
847 1
                    break
848 1
                except KytosNoTagAvailableError:
849 1
                    pass
850
            else:
851 1
                use_path = None
852
853 1
        try:
854 1
            if use_path:
855 1
                self._install_nni_flows(use_path)
856 1
                self._install_uni_flows(use_path)
857 1
            elif self.is_intra_switch():
858 1
                use_path = Path()
859 1
                self._install_direct_uni_flows()
860
            else:
861 1
                log.warning(
862
                    f"{self} was not deployed. No available path was found."
863
                )
864 1
                return False
865 1
        except FlowModException as err:
866 1
            log.error(
867
                f"Error deploying EVC {self} when calling flow_manager: {err}"
868
            )
869 1
            self.remove_current_flows(use_path)
870 1
            return False
871 1
        self.activate()
872 1
        self.current_path = use_path
873 1
        self.sync()
874 1
        log.info(f"{self} was deployed.")
875 1
        return True
876
877 1
    def try_setup_failover_path(self, wait=settings.DEPLOY_EVCS_INTERVAL):
878
        """Try setup failover_path whenever possible."""
879
        if self.failover_path:
880
            return
881
        if (now() - self.affected_by_link_at).seconds >= wait:
882
            with self.lock:
883
                self.setup_failover_path()
884
885 1
    def setup_failover_path(self):
886
        """Install flows for the failover path of this EVC.
887
888
        Procedures to deploy:
889
890
        0. Remove flows currently installed for failover_path (if any)
891
        1. Discover a disjoint path from current_path
892
        2. Choose vlans
893
        3. Install NNI flows
894
        4. Install UNI egress flows
895
        5. Update failover_path
896
        """
897
        # Intra-switch EVCs have no failover_path
898 1
        if self.is_intra_switch():
899 1
            return False
900
901
        # For not only setup failover path for totally dynamic EVCs
902 1
        if not self.is_eligible_for_failover_path():
903 1
            return False
904
905 1
        reason = ""
906 1
        self.remove_path_flows(self.failover_path)
907 1
        self.failover_path = Path([])
908 1
        for use_path in self.get_failover_path_candidates():
909 1
            if not use_path:
910 1
                continue
911 1
            try:
912 1
                use_path.choose_vlans(self._controller)
913 1
                break
914 1
            except KytosNoTagAvailableError:
915 1
                pass
916
        else:
917 1
            use_path = Path([])
918 1
            reason = "No available path was found"
919
920 1
        try:
921 1
            if use_path:
922 1
                self._install_nni_flows(use_path)
923 1
                self._install_uni_flows(use_path, skip_in=True)
924 1
        except FlowModException as err:
925 1
            reason = "Error deploying failover path"
926 1
            log.error(
927
                f"{reason} for {self}. FlowManager error: {err}"
928
            )
929 1
            self.remove_path_flows(use_path)
930 1
            use_path = Path([])
931
932 1
        self.failover_path = use_path
933 1
        self.sync()
934
935 1
        if not use_path:
936 1
            log.warning(
937
                f"Failover path for {self} was not deployed: {reason}"
938
            )
939 1
            return False
940 1
        log.info(f"Failover path for {self} was deployed.")
941 1
        return True
942
943 1
    def get_failover_flows(self):
944
        """Return the flows needed to make the failover path active, i.e. the
945
        flows for ingress forwarding.
946
947
        Return:
948
            dict: A dict of flows indexed by the switch_id will be returned, or
949
                an empty dict if no failover_path is available.
950
        """
951 1
        if not self.failover_path:
952 1
            return {}
953 1
        return self._prepare_uni_flows(self.failover_path, skip_out=True)
954
955
    # pylint: disable=too-many-branches
956 1
    def _prepare_direct_uni_flows(self):
957
        """Prepare flows connecting two UNIs for intra-switch EVC."""
958 1
        vlan_a = self._get_value_from_uni_tag(self.uni_a)
959 1
        vlan_z = self._get_value_from_uni_tag(self.uni_z)
960
961 1
        flow_mod_az = self._prepare_flow_mod(
962
            self.uni_a.interface, self.uni_z.interface,
963
            self.queue_id, vlan_a
964
        )
965 1
        flow_mod_za = self._prepare_flow_mod(
966
            self.uni_z.interface, self.uni_a.interface,
967
            self.queue_id, vlan_z
968
        )
969
970 1 View Code Duplication
        if not isinstance(vlan_z, list) and vlan_z not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
971 1
            flow_mod_az["actions"].insert(
972
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
973
            )
974 1
            if not vlan_a:
975 1
                flow_mod_az["actions"].insert(
976
                    0, {"action_type": "push_vlan", "tag_type": "c"}
977
                )
978 1
            if vlan_a == 0:
979 1
                flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
980 1
        elif vlan_a == 0 and vlan_z == "4096/4096":
981 1
            flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
982
983 1 View Code Duplication
        if not isinstance(vlan_a, list) and vlan_a not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
984 1
            flow_mod_za["actions"].insert(
985
                    0, {"action_type": "set_vlan", "vlan_id": vlan_a}
986
                )
987 1
            if not vlan_z:
988 1
                flow_mod_za["actions"].insert(
989
                    0, {"action_type": "push_vlan", "tag_type": "c"}
990
                )
991 1
            if vlan_z == 0:
992 1
                flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
993 1
        elif vlan_a == "4096/4096" and vlan_z == 0:
994 1
            flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
995
996 1
        flows = []
997 1
        if isinstance(vlan_a, list):
998 1
            for mask_a in vlan_a:
999 1
                flow_aux = deepcopy(flow_mod_az)
1000 1
                flow_aux["match"]["dl_vlan"] = mask_a
1001 1
                flows.append(flow_aux)
1002
        else:
1003 1
            if vlan_a is not None:
1004 1
                flow_mod_az["match"]["dl_vlan"] = vlan_a
1005 1
            flows.append(flow_mod_az)
1006
1007 1
        if isinstance(vlan_z, list):
1008 1
            for mask_z in vlan_z:
1009 1
                flow_aux = deepcopy(flow_mod_za)
1010 1
                flow_aux["match"]["dl_vlan"] = mask_z
1011 1
                flows.append(flow_aux)
1012
        else:
1013 1
            if vlan_z is not None:
1014 1
                flow_mod_za["match"]["dl_vlan"] = vlan_z
1015 1
            flows.append(flow_mod_za)
1016 1
        return (
1017
            self.uni_a.interface.switch.id, flows
1018
        )
1019
1020 1
    def _install_direct_uni_flows(self):
1021
        """Install flows connecting two UNIs.
1022
1023
        This case happens when the circuit is between UNIs in the
1024
        same switch.
1025
        """
1026 1
        (dpid, flows) = self._prepare_direct_uni_flows()
1027 1
        self._send_flow_mods(dpid, flows)
1028
1029 1
    def _prepare_nni_flows(self, path=None):
1030
        """Prepare NNI flows."""
1031 1
        nni_flows = OrderedDict()
1032 1
        previous = self.uni_a.interface.switch.dpid
1033 1
        for incoming, outcoming in self.links_zipped(path):
1034 1
            in_vlan = incoming.get_metadata("s_vlan").value
1035 1
            out_vlan = outcoming.get_metadata("s_vlan").value
1036 1
            in_endpoint = self.get_endpoint_by_id(incoming, previous, ne)
1037 1
            out_endpoint = self.get_endpoint_by_id(
1038
                outcoming, in_endpoint.switch.id, eq
1039
            )
1040
1041 1
            flows = []
1042
            # Flow for one direction
1043 1
            flows.append(
1044
                self._prepare_nni_flow(
1045
                    in_endpoint,
1046
                    out_endpoint,
1047
                    in_vlan,
1048
                    out_vlan,
1049
                    queue_id=self.queue_id,
1050
                )
1051
            )
1052
1053
            # Flow for the other direction
1054 1
            flows.append(
1055
                self._prepare_nni_flow(
1056
                    out_endpoint,
1057
                    in_endpoint,
1058
                    out_vlan,
1059
                    in_vlan,
1060
                    queue_id=self.queue_id,
1061
                )
1062
            )
1063 1
            previous = in_endpoint.switch.id
1064 1
            nni_flows[in_endpoint.switch.id] = flows
1065 1
        return nni_flows
1066
1067 1
    def _install_nni_flows(self, path=None):
1068
        """Install NNI flows."""
1069 1
        for dpid, flows in self._prepare_nni_flows(path).items():
1070 1
            self._send_flow_mods(dpid, flows)
1071
1072 1
    @staticmethod
1073 1
    def _get_value_from_uni_tag(uni: UNI):
1074
        """Returns the value from tag. In case of any and untagged
1075
        it should return 4096/4096 and 0 respectively"""
1076 1
        special = {"any": "4096/4096", "untagged": 0}
1077 1
        if uni.user_tag:
1078 1
            value = uni.user_tag.value
1079 1
            if isinstance(value, list):
1080 1
                return uni.user_tag.mask_list
1081 1
            return special.get(value, value)
1082 1
        return None
1083
1084
    # pylint: disable=too-many-locals
1085 1
    def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
1086
        """Prepare flows to install UNIs."""
1087 1
        uni_flows = {}
1088 1
        if not path:
1089 1
            log.info("install uni flows without path.")
1090 1
            return uni_flows
1091
1092
        # Determine VLANs
1093 1
        in_vlan_a = self._get_value_from_uni_tag(self.uni_a)
1094 1
        out_vlan_a = path[0].get_metadata("s_vlan").value
1095
1096 1
        in_vlan_z = self._get_value_from_uni_tag(self.uni_z)
1097 1
        out_vlan_z = path[-1].get_metadata("s_vlan").value
1098
1099
        # Get endpoints from path
1100 1
        endpoint_a = self.get_endpoint_by_id(
1101
            path[0], self.uni_a.interface.switch.id, eq
1102
        )
1103 1
        endpoint_z = self.get_endpoint_by_id(
1104
            path[-1], self.uni_z.interface.switch.id, eq
1105
        )
1106
1107
        # Flows for the first UNI
1108 1
        flows_a = []
1109
1110
        # Flow for one direction, pushing the service tag
1111 1
        if not skip_in:
1112 1
            if isinstance(in_vlan_a, list):
1113 1
                for in_mask_a in in_vlan_a:
1114 1
                    push_flow = self._prepare_push_flow(
1115
                        self.uni_a.interface,
1116
                        endpoint_a,
1117
                        in_mask_a,
1118
                        out_vlan_a,
1119
                        in_vlan_z,
1120
                        queue_id=self.queue_id,
1121
                    )
1122 1
                    flows_a.append(push_flow)
1123
            else:
1124 1
                push_flow = self._prepare_push_flow(
1125
                    self.uni_a.interface,
1126
                    endpoint_a,
1127
                    in_vlan_a,
1128
                    out_vlan_a,
1129
                    in_vlan_z,
1130
                    queue_id=self.queue_id,
1131
                )
1132 1
                flows_a.append(push_flow)
1133
1134
        # Flow for the other direction, popping the service tag
1135 1
        if not skip_out:
1136 1
            pop_flow = self._prepare_pop_flow(
1137
                endpoint_a,
1138
                self.uni_a.interface,
1139
                out_vlan_a,
1140
                queue_id=self.queue_id,
1141
            )
1142 1
            flows_a.append(pop_flow)
1143
1144 1
        uni_flows[self.uni_a.interface.switch.id] = flows_a
1145
1146
        # Flows for the second UNI
1147 1
        flows_z = []
1148
1149
        # Flow for one direction, pushing the service tag
1150 1
        if not skip_in:
1151 1
            if isinstance(in_vlan_z, list):
1152 1
                for in_mask_z in in_vlan_z:
1153 1
                    push_flow = self._prepare_push_flow(
1154
                        self.uni_z.interface,
1155
                        endpoint_z,
1156
                        in_mask_z,
1157
                        out_vlan_z,
1158
                        in_vlan_a,
1159
                        queue_id=self.queue_id,
1160
                    )
1161 1
                    flows_z.append(push_flow)
1162
            else:
1163 1
                push_flow = self._prepare_push_flow(
1164
                    self.uni_z.interface,
1165
                    endpoint_z,
1166
                    in_vlan_z,
1167
                    out_vlan_z,
1168
                    in_vlan_a,
1169
                    queue_id=self.queue_id,
1170
                )
1171 1
                flows_z.append(push_flow)
1172
1173
        # Flow for the other direction, popping the service tag
1174 1
        if not skip_out:
1175 1
            pop_flow = self._prepare_pop_flow(
1176
                endpoint_z,
1177
                self.uni_z.interface,
1178
                out_vlan_z,
1179
                queue_id=self.queue_id,
1180
            )
1181 1
            flows_z.append(pop_flow)
1182
1183 1
        uni_flows[self.uni_z.interface.switch.id] = flows_z
1184
1185 1
        return uni_flows
1186
1187 1
    def _install_uni_flows(self, path=None, skip_in=False, skip_out=False):
1188
        """Install UNI flows."""
1189 1
        uni_flows = self._prepare_uni_flows(path, skip_in, skip_out)
1190
1191 1
        for (dpid, flows) in uni_flows.items():
1192 1
            self._send_flow_mods(dpid, flows)
1193
1194 1
    @staticmethod
1195 1
    def _send_flow_mods(dpid, flow_mods, command='flows', force=False):
1196
        """Send a flow_mod list to a specific switch.
1197
1198
        Args:
1199
            dpid(str): The target of flows (i.e. Switch.id).
1200
            flow_mods(dict): Python dictionary with flow_mods.
1201
            command(str): By default is 'flows'. To remove a flow is 'remove'.
1202
            force(bool): True to send via consistency check in case of errors
1203
1204
        """
1205
1206 1
        endpoint = f"{settings.MANAGER_URL}/{command}/{dpid}"
1207
1208 1
        data = {"flows": flow_mods, "force": force}
1209 1
        response = requests.post(endpoint, json=data)
1210 1
        if response.status_code >= 400:
1211 1
            raise FlowModException(str(response.text))
1212
1213 1
    def get_cookie(self):
1214
        """Return the cookie integer from evc id."""
1215 1
        return int(self.id, 16) + (settings.COOKIE_PREFIX << 56)
1216
1217 1
    @staticmethod
1218 1
    def get_id_from_cookie(cookie):
1219
        """Return the evc id given a cookie value."""
1220 1
        evc_id = cookie - (settings.COOKIE_PREFIX << 56)
1221 1
        return f"{evc_id:x}".zfill(14)
1222
1223 1
    def set_flow_table_group_id(self, flow_mod: dict, vlan) -> dict:
1224
        """Set table_group and table_id"""
1225 1
        table_group = "epl" if vlan is None else "evpl"
1226 1
        flow_mod["table_group"] = table_group
1227 1
        flow_mod["table_id"] = self.table_group[table_group]
1228 1
        return flow_mod
1229
1230 1
    @staticmethod
1231 1
    def get_priority(vlan):
1232
        """Return priority value depending on vlan value"""
1233 1
        if isinstance(vlan, list):
1234 1
            return settings.EVPL_SB_PRIORITY
1235 1
        if vlan not in {None, "4096/4096", 0}:
1236 1
            return settings.EVPL_SB_PRIORITY
1237 1
        if vlan == 0:
1238 1
            return settings.UNTAGGED_SB_PRIORITY
1239 1
        if vlan == "4096/4096":
1240 1
            return settings.ANY_SB_PRIORITY
1241 1
        return settings.EPL_SB_PRIORITY
1242
1243 1
    def _prepare_flow_mod(self, in_interface, out_interface,
1244
                          queue_id=None, vlan=True):
1245
        """Prepare a common flow mod."""
1246 1
        default_actions = [
1247
            {"action_type": "output", "port": out_interface.port_number}
1248
        ]
1249 1
        queue_id = settings.QUEUE_ID if queue_id == -1 else queue_id
1250 1
        if queue_id is not None:
1251 1
            default_actions.append(
1252
                {"action_type": "set_queue", "queue_id": queue_id}
1253
            )
1254
1255 1
        flow_mod = {
1256
            "match": {"in_port": in_interface.port_number},
1257
            "cookie": self.get_cookie(),
1258
            "actions": default_actions,
1259
            "owner": "mef_eline",
1260
        }
1261
1262 1
        self.set_flow_table_group_id(flow_mod, vlan)
1263 1
        if self.sb_priority:
1264 1
            flow_mod["priority"] = self.sb_priority
1265
        else:
1266 1
            flow_mod["priority"] = self.get_priority(vlan)
1267 1
        return flow_mod
1268
1269 1
    def _prepare_nni_flow(self, *args, queue_id=None):
1270
        """Create NNI flows."""
1271 1
        in_interface, out_interface, in_vlan, out_vlan = args
1272 1
        flow_mod = self._prepare_flow_mod(
1273
            in_interface, out_interface, queue_id
1274
        )
1275 1
        flow_mod["match"]["dl_vlan"] = in_vlan
1276 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1277 1
        flow_mod["actions"].insert(0, new_action)
1278
1279 1
        return flow_mod
1280
1281 1
    def _prepare_push_flow(self, *args, queue_id=None):
1282
        """Prepare push flow.
1283
1284
        Arguments:
1285
            in_interface(str): Interface input.
1286
            out_interface(str): Interface output.
1287
            in_vlan(int,str,None): Vlan input.
1288
            out_vlan(str): Vlan output.
1289
            new_c_vlan(int,str,list,None): New client vlan.
1290
1291
        Return:
1292
            dict: An python dictionary representing a FlowMod
1293
1294
        """
1295
        # assign all arguments
1296 1
        in_interface, out_interface, in_vlan, out_vlan, new_c_vlan = args
1297 1
        vlan_pri = in_vlan if not isinstance(new_c_vlan, list) else new_c_vlan
1298 1
        flow_mod = self._prepare_flow_mod(
1299
            in_interface, out_interface, queue_id, vlan_pri
1300
        )
1301
        # the service tag must be always pushed
1302 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1303 1
        flow_mod["actions"].insert(0, new_action)
1304
1305 1
        new_action = {"action_type": "push_vlan", "tag_type": "s"}
1306 1
        flow_mod["actions"].insert(0, new_action)
1307
1308 1
        if in_vlan is not None:
1309
            # if in_vlan is set, it must be included in the match
1310 1
            flow_mod["match"]["dl_vlan"] = in_vlan
1311
1312 1
        if (not isinstance(new_c_vlan, list) and in_vlan != new_c_vlan and
1313
                new_c_vlan not in self.special_cases):
1314
            # new_in_vlan is an integer but zero, action to set is required
1315 1
            new_action = {"action_type": "set_vlan", "vlan_id": new_c_vlan}
1316 1
            flow_mod["actions"].insert(0, new_action)
1317
1318 1
        if in_vlan not in self.special_cases and new_c_vlan == 0:
1319
            # # new_in_vlan is an integer but zero and new_c_vlan does not
1320
            # a pop action is required
1321 1
            new_action = {"action_type": "pop_vlan"}
1322 1
            flow_mod["actions"].insert(0, new_action)
1323
1324 1
        elif in_vlan == "4096/4096" and new_c_vlan == 0:
1325
            # if in_vlan match with any tags and new_c_vlan does not
1326
            # a pop action is required
1327 1
            new_action = {"action_type": "pop_vlan"}
1328 1
            flow_mod["actions"].insert(0, new_action)
1329
1330 1
        elif (not in_vlan and
1331
                (not isinstance(new_c_vlan, list) and
1332
                 new_c_vlan not in self.special_cases)):
1333
            # new_in_vlan is an integer but zero and in_vlan is not set
1334
            # then it is set now
1335 1
            new_action = {"action_type": "push_vlan", "tag_type": "c"}
1336 1
            flow_mod["actions"].insert(0, new_action)
1337
1338 1
        return flow_mod
1339
1340 1
    def _prepare_pop_flow(
1341
        self, in_interface, out_interface, out_vlan, queue_id=None
1342
    ):
1343
        # pylint: disable=too-many-arguments
1344
        """Prepare pop flow."""
1345 1
        flow_mod = self._prepare_flow_mod(
1346
            in_interface, out_interface, queue_id
1347
        )
1348 1
        flow_mod["match"]["dl_vlan"] = out_vlan
1349 1
        new_action = {"action_type": "pop_vlan"}
1350 1
        flow_mod["actions"].insert(0, new_action)
1351 1
        return flow_mod
1352
1353 1
    @staticmethod
1354 1
    def run_bulk_sdntraces(
1355
        uni_list: list[tuple[Interface, Union[str, int, None]]]
1356
    ) -> dict:
1357
        """Run SDN traces on control plane starting from EVC UNIs."""
1358 1
        endpoint = f"{settings.SDN_TRACE_CP_URL}/traces"
1359 1
        data = []
1360 1
        for interface, tag_value in uni_list:
1361 1
            data_uni = {
1362
                "trace": {
1363
                            "switch": {
1364
                                "dpid": interface.switch.dpid,
1365
                                "in_port": interface.port_number,
1366
                            }
1367
                        }
1368
                }
1369 1
            if tag_value:
1370 1
                uni_dl_vlan = map_dl_vlan(tag_value)
1371 1
                if uni_dl_vlan:
1372 1
                    data_uni["trace"]["eth"] = {
1373
                                            "dl_type": 0x8100,
1374
                                            "dl_vlan": uni_dl_vlan,
1375
                                            }
1376 1
            data.append(data_uni)
1377 1
        try:
1378 1
            response = requests.put(endpoint, json=data, timeout=30)
1379 1
        except Timeout as exception:
1380 1
            log.error(f"Request has timed out: {exception}")
1381 1
            return {"result": []}
1382 1
        if response.status_code >= 400:
1383 1
            log.error(f"Failed to run sdntrace-cp: {response.text}")
1384 1
            return {"result": []}
1385 1
        return response.json()
1386
1387
    # pylint: disable=too-many-return-statements, too-many-arguments
1388 1
    @staticmethod
1389 1
    def check_trace(
1390
        evc_id: str,
1391
        evc_name: str,
1392
        tag_a: Union[None, int, str],
1393
        tag_z: Union[None, int, str],
1394
        interface_a: Interface,
1395
        interface_z: Interface,
1396
        current_path: list,
1397
        trace_a: list,
1398
        trace_z: list
1399
    ) -> bool:
1400
        """Auxiliar function to check an individual trace"""
1401 1
        if (
1402
            len(trace_a) != len(current_path) + 1
1403
            or not compare_uni_out_trace(tag_z, interface_z, trace_a[-1])
1404
        ):
1405 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1406
                        f"Invalid trace from uni_a: {trace_a}")
1407 1
            return False
1408 1
        if (
1409
            len(trace_z) != len(current_path) + 1
1410
            or not compare_uni_out_trace(tag_a, interface_a, trace_z[-1])
1411
        ):
1412 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1413
                        f"Invalid trace from uni_z: {trace_z}")
1414 1
            return False
1415
1416 1
        if not current_path:
1417
            return True
1418
1419 1
        first_link, trace_path_begin, trace_path_end = current_path[0], [], []
1420 1
        if (
1421
            first_link.endpoint_a.switch.id == trace_a[0]["dpid"]
1422
        ):
1423 1
            trace_path_begin, trace_path_end = trace_a, trace_z
1424 1
        elif (
1425
            first_link.endpoint_a.switch.id == trace_z[0]["dpid"]
1426
        ):
1427 1
            trace_path_begin, trace_path_end = trace_z, trace_a
1428
        else:
1429
            msg = (
1430
                f"first link {first_link} endpoint_a didn't match the first "
1431
                f"step of trace_a {trace_a} or trace_z {trace_z}"
1432
            )
1433
            log.warning(msg)
1434
            return False
1435
1436 1
        for link, trace1, trace2 in zip(current_path,
1437
                                        trace_path_begin[1:],
1438
                                        trace_path_end[:0:-1]):
1439 1
            metadata_vlan = None
1440 1
            if link.metadata:
1441 1
                metadata_vlan = glom(link.metadata, 's_vlan.value')
1442 1
            if compare_endpoint_trace(
1443
                                        link.endpoint_a,
1444
                                        metadata_vlan,
1445
                                        trace2
1446
                                    ) is False:
1447 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1448
                            f"Invalid trace from uni_a: {trace_a}")
1449 1
                return False
1450 1
            if compare_endpoint_trace(
1451
                                        link.endpoint_b,
1452
                                        metadata_vlan,
1453
                                        trace1
1454
                                    ) is False:
1455 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1456
                            f"Invalid trace from uni_z: {trace_z}")
1457 1
                return False
1458
1459 1
        return True
1460
1461 1
    @staticmethod
1462 1
    def check_range(circuit, traces: list) -> bool:
1463
        """Check traces when for UNI with TAGRange"""
1464 1
        check = True
1465 1
        for i, mask in enumerate(circuit.uni_a.user_tag.mask_list):
1466 1
            trace_a = traces[i*2]
1467 1
            trace_z = traces[i*2+1]
1468 1
            check &= EVCDeploy.check_trace(
1469
                circuit.id, circuit.name,
1470
                mask, mask,
1471
                circuit.uni_a.interface,
1472
                circuit.uni_z.interface,
1473
                circuit.current_path,
1474
                trace_a, trace_z,
1475
            )
1476 1
        return check
1477
1478 1
    @staticmethod
1479 1
    def check_list_traces(list_circuits: list) -> dict:
1480
        """Check if current_path is deployed comparing with SDN traces."""
1481 1
        if not list_circuits:
1482 1
            return {}
1483 1
        uni_list = make_uni_list(list_circuits)
1484 1
        traces = EVCDeploy.run_bulk_sdntraces(uni_list)["result"]
1485
1486 1
        if not traces:
1487 1
            return {}
1488
1489 1
        try:
1490 1
            circuits_checked = {}
1491 1
            i = 0
1492 1
            for circuit in list_circuits:
1493 1
                if isinstance(circuit.uni_a.user_tag, TAGRange):
1494 1
                    length = len(circuit.uni_a.user_tag.mask_list)
1495 1
                    circuits_checked[circuit.id] = EVCDeploy.check_range(
1496
                        circuit, traces[i:i+length*2]
1497
                    )
1498 1
                    i += length*2
1499
                else:
1500 1
                    trace_a = traces[i]
1501 1
                    trace_z = traces[i+1]
1502 1
                    tag_a = None
1503 1
                    if circuit.uni_a.user_tag:
1504 1
                        tag_a = circuit.uni_a.user_tag.value
1505 1
                    tag_z = None
1506 1
                    if circuit.uni_z.user_tag:
1507 1
                        tag_z = circuit.uni_z.user_tag.value
1508 1
                    circuits_checked[circuit.id] = EVCDeploy.check_trace(
1509
                        circuit.id, circuit.name,
1510
                        tag_a, tag_z,
1511
                        circuit.uni_a.interface,
1512
                        circuit.uni_z.interface,
1513
                        circuit.current_path,
1514
                        trace_a, trace_z
1515
                    )
1516 1
                    i += 2
1517 1
        except IndexError as err:
1518 1
            log.error(
1519
                f"Bulk sdntraces returned fewer items than expected."
1520
                f"Error = {err}"
1521
            )
1522 1
            return {}
1523
1524 1
        return circuits_checked
1525
1526 1
    @staticmethod
1527 1
    def get_endpoint_by_id(
1528
        link: Link,
1529
        id_: str,
1530
        operator: Union[eq, ne]
1531
    ) -> Interface:
1532
        """Return endpoint from link
1533
        either equal(eq) or not equal(ne) to id"""
1534 1
        if operator(link.endpoint_a.switch.id, id_):
1535 1
            return link.endpoint_a
1536 1
        return link.endpoint_b
1537
1538
1539 1
class LinkProtection(EVCDeploy):
1540
    """Class to handle link protection."""
1541
1542 1
    def is_affected_by_link(self, link=None):
1543
        """Verify if the current path is affected by link down event."""
1544
        return self.current_path.is_affected_by_link(link)
1545
1546 1
    def is_using_primary_path(self):
1547
        """Verify if the current deployed path is self.primary_path."""
1548 1
        return self.current_path == self.primary_path
1549
1550 1
    def is_using_backup_path(self):
1551
        """Verify if the current deployed path is self.backup_path."""
1552 1
        return self.current_path == self.backup_path
1553
1554 1
    def is_using_dynamic_path(self):
1555
        """Verify if the current deployed path is dynamic."""
1556 1
        if (
1557
            self.current_path
1558
            and not self.is_using_primary_path()
1559
            and not self.is_using_backup_path()
1560
            and self.current_path.status is EntityStatus.UP
1561
        ):
1562
            return True
1563 1
        return False
1564
1565 1
    def deploy_to(self, path_name=None, path=None):
1566
        """Create a deploy to path."""
1567 1
        if self.current_path == path:
1568 1
            log.debug(f"{path_name} is equal to current_path.")
1569 1
            return True
1570
1571 1
        if path.status is EntityStatus.UP:
1572 1
            return self.deploy_to_path(path)
1573
1574 1
        return False
1575
1576 1
    def handle_link_up(self, link):
1577
        """Handle circuit when link up.
1578
1579
        Args:
1580
            link(Link): Link affected by link.up event.
1581
1582
        """
1583 1
        condition_pairs = [
1584
            (
1585
                lambda me: me.is_using_primary_path(),
1586
                lambda _: (True, 'nothing')
1587
            ),
1588
            (
1589
                lambda me: me.is_intra_switch(),
1590
                lambda _: (True, 'nothing')
1591
            ),
1592
            (
1593
                lambda me: me.primary_path.is_affected_by_link(link),
1594
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1595
            ),
1596
            # We tried to deploy(primary_path) without success.
1597
            # And in this case is up by some how. Nothing to do.
1598
            (
1599
                lambda me: me.is_using_backup_path(),
1600
                lambda _: (True, 'nothing')
1601
            ),
1602
            (
1603
                lambda me:  me.is_using_dynamic_path(),
1604
                lambda _: (True, 'nothing')
1605
            ),
1606
            # In this case, probably the circuit is not being used and
1607
            # we can move to backup
1608
            (
1609
                lambda me: me.backup_path.is_affected_by_link(link),
1610
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1611
            ),
1612
            # In this case, the circuit is not being used and we should
1613
            # try a dynamic path
1614
            (
1615
                lambda me: me.dynamic_backup_path,
1616
                lambda me: (me.deploy_to_path(), 'redeploy')
1617
            )
1618
        ]
1619 1
        for predicate, action in condition_pairs:
1620 1
            if not predicate(self):
1621 1
                continue
1622 1
            success, succcess_type = action(self)
1623 1
            if success:
1624 1
                if succcess_type == 'redeploy':
1625 1
                    emit_event(
1626
                        self._controller,
1627
                        "redeployed_link_up",
1628
                        content=map_evc_event_content(self)
1629
                    )
1630 1
                return True
1631 1
        return False
1632
1633 1
    def handle_link_down(self):
1634
        """Handle circuit when link down.
1635
1636
        Returns:
1637
            bool: True if the re-deploy was successly otherwise False.
1638
1639
        """
1640 1
        success = False
1641 1
        if self.is_using_primary_path():
1642 1
            success = self.deploy_to_backup_path()
1643 1
        elif self.is_using_backup_path():
1644 1
            success = self.deploy_to_primary_path()
1645
1646 1
        if not success and self.dynamic_backup_path:
1647 1
            success = self.deploy_to_path()
1648
1649 1
        if success:
1650 1
            log.debug(f"{self} deployed after link down.")
1651
        else:
1652 1
            self.remove_current_flows(sync=False)
1653 1
            self.deactivate()
1654 1
            self.sync()
1655 1
            log.debug(f"Failed to re-deploy {self} after link down.")
1656
1657 1
        return success
1658
1659 1
    @staticmethod
1660 1
    def get_interface_from_switch(uni: UNI, switches: dict) -> Interface:
1661
        """Get interface from switch by uni"""
1662 1
        switch = switches[uni.interface.switch.dpid]
1663 1
        interface = switch.interfaces[uni.interface.port_number]
1664 1
        return interface
1665
1666 1
    def are_unis_active(self, switches: dict) -> bool:
1667
        """Determine whether this EVC should be active"""
1668 1
        interface_a = self.get_interface_from_switch(self.uni_a, switches)
1669 1
        interface_z = self.get_interface_from_switch(self.uni_z, switches)
1670 1
        active, _ = self.is_uni_interface_active(interface_a, interface_z)
1671 1
        return active
1672
1673 1
    @staticmethod
1674 1
    def is_uni_interface_active(
1675
        *interfaces: Interface
1676
    ) -> tuple[bool, dict]:
1677
        """Determine whether a UNI should be active"""
1678 1
        active = True
1679 1
        bad_interfaces = [
1680
            interface
1681
            for interface in interfaces
1682
            if interface.status != EntityStatus.UP
1683
        ]
1684 1
        if bad_interfaces:
1685 1
            active = False
1686 1
            interfaces = bad_interfaces
1687 1
        return active, {
1688
            interface.id: {
1689
                'status': interface.status.value,
1690
                'status_reason': interface.status_reason,
1691
            }
1692
            for interface in interfaces
1693
        }
1694
1695 1
    def handle_interface_link_up(self, interface: Interface):
1696
        """
1697
        Handler for interface link_up events
1698
        """
1699 1
        if self.is_active():
1700 1
            return
1701 1
        interfaces = (self.uni_a.interface, self.uni_z.interface)
1702 1
        if interface not in interfaces:
1703
            return
1704 1
        down_interfaces = [
1705
            interface
1706
            for interface in interfaces
1707
            if interface.status != EntityStatus.UP
1708
        ]
1709 1
        if down_interfaces:
1710
            return
1711 1
        interface_dicts = {
1712
            interface.id: {
1713
                'status': interface.status.value,
1714
                'status_reason': interface.status_reason,
1715
            }
1716
            for interface in interfaces
1717
        }
1718 1
        self.activate()
1719 1
        log.info(
1720
            f"Activating EVC {self.id}. Interfaces: "
1721
            f"{interface_dicts}."
1722
        )
1723 1
        self.sync()
1724
1725 1
    def handle_interface_link_down(self, interface):
1726
        """
1727
        Handler for interface link_down events
1728
        """
1729 1
        if not self.is_active():
1730 1
            return
1731 1
        interfaces = (self.uni_a.interface, self.uni_z.interface)
1732 1
        if interface not in interfaces:
1733
            return
1734 1
        down_interfaces = [
1735
            interface
1736
            for interface in interfaces
1737
            if interface.status != EntityStatus.UP
1738
        ]
1739 1
        if not down_interfaces:
1740
            return
1741 1
        interface_dicts = {
1742
            interface.id: {
1743
                'status': interface.status.value,
1744
                'status_reason': interface.status_reason,
1745
            }
1746
            for interface in down_interfaces
1747
        }
1748 1
        self.deactivate()
1749 1
        log.info(
1750
            f"Deactivating EVC {self.id}. Interfaces: "
1751
            f"{interface_dicts}."
1752
        )
1753 1
        self.sync()
1754
1755
1756 1
class EVC(LinkProtection):
1757
    """Class that represents a E-Line Virtual Connection."""
1758