Passed
Push — master ( 38a8d5...80fe45 )
by Aldo
13:53 queued 09:52
created

build.models.evc.EVCDeploy.check_trace()   D

Complexity

Conditions 12

Size

Total Lines 72
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 12.3775

Importance

Changes 0
Metric Value
cc 12
eloc 55
nop 9
dl 0
loc 72
ccs 25
cts 29
cp 0.8621
crap 12.3775
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.models.evc.EVCDeploy.check_trace() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Classes used in the main application."""  # pylint: disable=too-many-lines
2 1
import traceback
3 1
from collections import OrderedDict, defaultdict
4 1
from copy import deepcopy
5 1
from datetime import datetime
6 1
from operator import eq, ne
7 1
from threading import Lock
8 1
from typing import Union
9 1
from uuid import uuid4
10
11 1
import httpx
12 1
from glom import glom
13 1
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
14
                      wait_combine, wait_fixed, wait_random)
15
16 1
from kytos.core import log
17 1
from kytos.core.common import EntityStatus, GenericEntity
18 1
from kytos.core.exceptions import KytosNoTagAvailableError, KytosTagError
19 1
from kytos.core.helpers import get_time, now
20 1
from kytos.core.interface import UNI, Interface, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.retry import before_sleep
23 1
from kytos.core.tag_ranges import range_difference
24 1
from napps.kytos.mef_eline import controllers, settings
25 1
from napps.kytos.mef_eline.exceptions import (ActivationError,
26
                                              DuplicatedNoTagUNI,
27
                                              EVCPathNotInstalled,
28
                                              FlowModException, InvalidPath)
29 1
from napps.kytos.mef_eline.utils import (check_disabled_component,
30
                                         compare_endpoint_trace,
31
                                         compare_uni_out_trace, emit_event,
32
                                         make_uni_list, map_dl_vlan,
33
                                         map_evc_event_content,
34
                                         merge_flow_dicts)
35
36 1
from .path import DynamicPathManager, Path
37
38
39 1
class EVCBase(GenericEntity):
40
    """Class to represent a circuit."""
41
42 1
    attributes_requiring_redeploy = [
43
        "primary_path",
44
        "backup_path",
45
        "dynamic_backup_path",
46
        "queue_id",
47
        "sb_priority",
48
        "primary_constraints",
49
        "secondary_constraints",
50
        "uni_a",
51
        "uni_z",
52
    ]
53 1
    required_attributes = ["name", "uni_a", "uni_z"]
54
55 1
    updatable_attributes = {
56
        "uni_a",
57
        "uni_z",
58
        "name",
59
        "start_date",
60
        "end_date",
61
        "queue_id",
62
        "bandwidth",
63
        "primary_path",
64
        "backup_path",
65
        "dynamic_backup_path",
66
        "primary_constraints",
67
        "secondary_constraints",
68
        "owner",
69
        "sb_priority",
70
        "service_level",
71
        "circuit_scheduler",
72
        "metadata",
73
        "enabled"
74
    }
75
76
    # pylint: disable=too-many-statements
77 1
    def __init__(self, controller, **kwargs):
78
        """Create an EVC instance with the provided parameters.
79
80
        Args:
81
            id(str): EVC identifier. Whether it's None an ID will be genereted.
82
                     Only the first 14 bytes passed will be used.
83
            name: represents an EVC name.(Required)
84
            uni_a (UNI): Endpoint A for User Network Interface.(Required)
85
            uni_z (UNI): Endpoint Z for User Network Interface.(Required)
86
            start_date(datetime|str): Date when the EVC was registred.
87
                                      Default is now().
88
            end_date(datetime|str): Final date that the EVC will be fineshed.
89
                                    Default is None.
90
            bandwidth(int): Bandwidth used by EVC instance. Default is 0.
91
            primary_links(list): Primary links used by evc. Default is []
92
            backup_links(list): Backups links used by evc. Default is []
93
            current_path(list): Circuit being used at the moment if this is an
94
                                active circuit. Default is [].
95
            failover_path(list): Path being used to provide EVC protection via
96
                                failover during link failures. Default is [].
97
            primary_path(list): primary circuit offered to user IF one or more
98
                                links were provided. Default is [].
99
            backup_path(list): backup circuit offered to the user IF one or
100
                               more links were provided. Default is [].
101
            dynamic_backup_path(bool): Enable computer backup path dynamically.
102
                                       Dafault is False.
103
            creation_time(datetime|str): datetime when the circuit should be
104
                                         activated. default is now().
105
            enabled(Boolean): attribute to indicate the administrative state;
106
                              default is False.
107
            active(Boolean): attribute to indicate the operational state;
108
                             default is False.
109
            archived(Boolean): indicate the EVC has been deleted and is
110
                               archived; default is False.
111
            owner(str): The EVC owner. Default is None.
112
            sb_priority(int): Service level provided in the request.
113
                              Default is None.
114
            service_level(int): Service level provided. The higher the better.
115
                                Default is 0.
116
117
        Raises:
118
            ValueError: raised when object attributes are invalid.
119
120
        """
121 1
        self._controller = controller
122 1
        self._validate(**kwargs)
123 1
        super().__init__()
124
125
        # required attributes
126 1
        self._id = kwargs.get("id", uuid4().hex)[:14]
127 1
        self.uni_a: UNI = kwargs.get("uni_a")
128 1
        self.uni_z: UNI = kwargs.get("uni_z")
129 1
        self.name = kwargs.get("name")
130
131
        # optional attributes
132 1
        self.start_date = get_time(kwargs.get("start_date")) or now()
133 1
        self.end_date = get_time(kwargs.get("end_date")) or None
134 1
        self.queue_id = kwargs.get("queue_id", -1)
135
136 1
        self.bandwidth = kwargs.get("bandwidth", 0)
137 1
        self.primary_links = Path(kwargs.get("primary_links", []))
138 1
        self.backup_links = Path(kwargs.get("backup_links", []))
139 1
        self.current_path = Path(kwargs.get("current_path", []))
140 1
        self.failover_path = Path(kwargs.get("failover_path", []))
141 1
        self.primary_path = Path(kwargs.get("primary_path", []))
142 1
        self.backup_path = Path(kwargs.get("backup_path", []))
143 1
        self.dynamic_backup_path = kwargs.get("dynamic_backup_path", False)
144 1
        self.primary_constraints = kwargs.get("primary_constraints", {})
145 1
        self.secondary_constraints = kwargs.get("secondary_constraints", {})
146 1
        self.creation_time = get_time(kwargs.get("creation_time")) or now()
147 1
        self.owner = kwargs.get("owner", None)
148 1
        self.sb_priority = kwargs.get("sb_priority", None) or kwargs.get(
149
            "priority", None
150
        )
151 1
        self.service_level = kwargs.get("service_level", 0)
152 1
        self.circuit_scheduler = kwargs.get("circuit_scheduler", [])
153 1
        self.flow_removed_at = get_time(kwargs.get("flow_removed_at")) or None
154 1
        self.updated_at = get_time(kwargs.get("updated_at")) or now()
155 1
        self.execution_rounds = kwargs.get("execution_rounds", 0)
156 1
        self.current_links_cache = set()
157 1
        self.primary_links_cache = set()
158 1
        self.backup_links_cache = set()
159 1
        self.affected_by_link_at = get_time("0001-01-01T00:00:00")
160 1
        self.old_path = Path([])
161
162 1
        self.lock = Lock()
163
164 1
        self.archived = kwargs.get("archived", False)
165
166 1
        self.metadata = kwargs.get("metadata", {})
167
168 1
        self._mongo_controller = controllers.ELineController()
169
170 1
        if kwargs.get("active", False):
171 1
            self.activate()
172
        else:
173 1
            self.deactivate()
174
175 1
        if kwargs.get("enabled", False):
176 1
            self.enable()
177
        else:
178 1
            self.disable()
179
180
        # datetime of user request for a EVC (or datetime when object was
181
        # created)
182 1
        self.request_time = kwargs.get("request_time", now())
183
        # dict with the user original request (input)
184 1
        self._requested = kwargs
185
186
        # Special cases: No tag, any, untagged
187 1
        self.special_cases = {None, "4096/4096", 0}
188 1
        self.table_group = kwargs.get("table_group")
189
190 1
    def sync(self, keys: set = None):
191
        """Sync this EVC in the MongoDB."""
192 1
        self.updated_at = now()
193 1
        if keys:
194 1
            self._mongo_controller.update_evc(self.as_dict(keys))
195 1
            return
196 1
        self._mongo_controller.upsert_evc(self.as_dict())
197
198 1
    def _get_unis_use_tags(self, **kwargs) -> tuple[UNI, UNI]:
199
        """Obtain both UNIs (uni_a, uni_z).
200
        If a UNI is changing, verify tags"""
201 1
        uni_a = kwargs.get("uni_a", None)
202 1
        uni_a_flag = False
203 1
        if uni_a and uni_a != self.uni_a:
204 1
            uni_a_flag = True
205 1
            self._use_uni_vlan(uni_a, uni_dif=self.uni_a)
206
207 1
        uni_z = kwargs.get("uni_z", None)
208 1
        if uni_z and uni_z != self.uni_z:
209 1
            try:
210 1
                self._use_uni_vlan(uni_z, uni_dif=self.uni_z)
211 1
                self.make_uni_vlan_available(self.uni_z, uni_dif=uni_z)
212 1
            except KytosTagError as err:
213 1
                if uni_a_flag:
214 1
                    self.make_uni_vlan_available(uni_a, uni_dif=self.uni_a)
215 1
                raise err
216
        else:
217 1
            uni_z = self.uni_z
218
219 1
        if uni_a_flag:
220 1
            self.make_uni_vlan_available(self.uni_a, uni_dif=uni_a)
221
        else:
222 1
            uni_a = self.uni_a
223 1
        return uni_a, uni_z
224
225 1
    def update(self, **kwargs):
226
        """Update evc attributes.
227
228
        This method will raises an error trying to change the following
229
        attributes: [creation_time, active, current_path, failover_path,
230
        _id, archived]
231
        [name, uni_a and uni_z]
232
233
        Returns:
234
            the values for enable and a redeploy attribute, if exists and None
235
            otherwise
236
        Raises:
237
            ValueError: message with error detail.
238
239
        """
240 1
        enable, redeploy = (None, None)
241 1
        if not self._tag_lists_equal(**kwargs):
242 1
            raise ValueError(
243
                "UNI_A and UNI_Z tag lists should be the same."
244
            )
245 1
        uni_a, uni_z = self._get_unis_use_tags(**kwargs)
246 1
        check_disabled_component(uni_a, uni_z)
247 1
        self._validate_has_primary_or_dynamic(
248
            primary_path=kwargs.get("primary_path"),
249
            dynamic_backup_path=kwargs.get("dynamic_backup_path"),
250
            uni_a=uni_a,
251
            uni_z=uni_z,
252
        )
253 1
        for attribute, value in kwargs.items():
254 1
            if attribute not in self.updatable_attributes:
255 1
                raise ValueError(f"{attribute} can't be updated.")
256 1
            if attribute in ("primary_path", "backup_path"):
257 1
                try:
258 1
                    value.is_valid(
259
                        uni_a.interface.switch, uni_z.interface.switch
260
                    )
261 1
                except InvalidPath as exception:
262 1
                    raise ValueError(  # pylint: disable=raise-missing-from
263
                        f"{attribute} is not a " f"valid path: {exception}"
264
                    )
265 1
        for attribute, value in kwargs.items():
266 1
            if attribute == "enabled":
267 1
                if value:
268 1
                    self.enable()
269
                else:
270 1
                    self.disable()
271 1
                enable = value
272
            else:
273 1
                setattr(self, attribute, value)
274 1
                if attribute in self.attributes_requiring_redeploy:
275 1
                    redeploy = True
276 1
        self.sync(set(kwargs.keys()))
277 1
        return enable, redeploy
278
279 1
    def set_flow_removed_at(self):
280
        """Update flow_removed_at attribute."""
281
        self.flow_removed_at = now()
282
283 1
    def has_recent_removed_flow(self, setting=settings):
284
        """Check if any flow has been removed from the evc"""
285
        if self.flow_removed_at is None:
286
            return False
287
        res_seconds = (now() - self.flow_removed_at).seconds
288
        return res_seconds < setting.TIME_RECENT_DELETED_FLOWS
289
290 1
    def is_recent_updated(self, setting=settings):
291
        """Check if the evc has been updated recently"""
292
        res_seconds = (now() - self.updated_at).seconds
293
        return res_seconds < setting.TIME_RECENT_UPDATED
294
295 1
    def __repr__(self):
296
        """Repr method."""
297 1
        return f"EVC({self._id}, {self.name})"
298
299 1
    def _validate(self, **kwargs):
300
        """Do Basic validations.
301
302
        Verify required attributes: name, uni_a, uni_z
303
304
        Raises:
305
            ValueError: message with error detail.
306
307
        """
308 1
        for attribute in self.required_attributes:
309
310 1
            if attribute not in kwargs:
311 1
                raise ValueError(f"{attribute} is required.")
312
313 1
            if "uni" in attribute:
314 1
                uni = kwargs.get(attribute)
315 1
                if not isinstance(uni, UNI):
316
                    raise ValueError(f"{attribute} is an invalid UNI.")
317
318 1
    def _tag_lists_equal(self, **kwargs):
319
        """Verify that tag lists are the same."""
320 1
        uni_a = kwargs.get("uni_a") or self.uni_a
321 1
        uni_z = kwargs.get("uni_z") or self.uni_z
322 1
        uni_a_list = uni_z_list = False
323 1
        if (uni_a.user_tag and isinstance(uni_a.user_tag, TAGRange)):
324 1
            uni_a_list = True
325 1
        if (uni_z.user_tag and isinstance(uni_z.user_tag, TAGRange)):
326 1
            uni_z_list = True
327 1
        if uni_a_list and uni_z_list:
328 1
            return uni_a.user_tag.value == uni_z.user_tag.value
329 1
        return uni_a_list == uni_z_list
330
331 1
    def _validate_has_primary_or_dynamic(
332
        self,
333
        primary_path=None,
334
        dynamic_backup_path=None,
335
        uni_a=None,
336
        uni_z=None,
337
    ) -> None:
338
        """Validate that it must have a primary path or allow dynamic paths."""
339 1
        primary_path = (
340
            primary_path
341
            if primary_path is not None
342
            else self.primary_path
343
        )
344 1
        dynamic_backup_path = (
345
            dynamic_backup_path
346
            if dynamic_backup_path is not None
347
            else self.dynamic_backup_path
348
        )
349 1
        uni_a = uni_a if uni_a is not None else self.uni_a
350 1
        uni_z = uni_z if uni_z is not None else self.uni_z
351 1
        if (
352
            not primary_path
353
            and not dynamic_backup_path
354
            and uni_a and uni_z
355
            and uni_a.interface.switch != uni_z.interface.switch
356
        ):
357 1
            msg = "The EVC must have a primary path or allow dynamic paths."
358 1
            raise ValueError(msg)
359
360 1
    def __eq__(self, other):
361
        """Override the default implementation."""
362 1
        if not isinstance(other, EVC):
363
            return False
364
365 1
        attrs_to_compare = ["name", "uni_a", "uni_z", "owner", "bandwidth"]
366 1
        for attribute in attrs_to_compare:
367 1
            if getattr(other, attribute) != getattr(self, attribute):
368 1
                return False
369 1
        return True
370
371 1
    def is_intra_switch(self):
372
        """Check if the UNIs are in the same switch."""
373 1
        return self.uni_a.interface.switch == self.uni_z.interface.switch
374
375 1
    def check_no_tag_duplicate(self, other_uni: UNI):
376
        """Check if a no tag UNI is duplicated."""
377 1
        if other_uni in (self.uni_a, self.uni_z):
378 1
            msg = f"UNI with interface {other_uni.interface.id} is"\
379
                  f" duplicated with {self}."
380 1
            raise DuplicatedNoTagUNI(msg)
381
382 1
    def as_dict(self, keys: set = None):
383
        """Return a dictionary representing an EVC object.
384
            keys: Only fields on this variable will be
385
                  returned in the dictionary"""
386 1
        evc_dict = {
387
            "id": self.id,
388
            "name": self.name,
389
            "uni_a": self.uni_a.as_dict(),
390
            "uni_z": self.uni_z.as_dict(),
391
        }
392
393 1
        time_fmt = "%Y-%m-%dT%H:%M:%S"
394
395 1
        evc_dict["start_date"] = self.start_date
396 1
        if isinstance(self.start_date, datetime):
397 1
            evc_dict["start_date"] = self.start_date.strftime(time_fmt)
398
399 1
        evc_dict["end_date"] = self.end_date
400 1
        if isinstance(self.end_date, datetime):
401 1
            evc_dict["end_date"] = self.end_date.strftime(time_fmt)
402
403 1
        evc_dict["queue_id"] = self.queue_id
404 1
        evc_dict["bandwidth"] = self.bandwidth
405 1
        evc_dict["primary_links"] = self.primary_links.as_dict()
406 1
        evc_dict["backup_links"] = self.backup_links.as_dict()
407 1
        evc_dict["current_path"] = self.current_path.as_dict()
408 1
        evc_dict["failover_path"] = self.failover_path.as_dict()
409 1
        evc_dict["primary_path"] = self.primary_path.as_dict()
410 1
        evc_dict["backup_path"] = self.backup_path.as_dict()
411 1
        evc_dict["dynamic_backup_path"] = self.dynamic_backup_path
412 1
        evc_dict["metadata"] = self.metadata
413
414 1
        evc_dict["request_time"] = self.request_time
415 1
        if isinstance(self.request_time, datetime):
416 1
            evc_dict["request_time"] = self.request_time.strftime(time_fmt)
417
418 1
        time = self.creation_time.strftime(time_fmt)
419 1
        evc_dict["creation_time"] = time
420
421 1
        evc_dict["owner"] = self.owner
422 1
        evc_dict["circuit_scheduler"] = [
423
            sc.as_dict() for sc in self.circuit_scheduler
424
        ]
425
426 1
        evc_dict["active"] = self.is_active()
427 1
        evc_dict["enabled"] = self.is_enabled()
428 1
        evc_dict["archived"] = self.archived
429 1
        evc_dict["sb_priority"] = self.sb_priority
430 1
        evc_dict["service_level"] = self.service_level
431 1
        evc_dict["primary_constraints"] = self.primary_constraints
432 1
        evc_dict["secondary_constraints"] = self.secondary_constraints
433 1
        evc_dict["flow_removed_at"] = self.flow_removed_at
434 1
        evc_dict["updated_at"] = self.updated_at
435
436 1
        if keys:
437 1
            selected = {}
438 1
            for key in keys:
439 1
                selected[key] = evc_dict[key]
440 1
            selected["id"] = evc_dict["id"]
441 1
            return selected
442 1
        return evc_dict
443
444 1
    @property
445 1
    def id(self):  # pylint: disable=invalid-name
446
        """Return this EVC's ID."""
447 1
        return self._id
448
449 1
    def archive(self):
450
        """Archive this EVC on deletion."""
451 1
        self.archived = True
452
453 1
    def _use_uni_vlan(
454
        self,
455
        uni: UNI,
456
        uni_dif: Union[None, UNI] = None
457
    ):
458
        """Use tags from UNI"""
459 1
        if uni.user_tag is None:
460 1
            return
461 1
        tag = uni.user_tag.value
462 1
        tag_type = uni.user_tag.tag_type
463 1
        if (uni_dif and isinstance(tag, list) and
464
                isinstance(uni_dif.user_tag.value, list)):
465 1
            tag = range_difference(tag, uni_dif.user_tag.value)
466 1
            if not tag:
467 1
                return
468 1
        uni.interface.use_tags(
469
            self._controller, tag, tag_type, use_lock=True, check_order=False
470
        )
471
472 1
    def make_uni_vlan_available(
473
        self,
474
        uni: UNI,
475
        uni_dif: Union[None, UNI] = None,
476
    ):
477
        """Make available tag from UNI"""
478 1
        if uni.user_tag is None:
479 1
            return
480 1
        tag = uni.user_tag.value
481 1
        tag_type = uni.user_tag.tag_type
482 1
        if (uni_dif and isinstance(tag, list) and
483
                isinstance(uni_dif.user_tag.value, list)):
484 1
            tag = range_difference(tag, uni_dif.user_tag.value)
485 1
            if not tag:
486
                return
487 1
        try:
488 1
            conflict = uni.interface.make_tags_available(
489
                self._controller, tag, tag_type, use_lock=True,
490
                check_order=False
491
            )
492 1
        except KytosTagError as err:
493 1
            log.error(f"Error in {self}: {err}")
494 1
            return
495 1
        if conflict:
496 1
            intf = uni.interface.id
497 1
            log.warning(f"Tags {conflict} was already available in {intf}")
498
499 1
    def remove_uni_tags(self):
500
        """Remove both UNI usage of a tag"""
501 1
        self.make_uni_vlan_available(self.uni_a)
502 1
        self.make_uni_vlan_available(self.uni_z)
503
504
505
# pylint: disable=fixme, too-many-public-methods
506 1
class EVCDeploy(EVCBase):
507
    """Class to handle the deploy procedures."""
508
509 1
    def create(self):
510
        """Create a EVC."""
511
512 1
    def discover_new_paths(self):
513
        """Discover new paths to satisfy this circuit and deploy it."""
514
        return DynamicPathManager.get_best_paths(self,
515
                                                 **self.primary_constraints)
516
517 1
    def get_failover_path_candidates(self):
518
        """Get failover paths to satisfy this EVC."""
519
        # in the future we can return primary/backup paths as well
520
        # we just have to properly handle link_up and failover paths
521
        # if (
522
        #     self.is_using_primary_path() and
523
        #     self.backup_path.status is EntityStatus.UP
524
        # ):
525
        #     yield self.backup_path
526 1
        return DynamicPathManager.get_disjoint_paths(self, self.current_path)
527
528 1
    def change_path(self):
529
        """Change EVC path."""
530
531 1
    def reprovision(self):
532
        """Force the EVC (re-)provisioning."""
533
534 1
    def is_affected_by_link(self, link):
535
        """Return True if this EVC has the given link on its current path."""
536 1
        return link in self.current_path
537
538 1
    def link_affected_by_interface(self, interface):
539
        """Return True if this EVC has the given link on its current path."""
540
        return self.current_path.link_affected_by_interface(interface)
541
542 1
    def is_backup_path_affected_by_link(self, link):
543
        """Return True if the backup path of this EVC uses the given link."""
544 1
        return link in self.backup_path
545
546
    # pylint: disable=invalid-name
547 1
    def is_primary_path_affected_by_link(self, link):
548
        """Return True if the primary path of this EVC uses the given link."""
549 1
        return link in self.primary_path
550
551 1
    def is_failover_path_affected_by_link(self, link):
552
        """Return True if this EVC has the given link on its failover path."""
553 1
        return link in self.failover_path
554
555 1
    def is_eligible_for_failover_path(self):
556
        """Verify if this EVC is eligible for failover path (EP029)"""
557
        # In the future this function can be augmented to consider
558
        # primary/backup, primary/dynamic, and other path combinations
559 1
        return (
560
            self.dynamic_backup_path and
561
            not self.primary_path and not self.backup_path
562
        )
563
564 1
    def is_using_primary_path(self):
565
        """Verify if the current deployed path is self.primary_path."""
566 1
        return self.primary_path and (self.current_path == self.primary_path)
567
568 1
    def is_using_backup_path(self):
569
        """Verify if the current deployed path is self.backup_path."""
570 1
        return self.backup_path and (self.current_path == self.backup_path)
571
572 1
    def is_using_dynamic_path(self):
573
        """Verify if the current deployed path is a dynamic path."""
574 1
        if (
575
            self.current_path
576
            and not self.is_using_primary_path()
577
            and not self.is_using_backup_path()
578
            and self.current_path.status == EntityStatus.UP
579
        ):
580
            return True
581 1
        return False
582
583 1
    def deploy_to_backup_path(self, old_path_dict: dict = None):
584
        """Deploy the backup path into the datapaths of this circuit.
585
586
        If the backup_path attribute is valid and up, this method will try to
587
        deploy this backup_path.
588
589
        If everything fails and dynamic_backup_path is True, then tries to
590
        deploy a dynamic path.
591
        """
592
        # TODO: Remove flows from current (cookies)
593 1
        if self.is_using_backup_path():
594
            # TODO: Log to say that cannot move backup to backup
595
            return True
596
597 1
        success = False
598 1
        if self.backup_path.status is EntityStatus.UP:
599 1
            success = self.deploy_to_path(self.backup_path, old_path_dict)
600
601 1
        if success:
602 1
            return True
603
604 1
        if self.dynamic_backup_path or self.is_intra_switch():
605 1
            return self.deploy_to_path(old_path_dict=old_path_dict)
606
607
        return False
608
609 1
    def deploy_to_primary_path(self, old_path_dict: dict = None):
610
        """Deploy the primary path into the datapaths of this circuit.
611
612
        If the primary_path attribute is valid and up, this method will try to
613
        deploy this primary_path.
614
        """
615
        # TODO: Remove flows from current (cookies)
616 1
        if self.is_using_primary_path():
617
            # TODO: Log to say that cannot move primary to primary
618
            return True
619
620 1
        if self.primary_path.status is EntityStatus.UP:
621 1
            return self.deploy_to_path(self.primary_path, old_path_dict)
622 1
        return False
623
624 1
    def deploy(self, old_path_dict: dict = None):
625
        """Deploy EVC to best path.
626
627
        Best path can be the primary path, if available. If not, the backup
628
        path, and, if it is also not available, a dynamic path.
629
        """
630 1
        if self.archived:
631 1
            return False
632 1
        self.enable()
633 1
        success = self.deploy_to_primary_path(old_path_dict)
634 1
        if not success:
635 1
            success = self.deploy_to_backup_path(old_path_dict)
636
637 1
        if success:
638 1
            emit_event(self._controller, "deployed",
639
                       content=map_evc_event_content(self))
640 1
        return success
641
642 1
    @staticmethod
643 1
    def get_path_status(path):
644
        """Check for the current status of a path.
645
646
        If any link in this path is down, the path is considered down.
647
        """
648 1
        if not path:
649 1
            return EntityStatus.DISABLED
650
651 1
        for link in path:
652 1
            if link.status is not EntityStatus.UP:
653 1
                return link.status
654 1
        return EntityStatus.UP
655
656
    #    def discover_new_path(self):
657
    #        # TODO: discover a new path to satisfy this circuit and deploy
658
659 1
    def remove(self):
660
        """Remove EVC path and disable it."""
661 1
        self.remove_current_flows(sync=False)
662 1
        self.remove_failover_flows(sync=False)
663 1
        self.disable()
664 1
        self.sync()
665 1
        emit_event(self._controller, "undeployed",
666
                   content=map_evc_event_content(self))
667
668 1
    def remove_failover_flows(self, exclude_uni_switches=True,
669
                              force=True, sync=True) -> None:
670
        """Remove failover_flows.
671
672
        By default, it'll exclude UNI switches, if mef_eline has already
673
        called remove_current_flows before then this minimizes the number
674
        of FlowMods and IO.
675
        """
676 1
        if not self.failover_path:
677 1
            return
678 1
        switches, cookie, excluded = set(), self.get_cookie(), set()
679 1
        if exclude_uni_switches:
680 1
            excluded.add(self.uni_a.interface.switch.id)
681 1
            excluded.add(self.uni_z.interface.switch.id)
682 1
        for link in self.failover_path:
683 1
            if link.endpoint_a.switch.id not in excluded:
684 1
                switches.add(link.endpoint_a.switch.id)
685 1
            if link.endpoint_b.switch.id not in excluded:
686 1
                switches.add(link.endpoint_b.switch.id)
687 1
        flow_mods = {
688
            "switches": list(switches),
689
            "flows": [{
690
                "cookie": cookie,
691
                "cookie_mask": int(0xffffffffffffffff),
692
                "owner": "mef_eline",
693
            }]
694
        }
695 1
        try:
696 1
            self._send_flow_mods(
697
                flow_mods,
698
                "delete",
699
                force=force,
700
            )
701
        except FlowModException as err:
702
            log.error(f"Error deleting {self} failover_path flows, {err}")
703 1
        try:
704 1
            self.failover_path.make_vlans_available(self._controller)
705
        except KytosTagError as err:
706
            log.error(f"Error removing {self} failover_path: {err}")
707 1
        self.failover_path = Path([])
708 1
        if sync:
709 1
            self.sync()
710
711 1
    def remove_current_flows(
712
        self,
713
        current_path=None,
714
        force=True,
715
        sync=True,
716
        return_path=False
717
    ) -> dict[str, int]:
718
        """Remove all flows from current path or path intended for
719
         current path if exists."""
720 1
        switches, old_path_dict = set(), {}
721 1
        current_path = self.current_path if not current_path else current_path
722 1
        if not current_path and not self.is_intra_switch():
723 1
            return {}
724
725 1
        if return_path:
726 1
            for link in self.current_path:
727 1
                s_vlan = link.metadata.get("s_vlan")
728 1
                if s_vlan:
729 1
                    old_path_dict[link.id] = s_vlan.value
730
731 1
        for link in current_path:
732 1
            switches.add(link.endpoint_a.switch.id)
733 1
            switches.add(link.endpoint_b.switch.id)
734 1
        switches.add(self.uni_a.interface.switch.id)
735 1
        switches.add(self.uni_z.interface.switch.id)
736 1
        flow_mods = {
737
            "switches": list(switches),
738
            "flows": [{
739
                "cookie": self.get_cookie(),
740
                "cookie_mask": int(0xffffffffffffffff),
741
                "owner": "mef_eline",
742
            }]
743
        }
744
745 1
        try:
746 1
            self._send_flow_mods(flow_mods, "delete", force=force)
747 1
        except FlowModException as err:
748 1
            log.error(f"Error deleting {self} current_path flows, {err}")
749
750 1
        try:
751 1
            current_path.make_vlans_available(self._controller)
752 1
        except KytosTagError as err:
753 1
            log.error(f"Error removing {self} current_path: {err}")
754 1
        self.current_path = Path([])
755 1
        self.deactivate()
756 1
        if sync:
757 1
            self.sync()
758 1
        return old_path_dict
759
760 1
    def remove_path_flows(
761
        self, path=None, force=True
762
    ) -> dict[str, list[dict]]:
763
        """Remove all flows from path, and return the removed flows."""
764 1
        dpid_flows_match: dict[str, dict] = defaultdict(lambda: {"flows": []})
765 1
        out_flows: dict[str, list[dict]] = defaultdict(list)
766
767 1
        if not path:
768 1
            return dpid_flows_match
769
770 1
        try:
771 1
            nni_flows = self._prepare_nni_flows(path)
772
        # pylint: disable=broad-except
773
        except Exception:
774
            err = traceback.format_exc().replace("\n", ", ")
775
            log.error(f"Fail to remove NNI failover flows for {self}: {err}")
776
            nni_flows = {}
777
778 1
        for dpid, flows in nni_flows.items():
779 1
            for flow in flows:
780 1
                flow_mod = {
781
                    "cookie": flow["cookie"],
782
                    "match": flow["match"],
783
                    "owner": "mef_eline",
784
                    "cookie_mask": int(0xffffffffffffffff)
785
                }
786 1
                dpid_flows_match[dpid]["flows"].append(flow_mod)
787 1
                out_flows[dpid].append(flow_mod)
788
789 1
        try:
790 1
            uni_flows = self._prepare_uni_flows(path, skip_in=True)
791
        # pylint: disable=broad-except
792
        except Exception:
793
            err = traceback.format_exc().replace("\n", ", ")
794
            log.error(f"Fail to remove UNI failover flows for {self}: {err}")
795
            uni_flows = {}
796
797 1
        for dpid, flows in uni_flows.items():
798 1
            for flow in flows:
799 1
                flow_mod = {
800
                    "cookie": flow["cookie"],
801
                    "match": flow["match"],
802
                    "owner": "mef_eline",
803
                    "cookie_mask": int(0xffffffffffffffff)
804
                }
805 1
                dpid_flows_match[dpid]["flows"].append(flow_mod)
806 1
                out_flows[dpid].append(flow_mod)
807
808 1
        try:
809 1
            self._send_flow_mods(
810
                dpid_flows_match, 'delete', force=force, by_switch=True
811
            )
812 1
        except FlowModException as err:
813 1
            log.error(
814
                f"Error deleting {self} path flows, path:{path}, error={err}"
815
            )
816
817 1
        try:
818 1
            path.make_vlans_available(self._controller)
819
        except KytosTagError as err:
820
            log.error(f"Error removing {self} path: {err}")
821
822 1
        return out_flows
823
824 1
    @staticmethod
825 1
    def links_zipped(path=None):
826
        """Return an iterator which yields pairs of links in order."""
827 1
        if not path:
828 1
            return []
829 1
        return zip(path[:-1], path[1:])
830
831 1
    def should_deploy(self, path=None):
832
        """Verify if the circuit should be deployed."""
833 1
        if not path:
834 1
            log.debug("Path is empty.")
835 1
            return False
836
837 1
        if not self.is_enabled():
838 1
            log.debug(f"{self} is disabled.")
839 1
            return False
840
841 1
        if not self.is_active():
842 1
            log.debug(f"{self} will be deployed.")
843 1
            return True
844
845 1
        return False
846
847 1
    @staticmethod
848 1
    def is_uni_interface_active(
849
        *interfaces: Interface
850
    ) -> tuple[bool, dict]:
851
        """Whether UNIs are active and their status & status_reason."""
852 1
        active = True
853 1
        bad_interfaces = [
854
            interface
855
            for interface in interfaces
856
            if interface.status != EntityStatus.UP
857
        ]
858 1
        if bad_interfaces:
859 1
            active = False
860 1
            interfaces = bad_interfaces
861 1
        return active, {
862
            interface.id: {
863
                'status': interface.status.value,
864
                'status_reason': interface.status_reason,
865
            }
866
            for interface in interfaces
867
        }
868
869 1
    def try_to_activate(self) -> bool:
870
        """Try to activate the EVC."""
871 1
        if self.is_intra_switch():
872 1
            return self._try_to_activate_intra_evc()
873 1
        return self._try_to_activate_inter_evc()
874
875 1
    def _try_to_activate_intra_evc(self) -> bool:
876
        """Try to activate intra EVC."""
877 1
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
878 1
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
879 1
        if not is_active:
880 1
            raise ActivationError(
881
                f"Won't be able to activate {self} due to UNIs: {reason}"
882
            )
883 1
        self.activate()
884 1
        return True
885
886 1
    def _try_to_activate_inter_evc(self) -> bool:
887
        """Try to activate inter EVC."""
888 1
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
889 1
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
890 1
        if not is_active:
891 1
            raise ActivationError(
892
                f"Won't be able to activate {self} due to UNIs: {reason}"
893
            )
894 1
        if self.current_path.status != EntityStatus.UP:
895 1
            raise ActivationError(
896
                f"Won't be able to activate {self} due to current_path "
897
                f"status {self.current_path.status}"
898
            )
899 1
        self.activate()
900 1
        return True
901
902
    # pylint: disable=too-many-branches, too-many-statements
903 1
    def deploy_to_path(self, path=None, old_path_dict: dict = None):
904
        """Install the flows for this circuit.
905
906
        Procedures to deploy:
907
908
        0. Remove current flows installed
909
        1. Decide if will deploy "path" or discover a new path
910
        2. Choose vlan
911
        3. Install NNI flows
912
        4. Install UNI flows
913
        5. Activate
914
        6. Update current_path
915
        7. Update links caches(primary, current, backup)
916
917
        """
918 1
        self.remove_current_flows(sync=False)
919 1
        use_path = path or Path([])
920 1
        if not old_path_dict:
921 1
            old_path_dict = {}
922 1
        tag_errors = []
923 1
        if self.should_deploy(use_path):
924 1
            try:
925 1
                use_path.choose_vlans(self._controller, old_path_dict)
926 1
            except KytosNoTagAvailableError as e:
927 1
                tag_errors.append(str(e))
928 1
                use_path = None
929
        else:
930 1
            for use_path in self.discover_new_paths():
931 1
                if use_path is None:
932
                    continue
933 1
                try:
934 1
                    use_path.choose_vlans(self._controller, old_path_dict)
935 1
                    break
936 1
                except KytosNoTagAvailableError as e:
937 1
                    tag_errors.append(str(e))
938
            else:
939 1
                use_path = None
940
941 1
        try:
942 1
            if use_path:
943 1
                self._install_flows(use_path)
944 1
            elif self.is_intra_switch():
945 1
                use_path = Path()
946 1
                self._install_direct_uni_flows()
947
            else:
948 1
                msg = f"{self} was not deployed. No available path was found."
949 1
                if tag_errors:
950 1
                    msg = self.add_tag_errors(msg, tag_errors)
951 1
                    log.error(msg)
952
                else:
953 1
                    log.warning(msg)
954 1
                return False
955 1
        except EVCPathNotInstalled as err:
956 1
            log.error(
957
                f"Error deploying EVC {self} when calling flow_manager: {err}"
958
            )
959 1
            self.remove_current_flows(use_path, sync=True)
960 1
            return False
961
962 1
        self.current_path = use_path
963 1
        msg = f"{self} was deployed."
964 1
        try:
965 1
            self.try_to_activate()
966
        except ActivationError as exc:
967
            msg = f"{msg} {str(exc)}"
968 1
        self.sync()
969 1
        log.info(msg)
970 1
        return True
971
972 1
    def try_setup_failover_path(
973
        self,
974
        wait=settings.DEPLOY_EVCS_INTERVAL,
975
        warn_if_not_path=True
976
    ):
977
        """Try setup failover_path whenever possible."""
978 1
        if (
979
                self.failover_path or not self.current_path
980
                or not self.is_active()
981
                ):
982 1
            return
983 1
        if (now() - self.affected_by_link_at).seconds >= wait:
984 1
            with self.lock:
985 1
                self.setup_failover_path(warn_if_not_path)
986
987
    # pylint: disable=too-many-statements
988 1
    def setup_failover_path(self, warn_if_not_path=True):
989
        """Install flows for the failover path of this EVC.
990
991
        Procedures to deploy:
992
993
        0. Remove flows currently installed for failover_path (if any)
994
        1. Discover a disjoint path from current_path
995
        2. Choose vlans
996
        3. Install NNI flows
997
        4. Install UNI egress flows
998
        5. Update failover_path
999
        """
1000
        # Intra-switch EVCs have no failover_path
1001 1
        if self.is_intra_switch():
1002 1
            return False
1003
1004
        # For not only setup failover path for totally dynamic EVCs
1005 1
        if not self.is_eligible_for_failover_path():
1006 1
            return False
1007
1008 1
        out_new_flows: dict[str, list[dict]] = {}
1009 1
        reason = ""
1010 1
        tag_errors = []
1011 1
        out_removed_flows = self.remove_path_flows(self.failover_path)
1012 1
        self.failover_path = Path([])
1013
1014 1
        for use_path in self.get_failover_path_candidates():
1015 1
            if not use_path:
1016 1
                continue
1017 1
            try:
1018 1
                use_path.choose_vlans(self._controller)
1019 1
                break
1020 1
            except KytosNoTagAvailableError as e:
1021 1
                tag_errors.append(str(e))
1022
        else:
1023 1
            use_path = Path([])
1024 1
            reason = "No available path was found"
1025
1026 1
        try:
1027 1
            if use_path:
1028 1
                out_new_flows = self._install_flows(
1029
                    use_path, skip_in=True
1030
                )
1031 1
        except EVCPathNotInstalled as err:
1032 1
            reason = "Error deploying failover path"
1033 1
            log.error(
1034
                f"{reason} for {self}. FlowManager error: {err}"
1035
            )
1036 1
            _rmed_flows = self.remove_path_flows(use_path)
1037 1
            out_removed_flows = merge_flow_dicts(
1038
                out_removed_flows, _rmed_flows
1039
            )
1040 1
            use_path = Path([])
1041
1042 1
        self.failover_path = use_path
1043 1
        self.sync()
1044
1045 1
        if out_new_flows or out_removed_flows:
1046 1
            emit_event(self._controller, "failover_deployed", content={
1047
                self.id: map_evc_event_content(
1048
                    self,
1049
                    flows=deepcopy(out_new_flows),
1050
                    removed_flows=deepcopy(out_removed_flows),
1051
                    error_reason=reason,
1052
                    current_path=self.current_path.as_dict(),
1053
                )
1054
            })
1055
1056 1
        if not use_path:
1057 1
            msg = f"Failover path for {self} was not deployed: {reason}."
1058 1
            if tag_errors:
1059 1
                msg = self.add_tag_errors(msg, tag_errors)
1060 1
                log.error(msg)
1061 1
            elif warn_if_not_path:
1062 1
                log.warning(msg)
1063 1
            return False
1064 1
        log.info(f"Failover path for {self} was deployed.")
1065 1
        return True
1066
1067 1
    @staticmethod
1068 1
    def add_tag_errors(msg: str, tag_errors: list):
1069
        """Add to msg the tag errors ecountered when chossing path."""
1070 1
        path = ['path', 'paths']
1071 1
        was = ['was', 'were']
1072 1
        message = ['message', 'messages']
1073
1074
        # Choose either singular(0) or plural(1) words
1075 1
        n = 1
1076 1
        if len(tag_errors) == 1:
1077 1
            n = 0
1078
1079 1
        msg += f" {len(tag_errors)} {path[n]} {was[n]} rejected"
1080 1
        msg += f" with {message[n]}: {tag_errors}"
1081 1
        return msg
1082
1083 1
    def get_failover_flows(self):
1084
        """Return the flows needed to make the failover path active, i.e. the
1085
        flows for ingress forwarding.
1086
1087
        Return:
1088
            dict: A dict of flows indexed by the switch_id will be returned, or
1089
                an empty dict if no failover_path is available.
1090
        """
1091 1
        if not self.failover_path:
1092 1
            return {}
1093 1
        return self._prepare_uni_flows(self.failover_path, skip_out=True)
1094
1095
    # pylint: disable=too-many-branches
1096 1
    def _prepare_direct_uni_flows(self):
1097
        """Prepare flows connecting two UNIs for intra-switch EVC."""
1098 1
        vlan_a = self._get_value_from_uni_tag(self.uni_a)
1099 1
        vlan_z = self._get_value_from_uni_tag(self.uni_z)
1100
1101 1
        flow_mod_az = self._prepare_flow_mod(
1102
            self.uni_a.interface, self.uni_z.interface,
1103
            self.queue_id, vlan_a
1104
        )
1105 1
        flow_mod_za = self._prepare_flow_mod(
1106
            self.uni_z.interface, self.uni_a.interface,
1107
            self.queue_id, vlan_z
1108
        )
1109
1110 1 View Code Duplication
        if not isinstance(vlan_z, list) and vlan_z not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1111 1
            flow_mod_az["actions"].insert(
1112
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
1113
            )
1114 1
            if not vlan_a:
1115 1
                flow_mod_az["actions"].insert(
1116
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1117
                )
1118 1
            if vlan_a == 0:
1119 1
                flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1120 1
        elif vlan_a == 0 and vlan_z == "4096/4096":
1121 1
            flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1122
1123 1 View Code Duplication
        if not isinstance(vlan_a, list) and vlan_a not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1124 1
            flow_mod_za["actions"].insert(
1125
                    0, {"action_type": "set_vlan", "vlan_id": vlan_a}
1126
                )
1127 1
            if not vlan_z:
1128 1
                flow_mod_za["actions"].insert(
1129
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1130
                )
1131 1
            if vlan_z == 0:
1132 1
                flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1133 1
        elif vlan_a == "4096/4096" and vlan_z == 0:
1134 1
            flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1135
1136 1
        flows = []
1137 1
        if isinstance(vlan_a, list):
1138 1
            for mask_a in vlan_a:
1139 1
                flow_aux = deepcopy(flow_mod_az)
1140 1
                flow_aux["match"]["dl_vlan"] = mask_a
1141 1
                flows.append(flow_aux)
1142
        else:
1143 1
            if vlan_a is not None:
1144 1
                flow_mod_az["match"]["dl_vlan"] = vlan_a
1145 1
            flows.append(flow_mod_az)
1146
1147 1
        if isinstance(vlan_z, list):
1148 1
            for mask_z in vlan_z:
1149 1
                flow_aux = deepcopy(flow_mod_za)
1150 1
                flow_aux["match"]["dl_vlan"] = mask_z
1151 1
                flows.append(flow_aux)
1152
        else:
1153 1
            if vlan_z is not None:
1154 1
                flow_mod_za["match"]["dl_vlan"] = vlan_z
1155 1
            flows.append(flow_mod_za)
1156 1
        return (
1157
            self.uni_a.interface.switch.id, flows
1158
        )
1159
1160 1
    def _install_direct_uni_flows(self):
1161
        """Install flows connecting two UNIs.
1162
1163
        This case happens when the circuit is between UNIs in the
1164
        same switch.
1165
        """
1166 1
        (dpid, flows) = self._prepare_direct_uni_flows()
1167 1
        flow_mods = {"switches": [dpid], "flows": flows}
1168 1
        try:
1169 1
            self._send_flow_mods(flow_mods, "install")
1170 1
        except FlowModException as err:
1171 1
            raise EVCPathNotInstalled(str(err)) from err
1172
1173 1
    def _prepare_nni_flows(self, path=None):
1174
        """Prepare NNI flows."""
1175 1
        nni_flows = OrderedDict()
1176 1
        previous = self.uni_a.interface.switch.dpid
1177 1
        for incoming, outcoming in self.links_zipped(path):
1178 1
            in_vlan = incoming.get_metadata("s_vlan").value
1179 1
            out_vlan = outcoming.get_metadata("s_vlan").value
1180 1
            in_endpoint = self.get_endpoint_by_id(incoming, previous, ne)
1181 1
            out_endpoint = self.get_endpoint_by_id(
1182
                outcoming, in_endpoint.switch.id, eq
1183
            )
1184
1185 1
            flows = []
1186
            # Flow for one direction
1187 1
            flows.append(
1188
                self._prepare_nni_flow(
1189
                    in_endpoint,
1190
                    out_endpoint,
1191
                    in_vlan,
1192
                    out_vlan,
1193
                    queue_id=self.queue_id,
1194
                )
1195
            )
1196
1197
            # Flow for the other direction
1198 1
            flows.append(
1199
                self._prepare_nni_flow(
1200
                    out_endpoint,
1201
                    in_endpoint,
1202
                    out_vlan,
1203
                    in_vlan,
1204
                    queue_id=self.queue_id,
1205
                )
1206
            )
1207 1
            previous = in_endpoint.switch.id
1208 1
            nni_flows[in_endpoint.switch.id] = flows
1209 1
        return nni_flows
1210
1211 1
    def _install_flows(
1212
        self, path=None, skip_in=False, skip_out=False
1213
    ) -> dict[str, list[dict]]:
1214
        """Install uni and nni flows"""
1215 1
        flows_by_switch = defaultdict(lambda: {"flows": []})
1216 1
        new_flows = defaultdict(list)
1217 1
        for dpid, flows in self._prepare_nni_flows(path).items():
1218 1
            flows_by_switch[dpid]["flows"].extend(flows)
1219 1
            new_flows[dpid].extend(flows)
1220 1
        for dpid, flows in self._prepare_uni_flows(
1221
            path, skip_in, skip_out
1222
        ).items():
1223 1
            flows_by_switch[dpid]["flows"].extend(flows)
1224 1
            new_flows[dpid].extend(flows)
1225
1226 1
        try:
1227 1
            self._send_flow_mods(flows_by_switch, "install", by_switch=True)
1228 1
        except FlowModException as err:
1229 1
            raise EVCPathNotInstalled(str(err)) from err
1230
1231 1
        return new_flows
1232
1233 1
    @staticmethod
1234 1
    def _get_value_from_uni_tag(uni: UNI):
1235
        """Returns the value from tag. In case of any and untagged
1236
        it should return 4096/4096 and 0 respectively"""
1237 1
        special = {"any": "4096/4096", "untagged": 0}
1238 1
        if uni.user_tag:
1239 1
            value = uni.user_tag.value
1240 1
            if isinstance(value, list):
1241 1
                return uni.user_tag.mask_list
1242 1
            return special.get(value, value)
1243 1
        return None
1244
1245
    # pylint: disable=too-many-locals
1246 1
    def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
1247
        """Prepare flows to install UNIs."""
1248 1
        uni_flows = {}
1249 1
        if not path:
1250
            log.info("install uni flows without path.")
1251
            return uni_flows
1252
1253
        # Determine VLANs
1254 1
        in_vlan_a = self._get_value_from_uni_tag(self.uni_a)
1255 1
        out_vlan_a = path[0].get_metadata("s_vlan").value
1256
1257 1
        in_vlan_z = self._get_value_from_uni_tag(self.uni_z)
1258 1
        out_vlan_z = path[-1].get_metadata("s_vlan").value
1259
1260
        # Get endpoints from path
1261 1
        endpoint_a = self.get_endpoint_by_id(
1262
            path[0], self.uni_a.interface.switch.id, eq
1263
        )
1264 1
        endpoint_z = self.get_endpoint_by_id(
1265
            path[-1], self.uni_z.interface.switch.id, eq
1266
        )
1267
1268
        # Flows for the first UNI
1269 1
        flows_a = []
1270
1271
        # Flow for one direction, pushing the service tag
1272 1
        if not skip_in:
1273 1
            if isinstance(in_vlan_a, list):
1274 1
                for in_mask_a in in_vlan_a:
1275 1
                    push_flow = self._prepare_push_flow(
1276
                        self.uni_a.interface,
1277
                        endpoint_a,
1278
                        in_mask_a,
1279
                        out_vlan_a,
1280
                        in_vlan_z,
1281
                        queue_id=self.queue_id,
1282
                    )
1283 1
                    flows_a.append(push_flow)
1284
            else:
1285
                push_flow = self._prepare_push_flow(
1286
                    self.uni_a.interface,
1287
                    endpoint_a,
1288
                    in_vlan_a,
1289
                    out_vlan_a,
1290
                    in_vlan_z,
1291
                    queue_id=self.queue_id,
1292
                )
1293
                flows_a.append(push_flow)
1294
1295
        # Flow for the other direction, popping the service tag
1296 1
        if not skip_out:
1297 1
            pop_flow = self._prepare_pop_flow(
1298
                endpoint_a,
1299
                self.uni_a.interface,
1300
                out_vlan_a,
1301
                queue_id=self.queue_id,
1302
            )
1303 1
            flows_a.append(pop_flow)
1304
1305 1
        uni_flows[self.uni_a.interface.switch.id] = flows_a
1306
1307
        # Flows for the second UNI
1308 1
        flows_z = []
1309
1310
        # Flow for one direction, pushing the service tag
1311 1
        if not skip_in:
1312 1
            if isinstance(in_vlan_z, list):
1313 1
                for in_mask_z in in_vlan_z:
1314 1
                    push_flow = self._prepare_push_flow(
1315
                        self.uni_z.interface,
1316
                        endpoint_z,
1317
                        in_mask_z,
1318
                        out_vlan_z,
1319
                        in_vlan_a,
1320
                        queue_id=self.queue_id,
1321
                    )
1322 1
                    flows_z.append(push_flow)
1323
            else:
1324
                push_flow = self._prepare_push_flow(
1325
                    self.uni_z.interface,
1326
                    endpoint_z,
1327
                    in_vlan_z,
1328
                    out_vlan_z,
1329
                    in_vlan_a,
1330
                    queue_id=self.queue_id,
1331
                )
1332
                flows_z.append(push_flow)
1333
1334
        # Flow for the other direction, popping the service tag
1335 1
        if not skip_out:
1336 1
            pop_flow = self._prepare_pop_flow(
1337
                endpoint_z,
1338
                self.uni_z.interface,
1339
                out_vlan_z,
1340
                queue_id=self.queue_id,
1341
            )
1342 1
            flows_z.append(pop_flow)
1343
1344 1
        uni_flows[self.uni_z.interface.switch.id] = flows_z
1345
1346 1
        return uni_flows
1347
1348 1
    @staticmethod
1349 1
    @retry(
1350
        stop=stop_after_attempt(3),
1351
        wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
1352
        retry=retry_if_exception_type(FlowModException),
1353
        before_sleep=before_sleep,
1354
        reraise=True,
1355
    )
1356 1
    def _send_flow_mods(
1357
        data_content: dict,
1358
        command="install",
1359
        force=False,
1360
        by_switch=False
1361
    ):
1362
        """Send a flow_mod list to a specific switch.
1363
1364
        Args:
1365
            dpid(str): The target of flows (i.e. Switch.id).
1366
            flow_mods(dict): Python dictionary with flow_mods.
1367
            command(str): By default is 'flows'. To remove a flow is 'remove'.
1368
            force(bool): True to send via consistency check in case of errors.
1369
            by_switch(bool): True to send to 'flows_by_switch' request instead.
1370
        """
1371 1
        if by_switch:
1372 1
            endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}"
1373
        else:
1374 1
            endpoint = f"{settings.MANAGER_URL}/flows"
1375 1
            data_content["force"] = force
1376 1
        try:
1377 1
            if command == "install":
1378 1
                res = httpx.post(endpoint, json=data_content, timeout=30)
1379 1
            elif command == "delete":
1380 1
                res = httpx.request(
1381
                    "DELETE", endpoint, json=data_content, timeout=30
1382
                )
1383 1
        except httpx.RequestError as err:
1384 1
            raise FlowModException(str(err)) from err
1385 1
        if res.is_server_error or res.status_code >= 400:
1386 1
            raise FlowModException(res.text)
1387
1388 1
    def get_cookie(self):
1389
        """Return the cookie integer from evc id."""
1390 1
        return int(self.id, 16) + (settings.COOKIE_PREFIX << 56)
1391
1392 1
    @staticmethod
1393 1
    def get_id_from_cookie(cookie):
1394
        """Return the evc id given a cookie value."""
1395 1
        evc_id = cookie - (settings.COOKIE_PREFIX << 56)
1396 1
        return f"{evc_id:x}".zfill(14)
1397
1398 1
    def set_flow_table_group_id(self, flow_mod: dict, vlan) -> dict:
1399
        """Set table_group and table_id"""
1400 1
        table_group = "epl" if vlan is None else "evpl"
1401 1
        flow_mod["table_group"] = table_group
1402 1
        flow_mod["table_id"] = self.table_group[table_group]
1403 1
        return flow_mod
1404
1405 1
    @staticmethod
1406 1
    def get_priority(vlan):
1407
        """Return priority value depending on vlan value"""
1408 1
        if isinstance(vlan, list):
1409 1
            return settings.EVPL_SB_PRIORITY
1410 1
        if vlan not in {None, "4096/4096", 0}:
1411 1
            return settings.EVPL_SB_PRIORITY
1412 1
        if vlan == 0:
1413 1
            return settings.UNTAGGED_SB_PRIORITY
1414 1
        if vlan == "4096/4096":
1415 1
            return settings.ANY_SB_PRIORITY
1416 1
        return settings.EPL_SB_PRIORITY
1417
1418 1
    def _prepare_flow_mod(self, in_interface, out_interface,
1419
                          queue_id=None, vlan=True):
1420
        """Prepare a common flow mod."""
1421 1
        default_actions = [
1422
            {"action_type": "output", "port": out_interface.port_number}
1423
        ]
1424 1
        queue_id = settings.QUEUE_ID if queue_id == -1 else queue_id
1425 1
        if queue_id is not None:
1426 1
            default_actions.append(
1427
                {"action_type": "set_queue", "queue_id": queue_id}
1428
            )
1429
1430 1
        flow_mod = {
1431
            "match": {"in_port": in_interface.port_number},
1432
            "cookie": self.get_cookie(),
1433
            "actions": default_actions,
1434
            "owner": "mef_eline",
1435
        }
1436
1437 1
        self.set_flow_table_group_id(flow_mod, vlan)
1438 1
        if self.sb_priority:
1439 1
            flow_mod["priority"] = self.sb_priority
1440
        else:
1441 1
            flow_mod["priority"] = self.get_priority(vlan)
1442 1
        return flow_mod
1443
1444 1
    def _prepare_nni_flow(self, *args, queue_id=None):
1445
        """Create NNI flows."""
1446 1
        in_interface, out_interface, in_vlan, out_vlan = args
1447 1
        flow_mod = self._prepare_flow_mod(
1448
            in_interface, out_interface, queue_id
1449
        )
1450 1
        flow_mod["match"]["dl_vlan"] = in_vlan
1451 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1452 1
        flow_mod["actions"].insert(0, new_action)
1453
1454 1
        return flow_mod
1455
1456 1
    def _prepare_push_flow(self, *args, queue_id=None):
1457
        """Prepare push flow.
1458
1459
        Arguments:
1460
            in_interface(str): Interface input.
1461
            out_interface(str): Interface output.
1462
            in_vlan(int,str,None): Vlan input.
1463
            out_vlan(str): Vlan output.
1464
            new_c_vlan(int,str,list,None): New client vlan.
1465
1466
        Return:
1467
            dict: An python dictionary representing a FlowMod
1468
1469
        """
1470
        # assign all arguments
1471 1
        in_interface, out_interface, in_vlan, out_vlan, new_c_vlan = args
1472 1
        vlan_pri = in_vlan if not isinstance(new_c_vlan, list) else new_c_vlan
1473 1
        flow_mod = self._prepare_flow_mod(
1474
            in_interface, out_interface, queue_id, vlan_pri
1475
        )
1476
        # the service tag must be always pushed
1477 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1478 1
        flow_mod["actions"].insert(0, new_action)
1479
1480 1
        new_action = {"action_type": "push_vlan", "tag_type": "s"}
1481 1
        flow_mod["actions"].insert(0, new_action)
1482
1483 1
        if in_vlan is not None:
1484
            # if in_vlan is set, it must be included in the match
1485 1
            flow_mod["match"]["dl_vlan"] = in_vlan
1486
1487 1
        if (not isinstance(new_c_vlan, list) and in_vlan != new_c_vlan and
1488
                new_c_vlan not in self.special_cases):
1489
            # new_in_vlan is an integer but zero, action to set is required
1490 1
            new_action = {"action_type": "set_vlan", "vlan_id": new_c_vlan}
1491 1
            flow_mod["actions"].insert(0, new_action)
1492
1493 1
        if in_vlan not in self.special_cases and new_c_vlan == 0:
1494
            # # new_in_vlan is an integer but zero and new_c_vlan does not
1495
            # a pop action is required
1496 1
            new_action = {"action_type": "pop_vlan"}
1497 1
            flow_mod["actions"].insert(0, new_action)
1498
1499 1
        elif in_vlan == "4096/4096" and new_c_vlan == 0:
1500
            # if in_vlan match with any tags and new_c_vlan does not
1501
            # a pop action is required
1502 1
            new_action = {"action_type": "pop_vlan"}
1503 1
            flow_mod["actions"].insert(0, new_action)
1504
1505 1
        elif (not in_vlan and
1506
                (not isinstance(new_c_vlan, list) and
1507
                 new_c_vlan not in self.special_cases)):
1508
            # new_in_vlan is an integer but zero and in_vlan is not set
1509
            # then it is set now
1510 1
            new_action = {"action_type": "push_vlan", "tag_type": "c"}
1511 1
            flow_mod["actions"].insert(0, new_action)
1512
1513 1
        return flow_mod
1514
1515 1
    def _prepare_pop_flow(
1516
        self, in_interface, out_interface, out_vlan, queue_id=None
1517
    ):
1518
        # pylint: disable=too-many-arguments
1519
        """Prepare pop flow."""
1520 1
        flow_mod = self._prepare_flow_mod(
1521
            in_interface, out_interface, queue_id
1522
        )
1523 1
        flow_mod["match"]["dl_vlan"] = out_vlan
1524 1
        new_action = {"action_type": "pop_vlan"}
1525 1
        flow_mod["actions"].insert(0, new_action)
1526 1
        return flow_mod
1527
1528 1
    @staticmethod
1529 1
    def run_bulk_sdntraces(
1530
        uni_list: list[tuple[Interface, Union[str, int, None]]]
1531
    ) -> dict:
1532
        """Run SDN traces on control plane starting from EVC UNIs."""
1533 1
        endpoint = f"{settings.SDN_TRACE_CP_URL}/traces"
1534 1
        data = []
1535 1
        for interface, tag_value in uni_list:
1536 1
            data_uni = {
1537
                "trace": {
1538
                            "switch": {
1539
                                "dpid": interface.switch.dpid,
1540
                                "in_port": interface.port_number,
1541
                            }
1542
                        }
1543
                }
1544 1
            if tag_value:
1545 1
                uni_dl_vlan = map_dl_vlan(tag_value)
1546 1
                if uni_dl_vlan:
1547 1
                    data_uni["trace"]["eth"] = {
1548
                                            "dl_type": 0x8100,
1549
                                            "dl_vlan": uni_dl_vlan,
1550
                                            }
1551 1
            data.append(data_uni)
1552 1
        try:
1553 1
            response = httpx.put(endpoint, json=data, timeout=30)
1554 1
        except httpx.TimeoutException as exception:
1555 1
            log.error(f"Request has timed out: {exception}")
1556 1
            return {"result": []}
1557 1
        if response.status_code >= 400:
1558 1
            log.error(f"Failed to run sdntrace-cp: {response.text}")
1559 1
            return {"result": []}
1560 1
        return response.json()
1561
1562
    # pylint: disable=too-many-return-statements, too-many-arguments
1563 1
    @staticmethod
1564 1
    def check_trace(
1565
        evc_id: str,
1566
        evc_name: str,
1567
        tag_a: Union[None, int, str],
1568
        tag_z: Union[None, int, str],
1569
        interface_a: Interface,
1570
        interface_z: Interface,
1571
        current_path: list,
1572
        trace_a: list,
1573
        trace_z: list
1574
    ) -> bool:
1575
        """Auxiliar function to check an individual trace"""
1576 1
        if (
1577
            len(trace_a) != len(current_path) + 1
1578
            or not compare_uni_out_trace(tag_z, interface_z, trace_a[-1])
1579
        ):
1580 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1581
                        f"Invalid trace from uni_a: {trace_a}")
1582 1
            return False
1583 1
        if (
1584
            len(trace_z) != len(current_path) + 1
1585
            or not compare_uni_out_trace(tag_a, interface_a, trace_z[-1])
1586
        ):
1587 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1588
                        f"Invalid trace from uni_z: {trace_z}")
1589 1
            return False
1590
1591 1
        if not current_path:
1592
            return True
1593
1594 1
        first_link, trace_path_begin, trace_path_end = current_path[0], [], []
1595 1
        if (
1596
            first_link.endpoint_a.switch.id == trace_a[0]["dpid"]
1597
        ):
1598 1
            trace_path_begin, trace_path_end = trace_a, trace_z
1599 1
        elif (
1600
            first_link.endpoint_a.switch.id == trace_z[0]["dpid"]
1601
        ):
1602 1
            trace_path_begin, trace_path_end = trace_z, trace_a
1603
        else:
1604
            msg = (
1605
                f"first link {first_link} endpoint_a didn't match the first "
1606
                f"step of trace_a {trace_a} or trace_z {trace_z}"
1607
            )
1608
            log.warning(msg)
1609
            return False
1610
1611 1
        for link, trace1, trace2 in zip(current_path,
1612
                                        trace_path_begin[1:],
1613
                                        trace_path_end[:0:-1]):
1614 1
            metadata_vlan = None
1615 1
            if link.metadata:
1616 1
                metadata_vlan = glom(link.metadata, 's_vlan.value')
1617 1
            if compare_endpoint_trace(
1618
                                        link.endpoint_a,
1619
                                        metadata_vlan,
1620
                                        trace2
1621
                                    ) is False:
1622 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1623
                            f"Invalid trace from uni_a: {trace_a}")
1624 1
                return False
1625 1
            if compare_endpoint_trace(
1626
                                        link.endpoint_b,
1627
                                        metadata_vlan,
1628
                                        trace1
1629
                                    ) is False:
1630 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1631
                            f"Invalid trace from uni_z: {trace_z}")
1632 1
                return False
1633
1634 1
        return True
1635
1636 1
    @staticmethod
1637 1
    def check_range(circuit, traces: list) -> bool:
1638
        """Check traces when for UNI with TAGRange"""
1639 1
        check = True
1640 1
        for i, mask in enumerate(circuit.uni_a.user_tag.mask_list):
1641 1
            trace_a = traces[i*2]
1642 1
            trace_z = traces[i*2+1]
1643 1
            check &= EVCDeploy.check_trace(
1644
                circuit.id, circuit.name,
1645
                mask, mask,
1646
                circuit.uni_a.interface,
1647
                circuit.uni_z.interface,
1648
                circuit.current_path,
1649
                trace_a, trace_z,
1650
            )
1651 1
        return check
1652
1653 1
    @staticmethod
1654 1
    def check_list_traces(list_circuits: list) -> dict:
1655
        """Check if current_path is deployed comparing with SDN traces."""
1656 1
        if not list_circuits:
1657 1
            return {}
1658 1
        uni_list = make_uni_list(list_circuits)
1659 1
        traces = EVCDeploy.run_bulk_sdntraces(uni_list)["result"]
1660
1661 1
        if not traces:
1662 1
            return {}
1663
1664 1
        try:
1665 1
            circuits_checked = {}
1666 1
            i = 0
1667 1
            for circuit in list_circuits:
1668 1
                if isinstance(circuit.uni_a.user_tag, TAGRange):
1669 1
                    length = len(circuit.uni_a.user_tag.mask_list)
1670 1
                    circuits_checked[circuit.id] = EVCDeploy.check_range(
1671
                        circuit, traces[i:i+length*2]
1672
                    )
1673 1
                    i += length*2
1674
                else:
1675 1
                    trace_a = traces[i]
1676 1
                    trace_z = traces[i+1]
1677 1
                    tag_a = None
1678 1
                    if circuit.uni_a.user_tag:
1679 1
                        tag_a = circuit.uni_a.user_tag.value
1680 1
                    tag_z = None
1681 1
                    if circuit.uni_z.user_tag:
1682 1
                        tag_z = circuit.uni_z.user_tag.value
1683 1
                    circuits_checked[circuit.id] = EVCDeploy.check_trace(
1684
                        circuit.id, circuit.name,
1685
                        tag_a, tag_z,
1686
                        circuit.uni_a.interface,
1687
                        circuit.uni_z.interface,
1688
                        circuit.current_path,
1689
                        trace_a, trace_z
1690
                    )
1691 1
                    i += 2
1692 1
        except IndexError as err:
1693 1
            log.error(
1694
                f"Bulk sdntraces returned fewer items than expected."
1695
                f"Error = {err}"
1696
            )
1697 1
            return {}
1698
1699 1
        return circuits_checked
1700
1701 1
    @staticmethod
1702 1
    def get_endpoint_by_id(
1703
        link: Link,
1704
        id_: str,
1705
        operator: Union[eq, ne]
1706
    ) -> Interface:
1707
        """Return endpoint from link
1708
        either equal(eq) or not equal(ne) to id"""
1709 1
        if operator(link.endpoint_a.switch.id, id_):
1710 1
            return link.endpoint_a
1711 1
        return link.endpoint_b
1712
1713
1714 1
class LinkProtection(EVCDeploy):
1715
    """Class to handle link protection."""
1716
1717 1
    def is_affected_by_link(self, link=None):
1718
        """Verify if the current path is affected by link down event."""
1719
        return self.current_path.is_affected_by_link(link)
1720
1721 1
    def is_using_primary_path(self):
1722
        """Verify if the current deployed path is self.primary_path."""
1723 1
        return self.current_path == self.primary_path
1724
1725 1
    def is_using_backup_path(self):
1726
        """Verify if the current deployed path is self.backup_path."""
1727 1
        return self.current_path == self.backup_path
1728
1729 1
    def is_using_dynamic_path(self):
1730
        """Verify if the current deployed path is dynamic."""
1731 1
        if (
1732
            self.current_path
1733
            and not self.is_using_primary_path()
1734
            and not self.is_using_backup_path()
1735
            and self.current_path.status is EntityStatus.UP
1736
        ):
1737
            return True
1738 1
        return False
1739
1740 1
    def handle_link_up(self, link):
1741
        """Handle circuit when link up.
1742
1743
        Args:
1744
            link(Link): Link affected by link.up event.
1745
1746
        """
1747 1
        condition_pairs = [
1748
            (
1749
                lambda me: me.is_using_primary_path(),
1750
                lambda _: (True, 'nothing')
1751
            ),
1752
            (
1753
                lambda me: me.is_intra_switch(),
1754
                lambda _: (True, 'nothing')
1755
            ),
1756
            (
1757
                lambda me: me.primary_path.is_affected_by_link(link),
1758
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1759
            ),
1760
            # We tried to deploy(primary_path) without success.
1761
            # And in this case is up by some how. Nothing to do.
1762
            (
1763
                lambda me: me.is_using_backup_path(),
1764
                lambda _: (True, 'nothing')
1765
            ),
1766
            (
1767
                lambda me:  me.is_using_dynamic_path(),
1768
                lambda _: (True, 'nothing')
1769
            ),
1770
            # In this case, probably the circuit is not being used and
1771
            # we can move to backup
1772
            (
1773
                lambda me: me.backup_path.is_affected_by_link(link),
1774
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1775
            ),
1776
            # In this case, the circuit is not being used and we should
1777
            # try a dynamic path
1778
            (
1779
                lambda me: me.dynamic_backup_path and not me.is_active(),
1780
                lambda me: (me.deploy_to_path(), 'redeploy')
1781
            )
1782
        ]
1783 1
        for predicate, action in condition_pairs:
1784 1
            if not predicate(self):
1785 1
                continue
1786 1
            success, succcess_type = action(self)
1787 1
            if success:
1788 1
                if succcess_type == 'redeploy':
1789 1
                    emit_event(
1790
                        self._controller,
1791
                        "redeployed_link_up",
1792
                        content=map_evc_event_content(self)
1793
                    )
1794 1
                return True
1795 1
        return False
1796
1797 1
    def handle_link_down(self):
1798
        """Handle circuit when link down.
1799
1800
        Returns:
1801
            bool: True if the re-deploy was successly otherwise False.
1802
1803
        """
1804 1
        success = False
1805 1
        if self.is_using_primary_path():
1806 1
            success = self.deploy_to_backup_path()
1807 1
        elif self.is_using_backup_path():
1808 1
            success = self.deploy_to_primary_path()
1809
1810 1
        if not success and self.dynamic_backup_path:
1811 1
            success = self.deploy_to_path()
1812
1813 1
        if success:
1814 1
            log.debug(f"{self} deployed after link down.")
1815
        else:
1816 1
            self.remove_current_flows(sync=False)
1817 1
            self.deactivate()
1818 1
            self.sync()
1819 1
            log.debug(f"Failed to re-deploy {self} after link down.")
1820
1821 1
        return success
1822
1823 1
    @staticmethod
1824 1
    def get_interface_from_switch(uni: UNI, switches: dict) -> Interface:
1825
        """Get interface from switch by uni"""
1826 1
        switch = switches[uni.interface.switch.dpid]
1827 1
        interface = switch.interfaces[uni.interface.port_number]
1828 1
        return interface
1829
1830 1
    def are_unis_active(self, switches: dict) -> bool:
1831
        """Determine whether this EVC should be active"""
1832 1
        interface_a = self.get_interface_from_switch(self.uni_a, switches)
1833 1
        interface_z = self.get_interface_from_switch(self.uni_z, switches)
1834 1
        active, _ = self.is_uni_interface_active(interface_a, interface_z)
1835 1
        return active
1836
1837 1
    def try_to_handle_uni_as_link_up(self, interface: Interface) -> bool:
1838
        """Try to handle UNI as link_up to trigger deployment."""
1839
        if (
1840
            self.current_path.status != EntityStatus.UP
1841
            and not self.is_intra_switch()
1842
        ):
1843
            succeeded = self.handle_link_up(interface)
1844
            if succeeded:
1845
                msg = (
1846
                    f"Activated {self} due to successful "
1847
                    f"deployment triggered by {interface}"
1848
                )
1849
            else:
1850
                msg = (
1851
                    f"Couldn't activate {self} due to unsuccessful "
1852
                    f"deployment triggered by {interface}"
1853
                )
1854
            log.info(msg)
1855
            return True
1856
        return False
1857
1858 1
    def handle_interface_link_up(self, interface: Interface):
1859
        """
1860
        Handler for interface link_up events
1861
        """
1862 1
        if self.is_active():
1863 1
            return
1864 1
        interfaces = (self.uni_a.interface, self.uni_z.interface)
1865 1
        if interface not in interfaces:
1866
            return
1867 1
        down_interfaces = [
1868
            interface
1869
            for interface in interfaces
1870
            if interface.status != EntityStatus.UP
1871
        ]
1872 1
        if down_interfaces:
1873
            return
1874 1
        if self.try_to_handle_uni_as_link_up(interface):
1875
            return
1876
1877 1
        interface_dicts = {
1878
            interface.id: {
1879
                'status': interface.status.value,
1880
                'status_reason': interface.status_reason,
1881
            }
1882
            for interface in interfaces
1883
        }
1884 1
        try:
1885 1
            self.try_to_activate()
1886 1
            log.info(
1887
                f"Activating {self}. Interfaces: "
1888
                f"{interface_dicts}."
1889
            )
1890 1
            emit_event(self._controller, "uni_active_updated",
1891
                       content=map_evc_event_content(self))
1892 1
            self.sync()
1893
        except ActivationError as exc:
1894
            # On this ctx, no ActivationError isn't expected since the
1895
            # activation pre-requisites states were checked, so handled as err
1896
            log.error(f"ActivationError: {str(exc)} when handling {interface}")
1897
1898 1
    def handle_interface_link_down(self, interface):
1899
        """
1900
        Handler for interface link_down events
1901
        """
1902 1
        if not self.is_active():
1903 1
            return
1904 1
        interfaces = (self.uni_a.interface, self.uni_z.interface)
1905 1
        if interface not in interfaces:
1906
            return
1907 1
        down_interfaces = [
1908
            interface
1909
            for interface in interfaces
1910
            if interface.status != EntityStatus.UP
1911
        ]
1912 1
        if not down_interfaces:
1913
            return
1914 1
        interface_dicts = {
1915
            interface.id: {
1916
                'status': interface.status.value,
1917
                'status_reason': interface.status_reason,
1918
            }
1919
            for interface in down_interfaces
1920
        }
1921 1
        self.deactivate()
1922 1
        log.info(
1923
            f"Deactivating {self}. Interfaces: "
1924
            f"{interface_dicts}."
1925
        )
1926 1
        emit_event(self._controller, "uni_active_updated",
1927
                   content=map_evc_event_content(self))
1928 1
        self.sync()
1929
1930
1931 1
class EVC(LinkProtection):
1932
    """Class that represents a E-Line Virtual Connection."""
1933