Passed
Push — master ( 327c84...9a93f5 )
by Aldo
07:35 queued 03:39
created

build.models.evc.EVCDeploy.check_trace()   D

Complexity

Conditions 12

Size

Total Lines 72
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 12.3775

Importance

Changes 0
Metric Value
cc 12
eloc 55
nop 9
dl 0
loc 72
ccs 25
cts 29
cp 0.8621
crap 12.3775
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.models.evc.EVCDeploy.check_trace() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Classes used in the main application."""  # pylint: disable=too-many-lines
2 1
import traceback
3 1
from collections import OrderedDict, defaultdict
4 1
from copy import deepcopy
5 1
from datetime import datetime
6 1
from operator import eq, ne
7 1
from threading import Lock
8 1
from typing import Union
9 1
from uuid import uuid4
10
11 1
import httpx
12 1
from glom import glom
13 1
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
14
                      wait_combine, wait_fixed, wait_random)
15
16 1
from kytos.core import log
17 1
from kytos.core.common import EntityStatus, GenericEntity
18 1
from kytos.core.exceptions import KytosNoTagAvailableError, KytosTagError
19 1
from kytos.core.helpers import get_time, now
20 1
from kytos.core.interface import UNI, Interface, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.retry import before_sleep
23 1
from kytos.core.tag_ranges import range_difference
24 1
from napps.kytos.mef_eline import controllers, settings
25 1
from napps.kytos.mef_eline.exceptions import (ActivationError,
26
                                              DuplicatedNoTagUNI,
27
                                              EVCPathNotInstalled,
28
                                              FlowModException, InvalidPath)
29 1
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc,
30
                                         compare_endpoint_trace,
31
                                         compare_uni_out_trace, emit_event,
32
                                         make_uni_list, map_dl_vlan,
33
                                         map_evc_event_content,
34
                                         merge_flow_dicts)
35
36 1
from .path import DynamicPathManager, Path
37
38
39 1
class EVCBase(GenericEntity):
40
    """Class to represent a circuit."""
41
42 1
    attributes_requiring_redeploy = [
43
        "primary_path",
44
        "backup_path",
45
        "dynamic_backup_path",
46
        "queue_id",
47
        "sb_priority",
48
        "primary_constraints",
49
        "secondary_constraints",
50
        "uni_a",
51
        "uni_z",
52
    ]
53 1
    required_attributes = ["name", "uni_a", "uni_z"]
54
55 1
    updatable_attributes = {
56
        "uni_a",
57
        "uni_z",
58
        "name",
59
        "start_date",
60
        "end_date",
61
        "queue_id",
62
        "bandwidth",
63
        "primary_path",
64
        "backup_path",
65
        "dynamic_backup_path",
66
        "primary_constraints",
67
        "secondary_constraints",
68
        "owner",
69
        "sb_priority",
70
        "service_level",
71
        "circuit_scheduler",
72
        "metadata",
73
        "enabled",
74
        "max_paths",
75
    }
76
77
    # pylint: disable=too-many-statements
78 1
    def __init__(self, controller, **kwargs):
79
        """Create an EVC instance with the provided parameters.
80
81
        Args:
82
            id(str): EVC identifier. Whether it's None an ID will be genereted.
83
                     Only the first 14 bytes passed will be used.
84
            name: represents an EVC name.(Required)
85
            uni_a (UNI): Endpoint A for User Network Interface.(Required)
86
            uni_z (UNI): Endpoint Z for User Network Interface.(Required)
87
            start_date(datetime|str): Date when the EVC was registred.
88
                                      Default is now().
89
            end_date(datetime|str): Final date that the EVC will be fineshed.
90
                                    Default is None.
91
            bandwidth(int): Bandwidth used by EVC instance. Default is 0.
92
            primary_links(list): Primary links used by evc. Default is []
93
            backup_links(list): Backups links used by evc. Default is []
94
            current_path(list): Circuit being used at the moment if this is an
95
                                active circuit. Default is [].
96
            failover_path(list): Path being used to provide EVC protection via
97
                                failover during link failures. Default is [].
98
            primary_path(list): primary circuit offered to user IF one or more
99
                                links were provided. Default is [].
100
            backup_path(list): backup circuit offered to the user IF one or
101
                               more links were provided. Default is [].
102
            dynamic_backup_path(bool): Enable computer backup path dynamically.
103
                                       Dafault is False.
104
            creation_time(datetime|str): datetime when the circuit should be
105
                                         activated. default is now().
106
            enabled(Boolean): attribute to indicate the administrative state;
107
                              default is False.
108
            active(Boolean): attribute to indicate the operational state;
109
                             default is False.
110
            archived(Boolean): indicate the EVC has been deleted and is
111
                               archived; default is False.
112
            owner(str): The EVC owner. Default is None.
113
            sb_priority(int): Service level provided in the request.
114
                              Default is None.
115
            service_level(int): Service level provided. The higher the better.
116
                                Default is 0.
117
118
        Raises:
119
            ValueError: raised when object attributes are invalid.
120
121
        """
122 1
        self._controller = controller
123 1
        self._validate(**kwargs)
124 1
        super().__init__()
125
126
        # required attributes
127 1
        self._id = kwargs.get("id", uuid4().hex)[:14]
128 1
        self.uni_a: UNI = kwargs.get("uni_a")
129 1
        self.uni_z: UNI = kwargs.get("uni_z")
130 1
        self.name = kwargs.get("name")
131
132
        # optional attributes
133 1
        self.start_date = get_time(kwargs.get("start_date")) or now()
134 1
        self.end_date = get_time(kwargs.get("end_date")) or None
135 1
        self.queue_id = kwargs.get("queue_id", -1)
136
137 1
        self.bandwidth = kwargs.get("bandwidth", 0)
138 1
        self.primary_links = Path(kwargs.get("primary_links", []))
139 1
        self.backup_links = Path(kwargs.get("backup_links", []))
140 1
        self.current_path = Path(kwargs.get("current_path", []))
141 1
        self.failover_path = Path(kwargs.get("failover_path", []))
142 1
        self.primary_path = Path(kwargs.get("primary_path", []))
143 1
        self.backup_path = Path(kwargs.get("backup_path", []))
144 1
        self.dynamic_backup_path = kwargs.get("dynamic_backup_path", False)
145 1
        self.primary_constraints = kwargs.get("primary_constraints", {})
146 1
        self.secondary_constraints = kwargs.get("secondary_constraints", {})
147 1
        self.creation_time = get_time(kwargs.get("creation_time")) or now()
148 1
        self.owner = kwargs.get("owner", None)
149 1
        self.sb_priority = kwargs.get("sb_priority", None) or kwargs.get(
150
            "priority", None
151
        )
152 1
        self.service_level = kwargs.get("service_level", 0)
153 1
        self.circuit_scheduler = kwargs.get("circuit_scheduler", [])
154 1
        self.flow_removed_at = get_time(kwargs.get("flow_removed_at")) or None
155 1
        self.updated_at = get_time(kwargs.get("updated_at")) or now()
156 1
        self.execution_rounds = kwargs.get("execution_rounds", 0)
157 1
        self.current_links_cache = set()
158 1
        self.primary_links_cache = set()
159 1
        self.backup_links_cache = set()
160 1
        self.affected_by_link_at = get_time("0001-01-01T00:00:00")
161 1
        self.old_path = Path([])
162 1
        self.max_paths = kwargs.get("max_paths", 2)
163
164 1
        self.lock = Lock()
165
166 1
        self.archived = kwargs.get("archived", False)
167
168 1
        self.metadata = kwargs.get("metadata", {})
169
170 1
        self._mongo_controller = controllers.ELineController()
171
172 1
        if kwargs.get("active", False):
173 1
            self.activate()
174
        else:
175 1
            self.deactivate()
176
177 1
        if kwargs.get("enabled", False):
178 1
            self.enable()
179
        else:
180 1
            self.disable()
181
182
        # datetime of user request for a EVC (or datetime when object was
183
        # created)
184 1
        self.request_time = kwargs.get("request_time", now())
185
        # dict with the user original request (input)
186 1
        self._requested = kwargs
187
188
        # Special cases: No tag, any, untagged
189 1
        self.special_cases = {None, "4096/4096", 0}
190 1
        self.table_group = kwargs.get("table_group")
191
192 1
    def sync(self, keys: set = None):
193
        """Sync this EVC in the MongoDB."""
194 1
        self.updated_at = now()
195 1
        if keys:
196 1
            self._mongo_controller.update_evc(self.as_dict(keys))
197 1
            return
198 1
        self._mongo_controller.upsert_evc(self.as_dict())
199
200 1
    def _get_unis_use_tags(self, **kwargs) -> tuple[UNI, UNI]:
201
        """Obtain both UNIs (uni_a, uni_z).
202
        If a UNI is changing, verify tags"""
203 1
        uni_a = kwargs.get("uni_a", None)
204 1
        uni_a_flag = False
205 1
        if uni_a and uni_a != self.uni_a:
206 1
            uni_a_flag = True
207 1
            self._use_uni_vlan(uni_a, uni_dif=self.uni_a)
208
209 1
        uni_z = kwargs.get("uni_z", None)
210 1
        if uni_z and uni_z != self.uni_z:
211 1
            try:
212 1
                self._use_uni_vlan(uni_z, uni_dif=self.uni_z)
213 1
                self.make_uni_vlan_available(self.uni_z, uni_dif=uni_z)
214 1
            except KytosTagError as err:
215 1
                if uni_a_flag:
216 1
                    self.make_uni_vlan_available(uni_a, uni_dif=self.uni_a)
217 1
                raise err
218
        else:
219 1
            uni_z = self.uni_z
220
221 1
        if uni_a_flag:
222 1
            self.make_uni_vlan_available(self.uni_a, uni_dif=uni_a)
223
        else:
224 1
            uni_a = self.uni_a
225 1
        return uni_a, uni_z
226
227 1
    def update(self, **kwargs):
228
        """Update evc attributes.
229
230
        This method will raises an error trying to change the following
231
        attributes: [creation_time, active, current_path, failover_path,
232
        _id, archived]
233
        [name, uni_a and uni_z]
234
235
        Returns:
236
            the values for enable and a redeploy attribute, if exists and None
237
            otherwise
238
        Raises:
239
            ValueError: message with error detail.
240
241
        """
242 1
        enable, redeploy = (None, None)
243 1
        if not self._tag_lists_equal(**kwargs):
244 1
            raise ValueError(
245
                "UNI_A and UNI_Z tag lists should be the same."
246
            )
247 1
        uni_a, uni_z = self._get_unis_use_tags(**kwargs)
248 1
        self._validate_has_primary_or_dynamic(
249
            primary_path=kwargs.get("primary_path"),
250
            dynamic_backup_path=kwargs.get("dynamic_backup_path"),
251
            uni_a=uni_a,
252
            uni_z=uni_z,
253
        )
254 1
        for attribute, value in kwargs.items():
255 1
            if attribute not in self.updatable_attributes:
256 1
                raise ValueError(f"{attribute} can't be updated.")
257 1
            if attribute in ("primary_path", "backup_path"):
258 1
                try:
259 1
                    value.is_valid(
260
                        uni_a.interface.switch, uni_z.interface.switch
261
                    )
262 1
                except InvalidPath as exception:
263 1
                    raise ValueError(  # pylint: disable=raise-missing-from
264
                        f"{attribute} is not a " f"valid path: {exception}"
265
                    )
266 1
        for attribute, value in kwargs.items():
267 1
            if attribute == "enabled":
268 1
                if value:
269 1
                    self.enable()
270
                else:
271 1
                    self.disable()
272 1
                enable = value
273
            else:
274 1
                setattr(self, attribute, value)
275 1
                if attribute in self.attributes_requiring_redeploy:
276 1
                    redeploy = True
277 1
        self.sync(set(kwargs.keys()))
278 1
        return enable, redeploy
279
280 1
    def set_flow_removed_at(self):
281
        """Update flow_removed_at attribute."""
282
        self.flow_removed_at = now()
283
284 1
    def has_recent_removed_flow(self, setting=settings):
285
        """Check if any flow has been removed from the evc"""
286
        if self.flow_removed_at is None:
287
            return False
288
        res_seconds = (now() - self.flow_removed_at).seconds
289
        return res_seconds < setting.TIME_RECENT_DELETED_FLOWS
290
291 1
    def is_recent_updated(self, setting=settings):
292
        """Check if the evc has been updated recently"""
293
        res_seconds = (now() - self.updated_at).seconds
294
        return res_seconds < setting.TIME_RECENT_UPDATED
295
296 1
    def __repr__(self):
297
        """Repr method."""
298 1
        return f"EVC({self._id}, {self.name})"
299
300 1
    def _validate(self, **kwargs):
301
        """Do Basic validations.
302
303
        Verify required attributes: name, uni_a, uni_z
304
305
        Raises:
306
            ValueError: message with error detail.
307
308
        """
309 1
        for attribute in self.required_attributes:
310
311 1
            if attribute not in kwargs:
312 1
                raise ValueError(f"{attribute} is required.")
313
314 1
            if "uni" in attribute:
315 1
                uni = kwargs.get(attribute)
316 1
                if not isinstance(uni, UNI):
317
                    raise ValueError(f"{attribute} is an invalid UNI.")
318
319 1
    def _tag_lists_equal(self, **kwargs):
320
        """Verify that tag lists are the same."""
321 1
        uni_a = kwargs.get("uni_a") or self.uni_a
322 1
        uni_z = kwargs.get("uni_z") or self.uni_z
323 1
        uni_a_list = uni_z_list = False
324 1
        if (uni_a.user_tag and isinstance(uni_a.user_tag, TAGRange)):
325 1
            uni_a_list = True
326 1
        if (uni_z.user_tag and isinstance(uni_z.user_tag, TAGRange)):
327 1
            uni_z_list = True
328 1
        if uni_a_list and uni_z_list:
329 1
            return uni_a.user_tag.value == uni_z.user_tag.value
330 1
        return uni_a_list == uni_z_list
331
332 1
    def _validate_has_primary_or_dynamic(
333
        self,
334
        primary_path=None,
335
        dynamic_backup_path=None,
336
        uni_a=None,
337
        uni_z=None,
338
    ) -> None:
339
        """Validate that it must have a primary path or allow dynamic paths."""
340 1
        primary_path = (
341
            primary_path
342
            if primary_path is not None
343
            else self.primary_path
344
        )
345 1
        dynamic_backup_path = (
346
            dynamic_backup_path
347
            if dynamic_backup_path is not None
348
            else self.dynamic_backup_path
349
        )
350 1
        uni_a = uni_a if uni_a is not None else self.uni_a
351 1
        uni_z = uni_z if uni_z is not None else self.uni_z
352 1
        if (
353
            not primary_path
354
            and not dynamic_backup_path
355
            and uni_a and uni_z
356
            and uni_a.interface.switch != uni_z.interface.switch
357
        ):
358 1
            msg = "The EVC must have a primary path or allow dynamic paths."
359 1
            raise ValueError(msg)
360
361 1
    def __eq__(self, other):
362
        """Override the default implementation."""
363 1
        if not isinstance(other, EVC):
364
            return False
365
366 1
        attrs_to_compare = ["name", "uni_a", "uni_z", "owner", "bandwidth"]
367 1
        for attribute in attrs_to_compare:
368 1
            if getattr(other, attribute) != getattr(self, attribute):
369 1
                return False
370 1
        return True
371
372 1
    def is_intra_switch(self):
373
        """Check if the UNIs are in the same switch."""
374 1
        return self.uni_a.interface.switch == self.uni_z.interface.switch
375
376 1
    def check_no_tag_duplicate(self, other_uni: UNI):
377
        """Check if a no tag UNI is duplicated."""
378 1
        if other_uni in (self.uni_a, self.uni_z):
379 1
            msg = f"UNI with interface {other_uni.interface.id} is"\
380
                  f" duplicated with {self}."
381 1
            raise DuplicatedNoTagUNI(msg)
382
383 1
    def as_dict(self, keys: set = None):
384
        """Return a dictionary representing an EVC object.
385
            keys: Only fields on this variable will be
386
                  returned in the dictionary"""
387 1
        evc_dict = {
388
            "id": self.id,
389
            "name": self.name,
390
            "uni_a": self.uni_a.as_dict(),
391
            "uni_z": self.uni_z.as_dict(),
392
        }
393
394 1
        time_fmt = "%Y-%m-%dT%H:%M:%S"
395
396 1
        evc_dict["start_date"] = self.start_date
397 1
        if isinstance(self.start_date, datetime):
398 1
            evc_dict["start_date"] = self.start_date.strftime(time_fmt)
399
400 1
        evc_dict["end_date"] = self.end_date
401 1
        if isinstance(self.end_date, datetime):
402 1
            evc_dict["end_date"] = self.end_date.strftime(time_fmt)
403
404 1
        evc_dict["queue_id"] = self.queue_id
405 1
        evc_dict["bandwidth"] = self.bandwidth
406 1
        evc_dict["primary_links"] = self.primary_links.as_dict()
407 1
        evc_dict["backup_links"] = self.backup_links.as_dict()
408 1
        evc_dict["current_path"] = self.current_path.as_dict()
409 1
        evc_dict["failover_path"] = self.failover_path.as_dict()
410 1
        evc_dict["primary_path"] = self.primary_path.as_dict()
411 1
        evc_dict["backup_path"] = self.backup_path.as_dict()
412 1
        evc_dict["dynamic_backup_path"] = self.dynamic_backup_path
413 1
        evc_dict["metadata"] = self.metadata
414
415 1
        evc_dict["request_time"] = self.request_time
416 1
        if isinstance(self.request_time, datetime):
417 1
            evc_dict["request_time"] = self.request_time.strftime(time_fmt)
418
419 1
        time = self.creation_time.strftime(time_fmt)
420 1
        evc_dict["creation_time"] = time
421
422 1
        evc_dict["owner"] = self.owner
423 1
        evc_dict["circuit_scheduler"] = [
424
            sc.as_dict() for sc in self.circuit_scheduler
425
        ]
426
427 1
        evc_dict["active"] = self.is_active()
428 1
        evc_dict["enabled"] = self.is_enabled()
429 1
        evc_dict["archived"] = self.archived
430 1
        evc_dict["sb_priority"] = self.sb_priority
431 1
        evc_dict["service_level"] = self.service_level
432 1
        evc_dict["primary_constraints"] = self.primary_constraints
433 1
        evc_dict["secondary_constraints"] = self.secondary_constraints
434 1
        evc_dict["flow_removed_at"] = self.flow_removed_at
435 1
        evc_dict["updated_at"] = self.updated_at
436 1
        evc_dict["max_paths"] = self.max_paths
437
438 1
        if keys:
439 1
            selected = {}
440 1
            for key in keys:
441 1
                selected[key] = evc_dict[key]
442 1
            selected["id"] = evc_dict["id"]
443 1
            return selected
444 1
        return evc_dict
445
446 1
    @property
447 1
    def id(self):  # pylint: disable=invalid-name
448
        """Return this EVC's ID."""
449 1
        return self._id
450
451 1
    def archive(self):
452
        """Archive this EVC on deletion."""
453 1
        self.archived = True
454
455 1
    def _use_uni_vlan(
456
        self,
457
        uni: UNI,
458
        uni_dif: Union[None, UNI] = None
459
    ):
460
        """Use tags from UNI"""
461 1
        if uni.user_tag is None:
462 1
            return
463 1
        tag = uni.user_tag.value
464 1
        tag_type = uni.user_tag.tag_type
465 1
        if (uni_dif and isinstance(tag, list) and
466
                isinstance(uni_dif.user_tag.value, list)):
467 1
            tag = range_difference(tag, uni_dif.user_tag.value)
468 1
            if not tag:
469 1
                return
470 1
        uni.interface.use_tags(
471
            self._controller, tag, tag_type, use_lock=True, check_order=False
472
        )
473
474 1
    def make_uni_vlan_available(
475
        self,
476
        uni: UNI,
477
        uni_dif: Union[None, UNI] = None,
478
    ):
479
        """Make available tag from UNI"""
480 1
        if uni.user_tag is None:
481 1
            return
482 1
        tag = uni.user_tag.value
483 1
        tag_type = uni.user_tag.tag_type
484 1
        if (uni_dif and isinstance(tag, list) and
485
                isinstance(uni_dif.user_tag.value, list)):
486 1
            tag = range_difference(tag, uni_dif.user_tag.value)
487 1
            if not tag:
488
                return
489 1
        try:
490 1
            conflict = uni.interface.make_tags_available(
491
                self._controller, tag, tag_type, use_lock=True,
492
                check_order=False
493
            )
494 1
        except KytosTagError as err:
495 1
            log.error(f"Error in {self}: {err}")
496 1
            return
497 1
        if conflict:
498 1
            intf = uni.interface.id
499 1
            log.warning(f"Tags {conflict} was already available in {intf}")
500
501 1
    def remove_uni_tags(self):
502
        """Remove both UNI usage of a tag"""
503 1
        self.make_uni_vlan_available(self.uni_a)
504 1
        self.make_uni_vlan_available(self.uni_z)
505
506
507
# pylint: disable=fixme, too-many-public-methods
508 1
class EVCDeploy(EVCBase):
509
    """Class to handle the deploy procedures."""
510
511 1
    def create(self):
512
        """Create a EVC."""
513
514 1
    def discover_new_paths(self):
515
        """Discover new paths to satisfy this circuit and deploy it."""
516
        return DynamicPathManager.get_best_paths(self, self.max_paths,
517
                                                 **self.primary_constraints)
518
519 1
    def get_failover_path_candidates(self):
520
        """Get failover paths to satisfy this EVC."""
521
        # in the future we can return primary/backup paths as well
522
        # we just have to properly handle link_up and failover paths
523
        # if (
524
        #     self.is_using_primary_path() and
525
        #     self.backup_path.status is EntityStatus.UP
526
        # ):
527
        #     yield self.backup_path
528 1
        return DynamicPathManager.get_disjoint_paths(self, self.current_path)
529
530 1
    def change_path(self):
531
        """Change EVC path."""
532
533 1
    def reprovision(self):
534
        """Force the EVC (re-)provisioning."""
535
536 1
    def is_affected_by_link(self, link):
537
        """Return True if this EVC has the given link on its current path."""
538 1
        return link in self.current_path
539
540 1
    def link_affected_by_interface(self, interface):
541
        """Return True if this EVC has the given link on its current path."""
542
        return self.current_path.link_affected_by_interface(interface)
543
544 1
    def is_backup_path_affected_by_link(self, link):
545
        """Return True if the backup path of this EVC uses the given link."""
546 1
        return link in self.backup_path
547
548
    # pylint: disable=invalid-name
549 1
    def is_primary_path_affected_by_link(self, link):
550
        """Return True if the primary path of this EVC uses the given link."""
551 1
        return link in self.primary_path
552
553 1
    def is_failover_path_affected_by_link(self, link):
554
        """Return True if this EVC has the given link on its failover path."""
555 1
        return link in self.failover_path
556
557 1
    def is_eligible_for_failover_path(self):
558
        """Verify if this EVC is eligible for failover path (EP029)"""
559
        # In the future this function can be augmented to consider
560
        # primary/backup, primary/dynamic, and other path combinations
561 1
        return (
562
            self.dynamic_backup_path and
563
            not self.primary_path and not self.backup_path
564
        )
565
566 1
    def is_using_primary_path(self):
567
        """Verify if the current deployed path is self.primary_path."""
568 1
        return self.primary_path and (self.current_path == self.primary_path)
569
570 1
    def is_using_backup_path(self):
571
        """Verify if the current deployed path is self.backup_path."""
572 1
        return self.backup_path and (self.current_path == self.backup_path)
573
574 1
    def is_using_dynamic_path(self):
575
        """Verify if the current deployed path is a dynamic path."""
576 1
        if (
577
            self.current_path
578
            and not self.is_using_primary_path()
579
            and not self.is_using_backup_path()
580
            and self.current_path.status == EntityStatus.UP
581
        ):
582
            return True
583 1
        return False
584
585 1
    def deploy_to_backup_path(self, old_path_dict: dict = None):
586
        """Deploy the backup path into the datapaths of this circuit.
587
588
        If the backup_path attribute is valid and up, this method will try to
589
        deploy this backup_path.
590
591
        If everything fails and dynamic_backup_path is True, then tries to
592
        deploy a dynamic path.
593
        """
594
        # TODO: Remove flows from current (cookies)
595 1
        if self.is_using_backup_path():
596
            # TODO: Log to say that cannot move backup to backup
597
            return True
598
599 1
        success = False
600 1
        if self.backup_path.status is EntityStatus.UP:
601 1
            success = self.deploy_to_path(self.backup_path, old_path_dict)
602
603 1
        if success:
604 1
            return True
605
606 1
        if self.dynamic_backup_path or self.is_intra_switch():
607 1
            return self.deploy_to_path(old_path_dict=old_path_dict)
608
609
        return False
610
611 1
    def deploy_to_primary_path(self, old_path_dict: dict = None):
612
        """Deploy the primary path into the datapaths of this circuit.
613
614
        If the primary_path attribute is valid and up, this method will try to
615
        deploy this primary_path.
616
        """
617
        # TODO: Remove flows from current (cookies)
618 1
        if self.is_using_primary_path():
619
            # TODO: Log to say that cannot move primary to primary
620
            return True
621
622 1
        if self.primary_path.status is EntityStatus.UP:
623 1
            return self.deploy_to_path(self.primary_path, old_path_dict)
624 1
        return False
625
626 1
    def deploy(self, old_path_dict: dict = None):
627
        """Deploy EVC to best path.
628
629
        Best path can be the primary path, if available. If not, the backup
630
        path, and, if it is also not available, a dynamic path.
631
        """
632 1
        if self.archived:
633 1
            return False
634 1
        self.enable()
635 1
        success = self.deploy_to_primary_path(old_path_dict)
636 1
        if not success:
637 1
            success = self.deploy_to_backup_path(old_path_dict)
638
639 1
        if success:
640 1
            emit_event(self._controller, "deployed",
641
                       content=map_evc_event_content(self))
642 1
        return success
643
644 1
    @staticmethod
645 1
    def get_path_status(path):
646
        """Check for the current status of a path.
647
648
        If any link in this path is down, the path is considered down.
649
        """
650 1
        if not path:
651 1
            return EntityStatus.DISABLED
652
653 1
        for link in path:
654 1
            if link.status is not EntityStatus.UP:
655 1
                return link.status
656 1
        return EntityStatus.UP
657
658
    #    def discover_new_path(self):
659
    #        # TODO: discover a new path to satisfy this circuit and deploy
660
661 1
    def remove(self):
662
        """Remove EVC path and disable it."""
663 1
        self.remove_current_flows(sync=False)
664 1
        self.remove_failover_flows(sync=False)
665 1
        self.disable()
666 1
        self.sync()
667 1
        emit_event(self._controller, "undeployed",
668
                   content=map_evc_event_content(self))
669
670 1
    def remove_failover_flows(self, exclude_uni_switches=True,
671
                              force=True, sync=True) -> None:
672
        """Remove failover_flows.
673
674
        By default, it'll exclude UNI switches, if mef_eline has already
675
        called remove_current_flows before then this minimizes the number
676
        of FlowMods and IO.
677
        """
678 1
        if not self.failover_path:
679 1
            return
680 1
        switches, cookie, excluded = set(), self.get_cookie(), set()
681 1
        if exclude_uni_switches:
682 1
            excluded.add(self.uni_a.interface.switch.id)
683 1
            excluded.add(self.uni_z.interface.switch.id)
684 1
        for link in self.failover_path:
685 1
            if link.endpoint_a.switch.id not in excluded:
686 1
                switches.add(link.endpoint_a.switch.id)
687 1
            if link.endpoint_b.switch.id not in excluded:
688 1
                switches.add(link.endpoint_b.switch.id)
689 1
        flow_mods = {
690
            "switches": list(switches),
691
            "flows": [{
692
                "cookie": cookie,
693
                "cookie_mask": int(0xffffffffffffffff),
694
                "owner": "mef_eline",
695
            }]
696
        }
697 1
        try:
698 1
            self._send_flow_mods(
699
                flow_mods,
700
                "delete",
701
                force=force,
702
            )
703
        except FlowModException as err:
704
            log.error(f"Error deleting {self} failover_path flows, {err}")
705 1
        try:
706 1
            self.failover_path.make_vlans_available(self._controller)
707
        except KytosTagError as err:
708
            log.error(f"Error removing {self} failover_path: {err}")
709 1
        self.failover_path = Path([])
710 1
        if sync:
711 1
            self.sync()
712
713 1
    def remove_current_flows(
714
        self,
715
        current_path=None,
716
        force=True,
717
        sync=True,
718
        return_path=False
719
    ) -> dict[str, int]:
720
        """Remove all flows from current path or path intended for
721
         current path if exists."""
722 1
        switches, old_path_dict = set(), {}
723 1
        current_path = self.current_path if not current_path else current_path
724 1
        if not current_path and not self.is_intra_switch():
725 1
            return {}
726
727 1
        if return_path:
728 1
            for link in self.current_path:
729 1
                s_vlan = link.metadata.get("s_vlan")
730 1
                if s_vlan:
731 1
                    old_path_dict[link.id] = s_vlan.value
732
733 1
        for link in current_path:
734 1
            switches.add(link.endpoint_a.switch.id)
735 1
            switches.add(link.endpoint_b.switch.id)
736 1
        switches.add(self.uni_a.interface.switch.id)
737 1
        switches.add(self.uni_z.interface.switch.id)
738 1
        flow_mods = {
739
            "switches": list(switches),
740
            "flows": [{
741
                "cookie": self.get_cookie(),
742
                "cookie_mask": int(0xffffffffffffffff),
743
                "owner": "mef_eline",
744
            }]
745
        }
746
747 1
        try:
748 1
            self._send_flow_mods(flow_mods, "delete", force=force)
749 1
        except FlowModException as err:
750 1
            log.error(f"Error deleting {self} current_path flows, {err}")
751
752 1
        try:
753 1
            current_path.make_vlans_available(self._controller)
754 1
        except KytosTagError as err:
755 1
            log.error(f"Error removing {self} current_path: {err}")
756 1
        self.current_path = Path([])
757 1
        self.deactivate()
758 1
        if sync:
759 1
            self.sync()
760 1
        return old_path_dict
761
762 1
    def remove_path_flows(
763
        self, path=None, force=True
764
    ) -> dict[str, list[dict]]:
765
        """Remove all flows from path, and return the removed flows."""
766 1
        dpid_flows_match: dict[str, dict] = defaultdict(lambda: {"flows": []})
767 1
        out_flows: dict[str, list[dict]] = defaultdict(list)
768
769 1
        if not path:
770 1
            return dpid_flows_match
771
772 1
        try:
773 1
            nni_flows = self._prepare_nni_flows(path)
774
        # pylint: disable=broad-except
775
        except Exception:
776
            err = traceback.format_exc()
777
            log.error(f"Fail to remove NNI failover flows for {self}: {err}")
778
            nni_flows = {}
779
780 1
        for dpid, flows in nni_flows.items():
781 1
            for flow in flows:
782 1
                flow_mod = {
783
                    "cookie": flow["cookie"],
784
                    "match": flow["match"],
785
                    "owner": "mef_eline",
786
                    "cookie_mask": int(0xffffffffffffffff)
787
                }
788 1
                dpid_flows_match[dpid]["flows"].append(flow_mod)
789 1
                out_flows[dpid].append(flow_mod)
790
791 1
        try:
792 1
            uni_flows = self._prepare_uni_flows(path, skip_in=True)
793
        # pylint: disable=broad-except
794
        except Exception:
795
            err = traceback.format_exc()
796
            log.error(f"Fail to remove UNI failover flows for {self}: {err}")
797
            uni_flows = {}
798
799 1
        for dpid, flows in uni_flows.items():
800 1
            for flow in flows:
801 1
                flow_mod = {
802
                    "cookie": flow["cookie"],
803
                    "match": flow["match"],
804
                    "owner": "mef_eline",
805
                    "cookie_mask": int(0xffffffffffffffff)
806
                }
807 1
                dpid_flows_match[dpid]["flows"].append(flow_mod)
808 1
                out_flows[dpid].append(flow_mod)
809
810 1
        try:
811 1
            self._send_flow_mods(
812
                dpid_flows_match, 'delete', force=force, by_switch=True
813
            )
814 1
        except FlowModException as err:
815 1
            log.error(
816
                f"Error deleting {self} path flows, path:{path}, error={err}"
817
            )
818
819 1
        try:
820 1
            path.make_vlans_available(self._controller)
821
        except KytosTagError as err:
822
            log.error(f"Error removing {self} path: {err}")
823
824 1
        return out_flows
825
826 1
    @staticmethod
827 1
    def links_zipped(path=None):
828
        """Return an iterator which yields pairs of links in order."""
829 1
        if not path:
830 1
            return []
831 1
        return zip(path[:-1], path[1:])
832
833 1
    def should_deploy(self, path=None):
834
        """Verify if the circuit should be deployed."""
835 1
        if not path:
836 1
            log.debug("Path is empty.")
837 1
            return False
838
839 1
        if not self.is_enabled():
840 1
            log.debug(f"{self} is disabled.")
841 1
            return False
842
843 1
        if not self.is_active():
844 1
            log.debug(f"{self} will be deployed.")
845 1
            return True
846
847 1
        return False
848
849 1
    @staticmethod
850 1
    def is_uni_interface_active(
851
        *interfaces: Interface
852
    ) -> tuple[bool, dict]:
853
        """Whether UNIs are active and their status & status_reason."""
854 1
        active = True
855 1
        bad_interfaces = [
856
            interface
857
            for interface in interfaces
858
            if interface.status != EntityStatus.UP
859
        ]
860 1
        if bad_interfaces:
861 1
            active = False
862 1
            interfaces = bad_interfaces
863 1
        return active, {
864
            interface.id: {
865
                'status': interface.status.value,
866
                'status_reason': interface.status_reason,
867
            }
868
            for interface in interfaces
869
        }
870
871 1
    def try_to_activate(self) -> bool:
872
        """Try to activate the EVC."""
873 1
        if self.is_intra_switch():
874 1
            return self._try_to_activate_intra_evc()
875 1
        return self._try_to_activate_inter_evc()
876
877 1
    def _try_to_activate_intra_evc(self) -> bool:
878
        """Try to activate intra EVC."""
879 1
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
880 1
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
881 1
        if not is_active:
882 1
            raise ActivationError(
883
                f"Won't be able to activate {self} due to UNIs: {reason}"
884
            )
885 1
        self.activate()
886 1
        return True
887
888 1
    def _try_to_activate_inter_evc(self) -> bool:
889
        """Try to activate inter EVC."""
890 1
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
891 1
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
892 1
        if not is_active:
893 1
            raise ActivationError(
894
                f"Won't be able to activate {self} due to UNIs: {reason}"
895
            )
896 1
        if self.current_path.status != EntityStatus.UP:
897 1
            raise ActivationError(
898
                f"Won't be able to activate {self} due to current_path "
899
                f"status {self.current_path.status}"
900
            )
901 1
        self.activate()
902 1
        return True
903
904
    # pylint: disable=too-many-branches, too-many-statements
905 1
    def deploy_to_path(self, path=None, old_path_dict: dict = None):
906
        """Install the flows for this circuit.
907
908
        Procedures to deploy:
909
910
        0. Remove current flows installed
911
        1. Decide if will deploy "path" or discover a new path
912
        2. Choose vlan
913
        3. Install NNI flows
914
        4. Install UNI flows
915
        5. Activate
916
        6. Update current_path
917
        7. Update links caches(primary, current, backup)
918
919
        """
920 1
        self.remove_current_flows(sync=False)
921 1
        use_path = path or Path([])
922 1
        if not old_path_dict:
923 1
            old_path_dict = {}
924 1
        tag_errors = []
925 1
        no_valid_path = False
926 1
        if self.should_deploy(use_path):
927 1
            try:
928 1
                use_path.choose_vlans(self._controller, old_path_dict)
929 1
            except KytosNoTagAvailableError as e:
930 1
                tag_errors.append(str(e))
931 1
                use_path = None
932
        else:
933 1
            for use_path in self.discover_new_paths():
934 1
                if use_path is None:
935 1
                    no_valid_path = True
936 1
                    continue
937 1
                try:
938 1
                    use_path.choose_vlans(self._controller, old_path_dict)
939 1
                    break
940 1
                except KytosNoTagAvailableError as e:
941 1
                    tag_errors.append(str(e))
942
            else:
943 1
                use_path = None
944
945 1
        try:
946 1
            if use_path:
947 1
                self._install_flows(use_path)
948 1
            elif self.is_intra_switch():
949 1
                use_path = Path()
950 1
                self._install_direct_uni_flows()
951
            else:
952 1
                no_path_msg = "No available path was found."
953 1
                if no_valid_path:
954 1
                    no_path_msg = "No valid path was found, "\
955
                                  "try increasing `max_paths`"\
956
                                 f" from {self.max_paths}."
957 1
                msg = f"{self} was not deployed. {no_path_msg}"
958 1
                if tag_errors:
959 1
                    msg = self.add_tag_errors(msg, tag_errors)
960 1
                    log.error(msg)
961
                else:
962 1
                    log.warning(msg)
963 1
                return False
964 1
        except EVCPathNotInstalled as err:
965 1
            log.error(
966
                f"Error deploying EVC {self} when calling flow_manager: {err}"
967
            )
968 1
            self.remove_current_flows(use_path, sync=True)
969 1
            return False
970
971 1
        self.current_path = use_path
972 1
        msg = f"{self} was deployed."
973 1
        try:
974 1
            self.try_to_activate()
975
        except ActivationError as exc:
976
            msg = f"{msg} {str(exc)}"
977 1
        self.sync()
978 1
        log.info(msg)
979 1
        return True
980
981 1
    def try_setup_failover_path(
982
        self,
983
        wait=settings.DEPLOY_EVCS_INTERVAL,
984
        warn_if_not_path=True
985
    ):
986
        """Try setup failover_path whenever possible."""
987 1
        if (
988
                self.failover_path or not self.current_path
989
                or not self.is_active()
990
                ):
991 1
            return
992 1
        if (now() - self.affected_by_link_at).seconds >= wait:
993 1
            with self.lock:
994 1
                self.setup_failover_path(warn_if_not_path)
995
996
    # pylint: disable=too-many-statements
997 1
    def setup_failover_path(self, warn_if_not_path=True):
998
        """Install flows for the failover path of this EVC.
999
1000
        Procedures to deploy:
1001
1002
        0. Remove flows currently installed for failover_path (if any)
1003
        1. Discover a disjoint path from current_path
1004
        2. Choose vlans
1005
        3. Install NNI flows
1006
        4. Install UNI egress flows
1007
        5. Update failover_path
1008
        """
1009
        # Intra-switch EVCs have no failover_path
1010 1
        if self.is_intra_switch():
1011 1
            return False
1012
1013
        # For not only setup failover path for totally dynamic EVCs
1014 1
        if not self.is_eligible_for_failover_path():
1015 1
            return False
1016
1017 1
        out_new_flows: dict[str, list[dict]] = {}
1018 1
        reason = ""
1019 1
        tag_errors = []
1020 1
        out_removed_flows = self.remove_path_flows(self.failover_path)
1021 1
        self.failover_path = Path([])
1022
1023 1
        for use_path in self.get_failover_path_candidates():
1024 1
            if not use_path:
1025 1
                continue
1026 1
            try:
1027 1
                use_path.choose_vlans(self._controller)
1028 1
                break
1029 1
            except KytosNoTagAvailableError as e:
1030 1
                tag_errors.append(str(e))
1031
        else:
1032 1
            use_path = Path([])
1033 1
            reason = "No available path was found"
1034
1035 1
        try:
1036 1
            if use_path:
1037 1
                out_new_flows = self._install_flows(
1038
                    use_path, skip_in=True
1039
                )
1040 1
        except EVCPathNotInstalled as err:
1041 1
            reason = "Error deploying failover path"
1042 1
            log.error(
1043
                f"{reason} for {self}. FlowManager error: {err}"
1044
            )
1045 1
            _rmed_flows = self.remove_path_flows(use_path)
1046 1
            out_removed_flows = merge_flow_dicts(
1047
                out_removed_flows, _rmed_flows
1048
            )
1049 1
            use_path = Path([])
1050
1051 1
        self.failover_path = use_path
1052 1
        self.sync()
1053
1054 1
        if out_new_flows or out_removed_flows:
1055 1
            emit_event(self._controller, "failover_deployed", content={
1056
                self.id: map_evc_event_content(
1057
                    self,
1058
                    flows=deepcopy(out_new_flows),
1059
                    removed_flows=deepcopy(out_removed_flows),
1060
                    error_reason=reason,
1061
                    current_path=self.current_path.as_dict(),
1062
                )
1063
            })
1064
1065 1
        if not use_path:
1066 1
            msg = f"Failover path for {self} was not deployed: {reason}."
1067 1
            if tag_errors:
1068 1
                msg = self.add_tag_errors(msg, tag_errors)
1069 1
                log.error(msg)
1070 1
            elif warn_if_not_path:
1071 1
                log.warning(msg)
1072 1
            return False
1073 1
        log.info(f"Failover path for {self} was deployed.")
1074 1
        return True
1075
1076 1
    @staticmethod
1077 1
    def add_tag_errors(msg: str, tag_errors: list):
1078
        """Add to msg the tag errors ecountered when chossing path."""
1079 1
        path = ['path', 'paths']
1080 1
        was = ['was', 'were']
1081 1
        message = ['message', 'messages']
1082
1083
        # Choose either singular(0) or plural(1) words
1084 1
        n = 1
1085 1
        if len(tag_errors) == 1:
1086 1
            n = 0
1087
1088 1
        msg += f" {len(tag_errors)} {path[n]} {was[n]} rejected"
1089 1
        msg += f" with {message[n]}: {tag_errors}"
1090 1
        return msg
1091
1092 1
    def get_failover_flows(self):
1093
        """Return the flows needed to make the failover path active, i.e. the
1094
        flows for ingress forwarding.
1095
1096
        Return:
1097
            dict: A dict of flows indexed by the switch_id will be returned, or
1098
                an empty dict if no failover_path is available.
1099
        """
1100 1
        if not self.failover_path:
1101 1
            return {}
1102 1
        return self._prepare_uni_flows(self.failover_path, skip_out=True)
1103
1104
    # pylint: disable=too-many-branches
1105 1
    def _prepare_direct_uni_flows(self):
1106
        """Prepare flows connecting two UNIs for intra-switch EVC."""
1107 1
        vlan_a = self._get_value_from_uni_tag(self.uni_a)
1108 1
        vlan_z = self._get_value_from_uni_tag(self.uni_z)
1109
1110 1
        flow_mod_az = self._prepare_flow_mod(
1111
            self.uni_a.interface, self.uni_z.interface,
1112
            self.queue_id, vlan_a
1113
        )
1114 1
        flow_mod_za = self._prepare_flow_mod(
1115
            self.uni_z.interface, self.uni_a.interface,
1116
            self.queue_id, vlan_z
1117
        )
1118
1119 1 View Code Duplication
        if not isinstance(vlan_z, list) and vlan_z not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1120 1
            flow_mod_az["actions"].insert(
1121
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
1122
            )
1123 1
            if not vlan_a:
1124 1
                flow_mod_az["actions"].insert(
1125
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1126
                )
1127 1
            if vlan_a == 0:
1128 1
                flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1129 1
        elif vlan_a == 0 and vlan_z == "4096/4096":
1130 1
            flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1131
1132 1 View Code Duplication
        if not isinstance(vlan_a, list) and vlan_a not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1133 1
            flow_mod_za["actions"].insert(
1134
                    0, {"action_type": "set_vlan", "vlan_id": vlan_a}
1135
                )
1136 1
            if not vlan_z:
1137 1
                flow_mod_za["actions"].insert(
1138
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1139
                )
1140 1
            if vlan_z == 0:
1141 1
                flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1142 1
        elif vlan_a == "4096/4096" and vlan_z == 0:
1143 1
            flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1144
1145 1
        flows = []
1146 1
        if isinstance(vlan_a, list):
1147 1
            for mask_a in vlan_a:
1148 1
                flow_aux = deepcopy(flow_mod_az)
1149 1
                flow_aux["match"]["dl_vlan"] = mask_a
1150 1
                flows.append(flow_aux)
1151
        else:
1152 1
            if vlan_a is not None:
1153 1
                flow_mod_az["match"]["dl_vlan"] = vlan_a
1154 1
            flows.append(flow_mod_az)
1155
1156 1
        if isinstance(vlan_z, list):
1157 1
            for mask_z in vlan_z:
1158 1
                flow_aux = deepcopy(flow_mod_za)
1159 1
                flow_aux["match"]["dl_vlan"] = mask_z
1160 1
                flows.append(flow_aux)
1161
        else:
1162 1
            if vlan_z is not None:
1163 1
                flow_mod_za["match"]["dl_vlan"] = vlan_z
1164 1
            flows.append(flow_mod_za)
1165 1
        return (
1166
            self.uni_a.interface.switch.id, flows
1167
        )
1168
1169 1
    def _install_direct_uni_flows(self):
1170
        """Install flows connecting two UNIs.
1171
1172
        This case happens when the circuit is between UNIs in the
1173
        same switch.
1174
        """
1175 1
        (dpid, flows) = self._prepare_direct_uni_flows()
1176 1
        flow_mods = {"switches": [dpid], "flows": flows}
1177 1
        try:
1178 1
            self._send_flow_mods(flow_mods, "install")
1179 1
        except FlowModException as err:
1180 1
            raise EVCPathNotInstalled(str(err)) from err
1181
1182 1
    def _prepare_nni_flows(self, path=None):
1183
        """Prepare NNI flows."""
1184 1
        nni_flows = OrderedDict()
1185 1
        previous = self.uni_a.interface.switch.dpid
1186 1
        for incoming, outcoming in self.links_zipped(path):
1187 1
            in_vlan = incoming.get_metadata("s_vlan").value
1188 1
            out_vlan = outcoming.get_metadata("s_vlan").value
1189 1
            in_endpoint = self.get_endpoint_by_id(incoming, previous, ne)
1190 1
            out_endpoint = self.get_endpoint_by_id(
1191
                outcoming, in_endpoint.switch.id, eq
1192
            )
1193
1194 1
            flows = []
1195
            # Flow for one direction
1196 1
            flows.append(
1197
                self._prepare_nni_flow(
1198
                    in_endpoint,
1199
                    out_endpoint,
1200
                    in_vlan,
1201
                    out_vlan,
1202
                    queue_id=self.queue_id,
1203
                )
1204
            )
1205
1206
            # Flow for the other direction
1207 1
            flows.append(
1208
                self._prepare_nni_flow(
1209
                    out_endpoint,
1210
                    in_endpoint,
1211
                    out_vlan,
1212
                    in_vlan,
1213
                    queue_id=self.queue_id,
1214
                )
1215
            )
1216 1
            previous = in_endpoint.switch.id
1217 1
            nni_flows[in_endpoint.switch.id] = flows
1218 1
        return nni_flows
1219
1220 1
    def _install_flows(
1221
        self, path=None, skip_in=False, skip_out=False
1222
    ) -> dict[str, list[dict]]:
1223
        """Install uni and nni flows"""
1224 1
        flows_by_switch = defaultdict(lambda: {"flows": []})
1225 1
        new_flows = defaultdict(list)
1226 1
        for dpid, flows in self._prepare_nni_flows(path).items():
1227 1
            flows_by_switch[dpid]["flows"].extend(flows)
1228 1
            new_flows[dpid].extend(flows)
1229 1
        for dpid, flows in self._prepare_uni_flows(
1230
            path, skip_in, skip_out
1231
        ).items():
1232 1
            flows_by_switch[dpid]["flows"].extend(flows)
1233 1
            new_flows[dpid].extend(flows)
1234
1235 1
        try:
1236 1
            self._send_flow_mods(flows_by_switch, "install", by_switch=True)
1237 1
        except FlowModException as err:
1238 1
            raise EVCPathNotInstalled(str(err)) from err
1239
1240 1
        return new_flows
1241
1242 1
    @staticmethod
1243 1
    def _get_value_from_uni_tag(uni: UNI):
1244
        """Returns the value from tag. In case of any and untagged
1245
        it should return 4096/4096 and 0 respectively"""
1246 1
        special = {"any": "4096/4096", "untagged": 0}
1247 1
        if uni.user_tag:
1248 1
            value = uni.user_tag.value
1249 1
            if isinstance(value, list):
1250 1
                return uni.user_tag.mask_list
1251 1
            return special.get(value, value)
1252 1
        return None
1253
1254
    # pylint: disable=too-many-locals
1255 1
    def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
1256
        """Prepare flows to install UNIs."""
1257 1
        uni_flows = {}
1258 1
        if not path:
1259
            log.info("install uni flows without path.")
1260
            return uni_flows
1261
1262
        # Determine VLANs
1263 1
        in_vlan_a = self._get_value_from_uni_tag(self.uni_a)
1264 1
        out_vlan_a = path[0].get_metadata("s_vlan").value
1265
1266 1
        in_vlan_z = self._get_value_from_uni_tag(self.uni_z)
1267 1
        out_vlan_z = path[-1].get_metadata("s_vlan").value
1268
1269
        # Get endpoints from path
1270 1
        endpoint_a = self.get_endpoint_by_id(
1271
            path[0], self.uni_a.interface.switch.id, eq
1272
        )
1273 1
        endpoint_z = self.get_endpoint_by_id(
1274
            path[-1], self.uni_z.interface.switch.id, eq
1275
        )
1276
1277
        # Flows for the first UNI
1278 1
        flows_a = []
1279
1280
        # Flow for one direction, pushing the service tag
1281 1
        if not skip_in:
1282 1
            if isinstance(in_vlan_a, list):
1283 1
                for in_mask_a in in_vlan_a:
1284 1
                    push_flow = self._prepare_push_flow(
1285
                        self.uni_a.interface,
1286
                        endpoint_a,
1287
                        in_mask_a,
1288
                        out_vlan_a,
1289
                        in_vlan_z,
1290
                        queue_id=self.queue_id,
1291
                    )
1292 1
                    flows_a.append(push_flow)
1293
            else:
1294
                push_flow = self._prepare_push_flow(
1295
                    self.uni_a.interface,
1296
                    endpoint_a,
1297
                    in_vlan_a,
1298
                    out_vlan_a,
1299
                    in_vlan_z,
1300
                    queue_id=self.queue_id,
1301
                )
1302
                flows_a.append(push_flow)
1303
1304
        # Flow for the other direction, popping the service tag
1305 1
        if not skip_out:
1306 1
            pop_flow = self._prepare_pop_flow(
1307
                endpoint_a,
1308
                self.uni_a.interface,
1309
                out_vlan_a,
1310
                in_vlan_a,
1311
                in_vlan_z,
1312
                queue_id=self.queue_id,
1313
            )
1314 1
            flows_a.append(pop_flow)
1315
1316 1
        uni_flows[self.uni_a.interface.switch.id] = flows_a
1317
1318
        # Flows for the second UNI
1319 1
        flows_z = []
1320
1321
        # Flow for one direction, pushing the service tag
1322 1
        if not skip_in:
1323 1
            if isinstance(in_vlan_z, list):
1324 1
                for in_mask_z in in_vlan_z:
1325 1
                    push_flow = self._prepare_push_flow(
1326
                        self.uni_z.interface,
1327
                        endpoint_z,
1328
                        in_mask_z,
1329
                        out_vlan_z,
1330
                        in_vlan_a,
1331
                        queue_id=self.queue_id,
1332
                    )
1333 1
                    flows_z.append(push_flow)
1334
            else:
1335
                push_flow = self._prepare_push_flow(
1336
                    self.uni_z.interface,
1337
                    endpoint_z,
1338
                    in_vlan_z,
1339
                    out_vlan_z,
1340
                    in_vlan_a,
1341
                    queue_id=self.queue_id,
1342
                )
1343
                flows_z.append(push_flow)
1344
1345
        # Flow for the other direction, popping the service tag
1346 1
        if not skip_out:
1347 1
            pop_flow = self._prepare_pop_flow(
1348
                endpoint_z,
1349
                self.uni_z.interface,
1350
                out_vlan_z,
1351
                in_vlan_z,
1352
                in_vlan_a,
1353
                queue_id=self.queue_id,
1354
            )
1355 1
            flows_z.append(pop_flow)
1356
1357 1
        uni_flows[self.uni_z.interface.switch.id] = flows_z
1358
1359 1
        return uni_flows
1360
1361 1
    @staticmethod
1362 1
    @retry(
1363
        stop=stop_after_attempt(3),
1364
        wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
1365
        retry=retry_if_exception_type(FlowModException),
1366
        before_sleep=before_sleep,
1367
        reraise=True,
1368
    )
1369 1
    def _send_flow_mods(
1370
        data_content: dict,
1371
        command="install",
1372
        force=False,
1373
        by_switch=False
1374
    ):
1375
        """Send a flow_mod list to a specific switch.
1376
1377
        Args:
1378
            dpid(str): The target of flows (i.e. Switch.id).
1379
            flow_mods(dict): Python dictionary with flow_mods.
1380
            command(str): By default is 'flows'. To remove a flow is 'remove'.
1381
            force(bool): True to send via consistency check in case of errors.
1382
            by_switch(bool): True to send to 'flows_by_switch' request instead.
1383
        """
1384 1
        if by_switch:
1385 1
            endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}"
1386
        else:
1387 1
            endpoint = f"{settings.MANAGER_URL}/flows"
1388 1
            data_content["force"] = force
1389 1
        try:
1390 1
            if command == "install":
1391 1
                res = httpx.post(endpoint, json=data_content, timeout=30)
1392 1
            elif command == "delete":
1393 1
                res = httpx.request(
1394
                    "DELETE", endpoint, json=data_content, timeout=30
1395
                )
1396 1
        except httpx.RequestError as err:
1397 1
            raise FlowModException(str(err)) from err
1398 1
        if res.is_server_error or res.status_code >= 400:
1399 1
            raise FlowModException(res.text)
1400
1401 1
    def get_cookie(self):
1402
        """Return the cookie integer from evc id."""
1403 1
        return int(self.id, 16) + (settings.COOKIE_PREFIX << 56)
1404
1405 1
    @staticmethod
1406 1
    def get_id_from_cookie(cookie):
1407
        """Return the evc id given a cookie value."""
1408 1
        evc_id = cookie - (settings.COOKIE_PREFIX << 56)
1409 1
        return f"{evc_id:x}".zfill(14)
1410
1411 1
    def set_flow_table_group_id(self, flow_mod: dict, vlan) -> dict:
1412
        """Set table_group and table_id"""
1413 1
        table_group = "epl" if vlan is None else "evpl"
1414 1
        flow_mod["table_group"] = table_group
1415 1
        flow_mod["table_id"] = self.table_group[table_group]
1416 1
        return flow_mod
1417
1418 1
    @staticmethod
1419 1
    def get_priority(vlan):
1420
        """Return priority value depending on vlan value"""
1421 1
        if isinstance(vlan, list):
1422 1
            return settings.EVPL_SB_PRIORITY
1423 1
        if vlan not in {None, "4096/4096", 0}:
1424 1
            return settings.EVPL_SB_PRIORITY
1425 1
        if vlan == 0:
1426 1
            return settings.UNTAGGED_SB_PRIORITY
1427 1
        if vlan == "4096/4096":
1428 1
            return settings.ANY_SB_PRIORITY
1429 1
        return settings.EPL_SB_PRIORITY
1430
1431 1
    def _prepare_flow_mod(self, in_interface, out_interface,
1432
                          queue_id=None, vlan=True):
1433
        """Prepare a common flow mod."""
1434 1
        default_actions = [
1435
            {"action_type": "output", "port": out_interface.port_number}
1436
        ]
1437 1
        queue_id = settings.QUEUE_ID if queue_id == -1 else queue_id
1438 1
        if queue_id is not None:
1439 1
            default_actions.insert(
1440
                0,
1441
                {"action_type": "set_queue", "queue_id": queue_id}
1442
            )
1443
1444 1
        flow_mod = {
1445
            "match": {"in_port": in_interface.port_number},
1446
            "cookie": self.get_cookie(),
1447
            "actions": default_actions,
1448
            "owner": "mef_eline",
1449
        }
1450
1451 1
        self.set_flow_table_group_id(flow_mod, vlan)
1452 1
        if self.sb_priority:
1453 1
            flow_mod["priority"] = self.sb_priority
1454
        else:
1455 1
            flow_mod["priority"] = self.get_priority(vlan)
1456 1
        return flow_mod
1457
1458 1
    def _prepare_nni_flow(self, *args, queue_id=None):
1459
        """Create NNI flows."""
1460 1
        in_interface, out_interface, in_vlan, out_vlan = args
1461 1
        flow_mod = self._prepare_flow_mod(
1462
            in_interface, out_interface, queue_id
1463
        )
1464 1
        flow_mod["match"]["dl_vlan"] = in_vlan
1465 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1466 1
        flow_mod["actions"].insert(0, new_action)
1467
1468 1
        return flow_mod
1469
1470 1
    def _prepare_push_flow(self, *args, queue_id=None):
1471
        """Prepare push flow.
1472
1473
        Arguments:
1474
            in_interface(Interface): Interface input.
1475
            out_interface(Interface): Interface output.
1476
            in_vlan(int,str,None): Vlan input.
1477
            out_vlan(int): Vlan output.
1478
            new_c_vlan(int,str,list,None): New client vlan.
1479
1480
        Return:
1481
            dict: An python dictionary representing a FlowMod
1482
1483
        """
1484
        # assign all arguments
1485 1
        in_interface, out_interface, in_vlan, out_vlan, new_c_vlan = args
1486 1
        vlan_pri = in_vlan if not isinstance(new_c_vlan, list) else new_c_vlan
1487 1
        flow_mod = self._prepare_flow_mod(
1488
            in_interface, out_interface, queue_id, vlan_pri
1489
        )
1490
        # the service tag must be always pushed
1491 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1492 1
        flow_mod["actions"].insert(0, new_action)
1493
1494 1
        if (
1495
            not (in_vlan != new_c_vlan and isinstance(in_vlan, int) and
1496
                 isinstance(new_c_vlan, int))
1497
        ):
1498
            # Add service VLAN header when it does NOT fall into this
1499
            # statement: Both VLANs should be integer and different.
1500 1
            new_action = {"action_type": "push_vlan", "tag_type": "s"}
1501 1
            flow_mod["actions"].insert(0, new_action)
1502
1503 1
        if in_vlan is not None:
1504
            # if in_vlan is set, it must be included in the match
1505 1
            flow_mod["match"]["dl_vlan"] = in_vlan
1506
1507 1
        if (
1508
            not isinstance(in_vlan, int) and isinstance(new_c_vlan, int) and
1509
            new_c_vlan != 0
1510
        ):
1511
            # new_in_vlan is an integer but zero, action to set is required
1512 1
            new_action = {"action_type": "set_vlan", "vlan_id": new_c_vlan}
1513 1
            flow_mod["actions"].insert(0, new_action)
1514
1515 1
        if in_vlan == "4096/4096" and new_c_vlan == 0:
1516
            # if in_vlan match with any tags and new_c_vlan does not,
1517
            # a pop action is required
1518 1
            new_action = {"action_type": "pop_vlan"}
1519 1
            flow_mod["actions"].insert(0, new_action)
1520
1521 1
        elif (not in_vlan and
1522
                (not isinstance(new_c_vlan, list) and
1523
                 new_c_vlan not in self.special_cases)):
1524
            # new_in_vlan is an integer but zero and in_vlan is a no-tag or
1525
            # untagged
1526 1
            new_action = {"action_type": "push_vlan", "tag_type": "c"}
1527 1
            flow_mod["actions"].insert(0, new_action)
1528
1529 1
        return flow_mod
1530
1531 1
    def _prepare_pop_flow(
1532
        self,
1533
        in_interface: Interface,
1534
        out_interface: Interface,
1535
        out_vlan: int,
1536
        in_vlan: Union[int, str, list, None],
1537
        new_c_vlan: Union[int, str, list, None],
1538
        queue_id=None,
1539
    ):
1540
        # pylint: disable=too-many-arguments
1541
        """Prepare pop flow."""
1542 1
        flow_mod = self._prepare_flow_mod(
1543
            in_interface, out_interface, queue_id
1544
        )
1545 1
        flow_mod["match"]["dl_vlan"] = out_vlan
1546 1
        if in_vlan == 0:
1547 1
            new_action = {"action_type": "pop_vlan"}
1548 1
            flow_mod["actions"].insert(0, new_action)
1549 1
        elif (
1550
            in_vlan != new_c_vlan and isinstance(in_vlan, int) and
1551
            isinstance(new_c_vlan, int)
1552
        ):
1553
            # If UNI VLANs are different and in_vlan is not 0
1554 1
            new_action = {"action_type": "set_vlan", "vlan_id": in_vlan}
1555 1
            flow_mod["actions"].insert(0, new_action)
1556
        else:
1557 1
            new_action = {"action_type": "pop_vlan"}
1558 1
            flow_mod["actions"].insert(0, new_action)
1559 1
        return flow_mod
1560
1561 1
    @staticmethod
1562 1
    def run_bulk_sdntraces(
1563
        uni_list: list[tuple[Interface, Union[str, int, None]]]
1564
    ) -> dict:
1565
        """Run SDN traces on control plane starting from EVC UNIs."""
1566 1
        endpoint = f"{settings.SDN_TRACE_CP_URL}/traces"
1567 1
        data = []
1568 1
        for interface, tag_value in uni_list:
1569 1
            data_uni = {
1570
                "trace": {
1571
                            "switch": {
1572
                                "dpid": interface.switch.dpid,
1573
                                "in_port": interface.port_number,
1574
                            }
1575
                        }
1576
                }
1577 1
            if tag_value:
1578 1
                uni_dl_vlan = map_dl_vlan(tag_value)
1579 1
                if uni_dl_vlan:
1580 1
                    data_uni["trace"]["eth"] = {
1581
                                            "dl_type": 0x8100,
1582
                                            "dl_vlan": uni_dl_vlan,
1583
                                            }
1584 1
            data.append(data_uni)
1585 1
        try:
1586 1
            response = httpx.put(endpoint, json=data, timeout=30)
1587 1
        except httpx.TimeoutException as exception:
1588 1
            log.error(f"Request has timed out: {exception}")
1589 1
            return {"result": []}
1590 1
        if response.status_code >= 400:
1591 1
            log.error(f"Failed to run sdntrace-cp: {response.text}")
1592 1
            return {"result": []}
1593 1
        return response.json()
1594
1595
    # pylint: disable=too-many-return-statements, too-many-arguments
1596 1
    @staticmethod
1597 1
    def check_trace(
1598
        evc_id: str,
1599
        evc_name: str,
1600
        tag_a: Union[None, int, str],
1601
        tag_z: Union[None, int, str],
1602
        interface_a: Interface,
1603
        interface_z: Interface,
1604
        current_path: list,
1605
        trace_a: list,
1606
        trace_z: list
1607
    ) -> bool:
1608
        """Auxiliar function to check an individual trace"""
1609 1
        if (
1610
            len(trace_a) != len(current_path) + 1
1611
            or not compare_uni_out_trace(tag_z, interface_z, trace_a[-1])
1612
        ):
1613 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1614
                        f"Invalid trace from uni_a: {trace_a}")
1615 1
            return False
1616 1
        if (
1617
            len(trace_z) != len(current_path) + 1
1618
            or not compare_uni_out_trace(tag_a, interface_a, trace_z[-1])
1619
        ):
1620 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1621
                        f"Invalid trace from uni_z: {trace_z}")
1622 1
            return False
1623
1624 1
        if not current_path:
1625
            return True
1626
1627 1
        first_link, trace_path_begin, trace_path_end = current_path[0], [], []
1628 1
        if (
1629
            first_link.endpoint_a.switch.id == trace_a[0]["dpid"]
1630
        ):
1631 1
            trace_path_begin, trace_path_end = trace_a, trace_z
1632 1
        elif (
1633
            first_link.endpoint_a.switch.id == trace_z[0]["dpid"]
1634
        ):
1635 1
            trace_path_begin, trace_path_end = trace_z, trace_a
1636
        else:
1637
            msg = (
1638
                f"first link {first_link} endpoint_a didn't match the first "
1639
                f"step of trace_a {trace_a} or trace_z {trace_z}"
1640
            )
1641
            log.warning(msg)
1642
            return False
1643
1644 1
        for link, trace1, trace2 in zip(current_path,
1645
                                        trace_path_begin[1:],
1646
                                        trace_path_end[:0:-1]):
1647 1
            metadata_vlan = None
1648 1
            if link.metadata:
1649 1
                metadata_vlan = glom(link.metadata, 's_vlan.value')
1650 1
            if compare_endpoint_trace(
1651
                                        link.endpoint_a,
1652
                                        metadata_vlan,
1653
                                        trace2
1654
                                    ) is False:
1655 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1656
                            f"Invalid trace from uni_a: {trace_a}")
1657 1
                return False
1658 1
            if compare_endpoint_trace(
1659
                                        link.endpoint_b,
1660
                                        metadata_vlan,
1661
                                        trace1
1662
                                    ) is False:
1663 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1664
                            f"Invalid trace from uni_z: {trace_z}")
1665 1
                return False
1666
1667 1
        return True
1668
1669 1
    @staticmethod
1670 1
    def check_range(circuit, traces: list) -> bool:
1671
        """Check traces when for UNI with TAGRange"""
1672 1
        check = True
1673 1
        for i, mask in enumerate(circuit.uni_a.user_tag.mask_list):
1674 1
            trace_a = traces[i*2]
1675 1
            trace_z = traces[i*2+1]
1676 1
            check &= EVCDeploy.check_trace(
1677
                circuit.id, circuit.name,
1678
                mask, mask,
1679
                circuit.uni_a.interface,
1680
                circuit.uni_z.interface,
1681
                circuit.current_path,
1682
                trace_a, trace_z,
1683
            )
1684 1
        return check
1685
1686 1
    @staticmethod
1687 1
    def check_list_traces(list_circuits: list) -> dict:
1688
        """Check if current_path is deployed comparing with SDN traces."""
1689 1
        if not list_circuits:
1690 1
            return {}
1691 1
        uni_list = make_uni_list(list_circuits)
1692 1
        traces = EVCDeploy.run_bulk_sdntraces(uni_list)["result"]
1693
1694 1
        if not traces:
1695 1
            return {}
1696
1697 1
        try:
1698 1
            circuits_checked = {}
1699 1
            i = 0
1700 1
            for circuit in list_circuits:
1701 1
                if isinstance(circuit.uni_a.user_tag, TAGRange):
1702 1
                    length = len(circuit.uni_a.user_tag.mask_list)
1703 1
                    circuits_checked[circuit.id] = EVCDeploy.check_range(
1704
                        circuit, traces[i:i+length*2]
1705
                    )
1706 1
                    i += length*2
1707
                else:
1708 1
                    trace_a = traces[i]
1709 1
                    trace_z = traces[i+1]
1710 1
                    tag_a = None
1711 1
                    if circuit.uni_a.user_tag:
1712 1
                        tag_a = circuit.uni_a.user_tag.value
1713 1
                    tag_z = None
1714 1
                    if circuit.uni_z.user_tag:
1715 1
                        tag_z = circuit.uni_z.user_tag.value
1716 1
                    circuits_checked[circuit.id] = EVCDeploy.check_trace(
1717
                        circuit.id, circuit.name,
1718
                        tag_a, tag_z,
1719
                        circuit.uni_a.interface,
1720
                        circuit.uni_z.interface,
1721
                        circuit.current_path,
1722
                        trace_a, trace_z
1723
                    )
1724 1
                    i += 2
1725 1
        except IndexError as err:
1726 1
            log.error(
1727
                f"Bulk sdntraces returned fewer items than expected."
1728
                f"Error = {err}"
1729
            )
1730 1
            return {}
1731
1732 1
        return circuits_checked
1733
1734 1
    @staticmethod
1735 1
    def get_endpoint_by_id(
1736
        link: Link,
1737
        id_: str,
1738
        operator: Union[eq, ne]
1739
    ) -> Interface:
1740
        """Return endpoint from link
1741
        either equal(eq) or not equal(ne) to id"""
1742 1
        if operator(link.endpoint_a.switch.id, id_):
1743 1
            return link.endpoint_a
1744 1
        return link.endpoint_b
1745
1746
1747 1
class LinkProtection(EVCDeploy):
1748
    """Class to handle link protection."""
1749
1750 1
    def is_affected_by_link(self, link=None):
1751
        """Verify if the current path is affected by link down event."""
1752
        return self.current_path.is_affected_by_link(link)
1753
1754 1
    def is_using_primary_path(self):
1755
        """Verify if the current deployed path is self.primary_path."""
1756 1
        return self.current_path == self.primary_path
1757
1758 1
    def is_using_backup_path(self):
1759
        """Verify if the current deployed path is self.backup_path."""
1760 1
        return self.current_path == self.backup_path
1761
1762 1
    def is_using_dynamic_path(self):
1763
        """Verify if the current deployed path is dynamic."""
1764 1
        if (
1765
            self.current_path
1766
            and not self.is_using_primary_path()
1767
            and not self.is_using_backup_path()
1768
            and self.current_path.status is EntityStatus.UP
1769
        ):
1770
            return True
1771 1
        return False
1772
1773 1
    def handle_link_up(self, link=None, interface=None):
1774
        """Handle circuit when link up.
1775
1776
        Args:
1777
            link(Link): Link affected by link.up event.
1778
1779
        """
1780 1
        condition_pairs = [
1781
            (
1782
                lambda me: me.is_using_primary_path(),
1783
                lambda _: (True, 'nothing')
1784
            ),
1785
            (
1786
                lambda me: me.is_intra_switch(),
1787
                lambda _: (True, 'nothing')
1788
            ),
1789
            (
1790
                lambda me: me.primary_path.is_affected_by_link(link),
1791
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1792
            ),
1793
            # For this special case, it reached this point because interface
1794
            # was previously confirmed to be a UNI and both UNI are UP
1795
            (
1796
                lambda me: (me.primary_path.status == EntityStatus.UP
1797
                            and interface),
1798
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1799
            ),
1800
            (
1801
                lambda me: (me.backup_path.status == EntityStatus.UP
1802
                            and interface),
1803
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1804
            ),
1805
            # We tried to deploy(primary_path) without success.
1806
            # And in this case is up by some how. Nothing to do.
1807
            (
1808
                lambda me: me.is_using_backup_path(),
1809
                lambda _: (True, 'nothing')
1810
            ),
1811
            (
1812
                lambda me:  me.is_using_dynamic_path(),
1813
                lambda _: (True, 'nothing')
1814
            ),
1815
            # In this case, probably the circuit is not being used and
1816
            # we can move to backup
1817
            (
1818
                lambda me: me.backup_path.is_affected_by_link(link),
1819
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1820
            ),
1821
            # In this case, the circuit is not being used and we should
1822
            # try a dynamic path
1823
            (
1824
                lambda me: me.dynamic_backup_path and not me.is_active(),
1825
                lambda me: (me.deploy_to_path(), 'redeploy')
1826
            )
1827
        ]
1828 1
        for predicate, action in condition_pairs:
1829 1
            if not predicate(self):
1830 1
                continue
1831 1
            success, succcess_type = action(self)
1832 1
            if success:
1833 1
                if succcess_type == 'redeploy':
1834 1
                    emit_event(
1835
                        self._controller,
1836
                        "redeployed_link_up",
1837
                        content=map_evc_event_content(self)
1838
                    )
1839 1
                return True
1840 1
        return False
1841
1842 1
    def handle_link_down(self):
1843
        """Handle circuit when link down.
1844
1845
        Returns:
1846
            bool: True if the re-deploy was successly otherwise False.
1847
1848
        """
1849 1
        success = False
1850 1
        if self.is_using_primary_path():
1851 1
            success = self.deploy_to_backup_path()
1852 1
        elif self.is_using_backup_path():
1853 1
            success = self.deploy_to_primary_path()
1854
1855 1
        if not success and self.dynamic_backup_path:
1856 1
            success = self.deploy_to_path()
1857
1858 1
        if success:
1859 1
            log.debug(f"{self} deployed after link down.")
1860
        else:
1861 1
            self.remove_current_flows(sync=False)
1862 1
            self.deactivate()
1863 1
            self.sync()
1864 1
            log.debug(f"Failed to re-deploy {self} after link down.")
1865
1866 1
        return success
1867
1868 1
    def are_unis_active(self) -> bool:
1869
        """Determine whether this EVC should be active"""
1870 1
        interface_a = self.uni_a.interface
1871 1
        interface_z = self.uni_z.interface
1872 1
        active, _ = self.is_uni_interface_active(interface_a, interface_z)
1873 1
        return active
1874
1875 1
    def try_to_handle_uni_as_link_up(self, interface: Interface) -> bool:
1876
        """Try to handle UNI as link_up to trigger deployment."""
1877
        if (
1878
            self.current_path.status != EntityStatus.UP
1879
            and not self.is_intra_switch()
1880
        ):
1881
            succeeded = self.handle_link_up(interface=interface)
1882
            if succeeded:
1883
                msg = (
1884
                    f"Activated {self} due to successful "
1885
                    f"deployment triggered by {interface}"
1886
                )
1887
            else:
1888
                msg = (
1889
                    f"Couldn't activate {self} due to unsuccessful "
1890
                    f"deployment triggered by {interface}"
1891
                )
1892
            log.info(msg)
1893
            return True
1894
        return False
1895
1896 1
    def handle_interface_link_up(self, interface: Interface):
1897
        """
1898
        Handler for interface link_up events
1899
        """
1900 1
        if not _does_uni_affect_evc(self, interface, "up"):
1901 1
            return
1902 1
        if self.try_to_handle_uni_as_link_up(interface):
1903
            return
1904
1905 1
        interface_dicts = {
1906
            interface.id: {
1907
                'status': interface.status.value,
1908
                'status_reason': interface.status_reason,
1909
            }
1910
            for interface in (self.uni_a.interface, self.uni_z.interface)
1911
        }
1912 1
        try:
1913 1
            self.try_to_activate()
1914 1
            log.info(
1915
                f"Activating {self}. Interfaces: "
1916
                f"{interface_dicts}."
1917
            )
1918 1
            emit_event(self._controller, "uni_active_updated",
1919
                       content=map_evc_event_content(self))
1920 1
            self.sync()
1921
        except ActivationError as exc:
1922
            # On this ctx, no ActivationError isn't expected since the
1923
            # activation pre-requisites states were checked, so handled as err
1924
            log.error(f"ActivationError: {str(exc)} when handling {interface}")
1925
1926 1
    def handle_interface_link_down(self, interface):
1927
        """
1928
        Handler for interface link_down events
1929
        """
1930 1
        if not _does_uni_affect_evc(self, interface, "down"):
1931 1
            return
1932 1
        interface_dicts = {
1933
            interface.id: {
1934
                'status': interface.status.value,
1935
                'status_reason': interface.status_reason,
1936
            }
1937
            for interface in (self.uni_a.interface, self.uni_z.interface)
1938
            if interface.status != EntityStatus.UP
1939
        }
1940 1
        self.deactivate()
1941 1
        log.info(
1942
            f"Deactivating {self}. Interfaces: "
1943
            f"{interface_dicts}."
1944
        )
1945 1
        emit_event(self._controller, "uni_active_updated",
1946
                   content=map_evc_event_content(self))
1947 1
        self.sync()
1948
1949
1950 1
class EVC(LinkProtection):
1951
    """Class that represents a E-Line Virtual Connection."""
1952