build.models.evc.EVCDeploy.check_trace()   D
last analyzed

Complexity

Conditions 12

Size

Total Lines 72
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 12.3775

Importance

Changes 0
Metric Value
cc 12
eloc 55
nop 9
dl 0
loc 72
ccs 25
cts 29
cp 0.8621
crap 12.3775
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.models.evc.EVCDeploy.check_trace() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Classes used in the main application."""  # pylint: disable=too-many-lines
2 1
import traceback
3 1
from collections import OrderedDict, defaultdict
4 1
from copy import deepcopy
5 1
from datetime import datetime
6 1
from operator import eq, ne
7 1
from threading import Lock
8 1
from typing import Union
9 1
from uuid import uuid4
10
11 1
import httpx
12 1
from glom import glom
13 1
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
14
                      wait_combine, wait_fixed, wait_random)
15
16 1
from kytos.core import log
17 1
from kytos.core.common import EntityStatus, GenericEntity
18 1
from kytos.core.exceptions import KytosNoTagAvailableError, KytosTagError
19 1
from kytos.core.helpers import get_time, now
20 1
from kytos.core.interface import UNI, Interface, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.retry import before_sleep
23 1
from kytos.core.tag_ranges import range_difference
24 1
from napps.kytos.mef_eline import controllers, settings
25 1
from napps.kytos.mef_eline.exceptions import (ActivationError,
26
                                              DuplicatedNoTagUNI,
27
                                              EVCPathNotInstalled,
28
                                              FlowModException, InvalidPath)
29 1
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc,
30
                                         check_disabled_component,
31
                                         compare_endpoint_trace,
32
                                         compare_uni_out_trace, emit_event,
33
                                         make_uni_list, map_dl_vlan,
34
                                         map_evc_event_content,
35
                                         merge_flow_dicts)
36
37 1
from .path import DynamicPathManager, Path
38
39
40 1
class EVCBase(GenericEntity):
41
    """Class to represent a circuit."""
42
43 1
    attributes_requiring_redeploy = [
44
        "primary_path",
45
        "backup_path",
46
        "dynamic_backup_path",
47
        "queue_id",
48
        "sb_priority",
49
        "primary_constraints",
50
        "secondary_constraints",
51
        "uni_a",
52
        "uni_z",
53
    ]
54 1
    required_attributes = ["name", "uni_a", "uni_z"]
55
56 1
    updatable_attributes = {
57
        "uni_a",
58
        "uni_z",
59
        "name",
60
        "start_date",
61
        "end_date",
62
        "queue_id",
63
        "bandwidth",
64
        "primary_path",
65
        "backup_path",
66
        "dynamic_backup_path",
67
        "primary_constraints",
68
        "secondary_constraints",
69
        "owner",
70
        "sb_priority",
71
        "service_level",
72
        "circuit_scheduler",
73
        "metadata",
74
        "enabled",
75
        "max_paths",
76
    }
77
78
    # pylint: disable=too-many-statements
79 1
    def __init__(self, controller, **kwargs):
80
        """Create an EVC instance with the provided parameters.
81
82
        Args:
83
            id(str): EVC identifier. Whether it's None an ID will be genereted.
84
                     Only the first 14 bytes passed will be used.
85
            name: represents an EVC name.(Required)
86
            uni_a (UNI): Endpoint A for User Network Interface.(Required)
87
            uni_z (UNI): Endpoint Z for User Network Interface.(Required)
88
            start_date(datetime|str): Date when the EVC was registred.
89
                                      Default is now().
90
            end_date(datetime|str): Final date that the EVC will be fineshed.
91
                                    Default is None.
92
            bandwidth(int): Bandwidth used by EVC instance. Default is 0.
93
            primary_links(list): Primary links used by evc. Default is []
94
            backup_links(list): Backups links used by evc. Default is []
95
            current_path(list): Circuit being used at the moment if this is an
96
                                active circuit. Default is [].
97
            failover_path(list): Path being used to provide EVC protection via
98
                                failover during link failures. Default is [].
99
            primary_path(list): primary circuit offered to user IF one or more
100
                                links were provided. Default is [].
101
            backup_path(list): backup circuit offered to the user IF one or
102
                               more links were provided. Default is [].
103
            dynamic_backup_path(bool): Enable computer backup path dynamically.
104
                                       Dafault is False.
105
            creation_time(datetime|str): datetime when the circuit should be
106
                                         activated. default is now().
107
            enabled(Boolean): attribute to indicate the administrative state;
108
                              default is False.
109
            active(Boolean): attribute to indicate the operational state;
110
                             default is False.
111
            archived(Boolean): indicate the EVC has been deleted and is
112
                               archived; default is False.
113
            owner(str): The EVC owner. Default is None.
114
            sb_priority(int): Service level provided in the request.
115
                              Default is None.
116
            service_level(int): Service level provided. The higher the better.
117
                                Default is 0.
118
119
        Raises:
120
            ValueError: raised when object attributes are invalid.
121
122
        """
123 1
        self._controller = controller
124 1
        self._validate(**kwargs)
125 1
        super().__init__()
126
127
        # required attributes
128 1
        self._id = kwargs.get("id", uuid4().hex)[:14]
129 1
        self.uni_a: UNI = kwargs.get("uni_a")
130 1
        self.uni_z: UNI = kwargs.get("uni_z")
131 1
        self.name = kwargs.get("name")
132
133
        # optional attributes
134 1
        self.start_date = get_time(kwargs.get("start_date")) or now()
135 1
        self.end_date = get_time(kwargs.get("end_date")) or None
136 1
        self.queue_id = kwargs.get("queue_id", -1)
137
138 1
        self.bandwidth = kwargs.get("bandwidth", 0)
139 1
        self.primary_links = Path(kwargs.get("primary_links", []))
140 1
        self.backup_links = Path(kwargs.get("backup_links", []))
141 1
        self.current_path = Path(kwargs.get("current_path", []))
142 1
        self.failover_path = Path(kwargs.get("failover_path", []))
143 1
        self.primary_path = Path(kwargs.get("primary_path", []))
144 1
        self.backup_path = Path(kwargs.get("backup_path", []))
145 1
        self.dynamic_backup_path = kwargs.get("dynamic_backup_path", False)
146 1
        self.primary_constraints = kwargs.get("primary_constraints", {})
147 1
        self.secondary_constraints = kwargs.get("secondary_constraints", {})
148 1
        self.creation_time = get_time(kwargs.get("creation_time")) or now()
149 1
        self.owner = kwargs.get("owner", None)
150 1
        self.sb_priority = kwargs.get("sb_priority", None) or kwargs.get(
151
            "priority", None
152
        )
153 1
        self.service_level = kwargs.get("service_level", 0)
154 1
        self.circuit_scheduler = kwargs.get("circuit_scheduler", [])
155 1
        self.flow_removed_at = get_time(kwargs.get("flow_removed_at")) or None
156 1
        self.updated_at = get_time(kwargs.get("updated_at")) or now()
157 1
        self.execution_rounds = kwargs.get("execution_rounds", 0)
158 1
        self.current_links_cache = set()
159 1
        self.primary_links_cache = set()
160 1
        self.backup_links_cache = set()
161 1
        self.affected_by_link_at = get_time("0001-01-01T00:00:00")
162 1
        self.old_path = Path([])
163 1
        self.max_paths = kwargs.get("max_paths", 2)
164
165 1
        self.lock = Lock()
166
167 1
        self.archived = kwargs.get("archived", False)
168
169 1
        self.metadata = kwargs.get("metadata", {})
170
171 1
        self._mongo_controller = controllers.ELineController()
172
173 1
        if kwargs.get("active", False):
174 1
            self.activate()
175
        else:
176 1
            self.deactivate()
177
178 1
        if kwargs.get("enabled", False):
179 1
            self.enable()
180
        else:
181 1
            self.disable()
182
183
        # datetime of user request for a EVC (or datetime when object was
184
        # created)
185 1
        self.request_time = kwargs.get("request_time", now())
186
        # dict with the user original request (input)
187 1
        self._requested = kwargs
188
189
        # Special cases: No tag, any, untagged
190 1
        self.special_cases = {None, "4096/4096", 0}
191 1
        self.table_group = kwargs.get("table_group")
192
193 1
    def sync(self, keys: set = None):
194
        """Sync this EVC in the MongoDB."""
195 1
        self.updated_at = now()
196 1
        if keys:
197 1
            self._mongo_controller.update_evc(self.as_dict(keys))
198 1
            return
199 1
        self._mongo_controller.upsert_evc(self.as_dict())
200
201 1
    def _get_unis_use_tags(self, **kwargs) -> tuple[UNI, UNI]:
202
        """Obtain both UNIs (uni_a, uni_z).
203
        If a UNI is changing, verify tags"""
204 1
        uni_a = kwargs.get("uni_a", None)
205 1
        uni_a_flag = False
206 1
        if uni_a and uni_a != self.uni_a:
207 1
            uni_a_flag = True
208 1
            self._use_uni_vlan(uni_a, uni_dif=self.uni_a)
209
210 1
        uni_z = kwargs.get("uni_z", None)
211 1
        if uni_z and uni_z != self.uni_z:
212 1
            try:
213 1
                self._use_uni_vlan(uni_z, uni_dif=self.uni_z)
214 1
                self.make_uni_vlan_available(self.uni_z, uni_dif=uni_z)
215 1
            except KytosTagError as err:
216 1
                if uni_a_flag:
217 1
                    self.make_uni_vlan_available(uni_a, uni_dif=self.uni_a)
218 1
                raise err
219
        else:
220 1
            uni_z = self.uni_z
221
222 1
        if uni_a_flag:
223 1
            self.make_uni_vlan_available(self.uni_a, uni_dif=uni_a)
224
        else:
225 1
            uni_a = self.uni_a
226 1
        return uni_a, uni_z
227
228 1
    def update(self, **kwargs):
229
        """Update evc attributes.
230
231
        This method will raises an error trying to change the following
232
        attributes: [creation_time, active, current_path, failover_path,
233
        _id, archived]
234
        [name, uni_a and uni_z]
235
236
        Returns:
237
            the values for enable and a redeploy attribute, if exists and None
238
            otherwise
239
        Raises:
240
            ValueError: message with error detail.
241
242
        """
243 1
        enable, redeploy = (None, None)
244 1
        if not self._tag_lists_equal(**kwargs):
245 1
            raise ValueError(
246
                "UNI_A and UNI_Z tag lists should be the same."
247
            )
248 1
        uni_a, uni_z = self._get_unis_use_tags(**kwargs)
249 1
        check_disabled_component(uni_a, uni_z)
250 1
        self._validate_has_primary_or_dynamic(
251
            primary_path=kwargs.get("primary_path"),
252
            dynamic_backup_path=kwargs.get("dynamic_backup_path"),
253
            uni_a=uni_a,
254
            uni_z=uni_z,
255
        )
256 1
        for attribute, value in kwargs.items():
257 1
            if attribute not in self.updatable_attributes:
258 1
                raise ValueError(f"{attribute} can't be updated.")
259 1
            if attribute in ("primary_path", "backup_path"):
260 1
                try:
261 1
                    value.is_valid(
262
                        uni_a.interface.switch, uni_z.interface.switch
263
                    )
264 1
                except InvalidPath as exception:
265 1
                    raise ValueError(  # pylint: disable=raise-missing-from
266
                        f"{attribute} is not a " f"valid path: {exception}"
267
                    )
268 1
        for attribute, value in kwargs.items():
269 1
            if attribute == "enabled":
270 1
                if value:
271 1
                    self.enable()
272
                else:
273 1
                    self.disable()
274 1
                enable = value
275
            else:
276 1
                setattr(self, attribute, value)
277 1
                if attribute in self.attributes_requiring_redeploy:
278 1
                    redeploy = True
279 1
        self.sync(set(kwargs.keys()))
280 1
        return enable, redeploy
281
282 1
    def set_flow_removed_at(self):
283
        """Update flow_removed_at attribute."""
284
        self.flow_removed_at = now()
285
286 1
    def has_recent_removed_flow(self, setting=settings):
287
        """Check if any flow has been removed from the evc"""
288
        if self.flow_removed_at is None:
289
            return False
290
        res_seconds = (now() - self.flow_removed_at).seconds
291
        return res_seconds < setting.TIME_RECENT_DELETED_FLOWS
292
293 1
    def is_recent_updated(self, setting=settings):
294
        """Check if the evc has been updated recently"""
295
        res_seconds = (now() - self.updated_at).seconds
296
        return res_seconds < setting.TIME_RECENT_UPDATED
297
298 1
    def __repr__(self):
299
        """Repr method."""
300 1
        return f"EVC({self._id}, {self.name})"
301
302 1
    def _validate(self, **kwargs):
303
        """Do Basic validations.
304
305
        Verify required attributes: name, uni_a, uni_z
306
307
        Raises:
308
            ValueError: message with error detail.
309
310
        """
311 1
        for attribute in self.required_attributes:
312
313 1
            if attribute not in kwargs:
314 1
                raise ValueError(f"{attribute} is required.")
315
316 1
            if "uni" in attribute:
317 1
                uni = kwargs.get(attribute)
318 1
                if not isinstance(uni, UNI):
319
                    raise ValueError(f"{attribute} is an invalid UNI.")
320
321 1
    def _tag_lists_equal(self, **kwargs):
322
        """Verify that tag lists are the same."""
323 1
        uni_a = kwargs.get("uni_a") or self.uni_a
324 1
        uni_z = kwargs.get("uni_z") or self.uni_z
325 1
        uni_a_list = uni_z_list = False
326 1
        if (uni_a.user_tag and isinstance(uni_a.user_tag, TAGRange)):
327 1
            uni_a_list = True
328 1
        if (uni_z.user_tag and isinstance(uni_z.user_tag, TAGRange)):
329 1
            uni_z_list = True
330 1
        if uni_a_list and uni_z_list:
331 1
            return uni_a.user_tag.value == uni_z.user_tag.value
332 1
        return uni_a_list == uni_z_list
333
334 1
    def _validate_has_primary_or_dynamic(
335
        self,
336
        primary_path=None,
337
        dynamic_backup_path=None,
338
        uni_a=None,
339
        uni_z=None,
340
    ) -> None:
341
        """Validate that it must have a primary path or allow dynamic paths."""
342 1
        primary_path = (
343
            primary_path
344
            if primary_path is not None
345
            else self.primary_path
346
        )
347 1
        dynamic_backup_path = (
348
            dynamic_backup_path
349
            if dynamic_backup_path is not None
350
            else self.dynamic_backup_path
351
        )
352 1
        uni_a = uni_a if uni_a is not None else self.uni_a
353 1
        uni_z = uni_z if uni_z is not None else self.uni_z
354 1
        if (
355
            not primary_path
356
            and not dynamic_backup_path
357
            and uni_a and uni_z
358
            and uni_a.interface.switch != uni_z.interface.switch
359
        ):
360 1
            msg = "The EVC must have a primary path or allow dynamic paths."
361 1
            raise ValueError(msg)
362
363 1
    def __eq__(self, other):
364
        """Override the default implementation."""
365 1
        if not isinstance(other, EVC):
366
            return False
367
368 1
        attrs_to_compare = ["name", "uni_a", "uni_z", "owner", "bandwidth"]
369 1
        for attribute in attrs_to_compare:
370 1
            if getattr(other, attribute) != getattr(self, attribute):
371 1
                return False
372 1
        return True
373
374 1
    def is_intra_switch(self):
375
        """Check if the UNIs are in the same switch."""
376 1
        return self.uni_a.interface.switch == self.uni_z.interface.switch
377
378 1
    def check_no_tag_duplicate(self, other_uni: UNI):
379
        """Check if a no tag UNI is duplicated."""
380 1
        if other_uni in (self.uni_a, self.uni_z):
381 1
            msg = f"UNI with interface {other_uni.interface.id} is"\
382
                  f" duplicated with {self}."
383 1
            raise DuplicatedNoTagUNI(msg)
384
385 1
    def as_dict(self, keys: set = None):
386
        """Return a dictionary representing an EVC object.
387
            keys: Only fields on this variable will be
388
                  returned in the dictionary"""
389 1
        evc_dict = {
390
            "id": self.id,
391
            "name": self.name,
392
            "uni_a": self.uni_a.as_dict(),
393
            "uni_z": self.uni_z.as_dict(),
394
        }
395
396 1
        time_fmt = "%Y-%m-%dT%H:%M:%S"
397
398 1
        evc_dict["start_date"] = self.start_date
399 1
        if isinstance(self.start_date, datetime):
400 1
            evc_dict["start_date"] = self.start_date.strftime(time_fmt)
401
402 1
        evc_dict["end_date"] = self.end_date
403 1
        if isinstance(self.end_date, datetime):
404 1
            evc_dict["end_date"] = self.end_date.strftime(time_fmt)
405
406 1
        evc_dict["queue_id"] = self.queue_id
407 1
        evc_dict["bandwidth"] = self.bandwidth
408 1
        evc_dict["primary_links"] = self.primary_links.as_dict()
409 1
        evc_dict["backup_links"] = self.backup_links.as_dict()
410 1
        evc_dict["current_path"] = self.current_path.as_dict()
411 1
        evc_dict["failover_path"] = self.failover_path.as_dict()
412 1
        evc_dict["primary_path"] = self.primary_path.as_dict()
413 1
        evc_dict["backup_path"] = self.backup_path.as_dict()
414 1
        evc_dict["dynamic_backup_path"] = self.dynamic_backup_path
415 1
        evc_dict["metadata"] = self.metadata
416
417 1
        evc_dict["request_time"] = self.request_time
418 1
        if isinstance(self.request_time, datetime):
419 1
            evc_dict["request_time"] = self.request_time.strftime(time_fmt)
420
421 1
        time = self.creation_time.strftime(time_fmt)
422 1
        evc_dict["creation_time"] = time
423
424 1
        evc_dict["owner"] = self.owner
425 1
        evc_dict["circuit_scheduler"] = [
426
            sc.as_dict() for sc in self.circuit_scheduler
427
        ]
428
429 1
        evc_dict["active"] = self.is_active()
430 1
        evc_dict["enabled"] = self.is_enabled()
431 1
        evc_dict["archived"] = self.archived
432 1
        evc_dict["sb_priority"] = self.sb_priority
433 1
        evc_dict["service_level"] = self.service_level
434 1
        evc_dict["primary_constraints"] = self.primary_constraints
435 1
        evc_dict["secondary_constraints"] = self.secondary_constraints
436 1
        evc_dict["flow_removed_at"] = self.flow_removed_at
437 1
        evc_dict["updated_at"] = self.updated_at
438 1
        evc_dict["max_paths"] = self.max_paths
439
440 1
        if keys:
441 1
            selected = {}
442 1
            for key in keys:
443 1
                selected[key] = evc_dict[key]
444 1
            selected["id"] = evc_dict["id"]
445 1
            return selected
446 1
        return evc_dict
447
448 1
    @property
449 1
    def id(self):  # pylint: disable=invalid-name
450
        """Return this EVC's ID."""
451 1
        return self._id
452
453 1
    def archive(self):
454
        """Archive this EVC on deletion."""
455 1
        self.archived = True
456
457 1
    def _use_uni_vlan(
458
        self,
459
        uni: UNI,
460
        uni_dif: Union[None, UNI] = None
461
    ):
462
        """Use tags from UNI"""
463 1
        if uni.user_tag is None:
464 1
            return
465 1
        tag = uni.user_tag.value
466 1
        tag_type = uni.user_tag.tag_type
467 1
        if (uni_dif and isinstance(tag, list) and
468
                isinstance(uni_dif.user_tag.value, list)):
469 1
            tag = range_difference(tag, uni_dif.user_tag.value)
470 1
            if not tag:
471 1
                return
472 1
        uni.interface.use_tags(
473
            self._controller, tag, tag_type, use_lock=True, check_order=False
474
        )
475
476 1
    def make_uni_vlan_available(
477
        self,
478
        uni: UNI,
479
        uni_dif: Union[None, UNI] = None,
480
    ):
481
        """Make available tag from UNI"""
482 1
        if uni.user_tag is None:
483 1
            return
484 1
        tag = uni.user_tag.value
485 1
        tag_type = uni.user_tag.tag_type
486 1
        if (uni_dif and isinstance(tag, list) and
487
                isinstance(uni_dif.user_tag.value, list)):
488 1
            tag = range_difference(tag, uni_dif.user_tag.value)
489 1
            if not tag:
490
                return
491 1
        try:
492 1
            conflict = uni.interface.make_tags_available(
493
                self._controller, tag, tag_type, use_lock=True,
494
                check_order=False
495
            )
496 1
        except KytosTagError as err:
497 1
            log.error(f"Error in {self}: {err}")
498 1
            return
499 1
        if conflict:
500 1
            intf = uni.interface.id
501 1
            log.warning(f"Tags {conflict} was already available in {intf}")
502
503 1
    def remove_uni_tags(self):
504
        """Remove both UNI usage of a tag"""
505 1
        self.make_uni_vlan_available(self.uni_a)
506 1
        self.make_uni_vlan_available(self.uni_z)
507
508
509
# pylint: disable=fixme, too-many-public-methods
510 1
class EVCDeploy(EVCBase):
511
    """Class to handle the deploy procedures."""
512
513 1
    def create(self):
514
        """Create a EVC."""
515
516 1
    def discover_new_paths(self):
517
        """Discover new paths to satisfy this circuit and deploy it."""
518
        return DynamicPathManager.get_best_paths(self, self.max_paths,
519
                                                 **self.primary_constraints)
520
521 1
    def get_failover_path_candidates(self):
522
        """Get failover paths to satisfy this EVC."""
523
        # in the future we can return primary/backup paths as well
524
        # we just have to properly handle link_up and failover paths
525
        # if (
526
        #     self.is_using_primary_path() and
527
        #     self.backup_path.status is EntityStatus.UP
528
        # ):
529
        #     yield self.backup_path
530 1
        return DynamicPathManager.get_disjoint_paths(self, self.current_path)
531
532 1
    def change_path(self):
533
        """Change EVC path."""
534
535 1
    def reprovision(self):
536
        """Force the EVC (re-)provisioning."""
537
538 1
    def is_affected_by_link(self, link):
539
        """Return True if this EVC has the given link on its current path."""
540 1
        return link in self.current_path
541
542 1
    def link_affected_by_interface(self, interface):
543
        """Return True if this EVC has the given link on its current path."""
544
        return self.current_path.link_affected_by_interface(interface)
545
546 1
    def is_backup_path_affected_by_link(self, link):
547
        """Return True if the backup path of this EVC uses the given link."""
548 1
        return link in self.backup_path
549
550
    # pylint: disable=invalid-name
551 1
    def is_primary_path_affected_by_link(self, link):
552
        """Return True if the primary path of this EVC uses the given link."""
553 1
        return link in self.primary_path
554
555 1
    def is_failover_path_affected_by_link(self, link):
556
        """Return True if this EVC has the given link on its failover path."""
557 1
        return link in self.failover_path
558
559 1
    def is_eligible_for_failover_path(self):
560
        """Verify if this EVC is eligible for failover path (EP029)"""
561
        # In the future this function can be augmented to consider
562
        # primary/backup, primary/dynamic, and other path combinations
563 1
        return (
564
            self.dynamic_backup_path and
565
            not self.primary_path and not self.backup_path
566
        )
567
568 1
    def is_using_primary_path(self):
569
        """Verify if the current deployed path is self.primary_path."""
570 1
        return self.primary_path and (self.current_path == self.primary_path)
571
572 1
    def is_using_backup_path(self):
573
        """Verify if the current deployed path is self.backup_path."""
574 1
        return self.backup_path and (self.current_path == self.backup_path)
575
576 1
    def is_using_dynamic_path(self):
577
        """Verify if the current deployed path is a dynamic path."""
578 1
        if (
579
            self.current_path
580
            and not self.is_using_primary_path()
581
            and not self.is_using_backup_path()
582
            and self.current_path.status == EntityStatus.UP
583
        ):
584
            return True
585 1
        return False
586
587 1
    def deploy_to_backup_path(self, old_path_dict: dict = None):
588
        """Deploy the backup path into the datapaths of this circuit.
589
590
        If the backup_path attribute is valid and up, this method will try to
591
        deploy this backup_path.
592
593
        If everything fails and dynamic_backup_path is True, then tries to
594
        deploy a dynamic path.
595
        """
596
        # TODO: Remove flows from current (cookies)
597 1
        if self.is_using_backup_path():
598
            # TODO: Log to say that cannot move backup to backup
599
            return True
600
601 1
        success = False
602 1
        if self.backup_path.status is EntityStatus.UP:
603 1
            success = self.deploy_to_path(self.backup_path, old_path_dict)
604
605 1
        if success:
606 1
            return True
607
608 1
        if self.dynamic_backup_path or self.is_intra_switch():
609 1
            return self.deploy_to_path(old_path_dict=old_path_dict)
610
611
        return False
612
613 1
    def deploy_to_primary_path(self, old_path_dict: dict = None):
614
        """Deploy the primary path into the datapaths of this circuit.
615
616
        If the primary_path attribute is valid and up, this method will try to
617
        deploy this primary_path.
618
        """
619
        # TODO: Remove flows from current (cookies)
620 1
        if self.is_using_primary_path():
621
            # TODO: Log to say that cannot move primary to primary
622
            return True
623
624 1
        if self.primary_path.status is EntityStatus.UP:
625 1
            return self.deploy_to_path(self.primary_path, old_path_dict)
626
        return False
627
628 1
    def deploy(self, old_path_dict: dict = None):
629
        """Deploy EVC to best path.
630
631
        Best path can be the primary path, if available. If not, the backup
632
        path, and, if it is also not available, a dynamic path.
633
        """
634 1
        if self.archived:
635 1
            return False
636 1
        self.enable()
637 1
        success = self.deploy_to_primary_path(old_path_dict)
638 1
        if not success:
639 1
            success = self.deploy_to_backup_path(old_path_dict)
640
641 1
        if success:
642 1
            emit_event(self._controller, "deployed",
643
                       content=map_evc_event_content(self))
644 1
        return success
645
646 1
    @staticmethod
647 1
    def get_path_status(path):
648
        """Check for the current status of a path.
649
650
        If any link in this path is down, the path is considered down.
651
        """
652 1
        if not path:
653 1
            return EntityStatus.DISABLED
654
655 1
        for link in path:
656 1
            if link.status is not EntityStatus.UP:
657 1
                return link.status
658 1
        return EntityStatus.UP
659
660
    #    def discover_new_path(self):
661
    #        # TODO: discover a new path to satisfy this circuit and deploy
662
663 1
    def remove(self):
664
        """Remove EVC path and disable it."""
665 1
        self.remove_current_flows(sync=False)
666 1
        self.remove_failover_flows(sync=False)
667 1
        self.disable()
668 1
        self.sync()
669 1
        emit_event(self._controller, "undeployed",
670
                   content=map_evc_event_content(self))
671
672 1
    def remove_failover_flows(self, exclude_uni_switches=True,
673
                              force=True, sync=True) -> None:
674
        """Remove failover_flows.
675
676
        By default, it'll exclude UNI switches, if mef_eline has already
677
        called remove_current_flows before then this minimizes the number
678
        of FlowMods and IO.
679
        """
680 1
        if not self.failover_path:
681 1
            return
682 1
        switches, cookie, excluded = set(), self.get_cookie(), set()
683 1
        if exclude_uni_switches:
684 1
            excluded.add(self.uni_a.interface.switch.id)
685 1
            excluded.add(self.uni_z.interface.switch.id)
686 1
        for link in self.failover_path:
687 1
            if link.endpoint_a.switch.id not in excluded:
688 1
                switches.add(link.endpoint_a.switch.id)
689 1
            if link.endpoint_b.switch.id not in excluded:
690 1
                switches.add(link.endpoint_b.switch.id)
691 1
        flow_mods = {
692
            "switches": list(switches),
693
            "flows": [{
694
                "cookie": cookie,
695
                "cookie_mask": int(0xffffffffffffffff),
696
                "owner": "mef_eline",
697
            }]
698
        }
699 1
        try:
700 1
            self._send_flow_mods(
701
                flow_mods,
702
                "delete",
703
                force=force,
704
            )
705
        except FlowModException as err:
706
            log.error(f"Error deleting {self} failover_path flows, {err}")
707 1
        try:
708 1
            self.failover_path.make_vlans_available(self._controller)
709
        except KytosTagError as err:
710
            log.error(f"Error removing {self} failover_path: {err}")
711 1
        self.failover_path = Path([])
712 1
        if sync:
713 1
            self.sync()
714
715 1
    def remove_current_flows(
716
        self,
717
        current_path=None,
718
        force=True,
719
        sync=True,
720
        return_path=False
721
    ) -> dict[str, int]:
722
        """Remove all flows from current path or path intended for
723
         current path if exists."""
724 1
        switches, old_path_dict = set(), {}
725 1
        current_path = self.current_path if not current_path else current_path
726 1
        if not current_path and not self.is_intra_switch():
727 1
            return {}
728
729 1
        if return_path:
730 1
            for link in self.current_path:
731 1
                s_vlan = link.metadata.get("s_vlan")
732 1
                if s_vlan:
733 1
                    old_path_dict[link.id] = s_vlan.value
734
735 1
        for link in current_path:
736 1
            switches.add(link.endpoint_a.switch.id)
737 1
            switches.add(link.endpoint_b.switch.id)
738 1
        switches.add(self.uni_a.interface.switch.id)
739 1
        switches.add(self.uni_z.interface.switch.id)
740 1
        flow_mods = {
741
            "switches": list(switches),
742
            "flows": [{
743
                "cookie": self.get_cookie(),
744
                "cookie_mask": int(0xffffffffffffffff),
745
                "owner": "mef_eline",
746
            }]
747
        }
748
749 1
        try:
750 1
            self._send_flow_mods(flow_mods, "delete", force=force)
751 1
        except FlowModException as err:
752 1
            log.error(f"Error deleting {self} current_path flows, {err}")
753
754 1
        try:
755 1
            current_path.make_vlans_available(self._controller)
756 1
        except KytosTagError as err:
757 1
            log.error(f"Error removing {self} current_path: {err}")
758 1
        self.current_path = Path([])
759 1
        self.deactivate()
760 1
        if sync:
761 1
            self.sync()
762 1
        return old_path_dict
763
764 1
    def remove_path_flows(
765
        self, path=None, force=True
766
    ) -> dict[str, list[dict]]:
767
        """Remove all flows from path, and return the removed flows."""
768 1
        dpid_flows_match: dict[str, dict] = defaultdict(lambda: {"flows": []})
769 1
        out_flows: dict[str, list[dict]] = defaultdict(list)
770
771 1
        if not path:
772 1
            return dpid_flows_match
773
774 1
        try:
775 1
            nni_flows = self._prepare_nni_flows(path)
776
        # pylint: disable=broad-except
777
        except Exception:
778
            err = traceback.format_exc()
779
            log.error(f"Fail to remove NNI failover flows for {self}: {err}")
780
            nni_flows = {}
781
782 1
        for dpid, flows in nni_flows.items():
783 1
            for flow in flows:
784 1
                flow_mod = {
785
                    "cookie": flow["cookie"],
786
                    "match": flow["match"],
787
                    "owner": "mef_eline",
788
                    "cookie_mask": int(0xffffffffffffffff)
789
                }
790 1
                dpid_flows_match[dpid]["flows"].append(flow_mod)
791 1
                out_flows[dpid].append(flow_mod)
792
793 1
        try:
794 1
            uni_flows = self._prepare_uni_flows(path, skip_in=True)
795
        # pylint: disable=broad-except
796
        except Exception:
797
            err = traceback.format_exc()
798
            log.error(f"Fail to remove UNI failover flows for {self}: {err}")
799
            uni_flows = {}
800
801 1
        for dpid, flows in uni_flows.items():
802 1
            for flow in flows:
803 1
                flow_mod = {
804
                    "cookie": flow["cookie"],
805
                    "match": flow["match"],
806
                    "owner": "mef_eline",
807
                    "cookie_mask": int(0xffffffffffffffff)
808
                }
809 1
                dpid_flows_match[dpid]["flows"].append(flow_mod)
810 1
                out_flows[dpid].append(flow_mod)
811
812 1
        try:
813 1
            self._send_flow_mods(
814
                dpid_flows_match, 'delete', force=force, by_switch=True
815
            )
816 1
        except FlowModException as err:
817 1
            log.error(
818
                f"Error deleting {self} path flows, path:{path}, error={err}"
819
            )
820
821 1
        try:
822 1
            path.make_vlans_available(self._controller)
823
        except KytosTagError as err:
824
            log.error(f"Error removing {self} path: {err}")
825
826 1
        return out_flows
827
828 1
    @staticmethod
829 1
    def links_zipped(path=None):
830
        """Return an iterator which yields pairs of links in order."""
831 1
        if not path:
832 1
            return []
833 1
        return zip(path[:-1], path[1:])
834
835 1
    def should_deploy(self, path=None):
836
        """Verify if the circuit should be deployed."""
837 1
        if not path:
838 1
            log.debug("Path is empty.")
839 1
            return False
840
841 1
        if not self.is_enabled():
842 1
            log.debug(f"{self} is disabled.")
843 1
            return False
844
845 1
        if not self.is_active():
846 1
            log.debug(f"{self} will be deployed.")
847 1
            return True
848
849 1
        return False
850
851 1
    @staticmethod
852 1
    def is_uni_interface_active(
853
        *interfaces: Interface
854
    ) -> tuple[bool, dict]:
855
        """Whether UNIs are active and their status & status_reason."""
856 1
        active = True
857 1
        bad_interfaces = [
858
            interface
859
            for interface in interfaces
860
            if interface.status != EntityStatus.UP
861
        ]
862 1
        if bad_interfaces:
863 1
            active = False
864 1
            interfaces = bad_interfaces
865 1
        return active, {
866
            interface.id: {
867
                'status': interface.status.value,
868
                'status_reason': interface.status_reason,
869
            }
870
            for interface in interfaces
871
        }
872
873 1
    def try_to_activate(self) -> bool:
874
        """Try to activate the EVC."""
875 1
        if self.is_intra_switch():
876 1
            return self._try_to_activate_intra_evc()
877 1
        return self._try_to_activate_inter_evc()
878
879 1
    def _try_to_activate_intra_evc(self) -> bool:
880
        """Try to activate intra EVC."""
881 1
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
882 1
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
883 1
        if not is_active:
884 1
            raise ActivationError(
885
                f"Won't be able to activate {self} due to UNIs: {reason}"
886
            )
887 1
        self.activate()
888 1
        return True
889
890 1
    def _try_to_activate_inter_evc(self) -> bool:
891
        """Try to activate inter EVC."""
892 1
        intf_a, intf_z = self.uni_a.interface, self.uni_z.interface
893 1
        is_active, reason = self.is_uni_interface_active(intf_a, intf_z)
894 1
        if not is_active:
895 1
            raise ActivationError(
896
                f"Won't be able to activate {self} due to UNIs: {reason}"
897
            )
898 1
        if self.current_path.status != EntityStatus.UP:
899 1
            raise ActivationError(
900
                f"Won't be able to activate {self} due to current_path "
901
                f"status {self.current_path.status}"
902
            )
903 1
        self.activate()
904 1
        return True
905
906
    # pylint: disable=too-many-branches, too-many-statements
907 1
    def deploy_to_path(self, path=None, old_path_dict: dict = None):
908
        """Install the flows for this circuit.
909
910
        Procedures to deploy:
911
912
        0. Remove current flows installed
913
        1. Decide if will deploy "path" or discover a new path
914
        2. Choose vlan
915
        3. Install NNI flows
916
        4. Install UNI flows
917
        5. Activate
918
        6. Update current_path
919
        7. Update links caches(primary, current, backup)
920
921
        """
922 1
        self.remove_current_flows(sync=False)
923 1
        use_path = path or Path([])
924 1
        if not old_path_dict:
925 1
            old_path_dict = {}
926 1
        tag_errors = []
927 1
        no_valid_path = False
928 1
        if self.should_deploy(use_path):
929 1
            try:
930 1
                use_path.choose_vlans(self._controller, old_path_dict)
931 1
            except KytosNoTagAvailableError as e:
932 1
                tag_errors.append(str(e))
933 1
                use_path = None
934
        else:
935 1
            for use_path in self.discover_new_paths():
936 1
                if use_path is None:
937 1
                    no_valid_path = True
938 1
                    continue
939 1
                try:
940 1
                    use_path.choose_vlans(self._controller, old_path_dict)
941 1
                    break
942 1
                except KytosNoTagAvailableError as e:
943 1
                    tag_errors.append(str(e))
944
            else:
945 1
                use_path = None
946
947 1
        try:
948 1
            if use_path:
949 1
                self._install_flows(use_path)
950 1
            elif self.is_intra_switch():
951 1
                use_path = Path()
952 1
                self._install_direct_uni_flows()
953
            else:
954 1
                no_path_msg = "No available path was found."
955 1
                if no_valid_path:
956 1
                    no_path_msg = "No valid path was found, "\
957
                                  "try increasing `max_paths`"\
958
                                 f" from {self.max_paths}."
959 1
                msg = f"{self} was not deployed. {no_path_msg}"
960 1
                if tag_errors:
961 1
                    msg = self.add_tag_errors(msg, tag_errors)
962 1
                    log.error(msg)
963
                else:
964 1
                    log.warning(msg)
965 1
                return False
966 1
        except EVCPathNotInstalled as err:
967 1
            log.error(
968
                f"Error deploying EVC {self} when calling flow_manager: {err}"
969
            )
970 1
            self.remove_current_flows(use_path, sync=True)
971 1
            return False
972
973 1
        self.current_path = use_path
974 1
        msg = f"{self} was deployed."
975 1
        try:
976 1
            self.try_to_activate()
977
        except ActivationError as exc:
978
            msg = f"{msg} {str(exc)}"
979 1
        self.sync()
980 1
        log.info(msg)
981 1
        return True
982
983 1
    def try_setup_failover_path(
984
        self,
985
        wait=settings.DEPLOY_EVCS_INTERVAL,
986
        warn_if_not_path=True
987
    ):
988
        """Try setup failover_path whenever possible."""
989 1
        if (
990
                self.failover_path or not self.current_path
991
                or not self.is_active()
992
                ):
993 1
            return
994 1
        if (now() - self.affected_by_link_at).seconds >= wait:
995 1
            with self.lock:
996 1
                self.setup_failover_path(warn_if_not_path)
997
998
    # pylint: disable=too-many-statements
999 1
    def setup_failover_path(self, warn_if_not_path=True):
1000
        """Install flows for the failover path of this EVC.
1001
1002
        Procedures to deploy:
1003
1004
        0. Remove flows currently installed for failover_path (if any)
1005
        1. Discover a disjoint path from current_path
1006
        2. Choose vlans
1007
        3. Install NNI flows
1008
        4. Install UNI egress flows
1009
        5. Update failover_path
1010
        """
1011
        # Intra-switch EVCs have no failover_path
1012 1
        if self.is_intra_switch():
1013 1
            return False
1014
1015
        # For not only setup failover path for totally dynamic EVCs
1016 1
        if not self.is_eligible_for_failover_path():
1017 1
            return False
1018
1019 1
        out_new_flows: dict[str, list[dict]] = {}
1020 1
        reason = ""
1021 1
        tag_errors = []
1022 1
        out_removed_flows = self.remove_path_flows(self.failover_path)
1023 1
        self.failover_path = Path([])
1024
1025 1
        for use_path in self.get_failover_path_candidates():
1026 1
            if not use_path:
1027 1
                continue
1028 1
            try:
1029 1
                use_path.choose_vlans(self._controller)
1030 1
                break
1031 1
            except KytosNoTagAvailableError as e:
1032 1
                tag_errors.append(str(e))
1033
        else:
1034 1
            use_path = Path([])
1035 1
            reason = "No available path was found"
1036
1037 1
        try:
1038 1
            if use_path:
1039 1
                out_new_flows = self._install_flows(
1040
                    use_path, skip_in=True
1041
                )
1042 1
        except EVCPathNotInstalled as err:
1043 1
            reason = "Error deploying failover path"
1044 1
            log.error(
1045
                f"{reason} for {self}. FlowManager error: {err}"
1046
            )
1047 1
            _rmed_flows = self.remove_path_flows(use_path)
1048 1
            out_removed_flows = merge_flow_dicts(
1049
                out_removed_flows, _rmed_flows
1050
            )
1051 1
            use_path = Path([])
1052
1053 1
        self.failover_path = use_path
1054 1
        self.sync()
1055
1056 1
        if out_new_flows or out_removed_flows:
1057 1
            emit_event(self._controller, "failover_deployed", content={
1058
                self.id: map_evc_event_content(
1059
                    self,
1060
                    flows=deepcopy(out_new_flows),
1061
                    removed_flows=deepcopy(out_removed_flows),
1062
                    error_reason=reason,
1063
                    current_path=self.current_path.as_dict(),
1064
                )
1065
            })
1066
1067 1
        if not use_path:
1068 1
            msg = f"Failover path for {self} was not deployed: {reason}."
1069 1
            if tag_errors:
1070 1
                msg = self.add_tag_errors(msg, tag_errors)
1071 1
                log.error(msg)
1072 1
            elif warn_if_not_path:
1073 1
                log.warning(msg)
1074 1
            return False
1075 1
        log.info(f"Failover path for {self} was deployed.")
1076 1
        return True
1077
1078 1
    @staticmethod
1079 1
    def add_tag_errors(msg: str, tag_errors: list):
1080
        """Add to msg the tag errors ecountered when chossing path."""
1081 1
        path = ['path', 'paths']
1082 1
        was = ['was', 'were']
1083 1
        message = ['message', 'messages']
1084
1085
        # Choose either singular(0) or plural(1) words
1086 1
        n = 1
1087 1
        if len(tag_errors) == 1:
1088 1
            n = 0
1089
1090 1
        msg += f" {len(tag_errors)} {path[n]} {was[n]} rejected"
1091 1
        msg += f" with {message[n]}: {tag_errors}"
1092 1
        return msg
1093
1094 1
    def get_failover_flows(self):
1095
        """Return the flows needed to make the failover path active, i.e. the
1096
        flows for ingress forwarding.
1097
1098
        Return:
1099
            dict: A dict of flows indexed by the switch_id will be returned, or
1100
                an empty dict if no failover_path is available.
1101
        """
1102 1
        if not self.failover_path:
1103 1
            return {}
1104 1
        return self._prepare_uni_flows(self.failover_path, skip_out=True)
1105
1106
    # pylint: disable=too-many-branches
1107 1
    def _prepare_direct_uni_flows(self):
1108
        """Prepare flows connecting two UNIs for intra-switch EVC."""
1109 1
        vlan_a = self._get_value_from_uni_tag(self.uni_a)
1110 1
        vlan_z = self._get_value_from_uni_tag(self.uni_z)
1111
1112 1
        flow_mod_az = self._prepare_flow_mod(
1113
            self.uni_a.interface, self.uni_z.interface,
1114
            self.queue_id, vlan_a
1115
        )
1116 1
        flow_mod_za = self._prepare_flow_mod(
1117
            self.uni_z.interface, self.uni_a.interface,
1118
            self.queue_id, vlan_z
1119
        )
1120
1121 1 View Code Duplication
        if not isinstance(vlan_z, list) and vlan_z not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1122 1
            flow_mod_az["actions"].insert(
1123
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
1124
            )
1125 1
            if not vlan_a:
1126 1
                flow_mod_az["actions"].insert(
1127
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1128
                )
1129 1
            if vlan_a == 0:
1130 1
                flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1131 1
        elif vlan_a == 0 and vlan_z == "4096/4096":
1132 1
            flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1133
1134 1 View Code Duplication
        if not isinstance(vlan_a, list) and vlan_a not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1135 1
            flow_mod_za["actions"].insert(
1136
                    0, {"action_type": "set_vlan", "vlan_id": vlan_a}
1137
                )
1138 1
            if not vlan_z:
1139 1
                flow_mod_za["actions"].insert(
1140
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1141
                )
1142 1
            if vlan_z == 0:
1143 1
                flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1144 1
        elif vlan_a == "4096/4096" and vlan_z == 0:
1145 1
            flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1146
1147 1
        flows = []
1148 1
        if isinstance(vlan_a, list):
1149 1
            for mask_a in vlan_a:
1150 1
                flow_aux = deepcopy(flow_mod_az)
1151 1
                flow_aux["match"]["dl_vlan"] = mask_a
1152 1
                flows.append(flow_aux)
1153
        else:
1154 1
            if vlan_a is not None:
1155 1
                flow_mod_az["match"]["dl_vlan"] = vlan_a
1156 1
            flows.append(flow_mod_az)
1157
1158 1
        if isinstance(vlan_z, list):
1159 1
            for mask_z in vlan_z:
1160 1
                flow_aux = deepcopy(flow_mod_za)
1161 1
                flow_aux["match"]["dl_vlan"] = mask_z
1162 1
                flows.append(flow_aux)
1163
        else:
1164 1
            if vlan_z is not None:
1165 1
                flow_mod_za["match"]["dl_vlan"] = vlan_z
1166 1
            flows.append(flow_mod_za)
1167 1
        return (
1168
            self.uni_a.interface.switch.id, flows
1169
        )
1170
1171 1
    def _install_direct_uni_flows(self):
1172
        """Install flows connecting two UNIs.
1173
1174
        This case happens when the circuit is between UNIs in the
1175
        same switch.
1176
        """
1177 1
        (dpid, flows) = self._prepare_direct_uni_flows()
1178 1
        flow_mods = {"switches": [dpid], "flows": flows}
1179 1
        try:
1180 1
            self._send_flow_mods(flow_mods, "install")
1181 1
        except FlowModException as err:
1182 1
            raise EVCPathNotInstalled(str(err)) from err
1183
1184 1
    def _prepare_nni_flows(self, path=None):
1185
        """Prepare NNI flows."""
1186 1
        nni_flows = OrderedDict()
1187 1
        previous = self.uni_a.interface.switch.dpid
1188 1
        for incoming, outcoming in self.links_zipped(path):
1189 1
            in_vlan = incoming.get_metadata("s_vlan").value
1190 1
            out_vlan = outcoming.get_metadata("s_vlan").value
1191 1
            in_endpoint = self.get_endpoint_by_id(incoming, previous, ne)
1192 1
            out_endpoint = self.get_endpoint_by_id(
1193
                outcoming, in_endpoint.switch.id, eq
1194
            )
1195
1196 1
            flows = []
1197
            # Flow for one direction
1198 1
            flows.append(
1199
                self._prepare_nni_flow(
1200
                    in_endpoint,
1201
                    out_endpoint,
1202
                    in_vlan,
1203
                    out_vlan,
1204
                    queue_id=self.queue_id,
1205
                )
1206
            )
1207
1208
            # Flow for the other direction
1209 1
            flows.append(
1210
                self._prepare_nni_flow(
1211
                    out_endpoint,
1212
                    in_endpoint,
1213
                    out_vlan,
1214
                    in_vlan,
1215
                    queue_id=self.queue_id,
1216
                )
1217
            )
1218 1
            previous = in_endpoint.switch.id
1219 1
            nni_flows[in_endpoint.switch.id] = flows
1220 1
        return nni_flows
1221
1222 1
    def _install_flows(
1223
        self, path=None, skip_in=False, skip_out=False
1224
    ) -> dict[str, list[dict]]:
1225
        """Install uni and nni flows"""
1226 1
        flows_by_switch = defaultdict(lambda: {"flows": []})
1227 1
        new_flows = defaultdict(list)
1228 1
        for dpid, flows in self._prepare_nni_flows(path).items():
1229 1
            flows_by_switch[dpid]["flows"].extend(flows)
1230 1
            new_flows[dpid].extend(flows)
1231 1
        for dpid, flows in self._prepare_uni_flows(
1232
            path, skip_in, skip_out
1233
        ).items():
1234 1
            flows_by_switch[dpid]["flows"].extend(flows)
1235 1
            new_flows[dpid].extend(flows)
1236
1237 1
        try:
1238 1
            self._send_flow_mods(flows_by_switch, "install", by_switch=True)
1239 1
        except FlowModException as err:
1240 1
            raise EVCPathNotInstalled(str(err)) from err
1241
1242 1
        return new_flows
1243
1244 1
    @staticmethod
1245 1
    def _get_value_from_uni_tag(uni: UNI):
1246
        """Returns the value from tag. In case of any and untagged
1247
        it should return 4096/4096 and 0 respectively"""
1248 1
        special = {"any": "4096/4096", "untagged": 0}
1249 1
        if uni.user_tag:
1250 1
            value = uni.user_tag.value
1251 1
            if isinstance(value, list):
1252 1
                return uni.user_tag.mask_list
1253 1
            return special.get(value, value)
1254 1
        return None
1255
1256
    # pylint: disable=too-many-locals
1257 1
    def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
1258
        """Prepare flows to install UNIs."""
1259 1
        uni_flows = {}
1260 1
        if not path:
1261
            log.info("install uni flows without path.")
1262
            return uni_flows
1263
1264
        # Determine VLANs
1265 1
        in_vlan_a = self._get_value_from_uni_tag(self.uni_a)
1266 1
        out_vlan_a = path[0].get_metadata("s_vlan").value
1267
1268 1
        in_vlan_z = self._get_value_from_uni_tag(self.uni_z)
1269 1
        out_vlan_z = path[-1].get_metadata("s_vlan").value
1270
1271
        # Get endpoints from path
1272 1
        endpoint_a = self.get_endpoint_by_id(
1273
            path[0], self.uni_a.interface.switch.id, eq
1274
        )
1275 1
        endpoint_z = self.get_endpoint_by_id(
1276
            path[-1], self.uni_z.interface.switch.id, eq
1277
        )
1278
1279
        # Flows for the first UNI
1280 1
        flows_a = []
1281
1282
        # Flow for one direction, pushing the service tag
1283 1
        if not skip_in:
1284 1
            if isinstance(in_vlan_a, list):
1285 1
                for in_mask_a in in_vlan_a:
1286 1
                    push_flow = self._prepare_push_flow(
1287
                        self.uni_a.interface,
1288
                        endpoint_a,
1289
                        in_mask_a,
1290
                        out_vlan_a,
1291
                        in_vlan_z,
1292
                        queue_id=self.queue_id,
1293
                    )
1294 1
                    flows_a.append(push_flow)
1295
            else:
1296
                push_flow = self._prepare_push_flow(
1297
                    self.uni_a.interface,
1298
                    endpoint_a,
1299
                    in_vlan_a,
1300
                    out_vlan_a,
1301
                    in_vlan_z,
1302
                    queue_id=self.queue_id,
1303
                )
1304
                flows_a.append(push_flow)
1305
1306
        # Flow for the other direction, popping the service tag
1307 1
        if not skip_out:
1308 1
            pop_flow = self._prepare_pop_flow(
1309
                endpoint_a,
1310
                self.uni_a.interface,
1311
                out_vlan_a,
1312
                in_vlan_a,
1313
                in_vlan_z,
1314
                queue_id=self.queue_id,
1315
            )
1316 1
            flows_a.append(pop_flow)
1317
1318 1
        uni_flows[self.uni_a.interface.switch.id] = flows_a
1319
1320
        # Flows for the second UNI
1321 1
        flows_z = []
1322
1323
        # Flow for one direction, pushing the service tag
1324 1
        if not skip_in:
1325 1
            if isinstance(in_vlan_z, list):
1326 1
                for in_mask_z in in_vlan_z:
1327 1
                    push_flow = self._prepare_push_flow(
1328
                        self.uni_z.interface,
1329
                        endpoint_z,
1330
                        in_mask_z,
1331
                        out_vlan_z,
1332
                        in_vlan_a,
1333
                        queue_id=self.queue_id,
1334
                    )
1335 1
                    flows_z.append(push_flow)
1336
            else:
1337
                push_flow = self._prepare_push_flow(
1338
                    self.uni_z.interface,
1339
                    endpoint_z,
1340
                    in_vlan_z,
1341
                    out_vlan_z,
1342
                    in_vlan_a,
1343
                    queue_id=self.queue_id,
1344
                )
1345
                flows_z.append(push_flow)
1346
1347
        # Flow for the other direction, popping the service tag
1348 1
        if not skip_out:
1349 1
            pop_flow = self._prepare_pop_flow(
1350
                endpoint_z,
1351
                self.uni_z.interface,
1352
                out_vlan_z,
1353
                in_vlan_z,
1354
                in_vlan_a,
1355
                queue_id=self.queue_id,
1356
            )
1357 1
            flows_z.append(pop_flow)
1358
1359 1
        uni_flows[self.uni_z.interface.switch.id] = flows_z
1360
1361 1
        return uni_flows
1362
1363 1
    @staticmethod
1364 1
    @retry(
1365
        stop=stop_after_attempt(3),
1366
        wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
1367
        retry=retry_if_exception_type(FlowModException),
1368
        before_sleep=before_sleep,
1369
        reraise=True,
1370
    )
1371 1
    def _send_flow_mods(
1372
        data_content: dict,
1373
        command="install",
1374
        force=False,
1375
        by_switch=False
1376
    ):
1377
        """Send a flow_mod list to a specific switch.
1378
1379
        Args:
1380
            dpid(str): The target of flows (i.e. Switch.id).
1381
            flow_mods(dict): Python dictionary with flow_mods.
1382
            command(str): By default is 'flows'. To remove a flow is 'remove'.
1383
            force(bool): True to send via consistency check in case of errors.
1384
            by_switch(bool): True to send to 'flows_by_switch' request instead.
1385
        """
1386 1
        if by_switch:
1387 1
            endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}"
1388
        else:
1389 1
            endpoint = f"{settings.MANAGER_URL}/flows"
1390 1
            data_content["force"] = force
1391 1
        try:
1392 1
            if command == "install":
1393 1
                res = httpx.post(endpoint, json=data_content, timeout=30)
1394 1
            elif command == "delete":
1395 1
                res = httpx.request(
1396
                    "DELETE", endpoint, json=data_content, timeout=30
1397
                )
1398 1
        except httpx.RequestError as err:
1399 1
            raise FlowModException(str(err)) from err
1400 1
        if res.is_server_error or res.status_code >= 400:
1401 1
            raise FlowModException(res.text)
1402
1403 1
    def get_cookie(self):
1404
        """Return the cookie integer from evc id."""
1405 1
        return int(self.id, 16) + (settings.COOKIE_PREFIX << 56)
1406
1407 1
    @staticmethod
1408 1
    def get_id_from_cookie(cookie):
1409
        """Return the evc id given a cookie value."""
1410 1
        evc_id = cookie - (settings.COOKIE_PREFIX << 56)
1411 1
        return f"{evc_id:x}".zfill(14)
1412
1413 1
    def set_flow_table_group_id(self, flow_mod: dict, vlan) -> dict:
1414
        """Set table_group and table_id"""
1415 1
        table_group = "epl" if vlan is None else "evpl"
1416 1
        flow_mod["table_group"] = table_group
1417 1
        flow_mod["table_id"] = self.table_group[table_group]
1418 1
        return flow_mod
1419
1420 1
    @staticmethod
1421 1
    def get_priority(vlan):
1422
        """Return priority value depending on vlan value"""
1423 1
        if isinstance(vlan, list):
1424 1
            return settings.EVPL_SB_PRIORITY
1425 1
        if vlan not in {None, "4096/4096", 0}:
1426 1
            return settings.EVPL_SB_PRIORITY
1427 1
        if vlan == 0:
1428 1
            return settings.UNTAGGED_SB_PRIORITY
1429 1
        if vlan == "4096/4096":
1430 1
            return settings.ANY_SB_PRIORITY
1431 1
        return settings.EPL_SB_PRIORITY
1432
1433 1
    def _prepare_flow_mod(self, in_interface, out_interface,
1434
                          queue_id=None, vlan=True):
1435
        """Prepare a common flow mod."""
1436 1
        default_actions = [
1437
            {"action_type": "output", "port": out_interface.port_number}
1438
        ]
1439 1
        queue_id = settings.QUEUE_ID if queue_id == -1 else queue_id
1440 1
        if queue_id is not None:
1441 1
            default_actions.insert(
1442
                0,
1443
                {"action_type": "set_queue", "queue_id": queue_id}
1444
            )
1445
1446 1
        flow_mod = {
1447
            "match": {"in_port": in_interface.port_number},
1448
            "cookie": self.get_cookie(),
1449
            "actions": default_actions,
1450
            "owner": "mef_eline",
1451
        }
1452
1453 1
        self.set_flow_table_group_id(flow_mod, vlan)
1454 1
        if self.sb_priority:
1455 1
            flow_mod["priority"] = self.sb_priority
1456
        else:
1457 1
            flow_mod["priority"] = self.get_priority(vlan)
1458 1
        return flow_mod
1459
1460 1
    def _prepare_nni_flow(self, *args, queue_id=None):
1461
        """Create NNI flows."""
1462 1
        in_interface, out_interface, in_vlan, out_vlan = args
1463 1
        flow_mod = self._prepare_flow_mod(
1464
            in_interface, out_interface, queue_id
1465
        )
1466 1
        flow_mod["match"]["dl_vlan"] = in_vlan
1467 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1468 1
        flow_mod["actions"].insert(0, new_action)
1469
1470 1
        return flow_mod
1471
1472 1
    def _prepare_push_flow(self, *args, queue_id=None):
1473
        """Prepare push flow.
1474
1475
        Arguments:
1476
            in_interface(Interface): Interface input.
1477
            out_interface(Interface): Interface output.
1478
            in_vlan(int,str,None): Vlan input.
1479
            out_vlan(int): Vlan output.
1480
            new_c_vlan(int,str,list,None): New client vlan.
1481
1482
        Return:
1483
            dict: An python dictionary representing a FlowMod
1484
1485
        """
1486
        # assign all arguments
1487 1
        in_interface, out_interface, in_vlan, out_vlan, new_c_vlan = args
1488 1
        vlan_pri = in_vlan if not isinstance(new_c_vlan, list) else new_c_vlan
1489 1
        flow_mod = self._prepare_flow_mod(
1490
            in_interface, out_interface, queue_id, vlan_pri
1491
        )
1492
        # the service tag must be always pushed
1493 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1494 1
        flow_mod["actions"].insert(0, new_action)
1495
1496 1
        if (
1497
            not (in_vlan != new_c_vlan and isinstance(in_vlan, int) and
1498
                 isinstance(new_c_vlan, int))
1499
        ):
1500
            # Add service VLAN header when it does NOT fall into this
1501
            # statement: Both VLANs should be integer and different.
1502 1
            new_action = {"action_type": "push_vlan", "tag_type": "s"}
1503 1
            flow_mod["actions"].insert(0, new_action)
1504
1505 1
        if in_vlan is not None:
1506
            # if in_vlan is set, it must be included in the match
1507 1
            flow_mod["match"]["dl_vlan"] = in_vlan
1508
1509 1
        if (
1510
            not isinstance(in_vlan, int) and isinstance(new_c_vlan, int) and
1511
            new_c_vlan != 0
1512
        ):
1513
            # new_in_vlan is an integer but zero, action to set is required
1514 1
            new_action = {"action_type": "set_vlan", "vlan_id": new_c_vlan}
1515 1
            flow_mod["actions"].insert(0, new_action)
1516
1517 1
        if in_vlan == "4096/4096" and new_c_vlan == 0:
1518
            # if in_vlan match with any tags and new_c_vlan does not,
1519
            # a pop action is required
1520 1
            new_action = {"action_type": "pop_vlan"}
1521 1
            flow_mod["actions"].insert(0, new_action)
1522
1523 1
        elif (not in_vlan and
1524
                (not isinstance(new_c_vlan, list) and
1525
                 new_c_vlan not in self.special_cases)):
1526
            # new_in_vlan is an integer but zero and in_vlan is a no-tag or
1527
            # untagged
1528 1
            new_action = {"action_type": "push_vlan", "tag_type": "c"}
1529 1
            flow_mod["actions"].insert(0, new_action)
1530
1531 1
        return flow_mod
1532
1533 1
    def _prepare_pop_flow(
1534
        self,
1535
        in_interface: Interface,
1536
        out_interface: Interface,
1537
        out_vlan: int,
1538
        in_vlan: Union[int, str, list, None],
1539
        new_c_vlan: Union[int, str, list, None],
1540
        queue_id=None,
1541
    ):
1542
        # pylint: disable=too-many-arguments
1543
        """Prepare pop flow."""
1544 1
        flow_mod = self._prepare_flow_mod(
1545
            in_interface, out_interface, queue_id
1546
        )
1547 1
        flow_mod["match"]["dl_vlan"] = out_vlan
1548 1
        if in_vlan == 0:
1549 1
            new_action = {"action_type": "pop_vlan"}
1550 1
            flow_mod["actions"].insert(0, new_action)
1551 1
        elif (
1552
            in_vlan != new_c_vlan and isinstance(in_vlan, int) and
1553
            isinstance(new_c_vlan, int)
1554
        ):
1555
            # If UNI VLANs are different and in_vlan is not 0
1556 1
            new_action = {"action_type": "set_vlan", "vlan_id": in_vlan}
1557 1
            flow_mod["actions"].insert(0, new_action)
1558
        else:
1559 1
            new_action = {"action_type": "pop_vlan"}
1560 1
            flow_mod["actions"].insert(0, new_action)
1561 1
        return flow_mod
1562
1563 1
    @staticmethod
1564 1
    def run_bulk_sdntraces(
1565
        uni_list: list[tuple[Interface, Union[str, int, None]]]
1566
    ) -> dict:
1567
        """Run SDN traces on control plane starting from EVC UNIs."""
1568 1
        endpoint = f"{settings.SDN_TRACE_CP_URL}/traces"
1569 1
        data = []
1570 1
        for interface, tag_value in uni_list:
1571 1
            data_uni = {
1572
                "trace": {
1573
                            "switch": {
1574
                                "dpid": interface.switch.dpid,
1575
                                "in_port": interface.port_number,
1576
                            }
1577
                        }
1578
                }
1579 1
            if tag_value:
1580 1
                uni_dl_vlan = map_dl_vlan(tag_value)
1581 1
                if uni_dl_vlan:
1582 1
                    data_uni["trace"]["eth"] = {
1583
                                            "dl_type": 0x8100,
1584
                                            "dl_vlan": uni_dl_vlan,
1585
                                            }
1586 1
            data.append(data_uni)
1587 1
        try:
1588 1
            response = httpx.put(endpoint, json=data, timeout=30)
1589 1
        except httpx.TimeoutException as exception:
1590 1
            log.error(f"Request has timed out: {exception}")
1591 1
            return {"result": []}
1592 1
        if response.status_code >= 400:
1593 1
            log.error(f"Failed to run sdntrace-cp: {response.text}")
1594 1
            return {"result": []}
1595 1
        return response.json()
1596
1597
    # pylint: disable=too-many-return-statements, too-many-arguments
1598 1
    @staticmethod
1599 1
    def check_trace(
1600
        evc_id: str,
1601
        evc_name: str,
1602
        tag_a: Union[None, int, str],
1603
        tag_z: Union[None, int, str],
1604
        interface_a: Interface,
1605
        interface_z: Interface,
1606
        current_path: list,
1607
        trace_a: list,
1608
        trace_z: list
1609
    ) -> bool:
1610
        """Auxiliar function to check an individual trace"""
1611 1
        if (
1612
            len(trace_a) != len(current_path) + 1
1613
            or not compare_uni_out_trace(tag_z, interface_z, trace_a[-1])
1614
        ):
1615 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1616
                        f"Invalid trace from uni_a: {trace_a}")
1617 1
            return False
1618 1
        if (
1619
            len(trace_z) != len(current_path) + 1
1620
            or not compare_uni_out_trace(tag_a, interface_a, trace_z[-1])
1621
        ):
1622 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1623
                        f"Invalid trace from uni_z: {trace_z}")
1624 1
            return False
1625
1626 1
        if not current_path:
1627
            return True
1628
1629 1
        first_link, trace_path_begin, trace_path_end = current_path[0], [], []
1630 1
        if (
1631
            first_link.endpoint_a.switch.id == trace_a[0]["dpid"]
1632
        ):
1633 1
            trace_path_begin, trace_path_end = trace_a, trace_z
1634 1
        elif (
1635
            first_link.endpoint_a.switch.id == trace_z[0]["dpid"]
1636
        ):
1637 1
            trace_path_begin, trace_path_end = trace_z, trace_a
1638
        else:
1639
            msg = (
1640
                f"first link {first_link} endpoint_a didn't match the first "
1641
                f"step of trace_a {trace_a} or trace_z {trace_z}"
1642
            )
1643
            log.warning(msg)
1644
            return False
1645
1646 1
        for link, trace1, trace2 in zip(current_path,
1647
                                        trace_path_begin[1:],
1648
                                        trace_path_end[:0:-1]):
1649 1
            metadata_vlan = None
1650 1
            if link.metadata:
1651 1
                metadata_vlan = glom(link.metadata, 's_vlan.value')
1652 1
            if compare_endpoint_trace(
1653
                                        link.endpoint_a,
1654
                                        metadata_vlan,
1655
                                        trace2
1656
                                    ) is False:
1657 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1658
                            f"Invalid trace from uni_a: {trace_a}")
1659 1
                return False
1660 1
            if compare_endpoint_trace(
1661
                                        link.endpoint_b,
1662
                                        metadata_vlan,
1663
                                        trace1
1664
                                    ) is False:
1665 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1666
                            f"Invalid trace from uni_z: {trace_z}")
1667 1
                return False
1668
1669 1
        return True
1670
1671 1
    @staticmethod
1672 1
    def check_range(circuit, traces: list) -> bool:
1673
        """Check traces when for UNI with TAGRange"""
1674 1
        check = True
1675 1
        for i, mask in enumerate(circuit.uni_a.user_tag.mask_list):
1676 1
            trace_a = traces[i*2]
1677 1
            trace_z = traces[i*2+1]
1678 1
            check &= EVCDeploy.check_trace(
1679
                circuit.id, circuit.name,
1680
                mask, mask,
1681
                circuit.uni_a.interface,
1682
                circuit.uni_z.interface,
1683
                circuit.current_path,
1684
                trace_a, trace_z,
1685
            )
1686 1
        return check
1687
1688 1
    @staticmethod
1689 1
    def check_list_traces(list_circuits: list) -> dict:
1690
        """Check if current_path is deployed comparing with SDN traces."""
1691 1
        if not list_circuits:
1692 1
            return {}
1693 1
        uni_list = make_uni_list(list_circuits)
1694 1
        traces = EVCDeploy.run_bulk_sdntraces(uni_list)["result"]
1695
1696 1
        if not traces:
1697 1
            return {}
1698
1699 1
        try:
1700 1
            circuits_checked = {}
1701 1
            i = 0
1702 1
            for circuit in list_circuits:
1703 1
                if isinstance(circuit.uni_a.user_tag, TAGRange):
1704 1
                    length = len(circuit.uni_a.user_tag.mask_list)
1705 1
                    circuits_checked[circuit.id] = EVCDeploy.check_range(
1706
                        circuit, traces[i:i+length*2]
1707
                    )
1708 1
                    i += length*2
1709
                else:
1710 1
                    trace_a = traces[i]
1711 1
                    trace_z = traces[i+1]
1712 1
                    tag_a = None
1713 1
                    if circuit.uni_a.user_tag:
1714 1
                        tag_a = circuit.uni_a.user_tag.value
1715 1
                    tag_z = None
1716 1
                    if circuit.uni_z.user_tag:
1717 1
                        tag_z = circuit.uni_z.user_tag.value
1718 1
                    circuits_checked[circuit.id] = EVCDeploy.check_trace(
1719
                        circuit.id, circuit.name,
1720
                        tag_a, tag_z,
1721
                        circuit.uni_a.interface,
1722
                        circuit.uni_z.interface,
1723
                        circuit.current_path,
1724
                        trace_a, trace_z
1725
                    )
1726 1
                    i += 2
1727 1
        except IndexError as err:
1728 1
            log.error(
1729
                f"Bulk sdntraces returned fewer items than expected."
1730
                f"Error = {err}"
1731
            )
1732 1
            return {}
1733
1734 1
        return circuits_checked
1735
1736 1
    @staticmethod
1737 1
    def get_endpoint_by_id(
1738
        link: Link,
1739
        id_: str,
1740
        operator: Union[eq, ne]
1741
    ) -> Interface:
1742
        """Return endpoint from link
1743
        either equal(eq) or not equal(ne) to id"""
1744 1
        if operator(link.endpoint_a.switch.id, id_):
1745 1
            return link.endpoint_a
1746 1
        return link.endpoint_b
1747
1748
1749 1
class LinkProtection(EVCDeploy):
1750
    """Class to handle link protection."""
1751
1752 1
    def is_affected_by_link(self, link=None):
1753
        """Verify if the current path is affected by link down event."""
1754
        return self.current_path.is_affected_by_link(link)
1755
1756 1
    def is_using_primary_path(self):
1757
        """Verify if the current deployed path is self.primary_path."""
1758 1
        return self.current_path == self.primary_path
1759
1760 1
    def is_using_backup_path(self):
1761
        """Verify if the current deployed path is self.backup_path."""
1762 1
        return self.current_path == self.backup_path
1763
1764 1
    def is_using_dynamic_path(self):
1765
        """Verify if the current deployed path is dynamic."""
1766 1
        if (
1767
            self.current_path
1768
            and not self.is_using_primary_path()
1769
            and not self.is_using_backup_path()
1770
            and self.current_path.status is EntityStatus.UP
1771
        ):
1772
            return True
1773 1
        return False
1774
1775 1
    def handle_link_up(self, link=None, interface=None):
1776
        """Handle circuit when link up.
1777
1778
        Args:
1779
            link(Link): Link affected by link.up event.
1780
1781
        """
1782 1
        condition_pairs = [
1783
            (
1784
                lambda me: me.is_using_primary_path(),
1785
                lambda _: (True, 'nothing')
1786
            ),
1787
            (
1788
                lambda me: me.is_intra_switch(),
1789
                lambda _: (True, 'nothing')
1790
            ),
1791
            (
1792
                lambda me: me.primary_path.is_affected_by_link(link),
1793
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1794
            ),
1795
            # For this special case, it reached this point because interface
1796
            # was previously confirmed to be a UNI and both UNI are UP
1797
            (
1798
                lambda me: (me.primary_path.status == EntityStatus.UP
1799
                            and interface),
1800
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1801
            ),
1802
            (
1803
                lambda me: (me.backup_path.status == EntityStatus.UP
1804
                            and interface),
1805
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1806
            ),
1807
            # We tried to deploy(primary_path) without success.
1808
            # And in this case is up by some how. Nothing to do.
1809
            (
1810
                lambda me: me.is_using_backup_path(),
1811
                lambda _: (True, 'nothing')
1812
            ),
1813
            (
1814
                lambda me:  me.is_using_dynamic_path(),
1815
                lambda _: (True, 'nothing')
1816
            ),
1817
            # In this case, probably the circuit is not being used and
1818
            # we can move to backup
1819
            (
1820
                lambda me: me.backup_path.is_affected_by_link(link),
1821
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1822
            ),
1823
            # In this case, the circuit is not being used and we should
1824
            # try a dynamic path
1825
            (
1826
                lambda me: me.dynamic_backup_path and not me.is_active(),
1827
                lambda me: (me.deploy_to_path(), 'redeploy')
1828
            )
1829
        ]
1830 1
        for predicate, action in condition_pairs:
1831 1
            if not predicate(self):
1832 1
                continue
1833 1
            success, succcess_type = action(self)
1834 1
            if success:
1835 1
                if succcess_type == 'redeploy':
1836 1
                    emit_event(
1837
                        self._controller,
1838
                        "redeployed_link_up",
1839
                        content=map_evc_event_content(self)
1840
                    )
1841 1
                return True
1842 1
        return False
1843
1844 1
    def handle_link_down(self):
1845
        """Handle circuit when link down.
1846
1847
        Returns:
1848
            bool: True if the re-deploy was successly otherwise False.
1849
1850
        """
1851 1
        success = False
1852 1
        if self.is_using_primary_path():
1853 1
            success = self.deploy_to_backup_path()
1854 1
        elif self.is_using_backup_path():
1855 1
            success = self.deploy_to_primary_path()
1856
1857 1
        if not success and self.dynamic_backup_path:
1858 1
            success = self.deploy_to_path()
1859
1860 1
        if success:
1861 1
            log.debug(f"{self} deployed after link down.")
1862
        else:
1863 1
            self.remove_current_flows(sync=False)
1864 1
            self.deactivate()
1865 1
            self.sync()
1866 1
            log.debug(f"Failed to re-deploy {self} after link down.")
1867
1868 1
        return success
1869
1870 1
    def are_unis_active(self) -> bool:
1871
        """Determine whether this EVC should be active"""
1872 1
        interface_a = self.uni_a.interface
1873 1
        interface_z = self.uni_z.interface
1874 1
        active, _ = self.is_uni_interface_active(interface_a, interface_z)
1875 1
        return active
1876
1877 1
    def try_to_handle_uni_as_link_up(self, interface: Interface) -> bool:
1878
        """Try to handle UNI as link_up to trigger deployment."""
1879
        if (
1880
            self.current_path.status != EntityStatus.UP
1881
            and not self.is_intra_switch()
1882
        ):
1883
            succeeded = self.handle_link_up(interface=interface)
1884
            if succeeded:
1885
                msg = (
1886
                    f"Activated {self} due to successful "
1887
                    f"deployment triggered by {interface}"
1888
                )
1889
            else:
1890
                msg = (
1891
                    f"Couldn't activate {self} due to unsuccessful "
1892
                    f"deployment triggered by {interface}"
1893
                )
1894
            log.info(msg)
1895
            return True
1896
        return False
1897
1898 1
    def handle_interface_link_up(self, interface: Interface):
1899
        """
1900
        Handler for interface link_up events
1901
        """
1902 1
        if not _does_uni_affect_evc(self, interface, "up"):
1903 1
            return
1904 1
        if self.try_to_handle_uni_as_link_up(interface):
1905
            return
1906
1907 1
        interface_dicts = {
1908
            interface.id: {
1909
                'status': interface.status.value,
1910
                'status_reason': interface.status_reason,
1911
            }
1912
            for interface in (self.uni_a.interface, self.uni_z.interface)
1913
        }
1914 1
        try:
1915 1
            self.try_to_activate()
1916 1
            log.info(
1917
                f"Activating {self}. Interfaces: "
1918
                f"{interface_dicts}."
1919
            )
1920 1
            emit_event(self._controller, "uni_active_updated",
1921
                       content=map_evc_event_content(self))
1922 1
            self.sync()
1923
        except ActivationError as exc:
1924
            # On this ctx, no ActivationError isn't expected since the
1925
            # activation pre-requisites states were checked, so handled as err
1926
            log.error(f"ActivationError: {str(exc)} when handling {interface}")
1927
1928 1
    def handle_interface_link_down(self, interface):
1929
        """
1930
        Handler for interface link_down events
1931
        """
1932 1
        if not _does_uni_affect_evc(self, interface, "down"):
1933 1
            return
1934 1
        interface_dicts = {
1935
            interface.id: {
1936
                'status': interface.status.value,
1937
                'status_reason': interface.status_reason,
1938
            }
1939
            for interface in (self.uni_a.interface, self.uni_z.interface)
1940
            if interface.status != EntityStatus.UP
1941
        }
1942 1
        self.deactivate()
1943 1
        log.info(
1944
            f"Deactivating {self}. Interfaces: "
1945
            f"{interface_dicts}."
1946
        )
1947 1
        emit_event(self._controller, "uni_active_updated",
1948
                   content=map_evc_event_content(self))
1949 1
        self.sync()
1950
1951
1952 1
class EVC(LinkProtection):
1953
    """Class that represents a E-Line Virtual Connection."""
1954