Passed
Pull Request — master (#690)
by
unknown
05:52 queued 01:19
created

build.models.evc.EVCDeploy.check_trace()   D

Complexity

Conditions 12

Size

Total Lines 72
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 12.3775

Importance

Changes 0
Metric Value
cc 12
eloc 55
nop 9
dl 0
loc 72
ccs 25
cts 29
cp 0.8621
crap 12.3775
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.models.evc.EVCDeploy.check_trace() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Classes used in the main application."""  # pylint: disable=too-many-lines
2 1
import traceback
3 1
from collections import OrderedDict, defaultdict
4 1
from copy import deepcopy
5 1
from datetime import datetime
6 1
from operator import eq, ne
7 1
from threading import Lock
8 1
from typing import Union
9 1
from uuid import uuid4
10
11 1
import httpx
12 1
from glom import glom
13 1
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
14
                      wait_combine, wait_fixed, wait_random)
15
16 1
from kytos.core import log
17 1
from kytos.core.common import EntityStatus, GenericEntity
18 1
from kytos.core.exceptions import KytosNoTagAvailableError, KytosTagError
19 1
from kytos.core.helpers import get_time, now
20 1
from kytos.core.interface import UNI, Interface, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.retry import before_sleep
23 1
from kytos.core.tag_ranges import range_difference
24 1
from napps.kytos.mef_eline import controllers, settings
25 1
from napps.kytos.mef_eline.exceptions import (ActivationError,
26
                                              DuplicatedNoTagUNI,
27
                                              EVCPathNotInstalled,
28
                                              FlowModException, InvalidPath)
29 1
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc,
30
                                         compare_endpoint_trace,
31
                                         compare_uni_out_trace, emit_event,
32
                                         make_uni_list, map_dl_vlan,
33
                                         map_evc_event_content,
34
                                         merge_flow_dicts)
35
36 1
from .path import DynamicPathManager, Path
37
38
39 1
class EVCBase(GenericEntity):
40
    """Class to represent a circuit."""
41
42 1
    attributes_requiring_redeploy = [
43
        "primary_path",
44
        "backup_path",
45
        "dynamic_backup_path",
46
        "queue_id",
47
        "sb_priority",
48
        "primary_constraints",
49
        "secondary_constraints",
50
        "uni_a",
51
        "uni_z",
52
    ]
53 1
    required_attributes = ["name", "uni_a", "uni_z"]
54
55 1
    updatable_attributes = {
56
        "uni_a",
57
        "uni_z",
58
        "name",
59
        "start_date",
60
        "end_date",
61
        "queue_id",
62
        "bandwidth",
63
        "primary_path",
64
        "backup_path",
65
        "dynamic_backup_path",
66
        "primary_constraints",
67
        "secondary_constraints",
68
        "owner",
69
        "sb_priority",
70
        "service_level",
71
        "circuit_scheduler",
72
        "metadata",
73
        "enabled",
74
        "max_paths",
75
    }
76
77
    # pylint: disable=too-many-statements
78 1
    def __init__(self, controller, **kwargs):
79
        """Create an EVC instance with the provided parameters.
80
81
        Args:
82
            id(str): EVC identifier. Whether it's None an ID will be genereted.
83
                     Only the first 14 bytes passed will be used.
84
            name: represents an EVC name.(Required)
85
            uni_a (UNI): Endpoint A for User Network Interface.(Required)
86
            uni_z (UNI): Endpoint Z for User Network Interface.(Required)
87
            start_date(datetime|str): Date when the EVC was registred.
88
                                      Default is now().
89
            end_date(datetime|str): Final date that the EVC will be fineshed.
90
                                    Default is None.
91
            bandwidth(int): Bandwidth used by EVC instance. Default is 0.
92
            primary_links(list): Primary links used by evc. Default is []
93
            backup_links(list): Backups links used by evc. Default is []
94
            current_path(list): Circuit being used at the moment if this is an
95
                                active circuit. Default is [].
96
            failover_path(list): Path being used to provide EVC protection via
97
                                failover during link failures. Default is [].
98
            primary_path(list): primary circuit offered to user IF one or more
99
                                links were provided. Default is [].
100
            backup_path(list): backup circuit offered to the user IF one or
101
                               more links were provided. Default is [].
102
            dynamic_backup_path(bool): Enable computer backup path dynamically.
103
                                       Dafault is False.
104
            creation_time(datetime|str): datetime when the circuit should be
105
                                         activated. default is now().
106
            enabled(Boolean): attribute to indicate the administrative state;
107
                              default is False.
108
            active(Boolean): attribute to indicate the operational state;
109
                             default is False.
110
            archived(Boolean): indicate the EVC has been deleted and is
111
                               archived; default is False.
112
            owner(str): The EVC owner. Default is None.
113
            sb_priority(int): Service level provided in the request.
114
                              Default is None.
115
            service_level(int): Service level provided. The higher the better.
116
                                Default is 0.
117
118
        Raises:
119
            ValueError: raised when object attributes are invalid.
120
121
        """
122 1
        self._controller = controller
123 1
        self._validate(**kwargs)
124 1
        super().__init__()
125
126
        # required attributes
127 1
        self._id = kwargs.get("id", uuid4().hex)[:14]
128 1
        self.uni_a: UNI = kwargs.get("uni_a")
129 1
        self.uni_z: UNI = kwargs.get("uni_z")
130 1
        self.name = kwargs.get("name")
131
132
        # optional attributes
133 1
        self.start_date = get_time(kwargs.get("start_date")) or now()
134 1
        self.end_date = get_time(kwargs.get("end_date")) or None
135 1
        self.queue_id = kwargs.get("queue_id", -1)
136
137 1
        self.bandwidth = kwargs.get("bandwidth", 0)
138 1
        self.primary_links = Path(kwargs.get("primary_links", []))
139 1
        self.backup_links = Path(kwargs.get("backup_links", []))
140 1
        self.current_path = Path(kwargs.get("current_path", []))
141 1
        self.failover_path = Path(kwargs.get("failover_path", []))
142 1
        self.primary_path = Path(kwargs.get("primary_path", []))
143 1
        self.backup_path = Path(kwargs.get("backup_path", []))
144 1
        self.dynamic_backup_path = kwargs.get("dynamic_backup_path", False)
145 1
        self.primary_constraints = kwargs.get("primary_constraints", {})
146 1
        self.secondary_constraints = kwargs.get("secondary_constraints", {})
147 1
        self.creation_time = get_time(kwargs.get("creation_time")) or now()
148 1
        self.owner = kwargs.get("owner", None)
149 1
        self.sb_priority = kwargs.get("sb_priority", None) or kwargs.get(
150
            "priority", None
151
        )
152 1
        self.service_level = kwargs.get("service_level", 0)
153 1
        self.circuit_scheduler = kwargs.get("circuit_scheduler", [])
154 1
        self.flow_removed_at = get_time(kwargs.get("flow_removed_at")) or None
155 1
        self.updated_at = get_time(kwargs.get("updated_at")) or now()
156 1
        self.execution_rounds = kwargs.get("execution_rounds", 0)
157 1
        self.current_links_cache = set()
158 1
        self.primary_links_cache = set()
159 1
        self.backup_links_cache = set()
160 1
        self.affected_by_link_at = get_time("0001-01-01T00:00:00")
161 1
        self.old_path = Path([])
162 1
        self.max_paths = kwargs.get("max_paths", 2)
163
164 1
        self.lock = Lock()
165
166 1
        self.archived = kwargs.get("archived", False)
167
168 1
        self.metadata = kwargs.get("metadata", {})
169
170 1
        self._mongo_controller = controllers.ELineController()
171
172 1
        if kwargs.get("active", False):
173 1
            self.activate()
174
        else:
175 1
            self.deactivate()
176
177 1
        if kwargs.get("enabled", False):
178 1
            self.enable()
179
        else:
180 1
            self.disable()
181
182
        # datetime of user request for a EVC (or datetime when object was
183
        # created)
184 1
        self.request_time = kwargs.get("request_time", now())
185
        # dict with the user original request (input)
186 1
        self._requested = kwargs
187
188
        # Special cases: No tag, any, untagged
189 1
        self.special_cases = {None, "4096/4096", 0}
190 1
        self.table_group = kwargs.get("table_group")
191
192 1
    def sync(self, keys: set = None):
193
        """Sync this EVC in the MongoDB."""
194 1
        self.updated_at = now()
195 1
        if keys:
196 1
            self._mongo_controller.update_evc(self.as_dict(keys))
197 1
            return
198 1
        self._mongo_controller.upsert_evc(self.as_dict())
199
200 1
    def _get_unis_use_tags(self, **kwargs) -> tuple[UNI, UNI]:
201
        """Obtain both UNIs (uni_a, uni_z).
202
        If a UNI is changing, verify tags"""
203 1
        uni_a = kwargs.get("uni_a", None)
204 1
        uni_a_flag = False
205 1
        if uni_a and uni_a != self.uni_a:
206 1
            uni_a_flag = True
207 1
            self._use_uni_vlan(uni_a, uni_dif=self.uni_a)
208
209 1
        uni_z = kwargs.get("uni_z", None)
210 1
        if uni_z and uni_z != self.uni_z:
211 1
            try:
212 1
                self._use_uni_vlan(uni_z, uni_dif=self.uni_z)
213 1
                self.make_uni_vlan_available(self.uni_z, uni_dif=uni_z)
214 1
            except KytosTagError as err:
215 1
                if uni_a_flag:
216 1
                    self.make_uni_vlan_available(uni_a, uni_dif=self.uni_a)
217 1
                raise err
218
        else:
219 1
            uni_z = self.uni_z
220
221 1
        if uni_a_flag:
222 1
            self.make_uni_vlan_available(self.uni_a, uni_dif=uni_a)
223
        else:
224 1
            uni_a = self.uni_a
225 1
        return uni_a, uni_z
226
227 1
    def update(self, **kwargs):
228
        """Update evc attributes.
229
230
        This method will raises an error trying to change the following
231
        attributes: [creation_time, active, current_path, failover_path,
232
        _id, archived]
233
        [name, uni_a and uni_z]
234
235
        Returns:
236
            the values for enable and a redeploy attribute, if exists and None
237
            otherwise
238
        Raises:
239
            ValueError: message with error detail.
240
241
        """
242 1
        enable, redeploy = (None, None)
243 1
        if not self._tag_lists_equal(**kwargs):
244 1
            raise ValueError(
245
                "UNI_A and UNI_Z tag lists should be the same."
246
            )
247 1
        uni_a, uni_z = self._get_unis_use_tags(**kwargs)
248 1
        self._validate_has_primary_or_dynamic(
249
            primary_path=kwargs.get("primary_path"),
250
            dynamic_backup_path=kwargs.get("dynamic_backup_path"),
251
            uni_a=uni_a,
252
            uni_z=uni_z,
253
        )
254 1
        for attribute, value in kwargs.items():
255 1
            if attribute not in self.updatable_attributes:
256 1
                raise ValueError(f"{attribute} can't be updated.")
257 1
            if attribute in ("primary_path", "backup_path"):
258 1
                try:
259 1
                    value.is_valid(
260
                        uni_a.interface.switch, uni_z.interface.switch
261
                    )
262 1
                except InvalidPath as exception:
263 1
                    raise ValueError(  # pylint: disable=raise-missing-from
264
                        f"{attribute} is not a " f"valid path: {exception}"
265
                    )
266 1
        for attribute, value in kwargs.items():
267 1
            if attribute == "enabled":
268 1
                if value:
269 1
                    self.enable()
270
                else:
271 1
                    self.disable()
272 1
                enable = value
273
            else:
274 1
                setattr(self, attribute, value)
275 1
                if attribute in self.attributes_requiring_redeploy:
276 1
                    redeploy = True
277 1
        self.sync(set(kwargs.keys()))
278 1
        return enable, redeploy
279
280 1
    def set_flow_removed_at(self):
281
        """Update flow_removed_at attribute."""
282
        self.flow_removed_at = now()
283
284 1
    def has_recent_removed_flow(self, setting=settings):
285
        """Check if any flow has been removed from the evc"""
286
        if self.flow_removed_at is None:
287
            return False
288
        res_seconds = (now() - self.flow_removed_at).seconds
289
        return res_seconds < setting.TIME_RECENT_DELETED_FLOWS
290
291 1
    def is_recent_updated(self, setting=settings):
292
        """Check if the evc has been updated recently"""
293
        res_seconds = (now() - self.updated_at).seconds
294
        return res_seconds < setting.TIME_RECENT_UPDATED
295
296 1
    def __repr__(self):
297
        """Repr method."""
298 1
        return f"EVC({self._id}, {self.name})"
299
300 1
    def _validate(self, **kwargs):
301
        """Do Basic validations.
302
303
        Verify required attributes: name, uni_a, uni_z
304
305
        Raises:
306
            ValueError: message with error detail.
307
308
        """
309 1
        for attribute in self.required_attributes:
310
311 1
            if attribute not in kwargs:
312 1
                raise ValueError(f"{attribute} is required.")
313
314 1
            if "uni" in attribute:
315 1
                uni = kwargs.get(attribute)
316 1
                if not isinstance(uni, UNI):
317
                    raise ValueError(f"{attribute} is an invalid UNI.")
318
319 1
    def _tag_lists_equal(self, **kwargs):
320
        """Verify that tag lists are the same."""
321 1
        uni_a = kwargs.get("uni_a") or self.uni_a
322 1
        uni_z = kwargs.get("uni_z") or self.uni_z
323 1
        uni_a_list = uni_z_list = False
324 1
        if (uni_a.user_tag and isinstance(uni_a.user_tag, TAGRange)):
325 1
            uni_a_list = True
326 1
        if (uni_z.user_tag and isinstance(uni_z.user_tag, TAGRange)):
327 1
            uni_z_list = True
328 1
        if uni_a_list and uni_z_list:
329 1
            return uni_a.user_tag.value == uni_z.user_tag.value
330 1
        return uni_a_list == uni_z_list
331
332 1
    def _validate_has_primary_or_dynamic(
333
        self,
334
        primary_path=None,
335
        dynamic_backup_path=None,
336
        uni_a=None,
337
        uni_z=None,
338
    ) -> None:
339
        """Validate that it must have a primary path or allow dynamic paths."""
340 1
        primary_path = (
341
            primary_path
342
            if primary_path is not None
343
            else self.primary_path
344
        )
345 1
        dynamic_backup_path = (
346
            dynamic_backup_path
347
            if dynamic_backup_path is not None
348
            else self.dynamic_backup_path
349
        )
350 1
        uni_a = uni_a if uni_a is not None else self.uni_a
351 1
        uni_z = uni_z if uni_z is not None else self.uni_z
352 1
        if (
353
            not primary_path
354
            and not dynamic_backup_path
355
            and uni_a and uni_z
356
            and uni_a.interface.switch != uni_z.interface.switch
357
        ):
358 1
            msg = "The EVC must have a primary path or allow dynamic paths."
359 1
            raise ValueError(msg)
360
361 1
    def __eq__(self, other):
362
        """Override the default implementation."""
363 1
        if not isinstance(other, EVC):
364
            return False
365
366 1
        attrs_to_compare = ["name", "uni_a", "uni_z", "owner", "bandwidth"]
367 1
        for attribute in attrs_to_compare:
368 1
            if getattr(other, attribute) != getattr(self, attribute):
369 1
                return False
370 1
        return True
371
372 1
    def is_intra_switch(self):
373
        """Check if the UNIs are in the same switch."""
374 1
        return self.uni_a.interface.switch == self.uni_z.interface.switch
375
376 1
    def check_no_tag_duplicate(self, other_uni: UNI):
377
        """Check if a no tag UNI is duplicated."""
378 1
        if other_uni in (self.uni_a, self.uni_z):
379 1
            msg = f"UNI with interface {other_uni.interface.id} is"\
380
                  f" duplicated with {self}."
381 1
            raise DuplicatedNoTagUNI(msg)
382
383 1
    def as_dict(self, keys: set = None):
384
        """Return a dictionary representing an EVC object.
385
            keys: Only fields on this variable will be
386
                  returned in the dictionary"""
387 1
        evc_dict = {
388
            "id": self.id,
389
            "name": self.name,
390
            "uni_a": self.uni_a.as_dict(),
391
            "uni_z": self.uni_z.as_dict(),
392
        }
393
394 1
        time_fmt = "%Y-%m-%dT%H:%M:%S"
395
396 1
        evc_dict["start_date"] = self.start_date
397 1
        if isinstance(self.start_date, datetime):
398 1
            evc_dict["start_date"] = self.start_date.strftime(time_fmt)
399
400 1
        evc_dict["end_date"] = self.end_date
401 1
        if isinstance(self.end_date, datetime):
402 1
            evc_dict["end_date"] = self.end_date.strftime(time_fmt)
403
404 1
        evc_dict["queue_id"] = self.queue_id
405 1
        evc_dict["bandwidth"] = self.bandwidth
406 1
        evc_dict["primary_links"] = self.primary_links.as_dict()
407 1
        evc_dict["backup_links"] = self.backup_links.as_dict()
408 1
        evc_dict["current_path"] = self.current_path.as_dict()
409 1
        evc_dict["failover_path"] = self.failover_path.as_dict()
410 1
        evc_dict["primary_path"] = self.primary_path.as_dict()
411 1
        evc_dict["backup_path"] = self.backup_path.as_dict()
412 1
        evc_dict["dynamic_backup_path"] = self.dynamic_backup_path
413 1
        evc_dict["metadata"] = self.metadata
414
415 1
        evc_dict["request_time"] = self.request_time
416 1
        if isinstance(self.request_time, datetime):
417 1
            evc_dict["request_time"] = self.request_time.strftime(time_fmt)
418
419 1
        time = self.creation_time.strftime(time_fmt)
420 1
        evc_dict["creation_time"] = time
421
422 1
        evc_dict["owner"] = self.owner
423 1
        evc_dict["circuit_scheduler"] = [
424
            sc.as_dict() for sc in self.circuit_scheduler
425
        ]
426
427 1
        evc_dict["active"] = self.is_active()
428 1
        evc_dict["enabled"] = self.is_enabled()
429 1
        evc_dict["archived"] = self.archived
430 1
        evc_dict["sb_priority"] = self.sb_priority
431 1
        evc_dict["service_level"] = self.service_level
432 1
        evc_dict["primary_constraints"] = self.primary_constraints
433 1
        evc_dict["secondary_constraints"] = self.secondary_constraints
434 1
        evc_dict["flow_removed_at"] = self.flow_removed_at
435 1
        evc_dict["updated_at"] = self.updated_at
436 1
        evc_dict["max_paths"] = self.max_paths
437
438 1
        if keys:
439 1
            selected = {}
440 1
            for key in keys:
441 1
                selected[key] = evc_dict[key]
442 1
            selected["id"] = evc_dict["id"]
443 1
            return selected
444 1
        return evc_dict
445
446 1
    @property
447 1
    def id(self):  # pylint: disable=invalid-name
448
        """Return this EVC's ID."""
449 1
        return self._id
450
451 1
    def archive(self):
452
        """Archive this EVC on deletion."""
453 1
        self.archived = True
454
455 1
    def _use_uni_vlan(
456
        self,
457
        uni: UNI,
458
        uni_dif: Union[None, UNI] = None
459
    ):
460
        """Use tags from UNI"""
461 1
        if uni.user_tag is None:
462 1
            return
463 1
        tag = uni.user_tag.value
464 1
        tag_type = uni.user_tag.tag_type
465 1
        if (uni_dif and isinstance(tag, list) and
466
                isinstance(uni_dif.user_tag.value, list)):
467 1
            tag = range_difference(tag, uni_dif.user_tag.value)
468 1
            if not tag:
469 1
                return
470 1
        uni.interface.use_tags(
471
            self._controller, tag, tag_type, use_lock=True, check_order=False
472
        )
473
474 1
    def make_uni_vlan_available(
475
        self,
476
        uni: UNI,
477
        uni_dif: Union[None, UNI] = None,
478
    ):
479
        """Make available tag from UNI"""
480 1
        if uni.user_tag is None:
481 1
            return
482 1
        tag = uni.user_tag.value
483 1
        tag_type = uni.user_tag.tag_type
484 1
        if (uni_dif and isinstance(tag, list) and
485
                isinstance(uni_dif.user_tag.value, list)):
486 1
            tag = range_difference(tag, uni_dif.user_tag.value)
487 1
            if not tag:
488
                return
489 1
        try:
490 1
            conflict = uni.interface.make_tags_available(
491
                self._controller, tag, tag_type, use_lock=True,
492
                check_order=False
493
            )
494 1
        except KytosTagError as err:
495 1
            log.error(f"Error in {self}: {err}")
496 1
            return
497 1
        if conflict:
498 1
            intf = uni.interface.id
499 1
            log.warning(f"Tags {conflict} was already available in {intf}")
500
501 1
    def remove_uni_tags(self):
502
        """Remove both UNI usage of a tag"""
503 1
        self.make_uni_vlan_available(self.uni_a)
504 1
        self.make_uni_vlan_available(self.uni_z)
505
506
507
# pylint: disable=fixme, too-many-public-methods
508 1
class EVCDeploy(EVCBase):
509
    """Class to handle the deploy procedures."""
510
511 1
    def create(self):
512
        """Create a EVC."""
513
514 1
    def discover_new_paths(self):
515
        """Discover new paths to satisfy this circuit and deploy it."""
516
        return DynamicPathManager.get_best_paths(self, self.max_paths,
517
                                                 **self.primary_constraints)
518
519 1
    def get_failover_path_candidates(self):
520
        """Get failover paths to satisfy this EVC."""
521
        # in the future we can return primary/backup paths as well
522
        # we just have to properly handle link_up and failover paths
523
        # if (
524
        #     self.is_using_primary_path() and
525
        #     self.backup_path.status is EntityStatus.UP
526
        # ):
527
        #     yield self.backup_path
528 1
        return DynamicPathManager.get_disjoint_paths(self, self.current_path)
529
530 1
    def change_path(self):
531
        """Change EVC path."""
532
533 1
    def reprovision(self):
534
        """Force the EVC (re-)provisioning."""
535
536 1
    def is_affected_by_link(self, link):
537
        """Return True if this EVC has the given link on its current path."""
538 1
        return link in self.current_path
539
540 1
    def link_affected_by_interface(self, interface):
541
        """Return True if this EVC has the given link on its current path."""
542
        return self.current_path.link_affected_by_interface(interface)
543
544 1
    def is_backup_path_affected_by_link(self, link):
545
        """Return True if the backup path of this EVC uses the given link."""
546 1
        return link in self.backup_path
547
548
    # pylint: disable=invalid-name
549 1
    def is_primary_path_affected_by_link(self, link):
550
        """Return True if the primary path of this EVC uses the given link."""
551 1
        return link in self.primary_path
552
553 1
    def is_failover_path_affected_by_link(self, link):
554
        """Return True if this EVC has the given link on its failover path."""
555 1
        return link in self.failover_path
556
557 1
    def is_eligible_for_failover_path(self):
558
        """Verify if this EVC is eligible for failover path (EP029)"""
559
        # In the future this function can be augmented to consider
560
        # primary/backup, primary/dynamic, and other path combinations
561 1
        return (
562
            self.dynamic_backup_path and
563
            not self.primary_path and not self.backup_path
564
        )
565
566 1
    def is_using_primary_path(self):
567
        """Verify if the current deployed path is self.primary_path."""
568 1
        return self.primary_path and (self.current_path == self.primary_path)
569
570 1
    def is_using_backup_path(self):
571
        """Verify if the current deployed path is self.backup_path."""
572 1
        return self.backup_path and (self.current_path == self.backup_path)
573
574 1
    def is_using_dynamic_path(self):
575
        """Verify if the current deployed path is a dynamic path."""
576 1
        if (
577
            self.current_path
578
            and not self.is_using_primary_path()
579
            and not self.is_using_backup_path()
580
            and self.current_path.status == EntityStatus.UP
581
        ):
582
            return True
583 1
        return False
584
585 1
    def deploy_to_backup_path(self, old_path_dict: dict = None):
586
        """Deploy the backup path into the datapaths of this circuit.
587
588
        If the backup_path attribute is valid and up, this method will try to
589
        deploy this backup_path.
590
591
        If everything fails and dynamic_backup_path is True, then tries to
592
        deploy a dynamic path.
593
        """
594
        # TODO: Remove flows from current (cookies)
595 1
        if self.is_using_backup_path():
596
            # TODO: Log to say that cannot move backup to backup
597
            return True
598
599 1
        success = False
600 1
        if self.backup_path.status is EntityStatus.UP:
601 1
            success = self.deploy_to_path(self.backup_path, old_path_dict)
602
603 1
        if success:
604 1
            return True
605
606 1
        if self.dynamic_backup_path or self.is_intra_switch():
607 1
            return self.deploy_to_path(old_path_dict=old_path_dict)
608
609
        return False
610
611 1
    def deploy_to_primary_path(self, old_path_dict: dict = None):
612
        """Deploy the primary path into the datapaths of this circuit.
613
614
        If the primary_path attribute is valid and up, this method will try to
615
        deploy this primary_path.
616
        """
617
        # TODO: Remove flows from current (cookies)
618 1
        if self.is_using_primary_path():
619
            # TODO: Log to say that cannot move primary to primary
620
            return True
621
622 1
        if self.primary_path.status is EntityStatus.UP:
623 1
            return self.deploy_to_path(self.primary_path, old_path_dict)
624
        return False
625
626 1
    def deploy(self, old_path_dict: dict = None):
627
        """Deploy EVC to best path.
628
629
        Best path can be the primary path, if available. If not, the backup
630
        path, and, if it is also not available, a dynamic path.
631
        """
632 1
        if self.archived:
633 1
            return False
634 1
        self.enable()
635 1
        success = self.deploy_to_primary_path(old_path_dict)
636 1
        if not success:
637 1
            success = self.deploy_to_backup_path(old_path_dict)
638
639 1
        if success:
640 1
            emit_event(self._controller, "deployed",
641
                       content=map_evc_event_content(self))
642 1
        return success
643
644 1
    @staticmethod
645 1
    def get_path_status(path):
646
        """Check for the current status of a path.
647
648
        If any link in this path is down, the path is considered down.
649
        """
650 1
        if not path:
651 1
            return EntityStatus.DISABLED
652
653 1
        for link in path:
654 1
            if link.status is not EntityStatus.UP:
655 1
                return link.status
656 1
        return EntityStatus.UP
657
658
    #    def discover_new_path(self):
659
    #        # TODO: discover a new path to satisfy this circuit and deploy
660
661 1
    def remove(self):
662
        """Remove EVC path and disable it."""
663 1
        self.remove_current_flows(sync=False)
664 1
        self.remove_failover_flows(sync=False)
665 1
        self.disable()
666 1
        self.sync()
667 1
        emit_event(self._controller, "undeployed",
668
                   content=map_evc_event_content(self))
669
670 1
    def remove_failover_flows(self, exclude_uni_switches=True,
671
                              force=True, sync=True) -> None:
672
        """Remove failover_flows.
673
674
        By default, it'll exclude UNI switches, if mef_eline has already
675
        called remove_current_flows before then this minimizes the number
676
        of FlowMods and IO.
677
        """
678 1
        if not self.failover_path:
679 1
            return
680 1
        switches, cookie, excluded = set(), self.get_cookie(), set()
681 1
        if exclude_uni_switches:
682 1
            excluded.add(self.uni_a.interface.switch.id)
683 1
            excluded.add(self.uni_z.interface.switch.id)
684 1
        for link in self.failover_path:
685 1
            if link.endpoint_a.switch.id not in excluded:
686 1
                switches.add(link.endpoint_a.switch.id)
687 1
            if link.endpoint_b.switch.id not in excluded:
688 1
                switches.add(link.endpoint_b.switch.id)
689 1
        flow_mods = {
690
            "switches": list(switches),
691
            "flows": [{
692
                "cookie": cookie,
693
                "cookie_mask": int(0xffffffffffffffff),
694
                "owner": "mef_eline",
695
            }]
696
        }
697 1
        try:
698 1
            self._send_flow_mods(
699
                flow_mods,
700
                "delete",
701
                force=force,
702
            )
703
        except FlowModException as err:
704
            log.error(f"Error deleting {self} failover_path flows, {err}")
705 1
        try:
706 1
            self.failover_path.make_vlans_available(self._controller)
707
        except KytosTagError as err:
708
            log.error(f"Error removing {self} failover_path: {err}")
709 1
        self.failover_path = Path([])
710 1
        if sync:
711 1
            self.sync()
712
713 1
    def remove_current_flows(
714
        self,
715
        current_path=None,
716
        force=True,
717
        sync=True,
718
        return_path=False
719
    ) -> dict[str, int]:
720
        """Remove all flows from current path or path intended for
721
         current path if exists."""
722 1
        switches, old_path_dict = set(), {}
723 1
        current_path = self.current_path if not current_path else current_path
724 1
        if not current_path and not self.is_intra_switch():
725 1
            return {}
726
727 1
        if return_path:
728 1
            for link in self.current_path:
729 1
                s_vlan = link.metadata.get("s_vlan")
730 1
                if s_vlan:
731 1
                    old_path_dict[link.id] = s_vlan.value
732
733 1
        for link in current_path:
734 1
            switches.add(link.endpoint_a.switch.id)
735 1
            switches.add(link.endpoint_b.switch.id)
736 1
        switches.add(self.uni_a.interface.switch.id)
737 1
        switches.add(self.uni_z.interface.switch.id)
738 1
        flow_mods = {
739
            "switches": list(switches),
740
            "flows": [{
741
                "cookie": self.get_cookie(),
742
                "cookie_mask": int(0xffffffffffffffff),
743
                "owner": "mef_eline",
744
            }]
745
        }
746
747 1
        try:
748 1
            self._send_flow_mods(flow_mods, "delete", force=force)
749 1
        except FlowModException as err:
750 1
            log.error(f"Error deleting {self} current_path flows, {err}")
751
752 1
        try:
753 1
            current_path.make_vlans_available(self._controller)
754 1
        except KytosTagError as err:
755 1
            log.error(f"Error removing {self} current_path: {err}")
756 1
        self.current_path = Path([])
757 1
        self.deactivate()
758 1
        if sync:
759 1
            self.sync()
760 1
        return old_path_dict
761
762 1
    def remove_path_flows(
763
        self, path=None, force=True
764
    ) -> dict[str, list[dict]]:
765
        """Remove all flows from path, and return the removed flows."""
766 1
        dpid_flows_match: dict[str, dict] = defaultdict(lambda: {"flows": []})
767 1
        out_flows: dict[str, list[dict]] = defaultdict(list)
768
769 1
        if not path:
770 1
            return dpid_flows_match
771
772 1
        try:
773 1
            nni_flows = self._prepare_nni_flows(path)
774
        # pylint: disable=broad-except
775
        except Exception:
776
            err = traceback.format_exc()
777
            log.error(f"Fail to remove NNI failover flows for {self}: {err}")
778
            nni_flows = {}
779
780 1
        for dpid, flows in nni_flows.items():
781 1
            for flow in flows:
782 1
                flow_mod = {
783
                    "cookie": flow["cookie"],
784
                    "match": flow["match"],
785
                    "owner": "mef_eline",
786
                    "cookie_mask": int(0xffffffffffffffff)
787
                }
788 1
                dpid_flows_match[dpid]["flows"].append(flow_mod)
789 1
                out_flows[dpid].append(flow_mod)
790
791 1
        try:
792 1
            uni_flows = self._prepare_uni_flows(path, skip_in=True)
793
        # pylint: disable=broad-except
794
        except Exception:
795
            err = traceback.format_exc()
796
            log.error(f"Fail to remove UNI failover flows for {self}: {err}")
797
            uni_flows = {}
798
799 1
        for dpid, flows in uni_flows.items():
800 1
            for flow in flows:
801 1
                flow_mod = {
802
                    "cookie": flow["cookie"],
803
                    "match": flow["match"],
804
                    "owner": "mef_eline",
805
                    "cookie_mask": int(0xffffffffffffffff)
806
                }
807 1
                dpid_flows_match[dpid]["flows"].append(flow_mod)
808 1
                out_flows[dpid].append(flow_mod)
809
810 1
        try:
811 1
            self._send_flow_mods(
812
                dpid_flows_match, 'delete', force=force, by_switch=True
813
            )
814 1
        except FlowModException as err:
815 1
            log.error(
816
                f"Error deleting {self} path flows, path:{path}, error={err}"
817
            )
818
819 1
        try:
820 1
            path.make_vlans_available(self._controller)
821
        except KytosTagError as err:
822
            log.error(f"Error removing {self} path: {err}")
823
824 1
        return out_flows
825
826 1
    @staticmethod
827 1
    def links_zipped(path=None):
828
        """Return an iterator which yields pairs of links in order."""
829 1
        if not path:
830 1
            return []
831 1
        return zip(path[:-1], path[1:])
832
833 1
    def should_deploy(self, path=None):
834
        """Verify if the circuit should be deployed."""
835 1
        if not path:
836 1
            log.debug("Path is empty.")
837 1
            return False
838
839 1
        if not self.is_enabled():
840 1
            log.debug(f"{self} is disabled.")
841 1
            return False
842
843 1
        if not self.is_active():
844 1
            log.debug(f"{self} will be deployed.")
845 1
            return True
846
847 1
        return False
848
849 1
    @staticmethod
850 1
    def is_uni_interface_active(
851
        *interfaces: Interface
852
    ) -> tuple[bool, dict]:
853
        """Whether UNIs are active and their status & status_reason."""
854 1
        active = True
855 1
        bad_interfaces = [
856
            interface
857
            for interface in interfaces
858
            if interface.status != EntityStatus.UP
859
        ]
860 1
        if bad_interfaces:
861 1
            active = False
862 1
            interfaces = bad_interfaces
863 1
        return active, {
864
            interface.id: {
865
                'status': interface.status.value,
866
                'status_reason': interface.status_reason,
867
            }
868
            for interface in interfaces
869
        }
870
871 1
    def try_to_activate(self) -> bool:
872
        """Try to activate the EVC."""
873 1
        if self.is_intra_switch():
874 1
            return self._try_to_activate_intra_evc()
875 1
        return self._try_to_activate_inter_evc()
876
877 1
    def _try_to_activate_intra_evc(self) -> bool:
878
        """Try to activate intra EVC."""
879 1
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
880 1
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
881 1
        if not is_active:
882 1
            raise ActivationError(
883
                f"Won't be able to activate {self} due to UNIs: {reason}"
884
            )
885 1
        self.activate()
886 1
        return True
887
888 1
    def _try_to_activate_inter_evc(self) -> bool:
889
        """Try to activate inter EVC."""
890 1
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
891 1
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
892 1
        if not is_active:
893 1
            raise ActivationError(
894
                f"Won't be able to activate {self} due to UNIs: {reason}"
895
            )
896 1
        if self.current_path.status != EntityStatus.UP:
897 1
            raise ActivationError(
898
                f"Won't be able to activate {self} due to current_path "
899
                f"status {self.current_path.status}"
900
            )
901 1
        self.activate()
902 1
        return True
903
904
    # pylint: disable=too-many-branches, too-many-statements
905 1
    def deploy_to_path(self, path=None, old_path_dict: dict = None):
906
        """Install the flows for this circuit.
907
908
        Procedures to deploy:
909
910
        0. Remove current flows installed
911
        1. Decide if will deploy "path" or discover a new path
912
        2. Choose vlan
913
        3. Install NNI flows
914
        4. Install UNI flows
915
        5. Activate
916
        6. Update current_path
917
        7. Update links caches(primary, current, backup)
918
919
        """
920 1
        self.remove_current_flows(sync=False)
921 1
        use_path = path or Path([])
922 1
        if not old_path_dict:
923 1
            old_path_dict = {}
924 1
        tag_errors = []
925 1
        no_valid_path = False
926 1
        if self.should_deploy(use_path):
927 1
            try:
928 1
                use_path.choose_vlans(self._controller, old_path_dict)
929 1
            except KytosNoTagAvailableError as e:
930 1
                tag_errors.append(str(e))
931 1
                use_path = None
932
        else:
933 1
            for use_path in self.discover_new_paths():
934 1
                if use_path is None:
935 1
                    no_valid_path = True
936 1
                    continue
937 1
                try:
938 1
                    use_path.choose_vlans(self._controller, old_path_dict)
939 1
                    break
940 1
                except KytosNoTagAvailableError as e:
941 1
                    tag_errors.append(str(e))
942
            else:
943 1
                use_path = None
944
945 1
        try:
946 1
            if use_path:
947 1
                self._install_flows(use_path)
948 1
            elif self.is_intra_switch():
949 1
                use_path = Path()
950 1
                self._install_direct_uni_flows()
951
            else:
952 1
                no_path_msg = "No available path was found."
953 1
                if no_valid_path:
954 1
                    no_path_msg = "No valid path was found, "\
955
                                  "try increasing `max_paths`"\
956
                                 f" from {self.max_paths}."
957 1
                msg = f"{self} was not deployed. {no_path_msg}"
958 1
                if tag_errors:
959 1
                    msg = self.add_tag_errors(msg, tag_errors)
960 1
                    log.error(msg)
961
                else:
962 1
                    log.warning(msg)
963 1
                return False
964 1
        except EVCPathNotInstalled as err:
965 1
            log.error(
966
                f"Error deploying EVC {self} when calling flow_manager: {err}"
967
            )
968 1
            self.remove_current_flows(use_path, sync=True)
969 1
            return False
970
971 1
        self.current_path = use_path
972 1
        msg = f"{self} was deployed."
973 1
        try:
974 1
            self.try_to_activate()
975
        except ActivationError as exc:
976
            msg = f"{msg} {str(exc)}"
977 1
        self.sync()
978 1
        log.info(msg)
979 1
        return True
980
981
    # pylint: disable=too-many-statements
982 1
    def setup_failover_path(self, warn_if_not_path=True):
983
        """Install flows for the failover path of this EVC.
984
985
        Procedures to deploy:
986
987
        0. Remove flows currently installed for failover_path (if any)
988
        1. Discover a disjoint path from current_path
989
        2. Choose vlans
990
        3. Install NNI flows
991
        4. Install UNI egress flows
992
        5. Update failover_path
993
        """
994
        # Intra-switch EVCs have no failover_path
995 1
        if self.is_intra_switch():
996 1
            return False
997
998
        # For not only setup failover path for totally dynamic EVCs
999 1
        if not self.is_eligible_for_failover_path():
1000 1
            return False
1001
1002 1
        out_new_flows: dict[str, list[dict]] = {}
1003 1
        reason = ""
1004 1
        tag_errors = []
1005 1
        out_removed_flows = self.remove_path_flows(self.failover_path)
1006 1
        self.failover_path = Path([])
1007
1008 1
        for use_path in self.get_failover_path_candidates():
1009 1
            if not use_path:
1010 1
                continue
1011 1
            try:
1012 1
                use_path.choose_vlans(self._controller)
1013 1
                break
1014 1
            except KytosNoTagAvailableError as e:
1015 1
                tag_errors.append(str(e))
1016
        else:
1017 1
            use_path = Path([])
1018 1
            reason = "No available path was found"
1019
1020 1
        try:
1021 1
            if use_path:
1022 1
                out_new_flows = self._install_flows(
1023
                    use_path, skip_in=True
1024
                )
1025 1
        except EVCPathNotInstalled as err:
1026 1
            reason = "Error deploying failover path"
1027 1
            log.error(
1028
                f"{reason} for {self}. FlowManager error: {err}"
1029
            )
1030 1
            _rmed_flows = self.remove_path_flows(use_path)
1031 1
            out_removed_flows = merge_flow_dicts(
1032
                out_removed_flows, _rmed_flows
1033
            )
1034 1
            use_path = Path([])
1035
1036 1
        self.failover_path = use_path
1037 1
        self.sync()
1038
1039 1
        if out_new_flows or out_removed_flows:
1040 1
            emit_event(self._controller, "failover_deployed", content={
1041
                self.id: map_evc_event_content(
1042
                    self,
1043
                    flows=deepcopy(out_new_flows),
1044
                    removed_flows=deepcopy(out_removed_flows),
1045
                    error_reason=reason,
1046
                    current_path=self.current_path.as_dict(),
1047
                )
1048
            })
1049
1050 1
        if not use_path:
1051 1
            msg = f"Failover path for {self} was not deployed: {reason}."
1052 1
            if tag_errors:
1053 1
                msg = self.add_tag_errors(msg, tag_errors)
1054 1
                log.error(msg)
1055 1
            elif warn_if_not_path:
1056 1
                log.warning(msg)
1057 1
            return False
1058 1
        log.info(f"Failover path for {self} was deployed.")
1059 1
        return True
1060
1061 1
    @staticmethod
1062 1
    def add_tag_errors(msg: str, tag_errors: list):
1063
        """Add to msg the tag errors ecountered when chossing path."""
1064 1
        path = ['path', 'paths']
1065 1
        was = ['was', 'were']
1066 1
        message = ['message', 'messages']
1067
1068
        # Choose either singular(0) or plural(1) words
1069 1
        n = 1
1070 1
        if len(tag_errors) == 1:
1071 1
            n = 0
1072
1073 1
        msg += f" {len(tag_errors)} {path[n]} {was[n]} rejected"
1074 1
        msg += f" with {message[n]}: {tag_errors}"
1075 1
        return msg
1076
1077 1
    def get_failover_flows(self):
1078
        """Return the flows needed to make the failover path active, i.e. the
1079
        flows for ingress forwarding.
1080
1081
        Return:
1082
            dict: A dict of flows indexed by the switch_id will be returned, or
1083
                an empty dict if no failover_path is available.
1084
        """
1085 1
        if not self.failover_path:
1086 1
            return {}
1087 1
        return self._prepare_uni_flows(self.failover_path, skip_out=True)
1088
1089
    # pylint: disable=too-many-branches
1090 1
    def _prepare_direct_uni_flows(self):
1091
        """Prepare flows connecting two UNIs for intra-switch EVC."""
1092 1
        vlan_a = self._get_value_from_uni_tag(self.uni_a)
1093 1
        vlan_z = self._get_value_from_uni_tag(self.uni_z)
1094
1095 1
        flow_mod_az = self._prepare_flow_mod(
1096
            self.uni_a.interface, self.uni_z.interface,
1097
            self.queue_id, vlan_a
1098
        )
1099 1
        flow_mod_za = self._prepare_flow_mod(
1100
            self.uni_z.interface, self.uni_a.interface,
1101
            self.queue_id, vlan_z
1102
        )
1103
1104 1 View Code Duplication
        if not isinstance(vlan_z, list) and vlan_z not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1105 1
            flow_mod_az["actions"].insert(
1106
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
1107
            )
1108 1
            if not vlan_a:
1109 1
                flow_mod_az["actions"].insert(
1110
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1111
                )
1112 1
            if vlan_a == 0:
1113 1
                flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1114 1
        elif vlan_a == 0 and vlan_z == "4096/4096":
1115 1
            flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1116
1117 1 View Code Duplication
        if not isinstance(vlan_a, list) and vlan_a not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1118 1
            flow_mod_za["actions"].insert(
1119
                    0, {"action_type": "set_vlan", "vlan_id": vlan_a}
1120
                )
1121 1
            if not vlan_z:
1122 1
                flow_mod_za["actions"].insert(
1123
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1124
                )
1125 1
            if vlan_z == 0:
1126 1
                flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1127 1
        elif vlan_a == "4096/4096" and vlan_z == 0:
1128 1
            flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1129
1130 1
        flows = []
1131 1
        if isinstance(vlan_a, list):
1132 1
            for mask_a in vlan_a:
1133 1
                flow_aux = deepcopy(flow_mod_az)
1134 1
                flow_aux["match"]["dl_vlan"] = mask_a
1135 1
                flows.append(flow_aux)
1136
        else:
1137 1
            if vlan_a is not None:
1138 1
                flow_mod_az["match"]["dl_vlan"] = vlan_a
1139 1
            flows.append(flow_mod_az)
1140
1141 1
        if isinstance(vlan_z, list):
1142 1
            for mask_z in vlan_z:
1143 1
                flow_aux = deepcopy(flow_mod_za)
1144 1
                flow_aux["match"]["dl_vlan"] = mask_z
1145 1
                flows.append(flow_aux)
1146
        else:
1147 1
            if vlan_z is not None:
1148 1
                flow_mod_za["match"]["dl_vlan"] = vlan_z
1149 1
            flows.append(flow_mod_za)
1150 1
        return (
1151
            self.uni_a.interface.switch.id, flows
1152
        )
1153
1154 1
    def _install_direct_uni_flows(self):
1155
        """Install flows connecting two UNIs.
1156
1157
        This case happens when the circuit is between UNIs in the
1158
        same switch.
1159
        """
1160 1
        (dpid, flows) = self._prepare_direct_uni_flows()
1161 1
        flow_mods = {"switches": [dpid], "flows": flows}
1162 1
        try:
1163 1
            self._send_flow_mods(flow_mods, "install")
1164 1
        except FlowModException as err:
1165 1
            raise EVCPathNotInstalled(str(err)) from err
1166
1167 1
    def _prepare_nni_flows(self, path=None):
1168
        """Prepare NNI flows."""
1169 1
        nni_flows = OrderedDict()
1170 1
        previous = self.uni_a.interface.switch.dpid
1171 1
        for incoming, outcoming in self.links_zipped(path):
1172 1
            in_vlan = incoming.get_metadata("s_vlan").value
1173 1
            out_vlan = outcoming.get_metadata("s_vlan").value
1174 1
            in_endpoint = self.get_endpoint_by_id(incoming, previous, ne)
1175 1
            out_endpoint = self.get_endpoint_by_id(
1176
                outcoming, in_endpoint.switch.id, eq
1177
            )
1178
1179 1
            flows = []
1180
            # Flow for one direction
1181 1
            flows.append(
1182
                self._prepare_nni_flow(
1183
                    in_endpoint,
1184
                    out_endpoint,
1185
                    in_vlan,
1186
                    out_vlan,
1187
                    queue_id=self.queue_id,
1188
                )
1189
            )
1190
1191
            # Flow for the other direction
1192 1
            flows.append(
1193
                self._prepare_nni_flow(
1194
                    out_endpoint,
1195
                    in_endpoint,
1196
                    out_vlan,
1197
                    in_vlan,
1198
                    queue_id=self.queue_id,
1199
                )
1200
            )
1201 1
            previous = in_endpoint.switch.id
1202 1
            nni_flows[in_endpoint.switch.id] = flows
1203 1
        return nni_flows
1204
1205 1
    def _install_flows(
1206
        self, path=None, skip_in=False, skip_out=False
1207
    ) -> dict[str, list[dict]]:
1208
        """Install uni and nni flows"""
1209 1
        flows_by_switch = defaultdict(lambda: {"flows": []})
1210 1
        new_flows = defaultdict(list)
1211 1
        for dpid, flows in self._prepare_nni_flows(path).items():
1212 1
            flows_by_switch[dpid]["flows"].extend(flows)
1213 1
            new_flows[dpid].extend(flows)
1214 1
        for dpid, flows in self._prepare_uni_flows(
1215
            path, skip_in, skip_out
1216
        ).items():
1217 1
            flows_by_switch[dpid]["flows"].extend(flows)
1218 1
            new_flows[dpid].extend(flows)
1219
1220 1
        try:
1221 1
            self._send_flow_mods(flows_by_switch, "install", by_switch=True)
1222 1
        except FlowModException as err:
1223 1
            raise EVCPathNotInstalled(str(err)) from err
1224
1225 1
        return new_flows
1226
1227 1
    @staticmethod
1228 1
    def _get_value_from_uni_tag(uni: UNI):
1229
        """Returns the value from tag. In case of any and untagged
1230
        it should return 4096/4096 and 0 respectively"""
1231 1
        special = {"any": "4096/4096", "untagged": 0}
1232 1
        if uni.user_tag:
1233 1
            value = uni.user_tag.value
1234 1
            if isinstance(value, list):
1235 1
                return uni.user_tag.mask_list
1236 1
            return special.get(value, value)
1237 1
        return None
1238
1239
    # pylint: disable=too-many-locals
1240 1
    def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
1241
        """Prepare flows to install UNIs."""
1242 1
        uni_flows = {}
1243 1
        if not path:
1244
            log.info("install uni flows without path.")
1245
            return uni_flows
1246
1247
        # Determine VLANs
1248 1
        in_vlan_a = self._get_value_from_uni_tag(self.uni_a)
1249 1
        out_vlan_a = path[0].get_metadata("s_vlan").value
1250
1251 1
        in_vlan_z = self._get_value_from_uni_tag(self.uni_z)
1252 1
        out_vlan_z = path[-1].get_metadata("s_vlan").value
1253
1254
        # Get endpoints from path
1255 1
        endpoint_a = self.get_endpoint_by_id(
1256
            path[0], self.uni_a.interface.switch.id, eq
1257
        )
1258 1
        endpoint_z = self.get_endpoint_by_id(
1259
            path[-1], self.uni_z.interface.switch.id, eq
1260
        )
1261
1262
        # Flows for the first UNI
1263 1
        flows_a = []
1264
1265
        # Flow for one direction, pushing the service tag
1266 1
        if not skip_in:
1267 1
            if isinstance(in_vlan_a, list):
1268 1
                for in_mask_a in in_vlan_a:
1269 1
                    push_flow = self._prepare_push_flow(
1270
                        self.uni_a.interface,
1271
                        endpoint_a,
1272
                        in_mask_a,
1273
                        out_vlan_a,
1274
                        in_vlan_z,
1275
                        queue_id=self.queue_id,
1276
                    )
1277 1
                    flows_a.append(push_flow)
1278
            else:
1279
                push_flow = self._prepare_push_flow(
1280
                    self.uni_a.interface,
1281
                    endpoint_a,
1282
                    in_vlan_a,
1283
                    out_vlan_a,
1284
                    in_vlan_z,
1285
                    queue_id=self.queue_id,
1286
                )
1287
                flows_a.append(push_flow)
1288
1289
        # Flow for the other direction, popping the service tag
1290 1
        if not skip_out:
1291 1
            pop_flow = self._prepare_pop_flow(
1292
                endpoint_a,
1293
                self.uni_a.interface,
1294
                out_vlan_a,
1295
                in_vlan_a,
1296
                in_vlan_z,
1297
                queue_id=self.queue_id,
1298
            )
1299 1
            flows_a.append(pop_flow)
1300
1301 1
        uni_flows[self.uni_a.interface.switch.id] = flows_a
1302
1303
        # Flows for the second UNI
1304 1
        flows_z = []
1305
1306
        # Flow for one direction, pushing the service tag
1307 1
        if not skip_in:
1308 1
            if isinstance(in_vlan_z, list):
1309 1
                for in_mask_z in in_vlan_z:
1310 1
                    push_flow = self._prepare_push_flow(
1311
                        self.uni_z.interface,
1312
                        endpoint_z,
1313
                        in_mask_z,
1314
                        out_vlan_z,
1315
                        in_vlan_a,
1316
                        queue_id=self.queue_id,
1317
                    )
1318 1
                    flows_z.append(push_flow)
1319
            else:
1320
                push_flow = self._prepare_push_flow(
1321
                    self.uni_z.interface,
1322
                    endpoint_z,
1323
                    in_vlan_z,
1324
                    out_vlan_z,
1325
                    in_vlan_a,
1326
                    queue_id=self.queue_id,
1327
                )
1328
                flows_z.append(push_flow)
1329
1330
        # Flow for the other direction, popping the service tag
1331 1
        if not skip_out:
1332 1
            pop_flow = self._prepare_pop_flow(
1333
                endpoint_z,
1334
                self.uni_z.interface,
1335
                out_vlan_z,
1336
                in_vlan_z,
1337
                in_vlan_a,
1338
                queue_id=self.queue_id,
1339
            )
1340 1
            flows_z.append(pop_flow)
1341
1342 1
        uni_flows[self.uni_z.interface.switch.id] = flows_z
1343
1344 1
        return uni_flows
1345
1346 1
    @staticmethod
1347 1
    @retry(
1348
        stop=stop_after_attempt(3),
1349
        wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
1350
        retry=retry_if_exception_type(FlowModException),
1351
        before_sleep=before_sleep,
1352
        reraise=True,
1353
    )
1354 1
    def _send_flow_mods(
1355
        data_content: dict,
1356
        command="install",
1357
        force=False,
1358
        by_switch=False
1359
    ):
1360
        """Send a flow_mod list to a specific switch.
1361
1362
        Args:
1363
            dpid(str): The target of flows (i.e. Switch.id).
1364
            flow_mods(dict): Python dictionary with flow_mods.
1365
            command(str): By default is 'flows'. To remove a flow is 'remove'.
1366
            force(bool): True to send via consistency check in case of errors.
1367
            by_switch(bool): True to send to 'flows_by_switch' request instead.
1368
        """
1369 1
        if by_switch:
1370 1
            endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}"
1371
        else:
1372 1
            endpoint = f"{settings.MANAGER_URL}/flows"
1373 1
            data_content["force"] = force
1374 1
        try:
1375 1
            if command == "install":
1376 1
                res = httpx.post(endpoint, json=data_content, timeout=30)
1377 1
            elif command == "delete":
1378 1
                res = httpx.request(
1379
                    "DELETE", endpoint, json=data_content, timeout=30
1380
                )
1381 1
        except httpx.RequestError as err:
1382 1
            raise FlowModException(str(err)) from err
1383 1
        if res.is_server_error or res.status_code >= 400:
1384 1
            raise FlowModException(res.text)
1385
1386 1
    def get_cookie(self):
1387
        """Return the cookie integer from evc id."""
1388 1
        return int(self.id, 16) + (settings.COOKIE_PREFIX << 56)
1389
1390 1
    @staticmethod
1391 1
    def get_id_from_cookie(cookie):
1392
        """Return the evc id given a cookie value."""
1393 1
        evc_id = cookie - (settings.COOKIE_PREFIX << 56)
1394 1
        return f"{evc_id:x}".zfill(14)
1395
1396 1
    def set_flow_table_group_id(self, flow_mod: dict, vlan) -> dict:
1397
        """Set table_group and table_id"""
1398 1
        table_group = "epl" if vlan is None else "evpl"
1399 1
        flow_mod["table_group"] = table_group
1400 1
        flow_mod["table_id"] = self.table_group[table_group]
1401 1
        return flow_mod
1402
1403 1
    @staticmethod
1404 1
    def get_priority(vlan):
1405
        """Return priority value depending on vlan value"""
1406 1
        if isinstance(vlan, list):
1407 1
            return settings.EVPL_SB_PRIORITY
1408 1
        if vlan not in {None, "4096/4096", 0}:
1409 1
            return settings.EVPL_SB_PRIORITY
1410 1
        if vlan == 0:
1411 1
            return settings.UNTAGGED_SB_PRIORITY
1412 1
        if vlan == "4096/4096":
1413 1
            return settings.ANY_SB_PRIORITY
1414 1
        return settings.EPL_SB_PRIORITY
1415
1416 1
    def _prepare_flow_mod(self, in_interface, out_interface,
1417
                          queue_id=None, vlan=True):
1418
        """Prepare a common flow mod."""
1419 1
        default_actions = [
1420
            {"action_type": "output", "port": out_interface.port_number}
1421
        ]
1422 1
        queue_id = settings.QUEUE_ID if queue_id == -1 else queue_id
1423 1
        if queue_id is not None:
1424 1
            default_actions.insert(
1425
                0,
1426
                {"action_type": "set_queue", "queue_id": queue_id}
1427
            )
1428
1429 1
        flow_mod = {
1430
            "match": {"in_port": in_interface.port_number},
1431
            "cookie": self.get_cookie(),
1432
            "actions": default_actions,
1433
            "owner": "mef_eline",
1434
        }
1435
1436 1
        self.set_flow_table_group_id(flow_mod, vlan)
1437 1
        if self.sb_priority:
1438 1
            flow_mod["priority"] = self.sb_priority
1439
        else:
1440 1
            flow_mod["priority"] = self.get_priority(vlan)
1441 1
        return flow_mod
1442
1443 1
    def _prepare_nni_flow(self, *args, queue_id=None):
1444
        """Create NNI flows."""
1445 1
        in_interface, out_interface, in_vlan, out_vlan = args
1446 1
        flow_mod = self._prepare_flow_mod(
1447
            in_interface, out_interface, queue_id
1448
        )
1449 1
        flow_mod["match"]["dl_vlan"] = in_vlan
1450 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1451 1
        flow_mod["actions"].insert(0, new_action)
1452
1453 1
        return flow_mod
1454
1455 1
    def _prepare_push_flow(self, *args, queue_id=None):
1456
        """Prepare push flow.
1457
1458
        Arguments:
1459
            in_interface(Interface): Interface input.
1460
            out_interface(Interface): Interface output.
1461
            in_vlan(int,str,None): Vlan input.
1462
            out_vlan(int): Vlan output.
1463
            new_c_vlan(int,str,list,None): New client vlan.
1464
1465
        Return:
1466
            dict: An python dictionary representing a FlowMod
1467
1468
        """
1469
        # assign all arguments
1470 1
        in_interface, out_interface, in_vlan, out_vlan, new_c_vlan = args
1471 1
        vlan_pri = in_vlan if not isinstance(new_c_vlan, list) else new_c_vlan
1472 1
        flow_mod = self._prepare_flow_mod(
1473
            in_interface, out_interface, queue_id, vlan_pri
1474
        )
1475
        # the service tag must be always pushed
1476 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1477 1
        flow_mod["actions"].insert(0, new_action)
1478
1479 1
        if (
1480
            not (in_vlan != new_c_vlan and isinstance(in_vlan, int) and
1481
                 isinstance(new_c_vlan, int))
1482
        ):
1483
            # Add service VLAN header when it does NOT fall into this
1484
            # statement: Both VLANs should be integer and different.
1485 1
            new_action = {"action_type": "push_vlan", "tag_type": "s"}
1486 1
            flow_mod["actions"].insert(0, new_action)
1487
1488 1
        if in_vlan is not None:
1489
            # if in_vlan is set, it must be included in the match
1490 1
            flow_mod["match"]["dl_vlan"] = in_vlan
1491
1492 1
        if (
1493
            not isinstance(in_vlan, int) and isinstance(new_c_vlan, int) and
1494
            new_c_vlan != 0
1495
        ):
1496
            # new_in_vlan is an integer but zero, action to set is required
1497 1
            new_action = {"action_type": "set_vlan", "vlan_id": new_c_vlan}
1498 1
            flow_mod["actions"].insert(0, new_action)
1499
1500 1
        if in_vlan == "4096/4096" and new_c_vlan == 0:
1501
            # if in_vlan match with any tags and new_c_vlan does not,
1502
            # a pop action is required
1503 1
            new_action = {"action_type": "pop_vlan"}
1504 1
            flow_mod["actions"].insert(0, new_action)
1505
1506 1
        elif (not in_vlan and
1507
                (not isinstance(new_c_vlan, list) and
1508
                 new_c_vlan not in self.special_cases)):
1509
            # new_in_vlan is an integer but zero and in_vlan is a no-tag or
1510
            # untagged
1511 1
            new_action = {"action_type": "push_vlan", "tag_type": "c"}
1512 1
            flow_mod["actions"].insert(0, new_action)
1513
1514 1
        return flow_mod
1515
1516 1
    def _prepare_pop_flow(
1517
        self,
1518
        in_interface: Interface,
1519
        out_interface: Interface,
1520
        out_vlan: int,
1521
        in_vlan: Union[int, str, list, None],
1522
        new_c_vlan: Union[int, str, list, None],
1523
        queue_id=None,
1524
    ):
1525
        # pylint: disable=too-many-arguments
1526
        """Prepare pop flow."""
1527 1
        flow_mod = self._prepare_flow_mod(
1528
            in_interface, out_interface, queue_id
1529
        )
1530 1
        flow_mod["match"]["dl_vlan"] = out_vlan
1531 1
        if in_vlan == 0:
1532 1
            new_action = {"action_type": "pop_vlan"}
1533 1
            flow_mod["actions"].insert(0, new_action)
1534 1
        elif (
1535
            in_vlan != new_c_vlan and isinstance(in_vlan, int) and
1536
            isinstance(new_c_vlan, int)
1537
        ):
1538
            # If UNI VLANs are different and in_vlan is not 0
1539 1
            new_action = {"action_type": "set_vlan", "vlan_id": in_vlan}
1540 1
            flow_mod["actions"].insert(0, new_action)
1541
        else:
1542 1
            new_action = {"action_type": "pop_vlan"}
1543 1
            flow_mod["actions"].insert(0, new_action)
1544 1
        return flow_mod
1545
1546 1
    @staticmethod
1547 1
    def run_bulk_sdntraces(
1548
        uni_list: list[tuple[Interface, Union[str, int, None]]]
1549
    ) -> dict:
1550
        """Run SDN traces on control plane starting from EVC UNIs."""
1551 1
        endpoint = f"{settings.SDN_TRACE_CP_URL}/traces"
1552 1
        data = []
1553 1
        for interface, tag_value in uni_list:
1554 1
            data_uni = {
1555
                "trace": {
1556
                            "switch": {
1557
                                "dpid": interface.switch.dpid,
1558
                                "in_port": interface.port_number,
1559
                            }
1560
                        }
1561
                }
1562 1
            if tag_value:
1563 1
                uni_dl_vlan = map_dl_vlan(tag_value)
1564 1
                if uni_dl_vlan:
1565 1
                    data_uni["trace"]["eth"] = {
1566
                                            "dl_type": 0x8100,
1567
                                            "dl_vlan": uni_dl_vlan,
1568
                                            }
1569 1
            data.append(data_uni)
1570 1
        try:
1571 1
            response = httpx.put(endpoint, json=data, timeout=30)
1572 1
        except httpx.TimeoutException as exception:
1573 1
            log.error(f"Request has timed out: {exception}")
1574 1
            return {"result": []}
1575 1
        if response.status_code >= 400:
1576 1
            log.error(f"Failed to run sdntrace-cp: {response.text}")
1577 1
            return {"result": []}
1578 1
        return response.json()
1579
1580
    # pylint: disable=too-many-return-statements, too-many-arguments
1581 1
    @staticmethod
1582 1
    def check_trace(
1583
        evc_id: str,
1584
        evc_name: str,
1585
        tag_a: Union[None, int, str],
1586
        tag_z: Union[None, int, str],
1587
        interface_a: Interface,
1588
        interface_z: Interface,
1589
        current_path: list,
1590
        trace_a: list,
1591
        trace_z: list
1592
    ) -> bool:
1593
        """Auxiliar function to check an individual trace"""
1594 1
        if (
1595
            len(trace_a) != len(current_path) + 1
1596
            or not compare_uni_out_trace(tag_z, interface_z, trace_a[-1])
1597
        ):
1598 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1599
                        f"Invalid trace from uni_a: {trace_a}")
1600 1
            return False
1601 1
        if (
1602
            len(trace_z) != len(current_path) + 1
1603
            or not compare_uni_out_trace(tag_a, interface_a, trace_z[-1])
1604
        ):
1605 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1606
                        f"Invalid trace from uni_z: {trace_z}")
1607 1
            return False
1608
1609 1
        if not current_path:
1610
            return True
1611
1612 1
        first_link, trace_path_begin, trace_path_end = current_path[0], [], []
1613 1
        if (
1614
            first_link.endpoint_a.switch.id == trace_a[0]["dpid"]
1615
        ):
1616 1
            trace_path_begin, trace_path_end = trace_a, trace_z
1617 1
        elif (
1618
            first_link.endpoint_a.switch.id == trace_z[0]["dpid"]
1619
        ):
1620 1
            trace_path_begin, trace_path_end = trace_z, trace_a
1621
        else:
1622
            msg = (
1623
                f"first link {first_link} endpoint_a didn't match the first "
1624
                f"step of trace_a {trace_a} or trace_z {trace_z}"
1625
            )
1626
            log.warning(msg)
1627
            return False
1628
1629 1
        for link, trace1, trace2 in zip(current_path,
1630
                                        trace_path_begin[1:],
1631
                                        trace_path_end[:0:-1]):
1632 1
            metadata_vlan = None
1633 1
            if link.metadata:
1634 1
                metadata_vlan = glom(link.metadata, 's_vlan.value')
1635 1
            if compare_endpoint_trace(
1636
                                        link.endpoint_a,
1637
                                        metadata_vlan,
1638
                                        trace2
1639
                                    ) is False:
1640 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1641
                            f"Invalid trace from uni_a: {trace_a}")
1642 1
                return False
1643 1
            if compare_endpoint_trace(
1644
                                        link.endpoint_b,
1645
                                        metadata_vlan,
1646
                                        trace1
1647
                                    ) is False:
1648 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1649
                            f"Invalid trace from uni_z: {trace_z}")
1650 1
                return False
1651
1652 1
        return True
1653
1654 1
    @staticmethod
1655 1
    def check_range(circuit, traces: list) -> bool:
1656
        """Check traces when for UNI with TAGRange"""
1657 1
        check = True
1658 1
        for i, mask in enumerate(circuit.uni_a.user_tag.mask_list):
1659 1
            trace_a = traces[i*2]
1660 1
            trace_z = traces[i*2+1]
1661 1
            check &= EVCDeploy.check_trace(
1662
                circuit.id, circuit.name,
1663
                mask, mask,
1664
                circuit.uni_a.interface,
1665
                circuit.uni_z.interface,
1666
                circuit.current_path,
1667
                trace_a, trace_z,
1668
            )
1669 1
        return check
1670
1671 1
    @staticmethod
1672 1
    def check_list_traces(list_circuits: list) -> dict:
1673
        """Check if current_path is deployed comparing with SDN traces."""
1674 1
        if not list_circuits:
1675 1
            return {}
1676 1
        uni_list = make_uni_list(list_circuits)
1677 1
        traces = EVCDeploy.run_bulk_sdntraces(uni_list)["result"]
1678
1679 1
        if not traces:
1680 1
            return {}
1681
1682 1
        try:
1683 1
            circuits_checked = {}
1684 1
            i = 0
1685 1
            for circuit in list_circuits:
1686 1
                if isinstance(circuit.uni_a.user_tag, TAGRange):
1687 1
                    length = len(circuit.uni_a.user_tag.mask_list)
1688 1
                    circuits_checked[circuit.id] = EVCDeploy.check_range(
1689
                        circuit, traces[i:i+length*2]
1690
                    )
1691 1
                    i += length*2
1692
                else:
1693 1
                    trace_a = traces[i]
1694 1
                    trace_z = traces[i+1]
1695 1
                    tag_a = None
1696 1
                    if circuit.uni_a.user_tag:
1697 1
                        tag_a = circuit.uni_a.user_tag.value
1698 1
                    tag_z = None
1699 1
                    if circuit.uni_z.user_tag:
1700 1
                        tag_z = circuit.uni_z.user_tag.value
1701 1
                    circuits_checked[circuit.id] = EVCDeploy.check_trace(
1702
                        circuit.id, circuit.name,
1703
                        tag_a, tag_z,
1704
                        circuit.uni_a.interface,
1705
                        circuit.uni_z.interface,
1706
                        circuit.current_path,
1707
                        trace_a, trace_z
1708
                    )
1709 1
                    i += 2
1710 1
        except IndexError as err:
1711 1
            log.error(
1712
                f"Bulk sdntraces returned fewer items than expected."
1713
                f"Error = {err}"
1714
            )
1715 1
            return {}
1716
1717 1
        return circuits_checked
1718
1719 1
    @staticmethod
1720 1
    def get_endpoint_by_id(
1721
        link: Link,
1722
        id_: str,
1723
        operator: Union[eq, ne]
1724
    ) -> Interface:
1725
        """Return endpoint from link
1726
        either equal(eq) or not equal(ne) to id"""
1727 1
        if operator(link.endpoint_a.switch.id, id_):
1728 1
            return link.endpoint_a
1729 1
        return link.endpoint_b
1730
1731
1732 1
class LinkProtection(EVCDeploy):
1733
    """Class to handle link protection."""
1734
1735 1
    def is_affected_by_link(self, link=None):
1736
        """Verify if the current path is affected by link down event."""
1737
        return self.current_path.is_affected_by_link(link)
1738
1739 1
    def is_using_primary_path(self):
1740
        """Verify if the current deployed path is self.primary_path."""
1741 1
        return self.current_path == self.primary_path
1742
1743 1
    def is_using_backup_path(self):
1744
        """Verify if the current deployed path is self.backup_path."""
1745 1
        return self.current_path == self.backup_path
1746
1747 1
    def is_using_dynamic_path(self):
1748
        """Verify if the current deployed path is dynamic."""
1749 1
        if (
1750
            self.current_path
1751
            and not self.is_using_primary_path()
1752
            and not self.is_using_backup_path()
1753
            and self.current_path.status is EntityStatus.UP
1754
        ):
1755
            return True
1756 1
        return False
1757
1758 1
    def handle_link_up(self, link=None, interface=None):
1759
        """Handle circuit when link up.
1760
1761
        Args:
1762
            link(Link): Link affected by link.up event.
1763
1764
        """
1765 1
        condition_pairs = [
1766
            (
1767
                lambda me: me.is_using_primary_path(),
1768
                lambda _: (True, 'nothing')
1769
            ),
1770
            (
1771
                lambda me: me.is_intra_switch(),
1772
                lambda _: (True, 'nothing')
1773
            ),
1774
            (
1775
                lambda me: me.primary_path.is_affected_by_link(link),
1776
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1777
            ),
1778
            # For this special case, it reached this point because interface
1779
            # was previously confirmed to be a UNI and both UNI are UP
1780
            (
1781
                lambda me: (me.primary_path.status == EntityStatus.UP
1782
                            and interface),
1783
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1784
            ),
1785
            (
1786
                lambda me: (me.backup_path.status == EntityStatus.UP
1787
                            and interface),
1788
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1789
            ),
1790
            # We tried to deploy(primary_path) without success.
1791
            # And in this case is up by some how. Nothing to do.
1792
            (
1793
                lambda me: me.is_using_backup_path(),
1794
                lambda _: (True, 'nothing')
1795
            ),
1796
            (
1797
                lambda me:  me.is_using_dynamic_path(),
1798
                lambda _: (True, 'nothing')
1799
            ),
1800
            # In this case, probably the circuit is not being used and
1801
            # we can move to backup
1802
            (
1803
                lambda me: me.backup_path.is_affected_by_link(link),
1804
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1805
            ),
1806
            # In this case, the circuit is not being used and we should
1807
            # try a dynamic path
1808
            (
1809
                lambda me: me.dynamic_backup_path and not me.is_active(),
1810
                lambda me: (me.deploy_to_path(), 'redeploy')
1811
            )
1812
        ]
1813 1
        for predicate, action in condition_pairs:
1814 1
            if not predicate(self):
1815 1
                continue
1816 1
            success, succcess_type = action(self)
1817 1
            if success:
1818 1
                if succcess_type == 'redeploy':
1819 1
                    emit_event(
1820
                        self._controller,
1821
                        "redeployed_link_up",
1822
                        content=map_evc_event_content(self)
1823
                    )
1824 1
                return True
1825 1
        return False
1826
1827 1
    def handle_link_down(self):
1828
        """Handle circuit when link down.
1829
1830
        Returns:
1831
            bool: True if the re-deploy was successly otherwise False.
1832
1833
        """
1834 1
        success = False
1835 1
        if self.is_using_primary_path():
1836 1
            success = self.deploy_to_backup_path()
1837 1
        elif self.is_using_backup_path():
1838 1
            success = self.deploy_to_primary_path()
1839
1840 1
        if not success and self.dynamic_backup_path:
1841 1
            success = self.deploy_to_path()
1842
1843 1
        if success:
1844 1
            log.debug(f"{self} deployed after link down.")
1845
        else:
1846 1
            self.remove_current_flows(sync=False)
1847 1
            self.deactivate()
1848 1
            self.sync()
1849 1
            log.debug(f"Failed to re-deploy {self} after link down.")
1850
1851 1
        return success
1852
1853 1
    def are_unis_active(self) -> bool:
1854
        """Determine whether this EVC should be active"""
1855 1
        interface_a = self.uni_a.interface
1856 1
        interface_z = self.uni_z.interface
1857 1
        active, _ = self.is_uni_interface_active(interface_a, interface_z)
1858 1
        return active
1859
1860 1
    def try_to_handle_uni_as_link_up(self, interface: Interface) -> bool:
1861
        """Try to handle UNI as link_up to trigger deployment."""
1862
        if (
1863
            self.current_path.status != EntityStatus.UP
1864
            and not self.is_intra_switch()
1865
        ):
1866
            succeeded = self.handle_link_up(interface=interface)
1867
            if succeeded:
1868
                msg = (
1869
                    f"Activated {self} due to successful "
1870
                    f"deployment triggered by {interface}"
1871
                )
1872
            else:
1873
                msg = (
1874
                    f"Couldn't activate {self} due to unsuccessful "
1875
                    f"deployment triggered by {interface}"
1876
                )
1877
            log.info(msg)
1878
            return True
1879
        return False
1880
1881 1
    def handle_interface_link_up(self, interface: Interface):
1882
        """
1883
        Handler for interface link_up events
1884
        """
1885 1
        if not _does_uni_affect_evc(self, interface, "up"):
1886 1
            return
1887 1
        if self.try_to_handle_uni_as_link_up(interface):
1888
            return
1889
1890 1
        interface_dicts = {
1891
            interface.id: {
1892
                'status': interface.status.value,
1893
                'status_reason': interface.status_reason,
1894
            }
1895
            for interface in (self.uni_a.interface, self.uni_z.interface)
1896
        }
1897 1
        try:
1898 1
            self.try_to_activate()
1899 1
            log.info(
1900
                f"Activating {self}. Interfaces: "
1901
                f"{interface_dicts}."
1902
            )
1903 1
            emit_event(self._controller, "uni_active_updated",
1904
                       content=map_evc_event_content(self))
1905 1
            self.sync()
1906
        except ActivationError as exc:
1907
            # On this ctx, no ActivationError isn't expected since the
1908
            # activation pre-requisites states were checked, so handled as err
1909
            log.error(f"ActivationError: {str(exc)} when handling {interface}")
1910
1911 1
    def handle_interface_link_down(self, interface):
1912
        """
1913
        Handler for interface link_down events
1914
        """
1915 1
        if not _does_uni_affect_evc(self, interface, "down"):
1916 1
            return
1917 1
        interface_dicts = {
1918
            interface.id: {
1919
                'status': interface.status.value,
1920
                'status_reason': interface.status_reason,
1921
            }
1922
            for interface in (self.uni_a.interface, self.uni_z.interface)
1923
            if interface.status != EntityStatus.UP
1924
        }
1925 1
        self.deactivate()
1926 1
        log.info(
1927
            f"Deactivating {self}. Interfaces: "
1928
            f"{interface_dicts}."
1929
        )
1930 1
        emit_event(self._controller, "uni_active_updated",
1931
                   content=map_evc_event_content(self))
1932 1
        self.sync()
1933
1934
1935 1
class EVC(LinkProtection):
1936
    """Class that represents a E-Line Virtual Connection."""
1937