Completed
Push — master ( cef773...93c828 )
by Juan José
13s queued 11s
created

ospd.misc.inet_pton()   A

Complexity

Conditions 3

Size

Total Lines 11
Code Lines 8

Duplication

Lines 11
Ratio 100 %

Importance

Changes 0
Metric Value
cc 3
eloc 8
nop 2
dl 11
loc 11
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2014-2018 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
""" Miscellaneous classes and functions related to OSPD.
20
"""
21
22
23
# Needed to say that when we import ospd, we mean the package and not the
24
# module in that directory.
25
from __future__ import absolute_import
26
from __future__ import print_function
27
28
import argparse
29
import binascii
30
import collections
31
import logging
32
import logging.handlers
33
import os
34
import re
35
import socket
36
import struct
37
import sys
38
import time
39
import ssl
40
import uuid
41
import multiprocessing
42
import itertools
43
from enum import Enum
44
45
# Module-wide logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Default file locations as used by an OpenVAS default installation.
KEY_FILE = "/usr/var/lib/gvm/private/CA/serverkey.pem"
CERT_FILE = "/usr/var/lib/gvm/CA/servercert.pem"
CA_FILE = "/usr/var/lib/gvm/CA/cacert.pem"

# Default network port and bind address.
PORT = 1234
ADDRESS = "0.0.0.0"
54
55
class ScanStatus(Enum):
    """ Life-cycle states a scan stored in the scans table can be in. """

    INIT = 0
    RUNNING = 1
    STOPPED = 2
    FINISHED = 3
61
62
63
64 View Code Duplication
class ScanCollection(object):

    """ Scans collection, managing scans and results read and write, exposing
    only needed information.

    Each scan has meta-information such as scan ID, current progress (from 0 to
    100), start time, end time, scan target and options and a list of results.

    There are 4 types of results: Alarms, Logs, Errors and Host Details.

    Nested values of a scan entry (results, finished hosts, target progress)
    are always re-assigned after being modified, so that the change is
    propagated to the parent process.

    Todo:
    - Better checking for Scan ID existence and handling otherwise.
    - More data validation.
    - Mutex access per table/scan_info.

    """

    def __init__(self):
        """ Initialize the Scan Collection. """

        # The manager is created lazily by create_scan() and released by
        # delete_scan() once the table is empty.
        self.data_manager = None
        self.scans_table = dict()

    def add_result(self, scan_id, result_type, host='', name='', value='',
                   port='', test_id='', severity='', qod=''):
        """ Add a result to a scan in the table.

        At least one of name and value must be non-empty.
        """

        assert scan_id
        assert len(name) or len(value)
        result = {
            'type': result_type,
            'name': name,
            'severity': severity,
            'test_id': test_id,
            'value': value,
            'host': host,
            'port': port,
            'qod': qod,
        }
        results = self.scans_table[scan_id]['results']
        results.append(result)
        # Set scan_info's results to propagate results to parent process.
        self.scans_table[scan_id]['results'] = results

    def set_progress(self, scan_id, progress):
        """ Sets scan_id scan's progress.

        Values outside of ]0, 100] are ignored. Reaching 100 also records
        the scan's end time.
        """

        if 0 < progress <= 100:
            self.scans_table[scan_id]['progress'] = progress
        if progress == 100:
            self.scans_table[scan_id]['end_time'] = int(time.time())

    def set_target_progress(self, scan_id, target, host, progress):
        """ Sets the progress of a single host of a scan's target.

        Values outside of ]0, 100] are ignored.
        """
        if 0 < progress <= 100:
            targets = self.scans_table[scan_id]['target_progress']
            targets[target][host] = progress
            # Set scan_info's target_progress to propagate progresses
            # to parent process.
            self.scans_table[scan_id]['target_progress'] = targets

    def set_host_finished(self, scan_id, target, host):
        """ Add the host in a list of finished hosts """
        finished_hosts = self.scans_table[scan_id]['finished_hosts']
        finished_hosts[target].append(host)
        self.scans_table[scan_id]['finished_hosts'] = finished_hosts

    def get_hosts_unfinished(self, scan_id):
        """ Get a list of unfinished hosts.

        All targets of the scan are expanded into single hosts, then every
        host already recorded as finished is removed from that list.
        """

        finished_by_target = self.scans_table[scan_id]['finished_hosts']
        unfinished_hosts = list()
        for target in finished_by_target:
            unfinished_hosts.extend(target_str_to_list(target))
        for target in finished_by_target:
            for host in finished_by_target[target]:
                unfinished_hosts.remove(host)

        return unfinished_hosts

    def get_hosts_finished(self, scan_id):
        """ Get a list of finished hosts."""

        finished_by_target = self.scans_table[scan_id]['finished_hosts']
        finished_hosts = list()
        for target in finished_by_target:
            finished_hosts.extend(finished_by_target.get(target))

        return finished_hosts

    def results_iterator(self, scan_id, pop_res):
        """ Returns an iterator over scan_id scan's results. If pop_res is True,
        it removes the fetched results from the list.
        """
        if pop_res:
            result_aux = self.scans_table[scan_id]['results']
            self.scans_table[scan_id]['results'] = list()
            return iter(result_aux)

        return iter(self.scans_table[scan_id]['results'])

    def ids_iterator(self):
        """ Returns an iterator over the collection's scan IDS. """

        return iter(self.scans_table.keys())

    def remove_single_result(self, scan_id, result):
        """Removes a single result from the result list in scan_table.

        Parameters:
            scan_id (uuid): Scan ID to identify the scan the result belongs to.
            result (dict): The result to be removed from the results list.
        """
        results = self.scans_table[scan_id]['results']
        results.remove(result)
        self.scans_table[scan_id]['results'] = results

    def del_results_for_stopped_hosts(self, scan_id):
        """ Remove from the result table the results of those hosts which
        did not finish, i.e. the hosts returned by get_hosts_unfinished().
        """
        unfinished_hosts = self.get_hosts_unfinished(scan_id)
        results = self.scans_table[scan_id]['results']
        # Build the filtered list in one pass instead of removing entries
        # from the list while it is being iterated.
        self.scans_table[scan_id]['results'] = [
            result for result in results
            if result['host'] not in unfinished_hosts]

    def resume_scan(self, scan_id, options):
        """ Reset the scan status in the scan_table to INIT.
        Also, overwrite the options, because a resume task cmd
        can add some new option. E.g. exclude hosts list.
        Parameters:
            scan_id (uuid): Scan ID to identify the scan process to be resumed.
            options (dict): Options for the scan to be resumed. This options
                            are not added to the already existent ones.
                            The old ones are removed

        Return:
            Scan ID which identifies the current scan.
        """
        self.scans_table[scan_id]['status'] = ScanStatus.INIT
        if options:
            self.scans_table[scan_id]['options'] = options

        self.del_results_for_stopped_hosts(scan_id)

        return scan_id

    def create_scan(self, scan_id='', targets='', options=None, vts=''):
        """ Creates a new scan with provided scan information.

        If scan_id refers to an existing scan in STOPPED status, that scan
        is resumed instead. When no scan_id is given a new UUID is generated.

        Return:
            Scan ID which identifies the created (or resumed) scan.
        """

        if self.data_manager is None:
            self.data_manager = multiprocessing.Manager()

        # Check if it is possible to resume task. To avoid to resume, the
        # scan must be deleted from the scans_table.
        if scan_id and self.id_exists(scan_id) and (
                self.get_status(scan_id) == ScanStatus.STOPPED):
            return self.resume_scan(scan_id, options)

        if not options:
            options = dict()
        scan_info = self.data_manager.dict()
        scan_info['results'] = list()
        # Each item of targets is unpacked as a 3-tuple whose first element
        # is the target string.
        scan_info['finished_hosts'] = {target: [] for target, _, _ in targets}
        scan_info['progress'] = 0
        scan_info['target_progress'] = {
            target: {} for target, _, _ in targets}
        scan_info['targets'] = targets
        scan_info['vts'] = vts
        scan_info['options'] = options
        scan_info['start_time'] = int(time.time())
        scan_info['end_time'] = "0"
        scan_info['status'] = ScanStatus.INIT
        if scan_id is None or scan_id == '':
            scan_id = str(uuid.uuid4())
        scan_info['scan_id'] = scan_id
        self.scans_table[scan_id] = scan_info
        return scan_id

    def set_status(self, scan_id, status):
        """ Sets scan_id scan's status. """
        self.scans_table[scan_id]['status'] = status

    def get_status(self, scan_id):
        """ Get scan_id scans's status."""

        return self.scans_table[scan_id]['status']

    def get_options(self, scan_id):
        """ Get scan_id scan's options list. """

        return self.scans_table[scan_id]['options']

    def set_option(self, scan_id, name, value):
        """ Set a scan_id scan's name option to value. """

        self.scans_table[scan_id]['options'][name] = value

    def get_progress(self, scan_id):
        """ Get a scan's current progress value. """

        return self.scans_table[scan_id]['progress']

    def get_target_progress(self, scan_id, target):
        """ Get a target's current progress value.
        The value is calculated with the progress of each single host
        in the target.

        Raises:
            ZeroDivisionError: if the target expands to no host at all.
        """

        total_hosts = len(target_str_to_list(target))
        host_progresses = self.scans_table[scan_id]['target_progress'].get(
            target)
        try:
            t_prog = sum(host_progresses.values()) / total_hosts
        except ZeroDivisionError:
            LOGGER.error("Zero division error in %s",
                         self.get_target_progress.__name__)
            raise
        return t_prog

    def get_start_time(self, scan_id):
        """ Get a scan's start time. """

        return self.scans_table[scan_id]['start_time']

    def get_end_time(self, scan_id):
        """ Get a scan's end time. """

        return self.scans_table[scan_id]['end_time']

    def get_target_list(self, scan_id):
        """ Get a scan's target list. """

        return [target
                for target, _, _ in self.scans_table[scan_id]['targets']]

    def get_ports(self, scan_id, target):
        """ Get a scan's ports list. If a target is specified
        it will return the corresponding port for it. If not,
        it returns the port item of the first nested list in
        the target's list.
        """
        if target:
            for item in self.scans_table[scan_id]['targets']:
                if target == item[0]:
                    return item[1]

        return self.scans_table[scan_id]['targets'][0][1]

    def get_credentials(self, scan_id, target):
        """ Get a scan's credential list. It returns the credential item
        stored for the given target, or None if the target is empty or
        is not part of the scan.
        """
        if target:
            for item in self.scans_table[scan_id]['targets']:
                if target == item[0]:
                    return item[2]

        return None

    def get_vts(self, scan_id):
        """ Get a scan's vts list. """

        return self.scans_table[scan_id]['vts']

    def id_exists(self, scan_id):
        """ Check whether a scan exists in the table. """

        return self.scans_table.get(scan_id) is not None

    def delete_scan(self, scan_id):
        """ Delete a scan unless it is still running.

        Return:
            False if the scan is in RUNNING status and was not deleted,
            True otherwise.
        """

        if self.get_status(scan_id) == ScanStatus.RUNNING:
            return False
        self.scans_table.pop(scan_id)
        if not self.scans_table:
            # No scan left: release the manager, create_scan() makes a
            # new one on demand.
            self.data_manager = None
        return True
339
340
341 View Code Duplication
class ResultType(object):

    """ Various scan results types values. """

    ALARM = 0
    LOG = 1
    ERROR = 2
    HOST_DETAIL = 3

    @classmethod
    def _names(cls):
        """ Return the (type value, string name) pairs of all result types. """
        return (
            (cls.ALARM, "Alarm"),
            (cls.LOG, "Log Message"),
            (cls.ERROR, "Error Message"),
            (cls.HOST_DETAIL, "Host Detail"),
        )

    @classmethod
    def get_str(cls, result_type):
        """ Return string name of a result type.

        Raises:
            AssertionError: if result_type is not a known result type.
        """
        for type_value, type_name in cls._names():
            if result_type == type_value:
                return type_name
        # Raised explicitly so the check is not stripped when running
        # with optimizations enabled.
        raise AssertionError(
            "Erroneous result type {0}.".format(result_type))

    @classmethod
    def get_type(cls, result_name):
        """ Return the result type value of a result type string name.

        Raises:
            AssertionError: if result_name is not a known result name.
        """
        for type_value, type_name in cls._names():
            if result_name == type_name:
                return type_value
        raise AssertionError(
            "Erroneous result name {0}.".format(result_name))
377
378
379
# Cached inet_pton implementation, resolved on the first inet_pton() call.
__inet_pton = None


def inet_pton(address_family, ip_string):
    """ A platform independent version of inet_pton.

    Uses socket.inet_pton when the socket module provides it and falls
    back to ospd.win_socket.inet_pton otherwise.
    """
    global __inet_pton
    if __inet_pton is None:
        if hasattr(socket, 'inet_pton'):
            __inet_pton = socket.inet_pton
        else:
            # Imported lazily: only needed where socket lacks inet_pton.
            from ospd import win_socket
            __inet_pton = win_socket.inet_pton

    return __inet_pton(address_family, ip_string)
393
394
395
# Cached inet_ntop implementation, resolved on the first inet_ntop() call.
__inet_ntop = None


def inet_ntop(address_family, packed_ip):
    """ A platform independent version of inet_ntop.

    Uses socket.inet_ntop when the socket module provides it and falls
    back to ospd.win_socket.inet_ntop otherwise.
    """
    global __inet_ntop
    if __inet_ntop is None:
        if hasattr(socket, 'inet_ntop'):
            __inet_ntop = socket.inet_ntop
        else:
            # Imported lazily: only needed where socket lacks inet_ntop.
            from ospd import win_socket
            __inet_ntop = win_socket.inet_ntop

    return __inet_ntop(address_family, packed_ip)
409
410
411
def target_to_ipv4(target):
    """ Attempt to return a single IPv4 host list from a target string.

    Returns a one element list holding target, or None if target is not
    a valid IPv4 address.
    """

    try:
        inet_pton(socket.AF_INET, target)
    except socket.error:
        return None
    return [target]
419
420
421
def target_to_ipv6(target):
    """ Attempt to return a single IPv6 host list from a target string.

    Returns a one element list holding target, or None if target is not
    a valid IPv6 address.
    """

    try:
        inet_pton(socket.AF_INET6, target)
    except socket.error:
        return None
    return [target]
429
430
431 View Code Duplication
def ipv4_range_to_list(start_packed, end_packed):
    """ Return a list of IPv4 entries from start_packed to end_packed.

    Both arguments are 4 bytes long packed addresses in network byte order
    and both ends are included. The list is empty if end is lower than
    start.
    """

    start = struct.unpack('!L', start_packed)[0]
    end = struct.unpack('!L', end_packed)[0]
    return [socket.inet_ntoa(struct.pack('!L', value))
            for value in range(start, end + 1)]
441
442
443 View Code Duplication
def target_to_ipv4_short(target):
    """ Attempt to return a IPv4 short range list from a target string.

    A short range has the form "<IPv4 address>-<last octet>",
    e.g. "192.168.0.1-5". Returns None if target has another form or if
    the end octet is not in [start octet, 255].
    """

    splitted = target.split('-')
    if len(splitted) != 2:
        return None
    try:
        start_packed = inet_pton(socket.AF_INET, splitted[0])
        end_value = int(splitted[1])
    except (socket.error, ValueError):
        return None
    # Indexing a bytearray gives the octet as an integer on Python 2 and 3.
    # bytes(start_packed[3]) would build a zero-filled buffer on Python 3.
    start_value = bytearray(start_packed)[3]
    if end_value < 0 or end_value > 255 or end_value < start_value:
        return None
    end_packed = start_packed[0:3] + struct.pack('B', end_value)
    return ipv4_range_to_list(start_packed, end_packed)
459
460
461 View Code Duplication
def target_to_ipv4_cidr(target):
    """ Attempt to return a IPv4 CIDR list from a target string.

    Only prefix lengths from 1 to 30 are accepted. The first (network) and
    the last address of the block are not part of the returned list.
    """

    splitted = target.split('/')
    if len(splitted) != 2:
        return None
    try:
        start_packed = inet_pton(socket.AF_INET, splitted[0])
        block = int(splitted[1])
    except (socket.error, ValueError):
        return None
    if block <= 0 or block > 30:
        return None
    host_bits = 32 - block
    # Clear the host bits to get the network address, then step to the
    # first address after it.
    network = int(binascii.hexlify(start_packed), 16) >> host_bits
    start_value = (network << host_bits) + 1
    # Set all host bits and step back by one address.
    end_value = (start_value | (0xffffffff >> block)) - 1
    return ipv4_range_to_list(struct.pack('!I', start_value),
                              struct.pack('!I', end_value))
480
481
482 View Code Duplication
def target_to_ipv6_cidr(target):
    """ Attempt to return a IPv6 CIDR list from a target string.

    Only prefix lengths from 1 to 126 are accepted. The first and the last
    address of the block are not part of the returned list.
    """

    splitted = target.split('/')
    if len(splitted) != 2:
        return None
    try:
        start_packed = inet_pton(socket.AF_INET6, splitted[0])
        block = int(splitted[1])
    except (socket.error, ValueError):
        return None
    if block <= 0 or block > 126:
        return None
    host_bits = 128 - block
    # Clear the host bits, then step to the first address after the prefix.
    prefix = int(binascii.hexlify(start_packed), 16) >> host_bits
    start_value = (prefix << host_bits) + 1
    # Set all host bits and step back by one address.
    end_value = (start_value | (int('ff' * 16, 16) >> block)) - 1
    # A 128 bit value is packed as two unsigned 64 bit halves.
    low_mask = (1 << 64) - 1
    start_packed = struct.pack('!QQ', start_value >> 64,
                               start_value & low_mask)
    end_packed = struct.pack('!QQ', end_value >> 64, end_value & low_mask)
    return ipv6_range_to_list(start_packed, end_packed)
505
506
507 View Code Duplication
def target_to_ipv4_long(target):
    """ Attempt to return a IPv4 long-range list from a target string.

    A long range has the form "<IPv4 address>-<IPv4 address>". Returns None
    if target has another form or if the end is lower than the start.
    """

    splitted = target.split('-')
    if len(splitted) != 2:
        return None
    try:
        start_packed = inet_pton(socket.AF_INET, splitted[0])
        end_packed = inet_pton(socket.AF_INET, splitted[1])
    except socket.error:
        return None
    if end_packed < start_packed:
        return None
    return ipv4_range_to_list(start_packed, end_packed)
521
522
523 View Code Duplication
def ipv6_range_to_list(start_packed, end_packed):
    """ Return a list of IPv6 entries from start_packed to end_packed.

    Both arguments are packed IPv6 addresses and both ends are included.
    """

    start = int(binascii.hexlify(start_packed), 16)
    end = int(binascii.hexlify(end_packed), 16)
    # A 128 bit value is packed as two unsigned 64 bit halves.
    low_mask = (1 << 64) - 1
    return [inet_ntop(socket.AF_INET6,
                      struct.pack('!2Q', value >> 64, value & low_mask))
            for value in range(start, end + 1)]
536
537
538 View Code Duplication
def target_to_ipv6_short(target):
    """ Attempt to return a IPv6 short-range list from a target string.

    A short range has the form "<IPv6 address>-<last group in hex>".
    Returns None if target has another form or if the end group is not in
    [start group, 0xffff].
    """

    splitted = target.split('-')
    if len(splitted) != 2:
        return None
    try:
        start_packed = inet_pton(socket.AF_INET6, splitted[0])
        end_value = int(splitted[1], 16)
    except (socket.error, ValueError):
        return None
    # The last 16 bit group is stored in the two trailing bytes.
    start_value = int(binascii.hexlify(start_packed[14:]), 16)
    if end_value < 0 or end_value > 0xffff or end_value < start_value:
        return None
    end_packed = start_packed[:14] + struct.pack('!H', end_value)
    return ipv6_range_to_list(start_packed, end_packed)
554
555
556 View Code Duplication
def target_to_ipv6_long(target):
    """ Attempt to return a IPv6 long-range list from a target string.

    A long range has the form "<IPv6 address>-<IPv6 address>". Returns None
    if target has another form or if the end is lower than the start.
    """

    splitted = target.split('-')
    if len(splitted) != 2:
        return None
    try:
        start_packed = inet_pton(socket.AF_INET6, splitted[0])
        end_packed = inet_pton(socket.AF_INET6, splitted[1])
    except socket.error:
        return None
    if end_packed < start_packed:
        return None
    return ipv6_range_to_list(start_packed, end_packed)
570
571
572
def target_to_hostname(target):
    """ Attempt to return a single hostname list from a target string.

    Returns None if target is empty, longer than 255 characters or holds
    anything but word characters, dots and dashes.
    """

    if not 0 < len(target) <= 255:
        return None
    if re.match(r'^[\w.-]+$', target) is None:
        return None
    return [target]
580
581
582 View Code Duplication
def target_to_list(target):
    """ Attempt to return a list of single hosts from a target string.

    The target formats are tried in a fixed order and the result of the
    first converter which recognizes the target is returned. If no
    converter recognizes it, the result of the last one is returned.
    """

    converters = (
        target_to_ipv4,        # Is it an IPv4 address ?
        target_to_ipv6,        # Is it an IPv6 address ?
        target_to_ipv4_cidr,   # Is it an IPv4 CIDR ?
        target_to_ipv6_cidr,   # Is it an IPv6 CIDR ?
        target_to_ipv4_short,  # Is it an IPv4 short-range ?
        target_to_ipv4_long,   # Is it an IPv4 long-range ?
        target_to_ipv6_short,  # Is it an IPv6 short-range ?
        target_to_ipv6_long,   # Is it an IPv6 long-range ?
        target_to_hostname,    # Is it a hostname ?
    )
    new_list = None
    for convert in converters:
        new_list = convert(target)
        if new_list:
            break
    return new_list
612
613
614 View Code Duplication
def target_str_to_list(target_str):
    """ Parses a targets string into a list of individual targets.

    target_str is a comma separated list of targets. Duplicated hosts are
    dropped, keeping the order of first appearance. Returns None as soon as
    one of the targets is invalid.
    """
    new_list = list()
    for target in target_str.split(','):
        target = target.strip()
        target_list = target_to_list(target)
        if not target_list:
            LOGGER.info("%s: Invalid target value", target)
            return None
        new_list.extend(target_list)
    # OrderedDict keys remove duplicates while preserving the order.
    return list(collections.OrderedDict.fromkeys(new_list))
626
627
628
def resolve_hostname(hostname):
    """ Returns IP of a hostname, or None if it can not be resolved. """

    assert hostname
    try:
        return socket.gethostbyname(hostname)
    except socket.gaierror:
        return None
636
637
638 View Code Duplication
def port_range_expand(portrange):
    """
    Receive a port range and expands it in individual ports.

    @input Port range.
    e.g. "4-8"

    @return List of integers, or None if portrange is empty or holds no '-'.
    e.g. [4, 5, 6, 7, 8]
    """
    if not portrange or '-' not in portrange:
        LOGGER.info("Invalid port range format")
        return None
    # Split on the first dash only.
    first_port, _, last_port = portrange.partition('-')
    return list(range(int(first_port), int(last_port) + 1))
656
657
658 View Code Duplication
def port_str_arrange(ports):
    """ Gives a str in the format (always tcp listed first).
    T:<tcp ports/portrange comma separated>U:<udp ports comma separated>

    The string is returned unchanged unless it holds both a "T" and a "U"
    with the "U" section listed first.
    """
    b_tcp = ports.find("T")
    b_udp = ports.find("U")
    udp_listed_first = b_udp != -1 and b_tcp != -1 and b_udp < b_tcp
    if udp_listed_first:
        return ports[b_tcp:] + ports[b_udp:b_tcp]

    return ports
668
669
670 View Code Duplication
def ports_str_check_failed(port_str):
    """
    Check if the port string is well formed.
    Return True if fail, False other case.

    The check fails if the string holds a character other than "T", "U",
    ":", digits, comma, space and dash, if "T" or "U" appear more than
    once, or if there are fewer ":" than "T" and "U" markers.
    """

    pattern = r'[^TU:0-9, \-]'
    if re.search(pattern, port_str):
        return True
    tcp_markers = port_str.count('T')
    udp_markers = port_str.count('U')
    if tcp_markers > 1 or udp_markers > 1:
        return True
    return port_str.count(':') < tcp_markers + udp_markers
685
686
687 View Code Duplication
def _expand_port_items(items):
    """ Expand a comma separated ports/port ranges string into a sorted
    list of integers. """
    expanded = list()
    for item in items.split(','):
        if '-' in item:
            expanded.extend(port_range_expand(item))
        else:
            expanded.append(int(item))
    expanded.sort()
    return expanded


def ports_as_list(port_str):
    """
    Parses a ports string into two list of individual tcp and udp ports.

    @input string containing a port list
    e.g. T:1,2,3,5-8 U:22,80,600-1024

    @return two list of sorted integers, for tcp and udp ports respectively.
            [None, None] is returned if the string is empty or malformed.
            Ports listed without any "T:" or "U:" marker are taken as tcp
            ports.
    """
    if not port_str:
        LOGGER.info("Invalid port value")
        return [None, None]

    if ports_str_check_failed(port_str):
        LOGGER.info("%s: Port list malformed.", port_str)
        return [None, None]

    ports = port_str.replace(' ', '')

    # Drop the comma separating one protocol section from the next one
    # (e.g. "T:1,2,U:3"). The marker position is looked up again for each
    # marker, as removing a comma shifts the following characters. A
    # missing marker (index -1) or a leading one (index 0) has no
    # preceding character to inspect.
    for marker in ("T", "U"):
        position = ports.find(marker)
        if position > 0 and ports[position - 1] == ',':
            ports = ports[:position - 1] + ports[position:]

    has_tcp = "T" in ports
    has_udp = "U" in ports
    ports = port_str_arrange(ports)

    tports = ''
    uports = ''
    # TCP ports listed first, then UDP ports
    if has_tcp and has_udp:
        tports = ports[ports.index('T:') + 2:ports.index('U:')]
        uports = ports[ports.index('U:') + 2:]
    # Only UDP ports
    elif has_udp:
        uports = ports[ports.index('U:') + 2:]
    # Only TCP ports
    elif has_tcp:
        tports = ports[ports.index('T:') + 2:]
    else:
        tports = ports

    tcp_list = _expand_port_items(tports) if tports else list()
    udp_list = _expand_port_items(uports) if uports else list()

    return (tcp_list, udp_list)
747
748
749
def get_tcp_port_list(port_str):
    """ Return a list with tcp ports from a given port list in string format """
    tcp_ports = ports_as_list(port_str)[0]
    return tcp_ports
752
753
754
def get_udp_port_list(port_str):
    """ Return a list with udp ports from a given port list in string format """
    udp_ports = ports_as_list(port_str)[1]
    return udp_ports
757
758
759 View Code Duplication
def port_list_compress(port_list):
    """ Compress a port list and return a string.

    Duplicates are dropped and consecutive ports are joined in ranges,
    e.g. [1, 2, 3, 5] gives "1-3,5". An empty string is returned for an
    empty or missing list.
    """

    if not port_list:
        LOGGER.info("Invalid or empty port list.")
        return ''

    port_list = sorted(set(port_list))
    compressed_list = []
    # In a run of consecutive ports the difference between a port and its
    # position in the sorted list is constant, so it is used as group key.
    for _, group in itertools.groupby(enumerate(port_list),
                                      lambda t: t[1] - t[0]):
        run = [port for _, port in group]
        if run[0] == run[-1]:
            compressed_list.append(str(run[0]))
        else:
            compressed_list.append(str(run[0]) + '-' + str(run[-1]))

    return ','.join(compressed_list)
777
778
779
def valid_uuid(value):
    """ Check if value is a valid UUID.

    Returns True if uuid.UUID() accepts the value, False otherwise.
    """

    try:
        uuid.UUID(value, version=4)
    except (TypeError, ValueError, AttributeError):
        return False
    return True
787
788
789 View Code Duplication
def create_args_parser(description):
    """ Build the command-line arguments parser used by OSPD daemons.

    Arguments:
        description: Text given to argparse.ArgumentParser as the program
            description.

    Return:
        An argparse.ArgumentParser with the port, bind address, unix
        socket, key/cert/CA file, log level, foreground, log file and
        version options registered.
    """

    def network_port(string):
        """ Convert string to an int and require it to be in ]0,65535]. """
        port_number = int(string)
        if port_number <= 0 or port_number > 65535:
            raise argparse.ArgumentTypeError(
                'port must be in ]0,65535] interval')
        return port_number

    def cacert_file(cacert):
        """ Require cacert to be a CA certificate that is valid right now.

        The path is returned unchanged when the checks pass, and also when
        building the SSL context raises AttributeError.
        """
        try:
            context = ssl.create_default_context(cafile=cacert)
        except AttributeError:
            # Python version < 2.7.9
            return cacert
        except IOError:
            raise argparse.ArgumentTypeError('CA Certificate not found')

        # Only the first loaded CA certificate is inspected.
        try:
            not_after = ssl.cert_time_to_seconds(
                context.get_ca_certs()[0]['notAfter'])
            not_before = ssl.cert_time_to_seconds(
                context.get_ca_certs()[0]['notBefore'])
        except (KeyError, IndexError):
            raise argparse.ArgumentTypeError('CA Certificate is erroneous')

        if int(time.time()) > not_after:
            raise argparse.ArgumentTypeError('CA Certificate expired')
        if int(time.time()) < not_before:
            raise argparse.ArgumentTypeError('CA Certificate not active yet')
        return cacert

    def log_level(string):
        """ Map a level name to the matching integer of the logging module. """
        level = getattr(logging, string.upper(), None)
        if isinstance(level, int):
            return level
        raise argparse.ArgumentTypeError(
            'log level must be one of {debug,info,warning,error,critical}')

    def filename(string):
        """ Require string to be the path of an existing regular file. """
        if os.path.isfile(string):
            return string
        raise argparse.ArgumentTypeError(
            '%s is not a valid file path' % string)

    parser = argparse.ArgumentParser(description=description)

    # Listening endpoint.
    parser.add_argument(
        '-p', '--port', default=PORT, type=network_port,
        help='TCP Port to listen on. Default: {0}'.format(PORT))
    parser.add_argument(
        '-b', '--bind-address', default=ADDRESS,
        help='Address to listen on. Default: {0}'.format(ADDRESS))
    parser.add_argument(
        '-u', '--unix-socket',
        help='Unix file socket to listen on.')

    # TLS material. No argparse default is set for these options, the
    # defaults are only mentioned in the help texts.
    parser.add_argument(
        '-k', '--key-file', type=filename,
        help='Server key file. Default: {0}'.format(KEY_FILE))
    parser.add_argument(
        '-c', '--cert-file', type=filename,
        help='Server cert file. Default: {0}'.format(CERT_FILE))
    parser.add_argument(
        '--ca-file', type=cacert_file,
        help='CA cert file. Default: {0}'.format(CA_FILE))

    # Logging and process behaviour.
    parser.add_argument(
        '-L', '--log-level', default='warning', type=log_level,
        help='Wished level of logging. Default: WARNING')
    parser.add_argument(
        '--foreground', action='store_true',
        help='Run in foreground and logs all messages to console.')
    parser.add_argument(
        '-l', '--log-file', type=filename,
        help='Path to the logging file.')
    parser.add_argument(
        '--version', action='store_true',
        help='Print version then exit.')

    return parser
864
865
866
def go_to_background():
    """ Fork the running process and let the parent exit.

    The child keeps running after the call returns. If the fork fails the
    error is logged and the process exits with the 'Fork failed' message.
    """
    try:
        child_pid = os.fork()
    except OSError as errmsg:
        LOGGER.error('Fork failed: {0}'.format(errmsg))
        sys.exit('Fork failed')

    # os.fork() returns a non-zero pid in the parent only.
    if child_pid:
        sys.exit()
874
875
876 View Code Duplication
def get_common_args(parser, args=None):
    """ Parse the command line and return the OSPD common arguments.

    Arguments:
        parser: Parser whose parse_args() result provides the port,
            bind_address, unix_socket, key_file, cert_file, ca_file,
            log_level, foreground, log_file and version attributes.
        args: Argument list passed on to parser.parse_args().

    Return:
        A dict with the keys port, address, unix_socket, keyfile,
        certfile, cafile, log_level, foreground, log_file and version.
        Key, cert and CA file fall back to KEY_FILE, CERT_FILE and CA_FILE
        when the corresponding option is empty.
    """
    options = parser.parse_args(args)

    return {
        # TCP port and network address to bind the listener to.
        'port': options.port,
        'address': options.bind_address,
        # Unix file socket to listen on.
        'unix_socket': options.unix_socket,
        # Server key, server cert and CA cert paths.
        'keyfile': options.key_file or KEY_FILE,
        'certfile': options.cert_file or CERT_FILE,
        'cafile': options.ca_file or CA_FILE,
        # Debug level.
        'log_level': options.log_level,
        'foreground': options.foreground,
        'log_file': options.log_file,
        'version': options.version,
    }
917
918
919 View Code Duplication
def print_version(wrapper):
    """ Print version and license information to standard output.

    The scanner name, server version, protocol version, daemon name and
    daemon version are read from the matching wrapper.get_*() methods.
    """
    print("OSP Server for {0} version {1}".format(
        wrapper.get_scanner_name(), wrapper.get_server_version()))
    print("OSP Version: {0}".format(wrapper.get_protocol_version()))
    print("Using: {0} {1}".format(
        wrapper.get_daemon_name(), wrapper.get_daemon_version()))

    license_notice = (
        "Copyright (C) 2014, 2015 Greenbone Networks GmbH\n"
        "License GPLv2+: GNU GPL version 2 or later\n"
        "This is free software: you are free to change"
        " and redistribute it.\n"
        "There is NO WARRANTY, to the extent permitted by law.")
    print(license_notice)
935
936
937 View Code Duplication
def main(name, klass):
    """ Run an OSPD daemon: parse options, set up logging and serve.

    Arguments:
        name: Description handed to create_args_parser().
        klass: Callable creating the daemon wrapper. It is called with the
            certfile, keyfile and cafile keyword arguments.

    Return:
        1 when wrapper.check() fails, otherwise the result of
        wrapper.run(). With the version option set, the process exits
        right after printing the version information.
    """
    cargs = get_common_args(create_args_parser(name))

    root_logger = logging.getLogger()
    root_logger.setLevel(cargs['log_level'])

    wrapper = klass(certfile=cargs['certfile'],
                    keyfile=cargs['keyfile'],
                    cafile=cargs['cafile'])

    if cargs['version']:
        print_version(wrapper)
        sys.exit()

    # Console and file logging share one format; syslog gets its own
    # format without the timestamp field.
    timestamped = '%(asctime)s %(name)s: %(levelname)s: %(message)s'

    if cargs['foreground']:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(timestamped))
        root_logger.addHandler(handler)
    elif cargs['log_file']:
        handler = logging.handlers.WatchedFileHandler(cargs['log_file'])
        handler.setFormatter(logging.Formatter(timestamped))
        root_logger.addHandler(handler)
        go_to_background()
    else:
        handler = logging.handlers.SysLogHandler('/dev/log')
        handler.setFormatter(
            logging.Formatter('%(name)s: %(levelname)s: %(message)s'))
        root_logger.addHandler(handler)
        # Duplicate syslog's file descriptor to stdout/stderr.
        syslog_fd = handler.socket.fileno()
        for std_fd in (1, 2):
            os.dup2(syslog_fd, std_fd)
        go_to_background()

    if wrapper.check():
        return wrapper.run(cargs['address'], cargs['port'],
                           cargs['unix_socket'])
    return 1
980