NodeDescriptor.unpack()   F
last analyzed

Complexity

Conditions 20

Size

Total Lines 50
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 20
eloc 34
nop 3
dl 0
loc 50
rs 0
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like exabgp.bgp.message.update.nlri.bgpls.tlvs.node.NodeDescriptor.unpack() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# encoding: utf-8
2
"""
3
node.py
4
5
Created by Evelio Vila on 2016-11-26. [email protected]
6
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
7
License: 3-clause BSD. (See the COPYRIGHT file)
8
"""
9
10
from struct import unpack
11
12
from exabgp.protocol.ip import IP
13
from exabgp.protocol.iso import ISO
14
from exabgp.bgp.message.notification import Notify
15
16
#           +--------------------+-------------------+----------+
17
#           | Sub-TLV Code Point | Description       |   Length |
18
#           +--------------------+-------------------+----------+
19
#           |        512         | Autonomous System |        4 |
20
#           |        513         | BGP-LS Identifier |        4 |
21
#           |        514         | OSPF Area-ID      |        4 |
22
#           |        515         | IGP Router-ID     | Variable |
23
#           +--------------------+-------------------+----------+
24
#            https://tools.ietf.org/html/rfc7752#section-3.2.1.4
25
# ================================================================== NODE-DESC-SUB-TLVs
26
27
# Node Descriptor Sub-TLV code points (RFC 7752, section 3.2.1.4) mapped to
# their names. NodeDescriptor.unpack() rejects any type that is not a key of
# this table.
NODE_TLVS = {
    512: 'autonomous-system',  # Autonomous System, length 4
    513: 'bgp-ls-id',  # BGP-LS Identifier, length 4
    514: 'ospf-area-id',  # OSPF Area-ID, length 4
    515: 'igp-rid',  # IGP Router-ID, variable length
}
33
34
35
# TODO
36
# 3.2.1.5.  Multi-Topology ID
37
38
39
class NodeDescriptor(object):
    """A single BGP-LS Node Descriptor Sub-TLV (RFC 7752, section 3.2.1.4).

    Attributes:
        node_id: decoded identifier. An integer for Autonomous System (512)
            and BGP-LS Identifier (513); whatever ``IP.unpack`` returns for
            OSPF Area-ID (514) and OSPF router ids; whatever
            ``ISO.unpack_sysid`` returns for IS-IS router ids.
        dtype: the Sub-TLV code point (a key of ``NODE_TLVS``).
        psn: pseudonode number, only set for an IS-IS LAN pseudonode.
        dr_id: designated router id, only set for an OSPF LAN pseudonode.
    """

    def __init__(self, node_id, dtype, psn=None, dr_id=None, packed=None):
        self.node_id = node_id
        self.dtype = dtype
        self.psn = psn
        self.dr_id = dr_id
        # Raw wire bytes (4-byte header + value) this descriptor was built from.
        self._packed = packed

    @classmethod
    def unpack(cls, data, igp):
        """Decode the Sub-TLV at the start of ``data``.

        Args:
            data: buffer starting with a 2-byte type and a 2-byte length
                (network byte order) followed by the value.
            igp: protocol identifier of the enclosing NLRI. 1 and 2 are
                handled as IS-IS, 3 and 6 as OSPF; it is only consulted for
                the IGP Router-ID Sub-TLV (515).

        Returns:
            A ``(descriptor, remaining)`` tuple where ``remaining`` is
            ``data`` with the decoded Sub-TLV removed.

        Raises:
            Exception: the type is not listed in ``NODE_TLVS``.
            Notify: the type is known but its length (and, for 515, the
                protocol) does not match any supported encoding.
        """
        dtype, dlength = unpack('!HH', data[0:4])
        if dtype not in NODE_TLVS:
            raise Exception("Unknown Node Descriptor Sub-TLV")

        end = 4 + dlength
        value = data[4:end]
        packed = data[:end]
        remaining = data[end:]

        # OSPF Area-ID
        if dtype == 514:
            return cls(node_id=IP.unpack(value), dtype=dtype, packed=packed), remaining

        # Autonomous System / BGP-LS Identifier: one unsigned 32-bit integer
        if dtype in (512, 513) and dlength == 4:
            return cls(node_id=unpack('!L', value)[0], dtype=dtype, packed=packed), remaining

        # IGP Router-ID: the TLV size in combination with the protocol
        # identifier enables the decoder to determine the type
        # of the node: sec 3.2.1.4.
        if dtype == 515:
            is_ospf = igp in (3, 6)
            is_isis = igp in (1, 2)

            # OSPFv{2,3} non-pseudonode
            if is_ospf and dlength == 4:
                return cls(node_id=IP.unpack(value), dtype=dtype, packed=packed), remaining

            # OSPFv{2,3} LAN pseudonode = router id + designated router id
            if is_ospf and dlength == 8:
                r_id = IP.unpack(value[:4])
                dr_id = IP.unpack(value[4:8])
                return cls(node_id=r_id, dtype=dtype, psn=None, dr_id=dr_id, packed=packed), remaining

            # IS-IS non-pseudonode
            if is_isis and dlength == 6:
                return cls(node_id=ISO.unpack_sysid(value), dtype=dtype, packed=packed), remaining

            # IS-IS LAN pseudonode = ISO Node-ID + PSN
            if is_isis and dlength == 7:
                iso_node = ISO.unpack_sysid(value[:6])
                psn = unpack('!B', value[6:7])[0]
                return cls(node_id=iso_node, dtype=dtype, psn=psn, packed=packed), remaining

        # Known type with an unsupported length / protocol combination. This
        # includes an IGP Router-ID that matched none of the cases above,
        # which previously fell through and returned None.
        raise Notify(3, 5, 'could not decode Local Node descriptor')

    def json(self, compact=None):
        """Return this descriptor as comma-separated JSON ``"key": value`` pairs.

        The result is a fragment without surrounding braces. ``compact`` is
        accepted but not used.
        """
        parts = []
        if self.dtype == 514:
            parts.append('"ospf-area-id": "%s"' % self.node_id)
        if self.dr_id is not None:
            parts.append('"designated-router-id": "%s"' % self.dr_id)
        if self.psn is not None:
            parts.append('"psn": "%s"' % self.psn)
        if self.dtype == 515:
            parts.append('"router-id": "%s"' % self.node_id)
        if self.dtype == 512:
            parts.append('"autonomous-system": %d' % self.node_id)
        if self.dtype == 513:
            parts.append('"bgp-ls-identifier": "%d"' % self.node_id)
        return ', '.join(parts)

    def __eq__(self, other):
        # Equality only looks at node_id; dtype, psn and dr_id are ignored.
        return isinstance(other, NodeDescriptor) and self.node_id == other.node_id

    def __ne__(self, other):
        # Defined explicitly so that != mirrors __eq__ on Python 2 as well,
        # where it is not derived automatically.
        return not self.__eq__(other)

    # Historical misspelling of __ne__ (never invoked by the != operator);
    # kept as an alias so any direct caller keeps working.
    __neq__ = __ne__

    def __lt__(self, other):
        raise RuntimeError('Not implemented')

    def __le__(self, other):
        raise RuntimeError('Not implemented')

    def __gt__(self, other):
        raise RuntimeError('Not implemented')

    def __ge__(self, other):
        raise RuntimeError('Not implemented')

    def __str__(self):
        return self.json()

    def __repr__(self):
        return self.__str__()

    def __len__(self):
        """Return the size in bytes of the packed Sub-TLV (header included)."""
        return len(self._packed)

    def __hash__(self):
        # NOTE(review): the hash is derived from the JSON text (which depends
        # on dtype, psn and dr_id) while __eq__ compares node_id only, so two
        # equal descriptors can hash differently - confirm which is intended.
        return hash(str(self))

    def pack(self):
        """Return the raw bytes this descriptor was decoded from."""
        return self._packed
152