Passed
Pull Request — master (#708)
by Aldo
04:52
created

build.models.evc.EVCDeploy.check_trace()   D

Complexity

Conditions 12

Size

Total Lines 72
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 12.3775

Importance

Changes 0
Metric Value
cc 12
eloc 55
nop 9
dl 0
loc 72
ccs 25
cts 29
cp 0.8621
crap 12.3775
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.models.evc.EVCDeploy.check_trace() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Classes used in the main application."""  # pylint: disable=too-many-lines
2 1
import traceback
3 1
from collections import OrderedDict, defaultdict
4 1
from copy import deepcopy
5 1
from datetime import datetime
6 1
from operator import eq, ne
7 1
from threading import Lock
8 1
from typing import Union
9 1
from uuid import uuid4
10
11 1
import httpx
12 1
from glom import glom
13 1
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
14
                      wait_combine, wait_fixed, wait_random)
15
16 1
from kytos.core import log
17 1
from kytos.core.common import EntityStatus, GenericEntity
18 1
from kytos.core.exceptions import KytosNoTagAvailableError, KytosTagError
19 1
from kytos.core.helpers import get_time, now
20 1
from kytos.core.interface import UNI, Interface, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.retry import before_sleep
23 1
from kytos.core.tag_ranges import range_difference
24 1
from napps.kytos.mef_eline import controllers, settings
25 1
from napps.kytos.mef_eline.exceptions import (ActivationError,
26
                                              DuplicatedNoTagUNI,
27
                                              EVCPathNotInstalled,
28
                                              FlowModException, InvalidPath)
29 1
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc,
30
                                         compare_endpoint_trace,
31
                                         compare_uni_out_trace, emit_event,
32
                                         make_uni_list, map_dl_vlan,
33
                                         map_evc_event_content,
34
                                         merge_flow_dicts)
35
36 1
from .path import DynamicPathManager, Path
37
38
39 1
class EVCBase(GenericEntity):
40
    """Class to represent a circuit."""
41
42 1
    attributes_requiring_redeploy = [
43
        "primary_path",
44
        "backup_path",
45
        "dynamic_backup_path",
46
        "queue_id",
47
        "sb_priority",
48
        "primary_constraints",
49
        "secondary_constraints",
50
        "uni_a",
51
        "uni_z",
52
    ]
53 1
    required_attributes = ["name", "uni_a", "uni_z"]
54
55 1
    updatable_attributes = {
56
        "uni_a",
57
        "uni_z",
58
        "name",
59
        "start_date",
60
        "end_date",
61
        "queue_id",
62
        "bandwidth",
63
        "primary_path",
64
        "backup_path",
65
        "dynamic_backup_path",
66
        "primary_constraints",
67
        "secondary_constraints",
68
        "owner",
69
        "sb_priority",
70
        "service_level",
71
        "circuit_scheduler",
72
        "metadata",
73
        "enabled",
74
        "max_paths",
75
    }
76
77
    # pylint: disable=too-many-statements
78 1
    def __init__(self, controller, **kwargs):
79
        """Create an EVC instance with the provided parameters.
80
81
        Args:
82
            id(str): EVC identifier. Whether it's None an ID will be genereted.
83
                     Only the first 14 bytes passed will be used.
84
            name: represents an EVC name.(Required)
85
            uni_a (UNI): Endpoint A for User Network Interface.(Required)
86
            uni_z (UNI): Endpoint Z for User Network Interface.(Required)
87
            start_date(datetime|str): Date when the EVC was registred.
88
                                      Default is now().
89
            end_date(datetime|str): Final date that the EVC will be fineshed.
90
                                    Default is None.
91
            bandwidth(int): Bandwidth used by EVC instance. Default is 0.
92
            primary_links(list): Primary links used by evc. Default is []
93
            backup_links(list): Backups links used by evc. Default is []
94
            current_path(list): Circuit being used at the moment if this is an
95
                                active circuit. Default is [].
96
            failover_path(list): Path being used to provide EVC protection via
97
                                failover during link failures. Default is [].
98
            primary_path(list): primary circuit offered to user IF one or more
99
                                links were provided. Default is [].
100
            backup_path(list): backup circuit offered to the user IF one or
101
                               more links were provided. Default is [].
102
            dynamic_backup_path(bool): Enable computer backup path dynamically.
103
                                       Dafault is False.
104
            creation_time(datetime|str): datetime when the circuit should be
105
                                         activated. default is now().
106
            enabled(Boolean): attribute to indicate the administrative state;
107
                              default is False.
108
            active(Boolean): attribute to indicate the operational state;
109
                             default is False.
110
            archived(Boolean): indicate the EVC has been deleted and is
111
                               archived; default is False.
112
            owner(str): The EVC owner. Default is None.
113
            sb_priority(int): Service level provided in the request.
114
                              Default is None.
115
            service_level(int): Service level provided. The higher the better.
116
                                Default is 0.
117
118
        Raises:
119
            ValueError: raised when object attributes are invalid.
120
121
        """
122 1
        self._controller = controller
123 1
        self._validate(**kwargs)
124 1
        super().__init__()
125
126
        # required attributes
127 1
        self._id = kwargs.get("id", uuid4().hex)[:14]
128 1
        self.uni_a: UNI = kwargs.get("uni_a")
129 1
        self.uni_z: UNI = kwargs.get("uni_z")
130 1
        self.name = kwargs.get("name")
131
132
        # optional attributes
133 1
        self.start_date = get_time(kwargs.get("start_date")) or now()
134 1
        self.end_date = get_time(kwargs.get("end_date")) or None
135 1
        self.queue_id = kwargs.get("queue_id", -1)
136
137 1
        self.bandwidth = kwargs.get("bandwidth", 0)
138 1
        self.primary_links = Path(kwargs.get("primary_links", []))
139 1
        self.backup_links = Path(kwargs.get("backup_links", []))
140 1
        self.current_path = Path(kwargs.get("current_path", []))
141 1
        self.failover_path = Path(kwargs.get("failover_path", []))
142 1
        self.primary_path = Path(kwargs.get("primary_path", []))
143 1
        self.backup_path = Path(kwargs.get("backup_path", []))
144 1
        self.dynamic_backup_path = kwargs.get("dynamic_backup_path", False)
145 1
        self.primary_constraints = kwargs.get("primary_constraints", {})
146 1
        self.secondary_constraints = kwargs.get("secondary_constraints", {})
147 1
        self.creation_time = get_time(kwargs.get("creation_time")) or now()
148 1
        self.owner = kwargs.get("owner", None)
149 1
        self.sb_priority = kwargs.get("sb_priority", None) or kwargs.get(
150
            "priority", None
151
        )
152 1
        self.service_level = kwargs.get("service_level", 0)
153 1
        self.circuit_scheduler = kwargs.get("circuit_scheduler", [])
154 1
        self.flow_removed_at = get_time(kwargs.get("flow_removed_at")) or None
155 1
        self.updated_at = get_time(kwargs.get("updated_at")) or now()
156 1
        self.execution_rounds = kwargs.get("execution_rounds", 0)
157 1
        self.current_links_cache = set()
158 1
        self.primary_links_cache = set()
159 1
        self.backup_links_cache = set()
160 1
        self.old_path = Path([])
161 1
        self.max_paths = kwargs.get("max_paths", 2)
162
163 1
        self.lock = Lock()
164
165 1
        self.archived = kwargs.get("archived", False)
166
167 1
        self.metadata = kwargs.get("metadata", {})
168
169 1
        self._mongo_controller = controllers.ELineController()
170
171 1
        if kwargs.get("active", False):
172 1
            self.activate()
173
        else:
174 1
            self.deactivate()
175
176 1
        if kwargs.get("enabled", False):
177 1
            self.enable()
178
        else:
179 1
            self.disable()
180
181
        # datetime of user request for a EVC (or datetime when object was
182
        # created)
183 1
        self.request_time = kwargs.get("request_time", now())
184
        # dict with the user original request (input)
185 1
        self._requested = kwargs
186
187
        # Special cases: No tag, any, untagged
188 1
        self.special_cases = {None, "4096/4096", 0}
189 1
        self.table_group = kwargs.get("table_group")
190
191 1
    def sync(self, keys: set = None):
192
        """Sync this EVC in the MongoDB."""
193 1
        self.updated_at = now()
194 1
        if keys:
195 1
            self._mongo_controller.update_evc(self.as_dict(keys))
196 1
            return
197 1
        self._mongo_controller.upsert_evc(self.as_dict())
198
199 1
    def _get_unis(self, **kwargs) -> tuple[UNI, UNI]:
200
        """Get the updated UNIs."""
201 1
        uni_a = kwargs.get("uni_a", None) or self.uni_a
202 1
        uni_z = kwargs.get("uni_z", None) or self.uni_z
203 1
        return uni_a, uni_z
204
205 1
    def _get_unis_use_tags(self, uni_a, uni_z) -> tuple[UNI, UNI]:
206
        """Obtain both UNIs (uni_a, uni_z).
207
        If a UNI is changing, verify tags"""
208 1
        uni_a_flag = False
209 1
        if uni_a != self.uni_a:
210 1
            uni_a_flag = True
211 1
            self._use_uni_vlan(uni_a, uni_dif=self.uni_a)
212
213 1
        if uni_z != self.uni_z:
214 1
            try:
215 1
                self._use_uni_vlan(uni_z, uni_dif=self.uni_z)
216 1
                self.make_uni_vlan_available(self.uni_z, uni_dif=uni_z)
217 1
            except KytosTagError as err:
218 1
                if uni_a_flag:
219 1
                    self.make_uni_vlan_available(uni_a, uni_dif=self.uni_a)
220 1
                raise err
221
        else:
222 1
            uni_z = self.uni_z
223
224 1
        if uni_a_flag:
225 1
            self.make_uni_vlan_available(self.uni_a, uni_dif=uni_a)
226
        else:
227 1
            uni_a = self.uni_a
228 1
        return uni_a, uni_z
229
230
    # pylint: disable=too-many-branches
231 1
    def update(self, **kwargs):
232
        """Update evc attributes.
233
234
        This method will raises an error trying to change the following
235
        attributes: [creation_time, active, current_path, failover_path,
236
        _id, archived]
237
        [name, uni_a and uni_z]
238
239
        Returns:
240
            the values for enable and a redeploy attribute, if exists and None
241
            otherwise
242
        Raises:
243
            ValueError: message with error detail.
244
245
        """
246 1
        enable, redeploy = (None, None)
247 1
        if not self._tag_lists_equal(**kwargs):
248 1
            raise ValueError(
249
                "UNI_A and UNI_Z tag lists should be the same."
250
            )
251
252 1
        uni_a, uni_z = self._get_unis(**kwargs)
253 1
        self._validate_has_primary_or_dynamic(
254
            primary_path=kwargs.get("primary_path"),
255
            dynamic_backup_path=kwargs.get("dynamic_backup_path"),
256
            uni_a=uni_a,
257
            uni_z=uni_z,
258
        )
259 1
        valid_path = False
260 1
        for attribute, value in kwargs.items():
261 1
            if attribute not in self.updatable_attributes:
262 1
                raise ValueError(f"{attribute} can't be updated.")
263 1
            if attribute in ("primary_path", "backup_path"):
264 1
                try:
265 1
                    value.is_valid(
266
                        uni_a.interface.switch, uni_z.interface.switch
267
                    )
268 1
                    valid_path = True
269 1
                except InvalidPath as exception:
270 1
                    raise ValueError(  # pylint: disable=raise-missing-from
271
                        f"{attribute} is not a valid path: {exception}"
272
                    )
273 1
        if not valid_path:
274 1
            for path_name in ("primary_path", "backup_path"):
275 1
                try:
276 1
                    path = getattr(self, path_name)
277 1
                    path.is_valid(
278
                        uni_a.interface.switch, uni_z.interface.switch
279
                    )
280 1
                except InvalidPath as exception:
281 1
                    raise ValueError(  # pylint: disable=raise-missing-from
282
                            f"{path_name} is not a valid path: {exception}"
283
                        )
284 1
            valid_path = True
285 1
        uni_a, uni_z = self._get_unis_use_tags(uni_a, uni_z)
286 1
        for attribute, value in kwargs.items():
287 1
            if attribute == "enabled":
288 1
                if value:
289 1
                    self.enable()
290
                else:
291 1
                    self.disable()
292 1
                enable = value
293
            else:
294 1
                setattr(self, attribute, value)
295 1
                if attribute in self.attributes_requiring_redeploy:
296 1
                    redeploy = True
297 1
        self.sync(set(kwargs.keys()))
298 1
        return enable, redeploy
299
300 1
    def set_flow_removed_at(self):
301
        """Update flow_removed_at attribute."""
302
        self.flow_removed_at = now()
303
304 1
    def has_recent_removed_flow(self, setting=settings):
305
        """Check if any flow has been removed from the evc"""
306
        if self.flow_removed_at is None:
307
            return False
308
        res_seconds = (now() - self.flow_removed_at).seconds
309
        return res_seconds < setting.TIME_RECENT_DELETED_FLOWS
310
311 1
    def is_recent_updated(self, setting=settings):
312
        """Check if the evc has been updated recently"""
313
        res_seconds = (now() - self.updated_at).seconds
314
        return res_seconds < setting.TIME_RECENT_UPDATED
315
316 1
    def __repr__(self):
317
        """Repr method."""
318 1
        return f"EVC({self._id}, {self.name})"
319
320 1
    def _validate(self, **kwargs):
321
        """Do Basic validations.
322
323
        Verify required attributes: name, uni_a, uni_z
324
325
        Raises:
326
            ValueError: message with error detail.
327
328
        """
329 1
        for attribute in self.required_attributes:
330
331 1
            if attribute not in kwargs:
332 1
                raise ValueError(f"{attribute} is required.")
333
334 1
            if "uni" in attribute:
335 1
                uni = kwargs.get(attribute)
336 1
                if not isinstance(uni, UNI):
337
                    raise ValueError(f"{attribute} is an invalid UNI.")
338
339 1
    def _tag_lists_equal(self, **kwargs):
340
        """Verify that tag lists are the same."""
341 1
        uni_a = kwargs.get("uni_a") or self.uni_a
342 1
        uni_z = kwargs.get("uni_z") or self.uni_z
343 1
        uni_a_list = uni_z_list = False
344 1
        if (uni_a.user_tag and isinstance(uni_a.user_tag, TAGRange)):
345 1
            uni_a_list = True
346 1
        if (uni_z.user_tag and isinstance(uni_z.user_tag, TAGRange)):
347 1
            uni_z_list = True
348 1
        if uni_a_list and uni_z_list:
349 1
            return uni_a.user_tag.value == uni_z.user_tag.value
350 1
        return uni_a_list == uni_z_list
351
352 1
    def _validate_has_primary_or_dynamic(
353
        self,
354
        primary_path=None,
355
        dynamic_backup_path=None,
356
        uni_a=None,
357
        uni_z=None,
358
    ) -> None:
359
        """Validate that it must have a primary path or allow dynamic paths."""
360 1
        primary_path = (
361
            primary_path
362
            if primary_path is not None
363
            else self.primary_path
364
        )
365 1
        dynamic_backup_path = (
366
            dynamic_backup_path
367
            if dynamic_backup_path is not None
368
            else self.dynamic_backup_path
369
        )
370 1
        uni_a = uni_a if uni_a is not None else self.uni_a
371 1
        uni_z = uni_z if uni_z is not None else self.uni_z
372 1
        if (
373
            not primary_path
374
            and not dynamic_backup_path
375
            and uni_a and uni_z
376
            and uni_a.interface.switch != uni_z.interface.switch
377
        ):
378 1
            msg = "The EVC must have a primary path or allow dynamic paths."
379 1
            raise ValueError(msg)
380
381 1
    def __eq__(self, other):
382
        """Override the default implementation."""
383 1
        if not isinstance(other, EVC):
384
            return False
385
386 1
        attrs_to_compare = ["name", "uni_a", "uni_z", "owner", "bandwidth"]
387 1
        for attribute in attrs_to_compare:
388 1
            if getattr(other, attribute) != getattr(self, attribute):
389 1
                return False
390 1
        return True
391
392 1
    def is_intra_switch(self):
393
        """Check if the UNIs are in the same switch."""
394 1
        return self.uni_a.interface.switch == self.uni_z.interface.switch
395
396 1
    def check_no_tag_duplicate(self, other_uni: UNI):
397
        """Check if a no tag UNI is duplicated."""
398 1
        if other_uni in (self.uni_a, self.uni_z):
399 1
            msg = f"UNI with interface {other_uni.interface.id} is"\
400
                  f" duplicated with {self}."
401 1
            raise DuplicatedNoTagUNI(msg)
402
403 1
    def as_dict(self, keys: set = None):
404
        """Return a dictionary representing an EVC object.
405
            keys: Only fields on this variable will be
406
                  returned in the dictionary"""
407 1
        evc_dict = {
408
            "id": self.id,
409
            "name": self.name,
410
            "uni_a": self.uni_a.as_dict(),
411
            "uni_z": self.uni_z.as_dict(),
412
        }
413
414 1
        time_fmt = "%Y-%m-%dT%H:%M:%S"
415
416 1
        evc_dict["start_date"] = self.start_date
417 1
        if isinstance(self.start_date, datetime):
418 1
            evc_dict["start_date"] = self.start_date.strftime(time_fmt)
419
420 1
        evc_dict["end_date"] = self.end_date
421 1
        if isinstance(self.end_date, datetime):
422 1
            evc_dict["end_date"] = self.end_date.strftime(time_fmt)
423
424 1
        evc_dict["queue_id"] = self.queue_id
425 1
        evc_dict["bandwidth"] = self.bandwidth
426 1
        evc_dict["primary_links"] = self.primary_links.as_dict()
427 1
        evc_dict["backup_links"] = self.backup_links.as_dict()
428 1
        evc_dict["current_path"] = self.current_path.as_dict()
429 1
        evc_dict["failover_path"] = self.failover_path.as_dict()
430 1
        evc_dict["primary_path"] = self.primary_path.as_dict()
431 1
        evc_dict["backup_path"] = self.backup_path.as_dict()
432 1
        evc_dict["dynamic_backup_path"] = self.dynamic_backup_path
433 1
        evc_dict["metadata"] = self.metadata
434
435 1
        evc_dict["request_time"] = self.request_time
436 1
        if isinstance(self.request_time, datetime):
437 1
            evc_dict["request_time"] = self.request_time.strftime(time_fmt)
438
439 1
        time = self.creation_time.strftime(time_fmt)
440 1
        evc_dict["creation_time"] = time
441
442 1
        evc_dict["owner"] = self.owner
443 1
        evc_dict["circuit_scheduler"] = [
444
            sc.as_dict() for sc in self.circuit_scheduler
445
        ]
446
447 1
        evc_dict["active"] = self.is_active()
448 1
        evc_dict["enabled"] = self.is_enabled()
449 1
        evc_dict["archived"] = self.archived
450 1
        evc_dict["sb_priority"] = self.sb_priority
451 1
        evc_dict["service_level"] = self.service_level
452 1
        evc_dict["primary_constraints"] = self.primary_constraints
453 1
        evc_dict["secondary_constraints"] = self.secondary_constraints
454 1
        evc_dict["flow_removed_at"] = self.flow_removed_at
455 1
        evc_dict["updated_at"] = self.updated_at
456 1
        evc_dict["max_paths"] = self.max_paths
457
458 1
        if keys:
459 1
            selected = {}
460 1
            for key in keys:
461 1
                selected[key] = evc_dict[key]
462 1
            selected["id"] = evc_dict["id"]
463 1
            return selected
464 1
        return evc_dict
465
466 1
    @property
467 1
    def id(self):  # pylint: disable=invalid-name
468
        """Return this EVC's ID."""
469 1
        return self._id
470
471 1
    def archive(self):
472
        """Archive this EVC on deletion."""
473 1
        self.archived = True
474
475 1
    def _use_uni_vlan(
476
        self,
477
        uni: UNI,
478
        uni_dif: Union[None, UNI] = None
479
    ):
480
        """Use tags from UNI"""
481 1
        if uni.user_tag is None:
482 1
            return
483 1
        tag = uni.user_tag.value
484 1
        tag_type = uni.user_tag.tag_type
485 1
        if (uni_dif and isinstance(tag, list) and
486
                isinstance(uni_dif.user_tag.value, list)):
487 1
            tag = range_difference(tag, uni_dif.user_tag.value)
488 1
            if not tag:
489 1
                return
490 1
        uni.interface.use_tags(
491
            self._controller, tag, tag_type, use_lock=True, check_order=False
492
        )
493
494 1
    def make_uni_vlan_available(
495
        self,
496
        uni: UNI,
497
        uni_dif: Union[None, UNI] = None,
498
    ):
499
        """Make available tag from UNI"""
500 1
        if uni.user_tag is None:
501 1
            return
502 1
        tag = uni.user_tag.value
503 1
        tag_type = uni.user_tag.tag_type
504 1
        if (uni_dif and isinstance(tag, list) and
505
                isinstance(uni_dif.user_tag.value, list)):
506 1
            tag = range_difference(tag, uni_dif.user_tag.value)
507 1
            if not tag:
508
                return
509 1
        try:
510 1
            conflict = uni.interface.make_tags_available(
511
                self._controller, tag, tag_type, use_lock=True,
512
                check_order=False
513
            )
514 1
        except KytosTagError as err:
515 1
            log.error(f"Error in {self}: {err}")
516 1
            return
517 1
        if conflict:
518 1
            intf = uni.interface.id
519 1
            log.warning(f"Tags {conflict} was already available in {intf}")
520
521 1
    def remove_uni_tags(self):
522
        """Remove both UNI usage of a tag"""
523 1
        self.make_uni_vlan_available(self.uni_a)
524 1
        self.make_uni_vlan_available(self.uni_z)
525
526
527
# pylint: disable=fixme, too-many-public-methods
528 1
class EVCDeploy(EVCBase):
529
    """Class to handle the deploy procedures."""
530
531 1
    def create(self):
532
        """Create a EVC."""
533
534 1
    def discover_new_paths(self):
535
        """Discover new paths to satisfy this circuit and deploy it."""
536
        return DynamicPathManager.get_best_paths(self, self.max_paths,
537
                                                 **self.primary_constraints)
538
539 1
    def get_failover_path_candidates(self):
540
        """Get failover paths to satisfy this EVC."""
541
        # in the future we can return primary/backup paths as well
542
        # we just have to properly handle link_up and failover paths
543
        # if (
544
        #     self.is_using_primary_path() and
545
        #     self.backup_path.status is EntityStatus.UP
546
        # ):
547
        #     yield self.backup_path
548 1
        return DynamicPathManager.get_disjoint_paths(self, self.current_path)
549
550 1
    def change_path(self):
551
        """Change EVC path."""
552
553 1
    def reprovision(self):
554
        """Force the EVC (re-)provisioning."""
555
556 1
    def is_affected_by_link(self, link):
557
        """Return True if this EVC has the given link on its current path."""
558 1
        return link in self.current_path
559
560 1
    def link_affected_by_interface(self, interface):
561
        """Return True if this EVC has the given link on its current path."""
562
        return self.current_path.link_affected_by_interface(interface)
563
564 1
    def is_backup_path_affected_by_link(self, link):
565
        """Return True if the backup path of this EVC uses the given link."""
566 1
        return link in self.backup_path
567
568
    # pylint: disable=invalid-name
569 1
    def is_primary_path_affected_by_link(self, link):
570
        """Return True if the primary path of this EVC uses the given link."""
571 1
        return link in self.primary_path
572
573 1
    def is_failover_path_affected_by_link(self, link):
574
        """Return True if this EVC has the given link on its failover path."""
575 1
        return link in self.failover_path
576
577 1
    def is_eligible_for_failover_path(self):
578
        """Verify if this EVC is eligible for failover path (EP029)"""
579
        # In the future this function can be augmented to consider
580
        # primary/backup, primary/dynamic, and other path combinations
581 1
        return (
582
            self.dynamic_backup_path and
583
            not self.primary_path and not self.backup_path
584
        )
585
586 1
    def is_using_primary_path(self):
587
        """Verify if the current deployed path is self.primary_path."""
588 1
        return self.primary_path and (self.current_path == self.primary_path)
589
590 1
    def is_using_backup_path(self):
591
        """Verify if the current deployed path is self.backup_path."""
592 1
        return self.backup_path and (self.current_path == self.backup_path)
593
594 1
    def is_using_dynamic_path(self):
595
        """Verify if the current deployed path is a dynamic path."""
596 1
        if (
597
            self.current_path
598
            and not self.is_using_primary_path()
599
            and not self.is_using_backup_path()
600
            and self.current_path.status == EntityStatus.UP
601
        ):
602
            return True
603 1
        return False
604
605 1
    def deploy_to_backup_path(self, old_path_dict: dict = None):
606
        """Deploy the backup path into the datapaths of this circuit.
607
608
        If the backup_path attribute is valid and up, this method will try to
609
        deploy this backup_path.
610
611
        If everything fails and dynamic_backup_path is True, then tries to
612
        deploy a dynamic path.
613
        """
614
        # TODO: Remove flows from current (cookies)
615 1
        if self.is_using_backup_path():
616
            # TODO: Log to say that cannot move backup to backup
617
            return True
618
619 1
        success = False
620 1
        if self.backup_path.status is EntityStatus.UP:
621 1
            success = self.deploy_to_path(self.backup_path, old_path_dict)
622
623 1
        if success:
624 1
            return True
625
626 1
        if self.dynamic_backup_path or self.is_intra_switch():
627 1
            return self.deploy_to_path(old_path_dict=old_path_dict)
628
629
        return False
630
631 1
    def deploy_to_primary_path(self, old_path_dict: dict = None):
632
        """Deploy the primary path into the datapaths of this circuit.
633
634
        If the primary_path attribute is valid and up, this method will try to
635
        deploy this primary_path.
636
        """
637
        # TODO: Remove flows from current (cookies)
638 1
        if self.is_using_primary_path():
639
            # TODO: Log to say that cannot move primary to primary
640
            return True
641
642 1
        if self.primary_path.status is EntityStatus.UP:
643 1
            return self.deploy_to_path(self.primary_path, old_path_dict)
644
        return False
645
646 1
    def deploy(self, old_path_dict: dict = None):
647
        """Deploy EVC to best path.
648
649
        Best path can be the primary path, if available. If not, the backup
650
        path, and, if it is also not available, a dynamic path.
651
        """
652 1
        if self.archived:
653 1
            return False
654 1
        self.enable()
655 1
        success = self.deploy_to_primary_path(old_path_dict)
656 1
        if not success:
657 1
            success = self.deploy_to_backup_path(old_path_dict)
658
659 1
        if success:
660 1
            emit_event(self._controller, "deployed",
661
                       content=map_evc_event_content(self))
662 1
        return success
663
664 1
    @staticmethod
665 1
    def get_path_status(path):
666
        """Check for the current status of a path.
667
668
        If any link in this path is down, the path is considered down.
669
        """
670 1
        if not path:
671 1
            return EntityStatus.DISABLED
672
673 1
        for link in path:
674 1
            if link.status is not EntityStatus.UP:
675 1
                return link.status
676 1
        return EntityStatus.UP
677
678
    #    def discover_new_path(self):
679
    #        # TODO: discover a new path to satisfy this circuit and deploy
680
681 1
    def remove(self):
682
        """Remove EVC path and disable it."""
683 1
        self.remove_current_flows(sync=False)
684 1
        self.remove_failover_flows(sync=False)
685 1
        self.disable()
686 1
        self.sync()
687 1
        emit_event(self._controller, "undeployed",
688
                   content=map_evc_event_content(self))
689
690 1
    def remove_failover_flows(self, exclude_uni_switches=True,
691
                              force=True, sync=True) -> None:
692
        """Remove failover_flows.
693
694
        By default, it'll exclude UNI switches, if mef_eline has already
695
        called remove_current_flows before then this minimizes the number
696
        of FlowMods and IO.
697
        """
698 1
        if not self.failover_path:
699 1
            return
700 1
        switches, cookie, excluded = set(), self.get_cookie(), set()
701 1
        if exclude_uni_switches:
702 1
            excluded.add(self.uni_a.interface.switch.id)
703 1
            excluded.add(self.uni_z.interface.switch.id)
704 1
        for link in self.failover_path:
705 1
            if link.endpoint_a.switch.id not in excluded:
706 1
                switches.add(link.endpoint_a.switch.id)
707 1
            if link.endpoint_b.switch.id not in excluded:
708 1
                switches.add(link.endpoint_b.switch.id)
709 1
        flow_mods = {
710
            "switches": list(switches),
711
            "flows": [{
712
                "cookie": cookie,
713
                "cookie_mask": int(0xffffffffffffffff),
714
                "owner": "mef_eline",
715
            }]
716
        }
717 1
        try:
718 1
            self._send_flow_mods(
719
                flow_mods,
720
                "delete",
721
                force=force,
722
            )
723
        except FlowModException as err:
724
            log.error(f"Error deleting {self} failover_path flows, {err}")
725 1
        try:
726 1
            self.failover_path.make_vlans_available(self._controller)
727
        except KytosTagError as err:
728
            log.error(f"Error removing {self} failover_path: {err}")
729 1
        self.failover_path = Path([])
730 1
        if sync:
731 1
            self.sync()
732
733 1
    def remove_current_flows(
734
        self,
735
        current_path=None,
736
        force=True,
737
        sync=True,
738
        return_path=False
739
    ) -> dict[str, int]:
740
        """Remove all flows from current path or path intended for
741
         current path if exists."""
742 1
        switches, old_path_dict = set(), {}
743 1
        current_path = self.current_path if not current_path else current_path
744 1
        if not current_path and not self.is_intra_switch():
745 1
            return {}
746
747 1
        if return_path:
748 1
            for link in self.current_path:
749 1
                s_vlan = link.metadata.get("s_vlan")
750 1
                if s_vlan:
751 1
                    old_path_dict[link.id] = s_vlan.value
752
753 1
        for link in current_path:
754 1
            switches.add(link.endpoint_a.switch.id)
755 1
            switches.add(link.endpoint_b.switch.id)
756 1
        switches.add(self.uni_a.interface.switch.id)
757 1
        switches.add(self.uni_z.interface.switch.id)
758 1
        flow_mods = {
759
            "switches": list(switches),
760
            "flows": [{
761
                "cookie": self.get_cookie(),
762
                "cookie_mask": int(0xffffffffffffffff),
763
                "owner": "mef_eline",
764
            }]
765
        }
766
767 1
        try:
768 1
            self._send_flow_mods(flow_mods, "delete", force=force)
769 1
        except FlowModException as err:
770 1
            log.error(f"Error deleting {self} current_path flows, {err}")
771
772 1
        try:
773 1
            current_path.make_vlans_available(self._controller)
774 1
        except KytosTagError as err:
775 1
            log.error(f"Error removing {self} current_path: {err}")
776 1
        self.current_path = Path([])
777 1
        self.deactivate()
778 1
        if sync:
779 1
            self.sync()
780 1
        return old_path_dict
781
782 1
    def remove_path_flows(
783
        self, path=None, force=True
784
    ) -> dict[str, list[dict]]:
785
        """Remove all flows from path, and return the removed flows."""
786 1
        dpid_flows_match: dict[str, dict] = defaultdict(lambda: {"flows": []})
787 1
        out_flows: dict[str, list[dict]] = defaultdict(list)
788
789 1
        if not path:
790 1
            return dpid_flows_match
791
792 1
        try:
793 1
            nni_flows = self._prepare_nni_flows(path)
794
        # pylint: disable=broad-except
795
        except Exception:
796
            err = traceback.format_exc()
797
            log.error(f"Fail to remove NNI failover flows for {self}: {err}")
798
            nni_flows = {}
799
800 1
        for dpid, flows in nni_flows.items():
801 1
            for flow in flows:
802 1
                flow_mod = {
803
                    "cookie": flow["cookie"],
804
                    "match": flow["match"],
805
                    "owner": "mef_eline",
806
                    "cookie_mask": int(0xffffffffffffffff)
807
                }
808 1
                dpid_flows_match[dpid]["flows"].append(flow_mod)
809 1
                out_flows[dpid].append(flow_mod)
810
811 1
        try:
812 1
            uni_flows = self._prepare_uni_flows(path, skip_in=True)
813
        # pylint: disable=broad-except
814
        except Exception:
815
            err = traceback.format_exc()
816
            log.error(f"Fail to remove UNI failover flows for {self}: {err}")
817
            uni_flows = {}
818
819 1
        for dpid, flows in uni_flows.items():
820 1
            for flow in flows:
821 1
                flow_mod = {
822
                    "cookie": flow["cookie"],
823
                    "match": flow["match"],
824
                    "owner": "mef_eline",
825
                    "cookie_mask": int(0xffffffffffffffff)
826
                }
827 1
                dpid_flows_match[dpid]["flows"].append(flow_mod)
828 1
                out_flows[dpid].append(flow_mod)
829
830 1
        try:
831 1
            self._send_flow_mods(
832
                dpid_flows_match, 'delete', force=force, by_switch=True
833
            )
834 1
        except FlowModException as err:
835 1
            log.error(
836
                f"Error deleting {self} path flows, path:{path}, error={err}"
837
            )
838
839 1
        try:
840 1
            path.make_vlans_available(self._controller)
841
        except KytosTagError as err:
842
            log.error(f"Error removing {self} path: {err}")
843
844 1
        return out_flows
845
846 1
    @staticmethod
847 1
    def links_zipped(path=None):
848
        """Return an iterator which yields pairs of links in order."""
849 1
        if not path:
850 1
            return []
851 1
        return zip(path[:-1], path[1:])
852
853 1
    def should_deploy(self, path=None):
854
        """Verify if the circuit should be deployed."""
855 1
        if not path:
856 1
            log.debug("Path is empty.")
857 1
            return False
858
859 1
        if not self.is_enabled():
860 1
            log.debug(f"{self} is disabled.")
861 1
            return False
862
863 1
        if not self.is_active():
864 1
            log.debug(f"{self} will be deployed.")
865 1
            return True
866
867 1
        return False
868
869 1
    @staticmethod
870 1
    def is_uni_interface_active(
871
        *interfaces: Interface
872
    ) -> tuple[bool, dict]:
873
        """Whether UNIs are active and their status & status_reason."""
874 1
        active = True
875 1
        bad_interfaces = [
876
            interface
877
            for interface in interfaces
878
            if interface.status != EntityStatus.UP
879
        ]
880 1
        if bad_interfaces:
881 1
            active = False
882 1
            interfaces = bad_interfaces
883 1
        return active, {
884
            interface.id: {
885
                'status': interface.status.value,
886
                'status_reason': interface.status_reason,
887
            }
888
            for interface in interfaces
889
        }
890
891 1
    def try_to_activate(self) -> bool:
892
        """Try to activate the EVC."""
893 1
        if self.is_intra_switch():
894 1
            return self._try_to_activate_intra_evc()
895 1
        return self._try_to_activate_inter_evc()
896
897 1
    def _try_to_activate_intra_evc(self) -> bool:
898
        """Try to activate intra EVC."""
899 1
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
900 1
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
901 1
        if not is_active:
902 1
            raise ActivationError(
903
                f"Won't be able to activate {self} due to UNIs: {reason}"
904
            )
905 1
        self.activate()
906 1
        return True
907
908 1
    def _try_to_activate_inter_evc(self) -> bool:
909
        """Try to activate inter EVC."""
910 1
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
911 1
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
912 1
        if not is_active:
913 1
            raise ActivationError(
914
                f"Won't be able to activate {self} due to UNIs: {reason}"
915
            )
916 1
        if self.current_path.status != EntityStatus.UP:
917 1
            raise ActivationError(
918
                f"Won't be able to activate {self} due to current_path "
919
                f"status {self.current_path.status}"
920
            )
921 1
        self.activate()
922 1
        return True
923
924
    # pylint: disable=too-many-branches, too-many-statements
925 1
    def deploy_to_path(self, path=None, old_path_dict: dict = None):
926
        """Install the flows for this circuit.
927
928
        Procedures to deploy:
929
930
        0. Remove current flows installed
931
        1. Decide if will deploy "path" or discover a new path
932
        2. Choose vlan
933
        3. Install NNI flows
934
        4. Install UNI flows
935
        5. Activate
936
        6. Update current_path
937
        7. Update links caches(primary, current, backup)
938
939
        """
940 1
        self.remove_current_flows(sync=False)
941 1
        use_path = path or Path([])
942 1
        if not old_path_dict:
943 1
            old_path_dict = {}
944 1
        tag_errors = []
945 1
        no_valid_path = False
946 1
        if self.should_deploy(use_path):
947 1
            try:
948 1
                use_path.choose_vlans(self._controller, old_path_dict)
949 1
            except KytosNoTagAvailableError as e:
950 1
                tag_errors.append(str(e))
951 1
                use_path = None
952
        else:
953 1
            for use_path in self.discover_new_paths():
954 1
                if use_path is None:
955 1
                    no_valid_path = True
956 1
                    continue
957 1
                try:
958 1
                    use_path.choose_vlans(self._controller, old_path_dict)
959 1
                    break
960 1
                except KytosNoTagAvailableError as e:
961 1
                    tag_errors.append(str(e))
962
            else:
963 1
                use_path = None
964
965 1
        try:
966 1
            if use_path:
967 1
                self._install_flows(use_path)
968 1
            elif self.is_intra_switch():
969 1
                use_path = Path()
970 1
                self._install_direct_uni_flows()
971
            else:
972 1
                no_path_msg = "No available path was found."
973 1
                if no_valid_path:
974 1
                    no_path_msg = "No valid path was found, "\
975
                                  "try increasing `max_paths`"\
976
                                 f" from {self.max_paths}."
977 1
                msg = f"{self} was not deployed. {no_path_msg}"
978 1
                if tag_errors:
979 1
                    msg = self.add_tag_errors(msg, tag_errors)
980 1
                    log.error(msg)
981
                else:
982 1
                    log.warning(msg)
983 1
                return False
984 1
        except EVCPathNotInstalled as err:
985 1
            log.error(
986
                f"Error deploying EVC {self} when calling flow_manager: {err}"
987
            )
988 1
            self.remove_current_flows(use_path, sync=True)
989 1
            return False
990
991 1
        self.current_path = use_path
992 1
        msg = f"{self} was deployed."
993 1
        try:
994 1
            self.try_to_activate()
995
        except ActivationError as exc:
996
            msg = f"{msg} {str(exc)}"
997 1
        self.sync()
998 1
        log.info(msg)
999 1
        return True
1000
1001
    # pylint: disable=too-many-statements
1002 1
    def setup_failover_path(self, warn_if_not_path=True):
1003
        """Install flows for the failover path of this EVC.
1004
1005
        Procedures to deploy:
1006
1007
        0. Remove flows currently installed for failover_path (if any)
1008
        1. Discover a disjoint path from current_path
1009
        2. Choose vlans
1010
        3. Install NNI flows
1011
        4. Install UNI egress flows
1012
        5. Update failover_path
1013
        """
1014
        # Intra-switch EVCs have no failover_path
1015 1
        if self.is_intra_switch():
1016 1
            return False
1017
1018
        # For not only setup failover path for totally dynamic EVCs
1019 1
        if not self.is_eligible_for_failover_path():
1020 1
            return False
1021
1022 1
        out_new_flows: dict[str, list[dict]] = {}
1023 1
        reason = ""
1024 1
        tag_errors = []
1025 1
        out_removed_flows = self.remove_path_flows(self.failover_path)
1026 1
        self.failover_path = Path([])
1027
1028 1
        for use_path in self.get_failover_path_candidates():
1029 1
            if not use_path:
1030 1
                continue
1031 1
            try:
1032 1
                use_path.choose_vlans(self._controller)
1033 1
                break
1034 1
            except KytosNoTagAvailableError as e:
1035 1
                tag_errors.append(str(e))
1036
        else:
1037 1
            use_path = Path([])
1038 1
            reason = "No available path was found"
1039
1040 1
        try:
1041 1
            if use_path:
1042 1
                out_new_flows = self._install_flows(
1043
                    use_path, skip_in=True
1044
                )
1045 1
        except EVCPathNotInstalled as err:
1046 1
            reason = "Error deploying failover path"
1047 1
            log.error(
1048
                f"{reason} for {self}. FlowManager error: {err}"
1049
            )
1050 1
            _rmed_flows = self.remove_path_flows(use_path)
1051 1
            out_removed_flows = merge_flow_dicts(
1052
                out_removed_flows, _rmed_flows
1053
            )
1054 1
            use_path = Path([])
1055
1056 1
        self.failover_path = use_path
1057 1
        self.sync()
1058
1059 1
        if out_new_flows or out_removed_flows:
1060 1
            emit_event(self._controller, "failover_deployed", content={
1061
                self.id: map_evc_event_content(
1062
                    self,
1063
                    flows=deepcopy(out_new_flows),
1064
                    removed_flows=deepcopy(out_removed_flows),
1065
                    error_reason=reason,
1066
                    current_path=self.current_path.as_dict(),
1067
                )
1068
            })
1069
1070 1
        if not use_path:
1071 1
            msg = f"Failover path for {self} was not deployed: {reason}."
1072 1
            if tag_errors:
1073 1
                msg = self.add_tag_errors(msg, tag_errors)
1074 1
                log.error(msg)
1075 1
            elif warn_if_not_path:
1076 1
                log.warning(msg)
1077 1
            return False
1078 1
        log.info(f"Failover path for {self} was deployed.")
1079 1
        return True
1080
1081 1
    @staticmethod
1082 1
    def add_tag_errors(msg: str, tag_errors: list):
1083
        """Add to msg the tag errors ecountered when chossing path."""
1084 1
        path = ['path', 'paths']
1085 1
        was = ['was', 'were']
1086 1
        message = ['message', 'messages']
1087
1088
        # Choose either singular(0) or plural(1) words
1089 1
        n = 1
1090 1
        if len(tag_errors) == 1:
1091 1
            n = 0
1092
1093 1
        msg += f" {len(tag_errors)} {path[n]} {was[n]} rejected"
1094 1
        msg += f" with {message[n]}: {tag_errors}"
1095 1
        return msg
1096
1097 1
    def get_failover_flows(self):
1098
        """Return the flows needed to make the failover path active, i.e. the
1099
        flows for ingress forwarding.
1100
1101
        Return:
1102
            dict: A dict of flows indexed by the switch_id will be returned, or
1103
                an empty dict if no failover_path is available.
1104
        """
1105 1
        if not self.failover_path:
1106 1
            return {}
1107 1
        return self._prepare_uni_flows(self.failover_path, skip_out=True)
1108
1109
    # pylint: disable=too-many-branches
1110 1
    def _prepare_direct_uni_flows(self):
1111
        """Prepare flows connecting two UNIs for intra-switch EVC."""
1112 1
        vlan_a = self._get_value_from_uni_tag(self.uni_a)
1113 1
        vlan_z = self._get_value_from_uni_tag(self.uni_z)
1114
1115 1
        flow_mod_az = self._prepare_flow_mod(
1116
            self.uni_a.interface, self.uni_z.interface,
1117
            self.queue_id, vlan_a
1118
        )
1119 1
        flow_mod_za = self._prepare_flow_mod(
1120
            self.uni_z.interface, self.uni_a.interface,
1121
            self.queue_id, vlan_z
1122
        )
1123
1124 1 View Code Duplication
        if not isinstance(vlan_z, list) and vlan_z not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1125 1
            flow_mod_az["actions"].insert(
1126
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
1127
            )
1128 1
            if not vlan_a:
1129 1
                flow_mod_az["actions"].insert(
1130
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1131
                )
1132 1
            if vlan_a == 0:
1133 1
                flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1134 1
        elif vlan_a == 0 and vlan_z == "4096/4096":
1135 1
            flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1136
1137 1 View Code Duplication
        if not isinstance(vlan_a, list) and vlan_a not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1138 1
            flow_mod_za["actions"].insert(
1139
                    0, {"action_type": "set_vlan", "vlan_id": vlan_a}
1140
                )
1141 1
            if not vlan_z:
1142 1
                flow_mod_za["actions"].insert(
1143
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1144
                )
1145 1
            if vlan_z == 0:
1146 1
                flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1147 1
        elif vlan_a == "4096/4096" and vlan_z == 0:
1148 1
            flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1149
1150 1
        flows = []
1151 1
        if isinstance(vlan_a, list):
1152 1
            for mask_a in vlan_a:
1153 1
                flow_aux = deepcopy(flow_mod_az)
1154 1
                flow_aux["match"]["dl_vlan"] = mask_a
1155 1
                flows.append(flow_aux)
1156
        else:
1157 1
            if vlan_a is not None:
1158 1
                flow_mod_az["match"]["dl_vlan"] = vlan_a
1159 1
            flows.append(flow_mod_az)
1160
1161 1
        if isinstance(vlan_z, list):
1162 1
            for mask_z in vlan_z:
1163 1
                flow_aux = deepcopy(flow_mod_za)
1164 1
                flow_aux["match"]["dl_vlan"] = mask_z
1165 1
                flows.append(flow_aux)
1166
        else:
1167 1
            if vlan_z is not None:
1168 1
                flow_mod_za["match"]["dl_vlan"] = vlan_z
1169 1
            flows.append(flow_mod_za)
1170 1
        return (
1171
            self.uni_a.interface.switch.id, flows
1172
        )
1173
1174 1
    def _install_direct_uni_flows(self):
1175
        """Install flows connecting two UNIs.
1176
1177
        This case happens when the circuit is between UNIs in the
1178
        same switch.
1179
        """
1180 1
        (dpid, flows) = self._prepare_direct_uni_flows()
1181 1
        flow_mods = {"switches": [dpid], "flows": flows}
1182 1
        try:
1183 1
            self._send_flow_mods(flow_mods, "install")
1184 1
        except FlowModException as err:
1185 1
            raise EVCPathNotInstalled(str(err)) from err
1186
1187 1
    def _prepare_nni_flows(self, path=None):
1188
        """Prepare NNI flows."""
1189 1
        nni_flows = OrderedDict()
1190 1
        previous = self.uni_a.interface.switch.dpid
1191 1
        for incoming, outcoming in self.links_zipped(path):
1192 1
            in_vlan = incoming.get_metadata("s_vlan").value
1193 1
            out_vlan = outcoming.get_metadata("s_vlan").value
1194 1
            in_endpoint = self.get_endpoint_by_id(incoming, previous, ne)
1195 1
            out_endpoint = self.get_endpoint_by_id(
1196
                outcoming, in_endpoint.switch.id, eq
1197
            )
1198
1199 1
            flows = []
1200
            # Flow for one direction
1201 1
            flows.append(
1202
                self._prepare_nni_flow(
1203
                    in_endpoint,
1204
                    out_endpoint,
1205
                    in_vlan,
1206
                    out_vlan,
1207
                    queue_id=self.queue_id,
1208
                )
1209
            )
1210
1211
            # Flow for the other direction
1212 1
            flows.append(
1213
                self._prepare_nni_flow(
1214
                    out_endpoint,
1215
                    in_endpoint,
1216
                    out_vlan,
1217
                    in_vlan,
1218
                    queue_id=self.queue_id,
1219
                )
1220
            )
1221 1
            previous = in_endpoint.switch.id
1222 1
            nni_flows[in_endpoint.switch.id] = flows
1223 1
        return nni_flows
1224
1225 1
    def _install_flows(
1226
        self, path=None, skip_in=False, skip_out=False
1227
    ) -> dict[str, list[dict]]:
1228
        """Install uni and nni flows"""
1229 1
        flows_by_switch = defaultdict(lambda: {"flows": []})
1230 1
        new_flows = defaultdict(list)
1231 1
        for dpid, flows in self._prepare_nni_flows(path).items():
1232 1
            flows_by_switch[dpid]["flows"].extend(flows)
1233 1
            new_flows[dpid].extend(flows)
1234 1
        for dpid, flows in self._prepare_uni_flows(
1235
            path, skip_in, skip_out
1236
        ).items():
1237 1
            flows_by_switch[dpid]["flows"].extend(flows)
1238 1
            new_flows[dpid].extend(flows)
1239
1240 1
        try:
1241 1
            self._send_flow_mods(flows_by_switch, "install", by_switch=True)
1242 1
        except FlowModException as err:
1243 1
            raise EVCPathNotInstalled(str(err)) from err
1244
1245 1
        return new_flows
1246
1247 1
    @staticmethod
1248 1
    def _get_value_from_uni_tag(uni: UNI):
1249
        """Returns the value from tag. In case of any and untagged
1250
        it should return 4096/4096 and 0 respectively"""
1251 1
        special = {"any": "4096/4096", "untagged": 0}
1252 1
        if uni.user_tag:
1253 1
            value = uni.user_tag.value
1254 1
            if isinstance(value, list):
1255 1
                return uni.user_tag.mask_list
1256 1
            return special.get(value, value)
1257 1
        return None
1258
1259
    # pylint: disable=too-many-locals
1260 1
    def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
1261
        """Prepare flows to install UNIs."""
1262 1
        uni_flows = {}
1263 1
        if not path:
1264
            return uni_flows
1265
1266
        # Determine VLANs
1267 1
        in_vlan_a = self._get_value_from_uni_tag(self.uni_a)
1268 1
        out_vlan_a = path[0].get_metadata("s_vlan").value
1269
1270 1
        in_vlan_z = self._get_value_from_uni_tag(self.uni_z)
1271 1
        out_vlan_z = path[-1].get_metadata("s_vlan").value
1272
1273
        # Get endpoints from path
1274 1
        endpoint_a = self.get_endpoint_by_id(
1275
            path[0], self.uni_a.interface.switch.id, eq
1276
        )
1277 1
        endpoint_z = self.get_endpoint_by_id(
1278
            path[-1], self.uni_z.interface.switch.id, eq
1279
        )
1280
1281
        # Flows for the first UNI
1282 1
        flows_a = []
1283
1284
        # Flow for one direction, pushing the service tag
1285 1
        if not skip_in:
1286 1
            if isinstance(in_vlan_a, list):
1287 1
                for in_mask_a in in_vlan_a:
1288 1
                    push_flow = self._prepare_push_flow(
1289
                        self.uni_a.interface,
1290
                        endpoint_a,
1291
                        in_mask_a,
1292
                        out_vlan_a,
1293
                        in_vlan_z,
1294
                        queue_id=self.queue_id,
1295
                    )
1296 1
                    flows_a.append(push_flow)
1297
            else:
1298
                push_flow = self._prepare_push_flow(
1299
                    self.uni_a.interface,
1300
                    endpoint_a,
1301
                    in_vlan_a,
1302
                    out_vlan_a,
1303
                    in_vlan_z,
1304
                    queue_id=self.queue_id,
1305
                )
1306
                flows_a.append(push_flow)
1307
1308
        # Flow for the other direction, popping the service tag
1309 1
        if not skip_out:
1310 1
            pop_flow = self._prepare_pop_flow(
1311
                endpoint_a,
1312
                self.uni_a.interface,
1313
                out_vlan_a,
1314
                in_vlan_a,
1315
                in_vlan_z,
1316
                queue_id=self.queue_id,
1317
            )
1318 1
            flows_a.append(pop_flow)
1319
1320 1
        uni_flows[self.uni_a.interface.switch.id] = flows_a
1321
1322
        # Flows for the second UNI
1323 1
        flows_z = []
1324
1325
        # Flow for one direction, pushing the service tag
1326 1
        if not skip_in:
1327 1
            if isinstance(in_vlan_z, list):
1328 1
                for in_mask_z in in_vlan_z:
1329 1
                    push_flow = self._prepare_push_flow(
1330
                        self.uni_z.interface,
1331
                        endpoint_z,
1332
                        in_mask_z,
1333
                        out_vlan_z,
1334
                        in_vlan_a,
1335
                        queue_id=self.queue_id,
1336
                    )
1337 1
                    flows_z.append(push_flow)
1338
            else:
1339
                push_flow = self._prepare_push_flow(
1340
                    self.uni_z.interface,
1341
                    endpoint_z,
1342
                    in_vlan_z,
1343
                    out_vlan_z,
1344
                    in_vlan_a,
1345
                    queue_id=self.queue_id,
1346
                )
1347
                flows_z.append(push_flow)
1348
1349
        # Flow for the other direction, popping the service tag
1350 1
        if not skip_out:
1351 1
            pop_flow = self._prepare_pop_flow(
1352
                endpoint_z,
1353
                self.uni_z.interface,
1354
                out_vlan_z,
1355
                in_vlan_z,
1356
                in_vlan_a,
1357
                queue_id=self.queue_id,
1358
            )
1359 1
            flows_z.append(pop_flow)
1360
1361 1
        uni_flows[self.uni_z.interface.switch.id] = flows_z
1362
1363 1
        return uni_flows
1364
1365 1
    @staticmethod
1366 1
    @retry(
1367
        stop=stop_after_attempt(3),
1368
        wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
1369
        retry=retry_if_exception_type(FlowModException),
1370
        before_sleep=before_sleep,
1371
        reraise=True,
1372
    )
1373 1
    def _send_flow_mods(
1374
        data_content: dict,
1375
        command="install",
1376
        force=False,
1377
        by_switch=False
1378
    ):
1379
        """Send a flow_mod list to a specific switch.
1380
1381
        Args:
1382
            dpid(str): The target of flows (i.e. Switch.id).
1383
            flow_mods(dict): Python dictionary with flow_mods.
1384
            command(str): By default is 'flows'. To remove a flow is 'remove'.
1385
            force(bool): True to send via consistency check in case of errors.
1386
            by_switch(bool): True to send to 'flows_by_switch' request instead.
1387
        """
1388 1
        if by_switch:
1389 1
            endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}"
1390
        else:
1391 1
            endpoint = f"{settings.MANAGER_URL}/flows"
1392 1
            data_content["force"] = force
1393 1
        try:
1394 1
            if command == "install":
1395 1
                res = httpx.post(endpoint, json=data_content, timeout=30)
1396 1
            elif command == "delete":
1397 1
                res = httpx.request(
1398
                    "DELETE", endpoint, json=data_content, timeout=30
1399
                )
1400 1
        except httpx.RequestError as err:
1401 1
            raise FlowModException(str(err)) from err
1402 1
        if res.is_server_error or res.status_code >= 400:
1403 1
            raise FlowModException(res.text)
1404
1405 1
    def get_cookie(self):
1406
        """Return the cookie integer from evc id."""
1407 1
        return int(self.id, 16) + (settings.COOKIE_PREFIX << 56)
1408
1409 1
    @staticmethod
1410 1
    def get_id_from_cookie(cookie):
1411
        """Return the evc id given a cookie value."""
1412 1
        evc_id = cookie - (settings.COOKIE_PREFIX << 56)
1413 1
        return f"{evc_id:x}".zfill(14)
1414
1415 1
    def set_flow_table_group_id(self, flow_mod: dict, vlan) -> dict:
1416
        """Set table_group and table_id"""
1417 1
        table_group = "epl" if vlan is None else "evpl"
1418 1
        flow_mod["table_group"] = table_group
1419 1
        flow_mod["table_id"] = self.table_group[table_group]
1420 1
        return flow_mod
1421
1422 1
    @staticmethod
1423 1
    def get_priority(vlan):
1424
        """Return priority value depending on vlan value"""
1425 1
        if isinstance(vlan, list):
1426 1
            return settings.EVPL_SB_PRIORITY
1427 1
        if vlan not in {None, "4096/4096", 0}:
1428 1
            return settings.EVPL_SB_PRIORITY
1429 1
        if vlan == 0:
1430 1
            return settings.UNTAGGED_SB_PRIORITY
1431 1
        if vlan == "4096/4096":
1432 1
            return settings.ANY_SB_PRIORITY
1433 1
        return settings.EPL_SB_PRIORITY
1434
1435 1
    def _prepare_flow_mod(self, in_interface, out_interface,
1436
                          queue_id=None, vlan=True):
1437
        """Prepare a common flow mod."""
1438 1
        default_actions = [
1439
            {"action_type": "output", "port": out_interface.port_number}
1440
        ]
1441 1
        queue_id = settings.QUEUE_ID if queue_id == -1 else queue_id
1442 1
        if queue_id is not None:
1443 1
            default_actions.insert(
1444
                0,
1445
                {"action_type": "set_queue", "queue_id": queue_id}
1446
            )
1447
1448 1
        flow_mod = {
1449
            "match": {"in_port": in_interface.port_number},
1450
            "cookie": self.get_cookie(),
1451
            "actions": default_actions,
1452
            "owner": "mef_eline",
1453
        }
1454
1455 1
        self.set_flow_table_group_id(flow_mod, vlan)
1456 1
        if self.sb_priority:
1457 1
            flow_mod["priority"] = self.sb_priority
1458
        else:
1459 1
            flow_mod["priority"] = self.get_priority(vlan)
1460 1
        return flow_mod
1461
1462 1
    def _prepare_nni_flow(self, *args, queue_id=None):
1463
        """Create NNI flows."""
1464 1
        in_interface, out_interface, in_vlan, out_vlan = args
1465 1
        flow_mod = self._prepare_flow_mod(
1466
            in_interface, out_interface, queue_id
1467
        )
1468 1
        flow_mod["match"]["dl_vlan"] = in_vlan
1469 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1470 1
        flow_mod["actions"].insert(0, new_action)
1471
1472 1
        return flow_mod
1473
1474 1
    def _prepare_push_flow(self, *args, queue_id=None):
1475
        """Prepare push flow.
1476
1477
        Arguments:
1478
            in_interface(Interface): Interface input.
1479
            out_interface(Interface): Interface output.
1480
            in_vlan(int,str,None): Vlan input.
1481
            out_vlan(int): Vlan output.
1482
            new_c_vlan(int,str,list,None): New client vlan.
1483
1484
        Return:
1485
            dict: An python dictionary representing a FlowMod
1486
1487
        """
1488
        # assign all arguments
1489 1
        in_interface, out_interface, in_vlan, out_vlan, new_c_vlan = args
1490 1
        vlan_pri = in_vlan if not isinstance(new_c_vlan, list) else new_c_vlan
1491 1
        flow_mod = self._prepare_flow_mod(
1492
            in_interface, out_interface, queue_id, vlan_pri
1493
        )
1494
        # the service tag must be always pushed
1495 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1496 1
        flow_mod["actions"].insert(0, new_action)
1497
1498 1
        if (
1499
            not (in_vlan != new_c_vlan and isinstance(in_vlan, int) and
1500
                 isinstance(new_c_vlan, int))
1501
        ):
1502
            # Add service VLAN header when it does NOT fall into this
1503
            # statement: Both VLANs should be integer and different.
1504 1
            new_action = {"action_type": "push_vlan", "tag_type": "s"}
1505 1
            flow_mod["actions"].insert(0, new_action)
1506
1507 1
        if in_vlan is not None:
1508
            # if in_vlan is set, it must be included in the match
1509 1
            flow_mod["match"]["dl_vlan"] = in_vlan
1510
1511 1
        if (
1512
            not isinstance(in_vlan, int) and isinstance(new_c_vlan, int) and
1513
            new_c_vlan != 0
1514
        ):
1515
            # new_in_vlan is an integer but zero, action to set is required
1516 1
            new_action = {"action_type": "set_vlan", "vlan_id": new_c_vlan}
1517 1
            flow_mod["actions"].insert(0, new_action)
1518
1519 1
        if in_vlan == "4096/4096" and new_c_vlan == 0:
1520
            # if in_vlan match with any tags and new_c_vlan does not,
1521
            # a pop action is required
1522 1
            new_action = {"action_type": "pop_vlan"}
1523 1
            flow_mod["actions"].insert(0, new_action)
1524
1525 1
        elif (not in_vlan and
1526
                (not isinstance(new_c_vlan, list) and
1527
                 new_c_vlan not in self.special_cases)):
1528
            # new_in_vlan is an integer but zero and in_vlan is a no-tag or
1529
            # untagged
1530 1
            new_action = {"action_type": "push_vlan", "tag_type": "c"}
1531 1
            flow_mod["actions"].insert(0, new_action)
1532
1533 1
        return flow_mod
1534
1535 1
    def _prepare_pop_flow(
1536
        self,
1537
        in_interface: Interface,
1538
        out_interface: Interface,
1539
        out_vlan: int,
1540
        in_vlan: Union[int, str, list, None],
1541
        new_c_vlan: Union[int, str, list, None],
1542
        queue_id=None,
1543
    ):
1544
        # pylint: disable=too-many-arguments
1545
        """Prepare pop flow."""
1546 1
        flow_mod = self._prepare_flow_mod(
1547
            in_interface, out_interface, queue_id
1548
        )
1549 1
        flow_mod["match"]["dl_vlan"] = out_vlan
1550 1
        if in_vlan == 0:
1551 1
            new_action = {"action_type": "pop_vlan"}
1552 1
            flow_mod["actions"].insert(0, new_action)
1553 1
        elif (
1554
            in_vlan != new_c_vlan and isinstance(in_vlan, int) and
1555
            isinstance(new_c_vlan, int)
1556
        ):
1557
            # If UNI VLANs are different and in_vlan is not 0
1558 1
            new_action = {"action_type": "set_vlan", "vlan_id": in_vlan}
1559 1
            flow_mod["actions"].insert(0, new_action)
1560
        else:
1561 1
            new_action = {"action_type": "pop_vlan"}
1562 1
            flow_mod["actions"].insert(0, new_action)
1563 1
        return flow_mod
1564
1565 1
    @staticmethod
1566 1
    def run_bulk_sdntraces(
1567
        uni_list: list[tuple[Interface, Union[str, int, None]]]
1568
    ) -> dict:
1569
        """Run SDN traces on control plane starting from EVC UNIs."""
1570 1
        endpoint = f"{settings.SDN_TRACE_CP_URL}/traces"
1571 1
        data = []
1572 1
        for interface, tag_value in uni_list:
1573 1
            data_uni = {
1574
                "trace": {
1575
                            "switch": {
1576
                                "dpid": interface.switch.dpid,
1577
                                "in_port": interface.port_number,
1578
                            }
1579
                        }
1580
                }
1581 1
            if tag_value:
1582 1
                uni_dl_vlan = map_dl_vlan(tag_value)
1583 1
                if uni_dl_vlan:
1584 1
                    data_uni["trace"]["eth"] = {
1585
                                            "dl_type": 0x8100,
1586
                                            "dl_vlan": uni_dl_vlan,
1587
                                            }
1588 1
            data.append(data_uni)
1589 1
        try:
1590 1
            response = httpx.put(endpoint, json=data, timeout=30)
1591 1
        except httpx.TimeoutException as exception:
1592 1
            log.error(f"Request has timed out: {exception}")
1593 1
            return {"result": []}
1594 1
        if response.status_code >= 400:
1595 1
            log.error(f"Failed to run sdntrace-cp: {response.text}")
1596 1
            return {"result": []}
1597 1
        return response.json()
1598
1599
    # pylint: disable=too-many-return-statements, too-many-arguments
1600 1
    @staticmethod
1601 1
    def check_trace(
1602
        evc_id: str,
1603
        evc_name: str,
1604
        tag_a: Union[None, int, str],
1605
        tag_z: Union[None, int, str],
1606
        interface_a: Interface,
1607
        interface_z: Interface,
1608
        current_path: list,
1609
        trace_a: list,
1610
        trace_z: list
1611
    ) -> bool:
1612
        """Auxiliar function to check an individual trace"""
1613 1
        if (
1614
            len(trace_a) != len(current_path) + 1
1615
            or not compare_uni_out_trace(tag_z, interface_z, trace_a[-1])
1616
        ):
1617 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1618
                        f"Invalid trace from uni_a: {trace_a}")
1619 1
            return False
1620 1
        if (
1621
            len(trace_z) != len(current_path) + 1
1622
            or not compare_uni_out_trace(tag_a, interface_a, trace_z[-1])
1623
        ):
1624 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1625
                        f"Invalid trace from uni_z: {trace_z}")
1626 1
            return False
1627
1628 1
        if not current_path:
1629
            return True
1630
1631 1
        first_link, trace_path_begin, trace_path_end = current_path[0], [], []
1632 1
        if (
1633
            first_link.endpoint_a.switch.id == trace_a[0]["dpid"]
1634
        ):
1635 1
            trace_path_begin, trace_path_end = trace_a, trace_z
1636 1
        elif (
1637
            first_link.endpoint_a.switch.id == trace_z[0]["dpid"]
1638
        ):
1639 1
            trace_path_begin, trace_path_end = trace_z, trace_a
1640
        else:
1641
            msg = (
1642
                f"first link {first_link} endpoint_a didn't match the first "
1643
                f"step of trace_a {trace_a} or trace_z {trace_z}"
1644
            )
1645
            log.warning(msg)
1646
            return False
1647
1648 1
        for link, trace1, trace2 in zip(current_path,
1649
                                        trace_path_begin[1:],
1650
                                        trace_path_end[:0:-1]):
1651 1
            metadata_vlan = None
1652 1
            if link.metadata:
1653 1
                metadata_vlan = glom(link.metadata, 's_vlan.value')
1654 1
            if compare_endpoint_trace(
1655
                                        link.endpoint_a,
1656
                                        metadata_vlan,
1657
                                        trace2
1658
                                    ) is False:
1659 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1660
                            f"Invalid trace from uni_a: {trace_a}")
1661 1
                return False
1662 1
            if compare_endpoint_trace(
1663
                                        link.endpoint_b,
1664
                                        metadata_vlan,
1665
                                        trace1
1666
                                    ) is False:
1667 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1668
                            f"Invalid trace from uni_z: {trace_z}")
1669 1
                return False
1670
1671 1
        return True
1672
1673 1
    @staticmethod
1674 1
    def check_range(circuit, traces: list) -> bool:
1675
        """Check traces when for UNI with TAGRange"""
1676 1
        check = True
1677 1
        for i, mask in enumerate(circuit.uni_a.user_tag.mask_list):
1678 1
            trace_a = traces[i*2]
1679 1
            trace_z = traces[i*2+1]
1680 1
            check &= EVCDeploy.check_trace(
1681
                circuit.id, circuit.name,
1682
                mask, mask,
1683
                circuit.uni_a.interface,
1684
                circuit.uni_z.interface,
1685
                circuit.current_path,
1686
                trace_a, trace_z,
1687
            )
1688 1
        return check
1689
1690 1
    @staticmethod
1691 1
    def check_list_traces(list_circuits: list) -> dict:
1692
        """Check if current_path is deployed comparing with SDN traces."""
1693 1
        if not list_circuits:
1694 1
            return {}
1695 1
        uni_list = make_uni_list(list_circuits)
1696 1
        traces = EVCDeploy.run_bulk_sdntraces(uni_list)["result"]
1697
1698 1
        if not traces:
1699 1
            return {}
1700
1701 1
        try:
1702 1
            circuits_checked = {}
1703 1
            i = 0
1704 1
            for circuit in list_circuits:
1705 1
                if isinstance(circuit.uni_a.user_tag, TAGRange):
1706 1
                    length = len(circuit.uni_a.user_tag.mask_list)
1707 1
                    circuits_checked[circuit.id] = EVCDeploy.check_range(
1708
                        circuit, traces[i:i+length*2]
1709
                    )
1710 1
                    i += length*2
1711
                else:
1712 1
                    trace_a = traces[i]
1713 1
                    trace_z = traces[i+1]
1714 1
                    tag_a = None
1715 1
                    if circuit.uni_a.user_tag:
1716 1
                        tag_a = circuit.uni_a.user_tag.value
1717 1
                    tag_z = None
1718 1
                    if circuit.uni_z.user_tag:
1719 1
                        tag_z = circuit.uni_z.user_tag.value
1720 1
                    circuits_checked[circuit.id] = EVCDeploy.check_trace(
1721
                        circuit.id, circuit.name,
1722
                        tag_a, tag_z,
1723
                        circuit.uni_a.interface,
1724
                        circuit.uni_z.interface,
1725
                        circuit.current_path,
1726
                        trace_a, trace_z
1727
                    )
1728 1
                    i += 2
1729 1
        except IndexError as err:
1730 1
            log.error(
1731
                f"Bulk sdntraces returned fewer items than expected."
1732
                f"Error = {err}"
1733
            )
1734 1
            return {}
1735
1736 1
        return circuits_checked
1737
1738 1
    @staticmethod
1739 1
    def get_endpoint_by_id(
1740
        link: Link,
1741
        id_: str,
1742
        operator: Union[eq, ne]
1743
    ) -> Interface:
1744
        """Return endpoint from link
1745
        either equal(eq) or not equal(ne) to id"""
1746 1
        if operator(link.endpoint_a.switch.id, id_):
1747 1
            return link.endpoint_a
1748 1
        return link.endpoint_b
1749
1750
1751 1
class LinkProtection(EVCDeploy):
1752
    """Class to handle link protection."""
1753
1754 1
    def is_affected_by_link(self, link=None):
1755
        """Verify if the current path is affected by link down event."""
1756
        return self.current_path.is_affected_by_link(link)
1757
1758 1
    def is_using_primary_path(self):
1759
        """Verify if the current deployed path is self.primary_path."""
1760 1
        return self.current_path == self.primary_path
1761
1762 1
    def is_using_backup_path(self):
1763
        """Verify if the current deployed path is self.backup_path."""
1764 1
        return self.current_path == self.backup_path
1765
1766 1
    def is_using_dynamic_path(self):
1767
        """Verify if the current deployed path is dynamic."""
1768 1
        if (
1769
            self.current_path
1770
            and not self.is_using_primary_path()
1771
            and not self.is_using_backup_path()
1772
            and self.current_path.status is EntityStatus.UP
1773
        ):
1774
            return True
1775 1
        return False
1776
1777 1
    def handle_link_up(self, link=None, interface=None):
1778
        """Handle circuit when link up.
1779
1780
        Args:
1781
            link(Link): Link affected by link.up event.
1782
1783
        """
1784 1
        condition_pairs = [
1785
            (
1786
                lambda me: me.is_using_primary_path(),
1787
                lambda _: (True, 'nothing')
1788
            ),
1789
            (
1790
                lambda me: me.is_intra_switch(),
1791
                lambda _: (True, 'nothing')
1792
            ),
1793
            (
1794
                lambda me: me.primary_path.is_affected_by_link(link),
1795
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1796
            ),
1797
            # For this special case, it reached this point because interface
1798
            # was previously confirmed to be a UNI and both UNI are UP
1799
            (
1800
                lambda me: (me.primary_path.status == EntityStatus.UP
1801
                            and interface),
1802
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1803
            ),
1804
            (
1805
                lambda me: (me.backup_path.status == EntityStatus.UP
1806
                            and interface),
1807
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1808
            ),
1809
            # We tried to deploy(primary_path) without success.
1810
            # And in this case is up by some how. Nothing to do.
1811
            (
1812
                lambda me: me.is_using_backup_path(),
1813
                lambda _: (True, 'nothing')
1814
            ),
1815
            (
1816
                lambda me:  me.is_using_dynamic_path(),
1817
                lambda _: (True, 'nothing')
1818
            ),
1819
            # In this case, probably the circuit is not being used and
1820
            # we can move to backup
1821
            (
1822
                lambda me: me.backup_path.is_affected_by_link(link),
1823
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1824
            ),
1825
            # In this case, the circuit is not being used and we should
1826
            # try a dynamic path
1827
            (
1828
                lambda me: me.dynamic_backup_path and not me.is_active(),
1829
                lambda me: (me.deploy_to_path(), 'redeploy')
1830
            )
1831
        ]
1832 1
        for predicate, action in condition_pairs:
1833 1
            if not predicate(self):
1834 1
                continue
1835 1
            success, succcess_type = action(self)
1836 1
            if success:
1837 1
                if succcess_type == 'redeploy':
1838 1
                    emit_event(
1839
                        self._controller,
1840
                        "redeployed_link_up",
1841
                        content=map_evc_event_content(self)
1842
                    )
1843 1
                return True
1844 1
        return False
1845
1846 1
    def handle_link_down(self):
1847
        """Handle circuit when link down.
1848
1849
        Returns:
1850
            bool: True if the re-deploy was successly otherwise False.
1851
1852
        """
1853 1
        success = False
1854 1
        if self.is_using_primary_path():
1855 1
            success = self.deploy_to_backup_path()
1856 1
        elif self.is_using_backup_path():
1857 1
            success = self.deploy_to_primary_path()
1858
1859 1
        if not success and self.dynamic_backup_path:
1860 1
            success = self.deploy_to_path()
1861
1862 1
        if success:
1863 1
            log.debug(f"{self} deployed after link down.")
1864
        else:
1865 1
            self.remove_current_flows(sync=False)
1866 1
            self.deactivate()
1867 1
            self.sync()
1868 1
            log.debug(f"Failed to re-deploy {self} after link down.")
1869
1870 1
        return success
1871
1872 1
    def are_unis_active(self) -> bool:
1873
        """Determine whether this EVC should be active"""
1874 1
        interface_a = self.uni_a.interface
1875 1
        interface_z = self.uni_z.interface
1876 1
        active, _ = self.is_uni_interface_active(interface_a, interface_z)
1877 1
        return active
1878
1879 1
    def try_to_handle_uni_as_link_up(self, interface: Interface) -> bool:
1880
        """Try to handle UNI as link_up to trigger deployment."""
1881
        if (
1882
            self.current_path.status != EntityStatus.UP
1883
            and not self.is_intra_switch()
1884
        ):
1885
            succeeded = self.handle_link_up(interface=interface)
1886
            if succeeded:
1887
                msg = (
1888
                    f"Activated {self} due to successful "
1889
                    f"deployment triggered by {interface}"
1890
                )
1891
            else:
1892
                msg = (
1893
                    f"Couldn't activate {self} due to unsuccessful "
1894
                    f"deployment triggered by {interface}"
1895
                )
1896
            log.info(msg)
1897
            return True
1898
        return False
1899
1900 1
    def handle_interface_link_up(self, interface: Interface):
1901
        """
1902
        Handler for interface link_up events
1903
        """
1904 1
        if not _does_uni_affect_evc(self, interface, "up"):
1905 1
            return
1906 1
        if self.try_to_handle_uni_as_link_up(interface):
1907
            return
1908
1909 1
        interface_dicts = {
1910
            interface.id: {
1911
                'status': interface.status.value,
1912
                'status_reason': interface.status_reason,
1913
            }
1914
            for interface in (self.uni_a.interface, self.uni_z.interface)
1915
        }
1916 1
        try:
1917 1
            self.try_to_activate()
1918 1
            log.info(
1919
                f"Activating {self}. Interfaces: "
1920
                f"{interface_dicts}."
1921
            )
1922 1
            emit_event(self._controller, "uni_active_updated",
1923
                       content=map_evc_event_content(self))
1924 1
            self.sync()
1925
        except ActivationError as exc:
1926
            # On this ctx, no ActivationError isn't expected since the
1927
            # activation pre-requisites states were checked, so handled as err
1928
            log.error(f"ActivationError: {str(exc)} when handling {interface}")
1929
1930 1
    def handle_interface_link_down(self, interface):
1931
        """
1932
        Handler for interface link_down events
1933
        """
1934 1
        if not _does_uni_affect_evc(self, interface, "down"):
1935 1
            return
1936 1
        interface_dicts = {
1937
            interface.id: {
1938
                'status': interface.status.value,
1939
                'status_reason': interface.status_reason,
1940
            }
1941
            for interface in (self.uni_a.interface, self.uni_z.interface)
1942
            if interface.status != EntityStatus.UP
1943
        }
1944 1
        self.deactivate()
1945 1
        log.info(
1946
            f"Deactivating {self}. Interfaces: "
1947
            f"{interface_dicts}."
1948
        )
1949 1
        emit_event(self._controller, "uni_active_updated",
1950
                   content=map_evc_event_content(self))
1951 1
        self.sync()
1952
1953
1954 1
class EVC(LinkProtection):
1955
    """Class that represents a E-Line Virtual Connection."""
1956