exabgp.data.check.integer()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# encoding: utf-8
2
"""
3
check.py
4
5
Created by Thomas Mangin on 2013-03-18.
6
Copyright (c) 2009-2017 Exa Networks. All rights reserved.
7
License: 3-clause BSD. (See the COPYRIGHT file)
8
"""
9
10
11
class TYPE(object):
    """Bit flags naming the kinds of data the type checks can recognise.

    Each value is a distinct single bit, so several kinds can be OR-ed
    together into one mask.
    """

    NULL = 0x01
    BOOLEAN = 0x02
    INTEGER = 0x04
    STRING = 0x08
    ARRAY = 0x10
    HASH = 0x20
18
19
20
class PRESENCE(object):
    """Constants distinguishing optional values from mandatory ones."""

    OPTIONAL = 0x01
    MANDATORY = 0x02
23
24
25
# TYPE CHECK
26
27
28
def null(data):
    """Return True only when data is None."""
    return data is None
30
31
32
def boolean(data):
    """Return True only when data is exactly a bool (True or False)."""
    return type(data) is bool
34
35
36
def integer(data):
    """Return True only when data is exactly an int.

    The exact type is compared on purpose (no isinstance): bool is a
    subclass of int and must not be accepted as an integer here.
    """
    return type(data) is int
38
39
40
def string(data):
    """Return True when data is exactly a text string.

    Both the type of u'' and the type of '' are accepted, so the check
    behaves the same whether or not the interpreter treats them as one type.
    """
    return type(data) in (type(u''), type(''))
42
43
44
def array(data):
    """Return True only when data is exactly a list (tuples are rejected)."""
    return type(data) is list
46
47
48
def hashtable(data):
    """Return True only when data is exactly a dict."""
    return type(data) is dict
50
51
52
# XXX: Not very good to redefine the keyword object, but this class uses no OO ...
53
54
# Maps each TYPE flag to the predicate that recognises that kind of data.
CHECK_TYPE = {
    TYPE.NULL: null,
    TYPE.BOOLEAN: boolean,
    TYPE.INTEGER: integer,
    TYPE.STRING: string,
    TYPE.ARRAY: array,
    TYPE.HASH: hashtable,
}
62
63
64
def kind(kind, data):
    """Return True when data matches at least one type selected by kind.

    kind is a bit mask of TYPE flags; every flag set in the mask is looked
    up in CHECK_TYPE and its predicate is applied to data.
    """
    return any(check(data) for flag, check in CHECK_TYPE.items() if kind & flag)
70
71
72
# DATA CHECK
73
74
75
def nop(data):
    """Accept anything: always return True, data is ignored."""
    return True
77
78
79
def uint8(data):
    """Return True when 0 <= data < 2**8. No type check is performed."""
    return 0 <= data < 2 ** 8
81
82
83
def uint16(data):
    """Return True when 0 <= data < 2**16. No type check is performed."""
    return 0 <= data < 2 ** 16
85
86
87
def uint32(data):
    """Return True when 0 <= data < 2**32. No type check is performed."""
    return 0 <= data < 2 ** 32
89
90
91
def uint96(data):
    """Return True when 0 <= data < 2**96. No type check is performed."""
    return 0 <= data < 2 ** 96
93
94
95
def float(data):
    """Return True when 0 <= data < roughly 3.4e38. No type check is performed.

    NOTE: this name shadows the builtin float for the rest of the module;
    it is kept because callers refer to the check by this name.
    """
    upper_bound = 3.4 * pow(10, 38)  # approximation of max from wikipedia
    return 0 <= data < upper_bound
97
98
99
def ip(data):
    """Return True when data passes either the ipv4() or the ipv6() check."""
    if ipv4(data):
        return True
    return ipv6(data)
101
102
103
def ipv4(data):  # XXX: improve
    """Return True when data is a string containing exactly three dots.

    This is only a shape check: the parts between the dots are not validated.
    """
    if not string(data):
        return False
    return data.count('.') == 3
105
106
107
def ipv6(data):  # XXX: improve
    """Return True when data is a string containing at least one colon.

    This is only a shape check: the address itself is not validated.
    """
    if not string(data):
        return False
    return ':' in data
109
110
111
def range4(data):
    """Return True when 0 < data <= 32 (zero is rejected)."""
    return 0 < data <= 32
113
114
115
def range6(data):
    """Return True when 0 < data <= 128 (zero is rejected)."""
    return 0 < data <= 128
117
118
119
def ipv4_range(data):
    """Return True when data has the form '<ipv4>/<length>'.

    data must contain exactly one '/'. The left part must pass ipv4() and
    the right part must be all digits with a value accepted by range4().
    No type check is done on data itself; it is expected to offer the
    string methods count() and split().
    """
    if data.count('/') != 1:
        return False
    # Locals are named so they do not shadow the module-level ip() check.
    address, length = data.split('/')
    return ipv4(address) and length.isdigit() and range4(int(length))
130
131
132
def port(data):
    """Return True when 0 <= data < 2**16. No type check is performed."""
    return 0 <= data < 2 ** 16
134
135
136
def asn16(data):
    """Return True when 1 <= data < 2**16 (zero is rejected)."""
    return 1 <= data < 2 ** 16
138
139
140
def asn32(data):
    """Return True when 1 <= data < 2**32 (zero is rejected)."""
    return 1 <= data < 2 ** 32
142
143
144
# Generic ASN check: an alias for the 32-bit variant.
asn = asn32
145
146
147
def md5(data):
    """Return True when len(data) is at most 18."""
    return len(data) <= 18
149
150
151
def localpreference(data):
    """Return True when data is accepted by uint32()."""
    return uint32(data)
153
154
155
def med(data):
    """Return True when data is accepted by uint32()."""
    return uint32(data)
157
158
159
def aigp(data):
    """Return True when data is accepted by uint32()."""
    return uint32(data)
161
162
163
def originator(data):
    """Return True when data is accepted by ipv4()."""
    return ipv4(data)
165
166
167
def distinguisher(data):
    """Return True when data is '<asn16>:<ipv4>' or '<ipv4>:<asn16>'.

    data must split on ':' into exactly two parts. The numeric part must be
    all digits with a value accepted by asn16(); the other must pass ipv4().
    No type check is done on data itself.
    """
    parts = data.split(':')
    if len(parts) != 2:
        return False
    left, right = parts
    if left.isdigit() and asn16(int(left)) and ipv4(right):
        return True
    return ipv4(left) and right.isdigit() and asn16(int(right))
173
174
175
def pathinformation(data):
    """Return True for an integer accepted by uint32() or a string accepted by ipv4().

    Any other type is rejected.
    """
    if integer(data):
        return uint32(data)
    return ipv4(data) if string(data) else False
181
182
183
def watchdog(data):
    """Return True when data contains no space character."""
    has_space = ' ' in data
    return not has_space  # TODO: improve
185
186
187
def split(data):
    """Return True when data is accepted by range6()."""
    return range6(data)
189
190
191
# LIST DATA CHECK
192
# These functions need to perform type checks before using the data
193
194
195
def aspath(data):
    """Return True when data is an integer below 2**32.

    No lower bound is applied, so negative integers are accepted too.
    """
    if not integer(data):
        return False
    return data < 2 ** 32
197
198
199
def assequence(data):
    """Return True when data is an integer below 2**32.

    No lower bound is applied, so negative integers are accepted too.
    """
    if not integer(data):
        return False
    return data < 2 ** 32
201
202
203
def community(data):
    """Return True when data is an acceptable community value.

    Accepted forms:
      * an integer accepted by uint32();
      * one of the well-known names below, compared case-insensitively;
      * a two-element list [a, b] of integers where a passes asn16() and
        b passes uint16().
    """
    if integer(data):
        return uint32(data)

    well_known = (
        'no-export',
        'no-advertise',
        'no-export-subconfed',
        'nopeer',
        'no-peer',
        'blackhole',
    )
    if string(data) and data.lower() in well_known:
        return True

    if not array(data) or len(data) != 2:
        return False
    high, low = data
    return integer(high) and integer(low) and asn16(high) and uint16(low)
218
219
220
def largecommunity(data):
    """Return True when data is an acceptable large community value.

    Accepted forms:
      * an integer accepted by uint96();
      * a three-element list [a, b, c] of integers where a passes asn32()
        and b and c both pass uint32().
    """
    if integer(data):
        return uint96(data)

    if not array(data) or len(data) != 3:
        return False
    first, second, third = data
    if not (integer(first) and integer(second) and integer(third)):
        return False
    return asn32(first) and uint32(second) and uint32(third)
233
234
235
def extendedcommunity(data):  # TODO: improve, incomplete see https://tools.ietf.org/rfc/rfc4360.txt
    """Return True when data is an acceptable extended community value.

    Accepted forms:
      * any integer (no range check is applied);
      * a string '<name>:<x>:<y>' where name is 'origin' or 'target'
        (case-insensitive) and x/y are either <asn16>:<ipv4> or
        <ipv4>:<asn16>.
    """
    if integer(data):
        return True
    if not (string(data) and data.count(':') == 2):
        return False

    name, first, second = data.split(':')
    if name.lower() not in ('origin', 'target'):
        return False

    # The digit strings must be converted before the range check: asn16()
    # compares against integers, and comparing a str with an int raises
    # TypeError on Python 3.
    if first.isdigit() and asn16(int(first)) and ipv4(second):
        return True
    return ipv4(first) and second.isdigit() and asn16(int(second))
244
245
246
def label(data):
    """Return True when data is an integer with 0 <= data < 2**20."""
    if not integer(data):
        return False
    return 0 <= data < 2 ** 20  # XXX: SHOULD be taken from Label class
248
249
250
def clusterlist(data):
    """Return True when data is an integer accepted by uint8()."""
    if not integer(data):
        return False
    return uint8(data)
252
253
254
def aggregator(data):
    """Return True when data is an acceptable aggregator value.

    data must be a list. An empty list is accepted; otherwise it must hold
    exactly two elements: an integer passing asn() followed by a string
    passing ipv4().
    """
    if not array(data):
        return False
    if not data:
        return True
    if len(data) != 2:
        return False
    as_number, address = data
    return integer(as_number) and string(address) and asn(as_number) and ipv4(address)
262
263
264
def dscp(data):
    """Return True when data is an integer accepted by uint8()."""
    if not integer(data):
        return False
    return uint8(data)
266
267
268
# FLOW DATA CHECK
269
#
270
271
272
def flow_ipv4_range(data):
    """Return True when data is one IPv4 range or a list of IPv4 ranges.

    A string is checked directly with ipv4_range(). A list is accepted when
    every element is a string that passes ipv4_range(); an empty list is
    therefore accepted. Any other type is rejected.
    """
    if array(data):
        for entry in data:
            # Type-check each element first: ipv4_range() calls string
            # methods on its argument without checking its type.
            if not string(entry) or not ipv4_range(entry):
                return False
        # Previously execution fell through to the string test below, so a
        # list of valid ranges was always rejected.
        return True
    if string(data):
        return ipv4_range(data)
    return False
280
281
282
def _flow_numeric(data, check):
    """Return True when data is a list of valid [operator, value] pairs.

    Every element must be a two-element list whose first item is one of
    '>', '<', '=', '>=', '<=' and whose second item is an integer accepted
    by check. An empty list is accepted.

    check: a one-argument predicate applied to each integer value.
    """
    if not array(data):
        return False

    operators = ('>', '<', '=', '>=', '<=')
    for entry in data:
        if not array(entry) or len(entry) != 2:
            return False
        operator, value = entry
        if operator not in operators:
            return False
        if not integer(value) or not check(value):
            return False
    return True
291
292
293
def flow_port(data):
    """Return True when data is a list of [operator, value] pairs whose values pass port()."""
    return _flow_numeric(data, check=port)
295
296
297
def _length(data):
    """Return True when data is accepted by uint16()."""
    return uint16(data)
299
300
301
def flow_length(data):
    """Return True when data is a list of [operator, value] pairs whose values pass _length()."""
    return _flow_numeric(data, check=_length)
303
304
305
def redirect(data):  # TODO: check that we are not too restrictive with our asn() calls
    """Return True when data has the form '<ipv4-or-asn16>:<digits>'.

    data must split on ':' into exactly two parts. The second part must be
    all digits. The first part must either pass ipv4() or be all digits
    with a value accepted by asn16(). No type check is done on data itself.
    """
    parts = data.split(':')
    if len(parts) != 2:
        return False
    target, value = parts
    # The previous guard, `not value.isdigit() and asn16(int(value))`, only
    # reached int() for NON-numeric text, so it raised ValueError instead of
    # rejecting the input. Numeric values were never range-checked by it;
    # that is kept so input accepted before is still accepted.
    if not value.isdigit():
        return False
    return ipv4(target) or (target.isdigit() and asn16(int(target)))
313