Completed
Push — master ( ae553d...3df76a )
by Thomas
25:18 queued 11:15
created

LinkState.registered()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# encoding: utf-8
2
"""
3
Copyright (c) 2016 Evelio Vila <[email protected]>
4
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
5
License: 3-clause BSD. (See the COPYRIGHT file)
6
"""
7
8
import json
9
import binascii
10
import itertools
11
from struct import unpack
12
13
from exabgp.bgp.message.notification import Notify
14
from exabgp.bgp.message.update.attribute.attribute import Attribute
15
16
17
@Attribute.register()
18
class LinkState(Attribute):
19
    ID = Attribute.CODE.BGP_LS
20
    FLAG = Attribute.Flag.OPTIONAL
21
    TLV = -1
22
23
    # Registered subclasses we know how to decode
24
    registered_lsids = dict()
25
26
    # what this implementation knows as LS attributes
27
    node_lsids = []
28
    link_lsids = []
29
    prefix_lsids = []
30
31
    def __init__(self, ls_attrs):
32
        self.ls_attrs = ls_attrs
33
34
    @classmethod
35
    def register(cls, lsid=None, flag=None):
36
        def register_lsid(klass):
37
            scode = klass.TLV if lsid is None else lsid
38
            if scode in cls.registered_lsids:
39
                raise RuntimeError('only one class can be registered per BGP link state attribute type')
40
            cls.registered_lsids[scode] = klass
41
            return klass
42
43
        return register_lsid
44
45
    @classmethod
46
    def registered(cls, lsid, flag=None):
47
        return lsid in cls.registered_lsids
48
49
    @classmethod
50
    def unpack(cls, data, negotiated):
51
        ls_attrs = []
52
        while data:
53
            scode, length = unpack('!HH', data[:4])
54
            if scode in cls.registered_lsids:
55
                klass = cls.registered_lsids[scode].unpack(data[4 : length + 4], length)
56
            else:
57
                klass = GenericLSID(scode, data[4 : length + 4])
58
            klass.TLV = scode
59
            ls_attrs.append(klass)
60
            data = data[length + 4 :]
61
        for klass in ls_attrs:
62
            if hasattr(klass, 'terids'):
63
                klass.reset()
64
65
        return cls(ls_attrs=ls_attrs)
66
67
    def json(self, compact=None):
68
        content = ', '.join(d.json() for d in self.ls_attrs)
69
        return '{ %s }' % (content)
70
71
    def __str__(self):
72
        return ', '.join(str(d) for d in self.ls_attrs)
73
74
75 View Code Duplication
class GenericLSID(object):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
76
    TLV = 99999
77
78
    def __init__(self, code, rep):
79
        self.rep = rep
80
        self.code = code
81
82
    def __repr__(self):
83
        return "Attribute with code [ %s ] not implemented" % (self.code)
84
85
    @classmethod
86
    def unpack(cls, scode, data):
87
        length = len(data)
88
        info = binascii.b2a_uu(data[:length])
89
        return cls(code=scode, rep=info)
90
91
    def json(self, compact=None):
92
        return '"attribute-not-implemented": "%s"' % (self.code)
93
94
95
class LsGenericFlags(object):
96
    JSON = 'json-name-unset'
97
    REPR = 'repr name unset'
98
    LEN = None
99
100
    def __init__(self, flags):
101
        self.flags = flags
102
103
    @classmethod
104
    def unpack_flags(cls, data):
105
        pad = cls.FLAGS.count('RSV')
106
        repeat = len(cls.FLAGS) - pad
107
        hex_rep = int(binascii.b2a_hex(data), 16)
108
        bits = f'{hex_rep:08b}'
109
        valid_flags = [
110
            ''.join(item) + '0' * pad
111
            for item in itertools.product('01', repeat=repeat)
112
        ]
113
        valid_flags.append('0000')
114
        if bits in valid_flags:
115
            flags = dict(zip(cls.FLAGS, [0, ] * len(cls.FLAGS)))
116
            flags.update(dict((k, int(v)) for k, v in zip(cls.FLAGS, bits)))
117
        else:
118
            raise Notify(3, 5, "Invalid SR flags mask")
119
        return flags
120
121
122
    def json(self, compact=None):
123
        return '"{}": {}'.format(self.JSON, json.dumps(self.flags))
124
125
    def __repr__(self):
126
        return "%s: %s" % (self.REPR, self.flags)
127
128
    @classmethod
129
    def unpack(cls, data, length):
130
        if cls.LEN is not None and length != cls.LEN:
131
            raise Notify(3, 5, f'wrong size for {cls.REPR}')
132
133
        # We only support IS-IS for now.
134
        return cls(cls.unpack_flags(data[0:1]))
135