Completed
Push — master ( 1fcaa8...21a832 )
by
unknown
16s queued 12s
created

ospd.network.ports_str_check_failed()   D

Complexity

Conditions 12

Size

Total Lines 31
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
eloc 24
nop 1
dl 0
loc 31
rs 4.8
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex units such as ospd.network.ports_str_check_failed() (a function, in this case) often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright (C) 2014-2021 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
""" Helper module for network related functions
19
"""
20
21
import binascii
22
import collections
23
import itertools
24
import logging
25
import re
26
import socket
27
import struct
28
29
from typing import List, Optional, Tuple
30
31
# Logger shared by the helper functions in this module.
__LOGGER = logging.getLogger(__name__)
32
33
34
def target_to_ipv4(target: str) -> Optional[List]:
    """Return ``[target]`` if target is a single IPv4 address, else None.

    The string is validated by handing it to ``socket.inet_pton`` with
    ``AF_INET``; a socket error means it is not an IPv4 address.
    """
    try:
        socket.inet_pton(socket.AF_INET, target)
    except socket.error:
        return None
    return [target]
42
43
44
def target_to_ipv6(target: str) -> Optional[List]:
    """Return ``[target]`` if target is a single IPv6 address, else None.

    The string is validated by handing it to ``socket.inet_pton`` with
    ``AF_INET6``; a socket error means it is not an IPv6 address.
    """
    try:
        socket.inet_pton(socket.AF_INET6, target)
    except socket.error:
        return None
    return [target]
52
53
54
def ipv4_range_to_list(start_packed, end_packed) -> Optional[List]:
    """Expand a packed IPv4 range into dotted-quad strings.

    Both arguments are 4-byte packed addresses in network byte order.
    Every address from start_packed up to and including end_packed is
    returned; the list is empty when the end lies before the start.
    """
    (first,) = struct.unpack('!L', start_packed)
    (last,) = struct.unpack('!L', end_packed)

    return [
        socket.inet_ntoa(struct.pack('!L', number))
        for number in range(first, last + 1)
    ]
66
67
68
def target_to_ipv4_short(target: str) -> Optional[List]:
    """Attempt to return a IPv4 short range list from a target string.

    A short range has the form ``A.B.C.D-E``: a full IPv4 start address
    followed by the value of the last octet of the end address.

    Returns the expanded list of addresses, or None when the target is
    not a short range, when E is not in ``D..255``, or when parsing fails.
    """
    splitted = target.split('-')
    if len(splitted) != 2:
        return None

    try:
        start_packed = socket.inet_pton(socket.AF_INET, splitted[0])
        end_value = int(splitted[1])
    except (socket.error, ValueError):
        return None

    # Indexing a bytes object yields the octet as an int. Wrapping it in
    # bytes() would build that many zero bytes instead, which always
    # decodes to 0 and silently disables the start/end ordering check.
    start_value = start_packed[3]

    # The end octet must not precede the start octet (this also rules out
    # negative values) and must fit into a single byte.
    if not start_value <= end_value <= 255:
        return None

    end_packed = start_packed[0:3] + struct.pack('B', end_value)

    return ipv4_range_to_list(start_packed, end_packed)
100
101
102
def target_to_ipv4_cidr(target: str) -> Optional[List]:
    """Expand an IPv4 CIDR target (``address/prefix``) into a host list.

    Only prefix lengths from 1 to 30 are accepted. The returned range runs
    from the network address plus one to the last address of the block
    minus one. None is returned for anything that does not parse.
    """
    pieces = target.split('/')
    if len(pieces) != 2:
        return None
    address, prefix = pieces

    try:
        packed = socket.inet_pton(socket.AF_INET, address)
        block = int(prefix)
    except (socket.error, ValueError):
        return None

    if not 0 < block <= 30:
        return None

    # Clear the host bits to obtain the network address.
    host_bits = 32 - block
    network = (int.from_bytes(packed, 'big') >> host_bits) << host_bits

    first_host = network + 1
    last_host = (first_host | (0xFFFFFFFF >> block)) - 1

    return ipv4_range_to_list(
        struct.pack('!I', first_host), struct.pack('!I', last_host)
    )
127
128
129
def target_to_ipv6_cidr(target: str) -> Optional[List]:
    """Expand an IPv6 CIDR target (``address/prefix``) into a host list.

    Only prefix lengths from 1 to 126 are accepted. The returned range runs
    from the network address plus one to the last address of the block
    minus one. None is returned for anything that does not parse.
    """
    pieces = target.split('/')
    if len(pieces) != 2:
        return None
    address, prefix = pieces

    try:
        packed = socket.inet_pton(socket.AF_INET6, address)
        block = int(prefix)
    except (socket.error, ValueError):
        return None

    if not 0 < block <= 126:
        return None

    # Clear the host bits to obtain the network address.
    host_bits = 128 - block
    network = (int.from_bytes(packed, 'big') >> host_bits) << host_bits

    first_host = network + 1
    last_host = (first_host | (((1 << 128) - 1) >> block)) - 1

    # A 128 bit value is packed as two unsigned 64 bit halves.
    low_mask = (1 << 64) - 1
    start_packed = struct.pack('!QQ', first_host >> 64, first_host & low_mask)
    end_packed = struct.pack('!QQ', last_host >> 64, last_host & low_mask)

    return ipv6_range_to_list(start_packed, end_packed)
161
162
163
def target_to_ipv4_long(target: str) -> Optional[List]:
    """Expand an IPv4 long range (``A.B.C.D-E.F.G.H``) into a host list.

    Returns None when the target is not made of exactly two valid IPv4
    addresses separated by a dash, or when the end precedes the start.
    """
    bounds = target.split('-')
    if len(bounds) != 2:
        return None

    try:
        start_packed, end_packed = (
            socket.inet_pton(socket.AF_INET, bound) for bound in bounds
        )
    except socket.error:
        return None

    # Packed addresses are big-endian, so byte-wise comparison orders them.
    if start_packed > end_packed:
        return None

    return ipv4_range_to_list(start_packed, end_packed)
180
181
182
def ipv6_range_to_list(start_packed, end_packed) -> List:
    """Expand a packed IPv6 range into textual addresses.

    Both arguments are packed addresses. Every address from start_packed
    up to and including end_packed is returned; the list is empty when the
    end lies before the start.
    """
    first = int(binascii.hexlify(start_packed), 16)
    last = int(binascii.hexlify(end_packed), 16)

    # Each address is re-packed as two unsigned 64 bit halves.
    low_mask = (1 << 64) - 1

    return [
        socket.inet_ntop(
            socket.AF_INET6,
            struct.pack('!2Q', number >> 64, number & low_mask),
        )
        for number in range(first, last + 1)
    ]
199
200
201
def target_to_ipv6_short(target: str) -> Optional[List]:
    """Expand an IPv6 short range (``address-HEX``) into a host list.

    The part after the dash is read as a hexadecimal number replacing the
    last 16 bits of the start address. None is returned when the target
    does not parse, when that number exceeds 0xFFFF or when it is lower
    than the last 16 bits of the start address.
    """
    parts = target.split('-')
    if len(parts) != 2:
        return None
    start_text, end_text = parts

    try:
        start_packed = socket.inet_pton(socket.AF_INET6, start_text)
        end_value = int(end_text, 16)
    except (socket.error, ValueError):
        return None

    head, tail = start_packed[:14], start_packed[14:]
    start_value = int(binascii.hexlify(tail), 16)

    if not 0 <= end_value <= 0xFFFF or end_value < start_value:
        return None

    return ipv6_range_to_list(start_packed, head + struct.pack('!H', end_value))
221
222
223
def target_to_ipv6_long(target: str) -> Optional[List]:
    """Expand an IPv6 long range (``address-address``) into a host list.

    Returns None when the target is not made of exactly two valid IPv6
    addresses separated by a dash, or when the end precedes the start.
    """
    bounds = target.split('-')
    if len(bounds) != 2:
        return None

    try:
        start_packed, end_packed = (
            socket.inet_pton(socket.AF_INET6, bound) for bound in bounds
        )
    except socket.error:
        return None

    # Packed addresses are big-endian, so byte-wise comparison orders them.
    if start_packed > end_packed:
        return None

    return ipv6_range_to_list(start_packed, end_packed)
240
241
242
def target_to_hostname(target: str) -> Optional[List]:
    """Attempt to return a single hostname list from a target string.

    A target is accepted when it is between 1 and 255 characters long and
    consists only of word characters, dots and dashes.

    Returns ``[target]`` on success and None otherwise.
    """
    if not target or len(target) > 255:
        return None

    # fullmatch() is used instead of match() with a trailing `$`, because
    # `$` also matches right before a final newline and would therefore
    # let a target such as "host\n" through.
    if not re.fullmatch(r'[\w.-]+', target):
        return None

    return [target]
252
253
254
def target_to_list(target: str) -> Optional[List]:
    """Expand a single target expression into a list of hosts.

    The parsers below are tried in order and the first non-empty result is
    returned. When none of them produces hosts, the (empty) result of the
    last parser, the hostname one, is returned.
    """
    parsers = (
        target_to_ipv4,        # single IPv4 address
        target_to_ipv6,        # single IPv6 address
        target_to_ipv4_cidr,   # IPv4 CIDR
        target_to_ipv6_cidr,   # IPv6 CIDR
        target_to_ipv4_short,  # IPv4 short-range
        target_to_ipv4_long,   # IPv4 long-range
        target_to_ipv6_short,  # IPv6 short-range
        target_to_ipv6_long,   # IPv6 long-range
        target_to_hostname,    # hostname
    )

    hosts = None
    for parse in parsers:
        hosts = parse(target)
        if hosts:
            break

    return hosts
285
286
287
def target_str_to_list(target_str: str) -> Optional[List]:
    """Split a comma separated targets string into individual hosts.

    Returns None when target_str is None or empty, an empty list as soon
    as one of the targets cannot be parsed, and otherwise the list of
    hosts with duplicates removed (first occurrence order is kept).
    """
    if not target_str:
        return None

    hosts = []
    for raw_target in target_str.strip(',').split(','):
        candidate = raw_target.strip()
        expanded = target_to_list(candidate)

        if not expanded:
            __LOGGER.info("%s: Invalid target value", candidate)
            return []

        hosts.extend(expanded)

    # Dict keys keep insertion order, which drops duplicates in place.
    return list(dict.fromkeys(hosts))
310
311
312
def resolve_hostname(hostname: str) -> Optional[str]:
    """Return the address ``socket.gethostbyname`` reports for hostname.

    None is returned when the lookup fails with ``socket.gaierror``.
    The hostname must not be empty.
    """
    assert hostname

    try:
        address = socket.gethostbyname(hostname)
    except socket.gaierror:
        address = None

    return address
320
321
322
def is_valid_address(address: str) -> bool:
    """Tell whether address is a valid IPv4 or IPv6 address string.

    An empty or None address is never valid. Otherwise the address is
    checked with ``socket.inet_pton``, first as IPv4 and then as IPv6.
    """
    if not address:
        return False

    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            socket.inet_pton(family, address)
        except OSError:
            # Not an address of this family; try the next one.
            continue
        return True

    return False
337
338
339
def get_hostname_by_address(address: str) -> str:
    """Return the name ``socket.getfqdn`` reports for an IP address.

    An empty string is returned when address is not a valid IPv4/IPv6
    address, when the lookup raises, or when the reported name is
    identical to the address itself.
    """
    if not is_valid_address(address):
        return ''

    try:
        fqdn = socket.getfqdn(address)
    except (socket.gaierror, socket.herror):
        return ''

    return '' if fqdn == address else fqdn
354
355
356
def port_range_expand(portrange: str) -> Optional[List]:
    """
    Expand a port range into the individual ports it contains.

    @input Port range, with both bounds separated by the first dash.
    e.g. "4-8"

    @return List of integers, both bounds included, or None if the
    range is empty, has no dash, or a bound is not an integer.
    e.g. [4, 5, 6, 7, 8]
    """
    if not portrange or '-' not in portrange:
        __LOGGER.info("Invalid port range format")
        return None

    try:
        dash = portrange.index('-')
        first_port = int(portrange[:dash])
        stop = int(portrange[dash + 1 :]) + 1
    except (IndexError, ValueError) as e:
        __LOGGER.info("Invalid port range format %s", e)
        return None

    return list(range(first_port, stop))
386
387
388
def port_str_arrange(ports: str) -> str:
    """Return the ports string with the TCP section ahead of the UDP one.

    When a "U" occurs before a "T", the text from "T" to the end is moved
    in front of the text from "U" up to "T". In every other case the
    string is returned unchanged.
    """
    tcp_at = ports.find("T")
    udp_at = ports.find("U")

    # Both markers present and UDP listed first.
    if 0 <= udp_at < tcp_at:
        tcp_section = ports[tcp_at:]
        udp_section = ports[udp_at:tcp_at]
        return tcp_section + udp_section

    return ports
399
400
401
def ports_str_check_failed(port_str: str) -> bool:
    """
    Check if the port string is well formed.

    A port string fails the check when it
      - is empty or None,
      - contains a character other than "T", "U", ":", digits, comma,
        space, dash or newline,
      - contains more than one "T" or more than one "U",
      - has fewer colons than "T"/"U" markers,
      - starts or ends with a dash, or has a dash next to a newline,
      - has a dash that is not directly surrounded by digits.

    Return True if fail, False other case.
    """
    # An empty string cannot be indexed below and is never a valid list.
    if not port_str:
        __LOGGER.error("Invalid port range format")
        return True

    tcp_markers = port_str.count('T')
    udp_markers = port_str.count('U')

    if (
        re.search(r'[^TU:0-9, \-\n]', port_str)
        or tcp_markers > 1
        or udp_markers > 1
        or '-\n' in port_str
        or '\n-' in port_str
        or port_str.startswith('-')
        or port_str.endswith('-')
        or port_str.count(':') < tcp_markers + udp_markers
    ):
        __LOGGER.error("Invalid port range format")
        return True

    # Every dash must sit between two digits. The neighbours always exist
    # here, since a leading or trailing dash has been rejected above.
    for index, char in enumerate(port_str):
        if char != '-':
            continue
        try:
            int(port_str[index - 1])
            int(port_str[index + 1])
        except (TypeError, ValueError) as e:
            __LOGGER.error("Invalid port range format: %s", e)
            return True

    return False
432
433
434
def _expand_port_items(section: str) -> List:
    """Turn a comma separated run of ports/ranges into a sorted int list."""
    expanded = []
    for item in section.split(','):
        if not item:
            # Empty items come from leading, trailing or doubled commas.
            continue
        if '-' in item:
            port_range = port_range_expand(item)
            # Ranges that cannot be expanded (or are empty) are skipped.
            if port_range:
                expanded.extend(port_range)
        else:
            expanded.append(int(item))

    expanded.sort()
    return expanded


def ports_as_list(port_str: str) -> Tuple[Optional[List], Optional[List]]:
    """
    Parses a ports string into two list of individual tcp and udp ports.

    @input string containing a port list
    e.g. T:1,2,3,5-8 U:22,80,600-1024
    Ports given without a "T:" or "U:" marker are treated as tcp ports.

    @return two list of sorted integers, for tcp and udp ports respectively.
    On failure (empty or malformed input, or no port found at all) the
    two-element list [None, None] is returned; that list form is kept as
    is for backward compatibility with existing callers.
    """
    if not port_str:
        __LOGGER.info("Invalid port value")
        return [None, None]

    if ports_str_check_failed(port_str):
        __LOGGER.info("%s: Port list malformed.", port_str)
        return [None, None]

    ports = port_str.replace(' ', '').replace('\n', '')

    has_tcp = "T" in ports
    has_udp = "U" in ports

    # A marker letter must be followed by a colon.
    if has_tcp and "T:" not in ports:
        return [None, None]
    if has_udp and "U:" not in ports:
        return [None, None]

    # Drop the comma separating one section from the next marker. This is
    # only done when a comma really precedes a marker, so plain lists such
    # as "80,4" or lists with a trailing comma are left untouched.
    ports = ports.replace(',T', 'T').replace(',U', 'U')

    ports = port_str_arrange(ports)

    tports = ''
    uports = ''
    # TCP ports listed first, then UDP ports
    if has_tcp and has_udp:
        tports = ports[ports.index('T:') + 2 : ports.index('U:')]
        uports = ports[ports.index('U:') + 2 :]
    # Only UDP ports
    elif has_udp:
        uports = ports[ports.index('U:') + 2 :]
    # Only TCP ports
    elif has_tcp:
        tports = ports[ports.index('T:') + 2 :]
    else:
        tports = ports

    try:
        tcp_list = _expand_port_items(tports)
        udp_list = _expand_port_items(uports)
    except ValueError:
        # An item such as "1:2" passes the character check but is no port.
        __LOGGER.info("%s: Port list malformed.", port_str)
        return [None, None]

    if not tcp_list and not udp_list:
        return [None, None]

    return (tcp_list, udp_list)
510
511
512
def get_tcp_port_list(port_str: str) -> Optional[List]:
    """Return the tcp part of what ports_as_list() yields for port_str."""
    tcp_ports, _udp_ports = ports_as_list(port_str)
    return tcp_ports
515
516
517
def get_udp_port_list(port_str: str) -> Optional[List]:
    """Return the udp part of what ports_as_list() yields for port_str."""
    _tcp_ports, udp_ports = ports_as_list(port_str)
    return udp_ports
520
521
522
def port_list_compress(port_list: List) -> str:
    """Compress a list of ports into a comma separated string.

    Duplicates are dropped and the ports are sorted; runs of consecutive
    ports are written as ``first-last``. An empty string is returned for
    an empty or missing list.
    """
    if not port_list:
        __LOGGER.info("Invalid or empty port list.")
        return ''

    def _offset(indexed_port):
        # Consecutive ports share the same (port - position) difference.
        position, port = indexed_port
        return port - position

    unique_sorted = sorted(set(port_list))
    pieces = []

    for _, run in itertools.groupby(enumerate(unique_sorted), key=_offset):
        members = [port for _, port in run]
        first, last = members[0], members[-1]
        pieces.append(str(first) if first == last else f"{first}-{last}")

    return ','.join(pieces)
543
544
545
def valid_port_list(port_list: str) -> bool:
    """Validate a port list string.

    Parameters:
        port_list: string containing UDP and/or TCP
                   port list as ranges or single comma
                   separated ports.

    Return True if it is a valid port list, False otherwise. A list is
    valid when it is well formed, contains at least one port, and every
    port lies between 1 and 65535. The string "U:,T:" is accepted as a
    special case.
    """

    # No port list provided
    if not port_list:
        return False

    # Remove white spaces
    port_list = port_list.replace(' ', '')

    # The list consisted of white spaces only; checking it further would
    # index into an empty string.
    if not port_list:
        return False

    # Special case is ignored.
    if port_list == 'U:,T:':
        return True

    # Characters or dash placements the format does not allow.
    if ports_str_check_failed(port_list):
        return False

    tcp, udp = ports_as_list(port_list)
    # There is a port list but no tcp and no udp.
    if not tcp and not udp:
        return False

    # Every tcp and udp port must be within the valid port number range.
    return all(1 <= port <= 65535 for port in (tcp or []) + (udp or []))
584