ospd.network.ports_as_list()   F
last analyzed

Complexity

Conditions 31

Size

Total Lines 76
Code Lines 51

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 31
eloc 51
nop 1
dl 0
loc 76
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like ospd.network.ports_as_list() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright (C) 2014-2021 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
""" Helper module for network related functions
19
"""
20
21
import binascii
22
import collections
23
import itertools
24
import logging
25
import re
26
import socket
27
import struct
28
29
from typing import List, Optional, Tuple
30
31
__LOGGER = logging.getLogger(__name__)
32
33
34
def target_to_ipv4(target: str) -> Optional[List]:
35
    """Attempt to return a single IPv4 host list from a target string."""
36
37
    try:
38
        socket.inet_pton(socket.AF_INET, target)
39
        return [target]
40
    except socket.error:
41
        return None
42
43
44
def target_to_ipv6(target: str) -> Optional[List]:
45
    """Attempt to return a single IPv6 host list from a target string."""
46
47
    try:
48
        socket.inet_pton(socket.AF_INET6, target)
49
        return [target]
50
    except socket.error:
51
        return None
52
53
54
def ipv4_range_to_list(start_packed, end_packed) -> Optional[List]:
55
    """Return a list of IPv4 entries from start_packed to end_packed."""
56
57
    new_list = list()
58
    start = struct.unpack('!L', start_packed)[0]
59
    end = struct.unpack('!L', end_packed)[0]
60
61
    for value in range(start, end + 1):
62
        new_ip = socket.inet_ntoa(struct.pack('!L', value))
63
        new_list.append(new_ip)
64
65
    return new_list
66
67
68
def target_to_ipv4_short(target: str) -> Optional[List]:
69
    """Attempt to return a IPv4 short range list from a target string."""
70
71
    splitted = target.split('-')
72
    if len(splitted) != 2:
73
        return None
74
75
    try:
76
        start_packed = socket.inet_pton(socket.AF_INET, splitted[0])
77
        end_value = int(splitted[1])
78
    except (socket.error, ValueError):
79
        return None
80
81
    # For subnet with mask lower than /24, ip addresses ending in .0 are
82
    # allowed.
83
    # The next code checks for a range starting with a A.B.C.0.
84
    # For the octet equal to 0, bytes() returns an empty binary b'',
85
    # which must be handle in a special way.
86
    _start_value = bytes(start_packed[3])
87
    if _start_value:
88
        start_value = int(binascii.hexlify(_start_value), 16)
89
    elif _start_value == b'':
90
        start_value = 0
91
    else:
92
        return None
93
94
    if end_value < 0 or end_value > 255 or end_value < start_value:
95
        return None
96
97
    end_packed = start_packed[0:3] + struct.pack('B', end_value)
98
99
    return ipv4_range_to_list(start_packed, end_packed)
100
101
102
def target_to_ipv4_cidr(target: str) -> Optional[List]:
103
    """Attempt to return a IPv4 CIDR list from a target string."""
104
105
    splitted = target.split('/')
106
    if len(splitted) != 2:
107
        return None
108
109
    try:
110
        start_packed = socket.inet_pton(socket.AF_INET, splitted[0])
111
        block = int(splitted[1])
112
    except (socket.error, ValueError):
113
        return None
114
115
    if block <= 0 or block > 30:
116
        return None
117
118
    start_value = int(binascii.hexlify(start_packed), 16) >> (32 - block)
119
    start_value = (start_value << (32 - block)) + 1
120
121
    end_value = (start_value | (0xFFFFFFFF >> block)) - 1
122
123
    start_packed = struct.pack('!I', start_value)
124
    end_packed = struct.pack('!I', end_value)
125
126
    return ipv4_range_to_list(start_packed, end_packed)
127
128
129
def target_to_ipv6_cidr(target: str) -> Optional[List]:
130
    """Attempt to return a IPv6 CIDR list from a target string."""
131
132
    splitted = target.split('/')
133
    if len(splitted) != 2:
134
        return None
135
136
    try:
137
        start_packed = socket.inet_pton(socket.AF_INET6, splitted[0])
138
        block = int(splitted[1])
139
    except (socket.error, ValueError):
140
        return None
141
142
    if block <= 0 or block > 126:
143
        return None
144
145
    start_value = int(binascii.hexlify(start_packed), 16) >> (128 - block)
146
    start_value = (start_value << (128 - block)) + 1
147
148
    end_value = (start_value | (int('ff' * 16, 16) >> block)) - 1
149
150
    high = start_value >> 64
151
    low = start_value & ((1 << 64) - 1)
152
153
    start_packed = struct.pack('!QQ', high, low)
154
155
    high = end_value >> 64
156
    low = end_value & ((1 << 64) - 1)
157
158
    end_packed = struct.pack('!QQ', high, low)
159
160
    return ipv6_range_to_list(start_packed, end_packed)
161
162
163
def target_to_ipv4_long(target: str) -> Optional[List]:
164
    """Attempt to return a IPv4 long-range list from a target string."""
165
166
    splitted = target.split('-')
167
    if len(splitted) != 2:
168
        return None
169
170
    try:
171
        start_packed = socket.inet_pton(socket.AF_INET, splitted[0])
172
        end_packed = socket.inet_pton(socket.AF_INET, splitted[1])
173
    except socket.error:
174
        return None
175
176
    if end_packed < start_packed:
177
        return None
178
179
    return ipv4_range_to_list(start_packed, end_packed)
180
181
182
def ipv6_range_to_list(start_packed, end_packed) -> List:
183
    """Return a list of IPv6 entries from start_packed to end_packed."""
184
185
    new_list = list()
186
187
    start = int(binascii.hexlify(start_packed), 16)
188
    end = int(binascii.hexlify(end_packed), 16)
189
190
    for value in range(start, end + 1):
191
        high = value >> 64
192
        low = value & ((1 << 64) - 1)
193
        new_ip = socket.inet_ntop(
194
            socket.AF_INET6, struct.pack('!2Q', high, low)
195
        )
196
        new_list.append(new_ip)
197
198
    return new_list
199
200
201
def target_to_ipv6_short(target: str) -> Optional[List]:
202
    """Attempt to return a IPv6 short-range list from a target string."""
203
204
    splitted = target.split('-')
205
    if len(splitted) != 2:
206
        return None
207
208
    try:
209
        start_packed = socket.inet_pton(socket.AF_INET6, splitted[0])
210
        end_value = int(splitted[1], 16)
211
    except (socket.error, ValueError):
212
        return None
213
214
    start_value = int(binascii.hexlify(start_packed[14:]), 16)
215
    if end_value < 0 or end_value > 0xFFFF or end_value < start_value:
216
        return None
217
218
    end_packed = start_packed[:14] + struct.pack('!H', end_value)
219
220
    return ipv6_range_to_list(start_packed, end_packed)
221
222
223
def target_to_ipv6_long(target: str) -> Optional[List]:
224
    """Attempt to return a IPv6 long-range list from a target string."""
225
226
    splitted = target.split('-')
227
    if len(splitted) != 2:
228
        return None
229
230
    try:
231
        start_packed = socket.inet_pton(socket.AF_INET6, splitted[0])
232
        end_packed = socket.inet_pton(socket.AF_INET6, splitted[1])
233
    except socket.error:
234
        return None
235
236
    if end_packed < start_packed:
237
        return None
238
239
    return ipv6_range_to_list(start_packed, end_packed)
240
241
242
def target_to_hostname(target: str) -> Optional[List]:
243
    """Attempt to return a single hostname list from a target string."""
244
245
    if len(target) == 0 or len(target) > 255:
246
        return None
247
248
    if not re.match(r'^[\w.-]+$', target):
249
        return None
250
251
    return [target]
252
253
254
def target_to_list(target: str) -> Optional[List]:
255
    """Attempt to return a list of single hosts from a target string."""
256
257
    # Is it an IPv4 address ?
258
    new_list = target_to_ipv4(target)
259
    # Is it an IPv6 address ?
260
    if not new_list:
261
        new_list = target_to_ipv6(target)
262
    # Is it an IPv4 CIDR ?
263
    if not new_list:
264
        new_list = target_to_ipv4_cidr(target)
265
    # Is it an IPv6 CIDR ?
266
    if not new_list:
267
        new_list = target_to_ipv6_cidr(target)
268
    # Is it an IPv4 short-range ?
269
    if not new_list:
270
        new_list = target_to_ipv4_short(target)
271
    # Is it an IPv4 long-range ?
272
    if not new_list:
273
        new_list = target_to_ipv4_long(target)
274
    # Is it an IPv6 short-range ?
275
    if not new_list:
276
        new_list = target_to_ipv6_short(target)
277
    # Is it an IPv6 long-range ?
278
    if not new_list:
279
        new_list = target_to_ipv6_long(target)
280
    # Is it a hostname ?
281
    if not new_list:
282
        new_list = target_to_hostname(target)
283
284
    return new_list
285
286
287
def target_str_to_list(target_str: str) -> Optional[List]:
288
    """Parses a targets string into a list of individual targets.
289
    Return a list of hosts, None if supplied target_str is None or
290
    empty, or an empty list in case of malformed target.
291
    """
292
    new_list = list()
293
294
    if not target_str:
295
        return None
296
297
    target_str = target_str.strip(',')
298
299
    for target in target_str.split(','):
300
        target = target.strip()
301
        target_list = target_to_list(target)
302
303
        if target_list:
304
            new_list.extend(target_list)
305
        else:
306
            __LOGGER.info("%s: Invalid target value", target)
307
            return []
308
309
    return list(collections.OrderedDict.fromkeys(new_list))
310
311
312
def resolve_hostname(hostname: str) -> Optional[str]:
313
    """Returns IP of a hostname."""
314
315
    assert hostname
316
    try:
317
        return socket.gethostbyname(hostname)
318
    except socket.gaierror:
319
        return None
320
321
322
def is_valid_address(address: str) -> bool:
323
    if not address:
324
        return False
325
326
    try:
327
        socket.inet_pton(socket.AF_INET, address)
328
    except OSError:
329
        # invalid IPv4 address
330
        try:
331
            socket.inet_pton(socket.AF_INET6, address)
332
        except OSError:
333
            # invalid IPv6 address
334
            return False
335
336
    return True
337
338
339
def get_hostname_by_address(address: str) -> str:
340
    """Returns hostname of an address."""
341
342
    if not is_valid_address(address):
343
        return ''
344
345
    try:
346
        hostname = socket.getfqdn(address)
347
    except (socket.gaierror, socket.herror):
348
        return ''
349
350
    if hostname == address:
351
        return ''
352
353
    return hostname
354
355
356
def port_range_expand(portrange: str) -> Optional[List]:
357
    """
358
    Receive a port range and expands it in individual ports.
359
360
    @input Port range.
361
    e.g. "4-8"
362
363
    @return List of integers.
364
    e.g. [4, 5, 6, 7, 8]
365
    """
366
    if not portrange or '-' not in portrange:
367
        return None
368
369
    try:
370
        port_range_min = int(portrange[: portrange.index('-')])
371
        port_range_max = int(portrange[portrange.index('-') + 1 :]) + 1
372
    except (IndexError, ValueError) as e:
373
        __LOGGER.info("Invalid port range format %s", e)
374
        return None
375
376
    port_list = list()
377
378
    for single_port in range(
379
        port_range_min,
380
        port_range_max,
381
    ):
382
        port_list.append(single_port)
383
384
    return port_list
385
386
387
def port_str_arrange(ports: str) -> str:
388
    """Gives a str in the format (always tcp listed first).
389
    T:<tcp ports/portrange comma separated>U:<udp ports comma separated>
390
    """
391
    b_tcp = ports.find("T")
392
    b_udp = ports.find("U")
393
394
    if (b_udp != -1 and b_tcp != -1) and b_udp < b_tcp:
395
        return ports[b_tcp:] + ports[b_udp:b_tcp]
396
397
    return ports
398
399
400
def ports_str_check_failed(port_str: str) -> bool:
401
    """
402
    Check if the port string is well formed.
403
    Return True if fail, False other case.
404
    """
405
    pattern = r'[^TU:0-9, \-\n]'
406
    if (
407
        re.search(pattern, port_str)
408
        or port_str.count('T') > 1
409
        or port_str.count('U') > 1
410
        or '-\n' in port_str
411
        or '\n-' in port_str
412
        or port_str[0] == '-'
413
        or port_str[len(port_str) - 1] == '-'
414
        or port_str.count(':') < (port_str.count('T') + port_str.count('U'))
415
    ):
416
        __LOGGER.error("Invalid port range format")
417
        return True
418
419
    index = 0
420
    while index <= len(port_str) - 1:
421
        if port_str[index] == '-':
422
            try:
423
                int(port_str[index - 1])
424
                int(port_str[index + 1])
425
            except (TypeError, ValueError) as e:
426
                __LOGGER.error("Invalid port range format: %s", e)
427
                return True
428
        index += 1
429
430
    return False
431
432
433
def ports_as_list(port_str: str) -> Tuple[Optional[List], Optional[List]]:
434
    """
435
    Parses a ports string into two list of individual tcp and udp ports.
436
437
    @input string containing a port list
438
    e.g. T:1,2,3,5-8 U:22,80,600-1024
439
440
    @return two list of sorted integers, for tcp and udp ports respectively.
441
    """
442
    if not port_str:
443
        __LOGGER.info("Invalid port value")
444
        return [None, None]
445
446
    if ports_str_check_failed(port_str):
447
        __LOGGER.info("{0}: Port list malformed.")
448
        return [None, None]
449
450
    tcp_list = list()
451
    udp_list = list()
452
453
    ports = port_str.replace(' ', '')
454
    ports = ports.replace('\n', '')
455
456
    b_tcp = ports.find("T")
457
    b_udp = ports.find("U")
458
459
    if b_tcp != -1 and "T:" not in ports:
460
        return [None, None]
461
    if b_udp != -1 and "U:" not in ports:
462
        return [None, None]
463
464
    if len(ports) > 1 and ports[b_tcp - 1] == ',':
465
        ports = ports[: b_tcp - 1] + ports[b_tcp:]
466
    if len(ports) > 1 and ports[b_udp - 1] == ',':
467
        ports = ports[: b_udp - 1] + ports[b_udp:]
468
469
    ports = port_str_arrange(ports)
470
471
    tports = ''
472
    uports = ''
473
    # TCP ports listed first, then UDP ports
474
    if b_udp != -1 and b_tcp != -1:
475
        tports = ports[ports.index('T:') + 2 : ports.index('U:')]
476
        uports = ports[ports.index('U:') + 2 :]
477
    # Only UDP ports
478
    elif b_tcp == -1 and b_udp != -1:
479
        uports = ports[ports.index('U:') + 2 :]
480
    # Only TCP ports
481
    elif b_udp == -1 and b_tcp != -1:
482
        tports = ports[ports.index('T:') + 2 :]
483
    else:
484
        tports = ports
485
486
    if tports:
487
        for port in tports.split(','):
488
            port_range_expanded = port_range_expand(port)
489
            if '-' in port and port_range_expanded:
490
                tcp_list.extend(port_range_expanded)
491
            elif port != '' and '-' not in port:
492
                tcp_list.append(int(port))
493
494
        tcp_list.sort()
495
496
    if uports:
497
        for port in uports.split(','):
498
            port_range_expanded = port_range_expand(port)
499
            if '-' in port and port_range_expanded:
500
                udp_list.extend(port_range_expanded)
501
            elif port and '-' not in port:
502
                udp_list.append(int(port))
503
        udp_list.sort()
504
505
    if len(tcp_list) == 0 and len(udp_list) == 0:
506
        return [None, None]
507
508
    return (tcp_list, udp_list)
509
510
511
def get_tcp_port_list(port_str: str) -> Optional[List]:
512
    """Return a list with tcp ports from a given port list in string format"""
513
    return ports_as_list(port_str)[0]
514
515
516
def get_udp_port_list(port_str: str) -> Optional[List]:
517
    """Return a list with udp ports from a given port list in string format"""
518
    return ports_as_list(port_str)[1]
519
520
521
def port_list_compress(port_list: List) -> str:
522
    """Compress a port list and return a string."""
523
524
    if not port_list or len(port_list) == 0:
525
        __LOGGER.info("Invalid or empty port list.")
526
        return ''
527
528
    port_list = sorted(set(port_list))
529
    compressed_list = []
530
531
    for _key, group in itertools.groupby(
532
        enumerate(port_list), lambda t: t[1] - t[0]
533
    ):
534
        group = list(group)
535
536
        if group[0][1] == group[-1][1]:
537
            compressed_list.append(str(group[0][1]))
538
        else:
539
            compressed_list.append(str(group[0][1]) + '-' + str(group[-1][1]))
540
541
    return ','.join(compressed_list)
542
543
544
def valid_port_list(port_list: str) -> bool:
545
    """Validate a port list string.
546
    Parameters:
547
        port_list: string containing UDP and/or TCP
548
                   port list as ranges or single comma
549
                   separated ports "
550
    Return True if it is a valid port list, False otherwise.
551
    """
552
553
    # No port list provided
554
    if not port_list:
555
        return False
556
557
    # Remove white spaces
558
    port_list = port_list.replace(' ', '')
559
560
    # Special case is ignored.
561
    if port_list == 'U:,T:':
562
        return True
563
564
    # Invalid chars in the port list, like \0 or \n
565
    if ports_str_check_failed(port_list):
566
        return False
567
568
    tcp, udp = ports_as_list(port_list)
569
    # There is a port list but no tcp and no udp.
570
    if not tcp and not udp:
571
        return False
572
573
    if tcp:
574
        for port in tcp:
575
            if port < 1 or port > 65535:
576
                return False
577
    if udp:
578
        for port in udp:
579
            if port < 1 or port > 65535:
580
                return False
581
582
    return True
583