Passed
Pull Request — master (#111)
by
unknown
01:50
created

ospd.network.ports_as_list()   F

Complexity

Conditions 17

Size

Total Lines 65
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 17
eloc 40
nop 1
dl 0
loc 65
rs 1.8
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like ospd.network.ports_as_list() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright (C) 2019 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
""" Helper module for network related functions
20
"""
21
22
import binascii
23
import collections
24
import itertools
25
import logging
26
import re
27
import socket
28
import struct
29
30
from typing import List, Optional, Tuple
31
32
__LOGGER = logging.getLogger(__name__)
33
34
35
def target_to_ipv4(target: str) -> Optional[List]:
36
    """ Attempt to return a single IPv4 host list from a target string. """
37
38
    try:
39
        socket.inet_pton(socket.AF_INET, target)
40
        return [target]
41
    except socket.error:
42
        return None
43
44
45
def target_to_ipv6(target: str) -> Optional[List]:
46
    """ Attempt to return a single IPv6 host list from a target string. """
47
48
    try:
49
        socket.inet_pton(socket.AF_INET6, target)
50
        return [target]
51
    except socket.error:
52
        return None
53
54
55
def ipv4_range_to_list(start_packed, end_packed) -> Optional[List]:
56
    """ Return a list of IPv4 entries from start_packed to end_packed. """
57
58
    new_list = list()
59
    start = struct.unpack('!L', start_packed)[0]
60
    end = struct.unpack('!L', end_packed)[0]
61
62
    for value in range(start, end + 1):
63
        new_ip = socket.inet_ntoa(struct.pack('!L', value))
64
        new_list.append(new_ip)
65
66
    return new_list
67
68
69 View Code Duplication
def target_to_ipv4_short(target: str) -> Optional[List]:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
70
    """ Attempt to return a IPv4 short range list from a target string. """
71
72
    splitted = target.split('-')
73
    if len(splitted) != 2:
74
        return None
75
76
    try:
77
        start_packed = socket.inet_pton(socket.AF_INET, splitted[0])
78
        end_value = int(splitted[1])
79
    except (socket.error, ValueError):
80
        return None
81
82
    start_value = int(binascii.hexlify(bytes(start_packed[3])), 16)
83
    if end_value < 0 or end_value > 255 or end_value < start_value:
84
        return None
85
86
    end_packed = start_packed[0:3] + struct.pack('B', end_value)
87
88
    return ipv4_range_to_list(start_packed, end_packed)
89
90
91
def target_to_ipv4_cidr(target: str) -> Optional[List]:
92
    """ Attempt to return a IPv4 CIDR list from a target string. """
93
94
    splitted = target.split('/')
95
    if len(splitted) != 2:
96
        return None
97
98
    try:
99
        start_packed = socket.inet_pton(socket.AF_INET, splitted[0])
100
        block = int(splitted[1])
101
    except (socket.error, ValueError):
102
        return None
103
104
    if block <= 0 or block > 30:
105
        return None
106
107
    start_value = int(binascii.hexlify(start_packed), 16) >> (32 - block)
108
    start_value = (start_value << (32 - block)) + 1
109
110
    end_value = (start_value | (0xFFFFFFFF >> block)) - 1
111
112
    start_packed = struct.pack('!I', start_value)
113
    end_packed = struct.pack('!I', end_value)
114
115
    return ipv4_range_to_list(start_packed, end_packed)
116
117
118
def target_to_ipv6_cidr(target: str) -> Optional[List]:
119
    """ Attempt to return a IPv6 CIDR list from a target string. """
120
121
    splitted = target.split('/')
122
    if len(splitted) != 2:
123
        return None
124
125
    try:
126
        start_packed = socket.inet_pton(socket.AF_INET6, splitted[0])
127
        block = int(splitted[1])
128
    except (socket.error, ValueError):
129
        return None
130
131
    if block <= 0 or block > 126:
132
        return None
133
134
    start_value = int(binascii.hexlify(start_packed), 16) >> (128 - block)
135
    start_value = (start_value << (128 - block)) + 1
136
137
    end_value = (start_value | (int('ff' * 16, 16) >> block)) - 1
138
139
    high = start_value >> 64
140
    low = start_value & ((1 << 64) - 1)
141
142
    start_packed = struct.pack('!QQ', high, low)
143
144
    high = end_value >> 64
145
    low = end_value & ((1 << 64) - 1)
146
147
    end_packed = struct.pack('!QQ', high, low)
148
149
    return ipv6_range_to_list(start_packed, end_packed)
150
151
152
def target_to_ipv4_long(target: str) -> Optional[List]:
153
    """ Attempt to return a IPv4 long-range list from a target string. """
154
155
    splitted = target.split('-')
156
    if len(splitted) != 2:
157
        return None
158
159
    try:
160
        start_packed = socket.inet_pton(socket.AF_INET, splitted[0])
161
        end_packed = socket.inet_pton(socket.AF_INET, splitted[1])
162
    except socket.error:
163
        return None
164
165
    if end_packed < start_packed:
166
        return None
167
168
    return ipv4_range_to_list(start_packed, end_packed)
169
170
171
def ipv6_range_to_list(start_packed, end_packed) -> List:
172
    """ Return a list of IPv6 entries from start_packed to end_packed. """
173
174
    new_list = list()
175
176
    start = int(binascii.hexlify(start_packed), 16)
177
    end = int(binascii.hexlify(end_packed), 16)
178
179
    for value in range(start, end + 1):
180
        high = value >> 64
181
        low = value & ((1 << 64) - 1)
182
        new_ip = socket.inet_ntop(
183
            socket.AF_INET6, struct.pack('!2Q', high, low)
184
        )
185
        new_list.append(new_ip)
186
187
    return new_list
188
189
190 View Code Duplication
def target_to_ipv6_short(target: str) -> Optional[List]:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
191
    """ Attempt to return a IPv6 short-range list from a target string. """
192
193
    splitted = target.split('-')
194
    if len(splitted) != 2:
195
        return None
196
197
    try:
198
        start_packed = socket.inet_pton(socket.AF_INET6, splitted[0])
199
        end_value = int(splitted[1], 16)
200
    except (socket.error, ValueError):
201
        return None
202
203
    start_value = int(binascii.hexlify(start_packed[14:]), 16)
204
    if end_value < 0 or end_value > 0xFFFF or end_value < start_value:
205
        return None
206
207
    end_packed = start_packed[:14] + struct.pack('!H', end_value)
208
209
    return ipv6_range_to_list(start_packed, end_packed)
210
211
212
def target_to_ipv6_long(target: str) -> Optional[List]:
213
    """ Attempt to return a IPv6 long-range list from a target string. """
214
215
    splitted = target.split('-')
216
    if len(splitted) != 2:
217
        return None
218
219
    try:
220
        start_packed = socket.inet_pton(socket.AF_INET6, splitted[0])
221
        end_packed = socket.inet_pton(socket.AF_INET6, splitted[1])
222
    except socket.error:
223
        return None
224
225
    if end_packed < start_packed:
226
        return None
227
228
    return ipv6_range_to_list(start_packed, end_packed)
229
230
231
def target_to_hostname(target: str) -> Optional[List]:
232
    """ Attempt to return a single hostname list from a target string. """
233
234
    if len(target) == 0 or len(target) > 255:
235
        return None
236
237
    if not re.match(r'^[\w.-]+$', target):
238
        return None
239
240
    return [target]
241
242
243
def target_to_list(target: str) -> Optional[List]:
244
    """ Attempt to return a list of single hosts from a target string. """
245
246
    # Is it an IPv4 address ?
247
    new_list = target_to_ipv4(target)
248
    # Is it an IPv6 address ?
249
    if not new_list:
250
        new_list = target_to_ipv6(target)
251
    # Is it an IPv4 CIDR ?
252
    if not new_list:
253
        new_list = target_to_ipv4_cidr(target)
254
    # Is it an IPv6 CIDR ?
255
    if not new_list:
256
        new_list = target_to_ipv6_cidr(target)
257
    # Is it an IPv4 short-range ?
258
    if not new_list:
259
        new_list = target_to_ipv4_short(target)
260
    # Is it an IPv4 long-range ?
261
    if not new_list:
262
        new_list = target_to_ipv4_long(target)
263
    # Is it an IPv6 short-range ?
264
    if not new_list:
265
        new_list = target_to_ipv6_short(target)
266
    # Is it an IPv6 long-range ?
267
    if not new_list:
268
        new_list = target_to_ipv6_long(target)
269
    # Is it a hostname ?
270
    if not new_list:
271
        new_list = target_to_hostname(target)
272
273
    return new_list
274
275
276
def target_str_to_list(target_str: str) -> Optional[List]:
277
    """ Parses a targets string into a list of individual targets. """
278
    new_list = list()
279
280
    for target in target_str.split(','):
281
282
        target = target.strip()
283
        target_list = target_to_list(target)
284
285
        if target_list:
286
            new_list.extend(target_list)
287
        else:
288
            __LOGGER.info("%s: Invalid target value", target)
289
            return None
290
291
    return list(collections.OrderedDict.fromkeys(new_list))
292
293
294
def resolve_hostname(hostname: str) -> Optional[str]:
295
    """ Returns IP of a hostname. """
296
297
    assert hostname
298
    try:
299
        return socket.gethostbyname(hostname)
300
    except socket.gaierror:
301
        return None
302
303
304
def is_valid_address(address: str) -> bool:
305
    if not address:
306
        return False
307
308
    try:
309
        socket.inet_pton(socket.AF_INET, address)
310
    except OSError:
311
        # invalid IPv4 address
312
        try:
313
            socket.inet_pton(socket.AF_INET6, address)
314
        except OSError:
315
            # invalid IPv6 address
316
            return False
317
318
    return True
319
320
321
def get_hostname_by_address(address: str) -> str:
322
    """ Returns hostname of an address. """
323
324
    if not is_valid_address(address):
325
        return ''
326
327
    try:
328
        hostname = socket.getfqdn(address)
329
    except (socket.gaierror, socket.herror):
330
        return ''
331
332
    if hostname == address:
333
        return ''
334
335
    return hostname
336
337
338
def port_range_expand(portrange: str) -> Optional[List]:
339
    """
340
    Receive a port range and expands it in individual ports.
341
342
    @input Port range.
343
    e.g. "4-8"
344
345
    @return List of integers.
346
    e.g. [4, 5, 6, 7, 8]
347
    """
348
    if not portrange or '-' not in portrange:
349
        __LOGGER.info("Invalid port range format")
350
        return None
351
352
    port_list = list()
353
354
    for single_port in range(
355
        int(portrange[: portrange.index('-')]),
356
        int(portrange[portrange.index('-') + 1 :]) + 1,
357
    ):
358
        port_list.append(single_port)
359
360
    return port_list
361
362
363
def port_str_arrange(ports: str) -> str:
364
    """ Gives a str in the format (always tcp listed first).
365
    T:<tcp ports/portrange comma separated>U:<udp ports comma separated>
366
    """
367
    b_tcp = ports.find("T")
368
    b_udp = ports.find("U")
369
370
    if (b_udp != -1 and b_tcp != -1) and b_udp < b_tcp:
371
        return ports[b_tcp:] + ports[b_udp:b_tcp]
372
373
    return ports
374
375
376
def ports_str_check_failed(port_str: str) -> bool:
377
    """
378
    Check if the port string is well formed.
379
    Return True if fail, False other case.
380
    """
381
382
    pattern = r'[^TU:0-9, \-]'
383
    if (
384
        re.search(pattern, port_str)
385
        or port_str.count('T') > 1
386
        or port_str.count('U') > 1
387
        or port_str.count(':') < (port_str.count('T') + port_str.count('U'))
388
    ):
389
        return True
390
391
    return False
392
393
394
def ports_as_list(port_str: str) -> Tuple[Optional[List], Optional[List]]:
395
    """
396
    Parses a ports string into two list of individual tcp and udp ports.
397
398
    @input string containing a port list
399
    e.g. T:1,2,3,5-8 U:22,80,600-1024
400
401
    @return two list of sorted integers, for tcp and udp ports respectively.
402
    """
403
    if not port_str:
404
        __LOGGER.info("Invalid port value")
405
        return [None, None]
406
407
    if ports_str_check_failed(port_str):
408
        __LOGGER.info("{0}: Port list malformed.")
409
        return [None, None]
410
411
    tcp_list = list()
412
    udp_list = list()
413
414
    ports = port_str.replace(' ', '')
415
416
    b_tcp = ports.find("T")
417
    b_udp = ports.find("U")
418
419
    if ports[b_tcp - 1] == ',':
420
        ports = ports[: b_tcp - 1] + ports[b_tcp:]
421
422
    if ports[b_udp - 1] == ',':
423
        ports = ports[: b_udp - 1] + ports[b_udp:]
424
425
    ports = port_str_arrange(ports)
426
427
    tports = ''
428
    uports = ''
429
    # TCP ports listed first, then UDP ports
430
    if b_udp != -1 and b_tcp != -1:
431
        tports = ports[ports.index('T:') + 2 : ports.index('U:')]
432
        uports = ports[ports.index('U:') + 2 :]
433
    # Only UDP ports
434
    elif b_tcp == -1 and b_udp != -1:
435
        uports = ports[ports.index('U:') + 2 :]
436
    # Only TCP ports
437
    elif b_udp == -1 and b_tcp != -1:
438
        tports = ports[ports.index('T:') + 2 :]
439
    else:
440
        tports = ports
441
442
    if tports:
443
        for port in tports.split(','):
444
            if '-' in port:
445
                tcp_list.extend(port_range_expand(port))
446
            else:
447
                tcp_list.append(int(port))
448
        tcp_list.sort()
449
450
    if uports:
451
        for port in uports.split(','):
452
            if '-' in port:
453
                udp_list.extend(port_range_expand(port))
454
            else:
455
                udp_list.append(int(port))
456
        udp_list.sort()
457
458
    return (tcp_list, udp_list)
459
460
461
def get_tcp_port_list(port_str: str) -> Optional[List]:
462
    """ Return a list with tcp ports from a given port list in string format """
463
    return ports_as_list(port_str)[0]
464
465
466
def get_udp_port_list(port_str: str) -> Optional[List]:
467
    """ Return a list with udp ports from a given port list in string format """
468
    return ports_as_list(port_str)[1]
469
470
471
def port_list_compress(port_list: str) -> str:
472
    """ Compress a port list and return a string. """
473
474
    if not port_list or len(port_list) == 0:
475
        __LOGGER.info("Invalid or empty port list.")
476
        return ''
477
478
    port_list = sorted(set(port_list))
479
    compressed_list = []
480
481
    for _key, group in itertools.groupby(
482
        enumerate(port_list), lambda t: t[1] - t[0]
483
    ):
484
        group = list(group)
485
486
        if group[0][1] == group[-1][1]:
487
            compressed_list.append(str(group[0][1]))
488
        else:
489
            compressed_list.append(str(group[0][1]) + '-' + str(group[-1][1]))
490
491
    return ','.join(compressed_list)
492