Passed
Pull Request — master (#393)
by Juan José
01:50
created

ospd.network.valid_port_list()   F

Complexity

Conditions 14

Size

Total Lines 39
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 14
eloc 20
nop 1
dl 0
loc 39
rs 3.6
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like ospd.network.valid_port_list() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright (C) 2014-2021 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
""" Helper module for network related functions
19
"""
20
21
import binascii
22
import collections
23
import itertools
24
import logging
25
import re
26
import socket
27
import struct
28
29
from typing import List, Optional, Tuple
30
31
__LOGGER = logging.getLogger(__name__)
32
33
34
def target_to_ipv4(target: str) -> Optional[List]:
35
    """Attempt to return a single IPv4 host list from a target string."""
36
37
    try:
38
        socket.inet_pton(socket.AF_INET, target)
39
        return [target]
40
    except socket.error:
41
        return None
42
43
44
def target_to_ipv6(target: str) -> Optional[List]:
45
    """Attempt to return a single IPv6 host list from a target string."""
46
47
    try:
48
        socket.inet_pton(socket.AF_INET6, target)
49
        return [target]
50
    except socket.error:
51
        return None
52
53
54
def ipv4_range_to_list(start_packed, end_packed) -> Optional[List]:
55
    """Return a list of IPv4 entries from start_packed to end_packed."""
56
57
    new_list = list()
58
    start = struct.unpack('!L', start_packed)[0]
59
    end = struct.unpack('!L', end_packed)[0]
60
61
    for value in range(start, end + 1):
62
        new_ip = socket.inet_ntoa(struct.pack('!L', value))
63
        new_list.append(new_ip)
64
65
    return new_list
66
67
68
def target_to_ipv4_short(target: str) -> Optional[List]:
69
    """Attempt to return a IPv4 short range list from a target string."""
70
71
    splitted = target.split('-')
72
    if len(splitted) != 2:
73
        return None
74
75
    try:
76
        start_packed = socket.inet_pton(socket.AF_INET, splitted[0])
77
        end_value = int(splitted[1])
78
    except (socket.error, ValueError):
79
        return None
80
81
    # For subnet with mask lower than /24, ip addresses ending in .0 are
82
    # allowed.
83
    # The next code checks for a range starting with a A.B.C.0.
84
    # For the octet equal to 0, bytes() returns an empty binary b'',
85
    # which must be handle in a special way.
86
    _start_value = bytes(start_packed[3])
87
    if _start_value:
88
        start_value = int(binascii.hexlify(_start_value), 16)
89
    elif _start_value == b'':
90
        start_value = 0
91
    else:
92
        return None
93
94
    if end_value < 0 or end_value > 255 or end_value < start_value:
95
        return None
96
97
    end_packed = start_packed[0:3] + struct.pack('B', end_value)
98
99
    return ipv4_range_to_list(start_packed, end_packed)
100
101
102
def target_to_ipv4_cidr(target: str) -> Optional[List]:
103
    """Attempt to return a IPv4 CIDR list from a target string."""
104
105
    splitted = target.split('/')
106
    if len(splitted) != 2:
107
        return None
108
109
    try:
110
        start_packed = socket.inet_pton(socket.AF_INET, splitted[0])
111
        block = int(splitted[1])
112
    except (socket.error, ValueError):
113
        return None
114
115
    if block <= 0 or block > 30:
116
        return None
117
118
    start_value = int(binascii.hexlify(start_packed), 16) >> (32 - block)
119
    start_value = (start_value << (32 - block)) + 1
120
121
    end_value = (start_value | (0xFFFFFFFF >> block)) - 1
122
123
    start_packed = struct.pack('!I', start_value)
124
    end_packed = struct.pack('!I', end_value)
125
126
    return ipv4_range_to_list(start_packed, end_packed)
127
128
129
def target_to_ipv6_cidr(target: str) -> Optional[List]:
130
    """Attempt to return a IPv6 CIDR list from a target string."""
131
132
    splitted = target.split('/')
133
    if len(splitted) != 2:
134
        return None
135
136
    try:
137
        start_packed = socket.inet_pton(socket.AF_INET6, splitted[0])
138
        block = int(splitted[1])
139
    except (socket.error, ValueError):
140
        return None
141
142
    if block <= 0 or block > 126:
143
        return None
144
145
    start_value = int(binascii.hexlify(start_packed), 16) >> (128 - block)
146
    start_value = (start_value << (128 - block)) + 1
147
148
    end_value = (start_value | (int('ff' * 16, 16) >> block)) - 1
149
150
    high = start_value >> 64
151
    low = start_value & ((1 << 64) - 1)
152
153
    start_packed = struct.pack('!QQ', high, low)
154
155
    high = end_value >> 64
156
    low = end_value & ((1 << 64) - 1)
157
158
    end_packed = struct.pack('!QQ', high, low)
159
160
    return ipv6_range_to_list(start_packed, end_packed)
161
162
163
def target_to_ipv4_long(target: str) -> Optional[List]:
164
    """Attempt to return a IPv4 long-range list from a target string."""
165
166
    splitted = target.split('-')
167
    if len(splitted) != 2:
168
        return None
169
170
    try:
171
        start_packed = socket.inet_pton(socket.AF_INET, splitted[0])
172
        end_packed = socket.inet_pton(socket.AF_INET, splitted[1])
173
    except socket.error:
174
        return None
175
176
    if end_packed < start_packed:
177
        return None
178
179
    return ipv4_range_to_list(start_packed, end_packed)
180
181
182
def ipv6_range_to_list(start_packed, end_packed) -> List:
183
    """Return a list of IPv6 entries from start_packed to end_packed."""
184
185
    new_list = list()
186
187
    start = int(binascii.hexlify(start_packed), 16)
188
    end = int(binascii.hexlify(end_packed), 16)
189
190
    for value in range(start, end + 1):
191
        high = value >> 64
192
        low = value & ((1 << 64) - 1)
193
        new_ip = socket.inet_ntop(
194
            socket.AF_INET6, struct.pack('!2Q', high, low)
195
        )
196
        new_list.append(new_ip)
197
198
    return new_list
199
200
201
def target_to_ipv6_short(target: str) -> Optional[List]:
202
    """Attempt to return a IPv6 short-range list from a target string."""
203
204
    splitted = target.split('-')
205
    if len(splitted) != 2:
206
        return None
207
208
    try:
209
        start_packed = socket.inet_pton(socket.AF_INET6, splitted[0])
210
        end_value = int(splitted[1], 16)
211
    except (socket.error, ValueError):
212
        return None
213
214
    start_value = int(binascii.hexlify(start_packed[14:]), 16)
215
    if end_value < 0 or end_value > 0xFFFF or end_value < start_value:
216
        return None
217
218
    end_packed = start_packed[:14] + struct.pack('!H', end_value)
219
220
    return ipv6_range_to_list(start_packed, end_packed)
221
222
223
def target_to_ipv6_long(target: str) -> Optional[List]:
224
    """Attempt to return a IPv6 long-range list from a target string."""
225
226
    splitted = target.split('-')
227
    if len(splitted) != 2:
228
        return None
229
230
    try:
231
        start_packed = socket.inet_pton(socket.AF_INET6, splitted[0])
232
        end_packed = socket.inet_pton(socket.AF_INET6, splitted[1])
233
    except socket.error:
234
        return None
235
236
    if end_packed < start_packed:
237
        return None
238
239
    return ipv6_range_to_list(start_packed, end_packed)
240
241
242
def target_to_hostname(target: str) -> Optional[List]:
243
    """Attempt to return a single hostname list from a target string."""
244
245
    if len(target) == 0 or len(target) > 255:
246
        return None
247
248
    if not re.match(r'^[\w.-]+$', target):
249
        return None
250
251
    return [target]
252
253
254
def target_to_list(target: str) -> Optional[List]:
255
    """Attempt to return a list of single hosts from a target string."""
256
257
    # Is it an IPv4 address ?
258
    new_list = target_to_ipv4(target)
259
    # Is it an IPv6 address ?
260
    if not new_list:
261
        new_list = target_to_ipv6(target)
262
    # Is it an IPv4 CIDR ?
263
    if not new_list:
264
        new_list = target_to_ipv4_cidr(target)
265
    # Is it an IPv6 CIDR ?
266
    if not new_list:
267
        new_list = target_to_ipv6_cidr(target)
268
    # Is it an IPv4 short-range ?
269
    if not new_list:
270
        new_list = target_to_ipv4_short(target)
271
    # Is it an IPv4 long-range ?
272
    if not new_list:
273
        new_list = target_to_ipv4_long(target)
274
    # Is it an IPv6 short-range ?
275
    if not new_list:
276
        new_list = target_to_ipv6_short(target)
277
    # Is it an IPv6 long-range ?
278
    if not new_list:
279
        new_list = target_to_ipv6_long(target)
280
    # Is it a hostname ?
281
    if not new_list:
282
        new_list = target_to_hostname(target)
283
284
    return new_list
285
286
287
def target_str_to_list(target_str: str) -> Optional[List]:
288
    """Parses a targets string into a list of individual targets.
289
    Return a list of hosts, None if supplied target_str is None or
290
    empty, or an empty list in case of malformed target.
291
    """
292
    new_list = list()
293
294
    if not target_str:
295
        return None
296
297
    target_str = target_str.strip(',')
298
299
    for target in target_str.split(','):
300
        target = target.strip()
301
        target_list = target_to_list(target)
302
303
        if target_list:
304
            new_list.extend(target_list)
305
        else:
306
            __LOGGER.info("%s: Invalid target value", target)
307
            return []
308
309
    return list(collections.OrderedDict.fromkeys(new_list))
310
311
312
def resolve_hostname(hostname: str) -> Optional[str]:
313
    """Returns IP of a hostname."""
314
315
    assert hostname
316
    try:
317
        return socket.gethostbyname(hostname)
318
    except socket.gaierror:
319
        return None
320
321
322
def is_valid_address(address: str) -> bool:
323
    if not address:
324
        return False
325
326
    try:
327
        socket.inet_pton(socket.AF_INET, address)
328
    except OSError:
329
        # invalid IPv4 address
330
        try:
331
            socket.inet_pton(socket.AF_INET6, address)
332
        except OSError:
333
            # invalid IPv6 address
334
            return False
335
336
    return True
337
338
339
def get_hostname_by_address(address: str) -> str:
340
    """Returns hostname of an address."""
341
342
    if not is_valid_address(address):
343
        return ''
344
345
    try:
346
        hostname = socket.getfqdn(address)
347
    except (socket.gaierror, socket.herror):
348
        return ''
349
350
    if hostname == address:
351
        return ''
352
353
    return hostname
354
355
356
def port_range_expand(portrange: str) -> Optional[List]:
357
    """
358
    Receive a port range and expands it in individual ports.
359
360
    @input Port range.
361
    e.g. "4-8"
362
363
    @return List of integers.
364
    e.g. [4, 5, 6, 7, 8]
365
    """
366
    if not portrange or '-' not in portrange:
367
        __LOGGER.info("Invalid port range format")
368
        return None
369
370
    try:
371
        port_range_min = int(portrange[: portrange.index('-')])
372
        port_range_max = int(portrange[portrange.index('-') + 1 :]) + 1
373
    except:
374
        __LOGGER.info("Invalid port range format")
375
        return None
376
377
    port_list = list()
378
379
    for single_port in range(
380
            port_range_min,
381
            port_range_max,
382
    ):
383
        port_list.append(single_port)
384
385
    return port_list
386
387
388
def port_str_arrange(ports: str) -> str:
389
    """Gives a str in the format (always tcp listed first).
390
    T:<tcp ports/portrange comma separated>U:<udp ports comma separated>
391
    """
392
    b_tcp = ports.find("T")
393
    b_udp = ports.find("U")
394
395
    if (b_udp != -1 and b_tcp != -1) and b_udp < b_tcp:
396
        return ports[b_tcp:] + ports[b_udp:b_tcp]
397
398
    return ports
399
400
401
def ports_str_check_failed(port_str: str) -> bool:
402
    """
403
    Check if the port string is well formed.
404
    Return True if fail, False other case.
405
    """
406
    ret = False
407
    pattern = r'[^TU:0-9, \-\n]'
408
    if (
409
        re.search(pattern, port_str)
410
        or port_str.count('T') > 1
411
        or port_str.count('U') > 1
412
        or '-\n' in port_str
413
        or '\n-' in port_str
414
        or port_str[0] == '-'
415
        or port_str[len(port_str) - 1] == '-'
416
        or port_str.count(':') < (port_str.count('T') + port_str.count('U'))
417
    ):
418
        __LOGGER.error("Invalid port range format")
419
        return True
420
421
    index = 0
422
    while index <= len(port_str) - 1:
423
        if port_str[index] == '-':
424
            try:
425
                int(port_str[index - 1])
426
                int(port_str[index + 1])
427
            except (TypeError, ValueError) as e:
428
                __LOGGER.error("Invalid port range format: %s", e)
429
                return True
430
        index+=1
431
432
    return False
433
434
435
def ports_as_list(port_str: str) -> Tuple[Optional[List], Optional[List]]:
436
    """
437
    Parses a ports string into two list of individual tcp and udp ports.
438
439
    @input string containing a port list
440
    e.g. T:1,2,3,5-8 U:22,80,600-1024
441
442
    @return two list of sorted integers, for tcp and udp ports respectively.
443
    """
444
    if not port_str:
445
        __LOGGER.info("Invalid port value")
446
        return [None, None]
447
448
    if ports_str_check_failed(port_str):
449
        __LOGGER.info("{0}: Port list malformed.")
450
        return [None, None]
451
452
    tcp_list = list()
453
    udp_list = list()
454
455
    ports = port_str.replace(' ', '')
456
    ports = ports.replace('\n', '')
457
    
458
    b_tcp = ports.find("T")
459
    b_udp = ports.find("U")
460
461
    if b_tcp != -1 and "T:" not in ports:
462
        return [None, None]
463
    if b_udp != -1 and "U:" not in ports:
464
        return [None, None]
465
466
    if len(ports) > 1 and ports[b_tcp - 1] == ',':
467
        ports = ports[: b_tcp - 1] + ports[b_tcp:]
468
    if len(ports) > 1 and ports[b_udp - 1] == ',':
469
        ports = ports[: b_udp - 1] + ports[b_udp:]
470
    
471
    ports = port_str_arrange(ports)
472
    
473
    tports = ''
474
    uports = ''
475
    # TCP ports listed first, then UDP ports
476
    if b_udp != -1 and b_tcp != -1:
477
        tports = ports[ports.index('T:') + 2 : ports.index('U:')]
478
        uports = ports[ports.index('U:') + 2 :]
479
    # Only UDP ports
480
    elif b_tcp == -1 and b_udp != -1:
481
        uports = ports[ports.index('U:') + 2 :]
482
    # Only TCP ports
483
    elif b_udp == -1 and b_tcp != -1:
484
        tports = ports[ports.index('T:') + 2 :]
485
    else:
486
        tports = ports
487
488
    if tports:
489
        for port in tports.split(','):
490
            port_range_expanded = port_range_expand(port)
491
            if '-' in port and port_range_expanded:
492
                tcp_list.extend(port_range_expanded)
493
            elif port != '' and '-' not in port:
494
                tcp_list.append(int(port))
495
            
496
        tcp_list.sort()
497
498
    if uports:
499
        for port in uports.split(','):
500
            port_range_expanded = port_range_expand(port)
501
            if '-' in port and port_range_expanded:
502
                udp_list.extend(port_range_expanded)
503
            elif port and '-' not in port:
504
                udp_list.append(int(port))
505
        udp_list.sort()
506
507
    if (len(tcp_list) == 0 and len(udp_list) == 0):
508
        return [None, None]
509
        
510
    return (tcp_list, udp_list)
511
512
513
def get_tcp_port_list(port_str: str) -> Optional[List]:
514
    """Return a list with tcp ports from a given port list in string format"""
515
    return ports_as_list(port_str)[0]
516
517
518
def get_udp_port_list(port_str: str) -> Optional[List]:
519
    """Return a list with udp ports from a given port list in string format"""
520
    return ports_as_list(port_str)[1]
521
522
523
def port_list_compress(port_list: List) -> str:
524
    """Compress a port list and return a string."""
525
526
    if not port_list or len(port_list) == 0:
527
        __LOGGER.info("Invalid or empty port list.")
528
        return ''
529
530
    port_list = sorted(set(port_list))
531
    compressed_list = []
532
533
    for _key, group in itertools.groupby(
534
        enumerate(port_list), lambda t: t[1] - t[0]
535
    ):
536
        group = list(group)
537
538
        if group[0][1] == group[-1][1]:
539
            compressed_list.append(str(group[0][1]))
540
        else:
541
            compressed_list.append(str(group[0][1]) + '-' + str(group[-1][1]))
542
543
    return ','.join(compressed_list)
544
545
def valid_port_list(port_list: str) -> bool:
546
    """ Validate a port list string.
547
    Parameters:
548
        port_list: string containing UDP and/or TCP 
549
                   port list as ranges or single comma
550
                   separated ports "
551
    Return True if it is a valid port list, False otherwise.
552
    """
553
554
    # No port list provided
555
    if not port_list:
556
        return False
557
558
    #Remove white spaces
559
    port_list = port_list.replace(' ', '')
560
561
    # Special case is ignored.
562
    if port_list == 'U:,T:':
563
        return True
564
    
565
    # Invalid chars in the port list, like \0 or \n
566
    if ports_str_check_failed(port_list):
567
        return False    
568
    
569
    tcp, udp = ports_as_list(port_list)
570
    # There is a port list but no tcp and no udp.
571
    if not tcp and not udp:
572
        return False
573
    
574
    if tcp:
575
        for port in tcp:
576
            if port < 1 or port > 65535:
577
                return False
578
    if udp:
579
        for port in udp:
580
            if port < 1 or port > 65535:
581
                return False
582
    
583
    return True
584