Passed
Pull Request — master (#563)
by Aldo
04:21
created

build.models.evc.EVCDeploy.check_trace()   D

Complexity

Conditions 12

Size

Total Lines 72
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 12.3775

Importance

Changes 0

Metric  Value
cc      12
eloc    55
nop     9
dl      0
loc     72
ccs     25
cts     29
cp      0.8621
crap    12.3775
rs      4.8
c       0
b       0
f       0

How to fix: Long Method, Complexity, Many Parameters

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

A commonly applied refactoring here is Extract Method.
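As a minimal sketch of the advice above, a commented step inside a long function can be lifted into a helper whose name is derived from the comment. All function names and the data here are hypothetical and are not taken from check_trace():

```python
def summarize_before(values):
    """Long version: a comment marks a step that deserves its own name."""
    # drop negative readings
    cleaned = []
    for value in values:
        if value >= 0:
            cleaned.append(value)
    return sum(cleaned) / len(cleaned) if cleaned else 0.0


def drop_negative_readings(values):
    """Extracted helper; its name comes from the old comment."""
    return [value for value in values if value >= 0]


def summarize_after(values):
    """Short version: the body reads as a sequence of named steps."""
    cleaned = drop_negative_readings(values)
    return sum(cleaned) / len(cleaned) if cleaned else 0.0
```

Both versions return the same result for the same input; only the structure changes.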

Complexity

Complex classes like build.models.evc.EVCDeploy.check_trace() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
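The following sketch illustrates Extract Class under the prefix heuristic described above: fields that share a `backup_` prefix move into their own class. The class and field names are hypothetical and only loosely inspired by the listing; this is not the project's actual design.

```python
from dataclasses import dataclass


@dataclass
class Circuit:
    """Before: the backup_* fields form a cohesive group inside a larger class."""
    name: str
    backup_links: list
    backup_enabled: bool

    def backup_is_usable(self) -> bool:
        return self.backup_enabled and bool(self.backup_links)


@dataclass
class Backup:
    """After: the prefixed fields and their logic live in their own class."""
    links: list
    enabled: bool

    def is_usable(self) -> bool:
        return self.enabled and bool(self.links)


@dataclass
class SlimCircuit:
    """The original class now holds the extracted component."""
    name: str
    backup: Backup
```

Callers then write `circuit.backup.is_usable()` instead of `circuit.backup_is_usable()`, and the backup logic can be tested on its own.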

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists, such as Introduce Parameter Object and Preserve Whole Object.
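One such approach, Introduce Parameter Object, can be sketched as follows. The `Endpoint` class, its fields, and both functions are hypothetical names chosen for illustration; they do not reflect the signature of check_trace().

```python
from dataclasses import dataclass


def describe_endpoint_before(switch_id, port, vlan, tag_type):
    """Before: four loose parameters that always travel together."""
    return f"{switch_id}:{port} vlan={vlan} type={tag_type}"


@dataclass(frozen=True)
class Endpoint:
    """Parameter object grouping the related values (hypothetical)."""
    switch_id: str
    port: int
    vlan: int
    tag_type: str = "vlan"


def describe_endpoint_after(endpoint: Endpoint) -> str:
    """After: a single parameter carries the whole group."""
    return (f"{endpoint.switch_id}:{endpoint.port} "
            f"vlan={endpoint.vlan} type={endpoint.tag_type}")
```

With the parameter object in place, a new field with a default value can be added to `Endpoint` without changing any function signature.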

"""Classes used in the main application."""  # pylint: disable=too-many-lines
import traceback
from collections import OrderedDict
from copy import deepcopy
from datetime import datetime
from operator import eq, ne
from threading import Lock
from typing import Union
from uuid import uuid4

import httpx
from glom import glom
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
                      wait_combine, wait_fixed, wait_random)

from kytos.core import log
from kytos.core.common import EntityStatus, GenericEntity
from kytos.core.exceptions import KytosNoTagAvailableError, KytosTagError
from kytos.core.helpers import get_time, now
from kytos.core.interface import UNI, Interface, TAGRange
from kytos.core.link import Link
from kytos.core.retry import before_sleep
from kytos.core.tag_ranges import range_difference
from napps.kytos.mef_eline import controllers, settings
from napps.kytos.mef_eline.exceptions import (DuplicatedNoTagUNI,
                                              EVCPathNotInstalled,
                                              FlowModException, InvalidPath)
from napps.kytos.mef_eline.utils import (check_disabled_component,
                                         compare_endpoint_trace,
                                         compare_uni_out_trace, emit_event,
                                         make_uni_list, map_dl_vlan,
                                         map_evc_event_content,
                                         merge_flow_dicts)

from .path import DynamicPathManager, Path


class EVCBase(GenericEntity):
    """Class to represent a circuit."""

    attributes_requiring_redeploy = [
        "primary_path",
        "backup_path",
        "dynamic_backup_path",
        "queue_id",
        "sb_priority",
        "primary_constraints",
        "secondary_constraints",
        "uni_a",
        "uni_z",
    ]
    required_attributes = ["name", "uni_a", "uni_z"]

    updatable_attributes = {
        "uni_a",
        "uni_z",
        "name",
        "start_date",
        "end_date",
        "queue_id",
        "bandwidth",
        "primary_path",
        "backup_path",
        "dynamic_backup_path",
        "primary_constraints",
        "secondary_constraints",
        "owner",
        "sb_priority",
        "service_level",
        "circuit_scheduler",
        "metadata",
        "enabled"
    }

    # pylint: disable=too-many-statements
    def __init__(self, controller, **kwargs):
        """Create an EVC instance with the provided parameters.

        Args:
            id(str): EVC identifier. If not provided, an ID will be generated.
                     Only the first 14 bytes passed will be used.
            name: represents an EVC name.(Required)
            uni_a (UNI): Endpoint A for User Network Interface.(Required)
            uni_z (UNI): Endpoint Z for User Network Interface.(Required)
            start_date(datetime|str): Date when the EVC was registered.
                                      Default is now().
            end_date(datetime|str): Final date that the EVC will be finished.
                                    Default is None.
            bandwidth(int): Bandwidth used by EVC instance. Default is 0.
            primary_links(list): Primary links used by evc. Default is []
            backup_links(list): Backup links used by evc. Default is []
            current_path(list): Circuit being used at the moment if this is an
                                active circuit. Default is [].
            failover_path(list): Path being used to provide EVC protection via
                                failover during link failures. Default is [].
            primary_path(list): primary circuit offered to user IF one or more
                                links were provided. Default is [].
            backup_path(list): backup circuit offered to the user IF one or
                               more links were provided. Default is [].
            dynamic_backup_path(bool): Compute the backup path dynamically.
                                       Default is False.
            creation_time(datetime|str): datetime when the circuit should be
                                         activated. default is now().
            enabled(Boolean): attribute to indicate the administrative state;
                              default is False.
            active(Boolean): attribute to indicate the operational state;
                             default is False.
            archived(Boolean): indicate the EVC has been deleted and is
                               archived; default is False.
            owner(str): The EVC owner. Default is None.
            sb_priority(int): Service level provided in the request.
                              Default is None.
            service_level(int): Service level provided. The higher the better.
                                Default is 0.

        Raises:
            ValueError: raised when object attributes are invalid.

        """
        self._controller = controller
        self._validate(**kwargs)
        super().__init__()

        # required attributes
        self._id = kwargs.get("id", uuid4().hex)[:14]
        self.uni_a: UNI = kwargs.get("uni_a")
        self.uni_z: UNI = kwargs.get("uni_z")
        self.name = kwargs.get("name")

        # optional attributes
        self.start_date = get_time(kwargs.get("start_date")) or now()
        self.end_date = get_time(kwargs.get("end_date")) or None
        self.queue_id = kwargs.get("queue_id", -1)

        self.bandwidth = kwargs.get("bandwidth", 0)
        self.primary_links = Path(kwargs.get("primary_links", []))
        self.backup_links = Path(kwargs.get("backup_links", []))
        self.current_path = Path(kwargs.get("current_path", []))
        self.failover_path = Path(kwargs.get("failover_path", []))
        self.primary_path = Path(kwargs.get("primary_path", []))
        self.backup_path = Path(kwargs.get("backup_path", []))
        self.dynamic_backup_path = kwargs.get("dynamic_backup_path", False)
        self.primary_constraints = kwargs.get("primary_constraints", {})
        self.secondary_constraints = kwargs.get("secondary_constraints", {})
        self.creation_time = get_time(kwargs.get("creation_time")) or now()
        self.owner = kwargs.get("owner", None)
        self.sb_priority = kwargs.get("sb_priority", None) or kwargs.get(
            "priority", None
        )
        self.service_level = kwargs.get("service_level", 0)
        self.circuit_scheduler = kwargs.get("circuit_scheduler", [])
        self.flow_removed_at = get_time(kwargs.get("flow_removed_at")) or None
        self.updated_at = get_time(kwargs.get("updated_at")) or now()
        self.execution_rounds = kwargs.get("execution_rounds", 0)
        self.current_links_cache = set()
        self.primary_links_cache = set()
        self.backup_links_cache = set()
        self.affected_by_link_at = get_time("0001-01-01T00:00:00")
        self.old_path = Path([])
        self.take_last = kwargs.get("take_last", False)

        self.lock = Lock()

        self.archived = kwargs.get("archived", False)

        self.metadata = kwargs.get("metadata", {})

        self._mongo_controller = controllers.ELineController()

        if kwargs.get("active", False):
            self.activate()
        else:
            self.deactivate()

        if kwargs.get("enabled", False):
            self.enable()
        else:
            self.disable()

        # datetime of user request for an EVC (or datetime when object was
        # created)
        self.request_time = kwargs.get("request_time", now())
        # dict with the user original request (input)
        self._requested = kwargs

        # Special cases: No tag, any, untagged
        self.special_cases = {None, "4096/4096", 0}
        self.table_group = kwargs.get("table_group")

    def sync(self, keys: set = None):
        """Sync this EVC in the MongoDB."""
        self.updated_at = now()
        if keys:
            self._mongo_controller.update_evc(self.as_dict(keys))
            return
        self._mongo_controller.upsert_evc(self.as_dict())

    def _get_unis_use_tags(self, **kwargs) -> tuple[UNI, UNI]:
        """Obtain both UNIs (uni_a, uni_z).
        If a UNI is changing, verify tags"""
        uni_a = kwargs.get("uni_a", None)
        uni_a_flag = False
        if uni_a and uni_a != self.uni_a:
            uni_a_flag = True
            self._use_uni_vlan(uni_a, uni_dif=self.uni_a)

        uni_z = kwargs.get("uni_z", None)
        if uni_z and uni_z != self.uni_z:
            try:
                self._use_uni_vlan(uni_z, uni_dif=self.uni_z)
                self.make_uni_vlan_available(self.uni_z, uni_dif=uni_z)
            except KytosTagError as err:
                if uni_a_flag:
                    self.make_uni_vlan_available(uni_a, uni_dif=self.uni_a)
                raise err
        else:
            uni_z = self.uni_z

        if uni_a_flag:
            self.make_uni_vlan_available(self.uni_a, uni_dif=uni_a)
        else:
            uni_a = self.uni_a
        return uni_a, uni_z

    def update(self, **kwargs):
        """Update evc attributes.

        This method will raise an error when trying to change the following
        attributes: [creation_time, active, current_path, failover_path,
        _id, archived]
        [name, uni_a and uni_z]

        Returns:
            the values for enable and a redeploy attribute, if exists and None
            otherwise
        Raises:
            ValueError: message with error detail.

        """
        enable, redeploy = (None, None)
        if not self._tag_lists_equal(**kwargs):
            raise ValueError(
                "UNI_A and UNI_Z tag lists should be the same."
            )
        uni_a, uni_z = self._get_unis_use_tags(**kwargs)
        check_disabled_component(uni_a, uni_z)
        self._validate_has_primary_or_dynamic(
            primary_path=kwargs.get("primary_path"),
            dynamic_backup_path=kwargs.get("dynamic_backup_path"),
            uni_a=uni_a,
            uni_z=uni_z,
        )
        for attribute, value in kwargs.items():
            if attribute not in self.updatable_attributes:
                raise ValueError(f"{attribute} can't be updated.")
            if attribute in ("primary_path", "backup_path"):
                try:
                    value.is_valid(
                        uni_a.interface.switch, uni_z.interface.switch
                    )
                except InvalidPath as exception:
                    raise ValueError(  # pylint: disable=raise-missing-from
                        f"{attribute} is not a " f"valid path: {exception}"
                    )
        for attribute, value in kwargs.items():
            if attribute == "enabled":
                if value:
                    self.enable()
                else:
                    self.disable()
                enable = value
            else:
                setattr(self, attribute, value)
                if attribute in self.attributes_requiring_redeploy:
                    redeploy = True
        self.sync(set(kwargs.keys()))
        return enable, redeploy

    def set_flow_removed_at(self):
        """Update flow_removed_at attribute."""
        self.flow_removed_at = now()

    def has_recent_removed_flow(self, setting=settings):
        """Check if any flow has been removed from the evc"""
        if self.flow_removed_at is None:
            return False
        res_seconds = (now() - self.flow_removed_at).seconds
        return res_seconds < setting.TIME_RECENT_DELETED_FLOWS

    def is_recent_updated(self, setting=settings):
        """Check if the evc has been updated recently"""
        res_seconds = (now() - self.updated_at).seconds
        return res_seconds < setting.TIME_RECENT_UPDATED

    def __repr__(self):
        """Repr method."""
        return f"EVC({self._id}, {self.name})"

    def _validate(self, **kwargs):
        """Do Basic validations.

        Verify required attributes: name, uni_a, uni_z

        Raises:
            ValueError: message with error detail.

        """
        for attribute in self.required_attributes:

            if attribute not in kwargs:
                raise ValueError(f"{attribute} is required.")

            if "uni" in attribute:
                uni = kwargs.get(attribute)
                if not isinstance(uni, UNI):
                    raise ValueError(f"{attribute} is an invalid UNI.")

    def _tag_lists_equal(self, **kwargs):
        """Verify that tag lists are the same."""
        uni_a = kwargs.get("uni_a") or self.uni_a
        uni_z = kwargs.get("uni_z") or self.uni_z
        uni_a_list = uni_z_list = False
        if (uni_a.user_tag and isinstance(uni_a.user_tag, TAGRange)):
            uni_a_list = True
        if (uni_z.user_tag and isinstance(uni_z.user_tag, TAGRange)):
            uni_z_list = True
        if uni_a_list and uni_z_list:
            return uni_a.user_tag.value == uni_z.user_tag.value
        return uni_a_list == uni_z_list

    def _validate_has_primary_or_dynamic(
        self,
        primary_path=None,
        dynamic_backup_path=None,
        uni_a=None,
        uni_z=None,
    ) -> None:
        """Validate that it must have a primary path or allow dynamic paths."""
        primary_path = (
            primary_path
            if primary_path is not None
            else self.primary_path
        )
        dynamic_backup_path = (
            dynamic_backup_path
            if dynamic_backup_path is not None
            else self.dynamic_backup_path
        )
        uni_a = uni_a if uni_a is not None else self.uni_a
        uni_z = uni_z if uni_z is not None else self.uni_z
        if (
            not primary_path
            and not dynamic_backup_path
            and uni_a and uni_z
            and uni_a.interface.switch != uni_z.interface.switch
        ):
            msg = "The EVC must have a primary path or allow dynamic paths."
            raise ValueError(msg)

    def __eq__(self, other):
        """Override the default implementation."""
        if not isinstance(other, EVC):
            return False

        attrs_to_compare = ["name", "uni_a", "uni_z", "owner", "bandwidth"]
        for attribute in attrs_to_compare:
            if getattr(other, attribute) != getattr(self, attribute):
                return False
        return True

    def is_intra_switch(self):
        """Check if the UNIs are in the same switch."""
        return self.uni_a.interface.switch == self.uni_z.interface.switch

    def check_no_tag_duplicate(self, other_uni: UNI):
        """Check if a no tag UNI is duplicated."""
        if other_uni in (self.uni_a, self.uni_z):
            msg = f"UNI with interface {other_uni.interface.id} is"\
                  f" duplicated with {self}."
            raise DuplicatedNoTagUNI(msg)

    def as_dict(self, keys: set = None):
        """Return a dictionary representing an EVC object.
            keys: Only fields on this variable will be
                  returned in the dictionary"""
        evc_dict = {
            "id": self.id,
            "name": self.name,
            "uni_a": self.uni_a.as_dict(),
            "uni_z": self.uni_z.as_dict(),
        }

        time_fmt = "%Y-%m-%dT%H:%M:%S"

        evc_dict["start_date"] = self.start_date
        if isinstance(self.start_date, datetime):
            evc_dict["start_date"] = self.start_date.strftime(time_fmt)

        evc_dict["end_date"] = self.end_date
        if isinstance(self.end_date, datetime):
            evc_dict["end_date"] = self.end_date.strftime(time_fmt)

        evc_dict["queue_id"] = self.queue_id
        evc_dict["bandwidth"] = self.bandwidth
        evc_dict["primary_links"] = self.primary_links.as_dict()
        evc_dict["backup_links"] = self.backup_links.as_dict()
        evc_dict["current_path"] = self.current_path.as_dict()
        evc_dict["failover_path"] = self.failover_path.as_dict()
        evc_dict["primary_path"] = self.primary_path.as_dict()
        evc_dict["backup_path"] = self.backup_path.as_dict()
        evc_dict["dynamic_backup_path"] = self.dynamic_backup_path
        evc_dict["metadata"] = self.metadata

        evc_dict["request_time"] = self.request_time
        if isinstance(self.request_time, datetime):
            evc_dict["request_time"] = self.request_time.strftime(time_fmt)

        time = self.creation_time.strftime(time_fmt)
        evc_dict["creation_time"] = time

        evc_dict["owner"] = self.owner
        evc_dict["circuit_scheduler"] = [
            sc.as_dict() for sc in self.circuit_scheduler
        ]

        evc_dict["active"] = self.is_active()
        evc_dict["enabled"] = self.is_enabled()
        evc_dict["archived"] = self.archived
        evc_dict["sb_priority"] = self.sb_priority
        evc_dict["service_level"] = self.service_level
        evc_dict["primary_constraints"] = self.primary_constraints
        evc_dict["secondary_constraints"] = self.secondary_constraints
        evc_dict["flow_removed_at"] = self.flow_removed_at
        evc_dict["updated_at"] = self.updated_at
        evc_dict["take_last"] = self.take_last

        if keys:
            selected = {}
            for key in keys:
                selected[key] = evc_dict[key]
            selected["id"] = evc_dict["id"]
            return selected
        return evc_dict

    @property
    def id(self):  # pylint: disable=invalid-name
        """Return this EVC's ID."""
        return self._id

    def archive(self):
        """Archive this EVC on deletion."""
        self.archived = True

    def _use_uni_vlan(
        self,
        uni: UNI,
        uni_dif: Union[None, UNI] = None
    ):
        """Use tags from UNI"""
        if uni.user_tag is None:
            return
        tag = uni.user_tag.value
        tag_type = uni.user_tag.tag_type
        if (uni_dif and isinstance(tag, list) and
                isinstance(uni_dif.user_tag.value, list)):
            tag = range_difference(tag, uni_dif.user_tag.value)
            if not tag:
                return
        uni.interface.use_tags(
            self._controller, tag, tag_type, use_lock=True, check_order=False
        )

    def make_uni_vlan_available(
        self,
        uni: UNI,
        uni_dif: Union[None, UNI] = None,
    ):
        """Make available tag from UNI"""
        if uni.user_tag is None:
            return
        tag = uni.user_tag.value
        tag_type = uni.user_tag.tag_type
        if (uni_dif and isinstance(tag, list) and
                isinstance(uni_dif.user_tag.value, list)):
            tag = range_difference(tag, uni_dif.user_tag.value)
            if not tag:
                return
        try:
            conflict = uni.interface.make_tags_available(
                self._controller, tag, tag_type, use_lock=True,
                check_order=False
            )
        except KytosTagError as err:
            log.error(f"Error in {self}: {err}")
            return
        if conflict:
            intf = uni.interface.id
            log.warning(f"Tags {conflict} was already available in {intf}")

    def remove_uni_tags(self):
        """Remove both UNI usage of a tag"""
        self.make_uni_vlan_available(self.uni_a)
        self.make_uni_vlan_available(self.uni_z)


# pylint: disable=fixme, too-many-public-methods
class EVCDeploy(EVCBase):
    """Class to handle the deploy procedures."""

    def create(self):
        """Create an EVC."""

    def discover_new_paths(self):
        """Discover new paths to satisfy this circuit and deploy it."""
        return DynamicPathManager.get_best_paths(self,
                                                 **self.primary_constraints)

    def get_failover_path_candidates(self):
        """Get failover paths to satisfy this EVC."""
        # in the future we can return primary/backup paths as well
        # we just have to properly handle link_up and failover paths
        # if (
        #     self.is_using_primary_path() and
        #     self.backup_path.status is EntityStatus.UP
        # ):
        #     yield self.backup_path
        return DynamicPathManager.get_disjoint_paths(self, self.current_path)

    def change_path(self):
        """Change EVC path."""

    def reprovision(self):
        """Force the EVC (re-)provisioning."""

    def is_affected_by_link(self, link):
        """Return True if this EVC has the given link on its current path."""
        return link in self.current_path

    def link_affected_by_interface(self, interface):
        """Return the current path link using the given interface, if any."""
        return self.current_path.link_affected_by_interface(interface)

    def is_backup_path_affected_by_link(self, link):
        """Return True if the backup path of this EVC uses the given link."""
        return link in self.backup_path

    # pylint: disable=invalid-name
    def is_primary_path_affected_by_link(self, link):
        """Return True if the primary path of this EVC uses the given link."""
        return link in self.primary_path

    def is_failover_path_affected_by_link(self, link):
        """Return True if this EVC has the given link on its failover path."""
        return link in self.failover_path

    def is_eligible_for_failover_path(self):
        """Verify if this EVC is eligible for failover path (EP029)"""
        # In the future this function can be augmented to consider
        # primary/backup, primary/dynamic, and other path combinations
        return (
            self.dynamic_backup_path and
            not self.primary_path and not self.backup_path
        )

    def is_using_primary_path(self):
        """Verify if the current deployed path is self.primary_path."""
        return self.primary_path and (self.current_path == self.primary_path)

    def is_using_backup_path(self):
        """Verify if the current deployed path is self.backup_path."""
        return self.backup_path and (self.current_path == self.backup_path)

    def is_using_dynamic_path(self):
        """Verify if the current deployed path is a dynamic path."""
        if (
            self.current_path
            and not self.is_using_primary_path()
            and not self.is_using_backup_path()
            and self.current_path.status == EntityStatus.UP
        ):
            return True
        return False

    def deploy_to_backup_path(self):
        """Deploy the backup path into the datapaths of this circuit.

        If the backup_path attribute is valid and up, this method will try to
        deploy this backup_path.

        If everything fails and dynamic_backup_path is True, then tries to
        deploy a dynamic path.
        """
        # TODO: Remove flows from current (cookies)
        if self.is_using_backup_path():
            # TODO: Log to say that cannot move backup to backup
            return True

        success = False
        if self.backup_path.status is EntityStatus.UP:
            success = self.deploy_to_path(self.backup_path)

        if success:
            return True

        if self.dynamic_backup_path or self.is_intra_switch():
            return self.deploy_to_path()

        return False

    def deploy_to_primary_path(self):
        """Deploy the primary path into the datapaths of this circuit.

        If the primary_path attribute is valid and up, this method will try to
        deploy this primary_path.
        """
        # TODO: Remove flows from current (cookies)
        if self.is_using_primary_path():
            # TODO: Log to say that cannot move primary to primary
            return True

        if self.primary_path.status is EntityStatus.UP:
            return self.deploy_to_path(self.primary_path)
        return False

    def deploy(self):
        """Deploy EVC to best path.

        Best path can be the primary path, if available. If not, the backup
        path, and, if it is also not available, a dynamic path.
        """
        if self.archived:
            return False
        self.enable()
        success = self.deploy_to_primary_path()
        if not success:
            success = self.deploy_to_backup_path()

        if success:
            emit_event(self._controller, "deployed",
                       content=map_evc_event_content(self))
        return success

    @staticmethod
    def get_path_status(path):
        """Check for the current status of a path.

        If any link in this path is down, the path is considered down.
        """
        if not path:
            return EntityStatus.DISABLED

        for link in path:
            if link.status is not EntityStatus.UP:
                return link.status
        return EntityStatus.UP

    #    def discover_new_path(self):
    #        # TODO: discover a new path to satisfy this circuit and deploy

660 1
    def remove(self):
661
        """Remove EVC path and disable it."""
662 1
        self.remove_current_flows(sync=False)
663 1
        self.remove_failover_flows(sync=False)
664 1
        self.disable()
665 1
        self.sync()
666 1
        emit_event(self._controller, "undeployed",
667
                   content=map_evc_event_content(self))
668
669 1
    def remove_failover_flows(self, exclude_uni_switches=True,
                              force=True, sync=True) -> None:
        """Remove failover_flows.

        By default, UNI switches are excluded: if mef_eline has already
        called remove_current_flows, skipping them minimizes the number
        of FlowMods and IO.
        """
        if not self.failover_path:
            return
        switches, cookie, excluded = OrderedDict(), self.get_cookie(), set()
        links = set()
        if exclude_uni_switches:
            excluded.add(self.uni_a.interface.switch.id)
            excluded.add(self.uni_z.interface.switch.id)
        for link in self.failover_path:
            if link.endpoint_a.switch.id not in excluded:
                switches[link.endpoint_a.switch.id] = link.endpoint_a.switch
                links.add(link)
            if link.endpoint_b.switch.id not in excluded:
                switches[link.endpoint_b.switch.id] = link.endpoint_b.switch
                links.add(link)
        for switch in switches.values():
            try:
                self._send_flow_mods(
                    switch.id,
                    [
                        {
                            "cookie": cookie,
                            "cookie_mask": int(0xffffffffffffffff),
                            "owner": "mef_eline",
                        }
                    ],
                    "delete",
                    force=force,
                )
            except FlowModException as err:
                log.error(f"Error deleting path in {switch.id}, {err}")
                break
        try:
            self.failover_path.make_vlans_available(self._controller)
        except KytosTagError as err:
            log.error(f"Error when removing failover flows: {err}")
        self.failover_path = Path([])
        if sync:
            self.sync()

    def remove_current_flows(self, force=True, sync=True):
        """Remove all flows from the current path, or from the path
        intended to be the current path, if it exists."""
        switches = set()

        if not self.current_path and not self.is_intra_switch():
            return
        current_path = self.current_path
        for link in current_path:
            switches.add(link.endpoint_a.switch)
            switches.add(link.endpoint_b.switch)
        switches.add(self.uni_a.interface.switch)
        switches.add(self.uni_z.interface.switch)
        match = {
            "cookie": self.get_cookie(),
            "cookie_mask": int(0xffffffffffffffff),
            "owner": "mef_eline",
        }

        for switch in switches:
            try:
                self._send_flow_mods(switch.id, [match], 'delete', force=force)
            except FlowModException as err:
                log.error(f"Error deleting current_path in {switch.id}, {err}")
                break
        try:
            current_path.make_vlans_available(self._controller)
        except KytosTagError as err:
            log.error(f"Error when removing current path flows: {err}")
        self.current_path = Path([])
        self.deactivate()
        if sync:
            self.sync()

    def remove_path_flows(
        self, path=None, force=True
    ) -> dict[str, list[dict]]:
        """Remove all flows from path, and return the removed flows."""
        dpid_flows_match: dict[str, list[dict]] = {}

        if not path:
            return dpid_flows_match

        try:
            nni_flows = self._prepare_nni_flows(path)
        # pylint: disable=broad-except
        except Exception:
            err = traceback.format_exc().replace("\n", ", ")
            log.error(f"Fail to remove NNI failover flows for {self}: {err}")
            nni_flows = {}

        for dpid, flows in nni_flows.items():
            dpid_flows_match.setdefault(dpid, [])
            for flow in flows:
                dpid_flows_match[dpid].append({
                    "cookie": flow["cookie"],
                    "match": flow["match"],
                    "owner": "mef_eline",
                    "cookie_mask": int(0xffffffffffffffff)
                })

        try:
            uni_flows = self._prepare_uni_flows(path, skip_in=True)
        # pylint: disable=broad-except
        except Exception:
            err = traceback.format_exc().replace("\n", ", ")
            log.error(f"Fail to remove UNI failover flows for {self}: {err}")
            uni_flows = {}

        for dpid, flows in uni_flows.items():
            dpid_flows_match.setdefault(dpid, [])
            for flow in flows:
                dpid_flows_match[dpid].append({
                    "cookie": flow["cookie"],
                    "match": flow["match"],
                    "owner": "mef_eline",
                    "cookie_mask": int(0xffffffffffffffff)
                })

        for dpid, flows in dpid_flows_match.items():
            try:
                self._send_flow_mods(dpid, flows, 'delete', force=force)
            except FlowModException as err:
                log.error(f"Error deleting path in {dpid}, {err}")
                break
        try:
            path.make_vlans_available(self._controller)
        except KytosTagError as err:
            log.error(f"Error when removing path flows: {err}")

        return dpid_flows_match

    @staticmethod
    def links_zipped(path=None):
        """Return an iterator which yields pairs of links in order."""
        if not path:
            return []
        return zip(path[:-1], path[1:])
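To illustrate the pairing that `links_zipped` produces, here is a minimal standalone sketch. The strings are hypothetical stand-ins for link objects; the function body mirrors the static method in the listing.

```python
def links_zipped(path=None):
    """Return consecutive (incoming, outgoing) pairs from a path."""
    if not path:
        return []
    return zip(path[:-1], path[1:])


# Hypothetical three-link path; strings stand in for Link objects.
pairs = list(links_zipped(["link1", "link2", "link3"]))
single = list(links_zipped(["only"]))
print(pairs)
print(single)
```

A path with a single link yields no pairs, so a loop over the result installs no NNI flows in that case.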

    def should_deploy(self, path=None):
        """Verify if the circuit should be deployed."""
        if not path:
            log.debug("Path is empty.")
            return False

        if not self.is_enabled():
            log.debug(f"{self} is disabled.")
            return False

        if not self.is_active():
            log.debug(f"{self} will be deployed.")
            return True

        return False

    # pylint: disable=too-many-branches, too-many-statements
    def deploy_to_path(self, path=None):
        """Install the flows for this circuit.

        Procedures to deploy:

        0. Remove current flows installed
        1. Decide whether to deploy "path" or discover a new path
        2. Choose vlan
        3. Install NNI flows
        4. Install UNI flows
        5. Activate
        6. Update current_path
        7. Update links caches (primary, current, backup)

        """
        self.remove_current_flows(sync=False)
        use_path = path or Path([])
        tag_errors = []
        if self.should_deploy(use_path):
            try:
                use_path.choose_vlans(self._controller, self.take_last)
            except KytosNoTagAvailableError as e:
                tag_errors.append(str(e))
                use_path = None
        else:
            for use_path in self.discover_new_paths():
                if use_path is None:
                    continue
                try:
                    use_path.choose_vlans(self._controller, self.take_last)
                    break
                except KytosNoTagAvailableError as e:
                    tag_errors.append(str(e))
            else:
                use_path = None

        try:
            if use_path:
                self._install_nni_flows(use_path)
                self._install_uni_flows(use_path)
            elif self.is_intra_switch():
                use_path = Path()
                self._install_direct_uni_flows()
            else:
                msg = f"{self} was not deployed. No available path was found."
                if tag_errors:
                    msg = self.add_tag_errors(msg, tag_errors)
                    log.error(msg)
                else:
                    log.warning(msg)
                return False
        except EVCPathNotInstalled as err:
            log.error(
                f"Error deploying EVC {self} when calling flow_manager: {err}"
            )
            # NOTE: use_path is passed positionally and therefore binds to
            # `force`, since remove_current_flows() only takes (force, sync).
            self.remove_current_flows(use_path, sync=True)
            return False
        self.activate()
        self.current_path = use_path
        self.take_last = not self.take_last
        self.sync()
        log.info(f"{self} was deployed.")
        return True

    def try_setup_failover_path(self, wait=settings.DEPLOY_EVCS_INTERVAL):
        """Try to set up failover_path whenever possible."""
        if (
                self.failover_path or not self.current_path
                or not self.is_active()
                ):
            return
        # total_seconds() covers the whole interval; .seconds would only
        # return the seconds component and ignore whole days.
        if (now() - self.affected_by_link_at).total_seconds() >= wait:
            with self.lock:
                self.setup_failover_path()
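A note on the elapsed-time check: `timedelta.seconds` holds only the seconds component of an interval (0 to 86399) and ignores whole days, whereas `timedelta.total_seconds()` returns the full duration. The sketch below uses made-up timestamps to show the difference; none of the values come from the project.

```python
from datetime import datetime, timedelta

# Hypothetical timestamps: the link event happened 1 day and 5 seconds ago.
affected_at = datetime(2024, 1, 1, 12, 0, 0)
current = affected_at + timedelta(days=1, seconds=5)

elapsed = current - affected_at
component = elapsed.seconds      # seconds component only, days are dropped
full = elapsed.total_seconds()   # whole interval expressed in seconds
print(component, full)
```

With a wait threshold of, say, 60 seconds, the first value would still look "too recent" even though more than a day has passed.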

    # pylint: disable=too-many-statements
    def setup_failover_path(self):
        """Install flows for the failover path of this EVC.

        Procedures to deploy:

        0. Remove flows currently installed for failover_path (if any)
        1. Discover a disjoint path from current_path
        2. Choose vlans
        3. Install NNI flows
        4. Install UNI egress flows
        5. Update failover_path
        """
        # Intra-switch EVCs have no failover_path
        if self.is_intra_switch():
            return False

        # For now, only set up a failover path for totally dynamic EVCs
        if not self.is_eligible_for_failover_path():
            return False

        out_new_flows: dict[str, list[dict]] = {}
        reason = ""
        tag_errors = []
        out_removed_flows = self.remove_path_flows(self.failover_path)
        self.failover_path = Path([])

        for use_path in self.get_failover_path_candidates():
            if not use_path:
                continue
            try:
                use_path.choose_vlans(self._controller, not self.take_last)
                break
            except KytosNoTagAvailableError as e:
                tag_errors.append(str(e))
        else:
            use_path = Path([])
            reason = "No available path was found"

        try:
            if use_path:
                _nni_flows = self._install_nni_flows(use_path)
                out_new_flows = merge_flow_dicts(out_new_flows, _nni_flows)
                _uni_flows = self._install_uni_flows(use_path, skip_in=True)
                out_new_flows = merge_flow_dicts(out_new_flows, _uni_flows)
        except EVCPathNotInstalled as err:
            reason = "Error deploying failover path"
            log.error(
                f"{reason} for {self}. FlowManager error: {err}"
            )
            _rmed_flows = self.remove_path_flows(use_path)
            out_removed_flows = merge_flow_dicts(
                out_removed_flows, _rmed_flows
            )
            use_path = Path([])

        self.failover_path = use_path
        self.sync()

        if out_new_flows or out_removed_flows:
            emit_event(self._controller, "failover_deployed", content={
                self.id: map_evc_event_content(
                    self,
                    flows=deepcopy(out_new_flows),
                    removed_flows=deepcopy(out_removed_flows),
                    error_reason=reason,
                    current_path=self.current_path.as_dict(),
                )
            })

        if not use_path:
            msg = f"Failover path for {self} was not deployed: {reason}."
            if tag_errors:
                msg = self.add_tag_errors(msg, tag_errors)
                log.error(msg)
            else:
                log.warning(msg)
            return False
        log.info(f"Failover path for {self} was deployed.")
        return True

    @staticmethod
    def add_tag_errors(msg: str, tag_errors: list):
        """Add to msg the tag errors encountered when choosing a path."""
        path = ['path', 'paths']
        was = ['was', 'were']
        message = ['message', 'messages']

        # Choose either singular(0) or plural(1) words
        n = 1
        if len(tag_errors) == 1:
            n = 0

        msg += f" {len(tag_errors)} {path[n]} {was[n]} rejected"
        msg += f" with {message[n]}: {tag_errors}"
        return msg
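As a rough illustration of the singular/plural handling in `add_tag_errors`, the following standalone sketch reproduces the logic as a plain function. The base message and the error strings are hypothetical.

```python
def add_tag_errors(msg: str, tag_errors: list) -> str:
    """Append a singular or plural summary of tag errors to msg."""
    path = ['path', 'paths']
    was = ['was', 'were']
    message = ['message', 'messages']
    # Index 0 selects the singular words, index 1 the plural ones.
    n = 0 if len(tag_errors) == 1 else 1
    msg += f" {len(tag_errors)} {path[n]} {was[n]} rejected"
    msg += f" with {message[n]}: {tag_errors}"
    return msg


# Hypothetical error strings, for illustration only.
base = "EVC was not deployed."
one = add_tag_errors(base, ["no tag on link A"])
two = add_tag_errors(base, ["no tag on link A", "no tag on link B"])
print(one)
print(two)
```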

    def get_failover_flows(self):
        """Return the flows needed to make the failover path active, i.e. the
        flows for ingress forwarding.

        Return:
            dict: A dict of flows indexed by the switch_id will be returned, or
                an empty dict if no failover_path is available.
        """
        if not self.failover_path:
            return {}
        return self._prepare_uni_flows(self.failover_path, skip_out=True)

    # pylint: disable=too-many-branches
    def _prepare_direct_uni_flows(self):
        """Prepare flows connecting two UNIs for intra-switch EVC."""
        vlan_a = self._get_value_from_uni_tag(self.uni_a)
        vlan_z = self._get_value_from_uni_tag(self.uni_z)

        flow_mod_az = self._prepare_flow_mod(
            self.uni_a.interface, self.uni_z.interface,
            self.queue_id, vlan_a
        )
        flow_mod_za = self._prepare_flow_mod(
            self.uni_z.interface, self.uni_a.interface,
            self.queue_id, vlan_z
        )

        if not isinstance(vlan_z, list) and vlan_z not in self.special_cases:
            flow_mod_az["actions"].insert(
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
            )
            if not vlan_a:
                flow_mod_az["actions"].insert(
                    0, {"action_type": "push_vlan", "tag_type": "c"}
                )
            if vlan_a == 0:
                flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
        elif vlan_a == 0 and vlan_z == "4096/4096":
            flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})

        if not isinstance(vlan_a, list) and vlan_a not in self.special_cases:
            flow_mod_za["actions"].insert(
                0, {"action_type": "set_vlan", "vlan_id": vlan_a}
            )
            if not vlan_z:
                flow_mod_za["actions"].insert(
                    0, {"action_type": "push_vlan", "tag_type": "c"}
                )
            if vlan_z == 0:
                flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
        elif vlan_a == "4096/4096" and vlan_z == 0:
            flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})

        flows = []
        if isinstance(vlan_a, list):
            for mask_a in vlan_a:
                flow_aux = deepcopy(flow_mod_az)
                flow_aux["match"]["dl_vlan"] = mask_a
                flows.append(flow_aux)
        else:
            if vlan_a is not None:
                flow_mod_az["match"]["dl_vlan"] = vlan_a
            flows.append(flow_mod_az)

        if isinstance(vlan_z, list):
            for mask_z in vlan_z:
                flow_aux = deepcopy(flow_mod_za)
                flow_aux["match"]["dl_vlan"] = mask_z
                flows.append(flow_aux)
        else:
            if vlan_z is not None:
                flow_mod_za["match"]["dl_vlan"] = vlan_z
            flows.append(flow_mod_za)
        return (
            self.uni_a.interface.switch.id, flows
        )

    def _install_direct_uni_flows(self):
        """Install flows connecting two UNIs.

        This case happens when the circuit is between UNIs in the
        same switch.
        """
        (dpid, flows) = self._prepare_direct_uni_flows()
        try:
            self._send_flow_mods(dpid, flows)
        except FlowModException as err:
            raise EVCPathNotInstalled(f"In {dpid}, {err}") from err

    def _prepare_nni_flows(self, path=None):
        """Prepare NNI flows."""
        nni_flows = OrderedDict()
        previous = self.uni_a.interface.switch.dpid
        for incoming, outcoming in self.links_zipped(path):
            in_vlan = incoming.get_metadata("s_vlan").value
            out_vlan = outcoming.get_metadata("s_vlan").value
            in_endpoint = self.get_endpoint_by_id(incoming, previous, ne)
            out_endpoint = self.get_endpoint_by_id(
                outcoming, in_endpoint.switch.id, eq
            )

            flows = []
            # Flow for one direction
            flows.append(
                self._prepare_nni_flow(
                    in_endpoint,
                    out_endpoint,
                    in_vlan,
                    out_vlan,
                    queue_id=self.queue_id,
                )
            )

            # Flow for the other direction
            flows.append(
                self._prepare_nni_flow(
                    out_endpoint,
                    in_endpoint,
                    out_vlan,
                    in_vlan,
                    queue_id=self.queue_id,
                )
            )
            previous = in_endpoint.switch.id
            nni_flows[in_endpoint.switch.id] = flows
        return nni_flows

    def _install_nni_flows(
        self, path=None
    ) -> dict[str, list[dict]]:
        """Install NNI flows."""
        nni_flows = self._prepare_nni_flows(path)
        try:
            for dpid, flows in nni_flows.items():
                self._send_flow_mods(dpid, flows)
        except FlowModException as err:
            raise EVCPathNotInstalled(f"In {dpid}, {err}") from err
        return nni_flows

    @staticmethod
    def _get_value_from_uni_tag(uni: UNI):
        """Return the value from the UNI tag. For "any" and "untagged"
        it returns "4096/4096" and 0, respectively."""
        special = {"any": "4096/4096", "untagged": 0}
        if uni.user_tag:
            value = uni.user_tag.value
            if isinstance(value, list):
                return uni.user_tag.mask_list
            return special.get(value, value)
        return None

    # pylint: disable=too-many-locals
    def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
        """Prepare flows to install UNIs."""
        uni_flows = {}
        if not path:
            log.info("install uni flows without path.")
            return uni_flows

        # Determine VLANs
        in_vlan_a = self._get_value_from_uni_tag(self.uni_a)
        out_vlan_a = path[0].get_metadata("s_vlan").value

        in_vlan_z = self._get_value_from_uni_tag(self.uni_z)
        out_vlan_z = path[-1].get_metadata("s_vlan").value

        # Get endpoints from path
        endpoint_a = self.get_endpoint_by_id(
            path[0], self.uni_a.interface.switch.id, eq
        )
        endpoint_z = self.get_endpoint_by_id(
            path[-1], self.uni_z.interface.switch.id, eq
        )

        # Flows for the first UNI
        flows_a = []

        # Flow for one direction, pushing the service tag
        if not skip_in:
            if isinstance(in_vlan_a, list):
                for in_mask_a in in_vlan_a:
                    push_flow = self._prepare_push_flow(
                        self.uni_a.interface,
                        endpoint_a,
                        in_mask_a,
                        out_vlan_a,
                        in_vlan_z,
                        queue_id=self.queue_id,
                    )
                    flows_a.append(push_flow)
            else:
                push_flow = self._prepare_push_flow(
                    self.uni_a.interface,
                    endpoint_a,
                    in_vlan_a,
                    out_vlan_a,
                    in_vlan_z,
                    queue_id=self.queue_id,
                )
                flows_a.append(push_flow)

        # Flow for the other direction, popping the service tag
        if not skip_out:
            pop_flow = self._prepare_pop_flow(
                endpoint_a,
                self.uni_a.interface,
                out_vlan_a,
                queue_id=self.queue_id,
            )
            flows_a.append(pop_flow)

        uni_flows[self.uni_a.interface.switch.id] = flows_a

        # Flows for the second UNI
        flows_z = []

        # Flow for one direction, pushing the service tag
        if not skip_in:
            if isinstance(in_vlan_z, list):
                for in_mask_z in in_vlan_z:
                    push_flow = self._prepare_push_flow(
                        self.uni_z.interface,
                        endpoint_z,
                        in_mask_z,
                        out_vlan_z,
                        in_vlan_a,
                        queue_id=self.queue_id,
                    )
                    flows_z.append(push_flow)
            else:
                push_flow = self._prepare_push_flow(
                    self.uni_z.interface,
                    endpoint_z,
                    in_vlan_z,
                    out_vlan_z,
                    in_vlan_a,
                    queue_id=self.queue_id,
                )
                flows_z.append(push_flow)

        # Flow for the other direction, popping the service tag
        if not skip_out:
            pop_flow = self._prepare_pop_flow(
                endpoint_z,
                self.uni_z.interface,
                out_vlan_z,
                queue_id=self.queue_id,
            )
            flows_z.append(pop_flow)

        uni_flows[self.uni_z.interface.switch.id] = flows_z

        return uni_flows

    def _install_uni_flows(
        self, path=None, skip_in=False, skip_out=False
    ) -> dict[str, list[dict]]:
        """Install UNI flows."""
        uni_flows = self._prepare_uni_flows(path, skip_in, skip_out)
        try:
            for (dpid, flows) in uni_flows.items():
                self._send_flow_mods(dpid, flows)
        except FlowModException as err:
            raise EVCPathNotInstalled(f"In {dpid}, {err}") from err

        return uni_flows

    @staticmethod
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
        retry=retry_if_exception_type(FlowModException),
        before_sleep=before_sleep,
        reraise=True,
    )
    def _send_flow_mods(dpid, flow_mods, command='flows', force=False):
        """Send a flow_mod list to a specific switch.

        Args:
            dpid(str): The target of flows (i.e. Switch.id).
            flow_mods(list): List of flow_mod dictionaries.
            command(str): 'flows' by default. Use 'delete' to remove flows.
            force(bool): True to send via consistency check in case of errors

        """
        endpoint = f"{settings.MANAGER_URL}/{command}/{dpid}"

        data = {"flows": flow_mods, "force": force}
        try:
            res = httpx.post(endpoint, json=data, timeout=30)
        except httpx.RequestError as err:
            raise FlowModException(str(err)) from err
        if res.is_server_error or res.status_code >= 400:
            raise FlowModException(res.text)

    def get_cookie(self):
        """Return the cookie integer from evc id."""
        return int(self.id, 16) + (settings.COOKIE_PREFIX << 56)

    @staticmethod
    def get_id_from_cookie(cookie):
        """Return the evc id given a cookie value."""
        evc_id = cookie - (settings.COOKIE_PREFIX << 56)
        return f"{evc_id:x}".zfill(14)
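The cookie encoding and its inverse can be checked with a small round trip. This is a sketch only: `COOKIE_PREFIX = 0xAA` is an assumed value (the real one comes from `settings`), the EVC id is made up, and the two functions are standalone copies of the methods in the listing.

```python
COOKIE_PREFIX = 0xAA  # assumed value; the real one is defined in settings


def get_cookie(evc_id: str) -> int:
    """Put the prefix in the top byte and the EVC id in the low 56 bits."""
    return int(evc_id, 16) + (COOKIE_PREFIX << 56)


def get_id_from_cookie(cookie: int) -> str:
    """Recover the 14-hex-digit EVC id, restoring leading zeros."""
    evc_id = cookie - (COOKIE_PREFIX << 56)
    return f"{evc_id:x}".zfill(14)


evc_id = "0a1b2c3d4e5f67"  # hypothetical 14-hex-digit EVC id
cookie = get_cookie(evc_id)
restored = get_id_from_cookie(cookie)
print(hex(cookie))
print(restored)
```

The `zfill(14)` call matters when the id starts with zeros, since formatting the integer as hex would otherwise drop them.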

    def set_flow_table_group_id(self, flow_mod: dict, vlan) -> dict:
        """Set table_group and table_id"""
        table_group = "epl" if vlan is None else "evpl"
        flow_mod["table_group"] = table_group
        flow_mod["table_id"] = self.table_group[table_group]
        return flow_mod

    @staticmethod
    def get_priority(vlan):
        """Return priority value depending on vlan value"""
        if isinstance(vlan, list):
            return settings.EVPL_SB_PRIORITY
        if vlan not in {None, "4096/4096", 0}:
            return settings.EVPL_SB_PRIORITY
        if vlan == 0:
            return settings.UNTAGGED_SB_PRIORITY
        if vlan == "4096/4096":
            return settings.ANY_SB_PRIORITY
        return settings.EPL_SB_PRIORITY

    def _prepare_flow_mod(self, in_interface, out_interface,
                          queue_id=None, vlan=True):
        """Prepare a common flow mod."""
        default_actions = [
            {"action_type": "output", "port": out_interface.port_number}
        ]
        queue_id = settings.QUEUE_ID if queue_id == -1 else queue_id
        if queue_id is not None:
            default_actions.append(
                {"action_type": "set_queue", "queue_id": queue_id}
            )

        flow_mod = {
            "match": {"in_port": in_interface.port_number},
            "cookie": self.get_cookie(),
            "actions": default_actions,
            "owner": "mef_eline",
        }

        self.set_flow_table_group_id(flow_mod, vlan)
        if self.sb_priority:
            flow_mod["priority"] = self.sb_priority
        else:
            flow_mod["priority"] = self.get_priority(vlan)
        return flow_mod

    def _prepare_nni_flow(self, *args, queue_id=None):
        """Create NNI flows."""
        in_interface, out_interface, in_vlan, out_vlan = args
        flow_mod = self._prepare_flow_mod(
            in_interface, out_interface, queue_id
        )
        flow_mod["match"]["dl_vlan"] = in_vlan
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
        flow_mod["actions"].insert(0, new_action)

        return flow_mod

    def _prepare_push_flow(self, *args, queue_id=None):
        """Prepare push flow.

        Arguments:
            in_interface(Interface): Input interface.
            out_interface(Interface): Output interface.
            in_vlan(int,str,None): Vlan input.
            out_vlan(str): Vlan output.
            new_c_vlan(int,str,list,None): New client vlan.

        Return:
            dict: A Python dictionary representing a FlowMod

        """
        # assign all arguments
        in_interface, out_interface, in_vlan, out_vlan, new_c_vlan = args
        vlan_pri = in_vlan if not isinstance(new_c_vlan, list) else new_c_vlan
        flow_mod = self._prepare_flow_mod(
            in_interface, out_interface, queue_id, vlan_pri
        )
        # the service tag must always be pushed
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
        flow_mod["actions"].insert(0, new_action)

        new_action = {"action_type": "push_vlan", "tag_type": "s"}
        flow_mod["actions"].insert(0, new_action)

        if in_vlan is not None:
            # if in_vlan is set, it must be included in the match
            flow_mod["match"]["dl_vlan"] = in_vlan

        if (not isinstance(new_c_vlan, list) and in_vlan != new_c_vlan and
                new_c_vlan not in self.special_cases):
            # new_c_vlan is a nonzero integer that differs from in_vlan,
            # so a set_vlan action is required
            new_action = {"action_type": "set_vlan", "vlan_id": new_c_vlan}
            flow_mod["actions"].insert(0, new_action)

        if in_vlan not in self.special_cases and new_c_vlan == 0:
            # in_vlan is a nonzero integer and new_c_vlan is untagged (0),
            # so a pop action is required
            new_action = {"action_type": "pop_vlan"}
            flow_mod["actions"].insert(0, new_action)

        elif in_vlan == "4096/4096" and new_c_vlan == 0:
            # in_vlan matches any tag and new_c_vlan is untagged (0),
            # so a pop action is required
            new_action = {"action_type": "pop_vlan"}
            flow_mod["actions"].insert(0, new_action)

        elif (not in_vlan and
                (not isinstance(new_c_vlan, list) and
                 new_c_vlan not in self.special_cases)):
            # new_c_vlan is a nonzero integer and in_vlan is not set,
            # so the client tag is pushed now
            new_action = {"action_type": "push_vlan", "tag_type": "c"}
            flow_mod["actions"].insert(0, new_action)

        return flow_mod

    def _prepare_pop_flow(
        self, in_interface, out_interface, out_vlan, queue_id=None
    ):
        # pylint: disable=too-many-arguments
        """Prepare pop flow."""
        flow_mod = self._prepare_flow_mod(
            in_interface, out_interface, queue_id
        )
        flow_mod["match"]["dl_vlan"] = out_vlan
        new_action = {"action_type": "pop_vlan"}
        flow_mod["actions"].insert(0, new_action)
        return flow_mod

    @staticmethod
    def run_bulk_sdntraces(
        uni_list: list[tuple[Interface, Union[str, int, None]]]
    ) -> dict:
        """Run SDN traces on control plane starting from EVC UNIs."""
        endpoint = f"{settings.SDN_TRACE_CP_URL}/traces"
        data = []
        for interface, tag_value in uni_list:
            data_uni = {
                "trace": {
                    "switch": {
                        "dpid": interface.switch.dpid,
                        "in_port": interface.port_number,
                    }
                }
            }
            if tag_value:
                uni_dl_vlan = map_dl_vlan(tag_value)
                if uni_dl_vlan:
                    data_uni["trace"]["eth"] = {
                        "dl_type": 0x8100,
                        "dl_vlan": uni_dl_vlan,
                    }
            data.append(data_uni)
        try:
            response = httpx.put(endpoint, json=data, timeout=30)
        except httpx.TimeoutException as exception:
            log.error(f"Request has timed out: {exception}")
            return {"result": []}
        if response.status_code >= 400:
            log.error(f"Failed to run sdntrace-cp: {response.text}")
            return {"result": []}
        return response.json()
1472
1473
    # pylint: disable=too-many-return-statements, too-many-arguments
    @staticmethod
    def check_trace(
        evc_id: str,
        evc_name: str,
        tag_a: Union[None, int, str],
        tag_z: Union[None, int, str],
        interface_a: Interface,
        interface_z: Interface,
        current_path: list,
        trace_a: list,
        trace_z: list
    ) -> bool:
        """Auxiliary function to check an individual trace."""
        if (
            len(trace_a) != len(current_path) + 1
            or not compare_uni_out_trace(tag_z, interface_z, trace_a[-1])
        ):
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
                        f"Invalid trace from uni_a: {trace_a}")
            return False
        if (
            len(trace_z) != len(current_path) + 1
            or not compare_uni_out_trace(tag_a, interface_a, trace_z[-1])
        ):
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
                        f"Invalid trace from uni_z: {trace_z}")
            return False

        if not current_path:
            return True

        first_link, trace_path_begin, trace_path_end = current_path[0], [], []
        if (
            first_link.endpoint_a.switch.id == trace_a[0]["dpid"]
        ):
            trace_path_begin, trace_path_end = trace_a, trace_z
        elif (
            first_link.endpoint_a.switch.id == trace_z[0]["dpid"]
        ):
            trace_path_begin, trace_path_end = trace_z, trace_a
        else:
            msg = (
                f"first link {first_link} endpoint_a didn't match the first "
                f"step of trace_a {trace_a} or trace_z {trace_z}"
            )
            log.warning(msg)
            return False

        for link, trace1, trace2 in zip(current_path,
                                        trace_path_begin[1:],
                                        trace_path_end[:0:-1]):
            metadata_vlan = None
            if link.metadata:
                metadata_vlan = glom(link.metadata, 's_vlan.value')
            if compare_endpoint_trace(
                link.endpoint_a,
                metadata_vlan,
                trace2
            ) is False:
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
                            f"Invalid trace from uni_a: {trace_a}")
                return False
            if compare_endpoint_trace(
                link.endpoint_b,
                metadata_vlan,
                trace1
            ) is False:
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
                            f"Invalid trace from uni_z: {trace_z}")
                return False

        return True

    @staticmethod
    def check_range(circuit, traces: list) -> bool:
        """Check traces for a UNI with TAGRange."""
        check = True
        for i, mask in enumerate(circuit.uni_a.user_tag.mask_list):
            trace_a = traces[i*2]
            trace_z = traces[i*2+1]
            check &= EVCDeploy.check_trace(
                circuit.id, circuit.name,
                mask, mask,
                circuit.uni_a.interface,
                circuit.uni_z.interface,
                circuit.current_path,
                trace_a, trace_z,
            )
        return check

    @staticmethod
    def check_list_traces(list_circuits: list) -> dict:
        """Check if current_path is deployed comparing with SDN traces."""
        if not list_circuits:
            return {}
        uni_list = make_uni_list(list_circuits)
        traces = EVCDeploy.run_bulk_sdntraces(uni_list)["result"]

        if not traces:
            return {}

        try:
            circuits_checked = {}
            i = 0
            for circuit in list_circuits:
                if isinstance(circuit.uni_a.user_tag, TAGRange):
                    length = len(circuit.uni_a.user_tag.mask_list)
                    circuits_checked[circuit.id] = EVCDeploy.check_range(
                        circuit, traces[i:i+length*2]
                    )
                    i += length*2
                else:
                    trace_a = traces[i]
                    trace_z = traces[i+1]
                    tag_a = None
                    if circuit.uni_a.user_tag:
                        tag_a = circuit.uni_a.user_tag.value
                    tag_z = None
                    if circuit.uni_z.user_tag:
                        tag_z = circuit.uni_z.user_tag.value
                    circuits_checked[circuit.id] = EVCDeploy.check_trace(
                        circuit.id, circuit.name,
                        tag_a, tag_z,
                        circuit.uni_a.interface,
                        circuit.uni_z.interface,
                        circuit.current_path,
                        trace_a, trace_z
                    )
                    i += 2
        except IndexError as err:
            log.error(
                "Bulk sdntraces returned fewer items than expected. "
                f"Error = {err}"
            )
            return {}

        return circuits_checked

    @staticmethod
    def get_endpoint_by_id(
        link: Link,
        id_: str,
        operator: Union[eq, ne]
    ) -> Interface:
        """Return the link endpoint whose switch id is
        either equal (eq) or not equal (ne) to id_."""
        if operator(link.endpoint_a.switch.id, id_):
            return link.endpoint_a
        return link.endpoint_b


class LinkProtection(EVCDeploy):
    """Class to handle link protection."""

    def is_affected_by_link(self, link=None):
        """Verify if the current path is affected by link down event."""
        return self.current_path.is_affected_by_link(link)

    def is_using_primary_path(self):
        """Verify if the current deployed path is self.primary_path."""
        return self.current_path == self.primary_path

    def is_using_backup_path(self):
        """Verify if the current deployed path is self.backup_path."""
        return self.current_path == self.backup_path

    def is_using_dynamic_path(self):
        """Verify if the current deployed path is dynamic."""
        if (
            self.current_path
            and not self.is_using_primary_path()
            and not self.is_using_backup_path()
            and self.current_path.status is EntityStatus.UP
        ):
            return True
        return False

    def handle_link_up(self, link):
        """Handle circuit when link up.

        Args:
            link(Link): Link affected by link.up event.

        """
        condition_pairs = [
            (
                lambda me: me.is_using_primary_path(),
                lambda _: (True, 'nothing')
            ),
            (
                lambda me: me.is_intra_switch(),
                lambda _: (True, 'nothing')
            ),
            (
                lambda me: me.primary_path.is_affected_by_link(link),
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
            ),
            # We tried to deploy(primary_path) without success,
            # and in this case the circuit is somehow up. Nothing to do.
            (
                lambda me: me.is_using_backup_path(),
                lambda _: (True, 'nothing')
            ),
            (
                lambda me: me.is_using_dynamic_path(),
                lambda _: (True, 'nothing')
            ),
            # In this case, probably the circuit is not being used and
            # we can move to backup
            (
                lambda me: me.backup_path.is_affected_by_link(link),
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
            ),
            # In this case, the circuit is not being used and we should
            # try a dynamic path
            (
                lambda me: me.dynamic_backup_path and not me.is_active(),
                lambda me: (me.deploy_to_path(), 'redeploy')
            )
        ]
        for predicate, action in condition_pairs:
            if not predicate(self):
                continue
            success, success_type = action(self)
            if success:
                if success_type == 'redeploy':
                    emit_event(
                        self._controller,
                        "redeployed_link_up",
                        content=map_evc_event_content(self)
                    )
                return True
        return False

    def handle_link_down(self):
        """Handle circuit when link down.

        Returns:
            bool: True if the re-deploy was successful, otherwise False.

        """
        success = False
        if self.is_using_primary_path():
            success = self.deploy_to_backup_path()
        elif self.is_using_backup_path():
            success = self.deploy_to_primary_path()

        if not success and self.dynamic_backup_path:
            success = self.deploy_to_path()

        if success:
            log.debug(f"{self} deployed after link down.")
        else:
            self.remove_current_flows(sync=False)
            self.deactivate()
            self.sync()
            log.debug(f"Failed to re-deploy {self} after link down.")

        return success

    @staticmethod
    def get_interface_from_switch(uni: UNI, switches: dict) -> Interface:
        """Get interface from switch by uni"""
        switch = switches[uni.interface.switch.dpid]
        interface = switch.interfaces[uni.interface.port_number]
        return interface

    def are_unis_active(self, switches: dict) -> bool:
        """Determine whether this EVC should be active"""
        interface_a = self.get_interface_from_switch(self.uni_a, switches)
        interface_z = self.get_interface_from_switch(self.uni_z, switches)
        active, _ = self.is_uni_interface_active(interface_a, interface_z)
        return active

    @staticmethod
    def is_uni_interface_active(
        *interfaces: Interface
    ) -> tuple[bool, dict]:
        """Determine whether a UNI should be active"""
        active = True
        bad_interfaces = [
            interface
            for interface in interfaces
            if interface.status != EntityStatus.UP
        ]
        if bad_interfaces:
            active = False
            interfaces = bad_interfaces
        return active, {
            interface.id: {
                'status': interface.status.value,
                'status_reason': interface.status_reason,
            }
            for interface in interfaces
        }

    def handle_interface_link_up(self, interface: Interface):
        """
        Handler for interface link_up events
        """
        if self.is_active():
            return
        interfaces = (self.uni_a.interface, self.uni_z.interface)
        if interface not in interfaces:
            return
        down_interfaces = [
            interface
            for interface in interfaces
            if interface.status != EntityStatus.UP
        ]
        if down_interfaces:
            return
        interface_dicts = {
            interface.id: {
                'status': interface.status.value,
                'status_reason': interface.status_reason,
            }
            for interface in interfaces
        }
        self.activate()
        log.info(
            f"Activating {self}. Interfaces: "
            f"{interface_dicts}."
        )
        emit_event(self._controller, "uni_active_updated",
                   content=map_evc_event_content(self))
        self.sync()

    def handle_interface_link_down(self, interface):
        """
        Handler for interface link_down events
        """
        if not self.is_active():
            return
        interfaces = (self.uni_a.interface, self.uni_z.interface)
        if interface not in interfaces:
            return
        down_interfaces = [
            interface
            for interface in interfaces
            if interface.status != EntityStatus.UP
        ]
        if not down_interfaces:
            return
        interface_dicts = {
            interface.id: {
                'status': interface.status.value,
                'status_reason': interface.status_reason,
            }
            for interface in down_interfaces
        }
        self.deactivate()
        log.info(
            f"Deactivating {self}. Interfaces: "
            f"{interface_dicts}."
        )
        emit_event(self._controller, "uni_active_updated",
                   content=map_evc_event_content(self))
        self.sync()


class EVC(LinkProtection):
    """Class that represents an E-Line Virtual Connection."""
