exabgp.protocol.ip.IP.top()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 3
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# encoding: utf-8
2
"""
3
ip/__init__.py
4
5
Created by Thomas Mangin on 2010-01-15.
6
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
7
License: 3-clause BSD. (See the COPYRIGHT file)
8
"""
9
10
import socket
11
12
from exabgp.protocol.family import AFI
13
from exabgp.protocol.family import SAFI
14
15
from exabgp.protocol.ip.netmask import NetMask
16
17
# XXX: The IP, Range and CIDR class APIs are totally broken and need fixing.
18
# XXX: many of the NLRI class constructors also need proper @classmethods
19
20
21
# =========================================================================== IP
22
#
23
24
25
class IPSelf(object):
    """Stand-in for the address "self", resolved through a negotiated object.

    Only the address family is stored; every encoding is obtained by asking
    ``negotiated.nexthopself(...)`` for an object and delegating to it.
    """

    def __init__(self, afi):
        """Remember the address family this placeholder stands for."""
        self.afi = afi

    def __repr__(self):
        return 'self'

    def top(self, negotiated, afi=AFI.undefined):
        """Return ``negotiated.nexthopself(afi).top()``.

        The ``afi`` argument is forwarded as given; ``self.afi`` is not
        consulted by this method.
        """
        resolved = negotiated.nexthopself(afi)
        return resolved.top()

    def ton(self, negotiated, afi=AFI.undefined):
        """Return ``negotiated.nexthopself(afi).ton()``.

        The ``afi`` argument is forwarded as given; ``self.afi`` is not
        consulted by this method.
        """
        resolved = negotiated.nexthopself(afi)
        return resolved.ton()

    def pack(self, negotiated):
        """Return ``ton()`` of the object resolved for this instance's own afi."""
        resolved = negotiated.nexthopself(self.afi)
        return resolved.ton()

    def index(self):
        """Return the key ``'self-'`` followed by ``AFI.names[self.afi]``."""
        family_name = AFI.names[self.afi]
        return 'self-' + family_name
43
44
45
class IP(object):
    """Base IP address keeping a textual form and a packed (binary) form.

    ``__init__`` always raises: instances are obtained through ``create()`` /
    ``unpack()``, which look up the class registered for the address family,
    or through a subclass whose constructor calls ``init()``.
    """

    # class-level placeholder for the API; init() sets an instance attribute
    # and leaves this one untouched
    afi = None
    # afi -> class mapping filled in by register(); one dict shared by all
    # subclasses that do not rebind it
    _known = {}

    _UNICAST = SAFI.unicast
    _MULTICAST = SAFI.multicast

    # first octets 224 to 239 inclusive
    _multicast_range = set(range(224, 240))

    # deprecate the string API in favor of top()

    def __init__(self):
        raise RuntimeError("You should use IP.create() to use IP")

    def init(self, string, packed=None):
        """Populate the instance and return it.

        ``string`` is the textual address; ``packed`` is its binary form and
        is computed with ``IP.pton(string)`` when it is None.
        """
        # XXX: the str should not be needed
        self._string = string
        if packed is None:
            self._packed = IP.pton(string)
        else:
            self._packed = packed
        self.afi = IP.toafi(string)
        return self

    def __iter__(self):
        """Yield the textual address one character at a time."""
        for character in self._string:
            yield character

    @staticmethod
    def pton(ip):
        """Convert a textual address to packed form using the family from toaf()."""
        family = IP.toaf(ip)
        return socket.inet_pton(family, ip)

    @staticmethod
    def ntop(data):
        """Convert packed data to text: 4 bytes is AF_INET, anything else AF_INET6."""
        family = socket.AF_INET if len(data) == 4 else socket.AF_INET6
        return socket.inet_ntop(family, data)

    def top(self, negotiated=None, afi=AFI.undefined):
        """Return the textual address; both arguments are ignored."""
        return self._string

    @staticmethod
    def toaf(ip):
        """Return the socket address family for a textual address.

        Raises ValueError when the text contains neither ':' nor '.'.
        """
        # the order matters as ::FFFF:<ipv4> is an IPv6 address
        for marker, family in ((':', socket.AF_INET6), ('.', socket.AF_INET)):
            if marker in ip:
                return family
        raise ValueError('unrecognised ip address %s' % ip)

    @staticmethod
    def toafi(ip):
        """Return the AFI for a textual address.

        Raises ValueError when the text contains neither ':' nor '.'.
        """
        # the order matters as ::FFFF:<ipv4> is an IPv6 address
        for marker, family in ((':', AFI.ipv6), ('.', AFI.ipv4)):
            if marker in ip:
                return family
        raise ValueError('unrecognised ip address %s' % ip)

    @staticmethod
    def tosafi(ip):
        """Return the SAFI (unicast or multicast) for a textual address.

        Any text containing ':' is reported as unicast. Dotted text is
        multicast when its first octet is in ``IP._multicast_range``.
        Raises ValueError when the text contains neither ':' nor '.'.
        """
        if ':' in ip:
            # XXX: FIXME: I assume that ::FFFF:<ip> must be treated unicast
            return SAFI.unicast
        if '.' in ip:
            first_octet = int(ip.split('.')[0])
            if first_octet in IP._multicast_range:
                return SAFI.multicast
            return SAFI.unicast
        raise ValueError('unrecognised ip address %s' % ip)

    def ipv4(self):
        """Return True when the packed form is exactly 4 bytes long."""
        return len(self._packed) == 4

    def ipv6(self):
        """Return True when the packed form is not 4 bytes long."""
        return len(self._packed) != 4

    def address(self):
        """Return the packed form folded into one integer, first byte highest."""
        number = 0
        for octet in self._packed:
            number = (number << 8) + octet
        return number

    @staticmethod
    def length(afi):
        """Return 4 for AFI.ipv4 and 16 for any other value."""
        if afi == AFI.ipv4:
            return 4
        return 16

    def index(self):
        """Return the packed form."""
        return self._packed

    def pack(self):
        """Return the packed form."""
        return self._packed

    def ton(self, negotiated=None, afi=AFI.undefined):
        """Return the packed form; both arguments are ignored."""
        return self._packed

    def __repr__(self):
        return self._string

    def __eq__(self, other):
        """Compare packed forms; anything that is not an IP is unequal."""
        return isinstance(other, IP) and self._packed == other._packed

    def __ne__(self, other):
        return not self.__eq__(other)

    # The ordering operators compare packed forms directly and do not check
    # the type of ``other``.

    def __lt__(self, other):
        mine, theirs = self._packed, other._packed
        return mine < theirs

    def __le__(self, other):
        mine, theirs = self._packed, other._packed
        return mine <= theirs

    def __gt__(self, other):
        mine, theirs = self._packed, other._packed
        return mine > theirs

    def __ge__(self, other):
        mine, theirs = self._packed, other._packed
        return mine >= theirs

    def __hash__(self):
        # NOTE(review): __eq__ ignores the class while this hash includes the
        # class name, so equal instances of different classes can hash
        # differently - confirm this is intended.
        key = (self.__class__.__name__, self._packed)
        return hash(key)

    @classmethod
    def klass(cls, ip):
        """Return the class registered for the family of a textual address.

        Returns None when no class is registered for that family.
        Raises ValueError when the text contains neither ':' nor '.'.
        """
        # the order matters as ::FFFF:<ipv4> is an IPv6 address
        if ':' in ip:
            afi = IPv6.afi
        elif '.' in ip:
            afi = IPv4.afi
        else:
            raise ValueError('can not decode this ip address : %s' % ip)
        return cls._known.get(afi)

    @classmethod
    def create(cls, string, packed=None, klass=None):
        """Build an address object from text (and optionally its packed form).

        A truthy ``klass`` is used as the constructor; otherwise the class is
        looked up with ``cls.klass(string)``.
        """
        factory = klass if klass else cls.klass(string)
        return factory(string, packed)

    @classmethod
    def register(cls):
        """Record this class in the shared registry under its ``afi``."""
        cls._known[cls.afi] = cls

    @classmethod
    def unpack(cls, data, klass=None):
        """Build an address object from packed data via ``create()``."""
        text = IP.ntop(data)
        return cls.create(text, data, klass)
188
189
190
# ======================================================================== Range
191
#
192
193
194
class IPRange(IP):
    """An IP address paired with a netmask."""

    def __init__(self, ip, mask):
        """Initialise the address from text and build the mask for its family.

        The mask object comes from ``NetMask.create(mask, IP.toafi(ip))``.
        """
        IP.init(self, ip)
        family = IP.toafi(ip)
        self.mask = NetMask.create(mask, family)

    @classmethod
    def create(klass, ip, mask):
        """Alternate constructor: equivalent to calling the class directly."""
        return klass(ip, mask)

    def __repr__(self):
        """Return the bare address for a /32 IPv4 or /128 IPv6, else ``ip/len``."""
        if self.ipv4() and self.mask == 32:
            return super(IPRange, self).__repr__()
        if self.ipv6() and self.mask == 128:
            return super(IPRange, self).__repr__()
        return '%s/%d' % (self.top(), int(self.mask))
208
209
210
# ==================================================================== NoNextHop
211
#
212
213
214
class _NoNextHop(object):
    """Object used where no next-hop is present.

    Every encoding method returns the empty string, and copying (shallow or
    deep) hands back the very same object.
    """

    packed = ''

    def pack(self, data, negotiated=None):
        """Return the empty string; both arguments are ignored."""
        return ''

    def index(self):
        """Return the empty string."""
        return ''

    def ton(self, negotiated=None, afi=AFI.undefined):
        """Return the empty string; both arguments are ignored."""
        return ''

    def __str__(self):
        return 'no-nexthop'

    def __deepcopy__(self, _):
        # copy.deepcopy() passes its memo dict; it is not needed because the
        # same object is returned
        return self

    def __copy__(self, _=None):
        # copy.copy() calls __copy__() with no argument, so the parameter
        # must be optional; it is kept for callers that pass one explicitly
        return self
234
235
236
# module-level instance of _NoNextHop, created once at import time
NoNextHop = _NoNextHop()
237
238
239
# ========================================================================= IPv4
240
#
241
242
243
class IPv4(IP):
    """IPv4 address: fixed family, length 4, multicast decided by first octet."""

    # lower case to match the class Address API
    afi = AFI.ipv4

    def __init__(self, string, packed=None):
        """Initialise from text; a falsy ``packed`` is computed with IP.pton()."""
        self.init(string, packed if packed else IP.pton(string))

    def __len__(self):
        return 4

    def unicast(self):
        """Return True when the address is not multicast."""
        return not self.multicast()

    def multicast(self):
        """Return True when the first packed byte is in 224..239 inclusive."""
        # reuse the shared class-level set (same values as range(224, 240))
        # rather than building a new set on every call
        return self._packed[0] in IP._multicast_range

    def ipv4(self):
        return True

    def ipv6(self):
        return False

    @staticmethod
    def pton(ip):
        """Convert IPv4 text to its packed form."""
        return socket.inet_pton(socket.AF_INET, ip)

    @staticmethod
    def ntop(data):
        """Convert packed IPv4 data to text."""
        return socket.inet_ntop(socket.AF_INET, data)

    # klass is a trick for subclasses of IP/IPv4 such as NextHop / OriginatorID
    @classmethod
    def unpack(cls, data, klass=None):
        """Build an instance from packed data.

        A truthy ``klass`` is used as the constructor instead of ``cls``; it
        is called with the text form and the packed data.
        """
        ip = socket.inet_ntop(socket.AF_INET, data)
        if klass:
            return klass(ip, data)
        return cls(ip, data)
280
281
282
# record IPv4 in the IP._known registry under IPv4.afi
IPv4.register()
283
284
285
# ========================================================================= IPv6
286
#
287
288
289
class IPv6(IP):
    """IPv6 address: fixed family, length 16, always reported as unicast."""

    # lower case to match the class Address API
    afi = AFI.ipv6

    def __init__(self, string, packed=None):
        """Initialise from text; a falsy ``packed`` is computed from the text."""
        if not packed:
            packed = socket.inet_pton(socket.AF_INET6, string)
        self.init(string, packed)

    def __len__(self):
        return 16

    def ipv4(self):
        return False

    def ipv6(self):
        return True

    def unicast(self):
        """Always True: this class never reports multicast."""
        return True

    def multicast(self):
        """Always False: this class never reports multicast."""
        return False

    @staticmethod
    def pton(ip):
        """Convert IPv6 text to its packed form."""
        family = socket.AF_INET6
        return socket.inet_pton(family, ip)

    @staticmethod
    def ntop(data):
        """Convert packed IPv6 data to text."""
        family = socket.AF_INET6
        return socket.inet_ntop(family, data)

    @classmethod
    def unpack(cls, data, klass=None):
        """Build an instance from packed data.

        A truthy ``klass`` is used as the constructor instead of ``cls``.
        """
        # NOTE(review): only the text form is passed on, so the constructor
        # recomputes the packed bytes; IPv4.unpack passes ``data`` as well -
        # confirm whether the difference is intentional.
        text = socket.inet_ntop(socket.AF_INET6, data)
        factory = klass if klass else cls
        return factory(text)
325
326
327
# record IPv6 in the IP._known registry under IPv6.afi
IPv6.register()
328