Passed
Pull Request — master (#473)
by Vinicius
03:59
created

build.models.evc.EVCDeploy.check_trace()   D

Complexity

Conditions 12

Size

Total Lines 72
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 12.3775

Importance

Changes 0
Metric Value
cc 12
eloc 55
nop 9
dl 0
loc 72
ccs 25
cts 29
cp 0.8621
crap 12.3775
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.models.evc.EVCDeploy.check_trace() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Classes used in the main application."""  # pylint: disable=too-many-lines
2 1
import traceback
3 1
from collections import OrderedDict
4 1
from copy import deepcopy
5 1
from datetime import datetime
6 1
from operator import eq, ne
7 1
from threading import Lock
8 1
from typing import Union
9 1
from uuid import uuid4
10
11 1
import requests
12 1
from glom import glom
13 1
from requests.exceptions import Timeout
14
15 1
from kytos.core import log
16 1
from kytos.core.common import EntityStatus, GenericEntity
17 1
from kytos.core.exceptions import KytosNoTagAvailableError, KytosTagError
18 1
from kytos.core.helpers import get_time, now
19 1
from kytos.core.interface import UNI, Interface, TAGRange
20 1
from kytos.core.link import Link
21 1
from kytos.core.tag_ranges import range_difference
22 1
from napps.kytos.mef_eline import controllers, settings
23 1
from napps.kytos.mef_eline.exceptions import (DuplicatedNoTagUNI,
24
                                              FlowModException, InvalidPath)
25 1
from napps.kytos.mef_eline.utils import (check_disabled_component,
26
                                         compare_endpoint_trace,
27
                                         compare_uni_out_trace, emit_event,
28
                                         make_uni_list, map_dl_vlan,
29
                                         map_evc_event_content,
30
                                         merge_flow_dicts)
31
32 1
from .path import DynamicPathManager, Path
33
34
35 1
class EVCBase(GenericEntity):
36
    """Class to represent a circuit."""
37
38 1
    read_only_attributes = [
39
        "creation_time",
40
        "active",
41
        "current_path",
42
        "failover_path",
43
        "_id",
44
        "archived",
45
    ]
46 1
    attributes_requiring_redeploy = [
47
        "primary_path",
48
        "backup_path",
49
        "dynamic_backup_path",
50
        "queue_id",
51
        "sb_priority",
52
        "primary_constraints",
53
        "secondary_constraints",
54
        "uni_a",
55
        "uni_z",
56
    ]
57 1
    required_attributes = ["name", "uni_a", "uni_z"]
58
59 1
    def __init__(self, controller, **kwargs):
60
        """Create an EVC instance with the provided parameters.
61
62
        Args:
63
            id(str): EVC identifier. Whether it's None an ID will be genereted.
64
                     Only the first 14 bytes passed will be used.
65
            name: represents an EVC name.(Required)
66
            uni_a (UNI): Endpoint A for User Network Interface.(Required)
67
            uni_z (UNI): Endpoint Z for User Network Interface.(Required)
68
            start_date(datetime|str): Date when the EVC was registred.
69
                                      Default is now().
70
            end_date(datetime|str): Final date that the EVC will be fineshed.
71
                                    Default is None.
72
            bandwidth(int): Bandwidth used by EVC instance. Default is 0.
73
            primary_links(list): Primary links used by evc. Default is []
74
            backup_links(list): Backups links used by evc. Default is []
75
            current_path(list): Circuit being used at the moment if this is an
76
                                active circuit. Default is [].
77
            failover_path(list): Path being used to provide EVC protection via
78
                                failover during link failures. Default is [].
79
            primary_path(list): primary circuit offered to user IF one or more
80
                                links were provided. Default is [].
81
            backup_path(list): backup circuit offered to the user IF one or
82
                               more links were provided. Default is [].
83
            dynamic_backup_path(bool): Enable computer backup path dynamically.
84
                                       Dafault is False.
85
            creation_time(datetime|str): datetime when the circuit should be
86
                                         activated. default is now().
87
            enabled(Boolean): attribute to indicate the administrative state;
88
                              default is False.
89
            active(Boolean): attribute to indicate the operational state;
90
                             default is False.
91
            archived(Boolean): indicate the EVC has been deleted and is
92
                               archived; default is False.
93
            owner(str): The EVC owner. Default is None.
94
            sb_priority(int): Service level provided in the request.
95
                              Default is None.
96
            service_level(int): Service level provided. The higher the better.
97
                                Default is 0.
98
99
        Raises:
100
            ValueError: raised when object attributes are invalid.
101
102
        """
103 1
        self._controller = controller
104 1
        self._validate(**kwargs)
105 1
        super().__init__()
106
107
        # required attributes
108 1
        self._id = kwargs.get("id", uuid4().hex)[:14]
109 1
        self.uni_a: UNI = kwargs.get("uni_a")
110 1
        self.uni_z: UNI = kwargs.get("uni_z")
111 1
        self.name = kwargs.get("name")
112
113
        # optional attributes
114 1
        self.start_date = get_time(kwargs.get("start_date")) or now()
115 1
        self.end_date = get_time(kwargs.get("end_date")) or None
116 1
        self.queue_id = kwargs.get("queue_id", -1)
117
118 1
        self.bandwidth = kwargs.get("bandwidth", 0)
119 1
        self.primary_links = Path(kwargs.get("primary_links", []))
120 1
        self.backup_links = Path(kwargs.get("backup_links", []))
121 1
        self.current_path = Path(kwargs.get("current_path", []))
122 1
        self.failover_path = Path(kwargs.get("failover_path", []))
123 1
        self.primary_path = Path(kwargs.get("primary_path", []))
124 1
        self.backup_path = Path(kwargs.get("backup_path", []))
125 1
        self.dynamic_backup_path = kwargs.get("dynamic_backup_path", False)
126 1
        self.primary_constraints = kwargs.get("primary_constraints", {})
127 1
        self.secondary_constraints = kwargs.get("secondary_constraints", {})
128 1
        self.creation_time = get_time(kwargs.get("creation_time")) or now()
129 1
        self.owner = kwargs.get("owner", None)
130 1
        self.sb_priority = kwargs.get("sb_priority", None) or kwargs.get(
131
            "priority", None
132
        )
133 1
        self.service_level = kwargs.get("service_level", 0)
134 1
        self.circuit_scheduler = kwargs.get("circuit_scheduler", [])
135 1
        self.flow_removed_at = get_time(kwargs.get("flow_removed_at")) or None
136 1
        self.updated_at = get_time(kwargs.get("updated_at")) or now()
137 1
        self.execution_rounds = kwargs.get("execution_rounds", 0)
138
139 1
        self.current_links_cache = set()
140 1
        self.primary_links_cache = set()
141 1
        self.backup_links_cache = set()
142 1
        self.affected_by_link_at = get_time("0001-01-01T00:00:00")
143 1
        self.old_path = Path([])
144
145 1
        self.lock = Lock()
146
147 1
        self.archived = kwargs.get("archived", False)
148
149 1
        self.metadata = kwargs.get("metadata", {})
150
151 1
        self._mongo_controller = controllers.ELineController()
152
153 1
        if kwargs.get("active", False):
154 1
            self.activate()
155
        else:
156 1
            self.deactivate()
157
158 1
        if kwargs.get("enabled", False):
159 1
            self.enable()
160
        else:
161 1
            self.disable()
162
163
        # datetime of user request for a EVC (or datetime when object was
164
        # created)
165 1
        self.request_time = kwargs.get("request_time", now())
166
        # dict with the user original request (input)
167 1
        self._requested = kwargs
168
169
        # Special cases: No tag, any, untagged
170 1
        self.special_cases = {None, "4096/4096", 0}
171 1
        self.table_group = kwargs.get("table_group")
172
173 1
    def sync(self, keys: set = None):
174
        """Sync this EVC in the MongoDB."""
175 1
        self.updated_at = now()
176 1
        if keys:
177 1
            self._mongo_controller.update_evc(self.as_dict(keys))
178 1
            return
179 1
        self._mongo_controller.upsert_evc(self.as_dict())
180
181 1
    def _get_unis_use_tags(self, **kwargs) -> tuple[UNI, UNI]:
182
        """Obtain both UNIs (uni_a, uni_z).
183
        If a UNI is changing, verify tags"""
184 1
        uni_a = kwargs.get("uni_a", None)
185 1
        uni_a_flag = False
186 1
        if uni_a and uni_a != self.uni_a:
187 1
            uni_a_flag = True
188 1
            self._use_uni_vlan(uni_a, uni_dif=self.uni_a)
189
190 1
        uni_z = kwargs.get("uni_z", None)
191 1
        if uni_z and uni_z != self.uni_z:
192 1
            try:
193 1
                self._use_uni_vlan(uni_z, uni_dif=self.uni_z)
194 1
                self.make_uni_vlan_available(self.uni_z, uni_dif=uni_z)
195 1
            except KytosTagError as err:
196 1
                if uni_a_flag:
197 1
                    self.make_uni_vlan_available(uni_a, uni_dif=self.uni_a)
198 1
                raise err
199
        else:
200 1
            uni_z = self.uni_z
201
202 1
        if uni_a_flag:
203 1
            self.make_uni_vlan_available(self.uni_a, uni_dif=uni_a)
204
        else:
205 1
            uni_a = self.uni_a
206 1
        return uni_a, uni_z
207
208 1
    def update(self, **kwargs):
209
        """Update evc attributes.
210
211
        This method will raises an error trying to change the following
212
        attributes: [creation_time, active, current_path, failover_path,
213
        _id, archived]
214
        [name, uni_a and uni_z]
215
216
        Returns:
217
            the values for enable and a redeploy attribute, if exists and None
218
            otherwise
219
        Raises:
220
            ValueError: message with error detail.
221
222
        """
223 1
        enable, redeploy = (None, None)
224 1
        if not self._tag_lists_equal(**kwargs):
225 1
            raise ValueError(
226
                "UNI_A and UNI_Z tag lists should be the same."
227
            )
228 1
        uni_a, uni_z = self._get_unis_use_tags(**kwargs)
229 1
        check_disabled_component(uni_a, uni_z)
230 1
        self._validate_has_primary_or_dynamic(
231
            primary_path=kwargs.get("primary_path"),
232
            dynamic_backup_path=kwargs.get("dynamic_backup_path"),
233
            uni_a=uni_a,
234
            uni_z=uni_z,
235
        )
236 1
        for attribute, value in kwargs.items():
237 1
            if attribute in self.read_only_attributes:
238 1
                raise ValueError(f"{attribute} can't be updated.")
239 1
            if not hasattr(self, attribute):
240 1
                raise ValueError(f'The attribute "{attribute}" is invalid.')
241 1
            if attribute in ("primary_path", "backup_path"):
242 1
                try:
243 1
                    value.is_valid(
244
                        uni_a.interface.switch, uni_z.interface.switch
245
                    )
246 1
                except InvalidPath as exception:
247 1
                    raise ValueError(  # pylint: disable=raise-missing-from
248
                        f"{attribute} is not a " f"valid path: {exception}"
249
                    )
250 1
        for attribute, value in kwargs.items():
251 1
            if attribute in ("enable", "enabled"):
252 1
                if value:
253 1
                    self.enable()
254
                else:
255 1
                    self.disable()
256 1
                enable = value
257
            else:
258 1
                setattr(self, attribute, value)
259 1
                if attribute in self.attributes_requiring_redeploy:
260 1
                    redeploy = True
261 1
        self.sync(set(kwargs.keys()))
262 1
        return enable, redeploy
263
264 1
    def set_flow_removed_at(self):
265
        """Update flow_removed_at attribute."""
266
        self.flow_removed_at = now()
267
268 1
    def has_recent_removed_flow(self, setting=settings):
269
        """Check if any flow has been removed from the evc"""
270
        if self.flow_removed_at is None:
271
            return False
272
        res_seconds = (now() - self.flow_removed_at).seconds
273
        return res_seconds < setting.TIME_RECENT_DELETED_FLOWS
274
275 1
    def is_recent_updated(self, setting=settings):
276
        """Check if the evc has been updated recently"""
277
        res_seconds = (now() - self.updated_at).seconds
278
        return res_seconds < setting.TIME_RECENT_UPDATED
279
280 1
    def __repr__(self):
281
        """Repr method."""
282 1
        return f"EVC({self._id}, {self.name})"
283
284 1
    def _validate(self, **kwargs):
285
        """Do Basic validations.
286
287
        Verify required attributes: name, uni_a, uni_z
288
289
        Raises:
290
            ValueError: message with error detail.
291
292
        """
293 1
        for attribute in self.required_attributes:
294
295 1
            if attribute not in kwargs:
296 1
                raise ValueError(f"{attribute} is required.")
297
298 1
            if "uni" in attribute:
299 1
                uni = kwargs.get(attribute)
300 1
                if not isinstance(uni, UNI):
301
                    raise ValueError(f"{attribute} is an invalid UNI.")
302
303 1
    def _tag_lists_equal(self, **kwargs):
304
        """Verify that tag lists are the same."""
305 1
        uni_a = kwargs.get("uni_a") or self.uni_a
306 1
        uni_z = kwargs.get("uni_z") or self.uni_z
307 1
        uni_a_list = uni_z_list = False
308 1
        if (uni_a.user_tag and isinstance(uni_a.user_tag, TAGRange)):
309 1
            uni_a_list = True
310 1
        if (uni_z.user_tag and isinstance(uni_z.user_tag, TAGRange)):
311 1
            uni_z_list = True
312 1
        if uni_a_list and uni_z_list:
313 1
            return uni_a.user_tag.value == uni_z.user_tag.value
314 1
        return uni_a_list == uni_z_list
315
316 1
    def _validate_has_primary_or_dynamic(
317
        self,
318
        primary_path=None,
319
        dynamic_backup_path=None,
320
        uni_a=None,
321
        uni_z=None,
322
    ) -> None:
323
        """Validate that it must have a primary path or allow dynamic paths."""
324 1
        primary_path = (
325
            primary_path
326
            if primary_path is not None
327
            else self.primary_path
328
        )
329 1
        dynamic_backup_path = (
330
            dynamic_backup_path
331
            if dynamic_backup_path is not None
332
            else self.dynamic_backup_path
333
        )
334 1
        uni_a = uni_a if uni_a is not None else self.uni_a
335 1
        uni_z = uni_z if uni_z is not None else self.uni_z
336 1
        if (
337
            not primary_path
338
            and not dynamic_backup_path
339
            and uni_a and uni_z
340
            and uni_a.interface.switch != uni_z.interface.switch
341
        ):
342 1
            msg = "The EVC must have a primary path or allow dynamic paths."
343 1
            raise ValueError(msg)
344
345 1
    def __eq__(self, other):
346
        """Override the default implementation."""
347 1
        if not isinstance(other, EVC):
348
            return False
349
350 1
        attrs_to_compare = ["name", "uni_a", "uni_z", "owner", "bandwidth"]
351 1
        for attribute in attrs_to_compare:
352 1
            if getattr(other, attribute) != getattr(self, attribute):
353 1
                return False
354 1
        return True
355
356 1
    def is_intra_switch(self):
357
        """Check if the UNIs are in the same switch."""
358 1
        return self.uni_a.interface.switch == self.uni_z.interface.switch
359
360 1
    def check_no_tag_duplicate(self, other_uni: UNI):
361
        """Check if a no tag UNI is duplicated."""
362 1
        if other_uni in (self.uni_a, self.uni_z):
363 1
            msg = f"UNI with interface {other_uni.interface.id} is"\
364
                  f" duplicated with EVC {self.id}."
365 1
            raise DuplicatedNoTagUNI(msg)
366
367 1
    def as_dict(self, keys: set = None):
368
        """Return a dictionary representing an EVC object.
369
            keys: Only fields on this variable will be
370
                  returned in the dictionary"""
371 1
        evc_dict = {
372
            "id": self.id,
373
            "name": self.name,
374
            "uni_a": self.uni_a.as_dict(),
375
            "uni_z": self.uni_z.as_dict(),
376
        }
377
378 1
        time_fmt = "%Y-%m-%dT%H:%M:%S"
379
380 1
        evc_dict["start_date"] = self.start_date
381 1
        if isinstance(self.start_date, datetime):
382 1
            evc_dict["start_date"] = self.start_date.strftime(time_fmt)
383
384 1
        evc_dict["end_date"] = self.end_date
385 1
        if isinstance(self.end_date, datetime):
386 1
            evc_dict["end_date"] = self.end_date.strftime(time_fmt)
387
388 1
        evc_dict["queue_id"] = self.queue_id
389 1
        evc_dict["bandwidth"] = self.bandwidth
390 1
        evc_dict["primary_links"] = self.primary_links.as_dict()
391 1
        evc_dict["backup_links"] = self.backup_links.as_dict()
392 1
        evc_dict["current_path"] = self.current_path.as_dict()
393 1
        evc_dict["failover_path"] = self.failover_path.as_dict()
394 1
        evc_dict["primary_path"] = self.primary_path.as_dict()
395 1
        evc_dict["backup_path"] = self.backup_path.as_dict()
396 1
        evc_dict["dynamic_backup_path"] = self.dynamic_backup_path
397 1
        evc_dict["metadata"] = self.metadata
398
399 1
        evc_dict["request_time"] = self.request_time
400 1
        if isinstance(self.request_time, datetime):
401 1
            evc_dict["request_time"] = self.request_time.strftime(time_fmt)
402
403 1
        time = self.creation_time.strftime(time_fmt)
404 1
        evc_dict["creation_time"] = time
405
406 1
        evc_dict["owner"] = self.owner
407 1
        evc_dict["circuit_scheduler"] = [
408
            sc.as_dict() for sc in self.circuit_scheduler
409
        ]
410
411 1
        evc_dict["active"] = self.is_active()
412 1
        evc_dict["enabled"] = self.is_enabled()
413 1
        evc_dict["archived"] = self.archived
414 1
        evc_dict["sb_priority"] = self.sb_priority
415 1
        evc_dict["service_level"] = self.service_level
416 1
        evc_dict["primary_constraints"] = self.primary_constraints
417 1
        evc_dict["secondary_constraints"] = self.secondary_constraints
418 1
        evc_dict["flow_removed_at"] = self.flow_removed_at
419 1
        evc_dict["updated_at"] = self.updated_at
420
421 1
        if keys:
422 1
            selected = {}
423 1
            for key in keys:
424 1
                if key == "enable":
425
                    selected["enabled"] = evc_dict["enabled"]
426
                    continue
427 1
                selected[key] = evc_dict[key]
428 1
            selected["id"] = evc_dict["id"]
429 1
            return selected
430 1
        return evc_dict
431
432 1
    @property
433 1
    def id(self):  # pylint: disable=invalid-name
434
        """Return this EVC's ID."""
435 1
        return self._id
436
437 1
    def archive(self):
438
        """Archive this EVC on deletion."""
439 1
        self.archived = True
440
441 1
    def _use_uni_vlan(
442
        self,
443
        uni: UNI,
444
        uni_dif: Union[None, UNI] = None
445
    ):
446
        """Use tags from UNI"""
447 1
        if uni.user_tag is None:
448 1
            return
449 1
        tag = uni.user_tag.value
450 1
        if not tag:
451
            return
452 1
        tag_type = uni.user_tag.tag_type
453 1
        if (uni_dif and isinstance(tag, list) and
454
                isinstance(uni_dif.user_tag.value, list)):
455 1
            tag = range_difference(tag, uni_dif.user_tag.value)
456 1
            if not tag:
457 1
                return
458 1
        uni.interface.use_tags(
459
            self._controller, tag, tag_type, use_lock=True, check_order=False
460
        )
461
462 1
    def make_uni_vlan_available(
463
        self,
464
        uni: UNI,
465
        uni_dif: Union[None, UNI] = None,
466
    ):
467
        """Make available tag from UNI"""
468 1
        if uni.user_tag is None:
469 1
            return
470 1
        tag = uni.user_tag.value
471 1
        if not tag:
472 1
            return
473 1
        tag_type = uni.user_tag.tag_type
474 1
        if (uni_dif and isinstance(tag, list) and
475
                isinstance(uni_dif.user_tag.value, list)):
476 1
            tag = range_difference(tag, uni_dif.user_tag.value)
477 1
            if not tag:
478
                return
479 1
        try:
480 1
            conflict = uni.interface.make_tags_available(
481
                self._controller, tag, tag_type, use_lock=True,
482
                check_order=False
483
            )
484 1
        except KytosTagError as err:
485 1
            log.error(f"Error in circuit {self._id}: {err}")
486 1
            return
487 1
        if conflict:
488 1
            intf = uni.interface.id
489 1
            log.warning(f"Tags {conflict} was already available in {intf}")
490
491 1
    def remove_uni_tags(self):
492
        """Remove both UNI usage of a tag"""
493 1
        self.make_uni_vlan_available(self.uni_a)
494 1
        self.make_uni_vlan_available(self.uni_z)
495
496
497
# pylint: disable=fixme, too-many-public-methods
498 1
class EVCDeploy(EVCBase):
499
    """Class to handle the deploy procedures."""
500
501 1
    def create(self):
502
        """Create a EVC."""
503
504 1
    def discover_new_paths(self):
505
        """Discover new paths to satisfy this circuit and deploy it."""
506
        return DynamicPathManager.get_best_paths(self,
507
                                                 **self.primary_constraints)
508
509 1
    def get_failover_path_candidates(self):
510
        """Get failover paths to satisfy this EVC."""
511
        # in the future we can return primary/backup paths as well
512
        # we just have to properly handle link_up and failover paths
513
        # if (
514
        #     self.is_using_primary_path() and
515
        #     self.backup_path.status is EntityStatus.UP
516
        # ):
517
        #     yield self.backup_path
518 1
        return DynamicPathManager.get_disjoint_paths(self, self.current_path)
519
520 1
    def change_path(self):
521
        """Change EVC path."""
522
523 1
    def reprovision(self):
524
        """Force the EVC (re-)provisioning."""
525
526 1
    def is_affected_by_link(self, link):
527
        """Return True if this EVC has the given link on its current path."""
528 1
        return link in self.current_path
529
530 1
    def link_affected_by_interface(self, interface):
531
        """Return True if this EVC has the given link on its current path."""
532
        return self.current_path.link_affected_by_interface(interface)
533
534 1
    def is_backup_path_affected_by_link(self, link):
535
        """Return True if the backup path of this EVC uses the given link."""
536 1
        return link in self.backup_path
537
538
    # pylint: disable=invalid-name
539 1
    def is_primary_path_affected_by_link(self, link):
540
        """Return True if the primary path of this EVC uses the given link."""
541 1
        return link in self.primary_path
542
543 1
    def is_failover_path_affected_by_link(self, link):
544
        """Return True if this EVC has the given link on its failover path."""
545 1
        return link in self.failover_path
546
547 1
    def is_eligible_for_failover_path(self):
548
        """Verify if this EVC is eligible for failover path (EP029)"""
549
        # In the future this function can be augmented to consider
550
        # primary/backup, primary/dynamic, and other path combinations
551 1
        return (
552
            self.dynamic_backup_path and
553
            not self.primary_path and not self.backup_path
554
        )
555
556 1
    def is_using_primary_path(self):
557
        """Verify if the current deployed path is self.primary_path."""
558 1
        return self.primary_path and (self.current_path == self.primary_path)
559
560 1
    def is_using_backup_path(self):
561
        """Verify if the current deployed path is self.backup_path."""
562 1
        return self.backup_path and (self.current_path == self.backup_path)
563
564 1
    def is_using_dynamic_path(self):
565
        """Verify if the current deployed path is a dynamic path."""
566 1
        if (
567
            self.current_path
568
            and not self.is_using_primary_path()
569
            and not self.is_using_backup_path()
570
            and self.current_path.status == EntityStatus.UP
571
        ):
572
            return True
573 1
        return False
574
575 1
    def deploy_to_backup_path(self):
576
        """Deploy the backup path into the datapaths of this circuit.
577
578
        If the backup_path attribute is valid and up, this method will try to
579
        deploy this backup_path.
580
581
        If everything fails and dynamic_backup_path is True, then tries to
582
        deploy a dynamic path.
583
        """
584
        # TODO: Remove flows from current (cookies)
585 1
        if self.is_using_backup_path():
586
            # TODO: Log to say that cannot move backup to backup
587
            return True
588
589 1
        success = False
590 1
        if self.backup_path.status is EntityStatus.UP:
591 1
            success = self.deploy_to_path(self.backup_path)
592
593 1
        if success:
594 1
            return True
595
596 1
        if self.dynamic_backup_path or self.is_intra_switch():
597 1
            return self.deploy_to_path()
598
599
        return False
600
601 1
    def deploy_to_primary_path(self):
602
        """Deploy the primary path into the datapaths of this circuit.
603
604
        If the primary_path attribute is valid and up, this method will try to
605
        deploy this primary_path.
606
        """
607
        # TODO: Remove flows from current (cookies)
608 1
        if self.is_using_primary_path():
609
            # TODO: Log to say that cannot move primary to primary
610
            return True
611
612 1
        if self.primary_path.status is EntityStatus.UP:
613 1
            return self.deploy_to_path(self.primary_path)
614
        return False
615
616 1
    def deploy(self):
617
        """Deploy EVC to best path.
618
619
        Best path can be the primary path, if available. If not, the backup
620
        path, and, if it is also not available, a dynamic path.
621
        """
622 1
        if self.archived:
623 1
            return False
624 1
        self.enable()
625 1
        success = self.deploy_to_primary_path()
626 1
        if not success:
627 1
            success = self.deploy_to_backup_path()
628
629 1
        if success:
630 1
            emit_event(self._controller, "deployed",
631
                       content=map_evc_event_content(self))
632 1
        return success
633
634 1
    @staticmethod
635 1
    def get_path_status(path):
636
        """Check for the current status of a path.
637
638
        If any link in this path is down, the path is considered down.
639
        """
640 1
        if not path:
641 1
            return EntityStatus.DISABLED
642
643 1
        for link in path:
644 1
            if link.status is not EntityStatus.UP:
645 1
                return link.status
646 1
        return EntityStatus.UP
647
648
    #    def discover_new_path(self):
649
    #        # TODO: discover a new path to satisfy this circuit and deploy
650
651 1
    def remove(self):
652
        """Remove EVC path and disable it."""
653 1
        self.remove_current_flows(sync=False)
654 1
        self.remove_failover_flows(sync=False)
655 1
        self.disable()
656 1
        self.sync()
657 1
        emit_event(self._controller, "undeployed",
658
                   content=map_evc_event_content(self))
659
660 1
    def remove_failover_flows(self, exclude_uni_switches=True,
661
                              force=True, sync=True) -> None:
662
        """Remove failover_flows.
663
664
        By default, it'll exclude UNI switches, if mef_eline has already
665
        called remove_current_flows before then this minimizes the number
666
        of FlowMods and IO.
667
        """
668 1
        if not self.failover_path:
669 1
            return
670 1
        switches, cookie, excluded = OrderedDict(), self.get_cookie(), set()
671 1
        links = set()
672 1
        if exclude_uni_switches:
673 1
            excluded.add(self.uni_a.interface.switch.id)
674 1
            excluded.add(self.uni_z.interface.switch.id)
675 1
        for link in self.failover_path:
676 1
            if link.endpoint_a.switch.id not in excluded:
677 1
                switches[link.endpoint_a.switch.id] = link.endpoint_a.switch
678 1
                links.add(link)
679 1
            if link.endpoint_b.switch.id not in excluded:
680 1
                switches[link.endpoint_b.switch.id] = link.endpoint_b.switch
681 1
                links.add(link)
682 1
        for switch in switches.values():
683 1
            try:
684 1
                self._send_flow_mods(
685
                    switch.id,
686
                    [
687
                        {
688
                            "cookie": cookie,
689
                            "cookie_mask": int(0xffffffffffffffff),
690
                        }
691
                    ],
692
                    "delete",
693
                    force=force,
694
                )
695
            except FlowModException as err:
696
                log.error(
697
                    f"Error removing flows from switch {switch.id} for"
698
                    f"EVC {self}: {err}"
699
                )
700 1
        try:
701 1
            self.failover_path.make_vlans_available(self._controller)
702
        except KytosTagError as err:
703
            log.error(f"Error when removing failover flows: {err}")
704 1
        self.failover_path = Path([])
705 1
        if sync:
706 1
            self.sync()
707
708 1
    def remove_current_flows(self, current_path=None, force=True,
709
                             sync=True):
710
        """Remove all flows from current path."""
711 1
        switches = set()
712
713 1
        switches.add(self.uni_a.interface.switch)
714 1
        switches.add(self.uni_z.interface.switch)
715 1
        if not current_path:
716 1
            current_path = self.current_path
717 1
        for link in current_path:
718 1
            switches.add(link.endpoint_a.switch)
719 1
            switches.add(link.endpoint_b.switch)
720
721 1
        match = {
722
            "cookie": self.get_cookie(),
723
            "cookie_mask": int(0xffffffffffffffff)
724
        }
725
726 1
        for switch in switches:
727 1
            try:
728 1
                self._send_flow_mods(switch.id, [match], 'delete', force=force)
729 1
            except FlowModException as err:
730 1
                log.error(
731
                    f"Error removing flows from switch {switch.id} for"
732
                    f"EVC {self}: {err}"
733
                )
734 1
        try:
735 1
            current_path.make_vlans_available(self._controller)
736
        except KytosTagError as err:
737
            log.error(f"Error when removing current path flows: {err}")
738 1
        self.current_path = Path([])
739 1
        self.deactivate()
740 1
        if sync:
741 1
            self.sync()
742
743 1
    def remove_path_flows(
744
        self, path=None, force=True
745
    ) -> dict[str, list[dict]]:
746
        """Remove all flows from path, and return the removed flows."""
747 1
        dpid_flows_match: dict[str, list[dict]] = {}
748
749 1
        if not path:
750 1
            return dpid_flows_match
751
752 1
        try:
753 1
            nni_flows = self._prepare_nni_flows(path)
754
        # pylint: disable=broad-except
755
        except Exception:
756
            err = traceback.format_exc().replace("\n", ", ")
757
            log.error(f"Fail to remove NNI failover flows for {self}: {err}")
758
            nni_flows = {}
759
760 1
        for dpid, flows in nni_flows.items():
761 1
            dpid_flows_match.setdefault(dpid, [])
762 1
            for flow in flows:
763 1
                dpid_flows_match[dpid].append({
764
                    "cookie": flow["cookie"],
765
                    "match": flow["match"],
766
                    "cookie_mask": int(0xffffffffffffffff)
767
                })
768
769 1
        try:
770 1
            uni_flows = self._prepare_uni_flows(path, skip_in=True)
771
        # pylint: disable=broad-except
772
        except Exception:
773
            err = traceback.format_exc().replace("\n", ", ")
774
            log.error(f"Fail to remove UNI failover flows for {self}: {err}")
775
            uni_flows = {}
776
777 1
        for dpid, flows in uni_flows.items():
778 1
            dpid_flows_match.setdefault(dpid, [])
779 1
            for flow in flows:
780 1
                dpid_flows_match[dpid].append({
781
                    "cookie": flow["cookie"],
782
                    "match": flow["match"],
783
                    "cookie_mask": int(0xffffffffffffffff)
784
                })
785
786 1
        for dpid, flows in dpid_flows_match.items():
787 1
            try:
788 1
                self._send_flow_mods(dpid, flows, 'delete', force=force)
789 1
            except FlowModException as err:
790 1
                log.error(
791
                    "Error removing failover flows: "
792
                    f"dpid={dpid} evc={self} error={err}"
793
                )
794 1
        try:
795 1
            path.make_vlans_available(self._controller)
796
        except KytosTagError as err:
797
            log.error(f"Error when removing path flows: {err}")
798
799 1
        return dpid_flows_match
800
801 1
    @staticmethod
802 1
    def links_zipped(path=None):
803
        """Return an iterator which yields pairs of links in order."""
804 1
        if not path:
805
            return []
806 1
        return zip(path[:-1], path[1:])
807
808 1
    def should_deploy(self, path=None):
809
        """Verify if the circuit should be deployed."""
810 1
        if not path:
811 1
            log.debug("Path is empty.")
812 1
            return False
813
814 1
        if not self.is_enabled():
815 1
            log.debug(f"{self} is disabled.")
816 1
            return False
817
818 1
        if not self.is_active():
819 1
            log.debug(f"{self} will be deployed.")
820 1
            return True
821
822 1
        return False
823
824 1
    def deploy_to_path(self, path=None):  # pylint: disable=too-many-branches
825
        """Install the flows for this circuit.
826
827
        Procedures to deploy:
828
829
        0. Remove current flows installed
830
        1. Decide if will deploy "path" or discover a new path
831
        2. Choose vlan
832
        3. Install NNI flows
833
        4. Install UNI flows
834
        5. Activate
835
        6. Update current_path
836
        7. Update links caches(primary, current, backup)
837
838
        """
839 1
        self.remove_current_flows()
840 1
        use_path = path
841 1
        if self.should_deploy(use_path):
842 1
            try:
843 1
                use_path.choose_vlans(self._controller)
844 1
            except KytosNoTagAvailableError:
845 1
                use_path = None
846
        else:
847 1
            for use_path in self.discover_new_paths():
848 1
                if use_path is None:
849
                    continue
850 1
                try:
851 1
                    use_path.choose_vlans(self._controller)
852 1
                    break
853 1
                except KytosNoTagAvailableError:
854 1
                    pass
855
            else:
856 1
                use_path = None
857
858 1
        try:
859 1
            if use_path:
860 1
                self._install_nni_flows(use_path)
861 1
                self._install_uni_flows(use_path)
862 1
            elif self.is_intra_switch():
863 1
                use_path = Path()
864 1
                self._install_direct_uni_flows()
865
            else:
866 1
                log.warning(
867
                    f"{self} was not deployed. No available path was found."
868
                )
869 1
                return False
870 1
        except FlowModException as err:
871 1
            log.error(
872
                f"Error deploying EVC {self} when calling flow_manager: {err}"
873
            )
874 1
            self.remove_current_flows(use_path)
875 1
            return False
876 1
        self.activate()
877 1
        self.current_path = use_path
878 1
        self.sync()
879 1
        log.info(f"{self} was deployed.")
880 1
        return True
881
882 1
    def try_setup_failover_path(self, wait=settings.DEPLOY_EVCS_INTERVAL):
883
        """Try setup failover_path whenever possible."""
884
        if self.failover_path:
885
            return
886
        if (now() - self.affected_by_link_at).seconds >= wait:
887
            with self.lock:
888
                self.setup_failover_path()
889
890 1
    def setup_failover_path(self):
891
        """Install flows for the failover path of this EVC.
892
893
        Procedures to deploy:
894
895
        0. Remove flows currently installed for failover_path (if any)
896
        1. Discover a disjoint path from current_path
897
        2. Choose vlans
898
        3. Install NNI flows
899
        4. Install UNI egress flows
900
        5. Update failover_path
901
        """
902
        # Intra-switch EVCs have no failover_path
903 1
        if self.is_intra_switch():
904 1
            return False
905
906
        # For not only setup failover path for totally dynamic EVCs
907 1
        if not self.is_eligible_for_failover_path():
908 1
            return False
909
910 1
        out_new_flows: dict[str, list[dict]] = {}
911 1
        reason = ""
912 1
        out_removed_flows = self.remove_path_flows(self.failover_path)
913 1
        self.failover_path = Path([])
914
915 1
        for use_path in self.get_failover_path_candidates():
916 1
            if not use_path:
917 1
                continue
918 1
            try:
919 1
                use_path.choose_vlans(self._controller)
920 1
                break
921 1
            except KytosNoTagAvailableError:
922 1
                pass
923
        else:
924 1
            use_path = Path([])
925 1
            reason = "No available path was found"
926
927 1
        try:
928 1
            if use_path:
929 1
                _nni_flows = self._install_nni_flows(use_path)
930 1
                out_new_flows = merge_flow_dicts(out_new_flows, _nni_flows)
931 1
                _uni_flows = self._install_uni_flows(use_path, skip_in=True)
932 1
                out_new_flows = merge_flow_dicts(out_new_flows, _uni_flows)
933 1
        except FlowModException as err:
934 1
            reason = "Error deploying failover path"
935 1
            log.error(
936
                f"{reason} for {self}. FlowManager error: {err}"
937
            )
938 1
            _rmed_flows = self.remove_path_flows(use_path)
939 1
            out_new_flows = merge_flow_dicts(out_new_flows, _rmed_flows)
940 1
            use_path = Path([])
941
942 1
        self.failover_path = use_path
943 1
        self.sync()
944
945 1
        if out_new_flows or out_removed_flows:
946 1
            content = {
947
                self.id: map_evc_event_content(
948
                    self,
949
                    flows=out_new_flows,
950
                    removed_flows=out_removed_flows,
951
                    error_reason=reason,
952
                    current_path=self.current_path.as_dict(),
953
                )
954
            }
955 1
            emit_event(self._controller, "failover_deployed", content=content)
956
957 1
        if not use_path:
958 1
            log.warning(
959
                f"Failover path for {self} was not deployed: {reason}"
960
            )
961 1
            return False
962 1
        log.info(f"Failover path for {self} was deployed.")
963 1
        return True
964
965 1
    def get_failover_flows(self):
966
        """Return the flows needed to make the failover path active, i.e. the
967
        flows for ingress forwarding.
968
969
        Return:
970
            dict: A dict of flows indexed by the switch_id will be returned, or
971
                an empty dict if no failover_path is available.
972
        """
973 1
        if not self.failover_path:
974 1
            return {}
975 1
        return self._prepare_uni_flows(self.failover_path, skip_out=True)
976
977
    # pylint: disable=too-many-branches
978 1
    def _prepare_direct_uni_flows(self):
979
        """Prepare flows connecting two UNIs for intra-switch EVC."""
980 1
        vlan_a = self._get_value_from_uni_tag(self.uni_a)
981 1
        vlan_z = self._get_value_from_uni_tag(self.uni_z)
982
983 1
        flow_mod_az = self._prepare_flow_mod(
984
            self.uni_a.interface, self.uni_z.interface,
985
            self.queue_id, vlan_a
986
        )
987 1
        flow_mod_za = self._prepare_flow_mod(
988
            self.uni_z.interface, self.uni_a.interface,
989
            self.queue_id, vlan_z
990
        )
991
992 1 View Code Duplication
        if not isinstance(vlan_z, list) and vlan_z not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
993 1
            flow_mod_az["actions"].insert(
994
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
995
            )
996 1
            if not vlan_a:
997 1
                flow_mod_az["actions"].insert(
998
                    0, {"action_type": "push_vlan", "tag_type": "c"}
999
                )
1000 1
            if vlan_a == 0:
1001 1
                flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1002 1
        elif vlan_a == 0 and vlan_z == "4096/4096":
1003 1
            flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1004
1005 1 View Code Duplication
        if not isinstance(vlan_a, list) and vlan_a not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1006 1
            flow_mod_za["actions"].insert(
1007
                    0, {"action_type": "set_vlan", "vlan_id": vlan_a}
1008
                )
1009 1
            if not vlan_z:
1010 1
                flow_mod_za["actions"].insert(
1011
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1012
                )
1013 1
            if vlan_z == 0:
1014 1
                flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1015 1
        elif vlan_a == "4096/4096" and vlan_z == 0:
1016 1
            flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1017
1018 1
        flows = []
1019 1
        if isinstance(vlan_a, list):
1020 1
            for mask_a in vlan_a:
1021 1
                flow_aux = deepcopy(flow_mod_az)
1022 1
                flow_aux["match"]["dl_vlan"] = mask_a
1023 1
                flows.append(flow_aux)
1024
        else:
1025 1
            if vlan_a is not None:
1026 1
                flow_mod_az["match"]["dl_vlan"] = vlan_a
1027 1
            flows.append(flow_mod_az)
1028
1029 1
        if isinstance(vlan_z, list):
1030 1
            for mask_z in vlan_z:
1031 1
                flow_aux = deepcopy(flow_mod_za)
1032 1
                flow_aux["match"]["dl_vlan"] = mask_z
1033 1
                flows.append(flow_aux)
1034
        else:
1035 1
            if vlan_z is not None:
1036 1
                flow_mod_za["match"]["dl_vlan"] = vlan_z
1037 1
            flows.append(flow_mod_za)
1038 1
        return (
1039
            self.uni_a.interface.switch.id, flows
1040
        )
1041
1042 1
    def _install_direct_uni_flows(self):
1043
        """Install flows connecting two UNIs.
1044
1045
        This case happens when the circuit is between UNIs in the
1046
        same switch.
1047
        """
1048 1
        (dpid, flows) = self._prepare_direct_uni_flows()
1049 1
        self._send_flow_mods(dpid, flows)
1050
1051 1
    def _prepare_nni_flows(self, path=None):
1052
        """Prepare NNI flows."""
1053 1
        nni_flows = OrderedDict()
1054 1
        previous = self.uni_a.interface.switch.dpid
1055 1
        for incoming, outcoming in self.links_zipped(path):
1056 1
            in_vlan = incoming.get_metadata("s_vlan").value
1057 1
            out_vlan = outcoming.get_metadata("s_vlan").value
1058 1
            in_endpoint = self.get_endpoint_by_id(incoming, previous, ne)
1059 1
            out_endpoint = self.get_endpoint_by_id(
1060
                outcoming, in_endpoint.switch.id, eq
1061
            )
1062
1063 1
            flows = []
1064
            # Flow for one direction
1065 1
            flows.append(
1066
                self._prepare_nni_flow(
1067
                    in_endpoint,
1068
                    out_endpoint,
1069
                    in_vlan,
1070
                    out_vlan,
1071
                    queue_id=self.queue_id,
1072
                )
1073
            )
1074
1075
            # Flow for the other direction
1076 1
            flows.append(
1077
                self._prepare_nni_flow(
1078
                    out_endpoint,
1079
                    in_endpoint,
1080
                    out_vlan,
1081
                    in_vlan,
1082
                    queue_id=self.queue_id,
1083
                )
1084
            )
1085 1
            previous = in_endpoint.switch.id
1086 1
            nni_flows[in_endpoint.switch.id] = flows
1087 1
        return nni_flows
1088
1089 1
    def _install_nni_flows(
1090
        self, path=None
1091
    ) -> dict[str, list[dict]]:
1092
        """Install NNI flows."""
1093 1
        nni_flows = self._prepare_nni_flows(path)
1094 1
        for dpid, flows in nni_flows.items():
1095 1
            self._send_flow_mods(dpid, flows)
1096 1
        return nni_flows
1097
1098 1
    @staticmethod
1099 1
    def _get_value_from_uni_tag(uni: UNI):
1100
        """Returns the value from tag. In case of any and untagged
1101
        it should return 4096/4096 and 0 respectively"""
1102 1
        special = {"any": "4096/4096", "untagged": 0}
1103 1
        if uni.user_tag:
1104 1
            value = uni.user_tag.value
1105 1
            if isinstance(value, list):
1106 1
                return uni.user_tag.mask_list
1107 1
            return special.get(value, value)
1108 1
        return None
1109
1110
    # pylint: disable=too-many-locals
1111 1
    def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
1112
        """Prepare flows to install UNIs."""
1113 1
        uni_flows = {}
1114 1
        if not path:
1115 1
            log.info("install uni flows without path.")
1116 1
            return uni_flows
1117
1118
        # Determine VLANs
1119 1
        in_vlan_a = self._get_value_from_uni_tag(self.uni_a)
1120 1
        out_vlan_a = path[0].get_metadata("s_vlan").value
1121
1122 1
        in_vlan_z = self._get_value_from_uni_tag(self.uni_z)
1123 1
        out_vlan_z = path[-1].get_metadata("s_vlan").value
1124
1125
        # Get endpoints from path
1126 1
        endpoint_a = self.get_endpoint_by_id(
1127
            path[0], self.uni_a.interface.switch.id, eq
1128
        )
1129 1
        endpoint_z = self.get_endpoint_by_id(
1130
            path[-1], self.uni_z.interface.switch.id, eq
1131
        )
1132
1133
        # Flows for the first UNI
1134 1
        flows_a = []
1135
1136
        # Flow for one direction, pushing the service tag
1137 1
        if not skip_in:
1138 1
            if isinstance(in_vlan_a, list):
1139 1
                for in_mask_a in in_vlan_a:
1140 1
                    push_flow = self._prepare_push_flow(
1141
                        self.uni_a.interface,
1142
                        endpoint_a,
1143
                        in_mask_a,
1144
                        out_vlan_a,
1145
                        in_vlan_z,
1146
                        queue_id=self.queue_id,
1147
                    )
1148 1
                    flows_a.append(push_flow)
1149
            else:
1150 1
                push_flow = self._prepare_push_flow(
1151
                    self.uni_a.interface,
1152
                    endpoint_a,
1153
                    in_vlan_a,
1154
                    out_vlan_a,
1155
                    in_vlan_z,
1156
                    queue_id=self.queue_id,
1157
                )
1158 1
                flows_a.append(push_flow)
1159
1160
        # Flow for the other direction, popping the service tag
1161 1
        if not skip_out:
1162 1
            pop_flow = self._prepare_pop_flow(
1163
                endpoint_a,
1164
                self.uni_a.interface,
1165
                out_vlan_a,
1166
                queue_id=self.queue_id,
1167
            )
1168 1
            flows_a.append(pop_flow)
1169
1170 1
        uni_flows[self.uni_a.interface.switch.id] = flows_a
1171
1172
        # Flows for the second UNI
1173 1
        flows_z = []
1174
1175
        # Flow for one direction, pushing the service tag
1176 1
        if not skip_in:
1177 1
            if isinstance(in_vlan_z, list):
1178 1
                for in_mask_z in in_vlan_z:
1179 1
                    push_flow = self._prepare_push_flow(
1180
                        self.uni_z.interface,
1181
                        endpoint_z,
1182
                        in_mask_z,
1183
                        out_vlan_z,
1184
                        in_vlan_a,
1185
                        queue_id=self.queue_id,
1186
                    )
1187 1
                    flows_z.append(push_flow)
1188
            else:
1189 1
                push_flow = self._prepare_push_flow(
1190
                    self.uni_z.interface,
1191
                    endpoint_z,
1192
                    in_vlan_z,
1193
                    out_vlan_z,
1194
                    in_vlan_a,
1195
                    queue_id=self.queue_id,
1196
                )
1197 1
                flows_z.append(push_flow)
1198
1199
        # Flow for the other direction, popping the service tag
1200 1
        if not skip_out:
1201 1
            pop_flow = self._prepare_pop_flow(
1202
                endpoint_z,
1203
                self.uni_z.interface,
1204
                out_vlan_z,
1205
                queue_id=self.queue_id,
1206
            )
1207 1
            flows_z.append(pop_flow)
1208
1209 1
        uni_flows[self.uni_z.interface.switch.id] = flows_z
1210
1211 1
        return uni_flows
1212
1213 1
    def _install_uni_flows(
1214
        self, path=None, skip_in=False, skip_out=False
1215
    ) -> dict[str, list[dict]]:
1216
        """Install UNI flows."""
1217 1
        uni_flows = self._prepare_uni_flows(path, skip_in, skip_out)
1218
1219 1
        for (dpid, flows) in uni_flows.items():
1220 1
            self._send_flow_mods(dpid, flows)
1221
1222 1
        return uni_flows
1223
1224 1
    @staticmethod
1225 1
    def _send_flow_mods(dpid, flow_mods, command='flows', force=False):
1226
        """Send a flow_mod list to a specific switch.
1227
1228
        Args:
1229
            dpid(str): The target of flows (i.e. Switch.id).
1230
            flow_mods(dict): Python dictionary with flow_mods.
1231
            command(str): By default is 'flows'. To remove a flow is 'remove'.
1232
            force(bool): True to send via consistency check in case of errors
1233
1234
        """
1235
1236 1
        endpoint = f"{settings.MANAGER_URL}/{command}/{dpid}"
1237
1238 1
        data = {"flows": flow_mods, "force": force}
1239 1
        response = requests.post(endpoint, json=data)
1240 1
        if response.status_code >= 400:
1241 1
            raise FlowModException(str(response.text))
1242
1243 1
    def get_cookie(self):
1244
        """Return the cookie integer from evc id."""
1245 1
        return int(self.id, 16) + (settings.COOKIE_PREFIX << 56)
1246
1247 1
    @staticmethod
1248 1
    def get_id_from_cookie(cookie):
1249
        """Return the evc id given a cookie value."""
1250 1
        evc_id = cookie - (settings.COOKIE_PREFIX << 56)
1251 1
        return f"{evc_id:x}".zfill(14)
1252
1253 1
    def set_flow_table_group_id(self, flow_mod: dict, vlan) -> dict:
1254
        """Set table_group and table_id"""
1255 1
        table_group = "epl" if vlan is None else "evpl"
1256 1
        flow_mod["table_group"] = table_group
1257 1
        flow_mod["table_id"] = self.table_group[table_group]
1258 1
        return flow_mod
1259
1260 1
    @staticmethod
1261 1
    def get_priority(vlan):
1262
        """Return priority value depending on vlan value"""
1263 1
        if isinstance(vlan, list):
1264 1
            return settings.EVPL_SB_PRIORITY
1265 1
        if vlan not in {None, "4096/4096", 0}:
1266 1
            return settings.EVPL_SB_PRIORITY
1267 1
        if vlan == 0:
1268 1
            return settings.UNTAGGED_SB_PRIORITY
1269 1
        if vlan == "4096/4096":
1270 1
            return settings.ANY_SB_PRIORITY
1271 1
        return settings.EPL_SB_PRIORITY
1272
1273 1
    def _prepare_flow_mod(self, in_interface, out_interface,
1274
                          queue_id=None, vlan=True):
1275
        """Prepare a common flow mod."""
1276 1
        default_actions = [
1277
            {"action_type": "output", "port": out_interface.port_number}
1278
        ]
1279 1
        queue_id = settings.QUEUE_ID if queue_id == -1 else queue_id
1280 1
        if queue_id is not None:
1281 1
            default_actions.append(
1282
                {"action_type": "set_queue", "queue_id": queue_id}
1283
            )
1284
1285 1
        flow_mod = {
1286
            "match": {"in_port": in_interface.port_number},
1287
            "cookie": self.get_cookie(),
1288
            "actions": default_actions,
1289
            "owner": "mef_eline",
1290
        }
1291
1292 1
        self.set_flow_table_group_id(flow_mod, vlan)
1293 1
        if self.sb_priority:
1294 1
            flow_mod["priority"] = self.sb_priority
1295
        else:
1296 1
            flow_mod["priority"] = self.get_priority(vlan)
1297 1
        return flow_mod
1298
1299 1
    def _prepare_nni_flow(self, *args, queue_id=None):
1300
        """Create NNI flows."""
1301 1
        in_interface, out_interface, in_vlan, out_vlan = args
1302 1
        flow_mod = self._prepare_flow_mod(
1303
            in_interface, out_interface, queue_id
1304
        )
1305 1
        flow_mod["match"]["dl_vlan"] = in_vlan
1306 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1307 1
        flow_mod["actions"].insert(0, new_action)
1308
1309 1
        return flow_mod
1310
1311 1
    def _prepare_push_flow(self, *args, queue_id=None):
1312
        """Prepare push flow.
1313
1314
        Arguments:
1315
            in_interface(str): Interface input.
1316
            out_interface(str): Interface output.
1317
            in_vlan(int,str,None): Vlan input.
1318
            out_vlan(str): Vlan output.
1319
            new_c_vlan(int,str,list,None): New client vlan.
1320
1321
        Return:
1322
            dict: An python dictionary representing a FlowMod
1323
1324
        """
1325
        # assign all arguments
1326 1
        in_interface, out_interface, in_vlan, out_vlan, new_c_vlan = args
1327 1
        vlan_pri = in_vlan if not isinstance(new_c_vlan, list) else new_c_vlan
1328 1
        flow_mod = self._prepare_flow_mod(
1329
            in_interface, out_interface, queue_id, vlan_pri
1330
        )
1331
        # the service tag must be always pushed
1332 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1333 1
        flow_mod["actions"].insert(0, new_action)
1334
1335 1
        new_action = {"action_type": "push_vlan", "tag_type": "s"}
1336 1
        flow_mod["actions"].insert(0, new_action)
1337
1338 1
        if in_vlan is not None:
1339
            # if in_vlan is set, it must be included in the match
1340 1
            flow_mod["match"]["dl_vlan"] = in_vlan
1341
1342 1
        if (not isinstance(new_c_vlan, list) and in_vlan != new_c_vlan and
1343
                new_c_vlan not in self.special_cases):
1344
            # new_in_vlan is an integer but zero, action to set is required
1345 1
            new_action = {"action_type": "set_vlan", "vlan_id": new_c_vlan}
1346 1
            flow_mod["actions"].insert(0, new_action)
1347
1348 1
        if in_vlan not in self.special_cases and new_c_vlan == 0:
1349
            # # new_in_vlan is an integer but zero and new_c_vlan does not
1350
            # a pop action is required
1351 1
            new_action = {"action_type": "pop_vlan"}
1352 1
            flow_mod["actions"].insert(0, new_action)
1353
1354 1
        elif in_vlan == "4096/4096" and new_c_vlan == 0:
1355
            # if in_vlan match with any tags and new_c_vlan does not
1356
            # a pop action is required
1357 1
            new_action = {"action_type": "pop_vlan"}
1358 1
            flow_mod["actions"].insert(0, new_action)
1359
1360 1
        elif (not in_vlan and
1361
                (not isinstance(new_c_vlan, list) and
1362
                 new_c_vlan not in self.special_cases)):
1363
            # new_in_vlan is an integer but zero and in_vlan is not set
1364
            # then it is set now
1365 1
            new_action = {"action_type": "push_vlan", "tag_type": "c"}
1366 1
            flow_mod["actions"].insert(0, new_action)
1367
1368 1
        return flow_mod
1369
1370 1
    def _prepare_pop_flow(
1371
        self, in_interface, out_interface, out_vlan, queue_id=None
1372
    ):
1373
        # pylint: disable=too-many-arguments
1374
        """Prepare pop flow."""
1375 1
        flow_mod = self._prepare_flow_mod(
1376
            in_interface, out_interface, queue_id
1377
        )
1378 1
        flow_mod["match"]["dl_vlan"] = out_vlan
1379 1
        new_action = {"action_type": "pop_vlan"}
1380 1
        flow_mod["actions"].insert(0, new_action)
1381 1
        return flow_mod
1382
1383 1
    @staticmethod
1384 1
    def run_bulk_sdntraces(
1385
        uni_list: list[tuple[Interface, Union[str, int, None]]]
1386
    ) -> dict:
1387
        """Run SDN traces on control plane starting from EVC UNIs."""
1388 1
        endpoint = f"{settings.SDN_TRACE_CP_URL}/traces"
1389 1
        data = []
1390 1
        for interface, tag_value in uni_list:
1391 1
            data_uni = {
1392
                "trace": {
1393
                            "switch": {
1394
                                "dpid": interface.switch.dpid,
1395
                                "in_port": interface.port_number,
1396
                            }
1397
                        }
1398
                }
1399 1
            if tag_value:
1400 1
                uni_dl_vlan = map_dl_vlan(tag_value)
1401 1
                if uni_dl_vlan:
1402 1
                    data_uni["trace"]["eth"] = {
1403
                                            "dl_type": 0x8100,
1404
                                            "dl_vlan": uni_dl_vlan,
1405
                                            }
1406 1
            data.append(data_uni)
1407 1
        try:
1408 1
            response = requests.put(endpoint, json=data, timeout=30)
1409 1
        except Timeout as exception:
1410 1
            log.error(f"Request has timed out: {exception}")
1411 1
            return {"result": []}
1412 1
        if response.status_code >= 400:
1413 1
            log.error(f"Failed to run sdntrace-cp: {response.text}")
1414 1
            return {"result": []}
1415 1
        return response.json()
1416
1417
    # pylint: disable=too-many-return-statements, too-many-arguments
1418 1
    @staticmethod
1419 1
    def check_trace(
1420
        evc_id: str,
1421
        evc_name: str,
1422
        tag_a: Union[None, int, str],
1423
        tag_z: Union[None, int, str],
1424
        interface_a: Interface,
1425
        interface_z: Interface,
1426
        current_path: list,
1427
        trace_a: list,
1428
        trace_z: list
1429
    ) -> bool:
1430
        """Auxiliar function to check an individual trace"""
1431 1
        if (
1432
            len(trace_a) != len(current_path) + 1
1433
            or not compare_uni_out_trace(tag_z, interface_z, trace_a[-1])
1434
        ):
1435 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1436
                        f"Invalid trace from uni_a: {trace_a}")
1437 1
            return False
1438 1
        if (
1439
            len(trace_z) != len(current_path) + 1
1440
            or not compare_uni_out_trace(tag_a, interface_a, trace_z[-1])
1441
        ):
1442 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1443
                        f"Invalid trace from uni_z: {trace_z}")
1444 1
            return False
1445
1446 1
        if not current_path:
1447
            return True
1448
1449 1
        first_link, trace_path_begin, trace_path_end = current_path[0], [], []
1450 1
        if (
1451
            first_link.endpoint_a.switch.id == trace_a[0]["dpid"]
1452
        ):
1453 1
            trace_path_begin, trace_path_end = trace_a, trace_z
1454 1
        elif (
1455
            first_link.endpoint_a.switch.id == trace_z[0]["dpid"]
1456
        ):
1457 1
            trace_path_begin, trace_path_end = trace_z, trace_a
1458
        else:
1459
            msg = (
1460
                f"first link {first_link} endpoint_a didn't match the first "
1461
                f"step of trace_a {trace_a} or trace_z {trace_z}"
1462
            )
1463
            log.warning(msg)
1464
            return False
1465
1466 1
        for link, trace1, trace2 in zip(current_path,
1467
                                        trace_path_begin[1:],
1468
                                        trace_path_end[:0:-1]):
1469 1
            metadata_vlan = None
1470 1
            if link.metadata:
1471 1
                metadata_vlan = glom(link.metadata, 's_vlan.value')
1472 1
            if compare_endpoint_trace(
1473
                                        link.endpoint_a,
1474
                                        metadata_vlan,
1475
                                        trace2
1476
                                    ) is False:
1477 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1478
                            f"Invalid trace from uni_a: {trace_a}")
1479 1
                return False
1480 1
            if compare_endpoint_trace(
1481
                                        link.endpoint_b,
1482
                                        metadata_vlan,
1483
                                        trace1
1484
                                    ) is False:
1485 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1486
                            f"Invalid trace from uni_z: {trace_z}")
1487 1
                return False
1488
1489 1
        return True
1490
1491 1
    @staticmethod
1492 1
    def check_range(circuit, traces: list) -> bool:
1493
        """Check traces when for UNI with TAGRange"""
1494 1
        check = True
1495 1
        for i, mask in enumerate(circuit.uni_a.user_tag.mask_list):
1496 1
            trace_a = traces[i*2]
1497 1
            trace_z = traces[i*2+1]
1498 1
            check &= EVCDeploy.check_trace(
1499
                circuit.id, circuit.name,
1500
                mask, mask,
1501
                circuit.uni_a.interface,
1502
                circuit.uni_z.interface,
1503
                circuit.current_path,
1504
                trace_a, trace_z,
1505
            )
1506 1
        return check
1507
1508 1
    @staticmethod
1509 1
    def check_list_traces(list_circuits: list) -> dict:
1510
        """Check if current_path is deployed comparing with SDN traces."""
1511 1
        if not list_circuits:
1512 1
            return {}
1513 1
        uni_list = make_uni_list(list_circuits)
1514 1
        traces = EVCDeploy.run_bulk_sdntraces(uni_list)["result"]
1515
1516 1
        if not traces:
1517 1
            return {}
1518
1519 1
        try:
1520 1
            circuits_checked = {}
1521 1
            i = 0
1522 1
            for circuit in list_circuits:
1523 1
                if isinstance(circuit.uni_a.user_tag, TAGRange):
1524 1
                    length = len(circuit.uni_a.user_tag.mask_list)
1525 1
                    circuits_checked[circuit.id] = EVCDeploy.check_range(
1526
                        circuit, traces[i:i+length*2]
1527
                    )
1528 1
                    i += length*2
1529
                else:
1530 1
                    trace_a = traces[i]
1531 1
                    trace_z = traces[i+1]
1532 1
                    tag_a = None
1533 1
                    if circuit.uni_a.user_tag:
1534 1
                        tag_a = circuit.uni_a.user_tag.value
1535 1
                    tag_z = None
1536 1
                    if circuit.uni_z.user_tag:
1537 1
                        tag_z = circuit.uni_z.user_tag.value
1538 1
                    circuits_checked[circuit.id] = EVCDeploy.check_trace(
1539
                        circuit.id, circuit.name,
1540
                        tag_a, tag_z,
1541
                        circuit.uni_a.interface,
1542
                        circuit.uni_z.interface,
1543
                        circuit.current_path,
1544
                        trace_a, trace_z
1545
                    )
1546 1
                    i += 2
1547 1
        except IndexError as err:
1548 1
            log.error(
1549
                f"Bulk sdntraces returned fewer items than expected."
1550
                f"Error = {err}"
1551
            )
1552 1
            return {}
1553
1554 1
        return circuits_checked
1555
1556 1
    @staticmethod
1557 1
    def get_endpoint_by_id(
1558
        link: Link,
1559
        id_: str,
1560
        operator: Union[eq, ne]
1561
    ) -> Interface:
1562
        """Return endpoint from link
1563
        either equal(eq) or not equal(ne) to id"""
1564 1
        if operator(link.endpoint_a.switch.id, id_):
1565 1
            return link.endpoint_a
1566 1
        return link.endpoint_b
1567
1568
1569 1
class LinkProtection(EVCDeploy):
1570
    """Class to handle link protection."""
1571
1572 1
    def is_affected_by_link(self, link=None):
1573
        """Verify if the current path is affected by link down event."""
1574
        return self.current_path.is_affected_by_link(link)
1575
1576 1
    def is_using_primary_path(self):
1577
        """Verify if the current deployed path is self.primary_path."""
1578 1
        return self.current_path == self.primary_path
1579
1580 1
    def is_using_backup_path(self):
1581
        """Verify if the current deployed path is self.backup_path."""
1582 1
        return self.current_path == self.backup_path
1583
1584 1
    def is_using_dynamic_path(self):
1585
        """Verify if the current deployed path is dynamic."""
1586 1
        if (
1587
            self.current_path
1588
            and not self.is_using_primary_path()
1589
            and not self.is_using_backup_path()
1590
            and self.current_path.status is EntityStatus.UP
1591
        ):
1592
            return True
1593 1
        return False
1594
1595 1
    def deploy_to(self, path_name=None, path=None):
1596
        """Create a deploy to path."""
1597 1
        if self.current_path == path:
1598 1
            log.debug(f"{path_name} is equal to current_path.")
1599 1
            return True
1600
1601 1
        if path.status is EntityStatus.UP:
1602 1
            return self.deploy_to_path(path)
1603
1604 1
        return False
1605
1606 1
    def handle_link_up(self, link):
1607
        """Handle circuit when link up.
1608
1609
        Args:
1610
            link(Link): Link affected by link.up event.
1611
1612
        """
1613 1
        condition_pairs = [
1614
            (
1615
                lambda me: me.is_using_primary_path(),
1616
                lambda _: (True, 'nothing')
1617
            ),
1618
            (
1619
                lambda me: me.is_intra_switch(),
1620
                lambda _: (True, 'nothing')
1621
            ),
1622
            (
1623
                lambda me: me.primary_path.is_affected_by_link(link),
1624
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1625
            ),
1626
            # We tried to deploy(primary_path) without success.
1627
            # And in this case is up by some how. Nothing to do.
1628
            (
1629
                lambda me: me.is_using_backup_path(),
1630
                lambda _: (True, 'nothing')
1631
            ),
1632
            (
1633
                lambda me:  me.is_using_dynamic_path(),
1634
                lambda _: (True, 'nothing')
1635
            ),
1636
            # In this case, probably the circuit is not being used and
1637
            # we can move to backup
1638
            (
1639
                lambda me: me.backup_path.is_affected_by_link(link),
1640
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1641
            ),
1642
            # In this case, the circuit is not being used and we should
1643
            # try a dynamic path
1644
            (
1645
                lambda me: me.dynamic_backup_path and not me.is_active(),
1646
                lambda me: (me.deploy_to_path(), 'redeploy')
1647
            )
1648
        ]
1649 1
        for predicate, action in condition_pairs:
1650 1
            if not predicate(self):
1651 1
                continue
1652 1
            success, succcess_type = action(self)
1653 1
            if success:
1654 1
                if succcess_type == 'redeploy':
1655 1
                    emit_event(
1656
                        self._controller,
1657
                        "redeployed_link_up",
1658
                        content=map_evc_event_content(self)
1659
                    )
1660 1
                return True
1661 1
        return False
1662
1663 1
    def handle_link_down(self):
1664
        """Handle circuit when link down.
1665
1666
        Returns:
1667
            bool: True if the re-deploy was successly otherwise False.
1668
1669
        """
1670 1
        success = False
1671 1
        if self.is_using_primary_path():
1672 1
            success = self.deploy_to_backup_path()
1673 1
        elif self.is_using_backup_path():
1674 1
            success = self.deploy_to_primary_path()
1675
1676 1
        if not success and self.dynamic_backup_path:
1677 1
            success = self.deploy_to_path()
1678
1679 1
        if success:
1680 1
            log.debug(f"{self} deployed after link down.")
1681
        else:
1682 1
            self.remove_current_flows(sync=False)
1683 1
            self.deactivate()
1684 1
            self.sync()
1685 1
            log.debug(f"Failed to re-deploy {self} after link down.")
1686
1687 1
        return success
1688
1689 1
    @staticmethod
1690 1
    def get_interface_from_switch(uni: UNI, switches: dict) -> Interface:
1691
        """Get interface from switch by uni"""
1692 1
        switch = switches[uni.interface.switch.dpid]
1693 1
        interface = switch.interfaces[uni.interface.port_number]
1694 1
        return interface
1695
1696 1
    def are_unis_active(self, switches: dict) -> bool:
1697
        """Determine whether this EVC should be active"""
1698 1
        interface_a = self.get_interface_from_switch(self.uni_a, switches)
1699 1
        interface_z = self.get_interface_from_switch(self.uni_z, switches)
1700 1
        active, _ = self.is_uni_interface_active(interface_a, interface_z)
1701 1
        return active
1702
1703 1
    @staticmethod
1704 1
    def is_uni_interface_active(
1705
        *interfaces: Interface
1706
    ) -> tuple[bool, dict]:
1707
        """Determine whether a UNI should be active"""
1708 1
        active = True
1709 1
        bad_interfaces = [
1710
            interface
1711
            for interface in interfaces
1712
            if interface.status != EntityStatus.UP
1713
        ]
1714 1
        if bad_interfaces:
1715 1
            active = False
1716 1
            interfaces = bad_interfaces
1717 1
        return active, {
1718
            interface.id: {
1719
                'status': interface.status.value,
1720
                'status_reason': interface.status_reason,
1721
            }
1722
            for interface in interfaces
1723
        }
1724
1725 1
    def handle_interface_link_up(self, interface: Interface):
1726
        """
1727
        Handler for interface link_up events
1728
        """
1729 1
        if self.is_active():
1730 1
            return
1731 1
        interfaces = (self.uni_a.interface, self.uni_z.interface)
1732 1
        if interface not in interfaces:
1733
            return
1734 1
        down_interfaces = [
1735
            interface
1736
            for interface in interfaces
1737
            if interface.status != EntityStatus.UP
1738
        ]
1739 1
        if down_interfaces:
1740
            return
1741 1
        interface_dicts = {
1742
            interface.id: {
1743
                'status': interface.status.value,
1744
                'status_reason': interface.status_reason,
1745
            }
1746
            for interface in interfaces
1747
        }
1748 1
        self.activate()
1749 1
        log.info(
1750
            f"Activating EVC {self.id}. Interfaces: "
1751
            f"{interface_dicts}."
1752
        )
1753 1
        emit_event(self._controller, "uni_active_updated",
1754
                   content=map_evc_event_content(self))
1755 1
        self.sync()
1756
1757 1
    def handle_interface_link_down(self, interface):
1758
        """
1759
        Handler for interface link_down events
1760
        """
1761 1
        if not self.is_active():
1762 1
            return
1763 1
        interfaces = (self.uni_a.interface, self.uni_z.interface)
1764 1
        if interface not in interfaces:
1765
            return
1766 1
        down_interfaces = [
1767
            interface
1768
            for interface in interfaces
1769
            if interface.status != EntityStatus.UP
1770
        ]
1771 1
        if not down_interfaces:
1772
            return
1773 1
        interface_dicts = {
1774
            interface.id: {
1775
                'status': interface.status.value,
1776
                'status_reason': interface.status_reason,
1777
            }
1778
            for interface in down_interfaces
1779
        }
1780 1
        self.deactivate()
1781 1
        log.info(
1782
            f"Deactivating EVC {self.id}. Interfaces: "
1783
            f"{interface_dicts}."
1784
        )
1785 1
        emit_event(self._controller, "uni_active_updated",
1786
                   content=map_evc_event_content(self))
1787 1
        self.sync()
1788
1789
1790 1
class EVC(LinkProtection):
1791
    """Class that represents a E-Line Virtual Connection."""
1792