Test Failed
Pull Request — master (#708)
by Aldo
06:14
created

build.models.evc.EVCDeploy.check_trace()   D

Complexity

Conditions 12

Size

Total Lines 72
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 29
CRAP Score 12.2563

Importance

Changes 0
Metric Value
cc 12
eloc 55
nop 9
dl 0
loc 72
ccs 29
cts 33
cp 0.8788
crap 12.2563
rs 4.8
c 0
b 0
f 0
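
The CRAP score above can be reproduced from the complexity and coverage rows. This is a minimal sketch, assuming the report uses the commonly cited formula CRAP = cc^2 * (1 - coverage)^3 + cc and that ccs and cts are the covered and total statement counts; the function name crap_score is illustrative and not part of the tool.

```python
def crap_score(complexity: int, coverage: float) -> float:
    """Change Risk Anti-Patterns score for one method.

    coverage is a ratio in [0, 1], not a percentage.
    """
    if not 0.0 <= coverage <= 1.0:
        raise ValueError("coverage must be between 0 and 1")
    return complexity ** 2 * (1.0 - coverage) ** 3 + complexity


# Values taken from the metrics table: cc = 12, ccs = 29, cts = 33.
score = crap_score(12, 29 / 33)
print(round(score, 3))
```

With these inputs the result is roughly 12.256, in line with the reported 12.2563; the last digit can differ depending on how the tool rounds the coverage ratio. With full coverage the score collapses to the complexity itself, which is why adding tests and reducing branching both lower it.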

How to fix: Long Method, Complexity, Many Parameters

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
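
The idea can be sketched as follows. This is an illustrative example under stated assumptions, not code from the analyzed project: Link, switch_ids and build_delete_request are hypothetical names, loosely modeled on the "collect the switches of a path" loops in the listing further down. The step that would otherwise sit under a comment such as "collect switch IDs" becomes a named helper.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Link:
    """Hypothetical stand-in for a link joining two switches."""
    switch_a: str
    switch_b: str


def switch_ids(path, excluded=frozenset()):
    """Collect the IDs of every switch on the path, minus the excluded ones."""
    ids = set()
    for link in path:
        ids.update({link.switch_a, link.switch_b})
    return ids - set(excluded)


def build_delete_request(path, cookie, excluded=frozenset()):
    """Build a delete payload; the extracted step now has a descriptive name."""
    return {
        "switches": sorted(switch_ids(path, excluded)),
        "flows": [{"cookie": cookie}],
    }


path = [Link("s1", "s2"), Link("s2", "s3")]
request = build_delete_request(path, cookie=7, excluded={"s1"})
```

The caller now reads as a short sequence of named steps, and the helper can be tested on its own.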

Commonly applied refactorings include Extract Method.

Complexity

Complex methods like build.models.evc.EVCDeploy.check_trace() often do a lot of different things, which usually signals that the owning class does too. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
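
A minimal sketch of Extract Class, assuming a class whose fields share the suffix "_at". Timestamps and Circuit are hypothetical names chosen for illustration; they are not the project's classes, and the window parameter stands in for whatever setting the real code would consult.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class Timestamps:
    """Hypothetical extracted class grouping the '*_at' fields."""
    updated_at: datetime
    flow_removed_at: Optional[datetime] = None

    def is_recently_updated(self, now: datetime, window_s: float) -> bool:
        return (now - self.updated_at).total_seconds() < window_s

    def has_recently_removed_flow(self, now: datetime, window_s: float) -> bool:
        if self.flow_removed_at is None:
            return False
        return (now - self.flow_removed_at).total_seconds() < window_s


@dataclass
class Circuit:
    """Hypothetical owner class; it delegates instead of holding the fields."""
    name: str
    timestamps: Timestamps


t0 = datetime(2024, 1, 1, 12, 0, 0)
circuit = Circuit("evc-1", Timestamps(updated_at=t0))
print(circuit.timestamps.is_recently_updated(t0 + timedelta(seconds=5), 10))
```

The owner class shrinks to a name plus a reference, and the time-window logic lives next to the data it reads.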

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists, such as Introduce Parameter Object and Preserve Whole Object.
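
One such approach, bundling values that travel together into a parameter object, can be sketched like this. TraceCheck and describe are hypothetical names used only for illustration; the real check_trace() signature is not shown in this part of the report and is not reproduced here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TraceCheck:
    """Hypothetical parameter object bundling related arguments."""
    tag_a: int
    tag_z: int
    interface_a: str
    interface_z: str


def describe(check: TraceCheck) -> str:
    """Take one object instead of four positional parameters."""
    return (
        f"{check.interface_a} (vlan {check.tag_a}) <-> "
        f"{check.interface_z} (vlan {check.tag_z})"
    )


summary = describe(TraceCheck(100, 200, "s1-eth1", "s2-eth1"))
```

Adding a field later changes the dataclass only, not every call site, and the frozen dataclass keeps the bundled values consistent once created.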

"""Classes used in the main application."""  # pylint: disable=too-many-lines
import traceback
from collections import OrderedDict, defaultdict
from copy import deepcopy
from datetime import datetime
from operator import eq, ne
from threading import Lock
from typing import Union
from uuid import uuid4

import httpx
from glom import glom
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
                      wait_combine, wait_fixed, wait_random)

from kytos.core import log
from kytos.core.common import EntityStatus, GenericEntity
from kytos.core.exceptions import KytosNoTagAvailableError, KytosTagError
from kytos.core.helpers import get_time, now
from kytos.core.interface import UNI, Interface, TAGRange
from kytos.core.link import Link
from kytos.core.retry import before_sleep
from kytos.core.tag_ranges import range_difference
from napps.kytos.mef_eline import controllers, settings
from napps.kytos.mef_eline.exceptions import (ActivationError,
                                              DuplicatedNoTagUNI,
                                              EVCPathNotInstalled,
                                              FlowModException, InvalidPath)
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc,
                                         compare_endpoint_trace,
                                         compare_uni_out_trace, emit_event,
                                         make_uni_list, map_dl_vlan,
                                         map_evc_event_content,
                                         merge_flow_dicts)

from .path import DynamicPathManager, Path


class EVCBase(GenericEntity):
    """Class to represent a circuit."""

    attributes_requiring_redeploy = [
        "primary_path",
        "backup_path",
        "dynamic_backup_path",
        "queue_id",
        "sb_priority",
        "primary_constraints",
        "secondary_constraints",
        "uni_a",
        "uni_z",
    ]
    required_attributes = ["name", "uni_a", "uni_z"]

    updatable_attributes = {
        "uni_a",
        "uni_z",
        "name",
        "start_date",
        "end_date",
        "queue_id",
        "bandwidth",
        "primary_path",
        "backup_path",
        "dynamic_backup_path",
        "primary_constraints",
        "secondary_constraints",
        "owner",
        "sb_priority",
        "service_level",
        "circuit_scheduler",
        "metadata",
        "enabled",
        "max_paths",
    }

    # pylint: disable=too-many-statements
    def __init__(self, controller, **kwargs):
        """Create an EVC instance with the provided parameters.

        Args:
            id(str): EVC identifier. If None, an ID will be generated.
                     Only the first 14 characters passed will be used.
            name: represents an EVC name.(Required)
            uni_a (UNI): Endpoint A for User Network Interface.(Required)
            uni_z (UNI): Endpoint Z for User Network Interface.(Required)
            start_date(datetime|str): Date when the EVC was registered.
                                      Default is now().
            end_date(datetime|str): Date when the EVC will be finished.
                                    Default is None.
            bandwidth(int): Bandwidth used by EVC instance. Default is 0.
            primary_links(list): Primary links used by evc. Default is []
            backup_links(list): Backup links used by evc. Default is []
            current_path(list): Circuit being used at the moment if this is an
                                active circuit. Default is [].
            failover_path(list): Path being used to provide EVC protection via
                                failover during link failures. Default is [].
            primary_path(list): primary circuit offered to user IF one or more
                                links were provided. Default is [].
            backup_path(list): backup circuit offered to the user IF one or
                               more links were provided. Default is [].
            dynamic_backup_path(bool): Compute the backup path dynamically.
                                       Default is False.
            creation_time(datetime|str): datetime when the circuit should be
                                         activated. default is now().
            enabled(Boolean): attribute to indicate the administrative state;
                              default is False.
            active(Boolean): attribute to indicate the operational state;
                             default is False.
            archived(Boolean): indicate the EVC has been deleted and is
                               archived; default is False.
            owner(str): The EVC owner. Default is None.
            sb_priority(int): Service level provided in the request.
                              Default is None.
            service_level(int): Service level provided. The higher the better.
                                Default is 0.

        Raises:
            ValueError: raised when object attributes are invalid.

        """
        self._controller = controller
        self._validate(**kwargs)
        super().__init__()

        # required attributes
        self._id = kwargs.get("id", uuid4().hex)[:14]
        self.uni_a: UNI = kwargs.get("uni_a")
        self.uni_z: UNI = kwargs.get("uni_z")
        self.name = kwargs.get("name")

        # optional attributes
        self.start_date = get_time(kwargs.get("start_date")) or now()
        self.end_date = get_time(kwargs.get("end_date")) or None
        self.queue_id = kwargs.get("queue_id", -1)

        self.bandwidth = kwargs.get("bandwidth", 0)
        self.primary_links = Path(kwargs.get("primary_links", []))
        self.backup_links = Path(kwargs.get("backup_links", []))
        self.current_path = Path(kwargs.get("current_path", []))
        self.failover_path = Path(kwargs.get("failover_path", []))
        self.primary_path = Path(kwargs.get("primary_path", []))
        self.backup_path = Path(kwargs.get("backup_path", []))
        self.dynamic_backup_path = kwargs.get("dynamic_backup_path", False)
        self.primary_constraints = kwargs.get("primary_constraints", {})
        self.secondary_constraints = kwargs.get("secondary_constraints", {})
        self.creation_time = get_time(kwargs.get("creation_time")) or now()
        self.owner = kwargs.get("owner", None)
        self.sb_priority = kwargs.get("sb_priority", None) or kwargs.get(
            "priority", None
        )
        self.service_level = kwargs.get("service_level", 0)
        self.circuit_scheduler = kwargs.get("circuit_scheduler", [])
        self.flow_removed_at = get_time(kwargs.get("flow_removed_at")) or None
        self.updated_at = get_time(kwargs.get("updated_at")) or now()
        self.execution_rounds = kwargs.get("execution_rounds", 0)
        self.current_links_cache = set()
        self.primary_links_cache = set()
        self.backup_links_cache = set()
        self.old_path = Path([])
        self.max_paths = kwargs.get("max_paths", 2)

        self.lock = Lock()

        self.archived = kwargs.get("archived", False)

        self.metadata = kwargs.get("metadata", {})

        self._mongo_controller = controllers.ELineController()

        if kwargs.get("active", False):
            self.activate()
        else:
            self.deactivate()

        if kwargs.get("enabled", False):
            self.enable()
        else:
            self.disable()

        # datetime of user request for a EVC (or datetime when object was
        # created)
        self.request_time = kwargs.get("request_time", now())
        # dict with the user original request (input)
        self._requested = kwargs

        # Special cases: No tag, any, untagged
        self.special_cases = {None, "4096/4096", 0}
        self.table_group = kwargs.get("table_group")

    def sync(self, keys: set = None):
        """Sync this EVC in the MongoDB."""
        self.updated_at = now()
        if keys:
            self._mongo_controller.update_evc(self.as_dict(keys))
            return
        self._mongo_controller.upsert_evc(self.as_dict())

    def _get_unis(self, **kwargs) -> tuple[UNI, UNI]:
        """Get the updated UNIs."""
        uni_a = kwargs.get("uni_a", None) or self.uni_a
        uni_z = kwargs.get("uni_z", None) or self.uni_z
        return uni_a, uni_z

    def _get_unis_use_tags(self, uni_a, uni_z) -> tuple[UNI, UNI]:
        """Obtain both UNIs (uni_a, uni_z).
        If a UNI is changing, verify tags"""
        uni_a_flag = False
        if uni_a != self.uni_a:
            uni_a_flag = True
            self._use_uni_vlan(uni_a, uni_dif=self.uni_a)

        if uni_z != self.uni_z:
            try:
                self._use_uni_vlan(uni_z, uni_dif=self.uni_z)
                self.make_uni_vlan_available(self.uni_z, uni_dif=uni_z)
            except KytosTagError as err:
                if uni_a_flag:
                    self.make_uni_vlan_available(uni_a, uni_dif=self.uni_a)
                raise err
        else:
            uni_z = self.uni_z

        if uni_a_flag:
            self.make_uni_vlan_available(self.uni_a, uni_dif=uni_a)
        else:
            uni_a = self.uni_a
        return uni_a, uni_z

    def update(self, **kwargs):
        """Update evc attributes.

        This method will raise an error when trying to change the following
        attributes: [creation_time, active, current_path, failover_path,
        _id, archived]
        [name, uni_a and uni_z]

        Returns:
            the values for enable and a redeploy attribute, if exists and None
            otherwise
        Raises:
            ValueError: message with error detail.

        """
        enable, redeploy = (None, None)
        if not self._tag_lists_equal(**kwargs):
            raise ValueError(
                "UNI_A and UNI_Z tag lists should be the same."
            )

        uni_a, uni_z = self._get_unis(**kwargs)
        self._validate_has_primary_or_dynamic(
            primary_path=kwargs.get("primary_path"),
            dynamic_backup_path=kwargs.get("dynamic_backup_path"),
            uni_a=uni_a,
            uni_z=uni_z,
        )
        valid_path = False
        for attribute, value in kwargs.items():
            if attribute not in self.updatable_attributes:
                raise ValueError(f"{attribute} can't be updated.")
            if attribute in ("primary_path", "backup_path"):
                try:
                    value.is_valid(
                        uni_a.interface.switch, uni_z.interface.switch
                    )
                    valid_path = True
                except InvalidPath as exception:
                    raise ValueError(  # pylint: disable=raise-missing-from
                        f"{attribute} is not a valid path: {exception}"
                    )
        if not valid_path:
            try:
                self.primary_path.is_valid(
                    uni_a.interface.switch, uni_z.interface.switch
                )
                self.backup_path.is_valid(
                    uni_a.interface.switch, uni_z.interface.switch
                )
                valid_path = True
            except InvalidPath as exception:
                raise ValueError(  # pylint: disable=raise-missing-from
                        f"{attribute} is not a valid path: {exception}"
                    )
        uni_a, uni_z = self._get_unis_use_tags(uni_a, uni_z)
        for attribute, value in kwargs.items():
            if attribute == "enabled":
                if value:
                    self.enable()
                else:
                    self.disable()
                enable = value
            else:
                setattr(self, attribute, value)
                if attribute in self.attributes_requiring_redeploy:
                    redeploy = True
        self.sync(set(kwargs.keys()))
        return enable, redeploy

    def set_flow_removed_at(self):
        """Update flow_removed_at attribute."""
        self.flow_removed_at = now()

    def has_recent_removed_flow(self, setting=settings):
        """Check if any flow has been removed from the evc"""
        if self.flow_removed_at is None:
            return False
        res_seconds = (now() - self.flow_removed_at).seconds
        return res_seconds < setting.TIME_RECENT_DELETED_FLOWS

    def is_recent_updated(self, setting=settings):
        """Check if the evc has been updated recently"""
        res_seconds = (now() - self.updated_at).seconds
        return res_seconds < setting.TIME_RECENT_UPDATED

    def __repr__(self):
        """Repr method."""
        return f"EVC({self._id}, {self.name})"

    def _validate(self, **kwargs):
        """Do basic validations.

        Verify required attributes: name, uni_a, uni_z

        Raises:
            ValueError: message with error detail.

        """
        for attribute in self.required_attributes:

            if attribute not in kwargs:
                raise ValueError(f"{attribute} is required.")

            if "uni" in attribute:
                uni = kwargs.get(attribute)
                if not isinstance(uni, UNI):
                    raise ValueError(f"{attribute} is an invalid UNI.")

    def _tag_lists_equal(self, **kwargs):
        """Verify that tag lists are the same."""
        uni_a = kwargs.get("uni_a") or self.uni_a
        uni_z = kwargs.get("uni_z") or self.uni_z
        uni_a_list = uni_z_list = False
        if (uni_a.user_tag and isinstance(uni_a.user_tag, TAGRange)):
            uni_a_list = True
        if (uni_z.user_tag and isinstance(uni_z.user_tag, TAGRange)):
            uni_z_list = True
        if uni_a_list and uni_z_list:
            return uni_a.user_tag.value == uni_z.user_tag.value
        return uni_a_list == uni_z_list

    def _validate_has_primary_or_dynamic(
        self,
        primary_path=None,
        dynamic_backup_path=None,
        uni_a=None,
        uni_z=None,
    ) -> None:
        """Validate that it must have a primary path or allow dynamic paths."""
        primary_path = (
            primary_path
            if primary_path is not None
            else self.primary_path
        )
        dynamic_backup_path = (
            dynamic_backup_path
            if dynamic_backup_path is not None
            else self.dynamic_backup_path
        )
        uni_a = uni_a if uni_a is not None else self.uni_a
        uni_z = uni_z if uni_z is not None else self.uni_z
        if (
            not primary_path
            and not dynamic_backup_path
            and uni_a and uni_z
            and uni_a.interface.switch != uni_z.interface.switch
        ):
            msg = "The EVC must have a primary path or allow dynamic paths."
            raise ValueError(msg)

    def __eq__(self, other):
        """Override the default implementation."""
        if not isinstance(other, EVC):
            return False

        attrs_to_compare = ["name", "uni_a", "uni_z", "owner", "bandwidth"]
        for attribute in attrs_to_compare:
            if getattr(other, attribute) != getattr(self, attribute):
                return False
        return True

    def is_intra_switch(self):
        """Check if the UNIs are in the same switch."""
        return self.uni_a.interface.switch == self.uni_z.interface.switch

    def check_no_tag_duplicate(self, other_uni: UNI):
        """Check if a no tag UNI is duplicated."""
        if other_uni in (self.uni_a, self.uni_z):
            msg = f"UNI with interface {other_uni.interface.id} is"\
                  f" duplicated with {self}."
            raise DuplicatedNoTagUNI(msg)

    def as_dict(self, keys: set = None):
        """Return a dictionary representing an EVC object.
            keys: Only fields on this variable will be
                  returned in the dictionary"""
        evc_dict = {
            "id": self.id,
            "name": self.name,
            "uni_a": self.uni_a.as_dict(),
            "uni_z": self.uni_z.as_dict(),
        }

        time_fmt = "%Y-%m-%dT%H:%M:%S"

        evc_dict["start_date"] = self.start_date
        if isinstance(self.start_date, datetime):
            evc_dict["start_date"] = self.start_date.strftime(time_fmt)

        evc_dict["end_date"] = self.end_date
        if isinstance(self.end_date, datetime):
            evc_dict["end_date"] = self.end_date.strftime(time_fmt)

        evc_dict["queue_id"] = self.queue_id
        evc_dict["bandwidth"] = self.bandwidth
        evc_dict["primary_links"] = self.primary_links.as_dict()
        evc_dict["backup_links"] = self.backup_links.as_dict()
        evc_dict["current_path"] = self.current_path.as_dict()
        evc_dict["failover_path"] = self.failover_path.as_dict()
        evc_dict["primary_path"] = self.primary_path.as_dict()
        evc_dict["backup_path"] = self.backup_path.as_dict()
        evc_dict["dynamic_backup_path"] = self.dynamic_backup_path
        evc_dict["metadata"] = self.metadata

        evc_dict["request_time"] = self.request_time
        if isinstance(self.request_time, datetime):
            evc_dict["request_time"] = self.request_time.strftime(time_fmt)

        time = self.creation_time.strftime(time_fmt)
        evc_dict["creation_time"] = time

        evc_dict["owner"] = self.owner
        evc_dict["circuit_scheduler"] = [
            sc.as_dict() for sc in self.circuit_scheduler
        ]

        evc_dict["active"] = self.is_active()
        evc_dict["enabled"] = self.is_enabled()
        evc_dict["archived"] = self.archived
        evc_dict["sb_priority"] = self.sb_priority
        evc_dict["service_level"] = self.service_level
        evc_dict["primary_constraints"] = self.primary_constraints
        evc_dict["secondary_constraints"] = self.secondary_constraints
        evc_dict["flow_removed_at"] = self.flow_removed_at
        evc_dict["updated_at"] = self.updated_at
        evc_dict["max_paths"] = self.max_paths

        if keys:
            selected = {}
            for key in keys:
                selected[key] = evc_dict[key]
            selected["id"] = evc_dict["id"]
            return selected
        return evc_dict

    @property
    def id(self):  # pylint: disable=invalid-name
        """Return this EVC's ID."""
        return self._id

    def archive(self):
        """Archive this EVC on deletion."""
        self.archived = True

    def _use_uni_vlan(
        self,
        uni: UNI,
        uni_dif: Union[None, UNI] = None
    ):
        """Use tags from UNI"""
        if uni.user_tag is None:
            return
        tag = uni.user_tag.value
        tag_type = uni.user_tag.tag_type
        if (uni_dif and isinstance(tag, list) and
                isinstance(uni_dif.user_tag.value, list)):
            tag = range_difference(tag, uni_dif.user_tag.value)
            if not tag:
                return
        uni.interface.use_tags(
            self._controller, tag, tag_type, use_lock=True, check_order=False
        )

    def make_uni_vlan_available(
        self,
        uni: UNI,
        uni_dif: Union[None, UNI] = None,
    ):
        """Make available tag from UNI"""
        if uni.user_tag is None:
            return
        tag = uni.user_tag.value
        tag_type = uni.user_tag.tag_type
        if (uni_dif and isinstance(tag, list) and
                isinstance(uni_dif.user_tag.value, list)):
            tag = range_difference(tag, uni_dif.user_tag.value)
            if not tag:
                return
        try:
            conflict = uni.interface.make_tags_available(
                self._controller, tag, tag_type, use_lock=True,
                check_order=False
            )
        except KytosTagError as err:
            log.error(f"Error in {self}: {err}")
            return
        if conflict:
            intf = uni.interface.id
            log.warning(f"Tags {conflict} was already available in {intf}")

    def remove_uni_tags(self):
        """Remove both UNI usage of a tag"""
        self.make_uni_vlan_available(self.uni_a)
        self.make_uni_vlan_available(self.uni_z)


# pylint: disable=fixme, too-many-public-methods
class EVCDeploy(EVCBase):
    """Class to handle the deploy procedures."""

    def create(self):
        """Create an EVC."""

    def discover_new_paths(self):
        """Discover new paths to satisfy this circuit and deploy it."""
        return DynamicPathManager.get_best_paths(self, self.max_paths,
                                                 **self.primary_constraints)

    def get_failover_path_candidates(self):
        """Get failover paths to satisfy this EVC."""
        # in the future we can return primary/backup paths as well
        # we just have to properly handle link_up and failover paths
        # if (
        #     self.is_using_primary_path() and
        #     self.backup_path.status is EntityStatus.UP
        # ):
        #     yield self.backup_path
        return DynamicPathManager.get_disjoint_paths(self, self.current_path)

    def change_path(self):
        """Change EVC path."""

    def reprovision(self):
        """Force the EVC (re-)provisioning."""

    def is_affected_by_link(self, link):
        """Return True if this EVC has the given link on its current path."""
        return link in self.current_path

    def link_affected_by_interface(self, interface):
        """Return the link on the current path using the interface, if any."""
        return self.current_path.link_affected_by_interface(interface)

    def is_backup_path_affected_by_link(self, link):
        """Return True if the backup path of this EVC uses the given link."""
        return link in self.backup_path

    # pylint: disable=invalid-name
    def is_primary_path_affected_by_link(self, link):
        """Return True if the primary path of this EVC uses the given link."""
        return link in self.primary_path

    def is_failover_path_affected_by_link(self, link):
        """Return True if this EVC has the given link on its failover path."""
        return link in self.failover_path

    def is_eligible_for_failover_path(self):
        """Verify if this EVC is eligible for failover path (EP029)"""
        # In the future this function can be augmented to consider
        # primary/backup, primary/dynamic, and other path combinations
        return (
            self.dynamic_backup_path and
            not self.primary_path and not self.backup_path
        )

    def is_using_primary_path(self):
        """Verify if the current deployed path is self.primary_path."""
        return self.primary_path and (self.current_path == self.primary_path)

    def is_using_backup_path(self):
        """Verify if the current deployed path is self.backup_path."""
        return self.backup_path and (self.current_path == self.backup_path)

    def is_using_dynamic_path(self):
        """Verify if the current deployed path is a dynamic path."""
        if (
            self.current_path
            and not self.is_using_primary_path()
            and not self.is_using_backup_path()
            and self.current_path.status == EntityStatus.UP
        ):
            return True
        return False

    def deploy_to_backup_path(self, old_path_dict: dict = None):
        """Deploy the backup path into the datapaths of this circuit.

        If the backup_path attribute is valid and up, this method will try to
        deploy this backup_path.

        If everything fails and dynamic_backup_path is True, then tries to
        deploy a dynamic path.
        """
        # TODO: Remove flows from current (cookies)
        if self.is_using_backup_path():
            # TODO: Log to say that cannot move backup to backup
            return True

        success = False
        if self.backup_path.status is EntityStatus.UP:
            success = self.deploy_to_path(self.backup_path, old_path_dict)

        if success:
            return True

        if self.dynamic_backup_path or self.is_intra_switch():
            return self.deploy_to_path(old_path_dict=old_path_dict)

        return False

    def deploy_to_primary_path(self, old_path_dict: dict = None):
        """Deploy the primary path into the datapaths of this circuit.

        If the primary_path attribute is valid and up, this method will try to
        deploy this primary_path.
        """
        # TODO: Remove flows from current (cookies)
        if self.is_using_primary_path():
            # TODO: Log to say that cannot move primary to primary
            return True

        if self.primary_path.status is EntityStatus.UP:
            return self.deploy_to_path(self.primary_path, old_path_dict)
        return False

    def deploy(self, old_path_dict: dict = None):
        """Deploy EVC to best path.

        Best path can be the primary path, if available. If not, the backup
        path, and, if it is also not available, a dynamic path.
        """
        if self.archived:
            return False
        self.enable()
        success = self.deploy_to_primary_path(old_path_dict)
        if not success:
            success = self.deploy_to_backup_path(old_path_dict)

        if success:
            emit_event(self._controller, "deployed",
                       content=map_evc_event_content(self))
        return success

    @staticmethod
    def get_path_status(path):
        """Check for the current status of a path.

        If any link in this path is down, the path is considered down.
        """
        if not path:
            return EntityStatus.DISABLED

        for link in path:
            if link.status is not EntityStatus.UP:
                return link.status
        return EntityStatus.UP

    #    def discover_new_path(self):
    #        # TODO: discover a new path to satisfy this circuit and deploy

    def remove(self):
        """Remove EVC path and disable it."""
        self.remove_current_flows(sync=False)
        self.remove_failover_flows(sync=False)
        self.disable()
        self.sync()
        emit_event(self._controller, "undeployed",
                   content=map_evc_event_content(self))

    def remove_failover_flows(self, exclude_uni_switches=True,
                              force=True, sync=True) -> None:
        """Remove failover_flows.

        By default, it'll exclude UNI switches: if mef_eline has already
        called remove_current_flows, this minimizes the number of FlowMods
        and IO.
        """
        if not self.failover_path:
            return
        switches, cookie, excluded = set(), self.get_cookie(), set()
        if exclude_uni_switches:
            excluded.add(self.uni_a.interface.switch.id)
            excluded.add(self.uni_z.interface.switch.id)
        for link in self.failover_path:
            if link.endpoint_a.switch.id not in excluded:
                switches.add(link.endpoint_a.switch.id)
            if link.endpoint_b.switch.id not in excluded:
                switches.add(link.endpoint_b.switch.id)
        flow_mods = {
            "switches": list(switches),
            "flows": [{
                "cookie": cookie,
                "cookie_mask": int(0xffffffffffffffff),
                "owner": "mef_eline",
            }]
        }
        try:
            self._send_flow_mods(
                flow_mods,
                "delete",
                force=force,
            )
        except FlowModException as err:
            log.error(f"Error deleting {self} failover_path flows, {err}")
        try:
            self.failover_path.make_vlans_available(self._controller)
        except KytosTagError as err:
            log.error(f"Error removing {self} failover_path: {err}")
        self.failover_path = Path([])
        if sync:
            self.sync()

    def remove_current_flows(
        self,
        current_path=None,
        force=True,
        sync=True,
        return_path=False
    ) -> dict[str, int]:
        """Remove all flows from current path or path intended for
         current path if exists."""
        switches, old_path_dict = set(), {}
        current_path = self.current_path if not current_path else current_path
        if not current_path and not self.is_intra_switch():
            return {}

        if return_path:
            for link in self.current_path:
                s_vlan = link.metadata.get("s_vlan")
                if s_vlan:
                    old_path_dict[link.id] = s_vlan.value

        for link in current_path:
            switches.add(link.endpoint_a.switch.id)
            switches.add(link.endpoint_b.switch.id)
        switches.add(self.uni_a.interface.switch.id)
        switches.add(self.uni_z.interface.switch.id)
        flow_mods = {
            "switches": list(switches),
760
            "flows": [{
761 1
                "cookie": self.get_cookie(),
762
                "cookie_mask": int(0xffffffffffffffff),
763
                "owner": "mef_eline",
764
            }]
765 1
        }
766 1
767
        try:
768 1
            self._send_flow_mods(flow_mods, "delete", force=force)
769 1
        except FlowModException as err:
770
            log.error(f"Error deleting {self} current_path flows, {err}")
771 1
772 1
        try:
773
            current_path.make_vlans_available(self._controller)
774
        except KytosTagError as err:
775
            log.error(f"Error removing {self} current_path: {err}")
776
        self.current_path = Path([])
777
        self.deactivate()
778
        if sync:
779 1
            self.sync()
780 1
        return old_path_dict
781 1
782
    def remove_path_flows(
783
        self, path=None, force=True
784
    ) -> dict[str, list[dict]]:
785
        """Remove all flows from path, and return the removed flows."""
786
        dpid_flows_match: dict[str, dict] = defaultdict(lambda: {"flows": []})
787 1
        out_flows: dict[str, list[dict]] = defaultdict(list)
788 1
789
        if not path:
790 1
            return out_flows
791 1
792
        try:
793
            nni_flows = self._prepare_nni_flows(path)
794
        # pylint: disable=broad-except
795
        except Exception:
796
            err = traceback.format_exc()
797
            log.error(f"Fail to remove NNI failover flows for {self}: {err}")
798 1
            nni_flows = {}
799 1
800 1
        for dpid, flows in nni_flows.items():
801
            for flow in flows:
802
                flow_mod = {
803
                    "cookie": flow["cookie"],
804
                    "match": flow["match"],
805
                    "owner": "mef_eline",
806 1
                    "cookie_mask": int(0xffffffffffffffff)
807 1
                }
808
                dpid_flows_match[dpid]["flows"].append(flow_mod)
809 1
                out_flows[dpid].append(flow_mod)
810 1
811
        try:
812
            uni_flows = self._prepare_uni_flows(path, skip_in=True)
813 1
        # pylint: disable=broad-except
814 1
        except Exception:
815
            err = traceback.format_exc()
816
            log.error(f"Fail to remove UNI failover flows for {self}: {err}")
817
            uni_flows = {}
818 1
819 1
        for dpid, flows in uni_flows.items():
820
            for flow in flows:
821
                flow_mod = {
822
                    "cookie": flow["cookie"],
823 1
                    "match": flow["match"],
824
                    "owner": "mef_eline",
825 1
                    "cookie_mask": int(0xffffffffffffffff)
826 1
                }
827
                dpid_flows_match[dpid]["flows"].append(flow_mod)
828 1
                out_flows[dpid].append(flow_mod)
829 1
830 1
        try:
831
            self._send_flow_mods(
832 1
                dpid_flows_match, 'delete', force=force, by_switch=True
833
            )
834 1
        except FlowModException as err:
835 1
            log.error(
836 1
                f"Error deleting {self} path flows, path:{path}, error={err}"
837
            )
838 1
839 1
        try:
840 1
            path.make_vlans_available(self._controller)
841
        except KytosTagError as err:
842 1
            log.error(f"Error removing {self} path: {err}")
843 1
844 1
        return out_flows
845
846 1
    @staticmethod
847
    def links_zipped(path=None):
848 1
        """Return an iterator which yields pairs of links in order."""
849 1
        if not path:
850
            return []
851
        return zip(path[:-1], path[1:])
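
To illustrate what links_zipped yields, here is a standalone sketch that uses plain strings in place of Link objects (an assumption made for brevity). Each link is paired with its successor, so a path of n links produces n - 1 pairs:

```python
def links_zipped(path=None):
    """Return consecutive (incoming, outgoing) pairs of a path."""
    if not path:
        return []
    return zip(path[:-1], path[1:])


# Strings stand in for Link objects in this sketch.
pairs = list(links_zipped(["link1", "link2", "link3"]))
single = list(links_zipped(["link1"]))
```

Here `pairs` holds `("link1", "link2")` and `("link2", "link3")`, while `single` is empty because a one-link path has no consecutive pair.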
852
853 1
    def should_deploy(self, path=None):
854 1
        """Verify if the circuit should be deployed."""
855
        if not path:
856
            log.debug("Path is empty.")
857
            return False
858
859 1
        if not self.is_enabled():
860 1
            log.debug(f"{self} is disabled.")
861 1
            return False
862 1
863
        if not self.is_active():
864
            log.debug(f"{self} will be deployed.")
865
            return True
866
867
        return False
868
869
    @staticmethod
870 1
    def is_uni_interface_active(
871
        *interfaces: Interface
872 1
    ) -> tuple[bool, dict]:
873 1
        """Whether UNIs are active and their status & status_reason."""
874 1
        active = True
875
        bad_interfaces = [
876 1
            interface
877
            for interface in interfaces
878 1
            if interface.status != EntityStatus.UP
879 1
        ]
880 1
        if bad_interfaces:
881 1
            active = False
882
            interfaces = bad_interfaces
883
        return active, {
884 1
            interface.id: {
885 1
                'status': interface.status.value,
886
                'status_reason': interface.status_reason,
887 1
            }
888
            for interface in interfaces
889 1
        }
890 1
891 1
    def try_to_activate(self) -> bool:
892 1
        """Try to activate the EVC."""
893
        if self.is_intra_switch():
894
            return self._try_to_activate_intra_evc()
895 1
        return self._try_to_activate_inter_evc()
896 1
897
    def _try_to_activate_intra_evc(self) -> bool:
898
        """Try to activate intra EVC."""
899
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
900 1
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
901 1
        if not is_active:
902
            raise ActivationError(
903
                f"Won't be able to activate {self} due to UNIs: {reason}"
904 1
            )
905
        self.activate()
906
        return True
907
908
    def _try_to_activate_inter_evc(self) -> bool:
909
        """Try to activate inter EVC."""
910
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
911
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
912
        if not is_active:
913
            raise ActivationError(
914
                f"Won't be able to activate {self} due to UNIs: {reason}"
915
            )
916
        if self.current_path.status != EntityStatus.UP:
917
            raise ActivationError(
918
                f"Won't be able to activate {self} due to current_path "
919 1
                f"status {self.current_path.status}"
920 1
            )
921 1
        self.activate()
922 1
        return True
923 1
924 1
    # pylint: disable=too-many-branches, too-many-statements
925 1
    def deploy_to_path(self, path=None, old_path_dict: dict = None):
926 1
        """Install the flows for this circuit.
927 1
928 1
        Procedures to deploy:
929 1
930 1
        0. Remove current flows installed
931
        1. Decide whether to deploy "path" or discover a new path
932 1
        2. Choose vlan
933 1
        3. Install NNI flows
934 1
        4. Install UNI flows
935 1
        5. Activate
936 1
        6. Update current_path
937 1
        7. Update links caches (primary, current, backup)
938 1
939 1
        """
940 1
        self.remove_current_flows(sync=False)
941
        use_path = path or Path([])
942 1
        if not old_path_dict:
943
            old_path_dict = {}
944 1
        tag_errors = []
945 1
        no_valid_path = False
946 1
        if self.should_deploy(use_path):
947 1
            try:
948 1
                use_path.choose_vlans(self._controller, old_path_dict)
949 1
            except KytosNoTagAvailableError as e:
950
                tag_errors.append(str(e))
951 1
                use_path = None
952 1
        else:
953 1
            for use_path in self.discover_new_paths():
954
                if use_path is None:
955
                    no_valid_path = True
956 1
                    continue
957 1
                try:
958 1
                    use_path.choose_vlans(self._controller, old_path_dict)
959 1
                    break
960
                except KytosNoTagAvailableError as e:
961 1
                    tag_errors.append(str(e))
962 1
            else:
963 1
                use_path = None
964 1
965
        try:
966
            if use_path:
967 1
                self._install_flows(use_path)
968 1
            elif self.is_intra_switch():
969
                use_path = Path()
970 1
                self._install_direct_uni_flows()
971 1
            else:
972 1
                no_path_msg = "No available path was found."
973 1
                if no_valid_path:
974
                    no_path_msg = "No valid path was found, "\
975
                                  "try increasing `max_paths`"\
976 1
                                 f" from {self.max_paths}."
977 1
                msg = f"{self} was not deployed. {no_path_msg}"
978 1
                if tag_errors:
979
                    msg = self.add_tag_errors(msg, tag_errors)
980
                    log.error(msg)
981 1
                else:
982
                    log.warning(msg)
983
                return False
984
        except EVCPathNotInstalled as err:
985
            log.error(
986
                f"Error deploying EVC {self} when calling flow_manager: {err}"
987
            )
988
            self.remove_current_flows(use_path, sync=True)
989
            return False
990
991
        self.current_path = use_path
992
        msg = f"{self} was deployed."
993
        try:
994 1
            self.try_to_activate()
995 1
        except ActivationError as exc:
996
            msg = f"{msg} {str(exc)}"
997
        self.sync()
998 1
        log.info(msg)
999 1
        return True
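
The candidate selection in deploy_to_path relies on Python's for/else: the else branch runs only when the loop ends without break. A reduced sketch of that pattern follows; NoTagAvailable, pick_first_usable, and fake_choose_vlans are hypothetical names introduced for illustration and are not part of mef_eline:

```python
class NoTagAvailable(Exception):
    """Hypothetical stand-in for KytosNoTagAvailableError."""


def pick_first_usable(candidates, choose_vlans):
    """Return (path, errors) for the first candidate with free VLANs."""
    errors = []
    for path in candidates:
        if path is None:
            continue
        try:
            choose_vlans(path)
            break
        except NoTagAvailable as exc:
            errors.append(str(exc))
    else:
        # Reached only when no candidate triggered the break above.
        path = None
    return path, errors


def fake_choose_vlans(path):
    """Pretend VLAN reservation that fails for the path named 'busy'."""
    if path == "busy":
        raise NoTagAvailable("no tag on busy")
```

Calling `pick_first_usable(["busy", "free"], fake_choose_vlans)` skips the rejected candidate, keeps its error message, and settles on the second one.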
1000
1001 1
    # pylint: disable=too-many-statements
1002 1
    def setup_failover_path(self, warn_if_not_path=True):
1003 1
        """Install flows for the failover path of this EVC.
1004 1
1005 1
        Procedures to deploy:
1006
1007 1
        0. Remove flows currently installed for failover_path (if any)
1008 1
        1. Discover a disjoint path from current_path
1009 1
        2. Choose vlans
1010 1
        3. Install NNI flows
1011 1
        4. Install UNI egress flows
1012 1
        5. Update failover_path
1013 1
        """
1014 1
        # Intra-switch EVCs have no failover_path
1015
        if self.is_intra_switch():
1016 1
            return False
1017 1
1018
        # For now, only set up the failover path for totally dynamic EVCs
1019 1
        if not self.is_eligible_for_failover_path():
1020 1
            return False
1021 1
1022
        out_new_flows: dict[str, list[dict]] = {}
1023
        reason = ""
1024 1
        tag_errors = []
1025 1
        out_removed_flows = self.remove_path_flows(self.failover_path)
1026 1
        self.failover_path = Path([])
1027
1028
        for use_path in self.get_failover_path_candidates():
1029 1
            if not use_path:
1030 1
                continue
1031
            try:
1032
                use_path.choose_vlans(self._controller)
1033 1
                break
1034
            except KytosNoTagAvailableError as e:
1035 1
                tag_errors.append(str(e))
1036 1
        else:
1037
            use_path = Path([])
1038 1
            reason = "No available path was found"
1039 1
1040
        try:
1041
            if use_path:
1042
                out_new_flows = self._install_flows(
1043
                    use_path, skip_in=True
1044
                )
1045
        except EVCPathNotInstalled as err:
1046
            reason = "Error deploying failover path"
1047
            log.error(
1048
                f"{reason} for {self}. FlowManager error: {err}"
1049 1
            )
1050 1
            _rmed_flows = self.remove_path_flows(use_path)
1051 1
            out_removed_flows = merge_flow_dicts(
1052 1
                out_removed_flows, _rmed_flows
1053 1
            )
1054 1
            use_path = Path([])
1055 1
1056 1
        self.failover_path = use_path
1057 1
        self.sync()
1058 1
1059
        if out_new_flows or out_removed_flows:
1060 1
            emit_event(self._controller, "failover_deployed", content={
1061 1
                self.id: map_evc_event_content(
1062
                    self,
1063 1
                    flows=deepcopy(out_new_flows),
1064 1
                    removed_flows=deepcopy(out_removed_flows),
1065 1
                    error_reason=reason,
1066
                    current_path=self.current_path.as_dict(),
1067
                )
1068 1
            })
1069 1
1070 1
        if not use_path:
1071
            msg = f"Failover path for {self} was not deployed: {reason}."
1072 1
            if tag_errors:
1073 1
                msg = self.add_tag_errors(msg, tag_errors)
1074 1
                log.error(msg)
1075
            elif warn_if_not_path:
1076 1
                log.warning(msg)
1077
            return False
1078
        log.info(f"Failover path for {self} was deployed.")
1079
        return True
1080
1081
    @staticmethod
1082
    def add_tag_errors(msg: str, tag_errors: list):
        """Add to msg the tag errors encountered when choosing path."""
1084 1
        path = ['path', 'paths']
1085 1
        was = ['was', 'were']
1086 1
        message = ['message', 'messages']
1087
1088
        # Choose either singular(0) or plural(1) words
1089 1
        n = 1
1090
        if len(tag_errors) == 1:
1091 1
            n = 0
1092 1
1093
        msg += f" {len(tag_errors)} {path[n]} {was[n]} rejected"
1094 1
        msg += f" with {message[n]}: {tag_errors}"
1095
        return msg
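
A standalone copy of the helper above, written as a plain function so it can be run outside the class. It shows how the singular and plural wording is chosen; the sample messages are made up for the example:

```python
def add_tag_errors(msg: str, tag_errors: list) -> str:
    """Append a summary of rejected paths to msg."""
    path = ['path', 'paths']
    was = ['was', 'were']
    message = ['message', 'messages']

    # Index 0 selects the singular words, index 1 the plural ones.
    n = 0 if len(tag_errors) == 1 else 1

    msg += f" {len(tag_errors)} {path[n]} {was[n]} rejected"
    msg += f" with {message[n]}: {tag_errors}"
    return msg


one = add_tag_errors("EVC was not deployed.", ["no tag"])
two = add_tag_errors("EVC was not deployed.", ["a", "b"])
```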
1096
1097
    def get_failover_flows(self):
1098 1
        """Return the flows needed to make the failover path active, i.e. the
1099
        flows for ingress forwarding.
1100
1101
        Return:
1102
            dict: A dict of flows indexed by the switch_id will be returned, or
1103 1
                an empty dict if no failover_path is available.
1104 1
        """
1105
        if not self.failover_path:
1106
            return {}
1107 1
        return self._prepare_uni_flows(self.failover_path, skip_out=True)
1108 1
1109
    # pylint: disable=too-many-branches
1110
    def _prepare_direct_uni_flows(self):
1111 1
        """Prepare flows connecting two UNIs for intra-switch EVC."""
1112 1
        vlan_a = self._get_value_from_uni_tag(self.uni_a)
1113 1
        vlan_z = self._get_value_from_uni_tag(self.uni_z)
1114 1
1115
        flow_mod_az = self._prepare_flow_mod(
1116 1
            self.uni_a.interface, self.uni_z.interface,
1117 1
            self.queue_id, vlan_a
1118
        )
1119
        flow_mod_za = self._prepare_flow_mod(
1120 1
            self.uni_z.interface, self.uni_a.interface,
1121 1
            self.queue_id, vlan_z
1122
        )
1123
1124 1 View Code Duplication
        if not isinstance(vlan_z, list) and vlan_z not in self.special_cases:
This code seems to be duplicated in your project.
1125 1
            flow_mod_az["actions"].insert(
1126 1
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
1127 1
            )
1128
            if not vlan_a:
1129 1
                flow_mod_az["actions"].insert(
1130 1
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1131 1
                )
1132 1
            if vlan_a == 0:
1133 1
                flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1134 1
        elif vlan_a == 0 and vlan_z == "4096/4096":
1135
            flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1136 1
1137 1 View Code Duplication
        if not isinstance(vlan_a, list) and vlan_a not in self.special_cases:
This code seems to be duplicated in your project.
1138 1
            flow_mod_za["actions"].insert(
1139
                    0, {"action_type": "set_vlan", "vlan_id": vlan_a}
1140 1
                )
1141 1
            if not vlan_z:
1142 1
                flow_mod_za["actions"].insert(
1143 1
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1144 1
                )
1145
            if vlan_z == 0:
1146 1
                flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1147 1
        elif vlan_a == "4096/4096" and vlan_z == 0:
1148 1
            flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1149 1
1150
        flows = []
1151
        if isinstance(vlan_a, list):
1152
            for mask_a in vlan_a:
1153 1
                flow_aux = deepcopy(flow_mod_az)
1154
                flow_aux["match"]["dl_vlan"] = mask_a
1155
                flows.append(flow_aux)
1156
        else:
1157
            if vlan_a is not None:
1158
                flow_mod_az["match"]["dl_vlan"] = vlan_a
1159 1
            flows.append(flow_mod_az)
1160 1
1161 1
        if isinstance(vlan_z, list):
1162 1
            for mask_z in vlan_z:
1163 1
                flow_aux = deepcopy(flow_mod_za)
1164 1
                flow_aux["match"]["dl_vlan"] = mask_z
1165
                flows.append(flow_aux)
1166 1
        else:
1167
            if vlan_z is not None:
1168 1
                flow_mod_za["match"]["dl_vlan"] = vlan_z
1169 1
            flows.append(flow_mod_za)
1170 1
        return (
1171 1
            self.uni_a.interface.switch.id, flows
1172 1
        )
1173 1
1174 1
    def _install_direct_uni_flows(self):
1175
        """Install flows connecting two UNIs.
1176
1177
        This case happens when the circuit is between UNIs in the
1178 1
        same switch.
1179
        """
1180 1
        (dpid, flows) = self._prepare_direct_uni_flows()
1181
        flow_mods = {"switches": [dpid], "flows": flows}
1182
        try:
1183
            self._send_flow_mods(flow_mods, "install")
1184
        except FlowModException as err:
1185
            raise EVCPathNotInstalled(str(err)) from err
1186
1187
    def _prepare_nni_flows(self, path=None):
1188
        """Prepare NNI flows."""
1189
        nni_flows = OrderedDict()
1190
        previous = self.uni_a.interface.switch.dpid
1191 1
        for incoming, outcoming in self.links_zipped(path):
1192
            in_vlan = incoming.get_metadata("s_vlan").value
1193
            out_vlan = outcoming.get_metadata("s_vlan").value
1194
            in_endpoint = self.get_endpoint_by_id(incoming, previous, ne)
1195
            out_endpoint = self.get_endpoint_by_id(
1196
                outcoming, in_endpoint.switch.id, eq
1197
            )
1198
1199
            flows = []
1200 1
            # Flow for one direction
1201 1
            flows.append(
1202 1
                self._prepare_nni_flow(
1203
                    in_endpoint,
1204 1
                    out_endpoint,
1205
                    in_vlan,
1206
                    out_vlan,
1207
                    queue_id=self.queue_id,
1208 1
                )
1209 1
            )
1210 1
1211 1
            # Flow for the other direction
1212 1
            flows.append(
1213 1
                self._prepare_nni_flow(
1214
                    out_endpoint,
1215
                    in_endpoint,
1216 1
                    out_vlan,
1217 1
                    in_vlan,
1218
                    queue_id=self.queue_id,
1219 1
                )
1220 1
            )
1221 1
            previous = in_endpoint.switch.id
1222 1
            nni_flows[in_endpoint.switch.id] = flows
1223
        return nni_flows
1224 1
1225
    def _install_flows(
1226 1
        self, path=None, skip_in=False, skip_out=False
1227 1
    ) -> dict[str, list[dict]]:
1228
        """Install uni and nni flows"""
1229
        flows_by_switch = defaultdict(lambda: {"flows": []})
1230 1
        new_flows = defaultdict(list)
1231 1
        for dpid, flows in self._prepare_nni_flows(path).items():
1232 1
            flows_by_switch[dpid]["flows"].extend(flows)
1233 1
            new_flows[dpid].extend(flows)
1234 1
        for dpid, flows in self._prepare_uni_flows(
1235 1
            path, skip_in, skip_out
1236 1
        ).items():
1237
            flows_by_switch[dpid]["flows"].extend(flows)
1238
            new_flows[dpid].extend(flows)
1239 1
1240
        try:
1241 1
            self._send_flow_mods(flows_by_switch, "install", by_switch=True)
1242 1
        except FlowModException as err:
1243
            raise EVCPathNotInstalled(str(err)) from err
1244
1245
        return new_flows
1246 1
1247 1
    @staticmethod
1248
    def _get_value_from_uni_tag(uni: UNI):
1249 1
        """Returns the value from tag. In case of any and untagged
1250 1
        it should return 4096/4096 and 0 respectively"""
1251
        special = {"any": "4096/4096", "untagged": 0}
1252
        if uni.user_tag:
1253 1
            value = uni.user_tag.value
1254
            if isinstance(value, list):
1255
                return uni.user_tag.mask_list
1256 1
            return special.get(value, value)
1257
        return None
1258
1259
    # pylint: disable=too-many-locals
1260
    def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
1261 1
        """Prepare flows to install UNIs."""
1262
        uni_flows = {}
1263
        if not path:
1264 1
            return uni_flows
1265 1
1266 1
        # Determine VLANs
1267 1
        in_vlan_a = self._get_value_from_uni_tag(self.uni_a)
1268
        out_vlan_a = path[0].get_metadata("s_vlan").value
1269
1270
        in_vlan_z = self._get_value_from_uni_tag(self.uni_z)
1271
        out_vlan_z = path[-1].get_metadata("s_vlan").value
1272
1273
        # Get endpoints from path
1274
        endpoint_a = self.get_endpoint_by_id(
1275 1
            path[0], self.uni_a.interface.switch.id, eq
1276
        )
1277
        endpoint_z = self.get_endpoint_by_id(
1278
            path[-1], self.uni_z.interface.switch.id, eq
1279
        )
1280
1281
        # Flows for the first UNI
1282
        flows_a = []
1283
1284
        # Flow for one direction, pushing the service tag
1285
        if not skip_in:
1286
            if isinstance(in_vlan_a, list):
1287
                for in_mask_a in in_vlan_a:
1288 1
                    push_flow = self._prepare_push_flow(
1289 1
                        self.uni_a.interface,
1290
                        endpoint_a,
1291
                        in_mask_a,
1292
                        out_vlan_a,
1293
                        in_vlan_z,
1294
                        queue_id=self.queue_id,
1295
                    )
1296
                    flows_a.append(push_flow)
1297 1
            else:
1298
                push_flow = self._prepare_push_flow(
1299 1
                    self.uni_a.interface,
1300
                    endpoint_a,
1301
                    in_vlan_a,
1302 1
                    out_vlan_a,
1303
                    in_vlan_z,
1304
                    queue_id=self.queue_id,
1305 1
                )
1306 1
                flows_a.append(push_flow)
1307 1
1308 1
        # Flow for the other direction, popping the service tag
1309
        if not skip_out:
1310
            pop_flow = self._prepare_pop_flow(
1311
                endpoint_a,
1312
                self.uni_a.interface,
1313
                out_vlan_a,
1314
                in_vlan_a,
1315
                in_vlan_z,
1316 1
                queue_id=self.queue_id,
1317
            )
1318
            flows_a.append(pop_flow)
1319
1320
        uni_flows[self.uni_a.interface.switch.id] = flows_a
1321
1322
        # Flows for the second UNI
1323
        flows_z = []
1324
1325
        # Flow for one direction, pushing the service tag
1326
        if not skip_in:
1327
            if isinstance(in_vlan_z, list):
1328
                for in_mask_z in in_vlan_z:
1329 1
                    push_flow = self._prepare_push_flow(
1330 1
                        self.uni_z.interface,
1331
                        endpoint_z,
1332
                        in_mask_z,
1333
                        out_vlan_z,
1334
                        in_vlan_a,
1335
                        queue_id=self.queue_id,
1336
                    )
1337
                    flows_z.append(push_flow)
1338 1
            else:
1339
                push_flow = self._prepare_push_flow(
1340 1
                    self.uni_z.interface,
1341
                    endpoint_z,
1342 1
                    in_vlan_z,
1343
                    out_vlan_z,
1344 1
                    in_vlan_a,
1345 1
                    queue_id=self.queue_id,
1346
                )
1347
                flows_z.append(push_flow)
1348
1349
        # Flow for the other direction, popping the service tag
1350
        if not skip_out:
1351
            pop_flow = self._prepare_pop_flow(
1352 1
                endpoint_z,
1353
                self.uni_z.interface,
1354
                out_vlan_z,
1355
                in_vlan_z,
1356
                in_vlan_a,
1357
                queue_id=self.queue_id,
1358
            )
1359
            flows_z.append(pop_flow)
1360
1361
        uni_flows[self.uni_z.interface.switch.id] = flows_z
1362
1363
        return uni_flows
1364
1365
    @staticmethod
1366
    @retry(
1367 1
        stop=stop_after_attempt(3),
1368 1
        wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
1369
        retry=retry_if_exception_type(FlowModException),
1370 1
        before_sleep=before_sleep,
1371 1
        reraise=True,
1372 1
    )
1373 1
    def _send_flow_mods(
1374 1
        data_content: dict,
1375 1
        command="install",
1376 1
        force=False,
1377
        by_switch=False
1378
    ):
        """Send flow mods to flow_manager.

        Args:
            data_content(dict): Request payload with the flow mods. Without
                by_switch it holds 'switches' and 'flows'; with by_switch
                it maps each dpid to its flows.
            command(str): 'install' (default) to install flows, 'delete'
                to remove them.
            force(bool): True to send via consistency check in case of errors.
            by_switch(bool): True to send to 'flows_by_switch' request instead.
1387
        """
1388 1
        if by_switch:
1389 1
            endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}"
1390
        else:
1391 1
            endpoint = f"{settings.MANAGER_URL}/flows"
1392 1
            data_content["force"] = force
1393
        try:
1394 1
            if command == "install":
1395
                res = httpx.post(endpoint, json=data_content, timeout=30)
1396 1
            elif command == "delete":
1397 1
                res = httpx.request(
1398 1
                    "DELETE", endpoint, json=data_content, timeout=30
1399 1
                )
            else:
                raise ValueError(f"Unsupported command: {command}")
1400
        except httpx.RequestError as err:
1401 1
            raise FlowModException(str(err)) from err
1402 1
        if res.is_server_error or res.status_code >= 400:
1403
            raise FlowModException(res.text)
1404 1
1405 1
    def get_cookie(self):
1406 1
        """Return the cookie integer from evc id."""
1407 1
        return int(self.id, 16) + (settings.COOKIE_PREFIX << 56)
1408 1
1409 1
    @staticmethod
1410 1
    def get_id_from_cookie(cookie):
1411 1
        """Return the evc id given a cookie value."""
1412 1
        evc_id = cookie - (settings.COOKIE_PREFIX << 56)
1413
        return f"{evc_id:x}".zfill(14)
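
The two cookie helpers above are inverses of each other: the 14 hex digits of the EVC id occupy the low 56 bits and the prefix occupies the top byte. A standalone sketch of the round trip follows; the COOKIE_PREFIX value and the sample id are assumptions chosen for illustration, not values read from the mef_eline settings:

```python
COOKIE_PREFIX = 0xAA  # assumed prefix byte, for illustration only


def get_cookie(evc_id: str) -> int:
    """Place the prefix in the top byte and the id in the low 56 bits."""
    return int(evc_id, 16) + (COOKIE_PREFIX << 56)


def get_id_from_cookie(cookie: int) -> str:
    """Strip the prefix and restore the zero-padded 14-digit hex id."""
    evc_id = cookie - (COOKIE_PREFIX << 56)
    return f"{evc_id:x}".zfill(14)


sample_id = "0a1b2c3d4e5f60"  # made-up EVC id
cookie = get_cookie(sample_id)
restored = get_id_from_cookie(cookie)
```

The `zfill(14)` call matters for ids with leading zeros, such as the sample above, because formatting an integer as hex drops them.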
1414 1
1415
    def set_flow_table_group_id(self, flow_mod: dict, vlan) -> dict:
1416
        """Set table_group and table_id"""
1417 1
        table_group = "epl" if vlan is None else "evpl"
1418
        flow_mod["table_group"] = table_group
1419
        flow_mod["table_id"] = self.table_group[table_group]
1420 1
        return flow_mod
1421 1
1422 1
    @staticmethod
1423
    def get_priority(vlan):
1424
        """Return priority value depending on vlan value"""
1425
        if isinstance(vlan, list):
1426
            return settings.EVPL_SB_PRIORITY
1427 1
        if vlan not in {None, "4096/4096", 0}:
1428
            return settings.EVPL_SB_PRIORITY
1429
        if vlan == 0:
1430
            return settings.UNTAGGED_SB_PRIORITY
1431
        if vlan == "4096/4096":
1432
            return settings.ANY_SB_PRIORITY
1433
        return settings.EPL_SB_PRIORITY
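
The branch order in get_priority can be checked with a standalone sketch. The priority constants below are placeholders (assumptions), not the values defined in the mef_eline settings module. The list check has to come first because a list is unhashable and would raise TypeError in the set membership test:

```python
# Placeholder priorities for illustration only (assumed values).
EVPL_SB_PRIORITY = 400
UNTAGGED_SB_PRIORITY = 300
ANY_SB_PRIORITY = 200
EPL_SB_PRIORITY = 100


def get_priority(vlan):
    """Map a UNI VLAN value to a southbound flow priority."""
    if isinstance(vlan, list):
        # VLAN ranges (mask lists) are treated as EVPL.
        return EVPL_SB_PRIORITY
    if vlan not in {None, "4096/4096", 0}:
        # Any specific VLAN id is EVPL as well.
        return EVPL_SB_PRIORITY
    if vlan == 0:
        return UNTAGGED_SB_PRIORITY
    if vlan == "4096/4096":
        return ANY_SB_PRIORITY
    # vlan is None: no tag configured, so the circuit is EPL.
    return EPL_SB_PRIORITY
```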
1434 1
1435 1
    def _prepare_flow_mod(self, in_interface, out_interface,
1436 1
                          queue_id=None, vlan=True):
1437
        """Prepare a common flow mod."""
1438 1
        default_actions = [
1439 1
            {"action_type": "output", "port": out_interface.port_number}
1440
        ]
1441 1
        queue_id = settings.QUEUE_ID if queue_id == -1 else queue_id
1442
        if queue_id is not None:
1443 1
            default_actions.insert(
1444 1
                0,
1445
                {"action_type": "set_queue", "queue_id": queue_id}
1446
            )
1447 1
1448 1
        flow_mod = {
1449 1
            "match": {"in_port": in_interface.port_number},
1450
            "cookie": self.get_cookie(),
1451 1
            "actions": default_actions,
1452
            "owner": "mef_eline",
1453 1
        }
1454
1455
        self.set_flow_table_group_id(flow_mod, vlan)
1456
        if self.sb_priority:
1457
            flow_mod["priority"] = self.sb_priority
1458
        else:
1459
            flow_mod["priority"] = self.get_priority(vlan)
1460
        return flow_mod
1461
1462
    def _prepare_nni_flow(self, *args, queue_id=None):
1463
        """Create NNI flows."""
1464
        in_interface, out_interface, in_vlan, out_vlan = args
1465
        flow_mod = self._prepare_flow_mod(
1466
            in_interface, out_interface, queue_id
1467
        )
1468 1
        flow_mod["match"]["dl_vlan"] = in_vlan
1469 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1470 1
        flow_mod["actions"].insert(0, new_action)
1471
1472
        return flow_mod
1473
1474 1
    def _prepare_push_flow(self, *args, queue_id=None):
1475 1
        """Prepare push flow.
1476
1477 1
        Arguments:
1478
            in_interface(Interface): Interface input.
1479
            out_interface(Interface): Interface output.
1480
            in_vlan(int,str,None): Vlan input.
1481
            out_vlan(int): Vlan output.
1482
            new_c_vlan(int,str,list,None): New client vlan.
1483 1
1484 1
        Return:
1485
            dict: A Python dictionary representing a FlowMod
1486 1
1487
        """
1488 1
        # assign all arguments
1489
        in_interface, out_interface, in_vlan, out_vlan, new_c_vlan = args
1490 1
        vlan_pri = in_vlan if not isinstance(new_c_vlan, list) else new_c_vlan
1491
        flow_mod = self._prepare_flow_mod(
1492
            in_interface, out_interface, queue_id, vlan_pri
1493
        )
1494
        # the service tag must be always pushed
1495 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1496 1
        flow_mod["actions"].insert(0, new_action)
1497
1498 1
        if (
1499
            not (in_vlan != new_c_vlan and isinstance(in_vlan, int) and
1500
                 isinstance(new_c_vlan, int))
1501 1
        ):
1502 1
            # Add service VLAN header when it does NOT fall into this
1503
            # statement: Both VLANs should be integer and different.
1504 1
            new_action = {"action_type": "push_vlan", "tag_type": "s"}
1505
            flow_mod["actions"].insert(0, new_action)
1506
1507
        if in_vlan is not None:
1508
            # if in_vlan is set, it must be included in the match
1509 1
            flow_mod["match"]["dl_vlan"] = in_vlan
1510 1
1511
        if (
1512 1
            not isinstance(in_vlan, int) and isinstance(new_c_vlan, int) and
1513
            new_c_vlan != 0
1514 1
        ):
1515
            # new_in_vlan is an integer but zero, action to set is required
1516
            new_action = {"action_type": "set_vlan", "vlan_id": new_c_vlan}
1517
            flow_mod["actions"].insert(0, new_action)
1518
1519
        if in_vlan == "4096/4096" and new_c_vlan == 0:
1520
            # if in_vlan match with any tags and new_c_vlan does not,
1521
            # a pop action is required
1522
            new_action = {"action_type": "pop_vlan"}
1523
            flow_mod["actions"].insert(0, new_action)
1524
1525 1
        elif (not in_vlan and
1526
                (not isinstance(new_c_vlan, list) and
1527
                 new_c_vlan not in self.special_cases)):
1528 1
            # new_in_vlan is an integer but zero and in_vlan is a no-tag or
1529 1
            # untagged
1530 1
            new_action = {"action_type": "push_vlan", "tag_type": "c"}
1531 1
            flow_mod["actions"].insert(0, new_action)
1532 1
1533
        return flow_mod
1534
1535
    def _prepare_pop_flow(
        self,
        in_interface: Interface,
        out_interface: Interface,
        out_vlan: int,
        in_vlan: Union[int, str, list, None],
        new_c_vlan: Union[int, str, list, None],
        queue_id=None,
    ):
        # pylint: disable=too-many-arguments
        """Prepare pop flow."""
        flow_mod = self._prepare_flow_mod(
            in_interface, out_interface, queue_id
        )
        flow_mod["match"]["dl_vlan"] = out_vlan
        if in_vlan == 0:
            new_action = {"action_type": "pop_vlan"}
            flow_mod["actions"].insert(0, new_action)
        elif (
            in_vlan != new_c_vlan and isinstance(in_vlan, int) and
            isinstance(new_c_vlan, int)
        ):
            # If UNI VLANs are different and in_vlan is not 0
            new_action = {"action_type": "set_vlan", "vlan_id": in_vlan}
            flow_mod["actions"].insert(0, new_action)
        else:
            new_action = {"action_type": "pop_vlan"}
            flow_mod["actions"].insert(0, new_action)
        return flow_mod

    @staticmethod
    def run_bulk_sdntraces(
        uni_list: list[tuple[Interface, Union[str, int, None]]]
    ) -> dict:
        """Run SDN traces on control plane starting from EVC UNIs."""
        endpoint = f"{settings.SDN_TRACE_CP_URL}/traces"
        data = []
        for interface, tag_value in uni_list:
            data_uni = {
                "trace": {
                    "switch": {
                        "dpid": interface.switch.dpid,
                        "in_port": interface.port_number,
                    }
                }
            }
            if tag_value:
                uni_dl_vlan = map_dl_vlan(tag_value)
                if uni_dl_vlan:
                    data_uni["trace"]["eth"] = {
                        "dl_type": 0x8100,
                        "dl_vlan": uni_dl_vlan,
                    }
            data.append(data_uni)
        try:
            response = httpx.put(endpoint, json=data, timeout=30)
        except httpx.TimeoutException as exception:
            log.error(f"Request has timed out: {exception}")
            return {"result": []}
        if response.status_code >= 400:
            log.error(f"Failed to run sdntrace-cp: {response.text}")
            return {"result": []}
        return response.json()

    # pylint: disable=too-many-return-statements, too-many-arguments
    @staticmethod
    def check_trace(
        evc_id: str,
        evc_name: str,
        tag_a: Union[None, int, str],
        tag_z: Union[None, int, str],
        interface_a: Interface,
        interface_z: Interface,
        current_path: list,
        trace_a: list,
        trace_z: list
    ) -> bool:
        """Auxiliary function to check an individual trace."""
        if (
            len(trace_a) != len(current_path) + 1
            or not compare_uni_out_trace(tag_z, interface_z, trace_a[-1])
        ):
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
                        f"Invalid trace from uni_a: {trace_a}")
            return False
        if (
            len(trace_z) != len(current_path) + 1
            or not compare_uni_out_trace(tag_a, interface_a, trace_z[-1])
        ):
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
                        f"Invalid trace from uni_z: {trace_z}")
            return False

        if not current_path:
            return True

        first_link, trace_path_begin, trace_path_end = current_path[0], [], []
        if (
            first_link.endpoint_a.switch.id == trace_a[0]["dpid"]
        ):
            trace_path_begin, trace_path_end = trace_a, trace_z
        elif (
            first_link.endpoint_a.switch.id == trace_z[0]["dpid"]
        ):
            trace_path_begin, trace_path_end = trace_z, trace_a
        else:
            msg = (
                f"first link {first_link} endpoint_a didn't match the first "
                f"step of trace_a {trace_a} or trace_z {trace_z}"
            )
            log.warning(msg)
            return False

        for link, trace1, trace2 in zip(current_path,
                                        trace_path_begin[1:],
                                        trace_path_end[:0:-1]):
            metadata_vlan = None
            if link.metadata:
                metadata_vlan = glom(link.metadata, 's_vlan.value')
            if compare_endpoint_trace(
                link.endpoint_a, metadata_vlan, trace2
            ) is False:
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
                            f"Invalid trace from uni_a: {trace_a}")
                return False
            if compare_endpoint_trace(
                link.endpoint_b, metadata_vlan, trace1
            ) is False:
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
                            f"Invalid trace from uni_z: {trace_z}")
                return False

        return True

    @staticmethod
    def check_range(circuit, traces: list) -> bool:
        """Check traces for a UNI with TAGRange."""
        check = True
        for i, mask in enumerate(circuit.uni_a.user_tag.mask_list):
            trace_a = traces[i*2]
            trace_z = traces[i*2+1]
            check &= EVCDeploy.check_trace(
                circuit.id, circuit.name,
                mask, mask,
                circuit.uni_a.interface,
                circuit.uni_z.interface,
                circuit.current_path,
                trace_a, trace_z,
            )
        return check

    @staticmethod
    def check_list_traces(list_circuits: list) -> dict:
        """Check if current_path is deployed comparing with SDN traces."""
        if not list_circuits:
            return {}
        uni_list = make_uni_list(list_circuits)
        traces = EVCDeploy.run_bulk_sdntraces(uni_list)["result"]

        if not traces:
            return {}

        try:
            circuits_checked = {}
            i = 0
            for circuit in list_circuits:
                if isinstance(circuit.uni_a.user_tag, TAGRange):
                    length = len(circuit.uni_a.user_tag.mask_list)
                    circuits_checked[circuit.id] = EVCDeploy.check_range(
                        circuit, traces[i:i+length*2]
                    )
                    i += length*2
                else:
                    trace_a = traces[i]
                    trace_z = traces[i+1]
                    tag_a = None
                    if circuit.uni_a.user_tag:
                        tag_a = circuit.uni_a.user_tag.value
                    tag_z = None
                    if circuit.uni_z.user_tag:
                        tag_z = circuit.uni_z.user_tag.value
                    circuits_checked[circuit.id] = EVCDeploy.check_trace(
                        circuit.id, circuit.name,
                        tag_a, tag_z,
                        circuit.uni_a.interface,
                        circuit.uni_z.interface,
                        circuit.current_path,
                        trace_a, trace_z
                    )
                    i += 2
        except IndexError as err:
            log.error(
                "Bulk sdntraces returned fewer items than expected. "
                f"Error = {err}"
            )
            return {}

        return circuits_checked

    @staticmethod
    def get_endpoint_by_id(
        link: Link,
        id_: str,
        operator: Union[eq, ne]
    ) -> Interface:
        """Return the endpoint from link whose switch id is
        either equal (eq) or not equal (ne) to id_."""
        if operator(link.endpoint_a.switch.id, id_):
            return link.endpoint_a
        return link.endpoint_b


class LinkProtection(EVCDeploy):
    """Class to handle link protection."""

    def is_affected_by_link(self, link=None):
        """Verify if the current path is affected by link down event."""
        return self.current_path.is_affected_by_link(link)

    def is_using_primary_path(self):
        """Verify if the current deployed path is self.primary_path."""
        return self.current_path == self.primary_path

    def is_using_backup_path(self):
        """Verify if the current deployed path is self.backup_path."""
        return self.current_path == self.backup_path

    def is_using_dynamic_path(self):
        """Verify if the current deployed path is dynamic."""
        if (
            self.current_path
            and not self.is_using_primary_path()
            and not self.is_using_backup_path()
            and self.current_path.status is EntityStatus.UP
        ):
            return True
        return False

    def handle_link_up(self, link=None, interface=None):
        """Handle circuit when link up.

        Args:
            link(Link): Link affected by link.up event.
            interface(Interface): UNI interface that triggered the
                handling, if any.

        """
        condition_pairs = [
            (
                lambda me: me.is_using_primary_path(),
                lambda _: (True, 'nothing')
            ),
            (
                lambda me: me.is_intra_switch(),
                lambda _: (True, 'nothing')
            ),
            (
                lambda me: me.primary_path.is_affected_by_link(link),
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
            ),
            # Special case: execution reaches this point because interface
            # was previously confirmed to be a UNI and both UNIs are UP
            (
                lambda me: (me.primary_path.status == EntityStatus.UP
                            and interface),
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
            ),
            (
                lambda me: (me.backup_path.status == EntityStatus.UP
                            and interface),
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
            ),
            # We tried to deploy primary_path without success, yet the
            # circuit is up somehow. Nothing to do.
            (
                lambda me: me.is_using_backup_path(),
                lambda _: (True, 'nothing')
            ),
            (
                lambda me: me.is_using_dynamic_path(),
                lambda _: (True, 'nothing')
            ),
            # In this case, the circuit is probably not being used and
            # we can move to backup
            (
                lambda me: me.backup_path.is_affected_by_link(link),
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
            ),
            # In this case, the circuit is not being used and we should
            # try a dynamic path
            (
                lambda me: me.dynamic_backup_path and not me.is_active(),
                lambda me: (me.deploy_to_path(), 'redeploy')
            )
        ]
        for predicate, action in condition_pairs:
            if not predicate(self):
                continue
            success, success_type = action(self)
            if success:
                if success_type == 'redeploy':
                    emit_event(
                        self._controller,
                        "redeployed_link_up",
                        content=map_evc_event_content(self)
                    )
                return True
        return False

    def handle_link_down(self):
        """Handle circuit when link down.

        Returns:
            bool: True if the re-deploy was successful, otherwise False.

        """
        success = False
        if self.is_using_primary_path():
            success = self.deploy_to_backup_path()
        elif self.is_using_backup_path():
            success = self.deploy_to_primary_path()

        if not success and self.dynamic_backup_path:
            success = self.deploy_to_path()

        if success:
            log.debug(f"{self} deployed after link down.")
        else:
            self.remove_current_flows(sync=False)
            self.deactivate()
            self.sync()
            log.debug(f"Failed to re-deploy {self} after link down.")

        return success

    def are_unis_active(self) -> bool:
        """Determine whether this EVC should be active."""
        interface_a = self.uni_a.interface
        interface_z = self.uni_z.interface
        active, _ = self.is_uni_interface_active(interface_a, interface_z)
        return active

    def try_to_handle_uni_as_link_up(self, interface: Interface) -> bool:
        """Try to handle UNI as link_up to trigger deployment."""
        if (
            self.current_path.status != EntityStatus.UP
            and not self.is_intra_switch()
        ):
            succeeded = self.handle_link_up(interface=interface)
            if succeeded:
                msg = (
                    f"Activated {self} due to successful "
                    f"deployment triggered by {interface}"
                )
            else:
                msg = (
                    f"Couldn't activate {self} due to unsuccessful "
                    f"deployment triggered by {interface}"
                )
            log.info(msg)
            return True
        return False

    def handle_interface_link_up(self, interface: Interface):
        """
        Handler for interface link_up events
        """
        if not _does_uni_affect_evc(self, interface, "up"):
            return
        if self.try_to_handle_uni_as_link_up(interface):
            return

        interface_dicts = {
            interface.id: {
                'status': interface.status.value,
                'status_reason': interface.status_reason,
            }
            for interface in (self.uni_a.interface, self.uni_z.interface)
        }
        try:
            self.try_to_activate()
            log.info(
                f"Activating {self}. Interfaces: "
                f"{interface_dicts}."
            )
            emit_event(self._controller, "uni_active_updated",
                       content=map_evc_event_content(self))
            self.sync()
        except ActivationError as exc:
            # In this context, ActivationError isn't expected since the
            # activation prerequisite states were checked, so it is
            # handled as an error
            log.error(f"ActivationError: {str(exc)} when handling {interface}")

    def handle_interface_link_down(self, interface):
        """
        Handler for interface link_down events
        """
        if not _does_uni_affect_evc(self, interface, "down"):
            return
        interface_dicts = {
            interface.id: {
                'status': interface.status.value,
                'status_reason': interface.status_reason,
            }
            for interface in (self.uni_a.interface, self.uni_z.interface)
            if interface.status != EntityStatus.UP
        }
        self.deactivate()
        log.info(
            f"Deactivating {self}. Interfaces: "
            f"{interface_dicts}."
        )
        emit_event(self._controller, "uni_active_updated",
                   content=map_evc_event_content(self))
        self.sync()


class EVC(LinkProtection):
    """Class that represents an E-Line Virtual Connection."""