ospd.scan   F

Complexity

Total Complexity 75

Size/Duplication

Total Lines 573
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 304
dl 0
loc 573
rs 2.4
c 0
b 0
f 0
wmc 75

43 Methods

Rating   Name   Duplication   Size   Complexity  
A ScanCollection.set_host_dead() 0 11 1
A ScanCollection.remove_file_pickled_scan_info() 0 3 1
A ScanCollection.clean_temp_result_list() 0 3 1
A ScanCollection.add_result_list() 0 12 1
A ScanCollection.results_iterator() 0 24 4
A ScanCollection.add_result() 0 35 1
A ScanCollection.set_amount_dead_hosts() 0 5 1
A ScanCollection.set_host_progress() 0 11 1
A ScanCollection.init() 0 2 1
A ScanCollection.remove_hosts_from_target_progress() 0 23 4
A ScanCollection.restore_temp_result_list() 0 9 1
A ScanCollection.clean_up_pickled_scan_info() 0 5 3
A ScanCollection.set_host_finished() 0 13 1
A ScanCollection.ids_iterator() 0 7 1
A ScanCollection.set_progress() 0 8 4
A ScanCollection.__init__() 0 8 1
A ScanCollection.get_count_total() 0 20 3
A ScanCollection.get_count_dead() 0 4 1
A ScanCollection.get_options() 0 4 1
A ScanCollection.unpickle_scan_info() 0 34 2
A ScanCollection.get_progress() 0 4 1
A ScanCollection.get_status() 0 4 1
A ScanCollection.update_count_total() 0 4 1
A ScanCollection.get_count_alive() 0 4 1
B ScanCollection.create_scan() 0 49 5
A ScanCollection.get_current_target_progress() 0 3 1
A ScanCollection.set_status() 0 5 3
A ScanCollection.set_option() 0 4 1
A ScanCollection.get_host_count() 0 9 2
A ScanCollection.get_credentials() 0 5 1
A ScanCollection.get_simplified_exclude_host_count() 0 9 2
A ScanCollection.get_finished_hosts() 0 3 1
A ScanCollection.get_target_options() 0 6 1
A ScanCollection.get_host_list() 0 4 1
A ScanCollection.get_exclude_hosts() 0 3 1
A ScanCollection.get_ports() 0 6 1
A ScanCollection.get_end_time() 0 4 1
A ScanCollection.calculate_target_progress() 0 22 2
A ScanCollection.get_vts() 0 7 1
C ScanCollection.simplify_exclude_host_count() 0 44 9
A ScanCollection.delete_scan() 0 11 2
A ScanCollection.id_exists() 0 4 1
A ScanCollection.get_start_time() 0 4 1

How to fix: Complexity

Complex classes like the one in ospd.scan often do a lot of different things. To break such a class down, identify a cohesive component within it. A common way to find one is to look for fields or methods that share the same prefix or suffix.
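As a rough illustration of the prefix heuristic, the sketch below groups a handful of the method names listed above by their first two underscore-separated words. The helper `group_by_prefix` and its `depth` parameter are hypothetical and not part of ospd; they only show one way to spot candidate components.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def group_by_prefix(names: Iterable[str], depth: int = 2) -> Dict[str, List[str]]:
    """Group names by their first `depth` underscore-separated words.

    Hypothetical helper, not part of ospd.
    """
    groups: Dict[str, List[str]] = defaultdict(list)
    for name in names:
        prefix = '_'.join(name.split('_')[:depth])
        groups[prefix].append(name)
    # Keep only the prefixes shared by more than one method.
    return {p: sorted(m) for p, m in groups.items() if len(m) > 1}


# A subset of the ScanCollection method names from the table above.
METHODS = [
    'get_count_total',
    'get_count_dead',
    'get_count_alive',
    'update_count_total',
    'set_host_dead',
    'set_host_finished',
    'set_host_progress',
    'get_status',
    'set_status',
]

CANDIDATES = group_by_prefix(METHODS)
for prefix, members in CANDIDATES.items():
    print(prefix, members)
```

On this subset the host counters (`get_count_*`) and the host state setters (`set_host_*`) show up as groups, which makes them plausible starting points for an extraction.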

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
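A minimal sketch of what Extract Class could look like for the host counters, assuming they are moved into their own small class. `HostCounters` and `SlimScanCollection` are hypothetical names used only for illustration. The sketch also ignores the cross-process sharing that the real class handles through `multiprocessing.Manager`.

```python
from typing import Dict, Optional


class HostCounters:
    """Hypothetical extracted component: one scan's host counters."""

    def __init__(self, total: Optional[int] = None) -> None:
        self.alive = 0
        self.dead = 0
        self.total = total

    def add_finished(self, amount: int) -> None:
        """Count `amount` more alive hosts as finished."""
        self.alive += amount

    def add_dead(self, amount: int) -> None:
        """Count `amount` more hosts as dead."""
        self.dead += amount

    def remaining(self, excluded: int = 0) -> int:
        """Hosts that still count towards the scan progress."""
        return (self.total or 0) - excluded - self.dead


class SlimScanCollection:
    """Hypothetical, heavily reduced stand-in for ScanCollection."""

    def __init__(self) -> None:
        self._counters: Dict[str, HostCounters] = {}

    def create_scan(self, scan_id: str, total_hosts: int) -> None:
        self._counters[scan_id] = HostCounters(total=total_hosts)

    def counters(self, scan_id: str) -> HostCounters:
        return self._counters[scan_id]


scans = SlimScanCollection()
scans.create_scan('scan-1', total_hosts=10)
scans.counters('scan-1').add_dead(2)
scans.counters('scan-1').add_finished(3)
print(scans.counters('scan-1').remaining(excluded=1))
```

The collection keeps a single accessor per component instead of one getter and setter per counter, which is where most of the method count in the table above comes from.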

# Copyright (C) 2014-2021 Greenbone Networks GmbH
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
import multiprocessing
import time
import uuid

from pprint import pformat
from collections import OrderedDict
from enum import Enum, IntEnum
from typing import List, Any, Dict, Iterator, Optional, Iterable, Union

from ospd.network import target_str_to_list
from ospd.datapickler import DataPickler
from ospd.errors import OspdCommandError

LOGGER = logging.getLogger(__name__)


class ScanStatus(Enum):
    """Scan status."""

    QUEUED = 0
    INIT = 1
    RUNNING = 2
    STOPPED = 3
    FINISHED = 4
    INTERRUPTED = 5


class ScanProgress(IntEnum):
    """Scan or host progress."""

    FINISHED = 100
    INIT = 0
    DEAD_HOST = -1
    INTERRUPTED = -2


class ScanCollection:

    """Scans collection, managing scans and results read and write, exposing
    only needed information.

    Each scan has meta-information such as scan ID, current progress (from 0 to
    100), start time, end time, scan target and options and a list of results.

    There are 4 types of results: Alarms, Logs, Errors and Host Details.

    Todo:
    - Better checking for Scan ID existence and handling otherwise.
    - More data validation.
    - Mutex access per table/scan_info.

    """

    def __init__(self, file_storage_dir: str) -> None:
        """Initialize the Scan Collection."""

        self.data_manager = (
            None
        )  # type: Optional[multiprocessing.managers.SyncManager]
        self.scans_table = dict()  # type: Dict
        self.file_storage_dir = file_storage_dir

    def init(self):
        """Start the multiprocessing manager used to create shared scan data."""
        self.data_manager = multiprocessing.Manager()

    def add_result(
        self,
        scan_id: str,
        result_type: int,
        host: str = '',
        hostname: str = '',
        name: str = '',
        value: str = '',
        port: str = '',
        test_id: str = '',
        severity: str = '',
        qod: str = '',
        uri: str = '',
    ) -> None:
        """Add a result to a scan in the table."""

        assert scan_id
        assert len(name) or len(value)

        result = OrderedDict()  # type: Dict
        result['type'] = result_type
        result['name'] = name
        result['severity'] = severity
        result['test_id'] = test_id
        result['value'] = value
        result['host'] = host
        result['hostname'] = hostname
        result['port'] = port
        result['qod'] = qod
        result['uri'] = uri
        results = self.scans_table[scan_id]['results']
        results.append(result)

        # Set scan_info's results to propagate results to parent process.
        self.scans_table[scan_id]['results'] = results

    def add_result_list(
        self, scan_id: str, result_list: Iterable[Dict[str, str]]
    ) -> None:
        """
        Add a batch of results to the results table of the corresponding
        scan_id.
        """
        results = self.scans_table[scan_id]['results']
        results.extend(result_list)

        # Set scan_info's results to propagate results to parent process.
        self.scans_table[scan_id]['results'] = results

    def remove_hosts_from_target_progress(
        self, scan_id: str, hosts: List
    ) -> None:
        """Remove a list of hosts from the main scan progress table so that
        they are not included in the calculation of the scan progress."""
        if not hosts:
            return

        LOGGER.debug(
            '%s: Remove the following hosts from the target list, '
            'as they are already finished or are dead: %s',
            scan_id,
            pformat(hosts),
        )

        target = self.scans_table[scan_id].get('target_progress')
        for host in hosts:
            if host in target:
                del target[host]

        # Set scan_info's target_progress to propagate progresses
        # to parent process.
        self.scans_table[scan_id]['target_progress'] = target

    def set_progress(self, scan_id: str, progress: int) -> None:
        """Sets scan_id scan's progress."""

        if ScanProgress.INIT < progress <= ScanProgress.FINISHED:
            self.scans_table[scan_id]['progress'] = progress

        if progress == ScanProgress.FINISHED:
            self.scans_table[scan_id]['end_time'] = int(time.time())

    def set_host_progress(
        self, scan_id: str, host_progress_batch: Dict[str, int]
    ) -> None:
        """Updates the progress of a batch of hosts of the scan_id scan."""

        host_progresses = self.scans_table[scan_id].get('target_progress')
        host_progresses.update(host_progress_batch)

        # Set scan_info's target_progress to propagate progresses
        # to parent process.
        self.scans_table[scan_id]['target_progress'] = host_progresses

    def set_host_finished(self, scan_id: str, hosts: List[str]) -> None:
        """Increase the amount of finished hosts which were alive."""

        LOGGER.debug(
            '%s: Setting the following hosts as finished: %s',
            scan_id,
            pformat(hosts),
        )
        total_finished = len(hosts)
        count_alive = (
            self.scans_table[scan_id].get('count_alive') + total_finished
        )
        self.scans_table[scan_id]['count_alive'] = count_alive

    def set_host_dead(self, scan_id: str, hosts: List[str]) -> None:
        """Increase the amount of dead hosts."""

        LOGGER.debug(
            '%s: Setting the following hosts as dead: %s',
            scan_id,
            pformat(hosts),
        )
        total_dead = len(hosts)
        count_dead = self.scans_table[scan_id].get('count_dead') + total_dead
        self.scans_table[scan_id]['count_dead'] = count_dead

    def set_amount_dead_hosts(self, scan_id: str, total_dead: int) -> None:
        """Increase the amount of dead hosts."""

        count_dead = self.scans_table[scan_id].get('count_dead') + total_dead
        self.scans_table[scan_id]['count_dead'] = count_dead

    def clean_temp_result_list(self, scan_id):
        """Clean the results stored in the temporary list."""
        self.scans_table[scan_id]['temp_results'] = list()

    def restore_temp_result_list(self, scan_id):
        """Add the results stored in the temporary list into the results
        list again."""
        result_aux = self.scans_table[scan_id].get('results', list())
        result_aux.extend(self.scans_table[scan_id].get('temp_results', list()))

        # Propagate results
        self.scans_table[scan_id]['results'] = result_aux
        self.clean_temp_result_list(scan_id)

    def results_iterator(
        self, scan_id: str, pop_res: bool = False, max_res: Optional[int] = None
    ) -> Iterator[Any]:
        """Returns an iterator over scan_id scan's results. If pop_res is True,
        it removes the fetched results from the list.

        If max_res is None, return all the results.
        Otherwise, if max_res = N > 0 return N as maximum number of results.

        max_res works only together with pop_res.
        """
        if pop_res and max_res:
            result_aux = self.scans_table[scan_id].get('results', list())
            self.scans_table[scan_id]['results'] = result_aux[max_res:]
            self.scans_table[scan_id]['temp_results'] = result_aux[:max_res]
            return iter(self.scans_table[scan_id]['temp_results'])
        elif pop_res:
            self.scans_table[scan_id]['temp_results'] = self.scans_table[
                scan_id
            ].get('results', list())
            self.scans_table[scan_id]['results'] = list()
            return iter(self.scans_table[scan_id]['temp_results'])

        return iter(self.scans_table[scan_id]['results'])

    def ids_iterator(self) -> Iterator[str]:
        """Returns an iterator over the collection's scan IDs."""

        # Do not iterate over the scans_table because it can change
        # during iteration, since it is accessed by multiple processes.
        scan_id_list = list(self.scans_table)
        return iter(scan_id_list)

    def clean_up_pickled_scan_info(self) -> None:
        """Remove the pickled scan info files of scans that are still queued."""
        for scan_id in self.ids_iterator():
            if self.get_status(scan_id) == ScanStatus.QUEUED:
                self.remove_file_pickled_scan_info(scan_id)

    def remove_file_pickled_scan_info(self, scan_id: str) -> None:
        """Remove the pickled scan info file of the scan_id scan."""
        pickler = DataPickler(self.file_storage_dir)
        pickler.remove_file(scan_id)

    def unpickle_scan_info(self, scan_id: str) -> None:
        """Unpickle a stored scan_info corresponding to the scan_id
        and store it in the scans_table"""

        scan_info = self.scans_table.get(scan_id)
        scan_info_hash = scan_info.pop('scan_info_hash')

        pickler = DataPickler(self.file_storage_dir)
        unpickled_scan_info = pickler.load_data(scan_id, scan_info_hash)

        if not unpickled_scan_info:
            pickler.remove_file(scan_id)
            raise OspdCommandError(
                'Not possible to unpickle stored scan info for %s' % scan_id,
                'start_scan',
            )

        scan_info['results'] = list()
        scan_info['temp_results'] = list()
        scan_info['progress'] = ScanProgress.INIT.value
        scan_info['target_progress'] = dict()
        scan_info['count_alive'] = 0
        scan_info['count_dead'] = 0
        scan_info['count_total'] = None
        scan_info['excluded_simplified'] = None
        scan_info['target'] = unpickled_scan_info.pop('target')
        scan_info['vts'] = unpickled_scan_info.pop('vts')
        scan_info['options'] = unpickled_scan_info.pop('options')
        scan_info['start_time'] = int(time.time())
        scan_info['end_time'] = 0

        self.scans_table[scan_id] = scan_info

        pickler.remove_file(scan_id)

    def create_scan(
        self,
        scan_id: str = '',
        target: Dict = None,
        options: Optional[Dict] = None,
        vts: Dict = None,
    ) -> Optional[str]:
        """Creates a new scan with provided scan information.

        @target: Target to scan.
        @options: Miscellaneous scan options supplied via <scanner_params>
                  XML element.

        @return: Scan's ID. None if error occurs.
        """

        if not options:
            options = dict()

        credentials = target.pop('credentials')

        scan_info = self.data_manager.dict()  # type: Dict
        scan_info['status'] = ScanStatus.QUEUED
        scan_info['credentials'] = credentials
        scan_info['start_time'] = int(time.time())
        scan_info['end_time'] = 0

        scan_info_to_pickle = {
            'target': target,
            'options': options,
            'vts': vts,
        }

        if scan_id is None or scan_id == '':
            scan_id = str(uuid.uuid4())

        pickler = DataPickler(self.file_storage_dir)
        scan_info_hash = None
        try:
            scan_info_hash = pickler.store_data(scan_id, scan_info_to_pickle)
        except OspdCommandError as e:
            LOGGER.error(e)
            return None

        scan_info['scan_id'] = scan_id
        scan_info['scan_info_hash'] = scan_info_hash

        self.scans_table[scan_id] = scan_info
        return scan_id

    def set_status(self, scan_id: str, status: ScanStatus) -> None:
        """Sets scan_id scan's status."""
        self.scans_table[scan_id]['status'] = status
        if status in (ScanStatus.STOPPED, ScanStatus.INTERRUPTED):
            self.scans_table[scan_id]['end_time'] = int(time.time())

    def get_status(self, scan_id: str) -> ScanStatus:
        """Get scan_id scan's status."""

        return self.scans_table[scan_id].get('status')

    def get_options(self, scan_id: str) -> Dict:
        """Get scan_id scan's options list."""

        return self.scans_table[scan_id].get('options')

    def set_option(self, scan_id, name: str, value: Any) -> None:
        """Set a scan_id scan's name option to value."""

        self.scans_table[scan_id]['options'][name] = value

    def get_progress(self, scan_id: str) -> int:
        """Get a scan's current progress value."""

        return self.scans_table[scan_id].get('progress', ScanProgress.INIT)

    def get_count_dead(self, scan_id: str) -> int:
        """Get a scan's current dead host count."""

        return self.scans_table[scan_id]['count_dead']

    def get_count_alive(self, scan_id: str) -> int:
        """Get a scan's current alive host count."""

        return self.scans_table[scan_id]['count_alive']

    def update_count_total(self, scan_id: str, count_total: int) -> None:
        """Sets a scan's total hosts."""

        self.scans_table[scan_id]['count_total'] = count_total

    def get_count_total(self, scan_id: str) -> int:
        """Get a scan's total host count."""

        count_total = self.scans_table[scan_id]['count_total']

        # The value set by the server has priority over the value
        # calculated by ospd from the original target list.
        # ospd cannot determine the number of valid hosts itself (it does
        # not check for duplicated or invalid hosts), so the server may set
        # a negative value when it detects an invalid target string or a
        # host count that differs from the original target list.
        if count_total == -1:
            count_total = 0
        # If the server does not set the total host count,
        # ospd sets it from the original host list.
        elif count_total is None:
            count_total = self.get_host_count(scan_id)
            self.update_count_total(scan_id, count_total)

        return count_total

    def get_current_target_progress(self, scan_id: str) -> Dict[str, int]:
        """Get a scan's current hosts progress"""
        return self.scans_table[scan_id]['target_progress']

    def simplify_exclude_host_count(self, scan_id: str) -> int:
        """Remove from exclude_hosts the received hosts in the finished_hosts
        list sent by the client.
        The finished hosts are sent also as exclude hosts for backward
        compatibility purposes.

        Return:
            Count of excluded hosts.
        """
        exc_hosts_list = target_str_to_list(self.get_exclude_hosts(scan_id))

        finished_hosts_list = target_str_to_list(
            self.get_finished_hosts(scan_id)
        )

        # Remove finished hosts from excluded host list
        if finished_hosts_list and exc_hosts_list:
            for finished in finished_hosts_list:
                if finished in exc_hosts_list:
                    exc_hosts_list.remove(finished)

        # Remove excluded hosts which don't belong to the target list
        host_list = target_str_to_list(self.get_host_list(scan_id))
        excluded_simplified = 0
        invalid_exc_hosts = 0
        if exc_hosts_list:
            for exc_host in exc_hosts_list:
                if exc_host in host_list:
                    excluded_simplified += 1
                else:
                    invalid_exc_hosts += 1

        if invalid_exc_hosts > 0:
            LOGGER.warning(
                "Please check the excluded host list. It contains hosts which "
                "do not belong to the target. This warning can be ignored if "
                "this was done on purpose (e.g. to exclude specific hostname)."
            )

        # Set scan_info's excluded simplified to propagate excluded count
        # to parent process.
        self.scans_table[scan_id]['excluded_simplified'] = excluded_simplified

        return excluded_simplified

    def get_simplified_exclude_host_count(self, scan_id: str) -> int:
        """Get a scan's excluded host count."""
        excluded_simplified = self.scans_table[scan_id]['excluded_simplified']
        # Check for None because it is the init value, as excluded can be 0
        # as well
        if excluded_simplified is not None:
            return excluded_simplified

        return self.simplify_exclude_host_count(scan_id)

    def calculate_target_progress(self, scan_id: str) -> int:
        """Get a target's current progress value.
        The value is calculated with the progress of each single host
        in the target."""

        total_hosts = self.get_count_total(scan_id)
        exc_hosts = self.get_simplified_exclude_host_count(scan_id)
        count_alive = self.get_count_alive(scan_id)
        count_dead = self.get_count_dead(scan_id)
        host_progresses = self.get_current_target_progress(scan_id)

        try:
            t_prog = int(
                (sum(host_progresses.values()) + 100 * count_alive)
                / (total_hosts - exc_hosts - count_dead)
            )
        except ZeroDivisionError:
            # Consider the case in which all hosts are dead or excluded
            LOGGER.debug('%s: All hosts dead or excluded.', scan_id)
            t_prog = ScanProgress.FINISHED.value

        return t_prog

    def get_start_time(self, scan_id: str) -> int:
        """Get a scan's start time."""

        return self.scans_table[scan_id]['start_time']

    def get_end_time(self, scan_id: str) -> int:
        """Get a scan's end time."""

        return self.scans_table[scan_id]['end_time']

    def get_host_list(self, scan_id: str) -> Dict:
        """Get a scan's host list."""

        return self.scans_table[scan_id]['target'].get('hosts')

    def get_host_count(self, scan_id: str) -> int:
        """Get total host count in the target."""
        host = self.get_host_list(scan_id)
        total_hosts = 0

        if host:
            total_hosts = len(target_str_to_list(host))

        return total_hosts

    def get_ports(self, scan_id: str) -> str:
        """Get a scan's ports list."""
        target = self.scans_table[scan_id].get('target')
        ports = target.pop('ports')
        self.scans_table[scan_id]['target'] = target
        return ports

    def get_exclude_hosts(self, scan_id: str) -> str:
        """Get an exclude host list for a given target."""
        return self.scans_table[scan_id]['target'].get('exclude_hosts')

    def get_finished_hosts(self, scan_id: str) -> str:
        """Get the finished host list sent by the client for a given target."""
        return self.scans_table[scan_id]['target'].get('finished_hosts')

    def get_credentials(self, scan_id: str) -> Dict[str, Dict[str, str]]:
        """Get a scan's credential list. It returns a dictionary with
        the corresponding credentials for a given target.
        """
        return self.scans_table[scan_id].get('credentials')

    def get_target_options(self, scan_id: str) -> Dict[str, str]:
        """Get a scan's target option dictionary.
        It returns a dictionary with the corresponding options for
        a given target.
        """
        return self.scans_table[scan_id]['target'].get('options')

    def get_vts(self, scan_id: str) -> Dict[str, Union[Dict[str, str], List]]:
        """Get a scan's vts."""
        scan_info = self.scans_table[scan_id]
        vts = scan_info.pop('vts')
        self.scans_table[scan_id] = scan_info

        return vts

    def id_exists(self, scan_id: str) -> bool:
        """Check whether a scan exists in the table."""

        return self.scans_table.get(scan_id) is not None

    def delete_scan(self, scan_id: str) -> bool:
        """Delete a scan unless it is still running."""

        if self.get_status(scan_id) == ScanStatus.RUNNING:
            return False

        scans_table = self.scans_table
        del scans_table[scan_id]
        self.scans_table = scans_table

        return True