Completed
Push — master ( d1b721...16ca13 )
by Juan José
22s queued 12s
created

ospd.misc.ScanCollection.get_exclude_hosts()   A

Complexity

Conditions 4

Size

Total Lines 7
Code Lines 5

Duplication

Lines 7
Ratio 100 %

Importance

Changes 0
Metric Value
cc 4
eloc 5
nop 3
dl 7
loc 7
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2014-2018 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
""" Miscellaneous classes and functions related to OSPD.
20
"""
21
22
23
# Needed to say that when we import ospd, we mean the package and not the
24
# module in that directory.
25
from __future__ import absolute_import
26
from __future__ import print_function
27
28
import argparse
29
import binascii
30
import collections
31
import logging
32
import logging.handlers
33
import os
34
import re
35
import socket
36
import struct
37
import sys
38
import time
39
import ssl
40
import uuid
41
import multiprocessing
42
import itertools
43
from enum import Enum
44
45
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
46
47
# Default file locations as used by a OpenVAS default installation
48
# Private key file (PEM).
KEY_FILE = "/usr/var/lib/gvm/private/CA/serverkey.pem"
49
# Server certificate file (PEM).
CERT_FILE = "/usr/var/lib/gvm/CA/servercert.pem"
50
# CA certificate file (PEM).
CA_FILE = "/usr/var/lib/gvm/CA/cacert.pem"
51
52
# NOTE(review): presumably the default listening port of the daemon --
# confirm against the command-line parser defaults.
PORT = 1234
53
# NOTE(review): presumably the default bind address (all IPv4 interfaces) --
# confirm against the command-line parser defaults.
ADDRESS = "0.0.0.0"
54
55
class ScanStatus(Enum):
    """ Life-cycle states of a scan stored in a ScanCollection. """

    # Assigned when a scan is created or resumed.
    INIT = 0
    # A scan in this state is refused by ScanCollection.delete_scan().
    RUNNING = 1
    # Only scans in this state are resumed by ScanCollection.create_scan().
    STOPPED = 2
    FINISHED = 3
61
62
63
64 View Code Duplication
class ScanCollection(object):

    """ Scans collection, managing scans and results read and write, exposing
    only needed information.

    Each scan has meta-information such as scan ID, current progress (from 0 to
    100), start time, end time, scan target and options and a list of results.

    There are 4 types of results: Alarms, Logs, Errors and Host Details.

    Every scan entry is a dictionary created through a
    multiprocessing.Manager(). Nested containers read from such an entry are
    modified locally and then assigned back to the entry, so the change is
    propagated to the parent process.

    Todo:
    - Better checking for Scan ID existence and handling otherwise.
    - More data validation.
    - Mutex access per table/scan_info.

    """

    def __init__(self):
        """ Initialize the Scan Collection. """

        # Created lazily by create_scan() and dropped again by delete_scan()
        # once the table is empty.
        self.data_manager = None
        self.scans_table = dict()

    def add_result(self, scan_id, result_type, host='', name='', value='',
                   port='', test_id='', severity='', qod=''):
        """ Add a result to a scan in the table.

        Raises AssertionError if scan_id is empty or if both name and value
        are empty.
        """

        assert scan_id
        assert len(name) or len(value)
        result = dict()
        result['type'] = result_type
        result['name'] = name
        result['severity'] = severity
        result['test_id'] = test_id
        result['value'] = value
        result['host'] = host
        result['port'] = port
        result['qod'] = qod
        results = self.scans_table[scan_id]['results']
        results.append(result)
        # Set scan_info's results to propagate results to parent process.
        self.scans_table[scan_id]['results'] = results

    def set_progress(self, scan_id, progress):
        """ Sets scan_id scan's progress.

        Values outside ]0, 100] are ignored. Reaching 100 also records the
        current time as the scan's end time.
        """

        if 0 < progress <= 100:
            self.scans_table[scan_id]['progress'] = progress
        if progress == 100:
            self.scans_table[scan_id]['end_time'] = int(time.time())

    def set_target_progress(self, scan_id, target, host, progress):
        """ Sets the progress of a single host of a target in scan_id scan.

        Values outside ]0, 100] are ignored.
        """
        if 0 < progress <= 100:
            targets = self.scans_table[scan_id]['target_progress']
            targets[target][host] = progress
            # Set scan_info's target_progress to propagate progresses
            # to parent process.
            self.scans_table[scan_id]['target_progress'] = targets

    def set_host_finished(self, scan_id, target, host):
        """ Add the host in a list of finished hosts """
        finished_hosts = self.scans_table[scan_id]['finished_hosts']
        finished_hosts[target].append(host)
        # Assign back to propagate the change to the parent process.
        self.scans_table[scan_id]['finished_hosts'] = finished_hosts

    def get_hosts_unfinished(self, scan_id):
        """ Get a list of unfinished hosts.

        Every target of the scan is expanded into its single hosts and the
        hosts already recorded as finished are removed from that list.
        """

        finished_by_target = self.scans_table[scan_id]['finished_hosts']
        unfinished_hosts = list()
        for target in finished_by_target:
            unfinished_hosts.extend(target_str_to_list(target))
        for target in finished_by_target:
            for host in finished_by_target[target]:
                unfinished_hosts.remove(host)

        return unfinished_hosts

    def get_hosts_finished(self, scan_id):
        """ Get a list of finished hosts."""

        finished_by_target = self.scans_table[scan_id]['finished_hosts']
        finished_hosts = list()
        for target in finished_by_target:
            finished_hosts.extend(finished_by_target.get(target))

        return finished_hosts

    def results_iterator(self, scan_id, pop_res):
        """ Returns an iterator over scan_id scan's results. If pop_res is True,
        it removes the fetched results from the list.
        """
        if pop_res:
            result_aux = self.scans_table[scan_id]['results']
            self.scans_table[scan_id]['results'] = list()
            return iter(result_aux)

        return iter(self.scans_table[scan_id]['results'])

    def ids_iterator(self):
        """ Returns an iterator over the collection's scan IDS. """

        return iter(self.scans_table.keys())

    def remove_single_result(self, scan_id, result):
        """Removes a single result from the result list in scan_table.

        Parameters:
            scan_id (uuid): Scan ID to identify the scan process to be resumed.
            result (dict): The result to be removed from the results list.
        """
        results = self.scans_table[scan_id]['results']
        results.remove(result)
        self.scans_table[scan_id]['results'] = results

    def del_results_for_stopped_hosts(self, scan_id):
        """ Remove results from the result table for those hosts which
        did not finish.
        """
        unfinished_hosts = self.get_hosts_unfinished(scan_id)
        # Build the filtered list instead of removing entries while iterating:
        # removing from the list being iterated skips elements when the scan
        # entry is a plain dictionary. A single assignment also propagates the
        # change to the parent process in one step.
        kept_results = [
            result for result in self.scans_table[scan_id]['results']
            if result['host'] not in unfinished_hosts]
        self.scans_table[scan_id]['results'] = kept_results

    def resume_scan(self, scan_id, options):
        """ Reset the scan status in the scan_table to INIT.
        Also, overwrite the options, because a resume task cmd
        can add some new option. E.g. exclude hosts list.
        Parameters:
            scan_id (uuid): Scan ID to identify the scan process to be resumed.
            options (dict): Options for the scan to be resumed. This options
                            are not added to the already existent ones.
                            The old ones are removed

        Return:
            Scan ID which identifies the current scan.
        """
        self.scans_table[scan_id]['status'] = ScanStatus.INIT
        if options:
            self.scans_table[scan_id]['options'] = options

        self.del_results_for_stopped_hosts(scan_id)

        return scan_id

    def create_scan(self, scan_id='', targets='', options=None, vts=''):
        """ Creates a new scan with provided scan information.

        If scan_id refers to an existing scan in STOPPED status, that scan
        is resumed instead. If no scan_id is given, a new UUID is generated.

        Return:
            Scan ID which identifies the created (or resumed) scan.
        """

        if self.data_manager is None:
            self.data_manager = multiprocessing.Manager()

        # Check if it is possible to resume task. To avoid to resume, the
        # scan must be deleted from the scans_table.
        if scan_id and self.id_exists(scan_id) and (
                self.get_status(scan_id) == ScanStatus.STOPPED):
            return self.resume_scan(scan_id, options)

        if not options:
            options = dict()
        scan_info = self.data_manager.dict()
        scan_info['results'] = list()
        # Each entry of targets is unpacked as a 4-item sequence whose first
        # item is the target string.
        scan_info['finished_hosts'] = dict(
            [[target, []] for target, _, _, _ in targets])
        scan_info['progress'] = 0
        scan_info['target_progress'] = dict(
            [[target, {}] for target, _, _, _ in targets])
        scan_info['targets'] = targets
        scan_info['vts'] = vts
        scan_info['options'] = options
        scan_info['start_time'] = int(time.time())
        scan_info['end_time'] = "0"
        scan_info['status'] = ScanStatus.INIT
        if scan_id is None or scan_id == '':
            scan_id = str(uuid.uuid4())
        scan_info['scan_id'] = scan_id
        self.scans_table[scan_id] = scan_info
        return scan_id

    def set_status(self, scan_id, status):
        """ Sets scan_id scan's status. """
        self.scans_table[scan_id]['status'] = status

    def get_status(self, scan_id):
        """ Get scan_id scans's status."""

        return self.scans_table[scan_id]['status']

    def get_options(self, scan_id):
        """ Get scan_id scan's options list. """

        return self.scans_table[scan_id]['options']

    def set_option(self, scan_id, name, value):
        """ Set a scan_id scan's name option to value. """

        self.scans_table[scan_id]['options'][name] = value

    def get_progress(self, scan_id):
        """ Get a scan's current progress value. """

        return self.scans_table[scan_id]['progress']

    def get_target_progress(self, scan_id, target):
        """ Get a target's current progress value.
        The value is calculated with the progress of each single host
        in the target.

        Raises ZeroDivisionError (after logging it) if the target expands
        to no hosts.
        """

        total_hosts = len(target_str_to_list(target))
        host_progresses = self.scans_table[scan_id]['target_progress'].get(
            target)
        try:
            t_prog = sum(host_progresses.values()) / total_hosts
        except ZeroDivisionError:
            # The method must be reached through self; the bare name is not
            # defined at module level and would raise NameError here.
            LOGGER.error("Zero division error in %s",
                         self.get_target_progress.__name__)
            raise
        return t_prog

    def get_start_time(self, scan_id):
        """ Get a scan's start time. """

        return self.scans_table[scan_id]['start_time']

    def get_end_time(self, scan_id):
        """ Get a scan's end time. """

        return self.scans_table[scan_id]['end_time']

    def get_target_list(self, scan_id):
        """ Get a scan's target list. """

        return [target
                for target, _, _, _ in self.scans_table[scan_id]['targets']]

    def get_ports(self, scan_id, target):
        """ Get a scan's ports list. If a target is specified
        it will return the corresponding port for it. If not,
        it returns the port item of the first nested list in
        the target's list.
        """
        if target:
            for item in self.scans_table[scan_id]['targets']:
                if target == item[0]:
                    return item[1]

        return self.scans_table[scan_id]['targets'][0][1]

    def get_exclude_hosts(self, scan_id, target):
        """ Get an exclude host list for a given target.
        Returns None if no target is given or the target is unknown.
        """
        if target:
            for item in self.scans_table[scan_id]['targets']:
                if target == item[0]:
                    return item[3]
        return None

    def get_credentials(self, scan_id, target):
        """ Get a scan's credential list. It return dictionary with
        the corresponding credential for a given target.
        Returns None if no target is given or the target is unknown.
        """
        if target:
            for item in self.scans_table[scan_id]['targets']:
                if target == item[0]:
                    return item[2]
        return None

    def get_vts(self, scan_id):
        """ Get a scan's vts list. """

        return self.scans_table[scan_id]['vts']

    def id_exists(self, scan_id):
        """ Check whether a scan exists in the table. """

        return self.scans_table.get(scan_id) is not None

    def delete_scan(self, scan_id):
        """ Delete a scan unless it is still running.

        Return:
            False if the scan is in RUNNING status, True once it was removed.
        """

        if self.get_status(scan_id) == ScanStatus.RUNNING:
            return False
        self.scans_table.pop(scan_id)
        if len(self.scans_table) == 0:
            # Drop the manager with the last scan; create_scan() builds a new
            # one on demand.
            del self.data_manager
            self.data_manager = None
        return True
347
348
349 View Code Duplication
class ResultType(object):

    """ Various scan results types values. """

    ALARM = 0
    LOG = 1
    ERROR = 2
    HOST_DETAIL = 3

    @classmethod
    def get_str(cls, result_type):
        """ Return string name of a result type.

        Raises AssertionError if result_type is not a known type value.
        """
        if result_type == cls.ALARM:
            return "Alarm"
        if result_type == cls.LOG:
            return "Log Message"
        if result_type == cls.ERROR:
            return "Error Message"
        if result_type == cls.HOST_DETAIL:
            return "Host Detail"
        # Raised explicitly: an "assert False" statement is removed when
        # Python runs with -O, which would silently return None here.
        raise AssertionError(
            "Erroneous result type {0}.".format(result_type))

    @classmethod
    def get_type(cls, result_name):
        """ Return the result type value for a result type string name.

        Raises AssertionError if result_name is not a known type name.
        """
        if result_name == "Alarm":
            return cls.ALARM
        if result_name == "Log Message":
            return cls.LOG
        if result_name == "Error Message":
            return cls.ERROR
        if result_name == "Host Detail":
            return cls.HOST_DETAIL
        # See get_str(): explicit raise so the check survives python -O.
        raise AssertionError(
            "Erroneous result name {0}.".format(result_name))
385
386
387
# Implementation used by inet_pton(); resolved on the first call.
__inet_pton = None


def inet_pton(address_family, ip_string):
    """ A platform independent version of inet_pton.

    Delegates to socket.inet_pton when the socket module provides it and
    to ospd.win_socket.inet_pton otherwise. The choice is made once and
    cached in a module-level variable.
    """
    global __inet_pton
    if __inet_pton is None:
        try:
            __inet_pton = socket.inet_pton
        except AttributeError:
            # The socket module of this platform lacks inet_pton.
            from ospd import win_socket
            __inet_pton = win_socket.inet_pton

    return __inet_pton(address_family, ip_string)
401
402
403
# Implementation used by inet_ntop(); resolved on the first call.
__inet_ntop = None


def inet_ntop(address_family, packed_ip):
    """ A platform independent version of inet_ntop.

    Delegates to socket.inet_ntop when the socket module provides it and
    to ospd.win_socket.inet_ntop otherwise. The choice is made once and
    cached in a module-level variable.
    """
    global __inet_ntop
    if __inet_ntop is None:
        try:
            __inet_ntop = socket.inet_ntop
        except AttributeError:
            # The socket module of this platform lacks inet_ntop.
            from ospd import win_socket
            __inet_ntop = win_socket.inet_ntop

    return __inet_ntop(address_family, packed_ip)
417
418
419
def target_to_ipv4(target):
    """ Return [target] if target is a single IPv4 address, None otherwise. """

    try:
        # Only used as a validity check; the packed value is discarded.
        inet_pton(socket.AF_INET, target)
    except socket.error:
        return None
    return [target]
427
428
429
def target_to_ipv6(target):
    """ Return [target] if target is a single IPv6 address, None otherwise. """

    try:
        # Only used as a validity check; the packed value is discarded.
        inet_pton(socket.AF_INET6, target)
    except socket.error:
        return None
    return [target]
437
438
439 View Code Duplication
def ipv4_range_to_list(start_packed, end_packed):
    """ Return a list of IPv4 entries from start_packed to end_packed.

    Both arguments are 4-byte packed addresses in network byte order; both
    ends are included. The result is empty if end is lower than start.
    """

    first, = struct.unpack('!L', start_packed)
    last, = struct.unpack('!L', end_packed)
    return [socket.inet_ntoa(struct.pack('!L', number))
            for number in range(first, last + 1)]
449
450
451 View Code Duplication
def target_to_ipv4_short(target):
    """ Attempt to return a IPv4 short range list from a target string.

    A short range has the form "<IPv4 address>-<last octet>", e.g.
    "192.168.0.1-10". Returns None if the target does not have that form or
    if the end octet is outside 0..255 or lower than the start octet.
    """

    splitted = target.split('-')
    if len(splitted) != 2:
        return None
    try:
        start_packed = inet_pton(socket.AF_INET, splitted[0])
        end_value = int(splitted[1])
    except (socket.error, ValueError):
        return None
    # Slice instead of indexing: on Python 3 indexing bytes yields an int, and
    # bytes(int) builds that many zero bytes rather than the octet itself.
    start_value = struct.unpack('B', start_packed[3:4])[0]
    if end_value < 0 or end_value > 255 or end_value < start_value:
        return None
    end_packed = start_packed[0:3] + struct.pack('B', end_value)
    return ipv4_range_to_list(start_packed, end_packed)
467
468
469 View Code Duplication
def target_to_ipv4_cidr(target):
    """ Attempt to return a IPv4 CIDR list from a target string.

    Accepts "<IPv4 address>/<prefix length>" with a prefix length from 1 to
    30. The listed range starts one above the network address and ends one
    below the address with all host bits set. Returns None otherwise.
    """

    pieces = target.split('/')
    if len(pieces) != 2:
        return None
    try:
        base_packed = inet_pton(socket.AF_INET, pieces[0])
        prefix_len = int(pieces[1])
    except (socket.error, ValueError):
        return None
    if not 0 < prefix_len <= 30:
        return None
    host_bits = 32 - prefix_len
    # Clear the host bits to get the network address.
    network = (int(binascii.hexlify(base_packed), 16) >> host_bits) << host_bits
    first = network + 1
    last = (first | (0xffffffff >> prefix_len)) - 1
    return ipv4_range_to_list(struct.pack('!I', first),
                              struct.pack('!I', last))
488
489
490 View Code Duplication
def target_to_ipv6_cidr(target):
    """ Attempt to return a IPv6 CIDR list from a target string.

    Accepts "<IPv6 address>/<prefix length>" with a prefix length from 1 to
    126. The listed range starts one above the network address and ends one
    below the address with all host bits set. Returns None otherwise.
    """

    pieces = target.split('/')
    if len(pieces) != 2:
        return None
    try:
        base_packed = inet_pton(socket.AF_INET6, pieces[0])
        prefix_len = int(pieces[1])
    except (socket.error, ValueError):
        return None
    if not 0 < prefix_len <= 126:
        return None
    host_bits = 128 - prefix_len
    low_mask = (1 << 64) - 1
    # Clear the host bits to get the network address.
    network = (int(binascii.hexlify(base_packed), 16) >> host_bits) << host_bits
    first = network + 1
    last = (first | (int('ff' * 16, 16) >> prefix_len)) - 1
    # A 128-bit value is packed as two 64-bit halves, high half first.
    start_packed = struct.pack('!QQ', first >> 64, first & low_mask)
    end_packed = struct.pack('!QQ', last >> 64, last & low_mask)
    return ipv6_range_to_list(start_packed, end_packed)
513
514
515 View Code Duplication
def target_to_ipv4_long(target):
    """ Attempt to return a IPv4 long-range list from a target string.

    A long range has the form "<IPv4 address>-<IPv4 address>". Returns None
    if the target does not have that form or the end is lower than the start.
    """

    bounds = target.split('-')
    if len(bounds) != 2:
        return None
    try:
        range_start = inet_pton(socket.AF_INET, bounds[0])
        range_end = inet_pton(socket.AF_INET, bounds[1])
    except socket.error:
        return None
    # Packed addresses of equal length compare in address order.
    if range_end < range_start:
        return None
    return ipv4_range_to_list(range_start, range_end)
529
530
531 View Code Duplication
def ipv6_range_to_list(start_packed, end_packed):
    """ Return a list of IPv6 entries from start_packed to end_packed.

    Both arguments are packed addresses; both ends are included.
    """

    first = int(binascii.hexlify(start_packed), 16)
    last = int(binascii.hexlify(end_packed), 16)
    low_mask = (1 << 64) - 1
    # Each 128-bit value is packed as two 64-bit halves, high half first.
    return [
        inet_ntop(socket.AF_INET6,
                  struct.pack('!2Q', number >> 64, number & low_mask))
        for number in range(first, last + 1)]
544
545
546 View Code Duplication
def target_to_ipv6_short(target):
    """ Attempt to return a IPv6 short-range list from a target string.

    A short range has the form "<IPv6 address>-<last group in hex>". Returns
    None if the target does not have that form or if the end group is
    outside 0..0xffff or lower than the last group of the start address.
    """

    pieces = target.split('-')
    if len(pieces) != 2:
        return None
    try:
        range_start = inet_pton(socket.AF_INET6, pieces[0])
        last_group = int(pieces[1], 16)
    except (socket.error, ValueError):
        return None
    # The last 16-bit group is stored in the final two bytes.
    first_group = int(binascii.hexlify(range_start[14:]), 16)
    if not 0 <= last_group <= 0xffff or last_group < first_group:
        return None
    range_end = range_start[:14] + struct.pack('!H', last_group)
    return ipv6_range_to_list(range_start, range_end)
562
563
564 View Code Duplication
def target_to_ipv6_long(target):
    """ Attempt to return a IPv6 long-range list from a target string.

    A long range has the form "<IPv6 address>-<IPv6 address>". Returns None
    if the target does not have that form or the end is lower than the start.
    """

    bounds = target.split('-')
    if len(bounds) != 2:
        return None
    try:
        range_start = inet_pton(socket.AF_INET6, bounds[0])
        range_end = inet_pton(socket.AF_INET6, bounds[1])
    except socket.error:
        return None
    # Packed addresses of equal length compare in address order.
    if range_end < range_start:
        return None
    return ipv6_range_to_list(range_start, range_end)
578
579
580
def target_to_hostname(target):
    """ Attempt to return a single hostname list from a target string.

    Returns [target] if the string is 1 to 255 characters long and consists
    only of word characters, dots and dashes; None otherwise.
    """

    length = len(target)
    if length == 0 or length > 255:
        return None
    if re.match(r'^[\w.-]+$', target) is None:
        return None
    return [target]
588
589
590 View Code Duplication
def target_to_list(target):
    """ Attempt to return a list of single hosts from a target string.

    The target is tried against each supported notation in turn and the
    first non-empty result is returned. If none matches, the (empty or
    None) result of the hostname check is returned.
    """

    # Order matters: the notations are tried from top to bottom.
    parsers = (
        target_to_ipv4,        # single IPv4 address
        target_to_ipv6,        # single IPv6 address
        target_to_ipv4_cidr,   # IPv4 CIDR
        target_to_ipv6_cidr,   # IPv6 CIDR
        target_to_ipv4_short,  # IPv4 short-range
        target_to_ipv4_long,   # IPv4 long-range
        target_to_ipv6_short,  # IPv6 short-range
        target_to_ipv6_long,   # IPv6 long-range
        target_to_hostname,    # hostname
    )
    hosts = None
    for parse in parsers:
        hosts = parse(target)
        if hosts:
            break
    return hosts
620
621
622 View Code Duplication
def target_str_to_list(target_str):
    """ Parses a targets string into a list of individual targets.

    The string is split on commas and every item is expanded into single
    hosts. Duplicates are dropped, keeping the first occurrence. Returns
    None as soon as one item cannot be expanded.
    """
    hosts = []
    for chunk in target_str.split(','):
        target = chunk.strip()
        expanded = target_to_list(target)
        if not expanded:
            LOGGER.info("{0}: Invalid target value".format(target))
            return None
        hosts.extend(expanded)
    # An ordered dictionary removes duplicates while preserving the order.
    return list(collections.OrderedDict.fromkeys(hosts))
634
635
636
def resolve_hostname(hostname):
    """ Returns IP of a hostname, or None if it cannot be resolved.

    Raises AssertionError if hostname is empty.
    """

    assert hostname
    try:
        address = socket.gethostbyname(hostname)
    except socket.gaierror:
        address = None
    return address
644
645
646 View Code Duplication
def port_range_expand(portrange):
    """
    Receive a port range and expands it in individual ports.

    @input Port range.
    e.g. "4-8"

    @return List of integers, or None if the input is empty or has no dash.
    e.g. [4, 5, 6, 7, 8]
    """
    if not portrange or '-' not in portrange:
        LOGGER.info("Invalid port range format")
        return None
    # Split on the first dash; both ends are included in the result.
    first, _, last = portrange.partition('-')
    return list(range(int(first), int(last) + 1))
664
665
666 View Code Duplication
def port_str_arrange(ports):
    """ Gives a str in the format (always tcp listed first).
    T:<tcp ports/portrange comma separated>U:<udp ports comma separated>

    The string is returned unchanged unless it contains both markers with
    the UDP part in front of the TCP part.
    """
    tcp_pos = ports.find("T")
    udp_pos = ports.find("U")
    if tcp_pos == -1 or udp_pos == -1 or udp_pos >= tcp_pos:
        return ports

    # Move everything from the TCP marker on in front of the UDP part.
    return ports[tcp_pos:] + ports[udp_pos:tcp_pos]
676
677
678 View Code Duplication
def ports_str_check_failed(port_str):
    """
    Check if the port string is well formed.
    Return True if fail, False other case.

    The check fails if the string contains a character other than T, U,
    colon, digits, comma, space or dash, if it has more than one T or more
    than one U, or if it has fewer colons than T and U markers.
    """

    if re.search(r'[^TU:0-9, \-]', port_str):
        return True
    tcp_marks = port_str.count('T')
    udp_marks = port_str.count('U')
    if tcp_marks > 1 or udp_marks > 1:
        return True
    return port_str.count(':') < tcp_marks + udp_marks
693
694
695 View Code Duplication
def ports_as_list(port_str):
    """
    Parses a ports string into two list of individual tcp and udp ports.

    @input string containing a port list
    e.g. T:1,2,3,5-8 U:22,80,600-1024

    @return two list of sorted integers, for tcp and udp ports respectively.
    [None, None] is returned if the string is empty or malformed. Ports
    given without any T or U marker are treated as tcp ports.
    """
    if not port_str:
        LOGGER.info("Invalid port value")
        return [None, None]

    if ports_str_check_failed(port_str):
        LOGGER.info("{0}: Port list malformed.".format(port_str))
        return [None, None]

    ports = port_str.replace(' ', '')

    # Drop a comma placed directly in front of a protocol marker, as in
    # "T:1,U:2". The position is looked up again for every marker because
    # removing a comma shifts the rest of the string. The "> 0" guard is
    # required: for a missing marker (-1) or a marker at the start (0) the
    # index "position - 1" would wrap around to the end of the string and
    # eat a comma between two port numbers.
    for marker in ("T", "U"):
        position = ports.find(marker)
        if position > 0 and ports[position - 1] == ',':
            ports = ports[:position - 1] + ports[position:]
    ports = port_str_arrange(ports)

    has_tcp = "T" in ports
    has_udp = "U" in ports
    tports = ''
    uports = ''
    # TCP ports listed first, then UDP ports
    if has_tcp and has_udp:
        tports = ports[ports.index('T:') + 2:ports.index('U:')]
        uports = ports[ports.index('U:') + 2:]
    # Only UDP ports
    elif has_udp:
        uports = ports[ports.index('U:') + 2:]
    # Only TCP ports
    elif has_tcp:
        tports = ports[ports.index('T:') + 2:]
    else:
        tports = ports

    port_lists = []
    for spec in (tports, uports):
        expanded = list()
        if spec:
            for port in spec.split(','):
                if '-' in port:
                    expanded.extend(port_range_expand(port))
                else:
                    expanded.append(int(port))
            expanded.sort()
        port_lists.append(expanded)

    return (port_lists[0], port_lists[1])
755
756
757
def get_tcp_port_list(port_str):
    """ Return the tcp part of ports_as_list() for the given port string. """
    tcp_ports, _ = ports_as_list(port_str)
    return tcp_ports
760
761
762
def get_udp_port_list(port_str):
    """ Return the udp part of ports_as_list() for the given port string. """
    _, udp_ports = ports_as_list(port_str)
    return udp_ports
765
766
767 View Code Duplication
def port_list_compress(port_list):
    """ Compress a port list and return a string.

    Duplicates are dropped, the ports are sorted and every run of
    consecutive ports is written as "<first>-<last>". The pieces are joined
    with commas. An empty string is returned for an empty or missing list.
    """

    is_empty = not port_list or len(port_list) == 0
    if is_empty:
        LOGGER.info("Invalid or empty port list.")
        return ''

    unique_ports = sorted(set(port_list))
    pieces = []
    # Within a run of consecutive ports the difference between the port and
    # its position in the sorted list stays constant, so it serves as the
    # grouping key.
    runs = itertools.groupby(enumerate(unique_ports),
                             lambda pair: pair[1] - pair[0])
    for _, run in runs:
        members = [port for _, port in run]
        first, last = members[0], members[-1]
        if first == last:
            pieces.append(str(first))
        else:
            pieces.append(str(first) + '-' + str(last))

    return ','.join(pieces)
785
786
787
def valid_uuid(value):
    """ Check if value can be parsed by uuid.UUID(). """

    try:
        uuid.UUID(value, version=4)
    except (TypeError, ValueError, AttributeError):
        return False
    return True
795
796
797 View Code Duplication
def create_args_parser(description):
    """ Build the command-line arguments parser common to OSPD daemons.

    Arguments:
        description: Text passed to argparse as the program description.

    Return:
        An argparse.ArgumentParser with all common OSPD options registered.
        Option values are validated by the nested helper functions below,
        which raise argparse.ArgumentTypeError on invalid input.
    """

    # The nested validators keep their names on purpose: argparse uses the
    # name of the 'type' callable in some of its error messages.

    def network_port(string):
        """ Convert string to a port number and check it is in range. """

        number = int(string)
        if number <= 0 or number > 65535:
            raise argparse.ArgumentTypeError(
                'port must be in ]0,65535] interval')
        return number

    def cacert_file(cacert):
        """ Check that cacert is a loadable, currently valid CA Certificate. """
        try:
            context = ssl.create_default_context(cafile=cacert)
        except AttributeError:
            # Python version < 2.7.9: no way to inspect the certificate,
            # accept the path as is.
            return cacert
        except IOError:
            raise argparse.ArgumentTypeError('CA Certificate not found')

        try:
            # Only the first certificate loaded into the context is checked.
            first_cert = context.get_ca_certs()[0]
            valid_until = ssl.cert_time_to_seconds(first_cert['notAfter'])
            valid_from = ssl.cert_time_to_seconds(first_cert['notBefore'])
        except (KeyError, IndexError):
            raise argparse.ArgumentTypeError('CA Certificate is erroneous')

        if valid_until < int(time.time()):
            raise argparse.ArgumentTypeError('CA Certificate expired')
        if valid_from > int(time.time()):
            raise argparse.ArgumentTypeError('CA Certificate not active yet')
        return cacert

    def log_level(string):
        """ Map a level name (any case) to the numeric logging level. """

        level = getattr(logging, string.upper(), None)
        if isinstance(level, int):
            return level
        raise argparse.ArgumentTypeError(
            'log level must be one of {debug,info,warning,error,critical}')

    def filename(string):
        """ Check that string is the path of an existing regular file. """

        if os.path.isfile(string):
            return string
        raise argparse.ArgumentTypeError(
            '%s is not a valid file path' % string)

    parser = argparse.ArgumentParser(description=description)
    add_option = parser.add_argument

    # Listener options.
    add_option('-p', '--port', default=PORT, type=network_port,
               help='TCP Port to listen on. Default: {0}'.format(PORT))
    add_option('-b', '--bind-address', default=ADDRESS,
               help='Address to listen on. Default: {0}'.format(ADDRESS))
    add_option('-u', '--unix-socket',
               help='Unix file socket to listen on.')

    # TLS material. No argparse default is set here; the help text only
    # mentions the module-level default paths.
    add_option('-k', '--key-file', type=filename,
               help='Server key file. Default: {0}'.format(KEY_FILE))
    add_option('-c', '--cert-file', type=filename,
               help='Server cert file. Default: {0}'.format(CERT_FILE))
    add_option('--ca-file', type=cacert_file,
               help='CA cert file. Default: {0}'.format(CA_FILE))

    # Logging and miscellaneous options. The string default of --log-level
    # is passed through log_level() by argparse like a user supplied value.
    add_option('-L', '--log-level', default='warning', type=log_level,
               help='Wished level of logging. Default: WARNING')
    add_option('--foreground', action='store_true',
               help='Run in foreground and logs all messages to console.')
    add_option('-l', '--log-file', type=filename,
               help='Path to the logging file.')
    add_option('--version', action='store_true',
               help='Print version then exit.')
    return parser
872
873
874
def go_to_background():
    """ Send the running process to the background.

    The process forks; the parent exits immediately and only the child
    returns from this function. If the fork fails, the error is logged and
    the process exits with the message 'Fork failed'.
    """
    try:
        child_pid = os.fork()
    except OSError as errmsg:
        LOGGER.error('Fork failed: {0}'.format(errmsg))
        sys.exit('Fork failed')

    # os.fork() returns the child's pid (non-zero) in the parent and 0 in the
    # child, so only the parent leaves here.
    if child_pid:
        sys.exit()
882
883
884 View Code Duplication
def get_common_args(parser, args=None):
    """ Parse the command line and collect the OSPD common arguments.

    Arguments:
        parser: Arguments parser providing the common OSPD options.
        args: Optional list of argument strings, forwarded to
              parser.parse_args().

    Return:
        Dictionary with the keys 'port', 'address', 'unix_socket', 'keyfile',
        'certfile', 'cafile', 'log_level', 'foreground', 'log_file' and
        'version'. Key, cert and CA file fall back to the module defaults
        when not given on the command line.
    """

    parsed = parser.parse_args(args)

    return {
        # TCP Port to listen on.
        'port': parsed.port,
        # Network address to bind listener to.
        'address': parsed.bind_address,
        # Unix file socket to listen on.
        'unix_socket': parsed.unix_socket,
        # Server key, server cert and CA cert paths.
        'keyfile': parsed.key_file or KEY_FILE,
        'certfile': parsed.cert_file or CERT_FILE,
        'cafile': parsed.ca_file or CA_FILE,
        # Debug level.
        'log_level': parsed.log_level,
        'foreground': parsed.foreground,
        'log_file': parsed.log_file,
        'version': parsed.version,
    }
925
926
927 View Code Duplication
def print_version(wrapper):
    """ Print server, protocol and daemon versions plus the license notice.

    Arguments:
        wrapper: Object providing get_scanner_name(), get_server_version(),
                 get_protocol_version(), get_daemon_name() and
                 get_daemon_version().
    """

    print("OSP Server for {0} version {1}".format(
        wrapper.get_scanner_name(), wrapper.get_server_version()))
    print("OSP Version: {0}".format(wrapper.get_protocol_version()))
    print("Using: {0} {1}".format(
        wrapper.get_daemon_name(), wrapper.get_daemon_version()))

    license_notice = (
        "Copyright (C) 2014, 2015 Greenbone Networks GmbH",
        "License GPLv2+: GNU GPL version 2 or later",
        "This is free software: you are free to change and redistribute it.",
        "There is NO WARRANTY, to the extent permitted by law.",
    )
    print("\n".join(license_notice))
943
944
945 View Code Duplication
def main(name, klass):
    """ OSPD Main function.

    Arguments:
        name: Description used for the command-line arguments parser.
        klass: Wrapper class; instantiated with the certfile, keyfile and
               cafile keyword arguments.

    Return:
        1 if the wrapper's check() fails, otherwise the value returned by
        the wrapper's run().
    """

    # Parse the common command-line arguments.
    cargs = get_common_args(create_args_parser(name))

    root_logger = logging.getLogger()
    root_logger.setLevel(cargs['log_level'])
    wrapper = klass(certfile=cargs['certfile'], keyfile=cargs['keyfile'],
                    cafile=cargs['cafile'])

    if cargs['version']:
        print_version(wrapper)
        sys.exit()

    timestamped_format = '%(asctime)s %(name)s: %(levelname)s: %(message)s'

    if cargs['foreground']:
        # Stay attached to the terminal and log to the console.
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(timestamped_format))
        root_logger.addHandler(handler)
    elif cargs['log_file']:
        # Log to the given file, then detach.
        handler = logging.handlers.WatchedFileHandler(cargs['log_file'])
        handler.setFormatter(logging.Formatter(timestamped_format))
        root_logger.addHandler(handler)
        go_to_background()
    else:
        # Log to syslog, then detach.
        handler = logging.handlers.SysLogHandler('/dev/log')
        handler.setFormatter(
            logging.Formatter('%(name)s: %(levelname)s: %(message)s'))
        root_logger.addHandler(handler)
        # Duplicate syslog's file descriptor to stdout/stderr.
        syslog_fd = handler.socket.fileno()
        for std_fd in (1, 2):
            os.dup2(syslog_fd, std_fd)
        go_to_background()

    if wrapper.check():
        return wrapper.run(cargs['address'], cargs['port'],
                           cargs['unix_socket'])
    return 1
988