Passed
Push — master ( 495f75...db415d )
by Vinicius
04:25 queued 16s
created

build.models.evc.EVCDeploy.check_trace()   D

Complexity

Conditions 12

Size

Total Lines 72
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 12.3775

Importance

Changes 0
Metric Value
cc 12
eloc 55
nop 9
dl 0
loc 72
ccs 25
cts 29
cp 0.8621
crap 12.3775
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.models.evc.EVCDeploy.check_trace() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Classes used in the main application."""  # pylint: disable=too-many-lines
2 1
import traceback
3 1
from collections import OrderedDict
4 1
from copy import deepcopy
5 1
from datetime import datetime
6 1
from operator import eq, ne
7 1
from threading import Lock
8 1
from typing import Union
9 1
from uuid import uuid4
10
11 1
import requests
12 1
from glom import glom
13 1
from requests.exceptions import Timeout
14
15 1
from kytos.core import log
16 1
from kytos.core.common import EntityStatus, GenericEntity
17 1
from kytos.core.exceptions import KytosNoTagAvailableError, KytosTagError
18 1
from kytos.core.helpers import get_time, now
19 1
from kytos.core.interface import UNI, Interface, TAGRange
20 1
from kytos.core.link import Link
21 1
from kytos.core.tag_ranges import range_difference
22 1
from napps.kytos.mef_eline import controllers, settings
23 1
from napps.kytos.mef_eline.exceptions import (DuplicatedNoTagUNI,
24
                                              FlowModException, InvalidPath)
25 1
from napps.kytos.mef_eline.utils import (check_disabled_component,
26
                                         compare_endpoint_trace,
27
                                         compare_uni_out_trace, emit_event,
28
                                         make_uni_list, map_dl_vlan,
29
                                         map_evc_event_content,
30
                                         merge_flow_dicts)
31
32 1
from .path import DynamicPathManager, Path
33
34
35 1
class EVCBase(GenericEntity):
36
    """Class to represent a circuit."""
37
38 1
    attributes_requiring_redeploy = [
39
        "primary_path",
40
        "backup_path",
41
        "dynamic_backup_path",
42
        "queue_id",
43
        "sb_priority",
44
        "primary_constraints",
45
        "secondary_constraints",
46
        "uni_a",
47
        "uni_z",
48
    ]
49 1
    required_attributes = ["name", "uni_a", "uni_z"]
50
51 1
    updatable_attributes = {
52
        "uni_a",
53
        "uni_z",
54
        "name",
55
        "start_date",
56
        "end_date",
57
        "queue_id",
58
        "bandwidth",
59
        "primary_path",
60
        "backup_path",
61
        "dynamic_backup_path",
62
        "primary_constraints",
63
        "secondary_constraints",
64
        "owner",
65
        "sb_priority",
66
        "service_level",
67
        "circuit_scheduler",
68
        "metadata",
69
        "enabled"
70
    }
71
72 1
    def __init__(self, controller, **kwargs):
73
        """Create an EVC instance with the provided parameters.
74
75
        Args:
76
            id(str): EVC identifier. Whether it's None an ID will be genereted.
77
                     Only the first 14 bytes passed will be used.
78
            name: represents an EVC name.(Required)
79
            uni_a (UNI): Endpoint A for User Network Interface.(Required)
80
            uni_z (UNI): Endpoint Z for User Network Interface.(Required)
81
            start_date(datetime|str): Date when the EVC was registred.
82
                                      Default is now().
83
            end_date(datetime|str): Final date that the EVC will be fineshed.
84
                                    Default is None.
85
            bandwidth(int): Bandwidth used by EVC instance. Default is 0.
86
            primary_links(list): Primary links used by evc. Default is []
87
            backup_links(list): Backups links used by evc. Default is []
88
            current_path(list): Circuit being used at the moment if this is an
89
                                active circuit. Default is [].
90
            failover_path(list): Path being used to provide EVC protection via
91
                                failover during link failures. Default is [].
92
            primary_path(list): primary circuit offered to user IF one or more
93
                                links were provided. Default is [].
94
            backup_path(list): backup circuit offered to the user IF one or
95
                               more links were provided. Default is [].
96
            dynamic_backup_path(bool): Enable computer backup path dynamically.
97
                                       Dafault is False.
98
            creation_time(datetime|str): datetime when the circuit should be
99
                                         activated. default is now().
100
            enabled(Boolean): attribute to indicate the administrative state;
101
                              default is False.
102
            active(Boolean): attribute to indicate the operational state;
103
                             default is False.
104
            archived(Boolean): indicate the EVC has been deleted and is
105
                               archived; default is False.
106
            owner(str): The EVC owner. Default is None.
107
            sb_priority(int): Service level provided in the request.
108
                              Default is None.
109
            service_level(int): Service level provided. The higher the better.
110
                                Default is 0.
111
112
        Raises:
113
            ValueError: raised when object attributes are invalid.
114
115
        """
116 1
        self._controller = controller
117 1
        self._validate(**kwargs)
118 1
        super().__init__()
119
120
        # required attributes
121 1
        self._id = kwargs.get("id", uuid4().hex)[:14]
122 1
        self.uni_a: UNI = kwargs.get("uni_a")
123 1
        self.uni_z: UNI = kwargs.get("uni_z")
124 1
        self.name = kwargs.get("name")
125
126
        # optional attributes
127 1
        self.start_date = get_time(kwargs.get("start_date")) or now()
128 1
        self.end_date = get_time(kwargs.get("end_date")) or None
129 1
        self.queue_id = kwargs.get("queue_id", -1)
130
131 1
        self.bandwidth = kwargs.get("bandwidth", 0)
132 1
        self.primary_links = Path(kwargs.get("primary_links", []))
133 1
        self.backup_links = Path(kwargs.get("backup_links", []))
134 1
        self.current_path = Path(kwargs.get("current_path", []))
135 1
        self.failover_path = Path(kwargs.get("failover_path", []))
136 1
        self.primary_path = Path(kwargs.get("primary_path", []))
137 1
        self.backup_path = Path(kwargs.get("backup_path", []))
138 1
        self.dynamic_backup_path = kwargs.get("dynamic_backup_path", False)
139 1
        self.primary_constraints = kwargs.get("primary_constraints", {})
140 1
        self.secondary_constraints = kwargs.get("secondary_constraints", {})
141 1
        self.creation_time = get_time(kwargs.get("creation_time")) or now()
142 1
        self.owner = kwargs.get("owner", None)
143 1
        self.sb_priority = kwargs.get("sb_priority", None) or kwargs.get(
144
            "priority", None
145
        )
146 1
        self.service_level = kwargs.get("service_level", 0)
147 1
        self.circuit_scheduler = kwargs.get("circuit_scheduler", [])
148 1
        self.flow_removed_at = get_time(kwargs.get("flow_removed_at")) or None
149 1
        self.updated_at = get_time(kwargs.get("updated_at")) or now()
150 1
        self.execution_rounds = kwargs.get("execution_rounds", 0)
151
152 1
        self.current_links_cache = set()
153 1
        self.primary_links_cache = set()
154 1
        self.backup_links_cache = set()
155 1
        self.affected_by_link_at = get_time("0001-01-01T00:00:00")
156 1
        self.old_path = Path([])
157
158 1
        self.lock = Lock()
159
160 1
        self.archived = kwargs.get("archived", False)
161
162 1
        self.metadata = kwargs.get("metadata", {})
163
164 1
        self._mongo_controller = controllers.ELineController()
165
166 1
        if kwargs.get("active", False):
167 1
            self.activate()
168
        else:
169 1
            self.deactivate()
170
171 1
        if kwargs.get("enabled", False):
172 1
            self.enable()
173
        else:
174 1
            self.disable()
175
176
        # datetime of user request for a EVC (or datetime when object was
177
        # created)
178 1
        self.request_time = kwargs.get("request_time", now())
179
        # dict with the user original request (input)
180 1
        self._requested = kwargs
181
182
        # Special cases: No tag, any, untagged
183 1
        self.special_cases = {None, "4096/4096", 0}
184 1
        self.table_group = kwargs.get("table_group")
185
186 1
    def sync(self, keys: set = None):
187
        """Sync this EVC in the MongoDB."""
188 1
        self.updated_at = now()
189 1
        if keys:
190 1
            self._mongo_controller.update_evc(self.as_dict(keys))
191 1
            return
192 1
        self._mongo_controller.upsert_evc(self.as_dict())
193
194 1
    def _get_unis_use_tags(self, **kwargs) -> tuple[UNI, UNI]:
195
        """Obtain both UNIs (uni_a, uni_z).
196
        If a UNI is changing, verify tags"""
197 1
        uni_a = kwargs.get("uni_a", None)
198 1
        uni_a_flag = False
199 1
        if uni_a and uni_a != self.uni_a:
200 1
            uni_a_flag = True
201 1
            self._use_uni_vlan(uni_a, uni_dif=self.uni_a)
202
203 1
        uni_z = kwargs.get("uni_z", None)
204 1
        if uni_z and uni_z != self.uni_z:
205 1
            try:
206 1
                self._use_uni_vlan(uni_z, uni_dif=self.uni_z)
207 1
                self.make_uni_vlan_available(self.uni_z, uni_dif=uni_z)
208 1
            except KytosTagError as err:
209 1
                if uni_a_flag:
210 1
                    self.make_uni_vlan_available(uni_a, uni_dif=self.uni_a)
211 1
                raise err
212
        else:
213 1
            uni_z = self.uni_z
214
215 1
        if uni_a_flag:
216 1
            self.make_uni_vlan_available(self.uni_a, uni_dif=uni_a)
217
        else:
218 1
            uni_a = self.uni_a
219 1
        return uni_a, uni_z
220
221 1
    def update(self, **kwargs):
222
        """Update evc attributes.
223
224
        This method will raises an error trying to change the following
225
        attributes: [creation_time, active, current_path, failover_path,
226
        _id, archived]
227
        [name, uni_a and uni_z]
228
229
        Returns:
230
            the values for enable and a redeploy attribute, if exists and None
231
            otherwise
232
        Raises:
233
            ValueError: message with error detail.
234
235
        """
236 1
        enable, redeploy = (None, None)
237 1
        if not self._tag_lists_equal(**kwargs):
238 1
            raise ValueError(
239
                "UNI_A and UNI_Z tag lists should be the same."
240
            )
241 1
        uni_a, uni_z = self._get_unis_use_tags(**kwargs)
242 1
        check_disabled_component(uni_a, uni_z)
243 1
        self._validate_has_primary_or_dynamic(
244
            primary_path=kwargs.get("primary_path"),
245
            dynamic_backup_path=kwargs.get("dynamic_backup_path"),
246
            uni_a=uni_a,
247
            uni_z=uni_z,
248
        )
249 1
        for attribute, value in kwargs.items():
250 1
            if attribute not in self.updatable_attributes:
251 1
                raise ValueError(f"{attribute} can't be updated.")
252 1
            if attribute in ("primary_path", "backup_path"):
253 1
                try:
254 1
                    value.is_valid(
255
                        uni_a.interface.switch, uni_z.interface.switch
256
                    )
257 1
                except InvalidPath as exception:
258 1
                    raise ValueError(  # pylint: disable=raise-missing-from
259
                        f"{attribute} is not a " f"valid path: {exception}"
260
                    )
261 1
        for attribute, value in kwargs.items():
262 1
            if attribute == "enabled":
263 1
                if value:
264 1
                    self.enable()
265
                else:
266 1
                    self.disable()
267 1
                enable = value
268
            else:
269 1
                setattr(self, attribute, value)
270 1
                if attribute in self.attributes_requiring_redeploy:
271 1
                    redeploy = True
272 1
        self.sync(set(kwargs.keys()))
273 1
        return enable, redeploy
274
275 1
    def set_flow_removed_at(self):
276
        """Update flow_removed_at attribute."""
277
        self.flow_removed_at = now()
278
279 1
    def has_recent_removed_flow(self, setting=settings):
280
        """Check if any flow has been removed from the evc"""
281
        if self.flow_removed_at is None:
282
            return False
283
        res_seconds = (now() - self.flow_removed_at).seconds
284
        return res_seconds < setting.TIME_RECENT_DELETED_FLOWS
285
286 1
    def is_recent_updated(self, setting=settings):
287
        """Check if the evc has been updated recently"""
288
        res_seconds = (now() - self.updated_at).seconds
289
        return res_seconds < setting.TIME_RECENT_UPDATED
290
291 1
    def __repr__(self):
292
        """Repr method."""
293 1
        return f"EVC({self._id}, {self.name})"
294
295 1
    def _validate(self, **kwargs):
296
        """Do Basic validations.
297
298
        Verify required attributes: name, uni_a, uni_z
299
300
        Raises:
301
            ValueError: message with error detail.
302
303
        """
304 1
        for attribute in self.required_attributes:
305
306 1
            if attribute not in kwargs:
307 1
                raise ValueError(f"{attribute} is required.")
308
309 1
            if "uni" in attribute:
310 1
                uni = kwargs.get(attribute)
311 1
                if not isinstance(uni, UNI):
312
                    raise ValueError(f"{attribute} is an invalid UNI.")
313
314 1
    def _tag_lists_equal(self, **kwargs):
315
        """Verify that tag lists are the same."""
316 1
        uni_a = kwargs.get("uni_a") or self.uni_a
317 1
        uni_z = kwargs.get("uni_z") or self.uni_z
318 1
        uni_a_list = uni_z_list = False
319 1
        if (uni_a.user_tag and isinstance(uni_a.user_tag, TAGRange)):
320 1
            uni_a_list = True
321 1
        if (uni_z.user_tag and isinstance(uni_z.user_tag, TAGRange)):
322 1
            uni_z_list = True
323 1
        if uni_a_list and uni_z_list:
324 1
            return uni_a.user_tag.value == uni_z.user_tag.value
325 1
        return uni_a_list == uni_z_list
326
327 1
    def _validate_has_primary_or_dynamic(
328
        self,
329
        primary_path=None,
330
        dynamic_backup_path=None,
331
        uni_a=None,
332
        uni_z=None,
333
    ) -> None:
334
        """Validate that it must have a primary path or allow dynamic paths."""
335 1
        primary_path = (
336
            primary_path
337
            if primary_path is not None
338
            else self.primary_path
339
        )
340 1
        dynamic_backup_path = (
341
            dynamic_backup_path
342
            if dynamic_backup_path is not None
343
            else self.dynamic_backup_path
344
        )
345 1
        uni_a = uni_a if uni_a is not None else self.uni_a
346 1
        uni_z = uni_z if uni_z is not None else self.uni_z
347 1
        if (
348
            not primary_path
349
            and not dynamic_backup_path
350
            and uni_a and uni_z
351
            and uni_a.interface.switch != uni_z.interface.switch
352
        ):
353 1
            msg = "The EVC must have a primary path or allow dynamic paths."
354 1
            raise ValueError(msg)
355
356 1
    def __eq__(self, other):
357
        """Override the default implementation."""
358 1
        if not isinstance(other, EVC):
359
            return False
360
361 1
        attrs_to_compare = ["name", "uni_a", "uni_z", "owner", "bandwidth"]
362 1
        for attribute in attrs_to_compare:
363 1
            if getattr(other, attribute) != getattr(self, attribute):
364 1
                return False
365 1
        return True
366
367 1
    def is_intra_switch(self):
368
        """Check if the UNIs are in the same switch."""
369 1
        return self.uni_a.interface.switch == self.uni_z.interface.switch
370
371 1
    def check_no_tag_duplicate(self, other_uni: UNI):
372
        """Check if a no tag UNI is duplicated."""
373 1
        if other_uni in (self.uni_a, self.uni_z):
374 1
            msg = f"UNI with interface {other_uni.interface.id} is"\
375
                  f" duplicated with EVC {self.id}."
376 1
            raise DuplicatedNoTagUNI(msg)
377
378 1
    def as_dict(self, keys: set = None):
379
        """Return a dictionary representing an EVC object.
380
            keys: Only fields on this variable will be
381
                  returned in the dictionary"""
382 1
        evc_dict = {
383
            "id": self.id,
384
            "name": self.name,
385
            "uni_a": self.uni_a.as_dict(),
386
            "uni_z": self.uni_z.as_dict(),
387
        }
388
389 1
        time_fmt = "%Y-%m-%dT%H:%M:%S"
390
391 1
        evc_dict["start_date"] = self.start_date
392 1
        if isinstance(self.start_date, datetime):
393 1
            evc_dict["start_date"] = self.start_date.strftime(time_fmt)
394
395 1
        evc_dict["end_date"] = self.end_date
396 1
        if isinstance(self.end_date, datetime):
397 1
            evc_dict["end_date"] = self.end_date.strftime(time_fmt)
398
399 1
        evc_dict["queue_id"] = self.queue_id
400 1
        evc_dict["bandwidth"] = self.bandwidth
401 1
        evc_dict["primary_links"] = self.primary_links.as_dict()
402 1
        evc_dict["backup_links"] = self.backup_links.as_dict()
403 1
        evc_dict["current_path"] = self.current_path.as_dict()
404 1
        evc_dict["failover_path"] = self.failover_path.as_dict()
405 1
        evc_dict["primary_path"] = self.primary_path.as_dict()
406 1
        evc_dict["backup_path"] = self.backup_path.as_dict()
407 1
        evc_dict["dynamic_backup_path"] = self.dynamic_backup_path
408 1
        evc_dict["metadata"] = self.metadata
409
410 1
        evc_dict["request_time"] = self.request_time
411 1
        if isinstance(self.request_time, datetime):
412 1
            evc_dict["request_time"] = self.request_time.strftime(time_fmt)
413
414 1
        time = self.creation_time.strftime(time_fmt)
415 1
        evc_dict["creation_time"] = time
416
417 1
        evc_dict["owner"] = self.owner
418 1
        evc_dict["circuit_scheduler"] = [
419
            sc.as_dict() for sc in self.circuit_scheduler
420
        ]
421
422 1
        evc_dict["active"] = self.is_active()
423 1
        evc_dict["enabled"] = self.is_enabled()
424 1
        evc_dict["archived"] = self.archived
425 1
        evc_dict["sb_priority"] = self.sb_priority
426 1
        evc_dict["service_level"] = self.service_level
427 1
        evc_dict["primary_constraints"] = self.primary_constraints
428 1
        evc_dict["secondary_constraints"] = self.secondary_constraints
429 1
        evc_dict["flow_removed_at"] = self.flow_removed_at
430 1
        evc_dict["updated_at"] = self.updated_at
431
432 1
        if keys:
433 1
            selected = {}
434 1
            for key in keys:
435 1
                selected[key] = evc_dict[key]
436 1
            selected["id"] = evc_dict["id"]
437 1
            return selected
438 1
        return evc_dict
439
440 1
    @property
441 1
    def id(self):  # pylint: disable=invalid-name
442
        """Return this EVC's ID."""
443 1
        return self._id
444
445 1
    def archive(self):
446
        """Archive this EVC on deletion."""
447 1
        self.archived = True
448
449 1
    def _use_uni_vlan(
450
        self,
451
        uni: UNI,
452
        uni_dif: Union[None, UNI] = None
453
    ):
454
        """Use tags from UNI"""
455 1
        if uni.user_tag is None:
456 1
            return
457 1
        tag = uni.user_tag.value
458 1
        tag_type = uni.user_tag.tag_type
459 1
        if (uni_dif and isinstance(tag, list) and
460
                isinstance(uni_dif.user_tag.value, list)):
461 1
            tag = range_difference(tag, uni_dif.user_tag.value)
462 1
            if not tag:
463 1
                return
464 1
        uni.interface.use_tags(
465
            self._controller, tag, tag_type, use_lock=True, check_order=False
466
        )
467
468 1
    def make_uni_vlan_available(
469
        self,
470
        uni: UNI,
471
        uni_dif: Union[None, UNI] = None,
472
    ):
473
        """Make available tag from UNI"""
474 1
        if uni.user_tag is None:
475 1
            return
476 1
        tag = uni.user_tag.value
477 1
        tag_type = uni.user_tag.tag_type
478 1
        if (uni_dif and isinstance(tag, list) and
479
                isinstance(uni_dif.user_tag.value, list)):
480 1
            tag = range_difference(tag, uni_dif.user_tag.value)
481 1
            if not tag:
482
                return
483 1
        try:
484 1
            conflict = uni.interface.make_tags_available(
485
                self._controller, tag, tag_type, use_lock=True,
486
                check_order=False
487
            )
488 1
        except KytosTagError as err:
489 1
            log.error(f"Error in circuit {self._id}: {err}")
490 1
            return
491 1
        if conflict:
492 1
            intf = uni.interface.id
493 1
            log.warning(f"Tags {conflict} was already available in {intf}")
494
495 1
    def remove_uni_tags(self):
496
        """Remove both UNI usage of a tag"""
497 1
        self.make_uni_vlan_available(self.uni_a)
498 1
        self.make_uni_vlan_available(self.uni_z)
499
500
501
# pylint: disable=fixme, too-many-public-methods
502 1
class EVCDeploy(EVCBase):
503
    """Class to handle the deploy procedures."""
504
505 1
    def create(self):
506
        """Create a EVC."""
507
508 1
    def discover_new_paths(self):
509
        """Discover new paths to satisfy this circuit and deploy it."""
510
        return DynamicPathManager.get_best_paths(self,
511
                                                 **self.primary_constraints)
512
513 1
    def get_failover_path_candidates(self):
514
        """Get failover paths to satisfy this EVC."""
515
        # in the future we can return primary/backup paths as well
516
        # we just have to properly handle link_up and failover paths
517
        # if (
518
        #     self.is_using_primary_path() and
519
        #     self.backup_path.status is EntityStatus.UP
520
        # ):
521
        #     yield self.backup_path
522 1
        return DynamicPathManager.get_disjoint_paths(self, self.current_path)
523
524 1
    def change_path(self):
525
        """Change EVC path."""
526
527 1
    def reprovision(self):
528
        """Force the EVC (re-)provisioning."""
529
530 1
    def is_affected_by_link(self, link):
531
        """Return True if this EVC has the given link on its current path."""
532 1
        return link in self.current_path
533
534 1
    def link_affected_by_interface(self, interface):
535
        """Return True if this EVC has the given link on its current path."""
536
        return self.current_path.link_affected_by_interface(interface)
537
538 1
    def is_backup_path_affected_by_link(self, link):
539
        """Return True if the backup path of this EVC uses the given link."""
540 1
        return link in self.backup_path
541
542
    # pylint: disable=invalid-name
543 1
    def is_primary_path_affected_by_link(self, link):
544
        """Return True if the primary path of this EVC uses the given link."""
545 1
        return link in self.primary_path
546
547 1
    def is_failover_path_affected_by_link(self, link):
548
        """Return True if this EVC has the given link on its failover path."""
549 1
        return link in self.failover_path
550
551 1
    def is_eligible_for_failover_path(self):
552
        """Verify if this EVC is eligible for failover path (EP029)"""
553
        # In the future this function can be augmented to consider
554
        # primary/backup, primary/dynamic, and other path combinations
555 1
        return (
556
            self.dynamic_backup_path and
557
            not self.primary_path and not self.backup_path
558
        )
559
560 1
    def is_using_primary_path(self):
561
        """Verify if the current deployed path is self.primary_path."""
562 1
        return self.primary_path and (self.current_path == self.primary_path)
563
564 1
    def is_using_backup_path(self):
565
        """Verify if the current deployed path is self.backup_path."""
566 1
        return self.backup_path and (self.current_path == self.backup_path)
567
568 1
    def is_using_dynamic_path(self):
569
        """Verify if the current deployed path is a dynamic path."""
570 1
        if (
571
            self.current_path
572
            and not self.is_using_primary_path()
573
            and not self.is_using_backup_path()
574
            and self.current_path.status == EntityStatus.UP
575
        ):
576
            return True
577 1
        return False
578
579 1
    def deploy_to_backup_path(self):
580
        """Deploy the backup path into the datapaths of this circuit.
581
582
        If the backup_path attribute is valid and up, this method will try to
583
        deploy this backup_path.
584
585
        If everything fails and dynamic_backup_path is True, then tries to
586
        deploy a dynamic path.
587
        """
588
        # TODO: Remove flows from current (cookies)
589 1
        if self.is_using_backup_path():
590
            # TODO: Log to say that cannot move backup to backup
591
            return True
592
593 1
        success = False
594 1
        if self.backup_path.status is EntityStatus.UP:
595 1
            success = self.deploy_to_path(self.backup_path)
596
597 1
        if success:
598 1
            return True
599
600 1
        if self.dynamic_backup_path or self.is_intra_switch():
601 1
            return self.deploy_to_path()
602
603
        return False
604
605 1
    def deploy_to_primary_path(self):
606
        """Deploy the primary path into the datapaths of this circuit.
607
608
        If the primary_path attribute is valid and up, this method will try to
609
        deploy this primary_path.
610
        """
611
        # TODO: Remove flows from current (cookies)
612 1
        if self.is_using_primary_path():
613
            # TODO: Log to say that cannot move primary to primary
614
            return True
615
616 1
        if self.primary_path.status is EntityStatus.UP:
617 1
            return self.deploy_to_path(self.primary_path)
618
        return False
619
620 1
    def deploy(self):
621
        """Deploy EVC to best path.
622
623
        Best path can be the primary path, if available. If not, the backup
624
        path, and, if it is also not available, a dynamic path.
625
        """
626 1
        if self.archived:
627 1
            return False
628 1
        self.enable()
629 1
        success = self.deploy_to_primary_path()
630 1
        if not success:
631 1
            success = self.deploy_to_backup_path()
632
633 1
        if success:
634 1
            emit_event(self._controller, "deployed",
635
                       content=map_evc_event_content(self))
636 1
        return success
637
638 1
    @staticmethod
639 1
    def get_path_status(path):
640
        """Check for the current status of a path.
641
642
        If any link in this path is down, the path is considered down.
643
        """
644 1
        if not path:
645 1
            return EntityStatus.DISABLED
646
647 1
        for link in path:
648 1
            if link.status is not EntityStatus.UP:
649 1
                return link.status
650 1
        return EntityStatus.UP
651
652
    #    def discover_new_path(self):
653
    #        # TODO: discover a new path to satisfy this circuit and deploy
654
655 1
    def remove(self):
656
        """Remove EVC path and disable it."""
657 1
        self.remove_current_flows(sync=False)
658 1
        self.remove_failover_flows(sync=False)
659 1
        self.disable()
660 1
        self.sync()
661 1
        emit_event(self._controller, "undeployed",
662
                   content=map_evc_event_content(self))
663
664 1
    def remove_failover_flows(self, exclude_uni_switches=True,
665
                              force=True, sync=True) -> None:
666
        """Remove failover_flows.
667
668
        By default, it'll exclude UNI switches, if mef_eline has already
669
        called remove_current_flows before then this minimizes the number
670
        of FlowMods and IO.
671
        """
672 1
        if not self.failover_path:
673 1
            return
674 1
        switches, cookie, excluded = OrderedDict(), self.get_cookie(), set()
675 1
        links = set()
676 1
        if exclude_uni_switches:
677 1
            excluded.add(self.uni_a.interface.switch.id)
678 1
            excluded.add(self.uni_z.interface.switch.id)
679 1
        for link in self.failover_path:
680 1
            if link.endpoint_a.switch.id not in excluded:
681 1
                switches[link.endpoint_a.switch.id] = link.endpoint_a.switch
682 1
                links.add(link)
683 1
            if link.endpoint_b.switch.id not in excluded:
684 1
                switches[link.endpoint_b.switch.id] = link.endpoint_b.switch
685 1
                links.add(link)
686 1
        for switch in switches.values():
687 1
            try:
688 1
                self._send_flow_mods(
689
                    switch.id,
690
                    [
691
                        {
692
                            "cookie": cookie,
693
                            "cookie_mask": int(0xffffffffffffffff),
694
                        }
695
                    ],
696
                    "delete",
697
                    force=force,
698
                )
699
            except FlowModException as err:
700
                log.error(
701
                    f"Error removing flows from switch {switch.id} for"
702
                    f"EVC {self}: {err}"
703
                )
704 1
        try:
705 1
            self.failover_path.make_vlans_available(self._controller)
706
        except KytosTagError as err:
707
            log.error(f"Error when removing failover flows: {err}")
708 1
        self.failover_path = Path([])
709 1
        if sync:
710 1
            self.sync()
711
712 1
    def remove_current_flows(self, current_path=None, force=True,
713
                             sync=True):
714
        """Remove all flows from current path."""
715 1
        switches = set()
716
717 1
        switches.add(self.uni_a.interface.switch)
718 1
        switches.add(self.uni_z.interface.switch)
719 1
        if not current_path:
720 1
            current_path = self.current_path
721 1
        for link in current_path:
722 1
            switches.add(link.endpoint_a.switch)
723 1
            switches.add(link.endpoint_b.switch)
724
725 1
        match = {
726
            "cookie": self.get_cookie(),
727
            "cookie_mask": int(0xffffffffffffffff)
728
        }
729
730 1
        for switch in switches:
731 1
            try:
732 1
                self._send_flow_mods(switch.id, [match], 'delete', force=force)
733 1
            except FlowModException as err:
734 1
                log.error(
735
                    f"Error removing flows from switch {switch.id} for"
736
                    f"EVC {self}: {err}"
737
                )
738 1
        try:
739 1
            current_path.make_vlans_available(self._controller)
740
        except KytosTagError as err:
741
            log.error(f"Error when removing current path flows: {err}")
742 1
        self.current_path = Path([])
743 1
        self.deactivate()
744 1
        if sync:
745 1
            self.sync()
746
747 1
    def remove_path_flows(
748
        self, path=None, force=True
749
    ) -> dict[str, list[dict]]:
750
        """Remove all flows from path, and return the removed flows."""
751 1
        dpid_flows_match: dict[str, list[dict]] = {}
752
753 1
        if not path:
754 1
            return dpid_flows_match
755
756 1
        try:
757 1
            nni_flows = self._prepare_nni_flows(path)
758
        # pylint: disable=broad-except
759
        except Exception:
760
            err = traceback.format_exc().replace("\n", ", ")
761
            log.error(f"Fail to remove NNI failover flows for {self}: {err}")
762
            nni_flows = {}
763
764 1
        for dpid, flows in nni_flows.items():
765 1
            dpid_flows_match.setdefault(dpid, [])
766 1
            for flow in flows:
767 1
                dpid_flows_match[dpid].append({
768
                    "cookie": flow["cookie"],
769
                    "match": flow["match"],
770
                    "cookie_mask": int(0xffffffffffffffff)
771
                })
772
773 1
        try:
774 1
            uni_flows = self._prepare_uni_flows(path, skip_in=True)
775
        # pylint: disable=broad-except
776
        except Exception:
777
            err = traceback.format_exc().replace("\n", ", ")
778
            log.error(f"Fail to remove UNI failover flows for {self}: {err}")
779
            uni_flows = {}
780
781 1
        for dpid, flows in uni_flows.items():
782 1
            dpid_flows_match.setdefault(dpid, [])
783 1
            for flow in flows:
784 1
                dpid_flows_match[dpid].append({
785
                    "cookie": flow["cookie"],
786
                    "match": flow["match"],
787
                    "cookie_mask": int(0xffffffffffffffff)
788
                })
789
790 1
        for dpid, flows in dpid_flows_match.items():
791 1
            try:
792 1
                self._send_flow_mods(dpid, flows, 'delete', force=force)
793 1
            except FlowModException as err:
794 1
                log.error(
795
                    "Error removing failover flows: "
796
                    f"dpid={dpid} evc={self} error={err}"
797
                )
798 1
        try:
799 1
            path.make_vlans_available(self._controller)
800
        except KytosTagError as err:
801
            log.error(f"Error when removing path flows: {err}")
802
803 1
        return dpid_flows_match
804
805 1
    @staticmethod
806 1
    def links_zipped(path=None):
807
        """Return an iterator which yields pairs of links in order."""
808 1
        if not path:
809
            return []
810 1
        return zip(path[:-1], path[1:])
811
812 1
    def should_deploy(self, path=None):
813
        """Verify if the circuit should be deployed."""
814 1
        if not path:
815 1
            log.debug("Path is empty.")
816 1
            return False
817
818 1
        if not self.is_enabled():
819 1
            log.debug(f"{self} is disabled.")
820 1
            return False
821
822 1
        if not self.is_active():
823 1
            log.debug(f"{self} will be deployed.")
824 1
            return True
825
826 1
        return False
827
828 1
    def deploy_to_path(self, path=None):  # pylint: disable=too-many-branches
829
        """Install the flows for this circuit.
830
831
        Procedures to deploy:
832
833
        0. Remove current flows installed
834
        1. Decide if will deploy "path" or discover a new path
835
        2. Choose vlan
836
        3. Install NNI flows
837
        4. Install UNI flows
838
        5. Activate
839
        6. Update current_path
840
        7. Update links caches(primary, current, backup)
841
842
        """
843 1
        self.remove_current_flows()
844 1
        use_path = path
845 1
        tag_errors = []
846 1
        if self.should_deploy(use_path):
847 1
            try:
848 1
                use_path.choose_vlans(self._controller)
849 1
            except KytosNoTagAvailableError as e:
850 1
                tag_errors.append(str(e))
851 1
                use_path = None
852
        else:
853 1
            for use_path in self.discover_new_paths():
854 1
                if use_path is None:
855
                    continue
856 1
                try:
857 1
                    use_path.choose_vlans(self._controller)
858 1
                    break
859 1
                except KytosNoTagAvailableError as e:
860 1
                    tag_errors.append(str(e))
861
            else:
862 1
                use_path = None
863
864 1
        try:
865 1
            if use_path:
866 1
                self._install_nni_flows(use_path)
867 1
                self._install_uni_flows(use_path)
868 1
            elif self.is_intra_switch():
869 1
                use_path = Path()
870 1
                self._install_direct_uni_flows()
871
            else:
872 1
                msg = f"{self} was not deployed. No available path was found."
873 1
                if tag_errors:
874 1
                    msg = self.add_tag_errors(msg, tag_errors)
875 1
                    log.error(msg)
876
                else:
877 1
                    log.warning(msg)
878 1
                return False
879 1
        except FlowModException as err:
880 1
            log.error(
881
                f"Error deploying EVC {self} when calling flow_manager: {err}"
882
            )
883 1
            self.remove_current_flows(use_path)
884 1
            return False
885 1
        self.activate()
886 1
        self.current_path = use_path
887 1
        self.sync()
888 1
        log.info(f"{self} was deployed.")
889 1
        return True
890
891 1
    def try_setup_failover_path(self, wait=settings.DEPLOY_EVCS_INTERVAL):
892
        """Try setup failover_path whenever possible."""
893
        if self.failover_path:
894
            return
895
        if (now() - self.affected_by_link_at).seconds >= wait:
896
            with self.lock:
897
                self.setup_failover_path()
898
899 1
    def setup_failover_path(self):
900
        """Install flows for the failover path of this EVC.
901
902
        Procedures to deploy:
903
904
        0. Remove flows currently installed for failover_path (if any)
905
        1. Discover a disjoint path from current_path
906
        2. Choose vlans
907
        3. Install NNI flows
908
        4. Install UNI egress flows
909
        5. Update failover_path
910
        """
911
        # Intra-switch EVCs have no failover_path
912 1
        if self.is_intra_switch():
913 1
            return False
914
915
        # For not only setup failover path for totally dynamic EVCs
916 1
        if not self.is_eligible_for_failover_path():
917 1
            return False
918
919 1
        out_new_flows: dict[str, list[dict]] = {}
920 1
        reason = ""
921 1
        tag_errors = []
922 1
        out_removed_flows = self.remove_path_flows(self.failover_path)
923 1
        self.failover_path = Path([])
924
925 1
        for use_path in self.get_failover_path_candidates():
926 1
            if not use_path:
927 1
                continue
928 1
            try:
929 1
                use_path.choose_vlans(self._controller)
930 1
                break
931 1
            except KytosNoTagAvailableError as e:
932 1
                tag_errors.append(str(e))
933
        else:
934 1
            use_path = Path([])
935 1
            reason = "No available path was found"
936
937 1
        try:
938 1
            if use_path:
939 1
                _nni_flows = self._install_nni_flows(use_path)
940 1
                out_new_flows = merge_flow_dicts(out_new_flows, _nni_flows)
941 1
                _uni_flows = self._install_uni_flows(use_path, skip_in=True)
942 1
                out_new_flows = merge_flow_dicts(out_new_flows, _uni_flows)
943 1
        except FlowModException as err:
944 1
            reason = "Error deploying failover path"
945 1
            log.error(
946
                f"{reason} for {self}. FlowManager error: {err}"
947
            )
948 1
            _rmed_flows = self.remove_path_flows(use_path)
949 1
            out_new_flows = merge_flow_dicts(out_new_flows, _rmed_flows)
950 1
            use_path = Path([])
951
952 1
        self.failover_path = use_path
953 1
        self.sync()
954
955 1
        if out_new_flows or out_removed_flows:
956 1
            content = {
957
                self.id: map_evc_event_content(
958
                    self,
959
                    flows=out_new_flows,
960
                    removed_flows=out_removed_flows,
961
                    error_reason=reason,
962
                    current_path=self.current_path.as_dict(),
963
                )
964
            }
965 1
            emit_event(self._controller, "failover_deployed", content=content)
966
967 1
        if not use_path:
968 1
            msg = f"Failover path for {self} was not deployed: {reason}."
969 1
            if tag_errors:
970 1
                msg = self.add_tag_errors(msg, tag_errors)
971 1
                log.error(msg)
972
            else:
973 1
                log.warning(msg)
974 1
            return False
975 1
        log.info(f"Failover path for {self} was deployed.")
976 1
        return True
977
978 1
    @staticmethod
979 1
    def add_tag_errors(msg: str, tag_errors: list):
980
        """Add to msg the tag errors ecountered when chossing path."""
981 1
        path = ['path', 'paths']
982 1
        was = ['was', 'were']
983 1
        message = ['message', 'messages']
984
985
        # Choose either singular(0) or plural(1) words
986 1
        n = 1
987 1
        if len(tag_errors) == 1:
988 1
            n = 0
989
990 1
        msg += f" {len(tag_errors)} {path[n]} {was[n]} rejected"
991 1
        msg += f" with {message[n]}: {tag_errors}"
992 1
        return msg
993
994 1
    def get_failover_flows(self):
995
        """Return the flows needed to make the failover path active, i.e. the
996
        flows for ingress forwarding.
997
998
        Return:
999
            dict: A dict of flows indexed by the switch_id will be returned, or
1000
                an empty dict if no failover_path is available.
1001
        """
1002 1
        if not self.failover_path:
1003 1
            return {}
1004 1
        return self._prepare_uni_flows(self.failover_path, skip_out=True)
1005
1006
    # pylint: disable=too-many-branches
1007 1
    def _prepare_direct_uni_flows(self):
1008
        """Prepare flows connecting two UNIs for intra-switch EVC."""
1009 1
        vlan_a = self._get_value_from_uni_tag(self.uni_a)
1010 1
        vlan_z = self._get_value_from_uni_tag(self.uni_z)
1011
1012 1
        flow_mod_az = self._prepare_flow_mod(
1013
            self.uni_a.interface, self.uni_z.interface,
1014
            self.queue_id, vlan_a
1015
        )
1016 1
        flow_mod_za = self._prepare_flow_mod(
1017
            self.uni_z.interface, self.uni_a.interface,
1018
            self.queue_id, vlan_z
1019
        )
1020
1021 1 View Code Duplication
        if not isinstance(vlan_z, list) and vlan_z not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1022 1
            flow_mod_az["actions"].insert(
1023
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
1024
            )
1025 1
            if not vlan_a:
1026 1
                flow_mod_az["actions"].insert(
1027
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1028
                )
1029 1
            if vlan_a == 0:
1030 1
                flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1031 1
        elif vlan_a == 0 and vlan_z == "4096/4096":
1032 1
            flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1033
1034 1 View Code Duplication
        if not isinstance(vlan_a, list) and vlan_a not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1035 1
            flow_mod_za["actions"].insert(
1036
                    0, {"action_type": "set_vlan", "vlan_id": vlan_a}
1037
                )
1038 1
            if not vlan_z:
1039 1
                flow_mod_za["actions"].insert(
1040
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1041
                )
1042 1
            if vlan_z == 0:
1043 1
                flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1044 1
        elif vlan_a == "4096/4096" and vlan_z == 0:
1045 1
            flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1046
1047 1
        flows = []
1048 1
        if isinstance(vlan_a, list):
1049 1
            for mask_a in vlan_a:
1050 1
                flow_aux = deepcopy(flow_mod_az)
1051 1
                flow_aux["match"]["dl_vlan"] = mask_a
1052 1
                flows.append(flow_aux)
1053
        else:
1054 1
            if vlan_a is not None:
1055 1
                flow_mod_az["match"]["dl_vlan"] = vlan_a
1056 1
            flows.append(flow_mod_az)
1057
1058 1
        if isinstance(vlan_z, list):
1059 1
            for mask_z in vlan_z:
1060 1
                flow_aux = deepcopy(flow_mod_za)
1061 1
                flow_aux["match"]["dl_vlan"] = mask_z
1062 1
                flows.append(flow_aux)
1063
        else:
1064 1
            if vlan_z is not None:
1065 1
                flow_mod_za["match"]["dl_vlan"] = vlan_z
1066 1
            flows.append(flow_mod_za)
1067 1
        return (
1068
            self.uni_a.interface.switch.id, flows
1069
        )
1070
1071 1
    def _install_direct_uni_flows(self):
1072
        """Install flows connecting two UNIs.
1073
1074
        This case happens when the circuit is between UNIs in the
1075
        same switch.
1076
        """
1077 1
        (dpid, flows) = self._prepare_direct_uni_flows()
1078 1
        self._send_flow_mods(dpid, flows)
1079
1080 1
    def _prepare_nni_flows(self, path=None):
1081
        """Prepare NNI flows."""
1082 1
        nni_flows = OrderedDict()
1083 1
        previous = self.uni_a.interface.switch.dpid
1084 1
        for incoming, outcoming in self.links_zipped(path):
1085 1
            in_vlan = incoming.get_metadata("s_vlan").value
1086 1
            out_vlan = outcoming.get_metadata("s_vlan").value
1087 1
            in_endpoint = self.get_endpoint_by_id(incoming, previous, ne)
1088 1
            out_endpoint = self.get_endpoint_by_id(
1089
                outcoming, in_endpoint.switch.id, eq
1090
            )
1091
1092 1
            flows = []
1093
            # Flow for one direction
1094 1
            flows.append(
1095
                self._prepare_nni_flow(
1096
                    in_endpoint,
1097
                    out_endpoint,
1098
                    in_vlan,
1099
                    out_vlan,
1100
                    queue_id=self.queue_id,
1101
                )
1102
            )
1103
1104
            # Flow for the other direction
1105 1
            flows.append(
1106
                self._prepare_nni_flow(
1107
                    out_endpoint,
1108
                    in_endpoint,
1109
                    out_vlan,
1110
                    in_vlan,
1111
                    queue_id=self.queue_id,
1112
                )
1113
            )
1114 1
            previous = in_endpoint.switch.id
1115 1
            nni_flows[in_endpoint.switch.id] = flows
1116 1
        return nni_flows
1117
1118 1
    def _install_nni_flows(
1119
        self, path=None
1120
    ) -> dict[str, list[dict]]:
1121
        """Install NNI flows."""
1122 1
        nni_flows = self._prepare_nni_flows(path)
1123 1
        for dpid, flows in nni_flows.items():
1124 1
            self._send_flow_mods(dpid, flows)
1125 1
        return nni_flows
1126
1127 1
    @staticmethod
1128 1
    def _get_value_from_uni_tag(uni: UNI):
1129
        """Returns the value from tag. In case of any and untagged
1130
        it should return 4096/4096 and 0 respectively"""
1131 1
        special = {"any": "4096/4096", "untagged": 0}
1132 1
        if uni.user_tag:
1133 1
            value = uni.user_tag.value
1134 1
            if isinstance(value, list):
1135 1
                return uni.user_tag.mask_list
1136 1
            return special.get(value, value)
1137 1
        return None
1138
1139
    # pylint: disable=too-many-locals
1140 1
    def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
1141
        """Prepare flows to install UNIs."""
1142 1
        uni_flows = {}
1143 1
        if not path:
1144 1
            log.info("install uni flows without path.")
1145 1
            return uni_flows
1146
1147
        # Determine VLANs
1148 1
        in_vlan_a = self._get_value_from_uni_tag(self.uni_a)
1149 1
        out_vlan_a = path[0].get_metadata("s_vlan").value
1150
1151 1
        in_vlan_z = self._get_value_from_uni_tag(self.uni_z)
1152 1
        out_vlan_z = path[-1].get_metadata("s_vlan").value
1153
1154
        # Get endpoints from path
1155 1
        endpoint_a = self.get_endpoint_by_id(
1156
            path[0], self.uni_a.interface.switch.id, eq
1157
        )
1158 1
        endpoint_z = self.get_endpoint_by_id(
1159
            path[-1], self.uni_z.interface.switch.id, eq
1160
        )
1161
1162
        # Flows for the first UNI
1163 1
        flows_a = []
1164
1165
        # Flow for one direction, pushing the service tag
1166 1
        if not skip_in:
1167 1
            if isinstance(in_vlan_a, list):
1168 1
                for in_mask_a in in_vlan_a:
1169 1
                    push_flow = self._prepare_push_flow(
1170
                        self.uni_a.interface,
1171
                        endpoint_a,
1172
                        in_mask_a,
1173
                        out_vlan_a,
1174
                        in_vlan_z,
1175
                        queue_id=self.queue_id,
1176
                    )
1177 1
                    flows_a.append(push_flow)
1178
            else:
1179 1
                push_flow = self._prepare_push_flow(
1180
                    self.uni_a.interface,
1181
                    endpoint_a,
1182
                    in_vlan_a,
1183
                    out_vlan_a,
1184
                    in_vlan_z,
1185
                    queue_id=self.queue_id,
1186
                )
1187 1
                flows_a.append(push_flow)
1188
1189
        # Flow for the other direction, popping the service tag
1190 1
        if not skip_out:
1191 1
            pop_flow = self._prepare_pop_flow(
1192
                endpoint_a,
1193
                self.uni_a.interface,
1194
                out_vlan_a,
1195
                queue_id=self.queue_id,
1196
            )
1197 1
            flows_a.append(pop_flow)
1198
1199 1
        uni_flows[self.uni_a.interface.switch.id] = flows_a
1200
1201
        # Flows for the second UNI
1202 1
        flows_z = []
1203
1204
        # Flow for one direction, pushing the service tag
1205 1
        if not skip_in:
1206 1
            if isinstance(in_vlan_z, list):
1207 1
                for in_mask_z in in_vlan_z:
1208 1
                    push_flow = self._prepare_push_flow(
1209
                        self.uni_z.interface,
1210
                        endpoint_z,
1211
                        in_mask_z,
1212
                        out_vlan_z,
1213
                        in_vlan_a,
1214
                        queue_id=self.queue_id,
1215
                    )
1216 1
                    flows_z.append(push_flow)
1217
            else:
1218 1
                push_flow = self._prepare_push_flow(
1219
                    self.uni_z.interface,
1220
                    endpoint_z,
1221
                    in_vlan_z,
1222
                    out_vlan_z,
1223
                    in_vlan_a,
1224
                    queue_id=self.queue_id,
1225
                )
1226 1
                flows_z.append(push_flow)
1227
1228
        # Flow for the other direction, popping the service tag
1229 1
        if not skip_out:
1230 1
            pop_flow = self._prepare_pop_flow(
1231
                endpoint_z,
1232
                self.uni_z.interface,
1233
                out_vlan_z,
1234
                queue_id=self.queue_id,
1235
            )
1236 1
            flows_z.append(pop_flow)
1237
1238 1
        uni_flows[self.uni_z.interface.switch.id] = flows_z
1239
1240 1
        return uni_flows
1241
1242 1
    def _install_uni_flows(
1243
        self, path=None, skip_in=False, skip_out=False
1244
    ) -> dict[str, list[dict]]:
1245
        """Install UNI flows."""
1246 1
        uni_flows = self._prepare_uni_flows(path, skip_in, skip_out)
1247
1248 1
        for (dpid, flows) in uni_flows.items():
1249 1
            self._send_flow_mods(dpid, flows)
1250
1251 1
        return uni_flows
1252
1253 1
    @staticmethod
1254 1
    def _send_flow_mods(dpid, flow_mods, command='flows', force=False):
1255
        """Send a flow_mod list to a specific switch.
1256
1257
        Args:
1258
            dpid(str): The target of flows (i.e. Switch.id).
1259
            flow_mods(dict): Python dictionary with flow_mods.
1260
            command(str): By default is 'flows'. To remove a flow is 'remove'.
1261
            force(bool): True to send via consistency check in case of errors
1262
1263
        """
1264
1265 1
        endpoint = f"{settings.MANAGER_URL}/{command}/{dpid}"
1266
1267 1
        data = {"flows": flow_mods, "force": force}
1268 1
        response = requests.post(endpoint, json=data)
1269 1
        if response.status_code >= 400:
1270 1
            raise FlowModException(str(response.text))
1271
1272 1
    def get_cookie(self):
1273
        """Return the cookie integer from evc id."""
1274 1
        return int(self.id, 16) + (settings.COOKIE_PREFIX << 56)
1275
1276 1
    @staticmethod
1277 1
    def get_id_from_cookie(cookie):
1278
        """Return the evc id given a cookie value."""
1279 1
        evc_id = cookie - (settings.COOKIE_PREFIX << 56)
1280 1
        return f"{evc_id:x}".zfill(14)
1281
1282 1
    def set_flow_table_group_id(self, flow_mod: dict, vlan) -> dict:
1283
        """Set table_group and table_id"""
1284 1
        table_group = "epl" if vlan is None else "evpl"
1285 1
        flow_mod["table_group"] = table_group
1286 1
        flow_mod["table_id"] = self.table_group[table_group]
1287 1
        return flow_mod
1288
1289 1
    @staticmethod
1290 1
    def get_priority(vlan):
1291
        """Return priority value depending on vlan value"""
1292 1
        if isinstance(vlan, list):
1293 1
            return settings.EVPL_SB_PRIORITY
1294 1
        if vlan not in {None, "4096/4096", 0}:
1295 1
            return settings.EVPL_SB_PRIORITY
1296 1
        if vlan == 0:
1297 1
            return settings.UNTAGGED_SB_PRIORITY
1298 1
        if vlan == "4096/4096":
1299 1
            return settings.ANY_SB_PRIORITY
1300 1
        return settings.EPL_SB_PRIORITY
1301
1302 1
    def _prepare_flow_mod(self, in_interface, out_interface,
1303
                          queue_id=None, vlan=True):
1304
        """Prepare a common flow mod."""
1305 1
        default_actions = [
1306
            {"action_type": "output", "port": out_interface.port_number}
1307
        ]
1308 1
        queue_id = settings.QUEUE_ID if queue_id == -1 else queue_id
1309 1
        if queue_id is not None:
1310 1
            default_actions.append(
1311
                {"action_type": "set_queue", "queue_id": queue_id}
1312
            )
1313
1314 1
        flow_mod = {
1315
            "match": {"in_port": in_interface.port_number},
1316
            "cookie": self.get_cookie(),
1317
            "actions": default_actions,
1318
            "owner": "mef_eline",
1319
        }
1320
1321 1
        self.set_flow_table_group_id(flow_mod, vlan)
1322 1
        if self.sb_priority:
1323 1
            flow_mod["priority"] = self.sb_priority
1324
        else:
1325 1
            flow_mod["priority"] = self.get_priority(vlan)
1326 1
        return flow_mod
1327
1328 1
    def _prepare_nni_flow(self, *args, queue_id=None):
1329
        """Create NNI flows."""
1330 1
        in_interface, out_interface, in_vlan, out_vlan = args
1331 1
        flow_mod = self._prepare_flow_mod(
1332
            in_interface, out_interface, queue_id
1333
        )
1334 1
        flow_mod["match"]["dl_vlan"] = in_vlan
1335 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1336 1
        flow_mod["actions"].insert(0, new_action)
1337
1338 1
        return flow_mod
1339
1340 1
    def _prepare_push_flow(self, *args, queue_id=None):
1341
        """Prepare push flow.
1342
1343
        Arguments:
1344
            in_interface(str): Interface input.
1345
            out_interface(str): Interface output.
1346
            in_vlan(int,str,None): Vlan input.
1347
            out_vlan(str): Vlan output.
1348
            new_c_vlan(int,str,list,None): New client vlan.
1349
1350
        Return:
1351
            dict: An python dictionary representing a FlowMod
1352
1353
        """
1354
        # assign all arguments
1355 1
        in_interface, out_interface, in_vlan, out_vlan, new_c_vlan = args
1356 1
        vlan_pri = in_vlan if not isinstance(new_c_vlan, list) else new_c_vlan
1357 1
        flow_mod = self._prepare_flow_mod(
1358
            in_interface, out_interface, queue_id, vlan_pri
1359
        )
1360
        # the service tag must be always pushed
1361 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1362 1
        flow_mod["actions"].insert(0, new_action)
1363
1364 1
        new_action = {"action_type": "push_vlan", "tag_type": "s"}
1365 1
        flow_mod["actions"].insert(0, new_action)
1366
1367 1
        if in_vlan is not None:
1368
            # if in_vlan is set, it must be included in the match
1369 1
            flow_mod["match"]["dl_vlan"] = in_vlan
1370
1371 1
        if (not isinstance(new_c_vlan, list) and in_vlan != new_c_vlan and
1372
                new_c_vlan not in self.special_cases):
1373
            # new_in_vlan is an integer but zero, action to set is required
1374 1
            new_action = {"action_type": "set_vlan", "vlan_id": new_c_vlan}
1375 1
            flow_mod["actions"].insert(0, new_action)
1376
1377 1
        if in_vlan not in self.special_cases and new_c_vlan == 0:
1378
            # # new_in_vlan is an integer but zero and new_c_vlan does not
1379
            # a pop action is required
1380 1
            new_action = {"action_type": "pop_vlan"}
1381 1
            flow_mod["actions"].insert(0, new_action)
1382
1383 1
        elif in_vlan == "4096/4096" and new_c_vlan == 0:
1384
            # if in_vlan match with any tags and new_c_vlan does not
1385
            # a pop action is required
1386 1
            new_action = {"action_type": "pop_vlan"}
1387 1
            flow_mod["actions"].insert(0, new_action)
1388
1389 1
        elif (not in_vlan and
1390
                (not isinstance(new_c_vlan, list) and
1391
                 new_c_vlan not in self.special_cases)):
1392
            # new_in_vlan is an integer but zero and in_vlan is not set
1393
            # then it is set now
1394 1
            new_action = {"action_type": "push_vlan", "tag_type": "c"}
1395 1
            flow_mod["actions"].insert(0, new_action)
1396
1397 1
        return flow_mod
1398
1399 1
    def _prepare_pop_flow(
1400
        self, in_interface, out_interface, out_vlan, queue_id=None
1401
    ):
1402
        # pylint: disable=too-many-arguments
1403
        """Prepare pop flow."""
1404 1
        flow_mod = self._prepare_flow_mod(
1405
            in_interface, out_interface, queue_id
1406
        )
1407 1
        flow_mod["match"]["dl_vlan"] = out_vlan
1408 1
        new_action = {"action_type": "pop_vlan"}
1409 1
        flow_mod["actions"].insert(0, new_action)
1410 1
        return flow_mod
1411
1412 1
    @staticmethod
1413 1
    def run_bulk_sdntraces(
1414
        uni_list: list[tuple[Interface, Union[str, int, None]]]
1415
    ) -> dict:
1416
        """Run SDN traces on control plane starting from EVC UNIs."""
1417 1
        endpoint = f"{settings.SDN_TRACE_CP_URL}/traces"
1418 1
        data = []
1419 1
        for interface, tag_value in uni_list:
1420 1
            data_uni = {
1421
                "trace": {
1422
                            "switch": {
1423
                                "dpid": interface.switch.dpid,
1424
                                "in_port": interface.port_number,
1425
                            }
1426
                        }
1427
                }
1428 1
            if tag_value:
1429 1
                uni_dl_vlan = map_dl_vlan(tag_value)
1430 1
                if uni_dl_vlan:
1431 1
                    data_uni["trace"]["eth"] = {
1432
                                            "dl_type": 0x8100,
1433
                                            "dl_vlan": uni_dl_vlan,
1434
                                            }
1435 1
            data.append(data_uni)
1436 1
        try:
1437 1
            response = requests.put(endpoint, json=data, timeout=30)
1438 1
        except Timeout as exception:
1439 1
            log.error(f"Request has timed out: {exception}")
1440 1
            return {"result": []}
1441 1
        if response.status_code >= 400:
1442 1
            log.error(f"Failed to run sdntrace-cp: {response.text}")
1443 1
            return {"result": []}
1444 1
        return response.json()
1445
1446
    # pylint: disable=too-many-return-statements, too-many-arguments
1447 1
    @staticmethod
1448 1
    def check_trace(
1449
        evc_id: str,
1450
        evc_name: str,
1451
        tag_a: Union[None, int, str],
1452
        tag_z: Union[None, int, str],
1453
        interface_a: Interface,
1454
        interface_z: Interface,
1455
        current_path: list,
1456
        trace_a: list,
1457
        trace_z: list
1458
    ) -> bool:
1459
        """Auxiliar function to check an individual trace"""
1460 1
        if (
1461
            len(trace_a) != len(current_path) + 1
1462
            or not compare_uni_out_trace(tag_z, interface_z, trace_a[-1])
1463
        ):
1464 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1465
                        f"Invalid trace from uni_a: {trace_a}")
1466 1
            return False
1467 1
        if (
1468
            len(trace_z) != len(current_path) + 1
1469
            or not compare_uni_out_trace(tag_a, interface_a, trace_z[-1])
1470
        ):
1471 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1472
                        f"Invalid trace from uni_z: {trace_z}")
1473 1
            return False
1474
1475 1
        if not current_path:
1476
            return True
1477
1478 1
        first_link, trace_path_begin, trace_path_end = current_path[0], [], []
1479 1
        if (
1480
            first_link.endpoint_a.switch.id == trace_a[0]["dpid"]
1481
        ):
1482 1
            trace_path_begin, trace_path_end = trace_a, trace_z
1483 1
        elif (
1484
            first_link.endpoint_a.switch.id == trace_z[0]["dpid"]
1485
        ):
1486 1
            trace_path_begin, trace_path_end = trace_z, trace_a
1487
        else:
1488
            msg = (
1489
                f"first link {first_link} endpoint_a didn't match the first "
1490
                f"step of trace_a {trace_a} or trace_z {trace_z}"
1491
            )
1492
            log.warning(msg)
1493
            return False
1494
1495 1
        for link, trace1, trace2 in zip(current_path,
1496
                                        trace_path_begin[1:],
1497
                                        trace_path_end[:0:-1]):
1498 1
            metadata_vlan = None
1499 1
            if link.metadata:
1500 1
                metadata_vlan = glom(link.metadata, 's_vlan.value')
1501 1
            if compare_endpoint_trace(
1502
                                        link.endpoint_a,
1503
                                        metadata_vlan,
1504
                                        trace2
1505
                                    ) is False:
1506 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1507
                            f"Invalid trace from uni_a: {trace_a}")
1508 1
                return False
1509 1
            if compare_endpoint_trace(
1510
                                        link.endpoint_b,
1511
                                        metadata_vlan,
1512
                                        trace1
1513
                                    ) is False:
1514 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1515
                            f"Invalid trace from uni_z: {trace_z}")
1516 1
                return False
1517
1518 1
        return True
1519
1520 1
    @staticmethod
1521 1
    def check_range(circuit, traces: list) -> bool:
1522
        """Check traces when for UNI with TAGRange"""
1523 1
        check = True
1524 1
        for i, mask in enumerate(circuit.uni_a.user_tag.mask_list):
1525 1
            trace_a = traces[i*2]
1526 1
            trace_z = traces[i*2+1]
1527 1
            check &= EVCDeploy.check_trace(
1528
                circuit.id, circuit.name,
1529
                mask, mask,
1530
                circuit.uni_a.interface,
1531
                circuit.uni_z.interface,
1532
                circuit.current_path,
1533
                trace_a, trace_z,
1534
            )
1535 1
        return check
1536
1537 1
    @staticmethod
1538 1
    def check_list_traces(list_circuits: list) -> dict:
1539
        """Check if current_path is deployed comparing with SDN traces."""
1540 1
        if not list_circuits:
1541 1
            return {}
1542 1
        uni_list = make_uni_list(list_circuits)
1543 1
        traces = EVCDeploy.run_bulk_sdntraces(uni_list)["result"]
1544
1545 1
        if not traces:
1546 1
            return {}
1547
1548 1
        try:
1549 1
            circuits_checked = {}
1550 1
            i = 0
1551 1
            for circuit in list_circuits:
1552 1
                if isinstance(circuit.uni_a.user_tag, TAGRange):
1553 1
                    length = len(circuit.uni_a.user_tag.mask_list)
1554 1
                    circuits_checked[circuit.id] = EVCDeploy.check_range(
1555
                        circuit, traces[i:i+length*2]
1556
                    )
1557 1
                    i += length*2
1558
                else:
1559 1
                    trace_a = traces[i]
1560 1
                    trace_z = traces[i+1]
1561 1
                    tag_a = None
1562 1
                    if circuit.uni_a.user_tag:
1563 1
                        tag_a = circuit.uni_a.user_tag.value
1564 1
                    tag_z = None
1565 1
                    if circuit.uni_z.user_tag:
1566 1
                        tag_z = circuit.uni_z.user_tag.value
1567 1
                    circuits_checked[circuit.id] = EVCDeploy.check_trace(
1568
                        circuit.id, circuit.name,
1569
                        tag_a, tag_z,
1570
                        circuit.uni_a.interface,
1571
                        circuit.uni_z.interface,
1572
                        circuit.current_path,
1573
                        trace_a, trace_z
1574
                    )
1575 1
                    i += 2
1576 1
        except IndexError as err:
1577 1
            log.error(
1578
                f"Bulk sdntraces returned fewer items than expected."
1579
                f"Error = {err}"
1580
            )
1581 1
            return {}
1582
1583 1
        return circuits_checked
1584
1585 1
    @staticmethod
1586 1
    def get_endpoint_by_id(
1587
        link: Link,
1588
        id_: str,
1589
        operator: Union[eq, ne]
1590
    ) -> Interface:
1591
        """Return endpoint from link
1592
        either equal(eq) or not equal(ne) to id"""
1593 1
        if operator(link.endpoint_a.switch.id, id_):
1594 1
            return link.endpoint_a
1595 1
        return link.endpoint_b
1596
1597
1598 1
class LinkProtection(EVCDeploy):
1599
    """Class to handle link protection."""
1600
1601 1
    def is_affected_by_link(self, link=None):
1602
        """Verify if the current path is affected by link down event."""
1603
        return self.current_path.is_affected_by_link(link)
1604
1605 1
    def is_using_primary_path(self):
1606
        """Verify if the current deployed path is self.primary_path."""
1607 1
        return self.current_path == self.primary_path
1608
1609 1
    def is_using_backup_path(self):
1610
        """Verify if the current deployed path is self.backup_path."""
1611 1
        return self.current_path == self.backup_path
1612
1613 1
    def is_using_dynamic_path(self):
1614
        """Verify if the current deployed path is dynamic."""
1615 1
        if (
1616
            self.current_path
1617
            and not self.is_using_primary_path()
1618
            and not self.is_using_backup_path()
1619
            and self.current_path.status is EntityStatus.UP
1620
        ):
1621
            return True
1622 1
        return False
1623
1624 1
    def deploy_to(self, path_name=None, path=None):
1625
        """Create a deploy to path."""
1626 1
        if self.current_path == path:
1627 1
            log.debug(f"{path_name} is equal to current_path.")
1628 1
            return True
1629
1630 1
        if path.status is EntityStatus.UP:
1631 1
            return self.deploy_to_path(path)
1632
1633 1
        return False
1634
1635 1
    def handle_link_up(self, link):
1636
        """Handle circuit when link up.
1637
1638
        Args:
1639
            link(Link): Link affected by link.up event.
1640
1641
        """
1642 1
        condition_pairs = [
1643
            (
1644
                lambda me: me.is_using_primary_path(),
1645
                lambda _: (True, 'nothing')
1646
            ),
1647
            (
1648
                lambda me: me.is_intra_switch(),
1649
                lambda _: (True, 'nothing')
1650
            ),
1651
            (
1652
                lambda me: me.primary_path.is_affected_by_link(link),
1653
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1654
            ),
1655
            # We tried to deploy(primary_path) without success.
1656
            # And in this case is up by some how. Nothing to do.
1657
            (
1658
                lambda me: me.is_using_backup_path(),
1659
                lambda _: (True, 'nothing')
1660
            ),
1661
            (
1662
                lambda me:  me.is_using_dynamic_path(),
1663
                lambda _: (True, 'nothing')
1664
            ),
1665
            # In this case, probably the circuit is not being used and
1666
            # we can move to backup
1667
            (
1668
                lambda me: me.backup_path.is_affected_by_link(link),
1669
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1670
            ),
1671
            # In this case, the circuit is not being used and we should
1672
            # try a dynamic path
1673
            (
1674
                lambda me: me.dynamic_backup_path and not me.is_active(),
1675
                lambda me: (me.deploy_to_path(), 'redeploy')
1676
            )
1677
        ]
1678 1
        for predicate, action in condition_pairs:
1679 1
            if not predicate(self):
1680 1
                continue
1681 1
            success, succcess_type = action(self)
1682 1
            if success:
1683 1
                if succcess_type == 'redeploy':
1684 1
                    emit_event(
1685
                        self._controller,
1686
                        "redeployed_link_up",
1687
                        content=map_evc_event_content(self)
1688
                    )
1689 1
                return True
1690 1
        return False
1691
1692 1
    def handle_link_down(self):
1693
        """Handle circuit when link down.
1694
1695
        Returns:
1696
            bool: True if the re-deploy was successly otherwise False.
1697
1698
        """
1699 1
        success = False
1700 1
        if self.is_using_primary_path():
1701 1
            success = self.deploy_to_backup_path()
1702 1
        elif self.is_using_backup_path():
1703 1
            success = self.deploy_to_primary_path()
1704
1705 1
        if not success and self.dynamic_backup_path:
1706 1
            success = self.deploy_to_path()
1707
1708 1
        if success:
1709 1
            log.debug(f"{self} deployed after link down.")
1710
        else:
1711 1
            self.remove_current_flows(sync=False)
1712 1
            self.deactivate()
1713 1
            self.sync()
1714 1
            log.debug(f"Failed to re-deploy {self} after link down.")
1715
1716 1
        return success
1717
1718 1
    @staticmethod
1719 1
    def get_interface_from_switch(uni: UNI, switches: dict) -> Interface:
1720
        """Get interface from switch by uni"""
1721 1
        switch = switches[uni.interface.switch.dpid]
1722 1
        interface = switch.interfaces[uni.interface.port_number]
1723 1
        return interface
1724
1725 1
    def are_unis_active(self, switches: dict) -> bool:
1726
        """Determine whether this EVC should be active"""
1727 1
        interface_a = self.get_interface_from_switch(self.uni_a, switches)
1728 1
        interface_z = self.get_interface_from_switch(self.uni_z, switches)
1729 1
        active, _ = self.is_uni_interface_active(interface_a, interface_z)
1730 1
        return active
1731
1732 1
    @staticmethod
1733 1
    def is_uni_interface_active(
1734
        *interfaces: Interface
1735
    ) -> tuple[bool, dict]:
1736
        """Determine whether a UNI should be active"""
1737 1
        active = True
1738 1
        bad_interfaces = [
1739
            interface
1740
            for interface in interfaces
1741
            if interface.status != EntityStatus.UP
1742
        ]
1743 1
        if bad_interfaces:
1744 1
            active = False
1745 1
            interfaces = bad_interfaces
1746 1
        return active, {
1747
            interface.id: {
1748
                'status': interface.status.value,
1749
                'status_reason': interface.status_reason,
1750
            }
1751
            for interface in interfaces
1752
        }
1753
1754 1
    def handle_interface_link_up(self, interface: Interface):
1755
        """
1756
        Handler for interface link_up events
1757
        """
1758 1
        if self.is_active():
1759 1
            return
1760 1
        interfaces = (self.uni_a.interface, self.uni_z.interface)
1761 1
        if interface not in interfaces:
1762
            return
1763 1
        down_interfaces = [
1764
            interface
1765
            for interface in interfaces
1766
            if interface.status != EntityStatus.UP
1767
        ]
1768 1
        if down_interfaces:
1769
            return
1770 1
        interface_dicts = {
1771
            interface.id: {
1772
                'status': interface.status.value,
1773
                'status_reason': interface.status_reason,
1774
            }
1775
            for interface in interfaces
1776
        }
1777 1
        self.activate()
1778 1
        log.info(
1779
            f"Activating EVC {self.id}. Interfaces: "
1780
            f"{interface_dicts}."
1781
        )
1782 1
        emit_event(self._controller, "uni_active_updated",
1783
                   content=map_evc_event_content(self))
1784 1
        self.sync()
1785
1786 1
    def handle_interface_link_down(self, interface):
1787
        """
1788
        Handler for interface link_down events
1789
        """
1790 1
        if not self.is_active():
1791 1
            return
1792 1
        interfaces = (self.uni_a.interface, self.uni_z.interface)
1793 1
        if interface not in interfaces:
1794
            return
1795 1
        down_interfaces = [
1796
            interface
1797
            for interface in interfaces
1798
            if interface.status != EntityStatus.UP
1799
        ]
1800 1
        if not down_interfaces:
1801
            return
1802 1
        interface_dicts = {
1803
            interface.id: {
1804
                'status': interface.status.value,
1805
                'status_reason': interface.status_reason,
1806
            }
1807
            for interface in down_interfaces
1808
        }
1809 1
        self.deactivate()
1810 1
        log.info(
1811
            f"Deactivating EVC {self.id}. Interfaces: "
1812
            f"{interface_dicts}."
1813
        )
1814 1
        emit_event(self._controller, "uni_active_updated",
1815
                   content=map_evc_event_content(self))
1816 1
        self.sync()
1817
1818
1819 1
class EVC(LinkProtection):
1820
    """Class that represents a E-Line Virtual Connection."""
1821