Passed
Pull Request — master (#708)
by Aldo
05:06
created

build.models.evc.EVCDeploy.check_trace()   D

Complexity

Conditions 12

Size

Total Lines 72
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 12.3775

Importance

Changes 0
Metric Value
cc 12
eloc 55
nop 9
dl 0
loc 72
ccs 25
cts 29
cp 0.8621
crap 12.3775
rs 4.8
c 0
b 0
f 0

How to fix

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
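As a minimal sketch of that idea (the order data and all function names below are hypothetical and not taken from the reviewed code), each block that used to sit under a comment becomes a small method named after that comment:

```python
from collections import defaultdict


def drop_cancelled(orders):
    """Formerly the block under the comment '# skip cancelled orders'."""
    return [order for order in orders if order["status"] != "cancelled"]


def total_by_customer(orders):
    """Formerly the block under the comment '# sum amounts per customer'."""
    totals = defaultdict(int)
    for order in orders:
        totals[order["customer"]] += order["amount"]
    return dict(totals)


def summarize(orders):
    """The former long method, now reading as a short list of steps."""
    return total_by_customer(drop_cancelled(orders))
```

Each helper can now be tested on its own, and the top-level method documents itself through the names it calls.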

Commonly applied refactorings include:

Complexity

Complex code units like build.models.evc.EVCDeploy.check_trace() often do a lot of different things. To break down the enclosing class, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
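A rough illustration of Extract Class follows, assuming a hypothetical class whose members shared a "trace_" prefix. The names Circuit and TraceChecker are invented for this sketch and do not describe the reviewed module:

```python
class TraceChecker:
    """Hypothetical component extracted from members prefixed 'trace_'."""

    def __init__(self, expected_hops):
        self.expected_hops = list(expected_hops)

    def matches(self, trace):
        """Return True if the observed trace equals the expected hops."""
        return list(trace) == self.expected_hops


class Circuit:
    """Hypothetical owner class that now delegates trace checks."""

    def __init__(self, name, expected_hops):
        self.name = name
        self.trace_checker = TraceChecker(expected_hops)

    def is_healthy(self, trace):
        return self.trace_checker.matches(trace)
```

The owner class keeps a single delegating method, while the trace logic can grow and be tested separately.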

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:
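One well-known approach is Introduce Parameter Object. The sketch below is only an assumption about how it might look: Endpoint and describe_link are hypothetical names, and the fields are illustrative rather than the actual parameters of check_trace():

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Endpoint:
    """Hypothetical parameter object grouping values that travel together."""

    switch: str
    port: int
    vlan: int


def describe_link(side_a: Endpoint, side_z: Endpoint) -> str:
    """Take two objects instead of six loose positional parameters."""
    return (
        f"{side_a.switch}:{side_a.port}/{side_a.vlan} <-> "
        f"{side_z.switch}:{side_z.port}/{side_z.vlan}"
    )
```

Grouping related values this way shortens the signature and keeps the values consistent when a new field has to be added later.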

"""Classes used in the main application."""  # pylint: disable=too-many-lines
import traceback
from collections import OrderedDict, defaultdict
from copy import deepcopy
from datetime import datetime
from operator import eq, ne
from threading import Lock
from typing import Union
from uuid import uuid4

import httpx
from glom import glom
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
                      wait_combine, wait_fixed, wait_random)

from kytos.core import log
from kytos.core.common import EntityStatus, GenericEntity
from kytos.core.exceptions import KytosNoTagAvailableError, KytosTagError
from kytos.core.helpers import get_time, now
from kytos.core.interface import UNI, Interface, TAGRange
from kytos.core.link import Link
from kytos.core.retry import before_sleep
from kytos.core.tag_ranges import range_difference
from napps.kytos.mef_eline import controllers, settings
from napps.kytos.mef_eline.exceptions import (ActivationError,
                                              DuplicatedNoTagUNI,
                                              EVCPathNotInstalled,
                                              FlowModException, InvalidPath)
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc,
                                         compare_endpoint_trace,
                                         compare_uni_out_trace, emit_event,
                                         make_uni_list, map_dl_vlan,
                                         map_evc_event_content,
                                         merge_flow_dicts)

from .path import DynamicPathManager, Path


class EVCBase(GenericEntity):
    """Class to represent a circuit."""

    attributes_requiring_redeploy = [
        "primary_path",
        "backup_path",
        "dynamic_backup_path",
        "queue_id",
        "sb_priority",
        "primary_constraints",
        "secondary_constraints",
        "uni_a",
        "uni_z",
    ]
    required_attributes = ["name", "uni_a", "uni_z"]

    updatable_attributes = {
        "uni_a",
        "uni_z",
        "name",
        "start_date",
        "end_date",
        "queue_id",
        "bandwidth",
        "primary_path",
        "backup_path",
        "dynamic_backup_path",
        "primary_constraints",
        "secondary_constraints",
        "owner",
        "sb_priority",
        "service_level",
        "circuit_scheduler",
        "metadata",
        "enabled",
        "max_paths",
    }

    # pylint: disable=too-many-statements
    def __init__(self, controller, **kwargs):
        """Create an EVC instance with the provided parameters.

        Args:
            id(str): EVC identifier. If it is None, an ID will be generated.
                     Only the first 14 bytes passed will be used.
            name: represents an EVC name.(Required)
            uni_a (UNI): Endpoint A for User Network Interface.(Required)
            uni_z (UNI): Endpoint Z for User Network Interface.(Required)
            start_date(datetime|str): Date when the EVC was registered.
                                      Default is now().
            end_date(datetime|str): Final date that the EVC will be finished.
                                    Default is None.
            bandwidth(int): Bandwidth used by EVC instance. Default is 0.
            primary_links(list): Primary links used by evc. Default is []
            backup_links(list): Backup links used by evc. Default is []
            current_path(list): Circuit being used at the moment if this is an
                                active circuit. Default is [].
            failover_path(list): Path being used to provide EVC protection via
                                failover during link failures. Default is [].
            primary_path(list): primary circuit offered to user IF one or more
                                links were provided. Default is [].
            backup_path(list): backup circuit offered to the user IF one or
                               more links were provided. Default is [].
            dynamic_backup_path(bool): Compute the backup path dynamically.
                                       Default is False.
            creation_time(datetime|str): datetime when the circuit should be
                                         activated. default is now().
            enabled(Boolean): attribute to indicate the administrative state;
                              default is False.
            active(Boolean): attribute to indicate the operational state;
                             default is False.
            archived(Boolean): indicate the EVC has been deleted and is
                               archived; default is False.
            owner(str): The EVC owner. Default is None.
            sb_priority(int): Service level provided in the request.
                              Default is None.
            service_level(int): Service level provided. The higher the better.
                                Default is 0.

        Raises:
            ValueError: raised when object attributes are invalid.

        """
        self._controller = controller
        self._validate(**kwargs)
        super().__init__()

        # required attributes
        self._id = kwargs.get("id", uuid4().hex)[:14]
        self.uni_a: UNI = kwargs.get("uni_a")
        self.uni_z: UNI = kwargs.get("uni_z")
        self.name = kwargs.get("name")

        # optional attributes
        self.start_date = get_time(kwargs.get("start_date")) or now()
        self.end_date = get_time(kwargs.get("end_date")) or None
        self.queue_id = kwargs.get("queue_id", -1)

        self.bandwidth = kwargs.get("bandwidth", 0)
        self.primary_links = Path(kwargs.get("primary_links", []))
        self.backup_links = Path(kwargs.get("backup_links", []))
        self.current_path = Path(kwargs.get("current_path", []))
        self.failover_path = Path(kwargs.get("failover_path", []))
        self.primary_path = Path(kwargs.get("primary_path", []))
        self.backup_path = Path(kwargs.get("backup_path", []))
        self.dynamic_backup_path = kwargs.get("dynamic_backup_path", False)
        self.primary_constraints = kwargs.get("primary_constraints", {})
        self.secondary_constraints = kwargs.get("secondary_constraints", {})
        self.creation_time = get_time(kwargs.get("creation_time")) or now()
        self.owner = kwargs.get("owner", None)
        self.sb_priority = kwargs.get("sb_priority", None) or kwargs.get(
            "priority", None
        )
        self.service_level = kwargs.get("service_level", 0)
        self.circuit_scheduler = kwargs.get("circuit_scheduler", [])
        self.flow_removed_at = get_time(kwargs.get("flow_removed_at")) or None
        self.updated_at = get_time(kwargs.get("updated_at")) or now()
        self.execution_rounds = kwargs.get("execution_rounds", 0)
        self.current_links_cache = set()
        self.primary_links_cache = set()
        self.backup_links_cache = set()
        self.old_path = Path([])
        self.max_paths = kwargs.get("max_paths", 2)

        self.lock = Lock()

        self.archived = kwargs.get("archived", False)

        self.metadata = kwargs.get("metadata", {})

        self._mongo_controller = controllers.ELineController()

        if kwargs.get("active", False):
            self.activate()
        else:
            self.deactivate()

        if kwargs.get("enabled", False):
            self.enable()
        else:
            self.disable()

        # datetime of user request for an EVC (or datetime when object was
        # created)
        self.request_time = kwargs.get("request_time", now())
        # dict with the user original request (input)
        self._requested = kwargs

        # Special cases: No tag, any, untagged
        self.special_cases = {None, "4096/4096", 0}
        self.table_group = kwargs.get("table_group")

    def sync(self, keys: set = None):
        """Sync this EVC in the MongoDB."""
        self.updated_at = now()
        if keys:
            self._mongo_controller.update_evc(self.as_dict(keys))
            return
        self._mongo_controller.upsert_evc(self.as_dict())

    def _get_unis(self, **kwargs) -> tuple[UNI, UNI]:
        """Get the updated UNIs."""
        uni_a = kwargs.get("uni_a", None) or self.uni_a
        uni_z = kwargs.get("uni_z", None) or self.uni_z
        return uni_a, uni_z

    def _get_unis_use_tags(self, uni_a, uni_z) -> tuple[UNI, UNI]:
        """Obtain both UNIs (uni_a, uni_z).
        If a UNI is changing, verify tags"""
        uni_a_flag = False
        if uni_a != self.uni_a:
            uni_a_flag = True
            self._use_uni_vlan(uni_a, uni_dif=self.uni_a)

        if uni_z != self.uni_z:
            try:
                self._use_uni_vlan(uni_z, uni_dif=self.uni_z)
                self.make_uni_vlan_available(self.uni_z, uni_dif=uni_z)
            except KytosTagError as err:
                if uni_a_flag:
                    self.make_uni_vlan_available(uni_a, uni_dif=self.uni_a)
                raise err
        else:
            uni_z = self.uni_z

        if uni_a_flag:
            self.make_uni_vlan_available(self.uni_a, uni_dif=uni_a)
        else:
            uni_a = self.uni_a
        return uni_a, uni_z

    # pylint: disable=too-many-branches
    def update(self, **kwargs):
        """Update evc attributes.

        This method raises an error when trying to change the following
        attributes: [creation_time, active, current_path, failover_path,
        _id, archived]
        [name, uni_a and uni_z]

        Returns:
            the values for enable and a redeploy attribute, if they exist,
            and None otherwise
        Raises:
            ValueError: message with error detail.

        """
        enable, redeploy = (None, None)
        # Verification of the attributes
        if not self._tag_lists_equal(**kwargs):
            raise ValueError(
                "UNI_A and UNI_Z tag lists should be the same."
            )

        uni_a, uni_z = self._get_unis(**kwargs)
        self._validate_has_primary_or_dynamic(
            primary_path=kwargs.get("primary_path"),
            dynamic_backup_path=kwargs.get("dynamic_backup_path"),
            uni_a=uni_a,
            uni_z=uni_z,
        )
        for attribute, value in kwargs.items():
            if attribute not in self.updatable_attributes:
                raise ValueError(f"{attribute} can't be updated.")

        # Always verify primary_path and backup_path.
        # If a UNI has changed, they need to connect with paths
        # If path has changed, they need to be valid and connect with UNIs
        for path_name in ("primary_path", "backup_path"):
            path = getattr(self, path_name)
            path = kwargs.get(path_name, path)
            try:
                path.is_valid(
                    uni_a.interface.switch, uni_z.interface.switch
                )
            except InvalidPath as exception:
                raise ValueError(  # pylint: disable=raise-missing-from
                        f"{path_name} is not a valid path: {exception}"
                    )

        # Changing/Updating with the new values
        uni_a, uni_z = self._get_unis_use_tags(uni_a, uni_z)
        for attribute, value in kwargs.items():
            if attribute == "enabled":
                if value:
                    self.enable()
                else:
                    self.disable()
                enable = value
            else:
                setattr(self, attribute, value)
                if attribute in self.attributes_requiring_redeploy:
                    redeploy = True
        self.sync(set(kwargs.keys()))
        return enable, redeploy

    def set_flow_removed_at(self):
        """Update flow_removed_at attribute."""
        self.flow_removed_at = now()

    def has_recent_removed_flow(self, setting=settings):
        """Check if any flow has been removed from the evc"""
        if self.flow_removed_at is None:
            return False
        res_seconds = (now() - self.flow_removed_at).seconds
        return res_seconds < setting.TIME_RECENT_DELETED_FLOWS

    def is_recent_updated(self, setting=settings):
        """Check if the evc has been updated recently"""
        res_seconds = (now() - self.updated_at).seconds
        return res_seconds < setting.TIME_RECENT_UPDATED

    def __repr__(self):
        """Repr method."""
        return f"EVC({self._id}, {self.name})"

    def _validate(self, **kwargs):
        """Do Basic validations.

        Verify required attributes: name, uni_a, uni_z

        Raises:
            ValueError: message with error detail.

        """
        for attribute in self.required_attributes:

            if attribute not in kwargs:
                raise ValueError(f"{attribute} is required.")

            if "uni" in attribute:
                uni = kwargs.get(attribute)
                if not isinstance(uni, UNI):
                    raise ValueError(f"{attribute} is an invalid UNI.")

    def _tag_lists_equal(self, **kwargs):
        """Verify that tag lists are the same."""
        uni_a = kwargs.get("uni_a") or self.uni_a
        uni_z = kwargs.get("uni_z") or self.uni_z
        uni_a_list = uni_z_list = False
        if (uni_a.user_tag and isinstance(uni_a.user_tag, TAGRange)):
            uni_a_list = True
        if (uni_z.user_tag and isinstance(uni_z.user_tag, TAGRange)):
            uni_z_list = True
        if uni_a_list and uni_z_list:
            return uni_a.user_tag.value == uni_z.user_tag.value
        return uni_a_list == uni_z_list

    def _validate_has_primary_or_dynamic(
        self,
        primary_path=None,
        dynamic_backup_path=None,
        uni_a=None,
        uni_z=None,
    ) -> None:
        """Validate that it must have a primary path or allow dynamic paths."""
        primary_path = (
            primary_path
            if primary_path is not None
            else self.primary_path
        )
        dynamic_backup_path = (
            dynamic_backup_path
            if dynamic_backup_path is not None
            else self.dynamic_backup_path
        )
        uni_a = uni_a if uni_a is not None else self.uni_a
        uni_z = uni_z if uni_z is not None else self.uni_z
        if (
            not primary_path
            and not dynamic_backup_path
            and uni_a and uni_z
            and uni_a.interface.switch != uni_z.interface.switch
        ):
            msg = "The EVC must have a primary path or allow dynamic paths."
            raise ValueError(msg)

    def __eq__(self, other):
        """Override the default implementation."""
        if not isinstance(other, EVC):
            return False

        attrs_to_compare = ["name", "uni_a", "uni_z", "owner", "bandwidth"]
        for attribute in attrs_to_compare:
            if getattr(other, attribute) != getattr(self, attribute):
                return False
        return True

    def is_intra_switch(self):
        """Check if the UNIs are in the same switch."""
        return self.uni_a.interface.switch == self.uni_z.interface.switch

    def check_no_tag_duplicate(self, other_uni: UNI):
        """Check if a no tag UNI is duplicated."""
        if other_uni in (self.uni_a, self.uni_z):
            msg = f"UNI with interface {other_uni.interface.id} is"\
                  f" duplicated with {self}."
            raise DuplicatedNoTagUNI(msg)

    def as_dict(self, keys: set = None):
        """Return a dictionary representing an EVC object.
            keys: Only fields on this variable will be
                  returned in the dictionary"""
        evc_dict = {
            "id": self.id,
            "name": self.name,
            "uni_a": self.uni_a.as_dict(),
            "uni_z": self.uni_z.as_dict(),
        }

        time_fmt = "%Y-%m-%dT%H:%M:%S"

        evc_dict["start_date"] = self.start_date
        if isinstance(self.start_date, datetime):
            evc_dict["start_date"] = self.start_date.strftime(time_fmt)

        evc_dict["end_date"] = self.end_date
        if isinstance(self.end_date, datetime):
            evc_dict["end_date"] = self.end_date.strftime(time_fmt)

        evc_dict["queue_id"] = self.queue_id
        evc_dict["bandwidth"] = self.bandwidth
        evc_dict["primary_links"] = self.primary_links.as_dict()
        evc_dict["backup_links"] = self.backup_links.as_dict()
        evc_dict["current_path"] = self.current_path.as_dict()
        evc_dict["failover_path"] = self.failover_path.as_dict()
        evc_dict["primary_path"] = self.primary_path.as_dict()
        evc_dict["backup_path"] = self.backup_path.as_dict()
        evc_dict["dynamic_backup_path"] = self.dynamic_backup_path
        evc_dict["metadata"] = self.metadata

        evc_dict["request_time"] = self.request_time
        if isinstance(self.request_time, datetime):
            evc_dict["request_time"] = self.request_time.strftime(time_fmt)

        time = self.creation_time.strftime(time_fmt)
        evc_dict["creation_time"] = time

        evc_dict["owner"] = self.owner
        evc_dict["circuit_scheduler"] = [
            sc.as_dict() for sc in self.circuit_scheduler
        ]

        evc_dict["active"] = self.is_active()
        evc_dict["enabled"] = self.is_enabled()
        evc_dict["archived"] = self.archived
        evc_dict["sb_priority"] = self.sb_priority
        evc_dict["service_level"] = self.service_level
        evc_dict["primary_constraints"] = self.primary_constraints
        evc_dict["secondary_constraints"] = self.secondary_constraints
        evc_dict["flow_removed_at"] = self.flow_removed_at
        evc_dict["updated_at"] = self.updated_at
        evc_dict["max_paths"] = self.max_paths

        if keys:
            selected = {}
            for key in keys:
                selected[key] = evc_dict[key]
            selected["id"] = evc_dict["id"]
            return selected
        return evc_dict

    @property
    def id(self):  # pylint: disable=invalid-name
        """Return this EVC's ID."""
        return self._id

    def archive(self):
        """Archive this EVC on deletion."""
        self.archived = True

    def _use_uni_vlan(
        self,
        uni: UNI,
        uni_dif: Union[None, UNI] = None
    ):
        """Use tags from UNI"""
        if uni.user_tag is None:
            return
        tag = uni.user_tag.value
        tag_type = uni.user_tag.tag_type
        if (uni_dif and isinstance(tag, list) and
                isinstance(uni_dif.user_tag.value, list)):
            tag = range_difference(tag, uni_dif.user_tag.value)
            if not tag:
                return
        uni.interface.use_tags(
            self._controller, tag, tag_type, use_lock=True, check_order=False
        )

    def make_uni_vlan_available(
        self,
        uni: UNI,
        uni_dif: Union[None, UNI] = None,
    ):
        """Make available tag from UNI"""
        if uni.user_tag is None:
            return
        tag = uni.user_tag.value
        tag_type = uni.user_tag.tag_type
        if (uni_dif and isinstance(tag, list) and
                isinstance(uni_dif.user_tag.value, list)):
            tag = range_difference(tag, uni_dif.user_tag.value)
            if not tag:
                return
        try:
            conflict = uni.interface.make_tags_available(
                self._controller, tag, tag_type, use_lock=True,
                check_order=False
            )
        except KytosTagError as err:
            log.error(f"Error in {self}: {err}")
            return
        if conflict:
            intf = uni.interface.id
            log.warning(f"Tags {conflict} was already available in {intf}")

    def remove_uni_tags(self):
        """Remove both UNI usage of a tag"""
        self.make_uni_vlan_available(self.uni_a)
        self.make_uni_vlan_available(self.uni_z)


# pylint: disable=fixme, too-many-public-methods
class EVCDeploy(EVCBase):
    """Class to handle the deploy procedures."""

    def create(self):
        """Create an EVC."""

    def discover_new_paths(self):
        """Discover new paths to satisfy this circuit and deploy it."""
        return DynamicPathManager.get_best_paths(self, self.max_paths,
                                                 **self.primary_constraints)

    def get_failover_path_candidates(self):
        """Get failover paths to satisfy this EVC."""
        # in the future we can return primary/backup paths as well
        # we just have to properly handle link_up and failover paths
        # if (
        #     self.is_using_primary_path() and
        #     self.backup_path.status is EntityStatus.UP
        # ):
        #     yield self.backup_path
        return DynamicPathManager.get_disjoint_paths(self, self.current_path)

    def change_path(self):
        """Change EVC path."""

    def reprovision(self):
        """Force the EVC (re-)provisioning."""

    def is_affected_by_link(self, link):
        """Return True if this EVC has the given link on its current path."""
        return link in self.current_path

    def link_affected_by_interface(self, interface):
        """Return True if this EVC has the given link on its current path."""
        return self.current_path.link_affected_by_interface(interface)

    def is_backup_path_affected_by_link(self, link):
        """Return True if the backup path of this EVC uses the given link."""
        return link in self.backup_path

    # pylint: disable=invalid-name
    def is_primary_path_affected_by_link(self, link):
        """Return True if the primary path of this EVC uses the given link."""
        return link in self.primary_path

    def is_failover_path_affected_by_link(self, link):
        """Return True if this EVC has the given link on its failover path."""
        return link in self.failover_path

    def is_eligible_for_failover_path(self):
        """Verify if this EVC is eligible for failover path (EP029)"""
        # In the future this function can be augmented to consider
        # primary/backup, primary/dynamic, and other path combinations
        return (
            self.dynamic_backup_path and
            not self.primary_path and not self.backup_path
        )

    def is_using_primary_path(self):
        """Verify if the current deployed path is self.primary_path."""
        return self.primary_path and (self.current_path == self.primary_path)

    def is_using_backup_path(self):
        """Verify if the current deployed path is self.backup_path."""
        return self.backup_path and (self.current_path == self.backup_path)

    def is_using_dynamic_path(self):
        """Verify if the current deployed path is a dynamic path."""
        if (
            self.current_path
            and not self.is_using_primary_path()
            and not self.is_using_backup_path()
            and self.current_path.status == EntityStatus.UP
        ):
            return True
        return False

    def deploy_to_backup_path(self, old_path_dict: dict = None):
        """Deploy the backup path into the datapaths of this circuit.

        If the backup_path attribute is valid and up, this method will try to
        deploy this backup_path.

        If everything fails and dynamic_backup_path is True, it then tries to
        deploy a dynamic path.
        """
        # TODO: Remove flows from current (cookies)
        if self.is_using_backup_path():
            # TODO: Log to say that cannot move backup to backup
            return True

        success = False
        if self.backup_path.status is EntityStatus.UP:
            success = self.deploy_to_path(self.backup_path, old_path_dict)

        if success:
            return True

        if self.dynamic_backup_path or self.is_intra_switch():
            return self.deploy_to_path(old_path_dict=old_path_dict)

        return False

    def deploy_to_primary_path(self, old_path_dict: dict = None):
        """Deploy the primary path into the datapaths of this circuit.

        If the primary_path attribute is valid and up, this method will try to
        deploy this primary_path.
        """
        # TODO: Remove flows from current (cookies)
        if self.is_using_primary_path():
            # TODO: Log to say that cannot move primary to primary
            return True

        if self.primary_path.status is EntityStatus.UP:
            return self.deploy_to_path(self.primary_path, old_path_dict)
        return False

    def deploy(self, old_path_dict: dict = None):
        """Deploy EVC to best path.

        Best path can be the primary path, if available. If not, the backup
        path, and, if it is also not available, a dynamic path.
        """
        if self.archived:
            return False
        self.enable()
        success = self.deploy_to_primary_path(old_path_dict)
        if not success:
            success = self.deploy_to_backup_path(old_path_dict)

        if success:
            emit_event(self._controller, "deployed",
                       content=map_evc_event_content(self))
        return success

    @staticmethod
    def get_path_status(path):
        """Check for the current status of a path.

        If any link in this path is down, the path is considered down.
        """
        if not path:
            return EntityStatus.DISABLED

        for link in path:
            if link.status is not EntityStatus.UP:
                return link.status
        return EntityStatus.UP

    #    def discover_new_path(self):
    #        # TODO: discover a new path to satisfy this circuit and deploy

    def remove(self):
        """Remove EVC path and disable it."""
        self.remove_current_flows(sync=False)
        self.remove_failover_flows(sync=False)
        self.disable()
        self.sync()
        emit_event(self._controller, "undeployed",
                   content=map_evc_event_content(self))

    def remove_failover_flows(self, exclude_uni_switches=True,
                              force=True, sync=True) -> None:
        """Remove failover_flows.

        By default, it'll exclude UNI switches. If mef_eline has already
        called remove_current_flows before, this minimizes the number
        of FlowMods and IO.
        """
        if not self.failover_path:
            return
        switches, cookie, excluded = set(), self.get_cookie(), set()
        if exclude_uni_switches:
            excluded.add(self.uni_a.interface.switch.id)
            excluded.add(self.uni_z.interface.switch.id)
        for link in self.failover_path:
            if link.endpoint_a.switch.id not in excluded:
                switches.add(link.endpoint_a.switch.id)
            if link.endpoint_b.switch.id not in excluded:
                switches.add(link.endpoint_b.switch.id)
        flow_mods = {
            "switches": list(switches),
            "flows": [{
                "cookie": cookie,
                "cookie_mask": int(0xffffffffffffffff),
                "owner": "mef_eline",
            }]
        }
        try:
            self._send_flow_mods(
                flow_mods,
                "delete",
                force=force,
            )
        except FlowModException as err:
            log.error(f"Error deleting {self} failover_path flows, {err}")
        try:
            self.failover_path.make_vlans_available(self._controller)
        except KytosTagError as err:
            log.error(f"Error removing {self} failover_path: {err}")
        self.failover_path = Path([])
        if sync:
            self.sync()

    def remove_current_flows(
        self,
        current_path=None,
        force=True,
        sync=True,
        return_path=False
    ) -> dict[str, int]:
        """Remove all flows from the current path, or from the path
        intended to be the current path, if one exists."""
        switches, old_path_dict = set(), {}
        current_path = self.current_path if not current_path else current_path
        if not current_path and not self.is_intra_switch():
            return {}

        if return_path:
            for link in self.current_path:
                s_vlan = link.metadata.get("s_vlan")
                if s_vlan:
                    old_path_dict[link.id] = s_vlan.value

        for link in current_path:
            switches.add(link.endpoint_a.switch.id)
            switches.add(link.endpoint_b.switch.id)
        switches.add(self.uni_a.interface.switch.id)
        switches.add(self.uni_z.interface.switch.id)
        flow_mods = {
            "switches": list(switches),
            "flows": [{
                "cookie": self.get_cookie(),
                "cookie_mask": int(0xffffffffffffffff),
                "owner": "mef_eline",
            }]
        }

        try:
            self._send_flow_mods(flow_mods, "delete", force=force)
        except FlowModException as err:
            log.error(f"Error deleting {self} current_path flows, {err}")

        try:
            current_path.make_vlans_available(self._controller)
        except KytosTagError as err:
            log.error(f"Error removing {self} current_path: {err}")
        self.current_path = Path([])
        self.deactivate()
        if sync:
            self.sync()
        return old_path_dict

    def remove_path_flows(
        self, path=None, force=True
    ) -> dict[str, list[dict]]:
        """Remove all flows from path, and return the removed flows."""
        dpid_flows_match: dict[str, dict] = defaultdict(lambda: {"flows": []})
        out_flows: dict[str, list[dict]] = defaultdict(list)

        if not path:
            return dpid_flows_match

        try:
            nni_flows = self._prepare_nni_flows(path)
        # pylint: disable=broad-except
        except Exception:
            err = traceback.format_exc()
            log.error(f"Failed to remove NNI failover flows for {self}: {err}")
            nni_flows = {}

        for dpid, flows in nni_flows.items():
            for flow in flows:
                flow_mod = {
                    "cookie": flow["cookie"],
                    "match": flow["match"],
                    "owner": "mef_eline",
                    "cookie_mask": int(0xffffffffffffffff)
                }
                dpid_flows_match[dpid]["flows"].append(flow_mod)
                out_flows[dpid].append(flow_mod)

        try:
            uni_flows = self._prepare_uni_flows(path, skip_in=True)
        # pylint: disable=broad-except
        except Exception:
            err = traceback.format_exc()
            log.error(f"Failed to remove UNI failover flows for {self}: {err}")
            uni_flows = {}

        for dpid, flows in uni_flows.items():
            for flow in flows:
                flow_mod = {
                    "cookie": flow["cookie"],
                    "match": flow["match"],
                    "owner": "mef_eline",
                    "cookie_mask": int(0xffffffffffffffff)
                }
                dpid_flows_match[dpid]["flows"].append(flow_mod)
                out_flows[dpid].append(flow_mod)

        try:
            self._send_flow_mods(
                dpid_flows_match, 'delete', force=force, by_switch=True
            )
        except FlowModException as err:
            log.error(
                f"Error deleting {self} path flows, path:{path}, error={err}"
            )

        try:
            path.make_vlans_available(self._controller)
        except KytosTagError as err:
            log.error(f"Error removing {self} path: {err}")

        return out_flows

    @staticmethod
    def links_zipped(path=None):
        """Return an iterator which yields pairs of links in order."""
        if not path:
            return []
        return zip(path[:-1], path[1:])

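As an illustration of the pairing that `links_zipped` performs, here is a minimal standalone sketch. The function body mirrors the method above; the string labels are hypothetical stand-ins for link objects and are not part of the original code.

```python
def links_zipped(path=None):
    """Pair each link with its successor, mirroring the method above."""
    if not path:
        return []
    # zip stops at the shorter sequence, so N links yield N - 1 pairs.
    return zip(path[:-1], path[1:])


# Hypothetical link labels standing in for link objects.
pairs = list(links_zipped(["link_1", "link_2", "link_3"]))
print(pairs)
```

A single-link path yields no pairs, which is consistent with `_prepare_nni_flows` producing no NNI flows for such a path.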
    def should_deploy(self, path=None):
        """Verify if the circuit should be deployed."""
        if not path:
            log.debug("Path is empty.")
            return False

        if not self.is_enabled():
            log.debug(f"{self} is disabled.")
            return False

        if not self.is_active():
            log.debug(f"{self} will be deployed.")
            return True

        return False

    @staticmethod
    def is_uni_interface_active(
        *interfaces: Interface
    ) -> tuple[bool, dict]:
        """Whether UNIs are active and their status & status_reason."""
        active = True
        bad_interfaces = [
            interface
            for interface in interfaces
            if interface.status != EntityStatus.UP
        ]
        if bad_interfaces:
            active = False
            interfaces = bad_interfaces
        return active, {
            interface.id: {
                'status': interface.status.value,
                'status_reason': interface.status_reason,
            }
            for interface in interfaces
        }

    def try_to_activate(self) -> bool:
        """Try to activate the EVC."""
        if self.is_intra_switch():
            return self._try_to_activate_intra_evc()
        return self._try_to_activate_inter_evc()

    def _try_to_activate_intra_evc(self) -> bool:
        """Try to activate intra EVC."""
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
        if not is_active:
            raise ActivationError(
                f"Won't be able to activate {self} due to UNIs: {reason}"
            )
        self.activate()
        return True

    def _try_to_activate_inter_evc(self) -> bool:
        """Try to activate inter EVC."""
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
        if not is_active:
            raise ActivationError(
                f"Won't be able to activate {self} due to UNIs: {reason}"
            )
        if self.current_path.status != EntityStatus.UP:
            raise ActivationError(
                f"Won't be able to activate {self} due to current_path "
                f"status {self.current_path.status}"
            )
        self.activate()
        return True

    # pylint: disable=too-many-branches, too-many-statements
    def deploy_to_path(self, path=None, old_path_dict: dict = None):
        """Install the flows for this circuit.

        Procedures to deploy:

        0. Remove the currently installed flows
        1. Decide whether to deploy "path" or discover a new path
        2. Choose vlan
        3. Install NNI flows
        4. Install UNI flows
        5. Activate
        6. Update current_path
        7. Update links caches (primary, current, backup)

        """
        self.remove_current_flows(sync=False)
        use_path = path or Path([])
        if not old_path_dict:
            old_path_dict = {}
        tag_errors = []
        no_valid_path = False
        if self.should_deploy(use_path):
            try:
                use_path.choose_vlans(self._controller, old_path_dict)
            except KytosNoTagAvailableError as e:
                tag_errors.append(str(e))
                use_path = None
        else:
            for use_path in self.discover_new_paths():
                if use_path is None:
                    no_valid_path = True
                    continue
                try:
                    use_path.choose_vlans(self._controller, old_path_dict)
                    break
                except KytosNoTagAvailableError as e:
                    tag_errors.append(str(e))
            else:
                use_path = None

        try:
            if use_path:
                self._install_flows(use_path)
            elif self.is_intra_switch():
                use_path = Path()
                self._install_direct_uni_flows()
            else:
                no_path_msg = "No available path was found."
                if no_valid_path:
                    no_path_msg = (
                        "No valid path was found, "
                        "try increasing `max_paths`"
                        f" from {self.max_paths}."
                    )
                msg = f"{self} was not deployed. {no_path_msg}"
                if tag_errors:
                    msg = self.add_tag_errors(msg, tag_errors)
                    log.error(msg)
                else:
                    log.warning(msg)
                return False
        except EVCPathNotInstalled as err:
            log.error(
                f"Error deploying EVC {self} when calling flow_manager: {err}"
            )
            self.remove_current_flows(use_path, sync=True)
            return False

        self.current_path = use_path
        msg = f"{self} was deployed."
        try:
            self.try_to_activate()
        except ActivationError as exc:
            msg = f"{msg} {str(exc)}"
        self.sync()
        log.info(msg)
        return True

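The candidate-selection loop in `deploy_to_path` relies on Python's `for ... else`: the `else` branch runs only when the loop finishes without hitting `break`. The following is a minimal sketch of that pattern in isolation. `NoTagAvailable`, `pick_first_usable`, and `choose_vlans` are hypothetical names for illustration; they stand in for `KytosNoTagAvailableError` and `Path.choose_vlans` and are not the project's API.

```python
class NoTagAvailable(Exception):
    """Hypothetical stand-in for KytosNoTagAvailableError."""


def pick_first_usable(candidates, choose_vlans):
    """Return the first candidate that accepts VLANs, plus collected errors."""
    tag_errors = []
    for candidate in candidates:
        if candidate is None:
            # Invalid candidate, keep looking.
            continue
        try:
            choose_vlans(candidate)
            break  # success: the else branch below is skipped
        except NoTagAvailable as err:
            tag_errors.append(str(err))
    else:
        # Loop ended without break: no candidate was usable.
        candidate = None
    return candidate, tag_errors


def choose_vlans(candidate):
    """Hypothetical chooser that rejects the candidate named 'p1'."""
    if candidate == "p1":
        raise NoTagAvailable(f"no tag on {candidate}")


print(pick_first_usable(["p1", None, "p2"], choose_vlans))
```

Collecting the error strings instead of failing on the first one is what lets the caller report every rejected path at the end.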
    # pylint: disable=too-many-statements
    def setup_failover_path(self, warn_if_not_path=True):
        """Install flows for the failover path of this EVC.

        Procedures to deploy:

        0. Remove flows currently installed for failover_path (if any)
        1. Discover a disjoint path from current_path
        2. Choose vlans
        3. Install NNI flows
        4. Install UNI egress flows
        5. Update failover_path
        """
        # Intra-switch EVCs have no failover_path
        if self.is_intra_switch():
            return False

        # For now, only set up the failover path for totally dynamic EVCs
        if not self.is_eligible_for_failover_path():
            return False

        out_new_flows: dict[str, list[dict]] = {}
        reason = ""
        tag_errors = []
        out_removed_flows = self.remove_path_flows(self.failover_path)
        self.failover_path = Path([])

        for use_path in self.get_failover_path_candidates():
            if not use_path:
                continue
            try:
                use_path.choose_vlans(self._controller)
                break
            except KytosNoTagAvailableError as e:
                tag_errors.append(str(e))
        else:
            use_path = Path([])
            reason = "No available path was found"

        try:
            if use_path:
                out_new_flows = self._install_flows(
                    use_path, skip_in=True
                )
        except EVCPathNotInstalled as err:
            reason = "Error deploying failover path"
            log.error(
                f"{reason} for {self}. FlowManager error: {err}"
            )
            _rmed_flows = self.remove_path_flows(use_path)
            out_removed_flows = merge_flow_dicts(
                out_removed_flows, _rmed_flows
            )
            use_path = Path([])

        self.failover_path = use_path
        self.sync()

        if out_new_flows or out_removed_flows:
            emit_event(self._controller, "failover_deployed", content={
                self.id: map_evc_event_content(
                    self,
                    flows=deepcopy(out_new_flows),
                    removed_flows=deepcopy(out_removed_flows),
                    error_reason=reason,
                    current_path=self.current_path.as_dict(),
                )
            })

        if not use_path:
            msg = f"Failover path for {self} was not deployed: {reason}."
            if tag_errors:
                msg = self.add_tag_errors(msg, tag_errors)
                log.error(msg)
            elif warn_if_not_path:
                log.warning(msg)
            return False
        log.info(f"Failover path for {self} was deployed.")
        return True

    @staticmethod
    def add_tag_errors(msg: str, tag_errors: list):
        """Add to msg the tag errors encountered when choosing path."""
        path = ['path', 'paths']
        was = ['was', 'were']
        message = ['message', 'messages']

        # Choose either singular(0) or plural(1) words
        n = 1
        if len(tag_errors) == 1:
            n = 0

        msg += f" {len(tag_errors)} {path[n]} {was[n]} rejected"
        msg += f" with {message[n]}: {tag_errors}"
        return msg

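To show what `add_tag_errors` appends in the singular and plural cases, here is a standalone copy of the same logic with sample inputs. The base message and error strings are made up for illustration.

```python
def add_tag_errors(msg: str, tag_errors: list) -> str:
    """Append a singular or plural summary of tag errors to msg."""
    path = ["path", "paths"]
    was = ["was", "were"]
    message = ["message", "messages"]

    # Index 0 selects the singular words, index 1 the plural ones.
    n = 0 if len(tag_errors) == 1 else 1

    msg += f" {len(tag_errors)} {path[n]} {was[n]} rejected"
    msg += f" with {message[n]}: {tag_errors}"
    return msg


# Hypothetical messages, only to exercise both branches.
one = add_tag_errors("EVC was not deployed.", ["no tag"])
two = add_tag_errors("EVC was not deployed.", ["no tag", "tag in use"])
print(one)
print(two)
```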
    def get_failover_flows(self):
        """Return the flows needed to make the failover path active, i.e. the
        flows for ingress forwarding.

        Return:
            dict: A dict of flows indexed by the switch_id will be returned, or
                an empty dict if no failover_path is available.
        """
        if not self.failover_path:
            return {}
        return self._prepare_uni_flows(self.failover_path, skip_out=True)

    # pylint: disable=too-many-branches
    def _prepare_direct_uni_flows(self):
        """Prepare flows connecting two UNIs for intra-switch EVC."""
        vlan_a = self._get_value_from_uni_tag(self.uni_a)
        vlan_z = self._get_value_from_uni_tag(self.uni_z)

        flow_mod_az = self._prepare_flow_mod(
            self.uni_a.interface, self.uni_z.interface,
            self.queue_id, vlan_a
        )
        flow_mod_za = self._prepare_flow_mod(
            self.uni_z.interface, self.uni_a.interface,
            self.queue_id, vlan_z
        )

        if not isinstance(vlan_z, list) and vlan_z not in self.special_cases:
            flow_mod_az["actions"].insert(
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
            )
            if not vlan_a:
                flow_mod_az["actions"].insert(
                    0, {"action_type": "push_vlan", "tag_type": "c"}
                )
            if vlan_a == 0:
                flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
        elif vlan_a == 0 and vlan_z == "4096/4096":
            flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})

        if not isinstance(vlan_a, list) and vlan_a not in self.special_cases:
            flow_mod_za["actions"].insert(
                0, {"action_type": "set_vlan", "vlan_id": vlan_a}
            )
            if not vlan_z:
                flow_mod_za["actions"].insert(
                    0, {"action_type": "push_vlan", "tag_type": "c"}
                )
            if vlan_z == 0:
                flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
        elif vlan_a == "4096/4096" and vlan_z == 0:
            flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})

        flows = []
        if isinstance(vlan_a, list):
            for mask_a in vlan_a:
                flow_aux = deepcopy(flow_mod_az)
                flow_aux["match"]["dl_vlan"] = mask_a
                flows.append(flow_aux)
        else:
            if vlan_a is not None:
                flow_mod_az["match"]["dl_vlan"] = vlan_a
            flows.append(flow_mod_az)

        if isinstance(vlan_z, list):
            for mask_z in vlan_z:
                flow_aux = deepcopy(flow_mod_za)
                flow_aux["match"]["dl_vlan"] = mask_z
                flows.append(flow_aux)
        else:
            if vlan_z is not None:
                flow_mod_za["match"]["dl_vlan"] = vlan_z
            flows.append(flow_mod_za)
        return (
            self.uni_a.interface.switch.id, flows
        )

    def _install_direct_uni_flows(self):
        """Install flows connecting two UNIs.

        This case happens when the circuit is between UNIs in the
        same switch.
        """
        (dpid, flows) = self._prepare_direct_uni_flows()
        flow_mods = {"switches": [dpid], "flows": flows}
        try:
            self._send_flow_mods(flow_mods, "install")
        except FlowModException as err:
            raise EVCPathNotInstalled(str(err)) from err

    def _prepare_nni_flows(self, path=None):
        """Prepare NNI flows."""
        nni_flows = OrderedDict()
        previous = self.uni_a.interface.switch.dpid
        for incoming, outcoming in self.links_zipped(path):
            in_vlan = incoming.get_metadata("s_vlan").value
            out_vlan = outcoming.get_metadata("s_vlan").value
            in_endpoint = self.get_endpoint_by_id(incoming, previous, ne)
            out_endpoint = self.get_endpoint_by_id(
                outcoming, in_endpoint.switch.id, eq
            )

            flows = []
            # Flow for one direction
            flows.append(
                self._prepare_nni_flow(
                    in_endpoint,
                    out_endpoint,
                    in_vlan,
                    out_vlan,
                    queue_id=self.queue_id,
                )
            )

            # Flow for the other direction
            flows.append(
                self._prepare_nni_flow(
                    out_endpoint,
                    in_endpoint,
                    out_vlan,
                    in_vlan,
                    queue_id=self.queue_id,
                )
            )
            previous = in_endpoint.switch.id
            nni_flows[in_endpoint.switch.id] = flows
        return nni_flows

    def _install_flows(
        self, path=None, skip_in=False, skip_out=False
    ) -> dict[str, list[dict]]:
        """Install UNI and NNI flows and return the installed flows."""
        flows_by_switch = defaultdict(lambda: {"flows": []})
        new_flows = defaultdict(list)
        for dpid, flows in self._prepare_nni_flows(path).items():
            flows_by_switch[dpid]["flows"].extend(flows)
            new_flows[dpid].extend(flows)
        for dpid, flows in self._prepare_uni_flows(
            path, skip_in, skip_out
        ).items():
            flows_by_switch[dpid]["flows"].extend(flows)
            new_flows[dpid].extend(flows)

        try:
            self._send_flow_mods(flows_by_switch, "install", by_switch=True)
        except FlowModException as err:
            raise EVCPathNotInstalled(str(err)) from err

        return new_flows

    @staticmethod
    def _get_value_from_uni_tag(uni: UNI):
        """Return the value from the UNI tag.

        "any" maps to "4096/4096" and "untagged" maps to 0. A list value
        returns the tag's mask_list. Return None if there is no user_tag.
        """
        special = {"any": "4096/4096", "untagged": 0}
        if uni.user_tag:
            value = uni.user_tag.value
            if isinstance(value, list):
                return uni.user_tag.mask_list
            return special.get(value, value)
        return None

    # pylint: disable=too-many-locals
    def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
        """Prepare flows to install UNIs."""
        uni_flows = {}
        if not path:
            return uni_flows

        # Determine VLANs
        in_vlan_a = self._get_value_from_uni_tag(self.uni_a)
        out_vlan_a = path[0].get_metadata("s_vlan").value

        in_vlan_z = self._get_value_from_uni_tag(self.uni_z)
        out_vlan_z = path[-1].get_metadata("s_vlan").value

        # Get endpoints from path
        endpoint_a = self.get_endpoint_by_id(
            path[0], self.uni_a.interface.switch.id, eq
        )
        endpoint_z = self.get_endpoint_by_id(
            path[-1], self.uni_z.interface.switch.id, eq
        )

        # Flows for the first UNI
        flows_a = []

        # Flow for one direction, pushing the service tag
        if not skip_in:
            if isinstance(in_vlan_a, list):
                for in_mask_a in in_vlan_a:
                    push_flow = self._prepare_push_flow(
                        self.uni_a.interface,
                        endpoint_a,
                        in_mask_a,
                        out_vlan_a,
                        in_vlan_z,
                        queue_id=self.queue_id,
                    )
                    flows_a.append(push_flow)
            else:
                push_flow = self._prepare_push_flow(
                    self.uni_a.interface,
                    endpoint_a,
                    in_vlan_a,
                    out_vlan_a,
                    in_vlan_z,
                    queue_id=self.queue_id,
                )
                flows_a.append(push_flow)

        # Flow for the other direction, popping the service tag
        if not skip_out:
            pop_flow = self._prepare_pop_flow(
                endpoint_a,
                self.uni_a.interface,
                out_vlan_a,
                in_vlan_a,
                in_vlan_z,
                queue_id=self.queue_id,
            )
            flows_a.append(pop_flow)

        uni_flows[self.uni_a.interface.switch.id] = flows_a

        # Flows for the second UNI
        flows_z = []

        # Flow for one direction, pushing the service tag
        if not skip_in:
            if isinstance(in_vlan_z, list):
                for in_mask_z in in_vlan_z:
                    push_flow = self._prepare_push_flow(
                        self.uni_z.interface,
                        endpoint_z,
                        in_mask_z,
                        out_vlan_z,
                        in_vlan_a,
                        queue_id=self.queue_id,
                    )
                    flows_z.append(push_flow)
            else:
                push_flow = self._prepare_push_flow(
                    self.uni_z.interface,
                    endpoint_z,
                    in_vlan_z,
                    out_vlan_z,
                    in_vlan_a,
                    queue_id=self.queue_id,
                )
                flows_z.append(push_flow)

        # Flow for the other direction, popping the service tag
        if not skip_out:
            pop_flow = self._prepare_pop_flow(
                endpoint_z,
                self.uni_z.interface,
                out_vlan_z,
                in_vlan_z,
                in_vlan_a,
                queue_id=self.queue_id,
            )
            flows_z.append(pop_flow)

        uni_flows[self.uni_z.interface.switch.id] = flows_z

        return uni_flows

    @staticmethod
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
        retry=retry_if_exception_type(FlowModException),
        before_sleep=before_sleep,
        reraise=True,
    )
    def _send_flow_mods(
        data_content: dict,
        command="install",
        force=False,
        by_switch=False
    ):
        """Send flow mods to flow_manager.

        Args:
            data_content(dict): Python dictionary with flow_mods.
            command(str): 'install' (default) to add flows, 'delete' to
                remove them.
            force(bool): True to send via consistency check in case of errors.
            by_switch(bool): True to send to 'flows_by_switch' request instead.
        """
        if by_switch:
            endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}"
        else:
            endpoint = f"{settings.MANAGER_URL}/flows"
            data_content["force"] = force
        try:
            if command == "install":
                res = httpx.post(endpoint, json=data_content, timeout=30)
            elif command == "delete":
                res = httpx.request(
                    "DELETE", endpoint, json=data_content, timeout=30
                )
        except httpx.RequestError as err:
            raise FlowModException(str(err)) from err
        if res.is_server_error or res.status_code >= 400:
            raise FlowModException(res.text)

    def get_cookie(self):
        """Return the cookie integer from evc id."""
        return int(self.id, 16) + (settings.COOKIE_PREFIX << 56)

    @staticmethod
    def get_id_from_cookie(cookie):
        """Return the evc id given a cookie value."""
        evc_id = cookie - (settings.COOKIE_PREFIX << 56)
        return f"{evc_id:x}".zfill(14)

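The two cookie helpers above are inverses: the prefix occupies the top 8 bits of a 64-bit cookie and the 14-hex-digit EVC id fills the lower 56 bits. A minimal round-trip sketch, assuming a hypothetical `COOKIE_PREFIX` of `0xAA` (the real value comes from `settings` and is not shown in this listing) and module-level functions instead of methods:

```python
COOKIE_PREFIX = 0xAA  # hypothetical value; the real one lives in settings


def get_cookie(evc_id: str) -> int:
    """Build the cookie: prefix in the top byte, EVC id in the low 56 bits."""
    return int(evc_id, 16) + (COOKIE_PREFIX << 56)


def get_id_from_cookie(cookie: int) -> str:
    """Recover the 14-hex-digit EVC id, restoring any leading zeros."""
    evc_id = cookie - (COOKIE_PREFIX << 56)
    return f"{evc_id:x}".zfill(14)


sample_id = "0123456789abcd"  # made-up EVC id with a leading zero
cookie = get_cookie(sample_id)
print(hex(cookie))
print(get_id_from_cookie(cookie))
```

The `zfill(14)` step matters because formatting the integer drops leading zeros, so an id such as the sample one would otherwise come back one digit short.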
    def set_flow_table_group_id(self, flow_mod: dict, vlan) -> dict:
        """Set table_group and table_id"""
        table_group = "epl" if vlan is None else "evpl"
        flow_mod["table_group"] = table_group
        flow_mod["table_id"] = self.table_group[table_group]
        return flow_mod

    @staticmethod
    def get_priority(vlan):
        """Return priority value depending on vlan value"""
        if isinstance(vlan, list):
            return settings.EVPL_SB_PRIORITY
        if vlan not in {None, "4096/4096", 0}:
            return settings.EVPL_SB_PRIORITY
        if vlan == 0:
            return settings.UNTAGGED_SB_PRIORITY
        if vlan == "4096/4096":
            return settings.ANY_SB_PRIORITY
        return settings.EPL_SB_PRIORITY

    def _prepare_flow_mod(self, in_interface, out_interface,
                          queue_id=None, vlan=True):
        """Prepare a common flow mod."""
        default_actions = [
            {"action_type": "output", "port": out_interface.port_number}
        ]
        queue_id = settings.QUEUE_ID if queue_id == -1 else queue_id
        if queue_id is not None:
            default_actions.insert(
                0,
                {"action_type": "set_queue", "queue_id": queue_id}
            )

        flow_mod = {
            "match": {"in_port": in_interface.port_number},
            "cookie": self.get_cookie(),
            "actions": default_actions,
            "owner": "mef_eline",
        }

        self.set_flow_table_group_id(flow_mod, vlan)
        if self.sb_priority:
            flow_mod["priority"] = self.sb_priority
        else:
            flow_mod["priority"] = self.get_priority(vlan)
        return flow_mod

    def _prepare_nni_flow(self, *args, queue_id=None):
        """Create NNI flows."""
        in_interface, out_interface, in_vlan, out_vlan = args
        flow_mod = self._prepare_flow_mod(
            in_interface, out_interface, queue_id
        )
        flow_mod["match"]["dl_vlan"] = in_vlan
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
        flow_mod["actions"].insert(0, new_action)

        return flow_mod

1469 1
    def _prepare_push_flow(self, *args, queue_id=None):
1470
        """Prepare push flow.
1471
1472
        Arguments:
1473
            in_interface(Interface): Interface input.
1474
            out_interface(Interface): Interface output.
1475
            in_vlan(int,str,None): Vlan input.
1476
            out_vlan(int): Vlan output.
1477
            new_c_vlan(int,str,list,None): New client vlan.
1478
1479
        Return:
1480
            dict: An python dictionary representing a FlowMod
1481
1482
        """
1483
        # assign all arguments
1484 1
        in_interface, out_interface, in_vlan, out_vlan, new_c_vlan = args
1485 1
        vlan_pri = in_vlan if not isinstance(new_c_vlan, list) else new_c_vlan
1486 1
        flow_mod = self._prepare_flow_mod(
1487
            in_interface, out_interface, queue_id, vlan_pri
1488
        )
1489
        # the service tag must always be pushed
1490 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1491 1
        flow_mod["actions"].insert(0, new_action)
1492
1493 1
        if (
1494
            not (in_vlan != new_c_vlan and isinstance(in_vlan, int) and
1495
                 isinstance(new_c_vlan, int))
1496
        ):
1497
            # Push the service VLAN header unless both VLANs are integers
1498
            # and differ from each other.
1499 1
            new_action = {"action_type": "push_vlan", "tag_type": "s"}
1500 1
            flow_mod["actions"].insert(0, new_action)
1501
1502 1
        if in_vlan is not None:
1503
            # if in_vlan is set, it must be included in the match
1504 1
            flow_mod["match"]["dl_vlan"] = in_vlan
1505
1506 1
        if (
1507
            not isinstance(in_vlan, int) and isinstance(new_c_vlan, int) and
1508
            new_c_vlan != 0
1509
        ):
1510
            # new_c_vlan is a nonzero integer, so a set_vlan action is needed
1511 1
            new_action = {"action_type": "set_vlan", "vlan_id": new_c_vlan}
1512 1
            flow_mod["actions"].insert(0, new_action)
1513
1514 1
        if in_vlan == "4096/4096" and new_c_vlan == 0:
1515
            # if in_vlan matches any tag and new_c_vlan is untagged (0),
1516
            # a pop action is required
1517 1
            new_action = {"action_type": "pop_vlan"}
1518 1
            flow_mod["actions"].insert(0, new_action)
1519
1520 1
        elif (not in_vlan and
1521
                (not isinstance(new_c_vlan, list) and
1522
                 new_c_vlan not in self.special_cases)):
1523
            # in_vlan is no-tag or untagged and new_c_vlan is neither a
1524
            # list nor a special case, so a client tag must be pushed
1525 1
            new_action = {"action_type": "push_vlan", "tag_type": "c"}
1526 1
            flow_mod["actions"].insert(0, new_action)
1527
1528 1
        return flow_mod
1529
1530 1
    def _prepare_pop_flow(
1531
        self,
1532
        in_interface: Interface,
1533
        out_interface: Interface,
1534
        out_vlan: int,
1535
        in_vlan: Union[int, str, list, None],
1536
        new_c_vlan: Union[int, str, list, None],
1537
        queue_id=None,
1538
    ):
1539
        # pylint: disable=too-many-arguments
1540
        """Prepare pop flow."""
1541 1
        flow_mod = self._prepare_flow_mod(
1542
            in_interface, out_interface, queue_id
1543
        )
1544 1
        flow_mod["match"]["dl_vlan"] = out_vlan
1545 1
        if in_vlan == 0:
1546 1
            new_action = {"action_type": "pop_vlan"}
1547 1
            flow_mod["actions"].insert(0, new_action)
1548 1
        elif (
1549
            in_vlan != new_c_vlan and isinstance(in_vlan, int) and
1550
            isinstance(new_c_vlan, int)
1551
        ):
1552
            # If UNI VLANs are different and in_vlan is not 0
1553 1
            new_action = {"action_type": "set_vlan", "vlan_id": in_vlan}
1554 1
            flow_mod["actions"].insert(0, new_action)
1555
        else:
1556 1
            new_action = {"action_type": "pop_vlan"}
1557 1
            flow_mod["actions"].insert(0, new_action)
1558 1
        return flow_mod
1559
1560 1
    @staticmethod
1561 1
    def run_bulk_sdntraces(
1562
        uni_list: list[tuple[Interface, Union[str, int, None]]]
1563
    ) -> dict:
1564
        """Run SDN traces on control plane starting from EVC UNIs."""
1565 1
        endpoint = f"{settings.SDN_TRACE_CP_URL}/traces"
1566 1
        data = []
1567 1
        for interface, tag_value in uni_list:
1568 1
            data_uni = {
1569
                "trace": {
1570
                            "switch": {
1571
                                "dpid": interface.switch.dpid,
1572
                                "in_port": interface.port_number,
1573
                            }
1574
                        }
1575
                }
1576 1
            if tag_value:
1577 1
                uni_dl_vlan = map_dl_vlan(tag_value)
1578 1
                if uni_dl_vlan:
1579 1
                    data_uni["trace"]["eth"] = {
1580
                                            "dl_type": 0x8100,
1581
                                            "dl_vlan": uni_dl_vlan,
1582
                                            }
1583 1
            data.append(data_uni)
1584 1
        try:
1585 1
            response = httpx.put(endpoint, json=data, timeout=30)
1586 1
        except httpx.TimeoutException as exception:
1587 1
            log.error(f"Request has timed out: {exception}")
1588 1
            return {"result": []}
1589 1
        if response.status_code >= 400:
1590 1
            log.error(f"Failed to run sdntrace-cp: {response.text}")
1591 1
            return {"result": []}
1592 1
        return response.json()
1593
1594
    # pylint: disable=too-many-return-statements, too-many-arguments
1595 1
    @staticmethod
1596 1
    def check_trace(
1597
        evc_id: str,
1598
        evc_name: str,
1599
        tag_a: Union[None, int, str],
1600
        tag_z: Union[None, int, str],
1601
        interface_a: Interface,
1602
        interface_z: Interface,
1603
        current_path: list,
1604
        trace_a: list,
1605
        trace_z: list
1606
    ) -> bool:
1607
        """Auxiliary function to check an individual trace."""
1608 1
        if (
1609
            len(trace_a) != len(current_path) + 1
1610
            or not compare_uni_out_trace(tag_z, interface_z, trace_a[-1])
1611
        ):
1612 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1613
                        f"Invalid trace from uni_a: {trace_a}")
1614 1
            return False
1615 1
        if (
1616
            len(trace_z) != len(current_path) + 1
1617
            or not compare_uni_out_trace(tag_a, interface_a, trace_z[-1])
1618
        ):
1619 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1620
                        f"Invalid trace from uni_z: {trace_z}")
1621 1
            return False
1622
1623 1
        if not current_path:
1624
            return True
1625
1626 1
        first_link, trace_path_begin, trace_path_end = current_path[0], [], []
1627 1
        if (
1628
            first_link.endpoint_a.switch.id == trace_a[0]["dpid"]
1629
        ):
1630 1
            trace_path_begin, trace_path_end = trace_a, trace_z
1631 1
        elif (
1632
            first_link.endpoint_a.switch.id == trace_z[0]["dpid"]
1633
        ):
1634 1
            trace_path_begin, trace_path_end = trace_z, trace_a
1635
        else:
1636
            msg = (
1637
                f"first link {first_link} endpoint_a didn't match the first "
1638
                f"step of trace_a {trace_a} or trace_z {trace_z}"
1639
            )
1640
            log.warning(msg)
1641
            return False
1642
1643 1
        for link, trace1, trace2 in zip(current_path,
1644
                                        trace_path_begin[1:],
1645
                                        trace_path_end[:0:-1]):
1646 1
            metadata_vlan = None
1647 1
            if link.metadata:
1648 1
                metadata_vlan = glom(link.metadata, 's_vlan.value')
1649 1
            if compare_endpoint_trace(
1650
                                        link.endpoint_a,
1651
                                        metadata_vlan,
1652
                                        trace2
1653
                                    ) is False:
1654 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1655
                            f"Invalid trace from uni_a: {trace_a}")
1656 1
                return False
1657 1
            if compare_endpoint_trace(
1658
                                        link.endpoint_b,
1659
                                        metadata_vlan,
1660
                                        trace1
1661
                                    ) is False:
1662 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1663
                            f"Invalid trace from uni_z: {trace_z}")
1664 1
                return False
1665
1666 1
        return True
1667
1668 1
    @staticmethod
1669 1
    def check_range(circuit, traces: list) -> bool:
1670
        """Check traces for a UNI with TAGRange."""
1671 1
        check = True
1672 1
        for i, mask in enumerate(circuit.uni_a.user_tag.mask_list):
1673 1
            trace_a = traces[i*2]
1674 1
            trace_z = traces[i*2+1]
1675 1
            check &= EVCDeploy.check_trace(
1676
                circuit.id, circuit.name,
1677
                mask, mask,
1678
                circuit.uni_a.interface,
1679
                circuit.uni_z.interface,
1680
                circuit.current_path,
1681
                trace_a, trace_z,
1682
            )
1683 1
        return check
1684
1685 1
    @staticmethod
1686 1
    def check_list_traces(list_circuits: list) -> dict:
1687
        """Check if current_path is deployed comparing with SDN traces."""
1688 1
        if not list_circuits:
1689 1
            return {}
1690 1
        uni_list = make_uni_list(list_circuits)
1691 1
        traces = EVCDeploy.run_bulk_sdntraces(uni_list)["result"]
1692
1693 1
        if not traces:
1694 1
            return {}
1695
1696 1
        try:
1697 1
            circuits_checked = {}
1698 1
            i = 0
1699 1
            for circuit in list_circuits:
1700 1
                if isinstance(circuit.uni_a.user_tag, TAGRange):
1701 1
                    length = len(circuit.uni_a.user_tag.mask_list)
1702 1
                    circuits_checked[circuit.id] = EVCDeploy.check_range(
1703
                        circuit, traces[i:i+length*2]
1704
                    )
1705 1
                    i += length*2
1706
                else:
1707 1
                    trace_a = traces[i]
1708 1
                    trace_z = traces[i+1]
1709 1
                    tag_a = None
1710 1
                    if circuit.uni_a.user_tag:
1711 1
                        tag_a = circuit.uni_a.user_tag.value
1712 1
                    tag_z = None
1713 1
                    if circuit.uni_z.user_tag:
1714 1
                        tag_z = circuit.uni_z.user_tag.value
1715 1
                    circuits_checked[circuit.id] = EVCDeploy.check_trace(
1716
                        circuit.id, circuit.name,
1717
                        tag_a, tag_z,
1718
                        circuit.uni_a.interface,
1719
                        circuit.uni_z.interface,
1720
                        circuit.current_path,
1721
                        trace_a, trace_z
1722
                    )
1723 1
                    i += 2
1724 1
        except IndexError as err:
1725 1
            log.error(
1726
                "Bulk sdntraces returned fewer items than expected. "
1727
                f"Error = {err}"
1728
            )
1729 1
            return {}
1730
1731 1
        return circuits_checked
1732
1733 1
    @staticmethod
1734 1
    def get_endpoint_by_id(
1735
        link: Link,
1736
        id_: str,
1737
        operator: Union[eq, ne]
1738
    ) -> Interface:
1739
        """Return endpoint from link
1740
        either equal(eq) or not equal(ne) to id"""
1741 1
        if operator(link.endpoint_a.switch.id, id_):
1742 1
            return link.endpoint_a
1743 1
        return link.endpoint_b
1744
1745
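The link-by-link comparison in `check_trace()` relies on slicing the two traces so that each link lines up with the hop that enters its `endpoint_b` (from the forward trace) and the hop that enters its `endpoint_a` (from the reversed opposite trace). The following is a minimal sketch of that alignment only. `Hop`, `SimpleLink`, and `links_match_traces` are hypothetical names introduced for illustration, the VLAN comparison done by `compare_endpoint_trace` is left out, and it assumes each trace step records the switch and the port on which the packet arrived.

```python
from typing import NamedTuple


class Hop(NamedTuple):
    """Hypothetical (switch, port) pair for a trace step or link endpoint."""
    dpid: str
    port: int


class SimpleLink(NamedTuple):
    """Hypothetical stand-in for a Link with two endpoints."""
    endpoint_a: Hop
    endpoint_b: Hop


def links_match_traces(path, trace_begin, trace_end):
    """Return True if every link endpoint agrees with both traces."""
    # trace_begin[1:] drops the UNI step of the forward trace.
    # trace_end[:0:-1] reverses the opposite trace and drops its UNI step.
    for link, hop_b, hop_a in zip(path, trace_begin[1:], trace_end[:0:-1]):
        if link.endpoint_a != hop_a or link.endpoint_b != hop_b:
            return False
    return True


# Topology assumed for the example: s1 -- s2 -- s3, UNIs on s1:1 and s3:3.
path = [
    SimpleLink(Hop("s1", 2), Hop("s2", 1)),
    SimpleLink(Hop("s2", 2), Hop("s3", 1)),
]
trace_a = [Hop("s1", 1), Hop("s2", 1), Hop("s3", 1)]
trace_z = [Hop("s3", 3), Hop("s2", 2), Hop("s1", 2)]

print(links_match_traces(path, trace_a, trace_z))  # True
```

Passing the traces in the wrong order makes the first link fail the comparison, which is why `check_trace()` first decides which trace starts at the first link's `endpoint_a` switch.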
1746 1
class LinkProtection(EVCDeploy):
1747
    """Class to handle link protection."""
1748
1749 1
    def is_affected_by_link(self, link=None):
1750
        """Verify if the current path is affected by link down event."""
1751
        return self.current_path.is_affected_by_link(link)
1752
1753 1
    def is_using_primary_path(self):
1754
        """Verify if the current deployed path is self.primary_path."""
1755 1
        return self.current_path == self.primary_path
1756
1757 1
    def is_using_backup_path(self):
1758
        """Verify if the current deployed path is self.backup_path."""
1759 1
        return self.current_path == self.backup_path
1760
1761 1
    def is_using_dynamic_path(self):
1762
        """Verify if the current deployed path is dynamic."""
1763 1
        if (
1764
            self.current_path
1765
            and not self.is_using_primary_path()
1766
            and not self.is_using_backup_path()
1767
            and self.current_path.status is EntityStatus.UP
1768
        ):
1769
            return True
1770 1
        return False
1771
1772 1
    def handle_link_up(self, link=None, interface=None):
1773
        """Handle circuit when link up.
1774
1775
        Args:
1776
            link(Link): Link affected by link.up event.
1777
1778
        """
1779 1
        condition_pairs = [
1780
            (
1781
                lambda me: me.is_using_primary_path(),
1782
                lambda _: (True, 'nothing')
1783
            ),
1784
            (
1785
                lambda me: me.is_intra_switch(),
1786
                lambda _: (True, 'nothing')
1787
            ),
1788
            (
1789
                lambda me: me.primary_path.is_affected_by_link(link),
1790
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1791
            ),
1792
            # For this special case, it reached this point because interface
1793
            # was previously confirmed to be a UNI and both UNI are UP
1794
            (
1795
                lambda me: (me.primary_path.status == EntityStatus.UP
1796
                            and interface),
1797
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1798
            ),
1799
            (
1800
                lambda me: (me.backup_path.status == EntityStatus.UP
1801
                            and interface),
1802
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1803
            ),
1804
            # We tried to deploy(primary_path) without success.
1805
            # In this case it is somehow up already. Nothing to do.
1806
            (
1807
                lambda me: me.is_using_backup_path(),
1808
                lambda _: (True, 'nothing')
1809
            ),
1810
            (
1811
                lambda me: me.is_using_dynamic_path(),
1812
                lambda _: (True, 'nothing')
1813
            ),
1814
            # In this case, probably the circuit is not being used and
1815
            # we can move to backup
1816
            (
1817
                lambda me: me.backup_path.is_affected_by_link(link),
1818
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1819
            ),
1820
            # In this case, the circuit is not being used and we should
1821
            # try a dynamic path
1822
            (
1823
                lambda me: me.dynamic_backup_path and not me.is_active(),
1824
                lambda me: (me.deploy_to_path(), 'redeploy')
1825
            )
1826
        ]
1827 1
        for predicate, action in condition_pairs:
1828 1
            if not predicate(self):
1829 1
                continue
1830 1
            success, success_type = action(self)
1831 1
            if success:
1832 1
                if success_type == 'redeploy':
1833 1
                    emit_event(
1834
                        self._controller,
1835
                        "redeployed_link_up",
1836
                        content=map_evc_event_content(self)
1837
                    )
1838 1
                return True
1839 1
        return False
1840
1841 1
    def handle_link_down(self):
1842
        """Handle circuit when link down.
1843
1844
        Returns:
1845
            bool: True if the re-deploy was successful, otherwise False.
1846
1847
        """
1848 1
        success = False
1849 1
        if self.is_using_primary_path():
1850 1
            success = self.deploy_to_backup_path()
1851 1
        elif self.is_using_backup_path():
1852 1
            success = self.deploy_to_primary_path()
1853
1854 1
        if not success and self.dynamic_backup_path:
1855 1
            success = self.deploy_to_path()
1856
1857 1
        if success:
1858 1
            log.debug(f"{self} deployed after link down.")
1859
        else:
1860 1
            self.remove_current_flows(sync=False)
1861 1
            self.deactivate()
1862 1
            self.sync()
1863 1
            log.debug(f"Failed to re-deploy {self} after link down.")
1864
1865 1
        return success
1866
1867 1
    def are_unis_active(self) -> bool:
1868
        """Determine whether this EVC should be active"""
1869 1
        interface_a = self.uni_a.interface
1870 1
        interface_z = self.uni_z.interface
1871 1
        active, _ = self.is_uni_interface_active(interface_a, interface_z)
1872 1
        return active
1873
1874 1
    def try_to_handle_uni_as_link_up(self, interface: Interface) -> bool:
1875
        """Try to handle UNI as link_up to trigger deployment."""
1876
        if (
1877
            self.current_path.status != EntityStatus.UP
1878
            and not self.is_intra_switch()
1879
        ):
1880
            succeeded = self.handle_link_up(interface=interface)
1881
            if succeeded:
1882
                msg = (
1883
                    f"Activated {self} due to successful "
1884
                    f"deployment triggered by {interface}"
1885
                )
1886
            else:
1887
                msg = (
1888
                    f"Couldn't activate {self} due to unsuccessful "
1889
                    f"deployment triggered by {interface}"
1890
                )
1891
            log.info(msg)
1892
            return True
1893
        return False
1894
1895 1
    def handle_interface_link_up(self, interface: Interface):
1896
        """
1897
        Handler for interface link_up events
1898
        """
1899 1
        if not _does_uni_affect_evc(self, interface, "up"):
1900 1
            return
1901 1
        if self.try_to_handle_uni_as_link_up(interface):
1902
            return
1903
1904 1
        interface_dicts = {
1905
            interface.id: {
1906
                'status': interface.status.value,
1907
                'status_reason': interface.status_reason,
1908
            }
1909
            for interface in (self.uni_a.interface, self.uni_z.interface)
1910
        }
1911 1
        try:
1912 1
            self.try_to_activate()
1913 1
            log.info(
1914
                f"Activating {self}. Interfaces: "
1915
                f"{interface_dicts}."
1916
            )
1917 1
            emit_event(self._controller, "uni_active_updated",
1918
                       content=map_evc_event_content(self))
1919 1
            self.sync()
1920
        except ActivationError as exc:
1921
            # In this context an ActivationError isn't expected, since the
1922
            # activation prerequisites were checked, so it is logged as an error
1923
            log.error(f"ActivationError: {str(exc)} when handling {interface}")
1924
1925 1
    def handle_interface_link_down(self, interface):
1926
        """
1927
        Handler for interface link_down events
1928
        """
1929 1
        if not _does_uni_affect_evc(self, interface, "down"):
1930 1
            return
1931 1
        interface_dicts = {
1932
            interface.id: {
1933
                'status': interface.status.value,
1934
                'status_reason': interface.status_reason,
1935
            }
1936
            for interface in (self.uni_a.interface, self.uni_z.interface)
1937
            if interface.status != EntityStatus.UP
1938
        }
1939 1
        self.deactivate()
1940 1
        log.info(
1941
            f"Deactivating {self}. Interfaces: "
1942
            f"{interface_dicts}."
1943
        )
1944 1
        emit_event(self._controller, "uni_active_updated",
1945
                   content=map_evc_event_content(self))
1946 1
        self.sync()
1947
1948
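`handle_link_up()` in the class above walks an ordered list of (predicate, action) pairs and stops at the first pair whose predicate holds and whose action reports success. A pair whose action fails does not end the loop, so later pairs act as fallbacks. The sketch below isolates that dispatch pattern. The function name `first_successful_action`, the dictionary-based state, and the result labels are assumptions made for illustration and are not part of the original code.

```python
def first_successful_action(state, condition_pairs):
    """Run the first matching action that succeeds and return its label.

    Each pair is (predicate, action). An action returns (success, label).
    Returns None when no matching action succeeds.
    """
    for predicate, action in condition_pairs:
        if not predicate(state):
            continue
        success, label = action(state)
        if success:
            return label
    return None


# Hypothetical rules: keep the primary path if in use, else try
# primary, then backup.
condition_pairs = [
    (lambda s: s["on_primary"], lambda _: (True, "nothing")),
    (lambda s: True, lambda s: (s["primary_ok"], "redeploy-primary")),
    (lambda s: True, lambda s: (s["backup_ok"], "redeploy-backup")),
]

state = {"on_primary": False, "primary_ok": False, "backup_ok": True}
print(first_successful_action(state, condition_pairs))  # redeploy-backup
```

Keeping the rules in a list makes their priority explicit: reordering the list changes which deployment is attempted first without touching the loop.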
1949 1
class EVC(LinkProtection):
1950
    """Class that represents an E-Line Virtual Connection."""
1951