Passed
Push — master ( 2e4517...9aeaeb )
by Vinicius
04:27 queued 15s
created

build.models.evc.EVCDeploy.check_trace()   D

Complexity

Conditions 12

Size

Total Lines 72
Code Lines 55

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 25
CRAP Score 12.3775

Importance

Changes 0
Metric Value
cc 12
eloc 55
nop 9
dl 0
loc 72
ccs 25
cts 29
cp 0.8621
crap 12.3775
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.models.evc.EVCDeploy.check_trace() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
"""Classes used in the main application."""  # pylint: disable=too-many-lines
2 1
import traceback
3 1
from collections import OrderedDict
4 1
from copy import deepcopy
5 1
from datetime import datetime
6 1
from operator import eq, ne
7 1
from threading import Lock
8 1
from typing import Union
9 1
from uuid import uuid4
10
11 1
import requests
12 1
from glom import glom
13 1
from requests.exceptions import Timeout
14
15 1
from kytos.core import log
16 1
from kytos.core.common import EntityStatus, GenericEntity
17 1
from kytos.core.exceptions import KytosNoTagAvailableError, KytosTagError
18 1
from kytos.core.helpers import get_time, now
19 1
from kytos.core.interface import UNI, Interface, TAGRange
20 1
from kytos.core.link import Link
21 1
from kytos.core.tag_ranges import range_difference
22 1
from napps.kytos.mef_eline import controllers, settings
23 1
from napps.kytos.mef_eline.exceptions import (DuplicatedNoTagUNI,
24
                                              FlowModException, InvalidPath)
25 1
from napps.kytos.mef_eline.utils import (check_disabled_component,
26
                                         compare_endpoint_trace,
27
                                         compare_uni_out_trace, emit_event,
28
                                         make_uni_list, map_dl_vlan,
29
                                         map_evc_event_content,
30
                                         merge_flow_dicts)
31
32 1
from .path import DynamicPathManager, Path
33
34
35 1
class EVCBase(GenericEntity):
36
    """Class to represent a circuit."""
37
38 1
    read_only_attributes = [
39
        "creation_time",
40
        "active",
41
        "current_path",
42
        "failover_path",
43
        "_id",
44
        "archived",
45
    ]
46 1
    attributes_requiring_redeploy = [
47
        "primary_path",
48
        "backup_path",
49
        "dynamic_backup_path",
50
        "queue_id",
51
        "sb_priority",
52
        "primary_constraints",
53
        "secondary_constraints",
54
        "uni_a",
55
        "uni_z",
56
    ]
57 1
    required_attributes = ["name", "uni_a", "uni_z"]
58
59 1
    def __init__(self, controller, **kwargs):
60
        """Create an EVC instance with the provided parameters.
61
62
        Args:
63
            id(str): EVC identifier. Whether it's None an ID will be genereted.
64
                     Only the first 14 bytes passed will be used.
65
            name: represents an EVC name.(Required)
66
            uni_a (UNI): Endpoint A for User Network Interface.(Required)
67
            uni_z (UNI): Endpoint Z for User Network Interface.(Required)
68
            start_date(datetime|str): Date when the EVC was registred.
69
                                      Default is now().
70
            end_date(datetime|str): Final date that the EVC will be fineshed.
71
                                    Default is None.
72
            bandwidth(int): Bandwidth used by EVC instance. Default is 0.
73
            primary_links(list): Primary links used by evc. Default is []
74
            backup_links(list): Backups links used by evc. Default is []
75
            current_path(list): Circuit being used at the moment if this is an
76
                                active circuit. Default is [].
77
            failover_path(list): Path being used to provide EVC protection via
78
                                failover during link failures. Default is [].
79
            primary_path(list): primary circuit offered to user IF one or more
80
                                links were provided. Default is [].
81
            backup_path(list): backup circuit offered to the user IF one or
82
                               more links were provided. Default is [].
83
            dynamic_backup_path(bool): Enable computer backup path dynamically.
84
                                       Dafault is False.
85
            creation_time(datetime|str): datetime when the circuit should be
86
                                         activated. default is now().
87
            enabled(Boolean): attribute to indicate the administrative state;
88
                              default is False.
89
            active(Boolean): attribute to indicate the operational state;
90
                             default is False.
91
            archived(Boolean): indicate the EVC has been deleted and is
92
                               archived; default is False.
93
            owner(str): The EVC owner. Default is None.
94
            sb_priority(int): Service level provided in the request.
95
                              Default is None.
96
            service_level(int): Service level provided. The higher the better.
97
                                Default is 0.
98
99
        Raises:
100
            ValueError: raised when object attributes are invalid.
101
102
        """
103 1
        self._controller = controller
104 1
        self._validate(**kwargs)
105 1
        super().__init__()
106
107
        # required attributes
108 1
        self._id = kwargs.get("id", uuid4().hex)[:14]
109 1
        self.uni_a: UNI = kwargs.get("uni_a")
110 1
        self.uni_z: UNI = kwargs.get("uni_z")
111 1
        self.name = kwargs.get("name")
112
113
        # optional attributes
114 1
        self.start_date = get_time(kwargs.get("start_date")) or now()
115 1
        self.end_date = get_time(kwargs.get("end_date")) or None
116 1
        self.queue_id = kwargs.get("queue_id", -1)
117
118 1
        self.bandwidth = kwargs.get("bandwidth", 0)
119 1
        self.primary_links = Path(kwargs.get("primary_links", []))
120 1
        self.backup_links = Path(kwargs.get("backup_links", []))
121 1
        self.current_path = Path(kwargs.get("current_path", []))
122 1
        self.failover_path = Path(kwargs.get("failover_path", []))
123 1
        self.primary_path = Path(kwargs.get("primary_path", []))
124 1
        self.backup_path = Path(kwargs.get("backup_path", []))
125 1
        self.dynamic_backup_path = kwargs.get("dynamic_backup_path", False)
126 1
        self.primary_constraints = kwargs.get("primary_constraints", {})
127 1
        self.secondary_constraints = kwargs.get("secondary_constraints", {})
128 1
        self.creation_time = get_time(kwargs.get("creation_time")) or now()
129 1
        self.owner = kwargs.get("owner", None)
130 1
        self.sb_priority = kwargs.get("sb_priority", None) or kwargs.get(
131
            "priority", None
132
        )
133 1
        self.service_level = kwargs.get("service_level", 0)
134 1
        self.circuit_scheduler = kwargs.get("circuit_scheduler", [])
135 1
        self.flow_removed_at = get_time(kwargs.get("flow_removed_at")) or None
136 1
        self.updated_at = get_time(kwargs.get("updated_at")) or now()
137 1
        self.execution_rounds = kwargs.get("execution_rounds", 0)
138
139 1
        self.current_links_cache = set()
140 1
        self.primary_links_cache = set()
141 1
        self.backup_links_cache = set()
142 1
        self.affected_by_link_at = get_time("0001-01-01T00:00:00")
143 1
        self.old_path = Path([])
144
145 1
        self.lock = Lock()
146
147 1
        self.archived = kwargs.get("archived", False)
148
149 1
        self.metadata = kwargs.get("metadata", {})
150
151 1
        self._mongo_controller = controllers.ELineController()
152
153 1
        if kwargs.get("active", False):
154 1
            self.activate()
155
        else:
156 1
            self.deactivate()
157
158 1
        if kwargs.get("enabled", False):
159 1
            self.enable()
160
        else:
161 1
            self.disable()
162
163
        # datetime of user request for a EVC (or datetime when object was
164
        # created)
165 1
        self.request_time = kwargs.get("request_time", now())
166
        # dict with the user original request (input)
167 1
        self._requested = kwargs
168
169
        # Special cases: No tag, any, untagged
170 1
        self.special_cases = {None, "4096/4096", 0}
171 1
        self.table_group = kwargs.get("table_group")
172
173 1
    def sync(self, keys: set = None):
174
        """Sync this EVC in the MongoDB."""
175 1
        self.updated_at = now()
176 1
        if keys:
177 1
            self._mongo_controller.update_evc(self.as_dict(keys))
178 1
            return
179 1
        self._mongo_controller.upsert_evc(self.as_dict())
180
181 1
    def _get_unis_use_tags(self, **kwargs) -> tuple[UNI, UNI]:
182
        """Obtain both UNIs (uni_a, uni_z).
183
        If a UNI is changing, verify tags"""
184 1
        uni_a = kwargs.get("uni_a", None)
185 1
        uni_a_flag = False
186 1
        if uni_a and uni_a != self.uni_a:
187 1
            uni_a_flag = True
188 1
            self._use_uni_vlan(uni_a, uni_dif=self.uni_a)
189
190 1
        uni_z = kwargs.get("uni_z", None)
191 1
        if uni_z and uni_z != self.uni_z:
192 1
            try:
193 1
                self._use_uni_vlan(uni_z, uni_dif=self.uni_z)
194 1
                self.make_uni_vlan_available(self.uni_z, uni_dif=uni_z)
195 1
            except KytosTagError as err:
196 1
                if uni_a_flag:
197 1
                    self.make_uni_vlan_available(uni_a, uni_dif=self.uni_a)
198 1
                raise err
199
        else:
200 1
            uni_z = self.uni_z
201
202 1
        if uni_a_flag:
203 1
            self.make_uni_vlan_available(self.uni_a, uni_dif=uni_a)
204
        else:
205 1
            uni_a = self.uni_a
206 1
        return uni_a, uni_z
207
208 1
    def update(self, **kwargs):
209
        """Update evc attributes.
210
211
        This method will raises an error trying to change the following
212
        attributes: [creation_time, active, current_path, failover_path,
213
        _id, archived]
214
        [name, uni_a and uni_z]
215
216
        Returns:
217
            the values for enable and a redeploy attribute, if exists and None
218
            otherwise
219
        Raises:
220
            ValueError: message with error detail.
221
222
        """
223 1
        enable, redeploy = (None, None)
224 1
        if not self._tag_lists_equal(**kwargs):
225 1
            raise ValueError(
226
                "UNI_A and UNI_Z tag lists should be the same."
227
            )
228 1
        uni_a, uni_z = self._get_unis_use_tags(**kwargs)
229 1
        check_disabled_component(uni_a, uni_z)
230 1
        self._validate_has_primary_or_dynamic(
231
            primary_path=kwargs.get("primary_path"),
232
            dynamic_backup_path=kwargs.get("dynamic_backup_path"),
233
            uni_a=uni_a,
234
            uni_z=uni_z,
235
        )
236 1
        for attribute, value in kwargs.items():
237 1
            if attribute in self.read_only_attributes:
238 1
                raise ValueError(f"{attribute} can't be updated.")
239 1
            if not hasattr(self, attribute):
240 1
                raise ValueError(f'The attribute "{attribute}" is invalid.')
241 1
            if attribute in ("primary_path", "backup_path"):
242 1
                try:
243 1
                    value.is_valid(
244
                        uni_a.interface.switch, uni_z.interface.switch
245
                    )
246 1
                except InvalidPath as exception:
247 1
                    raise ValueError(  # pylint: disable=raise-missing-from
248
                        f"{attribute} is not a " f"valid path: {exception}"
249
                    )
250 1
        for attribute, value in kwargs.items():
251 1
            if attribute in ("enable", "enabled"):
252 1
                if value:
253 1
                    self.enable()
254
                else:
255 1
                    self.disable()
256 1
                enable = value
257
            else:
258 1
                setattr(self, attribute, value)
259 1
                if attribute in self.attributes_requiring_redeploy:
260 1
                    redeploy = True
261 1
        self.sync(set(kwargs.keys()))
262 1
        return enable, redeploy
263
264 1
    def set_flow_removed_at(self):
265
        """Update flow_removed_at attribute."""
266
        self.flow_removed_at = now()
267
268 1
    def has_recent_removed_flow(self, setting=settings):
269
        """Check if any flow has been removed from the evc"""
270
        if self.flow_removed_at is None:
271
            return False
272
        res_seconds = (now() - self.flow_removed_at).seconds
273
        return res_seconds < setting.TIME_RECENT_DELETED_FLOWS
274
275 1
    def is_recent_updated(self, setting=settings):
276
        """Check if the evc has been updated recently"""
277
        res_seconds = (now() - self.updated_at).seconds
278
        return res_seconds < setting.TIME_RECENT_UPDATED
279
280 1
    def __repr__(self):
281
        """Repr method."""
282 1
        return f"EVC({self._id}, {self.name})"
283
284 1
    def _validate(self, **kwargs):
285
        """Do Basic validations.
286
287
        Verify required attributes: name, uni_a, uni_z
288
289
        Raises:
290
            ValueError: message with error detail.
291
292
        """
293 1
        for attribute in self.required_attributes:
294
295 1
            if attribute not in kwargs:
296 1
                raise ValueError(f"{attribute} is required.")
297
298 1
            if "uni" in attribute:
299 1
                uni = kwargs.get(attribute)
300 1
                if not isinstance(uni, UNI):
301
                    raise ValueError(f"{attribute} is an invalid UNI.")
302
303 1
    def _tag_lists_equal(self, **kwargs):
304
        """Verify that tag lists are the same."""
305 1
        uni_a = kwargs.get("uni_a") or self.uni_a
306 1
        uni_z = kwargs.get("uni_z") or self.uni_z
307 1
        uni_a_list = uni_z_list = False
308 1
        if (uni_a.user_tag and isinstance(uni_a.user_tag, TAGRange)):
309 1
            uni_a_list = True
310 1
        if (uni_z.user_tag and isinstance(uni_z.user_tag, TAGRange)):
311 1
            uni_z_list = True
312 1
        if uni_a_list and uni_z_list:
313 1
            return uni_a.user_tag.value == uni_z.user_tag.value
314 1
        return uni_a_list == uni_z_list
315
316 1
    def _validate_has_primary_or_dynamic(
317
        self,
318
        primary_path=None,
319
        dynamic_backup_path=None,
320
        uni_a=None,
321
        uni_z=None,
322
    ) -> None:
323
        """Validate that it must have a primary path or allow dynamic paths."""
324 1
        primary_path = (
325
            primary_path
326
            if primary_path is not None
327
            else self.primary_path
328
        )
329 1
        dynamic_backup_path = (
330
            dynamic_backup_path
331
            if dynamic_backup_path is not None
332
            else self.dynamic_backup_path
333
        )
334 1
        uni_a = uni_a if uni_a is not None else self.uni_a
335 1
        uni_z = uni_z if uni_z is not None else self.uni_z
336 1
        if (
337
            not primary_path
338
            and not dynamic_backup_path
339
            and uni_a and uni_z
340
            and uni_a.interface.switch != uni_z.interface.switch
341
        ):
342 1
            msg = "The EVC must have a primary path or allow dynamic paths."
343 1
            raise ValueError(msg)
344
345 1
    def __eq__(self, other):
346
        """Override the default implementation."""
347 1
        if not isinstance(other, EVC):
348
            return False
349
350 1
        attrs_to_compare = ["name", "uni_a", "uni_z", "owner", "bandwidth"]
351 1
        for attribute in attrs_to_compare:
352 1
            if getattr(other, attribute) != getattr(self, attribute):
353 1
                return False
354 1
        return True
355
356 1
    def is_intra_switch(self):
357
        """Check if the UNIs are in the same switch."""
358 1
        return self.uni_a.interface.switch == self.uni_z.interface.switch
359
360 1
    def check_no_tag_duplicate(self, other_uni: UNI):
361
        """Check if a no tag UNI is duplicated."""
362 1
        if other_uni in (self.uni_a, self.uni_z):
363 1
            msg = f"UNI with interface {other_uni.interface.id} is"\
364
                  f" duplicated with EVC {self.id}."
365 1
            raise DuplicatedNoTagUNI(msg)
366
367 1
    def as_dict(self, keys: set = None):
368
        """Return a dictionary representing an EVC object.
369
            keys: Only fields on this variable will be
370
                  returned in the dictionary"""
371 1
        evc_dict = {
372
            "id": self.id,
373
            "name": self.name,
374
            "uni_a": self.uni_a.as_dict(),
375
            "uni_z": self.uni_z.as_dict(),
376
        }
377
378 1
        time_fmt = "%Y-%m-%dT%H:%M:%S"
379
380 1
        evc_dict["start_date"] = self.start_date
381 1
        if isinstance(self.start_date, datetime):
382 1
            evc_dict["start_date"] = self.start_date.strftime(time_fmt)
383
384 1
        evc_dict["end_date"] = self.end_date
385 1
        if isinstance(self.end_date, datetime):
386 1
            evc_dict["end_date"] = self.end_date.strftime(time_fmt)
387
388 1
        evc_dict["queue_id"] = self.queue_id
389 1
        evc_dict["bandwidth"] = self.bandwidth
390 1
        evc_dict["primary_links"] = self.primary_links.as_dict()
391 1
        evc_dict["backup_links"] = self.backup_links.as_dict()
392 1
        evc_dict["current_path"] = self.current_path.as_dict()
393 1
        evc_dict["failover_path"] = self.failover_path.as_dict()
394 1
        evc_dict["primary_path"] = self.primary_path.as_dict()
395 1
        evc_dict["backup_path"] = self.backup_path.as_dict()
396 1
        evc_dict["dynamic_backup_path"] = self.dynamic_backup_path
397 1
        evc_dict["metadata"] = self.metadata
398
399 1
        evc_dict["request_time"] = self.request_time
400 1
        if isinstance(self.request_time, datetime):
401 1
            evc_dict["request_time"] = self.request_time.strftime(time_fmt)
402
403 1
        time = self.creation_time.strftime(time_fmt)
404 1
        evc_dict["creation_time"] = time
405
406 1
        evc_dict["owner"] = self.owner
407 1
        evc_dict["circuit_scheduler"] = [
408
            sc.as_dict() for sc in self.circuit_scheduler
409
        ]
410
411 1
        evc_dict["active"] = self.is_active()
412 1
        evc_dict["enabled"] = self.is_enabled()
413 1
        evc_dict["archived"] = self.archived
414 1
        evc_dict["sb_priority"] = self.sb_priority
415 1
        evc_dict["service_level"] = self.service_level
416 1
        evc_dict["primary_constraints"] = self.primary_constraints
417 1
        evc_dict["secondary_constraints"] = self.secondary_constraints
418 1
        evc_dict["flow_removed_at"] = self.flow_removed_at
419 1
        evc_dict["updated_at"] = self.updated_at
420
421 1
        if keys:
422 1
            selected = {}
423 1
            for key in keys:
424 1
                if key == "enable":
425
                    selected["enabled"] = evc_dict["enabled"]
426
                    continue
427 1
                selected[key] = evc_dict[key]
428 1
            selected["id"] = evc_dict["id"]
429 1
            return selected
430 1
        return evc_dict
431
432 1
    @property
433 1
    def id(self):  # pylint: disable=invalid-name
434
        """Return this EVC's ID."""
435 1
        return self._id
436
437 1
    def archive(self):
438
        """Archive this EVC on deletion."""
439 1
        self.archived = True
440
441 1
    def _use_uni_vlan(
442
        self,
443
        uni: UNI,
444
        uni_dif: Union[None, UNI] = None
445
    ):
446
        """Use tags from UNI"""
447 1
        if uni.user_tag is None:
448 1
            return
449 1
        tag = uni.user_tag.value
450 1
        tag_type = uni.user_tag.tag_type
451 1
        if (uni_dif and isinstance(tag, list) and
452
                isinstance(uni_dif.user_tag.value, list)):
453 1
            tag = range_difference(tag, uni_dif.user_tag.value)
454 1
            if not tag:
455 1
                return
456 1
        uni.interface.use_tags(
457
            self._controller, tag, tag_type, use_lock=True, check_order=False
458
        )
459
460 1
    def make_uni_vlan_available(
461
        self,
462
        uni: UNI,
463
        uni_dif: Union[None, UNI] = None,
464
    ):
465
        """Make available tag from UNI"""
466 1
        if uni.user_tag is None:
467 1
            return
468 1
        tag = uni.user_tag.value
469 1
        tag_type = uni.user_tag.tag_type
470 1
        if (uni_dif and isinstance(tag, list) and
471
                isinstance(uni_dif.user_tag.value, list)):
472 1
            tag = range_difference(tag, uni_dif.user_tag.value)
473 1
            if not tag:
474
                return
475 1
        try:
476 1
            conflict = uni.interface.make_tags_available(
477
                self._controller, tag, tag_type, use_lock=True,
478
                check_order=False
479
            )
480 1
        except KytosTagError as err:
481 1
            log.error(f"Error in circuit {self._id}: {err}")
482 1
            return
483 1
        if conflict:
484 1
            intf = uni.interface.id
485 1
            log.warning(f"Tags {conflict} was already available in {intf}")
486
487 1
    def remove_uni_tags(self):
488
        """Remove both UNI usage of a tag"""
489 1
        self.make_uni_vlan_available(self.uni_a)
490 1
        self.make_uni_vlan_available(self.uni_z)
491
492
493
# pylint: disable=fixme, too-many-public-methods
494 1
class EVCDeploy(EVCBase):
495
    """Class to handle the deploy procedures."""
496
497 1
    def create(self):
498
        """Create a EVC."""
499
500 1
    def discover_new_paths(self):
501
        """Discover new paths to satisfy this circuit and deploy it."""
502
        return DynamicPathManager.get_best_paths(self,
503
                                                 **self.primary_constraints)
504
505 1
    def get_failover_path_candidates(self):
506
        """Get failover paths to satisfy this EVC."""
507
        # in the future we can return primary/backup paths as well
508
        # we just have to properly handle link_up and failover paths
509
        # if (
510
        #     self.is_using_primary_path() and
511
        #     self.backup_path.status is EntityStatus.UP
512
        # ):
513
        #     yield self.backup_path
514 1
        return DynamicPathManager.get_disjoint_paths(self, self.current_path)
515
516 1
    def change_path(self):
517
        """Change EVC path."""
518
519 1
    def reprovision(self):
520
        """Force the EVC (re-)provisioning."""
521
522 1
    def is_affected_by_link(self, link):
523
        """Return True if this EVC has the given link on its current path."""
524 1
        return link in self.current_path
525
526 1
    def link_affected_by_interface(self, interface):
527
        """Return True if this EVC has the given link on its current path."""
528
        return self.current_path.link_affected_by_interface(interface)
529
530 1
    def is_backup_path_affected_by_link(self, link):
531
        """Return True if the backup path of this EVC uses the given link."""
532 1
        return link in self.backup_path
533
534
    # pylint: disable=invalid-name
535 1
    def is_primary_path_affected_by_link(self, link):
536
        """Return True if the primary path of this EVC uses the given link."""
537 1
        return link in self.primary_path
538
539 1
    def is_failover_path_affected_by_link(self, link):
540
        """Return True if this EVC has the given link on its failover path."""
541 1
        return link in self.failover_path
542
543 1
    def is_eligible_for_failover_path(self):
544
        """Verify if this EVC is eligible for failover path (EP029)"""
545
        # In the future this function can be augmented to consider
546
        # primary/backup, primary/dynamic, and other path combinations
547 1
        return (
548
            self.dynamic_backup_path and
549
            not self.primary_path and not self.backup_path
550
        )
551
552 1
    def is_using_primary_path(self):
553
        """Verify if the current deployed path is self.primary_path."""
554 1
        return self.primary_path and (self.current_path == self.primary_path)
555
556 1
    def is_using_backup_path(self):
557
        """Verify if the current deployed path is self.backup_path."""
558 1
        return self.backup_path and (self.current_path == self.backup_path)
559
560 1
    def is_using_dynamic_path(self):
561
        """Verify if the current deployed path is a dynamic path."""
562 1
        if (
563
            self.current_path
564
            and not self.is_using_primary_path()
565
            and not self.is_using_backup_path()
566
            and self.current_path.status == EntityStatus.UP
567
        ):
568
            return True
569 1
        return False
570
571 1
    def deploy_to_backup_path(self):
572
        """Deploy the backup path into the datapaths of this circuit.
573
574
        If the backup_path attribute is valid and up, this method will try to
575
        deploy this backup_path.
576
577
        If everything fails and dynamic_backup_path is True, then tries to
578
        deploy a dynamic path.
579
        """
580
        # TODO: Remove flows from current (cookies)
581 1
        if self.is_using_backup_path():
582
            # TODO: Log to say that cannot move backup to backup
583
            return True
584
585 1
        success = False
586 1
        if self.backup_path.status is EntityStatus.UP:
587 1
            success = self.deploy_to_path(self.backup_path)
588
589 1
        if success:
590 1
            return True
591
592 1
        if self.dynamic_backup_path or self.is_intra_switch():
593 1
            return self.deploy_to_path()
594
595
        return False
596
597 1
    def deploy_to_primary_path(self):
598
        """Deploy the primary path into the datapaths of this circuit.
599
600
        If the primary_path attribute is valid and up, this method will try to
601
        deploy this primary_path.
602
        """
603
        # TODO: Remove flows from current (cookies)
604 1
        if self.is_using_primary_path():
605
            # TODO: Log to say that cannot move primary to primary
606
            return True
607
608 1
        if self.primary_path.status is EntityStatus.UP:
609 1
            return self.deploy_to_path(self.primary_path)
610
        return False
611
612 1
    def deploy(self):
613
        """Deploy EVC to best path.
614
615
        Best path can be the primary path, if available. If not, the backup
616
        path, and, if it is also not available, a dynamic path.
617
        """
618 1
        if self.archived:
619 1
            return False
620 1
        self.enable()
621 1
        success = self.deploy_to_primary_path()
622 1
        if not success:
623 1
            success = self.deploy_to_backup_path()
624
625 1
        if success:
626 1
            emit_event(self._controller, "deployed",
627
                       content=map_evc_event_content(self))
628 1
        return success
629
630 1
    @staticmethod
631 1
    def get_path_status(path):
632
        """Check for the current status of a path.
633
634
        If any link in this path is down, the path is considered down.
635
        """
636 1
        if not path:
637 1
            return EntityStatus.DISABLED
638
639 1
        for link in path:
640 1
            if link.status is not EntityStatus.UP:
641 1
                return link.status
642 1
        return EntityStatus.UP
643
644
    #    def discover_new_path(self):
645
    #        # TODO: discover a new path to satisfy this circuit and deploy
646
647 1
    def remove(self):
648
        """Remove EVC path and disable it."""
649 1
        self.remove_current_flows(sync=False)
650 1
        self.remove_failover_flows(sync=False)
651 1
        self.disable()
652 1
        self.sync()
653 1
        emit_event(self._controller, "undeployed",
654
                   content=map_evc_event_content(self))
655
656 1
    def remove_failover_flows(self, exclude_uni_switches=True,
657
                              force=True, sync=True) -> None:
658
        """Remove failover_flows.
659
660
        By default, it'll exclude UNI switches, if mef_eline has already
661
        called remove_current_flows before then this minimizes the number
662
        of FlowMods and IO.
663
        """
664 1
        if not self.failover_path:
665 1
            return
666 1
        switches, cookie, excluded = OrderedDict(), self.get_cookie(), set()
667 1
        links = set()
668 1
        if exclude_uni_switches:
669 1
            excluded.add(self.uni_a.interface.switch.id)
670 1
            excluded.add(self.uni_z.interface.switch.id)
671 1
        for link in self.failover_path:
672 1
            if link.endpoint_a.switch.id not in excluded:
673 1
                switches[link.endpoint_a.switch.id] = link.endpoint_a.switch
674 1
                links.add(link)
675 1
            if link.endpoint_b.switch.id not in excluded:
676 1
                switches[link.endpoint_b.switch.id] = link.endpoint_b.switch
677 1
                links.add(link)
678 1
        for switch in switches.values():
679 1
            try:
680 1
                self._send_flow_mods(
681
                    switch.id,
682
                    [
683
                        {
684
                            "cookie": cookie,
685
                            "cookie_mask": int(0xffffffffffffffff),
686
                        }
687
                    ],
688
                    "delete",
689
                    force=force,
690
                )
691
            except FlowModException as err:
692
                log.error(
693
                    f"Error removing flows from switch {switch.id} for"
694
                    f"EVC {self}: {err}"
695
                )
696 1
        try:
697 1
            self.failover_path.make_vlans_available(self._controller)
698
        except KytosTagError as err:
699
            log.error(f"Error when removing failover flows: {err}")
700 1
        self.failover_path = Path([])
701 1
        if sync:
702 1
            self.sync()
703
704 1
    def remove_current_flows(self, current_path=None, force=True,
705
                             sync=True):
706
        """Remove all flows from current path."""
707 1
        switches = set()
708
709 1
        switches.add(self.uni_a.interface.switch)
710 1
        switches.add(self.uni_z.interface.switch)
711 1
        if not current_path:
712 1
            current_path = self.current_path
713 1
        for link in current_path:
714 1
            switches.add(link.endpoint_a.switch)
715 1
            switches.add(link.endpoint_b.switch)
716
717 1
        match = {
718
            "cookie": self.get_cookie(),
719
            "cookie_mask": int(0xffffffffffffffff)
720
        }
721
722 1
        for switch in switches:
723 1
            try:
724 1
                self._send_flow_mods(switch.id, [match], 'delete', force=force)
725 1
            except FlowModException as err:
726 1
                log.error(
727
                    f"Error removing flows from switch {switch.id} for"
728
                    f"EVC {self}: {err}"
729
                )
730 1
        try:
731 1
            current_path.make_vlans_available(self._controller)
732
        except KytosTagError as err:
733
            log.error(f"Error when removing current path flows: {err}")
734 1
        self.current_path = Path([])
735 1
        self.deactivate()
736 1
        if sync:
737 1
            self.sync()
738
739 1
    def remove_path_flows(
740
        self, path=None, force=True
741
    ) -> dict[str, list[dict]]:
742
        """Remove all flows from path, and return the removed flows."""
743 1
        dpid_flows_match: dict[str, list[dict]] = {}
744
745 1
        if not path:
746 1
            return dpid_flows_match
747
748 1
        try:
749 1
            nni_flows = self._prepare_nni_flows(path)
750
        # pylint: disable=broad-except
751
        except Exception:
752
            err = traceback.format_exc().replace("\n", ", ")
753
            log.error(f"Fail to remove NNI failover flows for {self}: {err}")
754
            nni_flows = {}
755
756 1
        for dpid, flows in nni_flows.items():
757 1
            dpid_flows_match.setdefault(dpid, [])
758 1
            for flow in flows:
759 1
                dpid_flows_match[dpid].append({
760
                    "cookie": flow["cookie"],
761
                    "match": flow["match"],
762
                    "cookie_mask": int(0xffffffffffffffff)
763
                })
764
765 1
        try:
766 1
            uni_flows = self._prepare_uni_flows(path, skip_in=True)
767
        # pylint: disable=broad-except
768
        except Exception:
769
            err = traceback.format_exc().replace("\n", ", ")
770
            log.error(f"Fail to remove UNI failover flows for {self}: {err}")
771
            uni_flows = {}
772
773 1
        for dpid, flows in uni_flows.items():
774 1
            dpid_flows_match.setdefault(dpid, [])
775 1
            for flow in flows:
776 1
                dpid_flows_match[dpid].append({
777
                    "cookie": flow["cookie"],
778
                    "match": flow["match"],
779
                    "cookie_mask": int(0xffffffffffffffff)
780
                })
781
782 1
        for dpid, flows in dpid_flows_match.items():
783 1
            try:
784 1
                self._send_flow_mods(dpid, flows, 'delete', force=force)
785 1
            except FlowModException as err:
786 1
                log.error(
787
                    "Error removing failover flows: "
788
                    f"dpid={dpid} evc={self} error={err}"
789
                )
790 1
        try:
791 1
            path.make_vlans_available(self._controller)
792
        except KytosTagError as err:
793
            log.error(f"Error when removing path flows: {err}")
794
795 1
        return dpid_flows_match
796
797 1
    @staticmethod
798 1
    def links_zipped(path=None):
799
        """Return an iterator which yields pairs of links in order."""
800 1
        if not path:
801
            return []
802 1
        return zip(path[:-1], path[1:])
803
804 1
    def should_deploy(self, path=None):
805
        """Verify if the circuit should be deployed."""
806 1
        if not path:
807 1
            log.debug("Path is empty.")
808 1
            return False
809
810 1
        if not self.is_enabled():
811 1
            log.debug(f"{self} is disabled.")
812 1
            return False
813
814 1
        if not self.is_active():
815 1
            log.debug(f"{self} will be deployed.")
816 1
            return True
817
818 1
        return False
819
820 1
    def deploy_to_path(self, path=None):  # pylint: disable=too-many-branches
821
        """Install the flows for this circuit.
822
823
        Procedures to deploy:
824
825
        0. Remove current flows installed
826
        1. Decide if will deploy "path" or discover a new path
827
        2. Choose vlan
828
        3. Install NNI flows
829
        4. Install UNI flows
830
        5. Activate
831
        6. Update current_path
832
        7. Update links caches(primary, current, backup)
833
834
        """
835 1
        self.remove_current_flows()
836 1
        use_path = path
837 1
        if self.should_deploy(use_path):
838 1
            try:
839 1
                use_path.choose_vlans(self._controller)
840 1
            except KytosNoTagAvailableError:
841 1
                use_path = None
842
        else:
843 1
            for use_path in self.discover_new_paths():
844 1
                if use_path is None:
845
                    continue
846 1
                try:
847 1
                    use_path.choose_vlans(self._controller)
848 1
                    break
849 1
                except KytosNoTagAvailableError:
850 1
                    pass
851
            else:
852 1
                use_path = None
853
854 1
        try:
855 1
            if use_path:
856 1
                self._install_nni_flows(use_path)
857 1
                self._install_uni_flows(use_path)
858 1
            elif self.is_intra_switch():
859 1
                use_path = Path()
860 1
                self._install_direct_uni_flows()
861
            else:
862 1
                log.warning(
863
                    f"{self} was not deployed. No available path was found."
864
                )
865 1
                return False
866 1
        except FlowModException as err:
867 1
            log.error(
868
                f"Error deploying EVC {self} when calling flow_manager: {err}"
869
            )
870 1
            self.remove_current_flows(use_path)
871 1
            return False
872 1
        self.activate()
873 1
        self.current_path = use_path
874 1
        self.sync()
875 1
        log.info(f"{self} was deployed.")
876 1
        return True
877
878 1
    def try_setup_failover_path(self, wait=settings.DEPLOY_EVCS_INTERVAL):
879
        """Try setup failover_path whenever possible."""
880
        if self.failover_path:
881
            return
882
        if (now() - self.affected_by_link_at).seconds >= wait:
883
            with self.lock:
884
                self.setup_failover_path()
885
886 1
    def setup_failover_path(self):
887
        """Install flows for the failover path of this EVC.
888
889
        Procedures to deploy:
890
891
        0. Remove flows currently installed for failover_path (if any)
892
        1. Discover a disjoint path from current_path
893
        2. Choose vlans
894
        3. Install NNI flows
895
        4. Install UNI egress flows
896
        5. Update failover_path
897
        """
898
        # Intra-switch EVCs have no failover_path
899 1
        if self.is_intra_switch():
900 1
            return False
901
902
        # For not only setup failover path for totally dynamic EVCs
903 1
        if not self.is_eligible_for_failover_path():
904 1
            return False
905
906 1
        out_new_flows: dict[str, list[dict]] = {}
907 1
        reason = ""
908 1
        out_removed_flows = self.remove_path_flows(self.failover_path)
909 1
        self.failover_path = Path([])
910
911 1
        for use_path in self.get_failover_path_candidates():
912 1
            if not use_path:
913 1
                continue
914 1
            try:
915 1
                use_path.choose_vlans(self._controller)
916 1
                break
917 1
            except KytosNoTagAvailableError:
918 1
                pass
919
        else:
920 1
            use_path = Path([])
921 1
            reason = "No available path was found"
922
923 1
        try:
924 1
            if use_path:
925 1
                _nni_flows = self._install_nni_flows(use_path)
926 1
                out_new_flows = merge_flow_dicts(out_new_flows, _nni_flows)
927 1
                _uni_flows = self._install_uni_flows(use_path, skip_in=True)
928 1
                out_new_flows = merge_flow_dicts(out_new_flows, _uni_flows)
929 1
        except FlowModException as err:
930 1
            reason = "Error deploying failover path"
931 1
            log.error(
932
                f"{reason} for {self}. FlowManager error: {err}"
933
            )
934 1
            _rmed_flows = self.remove_path_flows(use_path)
935 1
            out_new_flows = merge_flow_dicts(out_new_flows, _rmed_flows)
936 1
            use_path = Path([])
937
938 1
        self.failover_path = use_path
939 1
        self.sync()
940
941 1
        if out_new_flows or out_removed_flows:
942 1
            content = {
943
                self.id: map_evc_event_content(
944
                    self,
945
                    flows=out_new_flows,
946
                    removed_flows=out_removed_flows,
947
                    error_reason=reason,
948
                    current_path=self.current_path.as_dict(),
949
                )
950
            }
951 1
            emit_event(self._controller, "failover_deployed", content=content)
952
953 1
        if not use_path:
954 1
            log.warning(
955
                f"Failover path for {self} was not deployed: {reason}"
956
            )
957 1
            return False
958 1
        log.info(f"Failover path for {self} was deployed.")
959 1
        return True
960
961 1
    def get_failover_flows(self):
962
        """Return the flows needed to make the failover path active, i.e. the
963
        flows for ingress forwarding.
964
965
        Return:
966
            dict: A dict of flows indexed by the switch_id will be returned, or
967
                an empty dict if no failover_path is available.
968
        """
969 1
        if not self.failover_path:
970 1
            return {}
971 1
        return self._prepare_uni_flows(self.failover_path, skip_out=True)
972
973
    # pylint: disable=too-many-branches
974 1
    def _prepare_direct_uni_flows(self):
975
        """Prepare flows connecting two UNIs for intra-switch EVC."""
976 1
        vlan_a = self._get_value_from_uni_tag(self.uni_a)
977 1
        vlan_z = self._get_value_from_uni_tag(self.uni_z)
978
979 1
        flow_mod_az = self._prepare_flow_mod(
980
            self.uni_a.interface, self.uni_z.interface,
981
            self.queue_id, vlan_a
982
        )
983 1
        flow_mod_za = self._prepare_flow_mod(
984
            self.uni_z.interface, self.uni_a.interface,
985
            self.queue_id, vlan_z
986
        )
987
988 1 View Code Duplication
        if not isinstance(vlan_z, list) and vlan_z not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
989 1
            flow_mod_az["actions"].insert(
990
                0, {"action_type": "set_vlan", "vlan_id": vlan_z}
991
            )
992 1
            if not vlan_a:
993 1
                flow_mod_az["actions"].insert(
994
                    0, {"action_type": "push_vlan", "tag_type": "c"}
995
                )
996 1
            if vlan_a == 0:
997 1
                flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
998 1
        elif vlan_a == 0 and vlan_z == "4096/4096":
999 1
            flow_mod_za["actions"].insert(0, {"action_type": "pop_vlan"})
1000
1001 1 View Code Duplication
        if not isinstance(vlan_a, list) and vlan_a not in self.special_cases:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1002 1
            flow_mod_za["actions"].insert(
1003
                    0, {"action_type": "set_vlan", "vlan_id": vlan_a}
1004
                )
1005 1
            if not vlan_z:
1006 1
                flow_mod_za["actions"].insert(
1007
                    0, {"action_type": "push_vlan", "tag_type": "c"}
1008
                )
1009 1
            if vlan_z == 0:
1010 1
                flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1011 1
        elif vlan_a == "4096/4096" and vlan_z == 0:
1012 1
            flow_mod_az["actions"].insert(0, {"action_type": "pop_vlan"})
1013
1014 1
        flows = []
1015 1
        if isinstance(vlan_a, list):
1016 1
            for mask_a in vlan_a:
1017 1
                flow_aux = deepcopy(flow_mod_az)
1018 1
                flow_aux["match"]["dl_vlan"] = mask_a
1019 1
                flows.append(flow_aux)
1020
        else:
1021 1
            if vlan_a is not None:
1022 1
                flow_mod_az["match"]["dl_vlan"] = vlan_a
1023 1
            flows.append(flow_mod_az)
1024
1025 1
        if isinstance(vlan_z, list):
1026 1
            for mask_z in vlan_z:
1027 1
                flow_aux = deepcopy(flow_mod_za)
1028 1
                flow_aux["match"]["dl_vlan"] = mask_z
1029 1
                flows.append(flow_aux)
1030
        else:
1031 1
            if vlan_z is not None:
1032 1
                flow_mod_za["match"]["dl_vlan"] = vlan_z
1033 1
            flows.append(flow_mod_za)
1034 1
        return (
1035
            self.uni_a.interface.switch.id, flows
1036
        )
1037
1038 1
    def _install_direct_uni_flows(self):
1039
        """Install flows connecting two UNIs.
1040
1041
        This case happens when the circuit is between UNIs in the
1042
        same switch.
1043
        """
1044 1
        (dpid, flows) = self._prepare_direct_uni_flows()
1045 1
        self._send_flow_mods(dpid, flows)
1046
1047 1
    def _prepare_nni_flows(self, path=None):
1048
        """Prepare NNI flows."""
1049 1
        nni_flows = OrderedDict()
1050 1
        previous = self.uni_a.interface.switch.dpid
1051 1
        for incoming, outcoming in self.links_zipped(path):
1052 1
            in_vlan = incoming.get_metadata("s_vlan").value
1053 1
            out_vlan = outcoming.get_metadata("s_vlan").value
1054 1
            in_endpoint = self.get_endpoint_by_id(incoming, previous, ne)
1055 1
            out_endpoint = self.get_endpoint_by_id(
1056
                outcoming, in_endpoint.switch.id, eq
1057
            )
1058
1059 1
            flows = []
1060
            # Flow for one direction
1061 1
            flows.append(
1062
                self._prepare_nni_flow(
1063
                    in_endpoint,
1064
                    out_endpoint,
1065
                    in_vlan,
1066
                    out_vlan,
1067
                    queue_id=self.queue_id,
1068
                )
1069
            )
1070
1071
            # Flow for the other direction
1072 1
            flows.append(
1073
                self._prepare_nni_flow(
1074
                    out_endpoint,
1075
                    in_endpoint,
1076
                    out_vlan,
1077
                    in_vlan,
1078
                    queue_id=self.queue_id,
1079
                )
1080
            )
1081 1
            previous = in_endpoint.switch.id
1082 1
            nni_flows[in_endpoint.switch.id] = flows
1083 1
        return nni_flows
1084
1085 1
    def _install_nni_flows(
1086
        self, path=None
1087
    ) -> dict[str, list[dict]]:
1088
        """Install NNI flows."""
1089 1
        nni_flows = self._prepare_nni_flows(path)
1090 1
        for dpid, flows in nni_flows.items():
1091 1
            self._send_flow_mods(dpid, flows)
1092 1
        return nni_flows
1093
1094 1
    @staticmethod
1095 1
    def _get_value_from_uni_tag(uni: UNI):
1096
        """Returns the value from tag. In case of any and untagged
1097
        it should return 4096/4096 and 0 respectively"""
1098 1
        special = {"any": "4096/4096", "untagged": 0}
1099 1
        if uni.user_tag:
1100 1
            value = uni.user_tag.value
1101 1
            if isinstance(value, list):
1102 1
                return uni.user_tag.mask_list
1103 1
            return special.get(value, value)
1104 1
        return None
1105
1106
    # pylint: disable=too-many-locals
1107 1
    def _prepare_uni_flows(self, path=None, skip_in=False, skip_out=False):
1108
        """Prepare flows to install UNIs."""
1109 1
        uni_flows = {}
1110 1
        if not path:
1111 1
            log.info("install uni flows without path.")
1112 1
            return uni_flows
1113
1114
        # Determine VLANs
1115 1
        in_vlan_a = self._get_value_from_uni_tag(self.uni_a)
1116 1
        out_vlan_a = path[0].get_metadata("s_vlan").value
1117
1118 1
        in_vlan_z = self._get_value_from_uni_tag(self.uni_z)
1119 1
        out_vlan_z = path[-1].get_metadata("s_vlan").value
1120
1121
        # Get endpoints from path
1122 1
        endpoint_a = self.get_endpoint_by_id(
1123
            path[0], self.uni_a.interface.switch.id, eq
1124
        )
1125 1
        endpoint_z = self.get_endpoint_by_id(
1126
            path[-1], self.uni_z.interface.switch.id, eq
1127
        )
1128
1129
        # Flows for the first UNI
1130 1
        flows_a = []
1131
1132
        # Flow for one direction, pushing the service tag
1133 1
        if not skip_in:
1134 1
            if isinstance(in_vlan_a, list):
1135 1
                for in_mask_a in in_vlan_a:
1136 1
                    push_flow = self._prepare_push_flow(
1137
                        self.uni_a.interface,
1138
                        endpoint_a,
1139
                        in_mask_a,
1140
                        out_vlan_a,
1141
                        in_vlan_z,
1142
                        queue_id=self.queue_id,
1143
                    )
1144 1
                    flows_a.append(push_flow)
1145
            else:
1146 1
                push_flow = self._prepare_push_flow(
1147
                    self.uni_a.interface,
1148
                    endpoint_a,
1149
                    in_vlan_a,
1150
                    out_vlan_a,
1151
                    in_vlan_z,
1152
                    queue_id=self.queue_id,
1153
                )
1154 1
                flows_a.append(push_flow)
1155
1156
        # Flow for the other direction, popping the service tag
1157 1
        if not skip_out:
1158 1
            pop_flow = self._prepare_pop_flow(
1159
                endpoint_a,
1160
                self.uni_a.interface,
1161
                out_vlan_a,
1162
                queue_id=self.queue_id,
1163
            )
1164 1
            flows_a.append(pop_flow)
1165
1166 1
        uni_flows[self.uni_a.interface.switch.id] = flows_a
1167
1168
        # Flows for the second UNI
1169 1
        flows_z = []
1170
1171
        # Flow for one direction, pushing the service tag
1172 1
        if not skip_in:
1173 1
            if isinstance(in_vlan_z, list):
1174 1
                for in_mask_z in in_vlan_z:
1175 1
                    push_flow = self._prepare_push_flow(
1176
                        self.uni_z.interface,
1177
                        endpoint_z,
1178
                        in_mask_z,
1179
                        out_vlan_z,
1180
                        in_vlan_a,
1181
                        queue_id=self.queue_id,
1182
                    )
1183 1
                    flows_z.append(push_flow)
1184
            else:
1185 1
                push_flow = self._prepare_push_flow(
1186
                    self.uni_z.interface,
1187
                    endpoint_z,
1188
                    in_vlan_z,
1189
                    out_vlan_z,
1190
                    in_vlan_a,
1191
                    queue_id=self.queue_id,
1192
                )
1193 1
                flows_z.append(push_flow)
1194
1195
        # Flow for the other direction, popping the service tag
1196 1
        if not skip_out:
1197 1
            pop_flow = self._prepare_pop_flow(
1198
                endpoint_z,
1199
                self.uni_z.interface,
1200
                out_vlan_z,
1201
                queue_id=self.queue_id,
1202
            )
1203 1
            flows_z.append(pop_flow)
1204
1205 1
        uni_flows[self.uni_z.interface.switch.id] = flows_z
1206
1207 1
        return uni_flows
1208
1209 1
    def _install_uni_flows(
1210
        self, path=None, skip_in=False, skip_out=False
1211
    ) -> dict[str, list[dict]]:
1212
        """Install UNI flows."""
1213 1
        uni_flows = self._prepare_uni_flows(path, skip_in, skip_out)
1214
1215 1
        for (dpid, flows) in uni_flows.items():
1216 1
            self._send_flow_mods(dpid, flows)
1217
1218 1
        return uni_flows
1219
1220 1
    @staticmethod
1221 1
    def _send_flow_mods(dpid, flow_mods, command='flows', force=False):
1222
        """Send a flow_mod list to a specific switch.
1223
1224
        Args:
1225
            dpid(str): The target of flows (i.e. Switch.id).
1226
            flow_mods(dict): Python dictionary with flow_mods.
1227
            command(str): By default is 'flows'. To remove a flow is 'remove'.
1228
            force(bool): True to send via consistency check in case of errors
1229
1230
        """
1231
1232 1
        endpoint = f"{settings.MANAGER_URL}/{command}/{dpid}"
1233
1234 1
        data = {"flows": flow_mods, "force": force}
1235 1
        response = requests.post(endpoint, json=data)
1236 1
        if response.status_code >= 400:
1237 1
            raise FlowModException(str(response.text))
1238
1239 1
    def get_cookie(self):
1240
        """Return the cookie integer from evc id."""
1241 1
        return int(self.id, 16) + (settings.COOKIE_PREFIX << 56)
1242
1243 1
    @staticmethod
1244 1
    def get_id_from_cookie(cookie):
1245
        """Return the evc id given a cookie value."""
1246 1
        evc_id = cookie - (settings.COOKIE_PREFIX << 56)
1247 1
        return f"{evc_id:x}".zfill(14)
1248
1249 1
    def set_flow_table_group_id(self, flow_mod: dict, vlan) -> dict:
1250
        """Set table_group and table_id"""
1251 1
        table_group = "epl" if vlan is None else "evpl"
1252 1
        flow_mod["table_group"] = table_group
1253 1
        flow_mod["table_id"] = self.table_group[table_group]
1254 1
        return flow_mod
1255
1256 1
    @staticmethod
1257 1
    def get_priority(vlan):
1258
        """Return priority value depending on vlan value"""
1259 1
        if isinstance(vlan, list):
1260 1
            return settings.EVPL_SB_PRIORITY
1261 1
        if vlan not in {None, "4096/4096", 0}:
1262 1
            return settings.EVPL_SB_PRIORITY
1263 1
        if vlan == 0:
1264 1
            return settings.UNTAGGED_SB_PRIORITY
1265 1
        if vlan == "4096/4096":
1266 1
            return settings.ANY_SB_PRIORITY
1267 1
        return settings.EPL_SB_PRIORITY
1268
1269 1
    def _prepare_flow_mod(self, in_interface, out_interface,
1270
                          queue_id=None, vlan=True):
1271
        """Prepare a common flow mod."""
1272 1
        default_actions = [
1273
            {"action_type": "output", "port": out_interface.port_number}
1274
        ]
1275 1
        queue_id = settings.QUEUE_ID if queue_id == -1 else queue_id
1276 1
        if queue_id is not None:
1277 1
            default_actions.append(
1278
                {"action_type": "set_queue", "queue_id": queue_id}
1279
            )
1280
1281 1
        flow_mod = {
1282
            "match": {"in_port": in_interface.port_number},
1283
            "cookie": self.get_cookie(),
1284
            "actions": default_actions,
1285
            "owner": "mef_eline",
1286
        }
1287
1288 1
        self.set_flow_table_group_id(flow_mod, vlan)
1289 1
        if self.sb_priority:
1290 1
            flow_mod["priority"] = self.sb_priority
1291
        else:
1292 1
            flow_mod["priority"] = self.get_priority(vlan)
1293 1
        return flow_mod
1294
1295 1
    def _prepare_nni_flow(self, *args, queue_id=None):
1296
        """Create NNI flows."""
1297 1
        in_interface, out_interface, in_vlan, out_vlan = args
1298 1
        flow_mod = self._prepare_flow_mod(
1299
            in_interface, out_interface, queue_id
1300
        )
1301 1
        flow_mod["match"]["dl_vlan"] = in_vlan
1302 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1303 1
        flow_mod["actions"].insert(0, new_action)
1304
1305 1
        return flow_mod
1306
1307 1
    def _prepare_push_flow(self, *args, queue_id=None):
1308
        """Prepare push flow.
1309
1310
        Arguments:
1311
            in_interface(str): Interface input.
1312
            out_interface(str): Interface output.
1313
            in_vlan(int,str,None): Vlan input.
1314
            out_vlan(str): Vlan output.
1315
            new_c_vlan(int,str,list,None): New client vlan.
1316
1317
        Return:
1318
            dict: An python dictionary representing a FlowMod
1319
1320
        """
1321
        # assign all arguments
1322 1
        in_interface, out_interface, in_vlan, out_vlan, new_c_vlan = args
1323 1
        vlan_pri = in_vlan if not isinstance(new_c_vlan, list) else new_c_vlan
1324 1
        flow_mod = self._prepare_flow_mod(
1325
            in_interface, out_interface, queue_id, vlan_pri
1326
        )
1327
        # the service tag must be always pushed
1328 1
        new_action = {"action_type": "set_vlan", "vlan_id": out_vlan}
1329 1
        flow_mod["actions"].insert(0, new_action)
1330
1331 1
        new_action = {"action_type": "push_vlan", "tag_type": "s"}
1332 1
        flow_mod["actions"].insert(0, new_action)
1333
1334 1
        if in_vlan is not None:
1335
            # if in_vlan is set, it must be included in the match
1336 1
            flow_mod["match"]["dl_vlan"] = in_vlan
1337
1338 1
        if (not isinstance(new_c_vlan, list) and in_vlan != new_c_vlan and
1339
                new_c_vlan not in self.special_cases):
1340
            # new_in_vlan is an integer but zero, action to set is required
1341 1
            new_action = {"action_type": "set_vlan", "vlan_id": new_c_vlan}
1342 1
            flow_mod["actions"].insert(0, new_action)
1343
1344 1
        if in_vlan not in self.special_cases and new_c_vlan == 0:
1345
            # # new_in_vlan is an integer but zero and new_c_vlan does not
1346
            # a pop action is required
1347 1
            new_action = {"action_type": "pop_vlan"}
1348 1
            flow_mod["actions"].insert(0, new_action)
1349
1350 1
        elif in_vlan == "4096/4096" and new_c_vlan == 0:
1351
            # if in_vlan match with any tags and new_c_vlan does not
1352
            # a pop action is required
1353 1
            new_action = {"action_type": "pop_vlan"}
1354 1
            flow_mod["actions"].insert(0, new_action)
1355
1356 1
        elif (not in_vlan and
1357
                (not isinstance(new_c_vlan, list) and
1358
                 new_c_vlan not in self.special_cases)):
1359
            # new_in_vlan is an integer but zero and in_vlan is not set
1360
            # then it is set now
1361 1
            new_action = {"action_type": "push_vlan", "tag_type": "c"}
1362 1
            flow_mod["actions"].insert(0, new_action)
1363
1364 1
        return flow_mod
1365
1366 1
    def _prepare_pop_flow(
1367
        self, in_interface, out_interface, out_vlan, queue_id=None
1368
    ):
1369
        # pylint: disable=too-many-arguments
1370
        """Prepare pop flow."""
1371 1
        flow_mod = self._prepare_flow_mod(
1372
            in_interface, out_interface, queue_id
1373
        )
1374 1
        flow_mod["match"]["dl_vlan"] = out_vlan
1375 1
        new_action = {"action_type": "pop_vlan"}
1376 1
        flow_mod["actions"].insert(0, new_action)
1377 1
        return flow_mod
1378
1379 1
    @staticmethod
1380 1
    def run_bulk_sdntraces(
1381
        uni_list: list[tuple[Interface, Union[str, int, None]]]
1382
    ) -> dict:
1383
        """Run SDN traces on control plane starting from EVC UNIs."""
1384 1
        endpoint = f"{settings.SDN_TRACE_CP_URL}/traces"
1385 1
        data = []
1386 1
        for interface, tag_value in uni_list:
1387 1
            data_uni = {
1388
                "trace": {
1389
                            "switch": {
1390
                                "dpid": interface.switch.dpid,
1391
                                "in_port": interface.port_number,
1392
                            }
1393
                        }
1394
                }
1395 1
            if tag_value:
1396 1
                uni_dl_vlan = map_dl_vlan(tag_value)
1397 1
                if uni_dl_vlan:
1398 1
                    data_uni["trace"]["eth"] = {
1399
                                            "dl_type": 0x8100,
1400
                                            "dl_vlan": uni_dl_vlan,
1401
                                            }
1402 1
            data.append(data_uni)
1403 1
        try:
1404 1
            response = requests.put(endpoint, json=data, timeout=30)
1405 1
        except Timeout as exception:
1406 1
            log.error(f"Request has timed out: {exception}")
1407 1
            return {"result": []}
1408 1
        if response.status_code >= 400:
1409 1
            log.error(f"Failed to run sdntrace-cp: {response.text}")
1410 1
            return {"result": []}
1411 1
        return response.json()
1412
1413
    # pylint: disable=too-many-return-statements, too-many-arguments
1414 1
    @staticmethod
1415 1
    def check_trace(
1416
        evc_id: str,
1417
        evc_name: str,
1418
        tag_a: Union[None, int, str],
1419
        tag_z: Union[None, int, str],
1420
        interface_a: Interface,
1421
        interface_z: Interface,
1422
        current_path: list,
1423
        trace_a: list,
1424
        trace_z: list
1425
    ) -> bool:
1426
        """Auxiliar function to check an individual trace"""
1427 1
        if (
1428
            len(trace_a) != len(current_path) + 1
1429
            or not compare_uni_out_trace(tag_z, interface_z, trace_a[-1])
1430
        ):
1431 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1432
                        f"Invalid trace from uni_a: {trace_a}")
1433 1
            return False
1434 1
        if (
1435
            len(trace_z) != len(current_path) + 1
1436
            or not compare_uni_out_trace(tag_a, interface_a, trace_z[-1])
1437
        ):
1438 1
            log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1439
                        f"Invalid trace from uni_z: {trace_z}")
1440 1
            return False
1441
1442 1
        if not current_path:
1443
            return True
1444
1445 1
        first_link, trace_path_begin, trace_path_end = current_path[0], [], []
1446 1
        if (
1447
            first_link.endpoint_a.switch.id == trace_a[0]["dpid"]
1448
        ):
1449 1
            trace_path_begin, trace_path_end = trace_a, trace_z
1450 1
        elif (
1451
            first_link.endpoint_a.switch.id == trace_z[0]["dpid"]
1452
        ):
1453 1
            trace_path_begin, trace_path_end = trace_z, trace_a
1454
        else:
1455
            msg = (
1456
                f"first link {first_link} endpoint_a didn't match the first "
1457
                f"step of trace_a {trace_a} or trace_z {trace_z}"
1458
            )
1459
            log.warning(msg)
1460
            return False
1461
1462 1
        for link, trace1, trace2 in zip(current_path,
1463
                                        trace_path_begin[1:],
1464
                                        trace_path_end[:0:-1]):
1465 1
            metadata_vlan = None
1466 1
            if link.metadata:
1467 1
                metadata_vlan = glom(link.metadata, 's_vlan.value')
1468 1
            if compare_endpoint_trace(
1469
                                        link.endpoint_a,
1470
                                        metadata_vlan,
1471
                                        trace2
1472
                                    ) is False:
1473 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1474
                            f"Invalid trace from uni_a: {trace_a}")
1475 1
                return False
1476 1
            if compare_endpoint_trace(
1477
                                        link.endpoint_b,
1478
                                        metadata_vlan,
1479
                                        trace1
1480
                                    ) is False:
1481 1
                log.warning(f"From EVC({evc_id}) named '{evc_name}'. "
1482
                            f"Invalid trace from uni_z: {trace_z}")
1483 1
                return False
1484
1485 1
        return True
1486
1487 1
    @staticmethod
1488 1
    def check_range(circuit, traces: list) -> bool:
1489
        """Check traces when for UNI with TAGRange"""
1490 1
        check = True
1491 1
        for i, mask in enumerate(circuit.uni_a.user_tag.mask_list):
1492 1
            trace_a = traces[i*2]
1493 1
            trace_z = traces[i*2+1]
1494 1
            check &= EVCDeploy.check_trace(
1495
                circuit.id, circuit.name,
1496
                mask, mask,
1497
                circuit.uni_a.interface,
1498
                circuit.uni_z.interface,
1499
                circuit.current_path,
1500
                trace_a, trace_z,
1501
            )
1502 1
        return check
1503
1504 1
    @staticmethod
1505 1
    def check_list_traces(list_circuits: list) -> dict:
1506
        """Check if current_path is deployed comparing with SDN traces."""
1507 1
        if not list_circuits:
1508 1
            return {}
1509 1
        uni_list = make_uni_list(list_circuits)
1510 1
        traces = EVCDeploy.run_bulk_sdntraces(uni_list)["result"]
1511
1512 1
        if not traces:
1513 1
            return {}
1514
1515 1
        try:
1516 1
            circuits_checked = {}
1517 1
            i = 0
1518 1
            for circuit in list_circuits:
1519 1
                if isinstance(circuit.uni_a.user_tag, TAGRange):
1520 1
                    length = len(circuit.uni_a.user_tag.mask_list)
1521 1
                    circuits_checked[circuit.id] = EVCDeploy.check_range(
1522
                        circuit, traces[i:i+length*2]
1523
                    )
1524 1
                    i += length*2
1525
                else:
1526 1
                    trace_a = traces[i]
1527 1
                    trace_z = traces[i+1]
1528 1
                    tag_a = None
1529 1
                    if circuit.uni_a.user_tag:
1530 1
                        tag_a = circuit.uni_a.user_tag.value
1531 1
                    tag_z = None
1532 1
                    if circuit.uni_z.user_tag:
1533 1
                        tag_z = circuit.uni_z.user_tag.value
1534 1
                    circuits_checked[circuit.id] = EVCDeploy.check_trace(
1535
                        circuit.id, circuit.name,
1536
                        tag_a, tag_z,
1537
                        circuit.uni_a.interface,
1538
                        circuit.uni_z.interface,
1539
                        circuit.current_path,
1540
                        trace_a, trace_z
1541
                    )
1542 1
                    i += 2
1543 1
        except IndexError as err:
1544 1
            log.error(
1545
                f"Bulk sdntraces returned fewer items than expected."
1546
                f"Error = {err}"
1547
            )
1548 1
            return {}
1549
1550 1
        return circuits_checked
1551
1552 1
    @staticmethod
1553 1
    def get_endpoint_by_id(
1554
        link: Link,
1555
        id_: str,
1556
        operator: Union[eq, ne]
1557
    ) -> Interface:
1558
        """Return endpoint from link
1559
        either equal(eq) or not equal(ne) to id"""
1560 1
        if operator(link.endpoint_a.switch.id, id_):
1561 1
            return link.endpoint_a
1562 1
        return link.endpoint_b
1563
1564
1565 1
class LinkProtection(EVCDeploy):
1566
    """Class to handle link protection."""
1567
1568 1
    def is_affected_by_link(self, link=None):
1569
        """Verify if the current path is affected by link down event."""
1570
        return self.current_path.is_affected_by_link(link)
1571
1572 1
    def is_using_primary_path(self):
1573
        """Verify if the current deployed path is self.primary_path."""
1574 1
        return self.current_path == self.primary_path
1575
1576 1
    def is_using_backup_path(self):
1577
        """Verify if the current deployed path is self.backup_path."""
1578 1
        return self.current_path == self.backup_path
1579
1580 1
    def is_using_dynamic_path(self):
1581
        """Verify if the current deployed path is dynamic."""
1582 1
        if (
1583
            self.current_path
1584
            and not self.is_using_primary_path()
1585
            and not self.is_using_backup_path()
1586
            and self.current_path.status is EntityStatus.UP
1587
        ):
1588
            return True
1589 1
        return False
1590
1591 1
    def deploy_to(self, path_name=None, path=None):
1592
        """Create a deploy to path."""
1593 1
        if self.current_path == path:
1594 1
            log.debug(f"{path_name} is equal to current_path.")
1595 1
            return True
1596
1597 1
        if path.status is EntityStatus.UP:
1598 1
            return self.deploy_to_path(path)
1599
1600 1
        return False
1601
1602 1
    def handle_link_up(self, link):
1603
        """Handle circuit when link up.
1604
1605
        Args:
1606
            link(Link): Link affected by link.up event.
1607
1608
        """
1609 1
        condition_pairs = [
1610
            (
1611
                lambda me: me.is_using_primary_path(),
1612
                lambda _: (True, 'nothing')
1613
            ),
1614
            (
1615
                lambda me: me.is_intra_switch(),
1616
                lambda _: (True, 'nothing')
1617
            ),
1618
            (
1619
                lambda me: me.primary_path.is_affected_by_link(link),
1620
                lambda me: (me.deploy_to_primary_path(), 'redeploy')
1621
            ),
1622
            # We tried to deploy(primary_path) without success.
1623
            # And in this case is up by some how. Nothing to do.
1624
            (
1625
                lambda me: me.is_using_backup_path(),
1626
                lambda _: (True, 'nothing')
1627
            ),
1628
            (
1629
                lambda me:  me.is_using_dynamic_path(),
1630
                lambda _: (True, 'nothing')
1631
            ),
1632
            # In this case, probably the circuit is not being used and
1633
            # we can move to backup
1634
            (
1635
                lambda me: me.backup_path.is_affected_by_link(link),
1636
                lambda me: (me.deploy_to_backup_path(), 'redeploy')
1637
            ),
1638
            # In this case, the circuit is not being used and we should
1639
            # try a dynamic path
1640
            (
1641
                lambda me: me.dynamic_backup_path and not me.is_active(),
1642
                lambda me: (me.deploy_to_path(), 'redeploy')
1643
            )
1644
        ]
1645 1
        for predicate, action in condition_pairs:
1646 1
            if not predicate(self):
1647 1
                continue
1648 1
            success, succcess_type = action(self)
1649 1
            if success:
1650 1
                if succcess_type == 'redeploy':
1651 1
                    emit_event(
1652
                        self._controller,
1653
                        "redeployed_link_up",
1654
                        content=map_evc_event_content(self)
1655
                    )
1656 1
                return True
1657 1
        return False
1658
1659 1
    def handle_link_down(self):
1660
        """Handle circuit when link down.
1661
1662
        Returns:
1663
            bool: True if the re-deploy was successly otherwise False.
1664
1665
        """
1666 1
        success = False
1667 1
        if self.is_using_primary_path():
1668 1
            success = self.deploy_to_backup_path()
1669 1
        elif self.is_using_backup_path():
1670 1
            success = self.deploy_to_primary_path()
1671
1672 1
        if not success and self.dynamic_backup_path:
1673 1
            success = self.deploy_to_path()
1674
1675 1
        if success:
1676 1
            log.debug(f"{self} deployed after link down.")
1677
        else:
1678 1
            self.remove_current_flows(sync=False)
1679 1
            self.deactivate()
1680 1
            self.sync()
1681 1
            log.debug(f"Failed to re-deploy {self} after link down.")
1682
1683 1
        return success
1684
1685 1
    @staticmethod
1686 1
    def get_interface_from_switch(uni: UNI, switches: dict) -> Interface:
1687
        """Get interface from switch by uni"""
1688 1
        switch = switches[uni.interface.switch.dpid]
1689 1
        interface = switch.interfaces[uni.interface.port_number]
1690 1
        return interface
1691
1692 1
    def are_unis_active(self, switches: dict) -> bool:
1693
        """Determine whether this EVC should be active"""
1694 1
        interface_a = self.get_interface_from_switch(self.uni_a, switches)
1695 1
        interface_z = self.get_interface_from_switch(self.uni_z, switches)
1696 1
        active, _ = self.is_uni_interface_active(interface_a, interface_z)
1697 1
        return active
1698
1699 1
    @staticmethod
1700 1
    def is_uni_interface_active(
1701
        *interfaces: Interface
1702
    ) -> tuple[bool, dict]:
1703
        """Determine whether a UNI should be active"""
1704 1
        active = True
1705 1
        bad_interfaces = [
1706
            interface
1707
            for interface in interfaces
1708
            if interface.status != EntityStatus.UP
1709
        ]
1710 1
        if bad_interfaces:
1711 1
            active = False
1712 1
            interfaces = bad_interfaces
1713 1
        return active, {
1714
            interface.id: {
1715
                'status': interface.status.value,
1716
                'status_reason': interface.status_reason,
1717
            }
1718
            for interface in interfaces
1719
        }
1720
1721 1
    def handle_interface_link_up(self, interface: Interface):
1722
        """
1723
        Handler for interface link_up events
1724
        """
1725 1
        if self.is_active():
1726 1
            return
1727 1
        interfaces = (self.uni_a.interface, self.uni_z.interface)
1728 1
        if interface not in interfaces:
1729
            return
1730 1
        down_interfaces = [
1731
            interface
1732
            for interface in interfaces
1733
            if interface.status != EntityStatus.UP
1734
        ]
1735 1
        if down_interfaces:
1736
            return
1737 1
        interface_dicts = {
1738
            interface.id: {
1739
                'status': interface.status.value,
1740
                'status_reason': interface.status_reason,
1741
            }
1742
            for interface in interfaces
1743
        }
1744 1
        self.activate()
1745 1
        log.info(
1746
            f"Activating EVC {self.id}. Interfaces: "
1747
            f"{interface_dicts}."
1748
        )
1749 1
        emit_event(self._controller, "uni_active_updated",
1750
                   content=map_evc_event_content(self))
1751 1
        self.sync()
1752
1753 1
    def handle_interface_link_down(self, interface):
1754
        """
1755
        Handler for interface link_down events
1756
        """
1757 1
        if not self.is_active():
1758 1
            return
1759 1
        interfaces = (self.uni_a.interface, self.uni_z.interface)
1760 1
        if interface not in interfaces:
1761
            return
1762 1
        down_interfaces = [
1763
            interface
1764
            for interface in interfaces
1765
            if interface.status != EntityStatus.UP
1766
        ]
1767 1
        if not down_interfaces:
1768
            return
1769 1
        interface_dicts = {
1770
            interface.id: {
1771
                'status': interface.status.value,
1772
                'status_reason': interface.status_reason,
1773
            }
1774
            for interface in down_interfaces
1775
        }
1776 1
        self.deactivate()
1777 1
        log.info(
1778
            f"Deactivating EVC {self.id}. Interfaces: "
1779
            f"{interface_dicts}."
1780
        )
1781 1
        emit_event(self._controller, "uni_active_updated",
1782
                   content=map_evc_event_content(self))
1783 1
        self.sync()
1784
1785
1786 1
class EVC(LinkProtection):
1787
    """Class that represents a E-Line Virtual Connection."""
1788