Completed
Push — master ( 144b8b...7404f6 )
by
unknown
15s
created

ospd.misc.port_range_expand()   A

Complexity

Conditions 4

Size

Total Lines 18
Code Lines 9

Duplication

Lines 18
Ratio 100 %

Importance

Changes 0
Metric Value
cc 4
eloc 9
nop 1
dl 18
loc 18
rs 9.95
c 0
b 0
f 0
1
# $Id$
2
# Description:
3
# Miscellaneous classes and functions related to OSPD.
4
#
5
# Authors:
6
# Hani Benhabiles <[email protected]>
7
#
8
# Copyright:
9
# Copyright (C) 2014 Greenbone Networks GmbH
10
#
11
# This program is free software; you can redistribute it and/or
12
# modify it under the terms of the GNU General Public License
13
# as published by the Free Software Foundation; either version 2
14
# of the License, or (at your option) any later version.
15
#
16
# This program is distributed in the hope that it will be useful,
17
# but WITHOUT ANY WARRANTY; without even the implied warranty of
18
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19
# GNU General Public License for more details.
20
#
21
# You should have received a copy of the GNU General Public License
22
# along with this program; if not, write to the Free Software
23
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
24
#
25
""" Miscellaneous functions and utilities related to OSPD. """
26
27
# Needed to say that when we import ospd, we mean the package and not the
28
# module in that directory.
29
from __future__ import absolute_import
30
from __future__ import print_function
31
32
import argparse
33
import binascii
34
import collections
35
import logging
36
import logging.handlers
37
import os
38
import re
39
import socket
40
import struct
41
import sys
42
import time
43
import ssl
44
import uuid
45
import multiprocessing
46
import itertools
47
48
LOGGER = logging.getLogger(__name__)
49
50
# Default file locations as used by an OpenVAS default installation
51
KEY_FILE = "/usr/var/lib/gvm/private/CA/serverkey.pem"
52
CERT_FILE = "/usr/var/lib/gvm/CA/servercert.pem"
53
CA_FILE = "/usr/var/lib/gvm/CA/cacert.pem"
54
55
PORT = 1234
56
ADDRESS = "0.0.0.0"
57
58
59 View Code Duplication
class ScanCollection(object):

    """ Scans collection, managing scans and results read and write, exposing
    only needed information.

    Each scan has meta-information such as scan ID, current progress (from 0 to
    100), start time, end time, scan target and options and a list of results.

    There are 4 types of results: Alarms, Logs, Errors and Host Details.

    Every entry of scans_table is a dictionary obtained from a
    multiprocessing manager (see create_scan). Nested values read from such
    an entry are assigned back after being modified, so that the change is
    propagated to the parent process.

    Todo:
    - Better checking for Scan ID existence and handling otherwise.
    - More data validation.
    - Mutex access per table/scan_info.

    """

    def __init__(self):
        """ Initialize the Scan Collection. """

        # Created lazily by create_scan() and dropped again by delete_scan()
        # once the last scan is removed.
        self.data_manager = None
        # Maps scan ID -> scan information dictionary.
        self.scans_table = dict()

    def add_result(self, scan_id, result_type, host='', name='', value='',
                   port='', test_id='', severity='', qod=''):
        """ Add a result to a scan in the table.

        At least one of name and value has to be non-empty.
        """

        assert scan_id
        assert name or value
        result = {
            'type': result_type,
            'name': name,
            'severity': severity,
            'test_id': test_id,
            'value': value,
            'host': host,
            'port': port,
            'qod': qod,
        }
        results = self.scans_table[scan_id]['results']
        results.append(result)
        # Set scan_info's results to propagate results to parent process.
        self.scans_table[scan_id]['results'] = results

    def set_progress(self, scan_id, progress):
        """ Sets scan_id scan's progress.

        Values outside ]0,100] are ignored. Reaching 100 also records the
        scan's end time.
        """

        if 0 < progress <= 100:
            self.scans_table[scan_id]['progress'] = progress
        if progress == 100:
            self.scans_table[scan_id]['end_time'] = int(time.time())

    def set_target_progress(self, scan_id, target, progress):
        """ Sets the progress of a single target of scan_id scan.

        Values outside ]0,100] are ignored.
        """
        if 0 < progress <= 100:
            target_process = self.scans_table[scan_id]['target_progress']
            target_process[target] = progress
            # Set scan_info's target_progress to propagate progresses
            # to parent process.
            self.scans_table[scan_id]['target_progress'] = target_process

    def results_iterator(self, scan_id, pop_res):
        """ Returns an iterator over scan_id scan's results. If pop_res is True,
        it removes the fetched results from the list.
        """
        if pop_res:
            result_aux = self.scans_table[scan_id]['results']
            self.scans_table[scan_id]['results'] = list()
            return iter(result_aux)

        return iter(self.scans_table[scan_id]['results'])

    def ids_iterator(self):
        """ Returns an iterator over the collection's scan IDS. """

        return iter(self.scans_table.keys())

    def create_scan(self, scan_id='', targets='', target_str=None,
                    options=None, vts=''):
        """ Creates a new scan with provided scan information.

        If scan_id is empty or None a new UUID is generated for it. When
        options is not given, the scan gets its own empty options dictionary.

        Returns the ID of the created scan.
        """

        # A fresh dictionary per call: a dict() default in the signature
        # would be one single object shared by every scan created without
        # options.
        if options is None:
            options = dict()

        if self.data_manager is None:
            self.data_manager = multiprocessing.Manager()
        scan_info = self.data_manager.dict()
        scan_info['results'] = list()
        scan_info['progress'] = 0
        # Each target is a sequence whose first item is the host.
        scan_info['target_progress'] = dict(
            (elem[0], 0) for elem in targets)
        scan_info['targets'] = targets
        scan_info['legacy_target'] = target_str
        scan_info['vts'] = vts
        scan_info['options'] = options
        scan_info['start_time'] = int(time.time())
        scan_info['end_time'] = "0"
        if scan_id is None or scan_id == '':
            scan_id = str(uuid.uuid4())
        scan_info['scan_id'] = scan_id
        self.scans_table[scan_id] = scan_info
        return scan_id

    def get_options(self, scan_id):
        """ Get scan_id scan's options list. """

        return self.scans_table[scan_id]['options']

    def set_option(self, scan_id, name, value):
        """ Set a scan_id scan's name option to value. """

        options = self.scans_table[scan_id]['options']
        options[name] = value
        # Assign the modified options back, as done for results and target
        # progress, so the change is stored in the scan information and not
        # only in the object that was read from it.
        self.scans_table[scan_id]['options'] = options

    def get_progress(self, scan_id):
        """ Get a scan's current progress value. """

        return self.scans_table[scan_id]['progress']

    def get_target_progress(self, scan_id):
        """ Get a scan's per-target progress dictionary. """

        return self.scans_table[scan_id]['target_progress']

    def get_start_time(self, scan_id):
        """ Get a scan's start time. """

        return self.scans_table[scan_id]['start_time']

    def get_end_time(self, scan_id):
        """ Get a scan's end time. """

        return self.scans_table[scan_id]['end_time']

    def get_target(self, scan_id):
        """ Get a scan's target list.

        Returns the legacy target string if one was set, otherwise the
        comma separated hosts of the scan's targets.
        """
        legacy_target = self.scans_table[scan_id]['legacy_target']
        if legacy_target:
            return legacy_target

        return ','.join(
            item[0] for item in self.scans_table[scan_id]['targets'])

    def get_ports(self, scan_id, target):
        """ Get a scan's ports list. If a target is specified
        it will return the corresponding port for it. If not,
        it returns the port item of the first nested list in
        the target's list.
        """
        targets = self.scans_table[scan_id]['targets']
        if target:
            for item in targets:
                if target == item[0]:
                    return item[1]

        return targets[0][1]

    def get_credentials(self, scan_id, target):
        """ Get a scan's credential list. It returns the credentials item
        stored for the given target, or None if target is empty or unknown.
        """
        if target:
            for item in self.scans_table[scan_id]['targets']:
                if target == item[0]:
                    return item[2]
        return None

    def get_vts(self, scan_id):
        """ Get a scan's vts list. """

        return self.scans_table[scan_id]['vts']

    def id_exists(self, scan_id):
        """ Check whether a scan exists in the table. """

        return self.scans_table.get(scan_id) is not None

    def delete_scan(self, scan_id):
        """ Delete a scan if fully finished.

        Returns False if the scan's progress is below 100, True once the
        scan has been removed.
        """

        if self.get_progress(scan_id) < 100:
            return False
        self.scans_table.pop(scan_id)
        if not self.scans_table:
            # No scan left: drop the reference to the data manager.
            self.data_manager = None
        return True
241
242
243 View Code Duplication
class ResultType(object):

    """ Various scan results types values. """

    ALARM = 0
    LOG = 1
    ERROR = 2
    HOST_DETAIL = 3

    @classmethod
    def _pairs(cls):
        """ Return the (type value, string name) pairs of all result types. """
        return (
            (cls.ALARM, "Alarm"),
            (cls.LOG, "Log Message"),
            (cls.ERROR, "Error Message"),
            (cls.HOST_DETAIL, "Host Detail"),
        )

    @classmethod
    def get_str(cls, result_type):
        """ Return string name of a result type.

        Raises AssertionError if result_type is not a known type value.
        """
        for type_value, type_name in cls._pairs():
            if result_type == type_value:
                return type_name
        # Raised explicitly: an assert statement is removed when Python runs
        # with -O, which would make this method silently return None.
        raise AssertionError(
            "Erroneous result type {0}.".format(result_type))

    @classmethod
    def get_type(cls, result_name):
        """ Return the result type value of a result type string name.

        Raises AssertionError if result_name is not a known type name.
        """
        for type_value, type_name in cls._pairs():
            if result_name == type_name:
                return type_value
        # Raised explicitly for the same reason as in get_str().
        raise AssertionError(
            "Erroneous result name {0}.".format(result_name))
279
280
__inet_pton = None


def inet_pton(address_family, ip_string):
    """ A platform independent version of inet_pton.

    The implementation is picked on the first call and kept in the
    module-level __inet_pton: socket.inet_pton when the socket module
    provides it, otherwise the one from ospd.win_socket.
    """
    global __inet_pton
    if __inet_pton is None:
        try:
            __inet_pton = socket.inet_pton
        except AttributeError:
            # The socket module of this platform has no inet_pton.
            from ospd import win_socket
            __inet_pton = win_socket.inet_pton

    return __inet_pton(address_family, ip_string)
292
293
__inet_ntop = None


def inet_ntop(address_family, packed_ip):
    """ A platform independent version of inet_ntop.

    The implementation is picked on the first call and kept in the
    module-level __inet_ntop: socket.inet_ntop when the socket module
    provides it, otherwise the one from ospd.win_socket.
    """
    global __inet_ntop
    if __inet_ntop is None:
        try:
            __inet_ntop = socket.inet_ntop
        except AttributeError:
            # The socket module of this platform has no inet_ntop.
            from ospd import win_socket
            __inet_ntop = win_socket.inet_ntop

    return __inet_ntop(address_family, packed_ip)
305
306
307
def target_to_ipv4(target):
    """ Return [target] if target is a single IPv4 address, else None. """

    try:
        # Only used as a validity check, the packed value is not needed.
        inet_pton(socket.AF_INET, target)
    except socket.error:
        return None
    return [target]
315
316
317
def target_to_ipv6(target):
    """ Return [target] if target is a single IPv6 address, else None. """

    try:
        # Only used as a validity check, the packed value is not needed.
        inet_pton(socket.AF_INET6, target)
    except socket.error:
        return None
    return [target]
325
326
327 View Code Duplication
def ipv4_range_to_list(start_packed, end_packed):
    """ Return the list of dotted IPv4 strings from start_packed to
    end_packed, both included.

    Both arguments are 4-byte packed addresses in network byte order. The
    list is empty if end_packed is lower than start_packed.
    """

    first, = struct.unpack('!L', start_packed)
    last, = struct.unpack('!L', end_packed)
    return [socket.inet_ntoa(struct.pack('!L', number))
            for number in range(first, last + 1)]
337
338
339 View Code Duplication
def target_to_ipv4_short(target):
    """ Attempt to return a IPv4 short range list from a target string.

    A short range has the form "<IPv4 address>-<last octet>",
    e.g. "192.168.0.10-20". Returns None if target does not have that form
    or if the end octet is not in [start octet, 255].
    """

    splitted = target.split('-')
    if len(splitted) != 2:
        return None
    try:
        start_packed = inet_pton(socket.AF_INET, splitted[0])
        end_value = int(splitted[1])
    except (socket.error, ValueError):
        return None
    # bytearray() gives the last octet as an integer on both Python 2 and 3.
    # Indexing the packed value directly yields an int on Python 3, which
    # binascii.hexlify() does not accept.
    start_value = bytearray(start_packed)[3]
    if end_value < 0 or end_value > 255 or end_value < start_value:
        return None
    end_packed = start_packed[0:3] + struct.pack('B', end_value)
    return ipv4_range_to_list(start_packed, end_packed)
355
356
357 View Code Duplication
def target_to_ipv4_cidr(target):
    """ Attempt to return a IPv4 CIDR list from a target string.

    Accepts "<IPv4 address>/<prefix length>" with a prefix length in
    [1, 30]. The returned range starts one address after the network
    address and stops one address before the last address of the block.
    Returns None for anything else.
    """

    pieces = target.split('/')
    if len(pieces) != 2:
        return None
    try:
        base_packed = inet_pton(socket.AF_INET, pieces[0])
        prefix_len = int(pieces[1])
    except (socket.error, ValueError):
        return None
    if not 0 < prefix_len <= 30:
        return None
    host_bits = 32 - prefix_len
    # Clear the host bits to get the network address.
    network = (int(binascii.hexlify(base_packed), 16) >> host_bits) << host_bits
    first = network + 1
    last = (first | (0xffffffff >> prefix_len)) - 1
    return ipv4_range_to_list(struct.pack('!I', first),
                              struct.pack('!I', last))
376
377
378 View Code Duplication
def target_to_ipv6_cidr(target):
    """ Attempt to return a IPv6 CIDR list from a target string.

    Accepts "<IPv6 address>/<prefix length>" with a prefix length in
    [1, 126]. The returned range starts one address after the first address
    of the block and stops one address before its last address. Returns
    None for anything else.
    """

    pieces = target.split('/')
    if len(pieces) != 2:
        return None
    try:
        base_packed = inet_pton(socket.AF_INET6, pieces[0])
        prefix_len = int(pieces[1])
    except (socket.error, ValueError):
        return None
    if not 0 < prefix_len <= 126:
        return None
    host_bits = 128 - prefix_len
    # Clear the host bits to get the first address of the block.
    network = (int(binascii.hexlify(base_packed), 16) >> host_bits) << host_bits
    first = network + 1
    last = (first | (int('ff' * 16, 16) >> prefix_len)) - 1
    # A 128-bit value is packed as two unsigned 64-bit halves.
    low_mask = (1 << 64) - 1
    first_packed = struct.pack('!QQ', first >> 64, first & low_mask)
    last_packed = struct.pack('!QQ', last >> 64, last & low_mask)
    return ipv6_range_to_list(first_packed, last_packed)
401
402
403 View Code Duplication
def target_to_ipv4_long(target):
    """ Attempt to return a IPv4 long-range list from a target string.

    A long range has the form "<IPv4 address>-<IPv4 address>". Returns None
    if target does not have that form or if the end address is lower than
    the start address.
    """

    bounds = target.split('-')
    if len(bounds) != 2:
        return None
    try:
        first_packed = inet_pton(socket.AF_INET, bounds[0])
        last_packed = inet_pton(socket.AF_INET, bounds[1])
    except socket.error:
        return None
    # The packed values are compared directly to order the two addresses.
    if last_packed < first_packed:
        return None
    return ipv4_range_to_list(first_packed, last_packed)
417
418
419 View Code Duplication
def ipv6_range_to_list(start_packed, end_packed):
    """ Return the list of IPv6 strings from start_packed to end_packed,
    both included.

    Both arguments are packed addresses. The list is empty if end_packed
    is lower than start_packed.
    """

    first = int(binascii.hexlify(start_packed), 16)
    last = int(binascii.hexlify(end_packed), 16)
    # Each address is packed back as two unsigned 64-bit halves.
    low_mask = (1 << 64) - 1
    return [inet_ntop(socket.AF_INET6,
                      struct.pack('!2Q', number >> 64, number & low_mask))
            for number in range(first, last + 1)]
432
433
434 View Code Duplication
def target_to_ipv6_short(target):
    """ Attempt to return a IPv6 short-range list from a target string.

    A short range has the form "<IPv6 address>-<hexadecimal last group>".
    Returns None if target does not have that form or if the end group is
    not in [start group, 0xffff].
    """

    bounds = target.split('-')
    if len(bounds) != 2:
        return None
    try:
        first_packed = inet_pton(socket.AF_INET6, bounds[0])
        last_group = int(bounds[1], 16)
    except (socket.error, ValueError):
        return None
    # The last 16-bit group is made of the two final bytes.
    first_group = int(binascii.hexlify(first_packed[14:]), 16)
    if not first_group <= last_group <= 0xffff or last_group < 0:
        return None
    last_packed = first_packed[:14] + struct.pack('!H', last_group)
    return ipv6_range_to_list(first_packed, last_packed)
450
451
452 View Code Duplication
def target_to_ipv6_long(target):
    """ Attempt to return a IPv6 long-range list from a target string.

    A long range has the form "<IPv6 address>-<IPv6 address>". Returns None
    if target does not have that form or if the end address is lower than
    the start address.
    """

    bounds = target.split('-')
    if len(bounds) != 2:
        return None
    try:
        first_packed = inet_pton(socket.AF_INET6, bounds[0])
        last_packed = inet_pton(socket.AF_INET6, bounds[1])
    except socket.error:
        return None
    # The packed values are compared directly to order the two addresses.
    if last_packed < first_packed:
        return None
    return ipv6_range_to_list(first_packed, last_packed)
466
467
468
def target_to_hostname(target):
    """ Return [target] if target looks like a hostname, else None.

    A hostname is 1 to 255 characters long and only made of word
    characters, dots and dashes.
    """

    if not 0 < len(target) <= 255:
        return None
    if re.match(r'^[\w.-]+$', target) is None:
        return None
    return [target]
476
477
478 View Code Duplication
def target_to_list(target):
    """ Attempt to return a list of single hosts from a target string.

    The target is tried against each supported format in turn and the
    result of the first format that yields hosts is returned. If none
    does, the result of the hostname check is returned.
    """

    # Order matters: the first format giving a non-empty list wins.
    converters = (
        target_to_ipv4,        # IPv4 address
        target_to_ipv6,        # IPv6 address
        target_to_ipv4_cidr,   # IPv4 CIDR
        target_to_ipv6_cidr,   # IPv6 CIDR
        target_to_ipv4_short,  # IPv4 short-range
        target_to_ipv4_long,   # IPv4 long-range
        target_to_ipv6_short,  # IPv6 short-range
        target_to_ipv6_long,   # IPv6 long-range
        target_to_hostname,    # hostname
    )
    hosts = None
    for convert in converters:
        hosts = convert(target)
        if hosts:
            break
    return hosts
508
509
510 View Code Duplication
def target_str_to_list(target_str):
    """ Parses a comma separated targets string into a list of individual
    targets.

    Duplicates are removed, keeping the first occurrence of each host.
    Returns None as soon as one of the targets is invalid.
    """
    hosts = []
    for raw_target in target_str.split(','):
        stripped = raw_target.strip()
        expanded = target_to_list(stripped)
        if not expanded:
            LOGGER.info("{0}: Invalid target value".format(stripped))
            return None
        hosts.extend(expanded)
    # An ordered dictionary drops duplicates while preserving the order.
    return list(collections.OrderedDict.fromkeys(hosts))
522
523
524
def resolve_hostname(hostname):
    """ Returns the IP of a hostname, or None if it cannot be resolved. """

    assert hostname
    try:
        address = socket.gethostbyname(hostname)
    except socket.gaierror:
        address = None
    return address
532
533
534 View Code Duplication
def port_range_expand(portrange):
    """
    Receive a port range and expands it in individual ports.

    @input Port range.
    e.g. "4-8"

    @return List of integers, or None if portrange is empty or holds no '-'.
    e.g. [4, 5, 6, 7, 8]
    """
    if not portrange or '-' not in portrange:
        LOGGER.info("Invalid port range format")
        return None
    # Split at the first dash: what precedes it is the first port, the
    # remainder is the last port.
    first, _, last = portrange.partition('-')
    return list(range(int(first), int(last) + 1))
552
553 View Code Duplication
def port_str_arrange(ports):
    """ Gives a str in the format (always tcp listed first).
    T:<tcp ports/portrange comma separated>U:<udp ports comma separated>

    The string is returned unchanged unless it holds both a "T" and a "U"
    with the "U" coming first.
    """
    tcp_pos = ports.find("T")
    udp_pos = ports.find("U")
    udp_listed_first = -1 not in (tcp_pos, udp_pos) and udp_pos < tcp_pos
    if not udp_listed_first:
        return ports

    # Move the tcp section in front of the udp one.
    return ports[tcp_pos:] + ports[udp_pos:tcp_pos]
563
564
565 View Code Duplication
def ports_str_check_failed(port_str):
    """
    Check if the port string is well formed.
    Return True if fail, False other case.

    The check fails if the string holds a character other than "T", "U",
    ":", digits, comma, space and dash, if it holds more than one "T" or
    more than one "U", or if it holds fewer ":" than "T" and "U" together.
    """

    if re.search(r'[^TU:0-9, \-]', port_str):
        return True
    tcp_marks = port_str.count('T')
    udp_marks = port_str.count('U')
    if tcp_marks > 1 or udp_marks > 1:
        return True
    return port_str.count(':') < tcp_marks + udp_marks
578
579 View Code Duplication
def ports_as_list(port_str):
    """
    Parses a ports string into two lists of individual tcp and udp ports.

    @input string containing a port list
    e.g. T:1,2,3,5-8 U:22,80,600-1024

    A string without "T" and "U" markers is handled as a tcp port list.

    @return two lists of sorted integers, for tcp and udp ports
    respectively, or [None, None] if the string is empty or malformed.
    """
    if not port_str:
        LOGGER.info("Invalid port value")
        return [None, None]

    if ports_str_check_failed(port_str):
        LOGGER.info("%s: Port list malformed.", port_str)
        return [None, None]

    ports = port_str.replace(' ', '')
    # Drop the comma separating the last port of a section from the marker
    # of the next one ("T:1,2,U:3"). Matching on the text, rather than on
    # positions returned by find(), avoids negative indexes when a marker is
    # missing or at the start, and positions that no longer hold once a
    # character has been removed.
    ports = re.sub(r',(?=[TU])', '', ports)
    ports = port_str_arrange(ports)

    has_tcp = 'T' in ports
    has_udp = 'U' in ports
    tports = ''
    uports = ''
    # TCP ports listed first, then UDP ports
    if has_tcp and has_udp:
        tports = ports[ports.index('T:') + 2:ports.index('U:')]
        uports = ports[ports.index('U:') + 2:]
    # Only UDP ports
    elif has_udp:
        uports = ports[ports.index('U:') + 2:]
    # Only TCP ports
    elif has_tcp:
        tports = ports[ports.index('T:') + 2:]
    else:
        tports = ports

    return (_ports_section_to_list(tports), _ports_section_to_list(uports))


def _ports_section_to_list(section):
    """ Return the sorted list of the ports of a comma separated section,
    with its port ranges expanded. An empty section gives an empty list.
    """
    port_list = list()
    if not section:
        return port_list
    for port in section.split(','):
        if '-' in port:
            port_list.extend(port_range_expand(port))
        else:
            port_list.append(int(port))
    port_list.sort()
    return port_list
639
640
def get_tcp_port_list(port_str):
    """ Return the tcp ports list of a port list given in string format. """
    tcp_ports = ports_as_list(port_str)[0]
    return tcp_ports
643
644
645
def get_udp_port_list(port_str):
    """ Return the udp ports list of a port list given in string format. """
    udp_ports = ports_as_list(port_str)[1]
    return udp_ports
648
649
650 View Code Duplication
def port_list_compress(port_list):
    """ Compress a port list and return a string.

    Duplicates are dropped and runs of consecutive ports are written as
    "<first>-<last>", e.g. [1, 2, 3, 5] gives "1-3,5". An empty or missing
    list gives an empty string.
    """

    if not port_list or len(port_list) == 0:
        LOGGER.info("Invalid or empty port list.")
        return ''

    unique_ports = sorted(set(port_list))
    chunks = []
    # Within a run of consecutive ports, the difference between a port and
    # its position in the sorted list is constant.
    for _offset, run in itertools.groupby(enumerate(unique_ports),
                                          lambda pair: pair[1] - pair[0]):
        run_ports = [pair[1] for pair in run]
        low = run_ports[0]
        high = run_ports[-1]
        if low == high:
            chunks.append(str(low))
        else:
            chunks.append(str(low) + '-' + str(high))

    return ','.join(chunks)
670
671
def valid_uuid(value):
    """ Check if value is a valid UUID.

    Returns True if uuid.UUID() accepts value, False otherwise.
    """

    try:
        uuid.UUID(value, version=4)
    except (TypeError, ValueError, AttributeError):
        return False
    return True
679
680
681 View Code Duplication
def create_args_parser(description):
    """ Build the command-line arguments parser shared by OSPD daemons.

    description becomes the description of the returned
    argparse.ArgumentParser. The parser knows the listening options (port,
    bind address, unix socket), the key, cert and CA file options, the logging
    options and the --background and --version flags.
    """

    # The validators below are used as argparse "type" callables. argparse
    # mentions the callable's name when it reports an invalid value, so their
    # names are part of what the user sees.
    def network_port(string):
        """ Convert string to a port number, rejecting anything outside
        of the ]0,65535] interval. """

        port = int(string)
        if port <= 0 or port > 65535:
            raise argparse.ArgumentTypeError(
                'port must be in ]0,65535] interval')
        return port

    def cacert_file(cacert):
        """ Ensure cacert is a loadable CA certificate that is currently
        within its validity period, and return its path. """
        try:
            ssl_context = ssl.create_default_context(cafile=cacert)
        except AttributeError:
            # Python version < 2.7.9: no way to inspect it, accept it as is.
            return cacert
        except IOError:
            raise argparse.ArgumentTypeError('CA Certificate not found')
        try:
            ca_cert = ssl_context.get_ca_certs()[0]
            valid_until = ssl.cert_time_to_seconds(ca_cert['notAfter'])
            valid_from = ssl.cert_time_to_seconds(ca_cert['notBefore'])
        except (KeyError, IndexError):
            raise argparse.ArgumentTypeError('CA Certificate is erroneous')
        if valid_until < int(time.time()):
            raise argparse.ArgumentTypeError('CA Certificate expired')
        if valid_from > int(time.time()):
            raise argparse.ArgumentTypeError('CA Certificate not active yet')
        return cacert

    def log_level(string):
        """ Map a level name to the matching integer attribute of the
        logging module. """

        level = getattr(logging, string.upper(), None)
        if isinstance(level, int):
            return level
        raise argparse.ArgumentTypeError(
            'log level must be one of {debug,info,warning,error,critical}')

    def filename(string):
        """ Accept string only if it is the path of an existing file. """

        if os.path.isfile(string):
            return string
        raise argparse.ArgumentTypeError(
            '%s is not a valid file path' % string)

    parser = argparse.ArgumentParser(description=description)

    # Where to listen.
    parser.add_argument(
        '-p', '--port', default=PORT, type=network_port,
        help='TCP Port to listen on. Default: {0}'.format(PORT))
    parser.add_argument(
        '-b', '--bind-address', default=ADDRESS,
        help='Address to listen on. Default: {0}'.format(ADDRESS))
    parser.add_argument(
        '-u', '--unix-socket',
        help='Unix file socket to listen on.')

    # Key, certificate and CA certificate locations.
    parser.add_argument(
        '-k', '--key-file', type=filename,
        help='Server key file. Default: {0}'.format(KEY_FILE))
    parser.add_argument(
        '-c', '--cert-file', type=filename,
        help='Server cert file. Default: {0}'.format(CERT_FILE))
    parser.add_argument(
        '--ca-file', type=cacert_file,
        help='CA cert file. Default: {0}'.format(CA_FILE))

    # Logging and process control.
    parser.add_argument(
        '-L', '--log-level', default='warning', type=log_level,
        help='Wished level of logging. Default: WARNING')
    parser.add_argument(
        '--syslog', action='store_true',
        help='Use syslog for logging.')
    parser.add_argument(
        '--background', action='store_true',
        help='Run in background. Implies --syslog.')
    parser.add_argument(
        '--version', action='store_true',
        help='Print version then exit.')

    return parser
756
757
758
def go_to_background():
    """ Daemonize the running process.

    Forks once: the parent process exits through sys.exit() while the child
    returns from this function and carries on. If the fork fails, the error
    is logged and the process exits with the 'Fork failed' message.
    """
    try:
        if os.fork():
            # Non-zero return value: this is the parent, which has to go.
            sys.exit()
    except OSError as errmsg:
        # Hand the argument over to the logger instead of formatting the
        # message eagerly.
        LOGGER.error('Fork failed: %s', errmsg)
        sys.exit('Fork failed')
766
767
768 View Code Duplication
def get_common_args(parser, args=None):
    """ Parse the command line and gather the OSPD common arguments.

    parser is used to parse args (argparse falls back to the process
    arguments when args is None). The result is a dictionary with the port,
    address, unix_socket, keyfile, certfile, cafile, log_level, syslog,
    background and version keys.

    Key, cert and CA file paths that were not provided are replaced by the
    KEY_FILE, CERT_FILE and CA_FILE defaults, and syslog is enabled whenever
    running in background was requested.
    """
    opts = parser.parse_args(args)

    return {
        # Listener: TCP port, bind address and unix file socket.
        'port': opts.port,
        'address': opts.bind_address,
        'unix_socket': opts.unix_socket,
        # Server key, server cert and CA cert paths.
        'keyfile': opts.key_file or KEY_FILE,
        'certfile': opts.cert_file or CERT_FILE,
        'cafile': opts.ca_file or CA_FILE,
        # Logging and process control.
        'log_level': opts.log_level,
        'syslog': opts.syslog or opts.background,
        'background': opts.background,
        'version': opts.version,
    }
809
810
811 View Code Duplication
def print_version(wrapper):
    """ Print the server version, protocol version, daemon version and
    license information of wrapper to the standard output.

    The scanner name, server version, protocol version, daemon name and
    daemon version are all queried from wrapper.
    """
    print("OSP Server for {0} version {1}".format(
        wrapper.get_scanner_name(), wrapper.get_server_version()))
    print("OSP Version: {0}".format(wrapper.get_protocol_version()))
    print("Using: {0} {1}".format(
        wrapper.get_daemon_name(), wrapper.get_daemon_version()))

    license_lines = (
        "Copyright (C) 2014, 2015 Greenbone Networks GmbH",
        "License GPLv2+: GNU GPL version 2 or later",
        "This is free software: you are free to change"
        " and redistribute it.",
        "There is NO WARRANTY, to the extent permitted by law.",
    )
    print("\n".join(license_lines))
827
828
829 View Code Duplication
def main(name, klass):
    """ OSPD Main function.

    Parses the common command-line arguments (name being the description of
    the parser), instantiates klass with the certfile, keyfile and cafile
    keyword arguments, sets logging up, goes to background if asked to, and
    finally runs the wrapper on the requested address, port and unix socket.

    Returns 1 if the wrapper's check() fails, otherwise the value returned by
    its run() method. When the version was requested, it is printed and the
    process exits.
    """
    cargs = get_common_args(create_args_parser(name))

    root_logger = logging.getLogger()
    root_logger.setLevel(cargs['log_level'])

    wrapper = klass(certfile=cargs['certfile'], keyfile=cargs['keyfile'],
                    cafile=cargs['cafile'])

    if cargs['version']:
        print_version(wrapper)
        sys.exit()

    if cargs['syslog']:
        handler = logging.handlers.SysLogHandler('/dev/log')
        handler.setFormatter(
            logging.Formatter('%(name)s: %(levelname)s: %(message)s'))
        root_logger.addHandler(handler)
        # Duplicate syslog's file descriptor to stdout (1) and stderr (2).
        syslog_fd = handler.socket.fileno()
        for std_fd in (1, 2):
            os.dup2(syslog_fd, std_fd)
    else:
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter(
                '%(asctime)s %(name)s: %(levelname)s: %(message)s'))
        root_logger.addHandler(handler)

    if cargs['background']:
        go_to_background()

    if wrapper.check():
        return wrapper.run(cargs['address'], cargs['port'],
                           cargs['unix_socket'])
    return 1
866