Completed
Push — master ( b4b69f...c80324 )
by
unknown
12s queued 11s
created

ospd.misc.target_to_ipv4_cidr()   A

Complexity

Conditions 5

Size

Total Lines 19
Code Lines 17

Duplication

Lines 19
Ratio 100 %

Importance

Changes 0
Metric Value
cc 5
eloc 17
nop 1
dl 19
loc 19
rs 9.0833
c 0
b 0
f 0
1
# $Id$
2
# Description:
3
# Miscellaneous classes and functions related to OSPD.
4
#
5
# Authors:
6
# Hani Benhabiles <[email protected]>
7
#
8
# Copyright:
9
# Copyright (C) 2014 Greenbone Networks GmbH
10
#
11
# This program is free software; you can redistribute it and/or
12
# modify it under the terms of the GNU General Public License
13
# as published by the Free Software Foundation; either version 2
14
# of the License, or (at your option) any later version.
15
#
16
# This program is distributed in the hope that it will be useful,
17
# but WITHOUT ANY WARRANTY; without even the implied warranty of
18
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19
# GNU General Public License for more details.
20
#
21
# You should have received a copy of the GNU General Public License
22
# along with this program; if not, write to the Free Software
23
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
24
#
25
""" Miscellaneous functions and utilities related to OSPD. """
26
27
# Needed to say that when we import ospd, we mean the package and not the
28
# module in that directory.
29
from __future__ import absolute_import
30
from __future__ import print_function
31
32
import argparse
33
import binascii
34
import collections
35
import logging
36
import logging.handlers
37
import os
38
import re
39
import socket
40
import struct
41
import sys
42
import time
43
import ssl
44
import uuid
45
import multiprocessing
46
import itertools
47
48
LOGGER = logging.getLogger(__name__)
49
50
# Default file locations as used by a OpenVAS default installation
51
KEY_FILE = "/usr/var/lib/gvm/private/CA/serverkey.pem"
52
CERT_FILE = "/usr/var/lib/gvm/CA/servercert.pem"
53
CA_FILE = "/usr/var/lib/gvm/CA/cacert.pem"
54
55
PORT = 1234
56
ADDRESS = "0.0.0.0"
57
58
59 View Code Duplication
class ScanCollection(object):

    """ Scans collection, managing scans and results read and write, exposing
    only needed information.

    Each scan has meta-information such as scan ID, current progress (from 0 to
    100), start time, end time, scan target and options and a list of results.

    There are 4 types of results: Alarms, Logs, Errors and Host Details.

    Each entry of scans_table is the scan_info mapping built by create_scan(),
    which is a multiprocessing manager dict. Indexing such a mapping hands
    back a copy of a nested list/dict, so every method that changes a nested
    value must fetch it, modify it and store it back.

    Todo:
    - Better checking for Scan ID existence and handling otherwise.
    - More data validation.
    - Mutex access per table/scan_info.

    """

    def __init__(self):
        """ Initialize the Scan Collection. """

        # Created lazily by create_scan() and dropped by delete_scan() once
        # the table is empty.
        self.data_manager = None
        self.scans_table = dict()

    def add_result(self, scan_id, result_type, host='', name='', value='',
                   port='', test_id='', severity='', qod=''):
        """ Add a result to a scan in the table.

        scan_id must be non-empty and at least one of name and value must be
        non-empty, otherwise an AssertionError is raised.
        """

        assert scan_id
        assert name or value
        result = {
            'type': result_type,
            'name': name,
            'severity': severity,
            'test_id': test_id,
            'value': value,
            'host': host,
            'port': port,
            'qod': qod,
        }
        results = self.scans_table[scan_id]['results']
        results.append(result)
        # Set scan_info's results to propagate results to parent process.
        self.scans_table[scan_id]['results'] = results

    def set_progress(self, scan_id, progress):
        """ Sets scan_id scan's progress.

        Values outside ]0, 100] are ignored. Reaching 100 also records the
        scan's end time.
        """

        if 0 < progress <= 100:
            self.scans_table[scan_id]['progress'] = progress
        if progress == 100:
            self.scans_table[scan_id]['end_time'] = int(time.time())

    def results_iterator(self, scan_id):
        """ Returns an iterator over scan_id scan's results. """

        return iter(self.scans_table[scan_id]['results'])

    def ids_iterator(self):
        """ Returns an iterator over the collection's scan IDS. """

        return iter(self.scans_table)

    def create_scan(self, scan_id='', targets='', target_str=None,
                    options=None, vts=''):
        """ Creates a new scan with provided scan information.

        targets is a sequence of items whose first three elements are the
        host, its ports and its credentials (see get_target, get_ports and
        get_credentials). options defaults to a fresh empty dict. When
        scan_id is empty or None a random UUID is generated.

        Returns the ID of the created scan.
        """

        # A dict() default in the signature would be shared by every call.
        if options is None:
            options = dict()
        if self.data_manager is None:
            self.data_manager = multiprocessing.Manager()
        scan_info = self.data_manager.dict()
        scan_info['results'] = list()
        scan_info['progress'] = 0
        scan_info['targets'] = targets
        scan_info['legacy_target'] = target_str
        scan_info['vts'] = vts
        scan_info['options'] = options
        scan_info['start_time'] = int(time.time())
        # NOTE(review): a string here, while set_progress() stores an int.
        scan_info['end_time'] = "0"
        if scan_id is None or scan_id == '':
            scan_id = str(uuid.uuid4())
        scan_info['scan_id'] = scan_id
        self.scans_table[scan_id] = scan_info
        return scan_id

    def get_options(self, scan_id):
        """ Get scan_id scan's options list. """

        return self.scans_table[scan_id]['options']

    def set_option(self, scan_id, name, value):
        """ Set a scan_id scan's name option to value. """

        # Fetch, modify and store back: assigning into the value returned by
        # scan_info['options'] would only change a copy when scan_info is a
        # manager dict, and the option would be lost.
        options = self.scans_table[scan_id]['options']
        options[name] = value
        self.scans_table[scan_id]['options'] = options

    def get_progress(self, scan_id):
        """ Get a scan's current progress value. """

        return self.scans_table[scan_id]['progress']

    def get_start_time(self, scan_id):
        """ Get a scan's start time. """

        return self.scans_table[scan_id]['start_time']

    def get_end_time(self, scan_id):
        """ Get a scan's end time. """

        return self.scans_table[scan_id]['end_time']

    def get_target(self, scan_id):
        """ Get a scan's target list.

        Returns the legacy target string when one was given, otherwise the
        hosts of all targets joined with commas.
        """
        legacy_target = self.scans_table[scan_id]['legacy_target']
        if legacy_target:
            return legacy_target

        return ','.join(
            item[0] for item in self.scans_table[scan_id]['targets'])

    def get_ports(self, scan_id, target):
        """ Get a scan's ports list. If a target is specified
        it will return the corresponding port for it. If not,
        it returns the port item of the first nested list in
        the target's list.
        """
        targets = self.scans_table[scan_id]['targets']
        if target:
            for item in targets:
                if target == item[0]:
                    return item[1]

        return targets[0][1]

    def get_credentials(self, scan_id, target):
        """ Get a scan's credential list. It return dictionary with
        the corresponding credential for a given target, or None when no
        target is given or the target is not part of the scan.
        """
        if target:
            for item in self.scans_table[scan_id]['targets']:
                if target == item[0]:
                    return item[2]
        return None

    def get_vts(self, scan_id):
        """ Get a scan's vts list. """

        return self.scans_table[scan_id]['vts']

    def id_exists(self, scan_id):
        """ Check whether a scan exists in the table. """

        return self.scans_table.get(scan_id) is not None

    def delete_scan(self, scan_id):
        """ Delete a scan if fully finished.

        Returns False (and keeps the scan) while its progress is below 100,
        True once the scan has been removed.
        """

        if self.get_progress(scan_id) < 100:
            return False
        self.scans_table.pop(scan_id)
        if not self.scans_table:
            # Last scan gone: release the manager, create_scan() makes a new
            # one on demand.
            self.data_manager = None
        return True
219
220
221 View Code Duplication
class ResultType(object):

    """ Various scan results types values. """

    ALARM = 0
    LOG = 1
    ERROR = 2
    HOST_DETAIL = 3

    @classmethod
    def get_str(cls, result_type):
        """ Return string name of a result type.

        Raises AssertionError when result_type is not one of the class
        constants.
        """
        if result_type == cls.ALARM:
            return "Alarm"
        if result_type == cls.LOG:
            return "Log Message"
        if result_type == cls.ERROR:
            return "Error Message"
        if result_type == cls.HOST_DETAIL:
            return "Host Detail"
        # Raised explicitly: an "assert False" statement is removed when
        # Python runs with -O, which would turn this into a silent None.
        raise AssertionError(
            "Erroneous result type {0}.".format(result_type))

    @classmethod
    def get_type(cls, result_name):
        """ Return the result type value matching a string name.

        Raises AssertionError when result_name is not a known name.
        """
        if result_name == "Alarm":
            return cls.ALARM
        if result_name == "Log Message":
            return cls.LOG
        if result_name == "Error Message":
            return cls.ERROR
        if result_name == "Host Detail":
            return cls.HOST_DETAIL
        # See get_str() for why this is not an assert statement.
        raise AssertionError(
            "Erroneous result name {0}.".format(result_name))
257
258
# Cached implementation, resolved on the first call to inet_pton().
__inet_pton = None


def inet_pton(address_family, ip_string):
    """ A platform independent version of inet_pton """
    global __inet_pton
    converter = __inet_pton
    if converter is None:
        try:
            converter = socket.inet_pton
        except AttributeError:
            # The socket module lacks inet_pton here: use ospd's fallback.
            from ospd import win_socket
            converter = win_socket.inet_pton
        __inet_pton = converter

    return converter(address_family, ip_string)
270
271
# Cached implementation, resolved on the first call to inet_ntop().
__inet_ntop = None


def inet_ntop(address_family, packed_ip):
    """ A platform independent version of inet_ntop """
    global __inet_ntop
    converter = __inet_ntop
    if converter is None:
        try:
            converter = socket.inet_ntop
        except AttributeError:
            # The socket module lacks inet_ntop here: use ospd's fallback.
            from ospd import win_socket
            converter = win_socket.inet_ntop
        __inet_ntop = converter

    return converter(address_family, packed_ip)
283
284
285
def target_to_ipv4(target):
    """ Return [target] if target is a single IPv4 address, else None. """

    try:
        # Only used as a validity check, the packed form is discarded.
        inet_pton(socket.AF_INET, target)
    except socket.error:
        return None
    else:
        return [target]
293
294
295
def target_to_ipv6(target):
    """ Return [target] if target is a single IPv6 address, else None. """

    try:
        # Only used as a validity check, the packed form is discarded.
        inet_pton(socket.AF_INET6, target)
    except socket.error:
        return None
    else:
        return [target]
303
304
305 View Code Duplication
def ipv4_range_to_list(start_packed, end_packed):
    """ Return a list of IPv4 entries from start_packed to end_packed.

    Both arguments are 4-byte packed addresses; both ends are included.
    """

    start = struct.unpack('!L', start_packed)[0]
    end = struct.unpack('!L', end_packed)[0]
    return [socket.inet_ntoa(struct.pack('!L', value))
            for value in range(start, end + 1)]


def target_to_ipv4_short(target):
    """ Attempt to return a IPv4 short range list from a target string.

    A short range is "<IPv4 address>-<last byte>", e.g. "192.168.0.1-3".
    Returns the list of addresses of the range, or None when target is not
    a valid short range (including an end below the start's last byte).
    """

    splitted = target.split('-')
    if len(splitted) != 2:
        return None
    try:
        start_packed = inet_pton(socket.AF_INET, splitted[0])
        end_value = int(splitted[1])
    except (socket.error, ValueError):
        return None
    # Slice instead of index: indexing bytes gives an int on Python 3 (and a
    # 1-char str on Python 2), a one-byte slice unpacks the same on both.
    start_value = struct.unpack('B', start_packed[3:4])[0]
    if end_value < 0 or end_value > 255 or end_value < start_value:
        return None
    end_packed = start_packed[0:3] + struct.pack('B', end_value)
    return ipv4_range_to_list(start_packed, end_packed)
333
334
335 View Code Duplication
def target_to_ipv4_cidr(target):
    """ Attempt to return a IPv4 CIDR list from a target string.

    Accepts "<IPv4 address>/<prefix length>" with a prefix length from 1 to
    30 and returns the addresses of that block without its first and last
    address. Returns None for anything else.
    """

    parts = target.split('/')
    if len(parts) != 2:
        return None
    address, prefix = parts
    try:
        packed = inet_pton(socket.AF_INET, address)
        prefix_len = int(prefix)
    except (socket.error, ValueError):
        return None
    if not 0 < prefix_len <= 30:
        return None
    host_bits = 32 - prefix_len
    # Clear the host bits to get the block's base address.
    base = (int(binascii.hexlify(packed), 16) >> host_bits) << host_bits
    first = base + 1
    last = (first | (0xffffffff >> prefix_len)) - 1
    return ipv4_range_to_list(struct.pack('!I', first),
                              struct.pack('!I', last))
354
355
356 View Code Duplication
def target_to_ipv6_cidr(target):
    """ Attempt to return a IPv6 CIDR list from a target string.

    Accepts "<IPv6 address>/<prefix length>" with a prefix length from 1 to
    126 and returns the addresses of that block without its first and last
    address. Returns None for anything else.
    """

    parts = target.split('/')
    if len(parts) != 2:
        return None
    address, prefix = parts
    try:
        packed = inet_pton(socket.AF_INET6, address)
        prefix_len = int(prefix)
    except (socket.error, ValueError):
        return None
    if not 0 < prefix_len <= 126:
        return None
    host_bits = 128 - prefix_len
    # Clear the host bits to get the block's base address.
    base = (int(binascii.hexlify(packed), 16) >> host_bits) << host_bits
    first = base + 1
    last = (first | (int('ff' * 16, 16) >> prefix_len)) - 1
    # struct has no 128-bit format: pack as two 64-bit halves.
    low_mask = (1 << 64) - 1
    first_packed = struct.pack('!QQ', first >> 64, first & low_mask)
    last_packed = struct.pack('!QQ', last >> 64, last & low_mask)
    return ipv6_range_to_list(first_packed, last_packed)
379
380
381 View Code Duplication
def target_to_ipv4_long(target):
    """ Attempt to return a IPv4 long-range list from a target string.

    A long range is "<first IPv4 address>-<last IPv4 address>". Returns None
    when target has another form or the last address is below the first.
    """

    bounds = target.split('-')
    if len(bounds) != 2:
        return None
    try:
        first, last = [inet_pton(socket.AF_INET, bound) for bound in bounds]
    except socket.error:
        return None
    # Packed addresses are big-endian, so comparing them orders addresses.
    if last < first:
        return None
    return ipv4_range_to_list(first, last)
395
396
397 View Code Duplication
def ipv6_range_to_list(start_packed, end_packed):
    """ Return a list of IPv6 entries from start_packed to end_packed.

    Both arguments are 16-byte packed addresses; both ends are included.
    """

    first = int(binascii.hexlify(start_packed), 16)
    last = int(binascii.hexlify(end_packed), 16)
    addresses = []
    for number in range(first, last + 1):
        # struct has no 128-bit format: pack as two 64-bit halves.
        upper, lower = number >> 64, number & ((1 << 64) - 1)
        packed = struct.pack('!2Q', upper, lower)
        addresses.append(inet_ntop(socket.AF_INET6, packed))
    return addresses
410
411
412 View Code Duplication
def target_to_ipv6_short(target):
    """ Attempt to return a IPv6 short-range list from a target string.

    A short range is "<IPv6 address>-<hex value of the last 16 bits>".
    Returns None when target has another form, the end value does not fit
    in 16 bits or it is below the start address' last 16 bits.
    """

    parts = target.split('-')
    if len(parts) != 2:
        return None
    try:
        start_packed = inet_pton(socket.AF_INET6, parts[0])
        end_value = int(parts[1], 16)
    except (socket.error, ValueError):
        return None
    head, tail = start_packed[:14], start_packed[14:]
    start_value = int(binascii.hexlify(tail), 16)
    fits_16_bits = 0 <= end_value <= 0xffff
    if not fits_16_bits or end_value < start_value:
        return None
    return ipv6_range_to_list(start_packed,
                              head + struct.pack('!H', end_value))
428
429
430 View Code Duplication
def target_to_ipv6_long(target):
    """ Attempt to return a IPv6 long-range list from a target string.

    A long range is "<first IPv6 address>-<last IPv6 address>". Returns None
    when target has another form or the last address is below the first.
    """

    bounds = target.split('-')
    if len(bounds) != 2:
        return None
    try:
        first, last = [inet_pton(socket.AF_INET6, bound) for bound in bounds]
    except socket.error:
        return None
    # Packed addresses are big-endian, so comparing them orders addresses.
    if last < first:
        return None
    return ipv6_range_to_list(first, last)
444
445
446
def target_to_hostname(target):
    """ Attempt to return a single hostname list from a target string.

    Returns [target] when target is 1 to 255 characters long and made only
    of word characters, dots and dashes, otherwise None.
    """

    if not target or len(target) > 255:
        return None
    # \Z instead of $: "$" also matches just before a trailing newline and
    # would let "host\n" through.
    if not re.match(r'^[\w.-]+\Z', target):
        return None
    return [target]
454
455
456 View Code Duplication
def target_to_list(target):
    """ Attempt to return a list of single hosts from a target string.

    The target is tried, in this order, as: IPv4 address, IPv6 address,
    IPv4 CIDR, IPv6 CIDR, IPv4 short-range, IPv4 long-range, IPv6
    short-range, IPv6 long-range and finally hostname. The first non-empty
    result wins; if none matches, the hostname attempt's result is returned.
    """

    converters = (
        target_to_ipv4,
        target_to_ipv6,
        target_to_ipv4_cidr,
        target_to_ipv6_cidr,
        target_to_ipv4_short,
        target_to_ipv4_long,
        target_to_ipv6_short,
        target_to_ipv6_long,
        target_to_hostname,
    )
    hosts = None
    for convert in converters:
        hosts = convert(target)
        if hosts:
            break
    return hosts
486
487
488 View Code Duplication
def target_str_to_list(target_str):
    """ Parses a targets string into a list of individual targets.

    target_str is a comma separated list of targets. Returns the hosts of
    all targets without duplicates, in first-seen order, or None as soon as
    one target cannot be converted.
    """
    hosts = []
    for raw_target in target_str.split(','):
        candidate = raw_target.strip()
        expanded = target_to_list(candidate)
        if not expanded:
            LOGGER.info("{0}: Invalid target value".format(candidate))
            return None
        hosts += expanded
    # OrderedDict keys drop duplicates while keeping the order.
    return list(collections.OrderedDict.fromkeys(hosts))
500
501
502
def resolve_hostname(hostname):
    """ Returns IP of a hostname, or None if it can't be resolved. """

    assert hostname
    try:
        address = socket.gethostbyname(hostname)
    except socket.gaierror:
        address = None
    return address
510
511
512 View Code Duplication
def port_range_expand(portrange):
    """
    Receive a port range and expands it in individual ports.

    @input Port range.
    e.g. "4-8"

    @return List of integers, or None if portrange is empty or has no '-'.
    e.g. [4, 5, 6, 7, 8]
    """
    if not portrange or '-' not in portrange:
        LOGGER.info("Invalid port range format")
        return None
    # Split on the first '-' only.
    first, _, last = portrange.partition('-')
    return list(range(int(first), int(last) + 1))


def port_str_arrange(ports):
    """ Gives a str in the format (always tcp listed first).
    T:<tcp ports/portrange comma separated>U:<udp ports comma separated>
    """
    b_tcp = ports.find("T")
    b_udp = ports.find("U")
    if b_udp != -1 and b_tcp != -1 and b_udp < b_tcp:
        return ports[b_tcp:] + ports[b_udp:b_tcp]

    return ports


def ports_str_check_failed(port_str):
    """
    Check if the port string is well formed.
    Return True if fail, False other case.
    """

    pattern = r'[^TU:0-9, \-]'
    markers = port_str.count('T') + port_str.count('U')
    return bool(re.search(pattern, port_str) or
                port_str.count('T') > 1 or
                port_str.count('U') > 1 or
                port_str.count(':') < markers)


def _port_section_to_list(section):
    """ Expand a comma separated ports/port ranges str to a sorted list. """
    expanded = list()
    for port in section.split(','):
        if '-' in port:
            expanded.extend(port_range_expand(port))
        else:
            expanded.append(int(port))
    expanded.sort()
    return expanded


def ports_as_list(port_str):
    """
    Parses a ports string into two list of individual tcp and udp ports.

    @input string containing a port list
    e.g. T:1,2,3,5-8 U:22,80,600-1024

    @return two list of sorted integers, for tcp and udp ports respectively.
            [None, None] if port_str is empty or malformed.
    """
    if not port_str:
        LOGGER.info("Invalid port value")
        return [None, None]

    if ports_str_check_failed(port_str):
        LOGGER.info("%s: Port list malformed.", port_str)
        return [None, None]

    ports = port_str.replace(' ', '')
    # Drop the comma that may separate the two sections ("T:1,2,U:3").
    # Done with replace() rather than by index: indexing at find() - 1 hits
    # ports[-2] when a marker is missing and turned "T:1,2" into "T:12".
    ports = ports.replace(',T', 'T').replace(',U', 'U')
    b_tcp = ports.find("T")
    b_udp = ports.find("U")
    ports = port_str_arrange(ports)

    tports = ''
    uports = ''
    # TCP ports listed first, then UDP ports
    if b_udp != -1 and b_tcp != -1:
        tports = ports[ports.index('T:') + 2:ports.index('U:')]
        uports = ports[ports.index('U:') + 2:]
    # Only UDP ports
    elif b_tcp == -1 and b_udp != -1:
        uports = ports[ports.index('U:') + 2:]
    # Only TCP ports
    elif b_udp == -1 and b_tcp != -1:
        tports = ports[ports.index('T:') + 2:]
    # No marker at all: everything is taken as TCP ports
    else:
        tports = ports

    tcp_list = _port_section_to_list(tports) if tports else list()
    udp_list = _port_section_to_list(uports) if uports else list()

    return (tcp_list, udp_list)
617
618
def get_tcp_port_list(port_str):
    """ Return the tcp ports list of a port list given in string format """
    tcp_ports, _ = ports_as_list(port_str)
    return tcp_ports
621
622
623
def get_udp_port_list(port_str):
    """ Return the udp ports list of a port list given in string format """
    _, udp_ports = ports_as_list(port_str)
    return udp_ports
626
627
628 View Code Duplication
def port_list_compress(port_list):
    """ Compress a port list and return a string.

    Duplicates are dropped, ports are sorted and every run of consecutive
    ports is written as "first-last", e.g. [1, 2, 3, 5] gives "1-3,5".
    Returns '' for an empty or missing list.
    """

    has_ports = bool(port_list) and len(port_list) != 0
    if not has_ports:
        LOGGER.info("Invalid or empty port list.")
        return ''

    def offset(indexed_port):
        """ Port minus its position: constant along a consecutive run. """
        position, port = indexed_port
        return port - position

    unique_ports = sorted(set(port_list))
    pieces = []
    for _, run in itertools.groupby(enumerate(unique_ports), offset):
        members = [port for _, port in run]
        first, last = members[0], members[-1]
        if first == last:
            pieces.append(str(first))
        else:
            pieces.append(str(first) + '-' + str(last))

    return ','.join(pieces)
648
649
def valid_uuid(value):
    """ Tell whether uuid.UUID() accepts value as a UUID string. """

    try:
        uuid.UUID(value, version=4)
    except (TypeError, ValueError, AttributeError):
        return False
    else:
        return True
657
658
659 View Code Duplication
def create_args_parser(description):
    """ Create a command-line arguments parser for OSPD.

    description is used as the description of the argparse parser.

    Return an argparse.ArgumentParser that knows the common OSPD options
    (port, bind address, unix socket, key/cert/CA files, log level, syslog,
    background and version). Invalid option values are rejected through
    argparse.ArgumentTypeError raised by the validators defined below.
    """

    parser = argparse.ArgumentParser(description=description)

    def network_port(string):
        """ Check if provided string is a valid network port. """

        value = int(string)
        if not 0 < value <= 65535:
            raise argparse.ArgumentTypeError(
                'port must be in ]0,65535] interval')
        return value

    def cacert_file(cacert):
        """ Check if provided file is a valid CA Certificate """
        try:
            context = ssl.create_default_context(cafile=cacert)
        except AttributeError:
            # Python version < 2.7.9
            return cacert
        except ssl.SSLError:
            # ssl.SSLError derives from IOError/OSError, so it has to be
            # handled first: the file exists but could not be loaded as a
            # certificate, which is not a "not found" situation.
            raise argparse.ArgumentTypeError('CA Certificate is erroneous')
        except IOError:
            raise argparse.ArgumentTypeError('CA Certificate not found')
        try:
            # Only the first loaded CA certificate is inspected.
            ca_cert = context.get_ca_certs()[0]
            not_after = ssl.cert_time_to_seconds(ca_cert['notAfter'])
            not_before = ssl.cert_time_to_seconds(ca_cert['notBefore'])
        except (KeyError, IndexError):
            raise argparse.ArgumentTypeError('CA Certificate is erroneous')
        # Use one timestamp for both validity checks.
        now = int(time.time())
        if not_after < now:
            raise argparse.ArgumentTypeError('CA Certificate expired')
        if not_before > now:
            raise argparse.ArgumentTypeError('CA Certificate not active yet')
        return cacert

    def log_level(string):
        """ Check if provided string is a valid log level. """

        # Level names are looked up as integer attributes of the logging
        # module, e.g. 'debug' -> logging.DEBUG.
        value = getattr(logging, string.upper(), None)
        if not isinstance(value, int):
            raise argparse.ArgumentTypeError(
                'log level must be one of {debug,info,warning,error,critical}')
        return value

    def filename(string):
        """ Check if provided string is a valid file path. """

        if not os.path.isfile(string):
            raise argparse.ArgumentTypeError(
                '%s is not a valid file path' % string)
        return string

    parser.add_argument('-p', '--port', default=PORT, type=network_port,
                        help='TCP Port to listen on. Default: {0}'.format(PORT))
    parser.add_argument('-b', '--bind-address', default=ADDRESS,
                        help='Address to listen on. Default: {0}'
                        .format(ADDRESS))
    parser.add_argument('-u', '--unix-socket',
                        help='Unix file socket to listen on.')
    # The key, cert and CA file options get no argparse default here; the
    # defaults are only mentioned in the help texts.
    parser.add_argument('-k', '--key-file', type=filename,
                        help='Server key file. Default: {0}'.format(KEY_FILE))
    parser.add_argument('-c', '--cert-file', type=filename,
                        help='Server cert file. Default: {0}'.format(CERT_FILE))
    parser.add_argument('--ca-file', type=cacert_file,
                        help='CA cert file. Default: {0}'.format(CA_FILE))
    parser.add_argument('-L', '--log-level', default='warning', type=log_level,
                        help='Wished level of logging. Default: WARNING')
    parser.add_argument('--syslog', action='store_true',
                        help='Use syslog for logging.')
    parser.add_argument('--background', action='store_true',
                        help='Run in background. Implies --syslog.')
    parser.add_argument('--version', action='store_true',
                        help='Print version then exit.')
    return parser
734
735
736
def go_to_background():
    """ Send the running process to the background.

    The process is forked once: the parent exits through sys.exit() and
    only the child returns to the caller. If the fork fails, the error is
    logged and the process exits with the message 'Fork failed'.
    """
    try:
        if os.fork():
            # Non-zero return value: this is the parent process.
            sys.exit()
    except OSError as errmsg:
        # Pass the error as a logging argument so the message is only
        # formatted if the record is actually emitted.
        LOGGER.error('Fork failed: %s', errmsg)
        sys.exit('Fork failed')
744
745
746 View Code Duplication
def get_common_args(parser, args=None):
    """ Parse the command line and return the OSPD common arguments.

    parser is the argument parser to use and args the argument list handed
    to its parse_args() method (None lets argparse pick its default).

    Return a dict with the keys port, address, unix_socket, keyfile,
    certfile, cafile, log_level, syslog, background and version. Key, cert
    and CA file paths fall back to KEY_FILE, CERT_FILE and CA_FILE when
    they were not given, and syslog is enabled whenever background is.
    """

    parsed = parser.parse_args(args)

    return {
        # TCP Port to listen on.
        'port': parsed.port,
        # Network address to bind listener to
        'address': parsed.bind_address,
        # Unix file socket to listen on
        'unix_socket': parsed.unix_socket,
        # Server key path.
        'keyfile': parsed.key_file or KEY_FILE,
        # Server cert path.
        'certfile': parsed.cert_file or CERT_FILE,
        # CA cert path.
        'cafile': parsed.ca_file or CA_FILE,
        # Debug level.
        'log_level': parsed.log_level,
        # Running in background implies logging to syslog.
        'syslog': parsed.syslog or parsed.background,
        'background': parsed.background,
        'version': parsed.version,
    }
787
788
789 View Code Duplication
def print_version(wrapper):
    """ Print the server version and license information.

    The scanner name, server version, protocol version, daemon name and
    daemon version are queried from wrapper through its get_*() methods
    and written to standard output, followed by the copyright and license
    notice.
    """

    print("OSP Server for {0} version {1}".format(
        wrapper.get_scanner_name(), wrapper.get_server_version()))
    print("OSP Version: {0}".format(wrapper.get_protocol_version()))
    print("Using: {0} {1}".format(
        wrapper.get_daemon_name(), wrapper.get_daemon_version()))

    license_notice = ("Copyright (C) 2014, 2015 Greenbone Networks GmbH\n"
                      "License GPLv2+: GNU GPL version 2 or later\n"
                      "This is free software: you are free to change"
                      " and redistribute it.\n"
                      "There is NO WARRANTY, to the extent permitted by law.")
    print(license_notice)
805
806
807 View Code Duplication
def main(name, klass):
    """ OSPD Main function.

    name is used as the description of the command-line parser and klass
    is called with the certfile, keyfile and cafile keyword arguments to
    build the wrapper object.

    Return 1 if the wrapper's check() fails, otherwise the result of the
    wrapper's run() method called with the address, port and unix socket
    to listen on. The process exits early when the version was requested.
    """

    # Common args, read through the common args parser.
    options = get_common_args(create_args_parser(name))
    logging.getLogger().setLevel(options['log_level'])
    wrapper = klass(certfile=options['certfile'],
                    keyfile=options['keyfile'],
                    cafile=options['cafile'])

    if options['version']:
        print_version(wrapper)
        sys.exit()

    if options['syslog']:
        log_handler = logging.handlers.SysLogHandler('/dev/log')
        log_handler.setFormatter(
            logging.Formatter('%(name)s: %(levelname)s: %(message)s'))
        logging.getLogger().addHandler(log_handler)
        # Duplicate syslog's file descriptor to stdout/stderr.
        syslog_fd = log_handler.socket.fileno()
        for std_fd in (1, 2):
            os.dup2(syslog_fd, std_fd)
    else:
        log_handler = logging.StreamHandler()
        log_handler.setFormatter(
            logging.Formatter(
                '%(asctime)s %(name)s: %(levelname)s: %(message)s'))
        logging.getLogger().addHandler(log_handler)

    if options['background']:
        go_to_background()

    if wrapper.check():
        return wrapper.run(options['address'], options['port'],
                           options['unix_socket'])
    return 1
844