ospd_openvas.dryrun   A
last analyzed

Complexity

Total Complexity 16

Size/Duplication

Total Lines 194
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 116
dl 0
loc 194
rs 10
c 0
b 0
f 0
wmc 16

2 Methods

Rating   Name   Duplication   Size   Complexity  
A DryRun.__init__() 0 2 1
F DryRun.exec_dry_run_scan() 0 153 15
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Methods for dry run """
23
24
import logging
25
import time
26
27
from random import uniform, choice
28
29
from ospd.scan import ScanProgress, ScanStatus
30
from ospd.network import target_str_to_list, ports_as_list
31
from ospd.resultlist import ResultList
32
from ospd_openvas.vthelper import VtHelper
33
34
logger = logging.getLogger(__name__)
35
36
37
class DryRun:
    """Simulate a scan by generating random results for each target host.

    No scanner is executed. For every host in the scan's target list a
    configurable number of fake results (errors, logs and alarms) is
    generated from the scan's VT and port lists and stored in the daemon's
    scan collection.
    """

    def __init__(self, daemon):
        """Keep a reference to the daemon.

        Arguments:
            daemon: Object providing ``scan_collection`` and the scan
                status/progress methods used by exec_dry_run_scan().
        """
        self._daemon = daemon

    def exec_dry_run_scan(self, scan_id, nvti, ospd_params):
        """Run a simulated scan and store the generated results.

        Hosts are processed one at a time. For each host a HOST_START log,
        up to ``results_per_host`` random results and a HOST_END log are
        added to the scan collection, then the host is marked as finished.
        The method sleeps one second after each host.

        Arguments:
            scan_id: Identifier of the scan to simulate.
            nvti: VT cache object. It is passed to VtHelper and its
                ``QOD_TYPES`` mapping is used to translate a VT's
                ``qod_type`` into a QoD value.
            ospd_params: Mapping of daemon parameters. The ``default`` of
                its ``results_per_host`` entry is used when the scan options
                hold no usable value.

        The method returns early, without processing the remaining hosts,
        once the scan status is STOPPED or FINISHED.
        """
        scan_collection = self._daemon.scan_collection

        results_per_host = self._get_results_per_host(
            scan_collection.get_options(scan_id), ospd_params
        )

        # Get the host list
        target = scan_collection.get_host_list(scan_id)
        logger.info("The target list %s", target)
        host_list = target_str_to_list(target)

        # Get the port list. Only the TCP ports are used for the results.
        ports = scan_collection.get_ports(scan_id)
        logger.info("The port list %s", ports)
        tcp, _ = ports_as_list(ports)

        # NOTE(review): the exclude hosts list is only logged here; nothing
        # in this method removes these hosts from host_list. Confirm whether
        # the exclusion is applied elsewhere.
        exclude_hosts = scan_collection.get_exclude_hosts(scan_id)
        logger.info("The exclude hosts list %s", exclude_hosts)

        self._daemon.set_scan_total_hosts(
            scan_id,
            count_total=len(host_list),
        )
        scan_collection.set_amount_dead_hosts(scan_id, total_dead=0)

        # Get list of VTS. Ignore script params
        vts = list(scan_collection.get_vts(scan_id))
        if "vt_groups" in vts:
            vts.remove("vt_groups")
        vthelper = VtHelper(nvti)

        # Scan simulation for each single host.
        while host_list:
            current_host = host_list.pop()

            # Check if the scan was stopped.
            status = self._daemon.get_scan_status(scan_id)
            if status in (ScanStatus.STOPPED, ScanStatus.FINISHED):
                logger.debug(
                    'Task %s stopped or finished.',
                    scan_id,
                )
                return

            res_list = self._generate_host_results(
                current_host, results_per_host, vts, tcp, nvti, vthelper
            )

            # Add the result to the scan collection
            if len(res_list):
                logger.debug(
                    '%s: Inserting %d results into scan '
                    'scan collection table',
                    scan_id,
                    len(res_list),
                )
                scan_collection.add_result_list(scan_id, res_list)

            # Set the host scan progress as finished
            self._daemon.set_scan_progress_batch(
                scan_id, host_progress={current_host: ScanProgress.FINISHED}
            )

            # Update the host status, Finished host. So ospd can
            # calculate the scan progress.
            # This is quite important, since the final scan status depends
            # on the progress calculation.
            self._daemon.sort_host_finished(scan_id, [current_host])

            time.sleep(1)

        logger.debug('%s: End task', scan_id)

    @staticmethod
    def _get_results_per_host(options, ospd_params):
        """Return the amount of results to generate for each host.

        The scan option is used when it is a non-zero integer. Otherwise
        the default declared in ospd_params is returned.
        """
        results_per_host = options.get("results_per_host")
        if not results_per_host or not isinstance(results_per_host, int):
            logger.debug("Using default value for results_per_host options")
            results_per_host = ospd_params["results_per_host"].get("default")
        return results_per_host

    def _generate_host_results(
        self, current_host, results_per_host, vts, tcp, nvti, vthelper
    ):
        """Build the result list of a single host.

        The list always starts with a HOST_START log and ends with a
        HOST_END log. Random results are only generated when both a VT and
        a TCP port are available to choose from.
        """
        res_list = ResultList()

        res_list.add_scan_log_to_list(
            host=current_host,
            name="HOST_START",
            value=str(int(time.time())),
        )

        if vts and tcp:
            # Make up to N attempts per host; an attempt whose VT is
            # unknown adds no result.
            res_count = 0
            while res_count < results_per_host:
                res_count += 1
                self._add_random_result(
                    res_list, current_host, vts, tcp, nvti, vthelper
                )
        else:
            # choice() can not pick from an empty (or missing) sequence.
            logger.debug(
                "No VTs or TCP ports to choose from. "
                "No results are generated for host %s",
                current_host,
            )

        res_list.add_scan_log_to_list(
            host=current_host,
            name="HOST_END",
            value=str(int(time.time())),
        )

        return res_list

    @staticmethod
    def _add_random_result(res_list, current_host, vts, tcp, nvti, vthelper):
        """Add one random error, log or alarm for a randomly chosen VT.

        Nothing is added when the chosen VT can not be found, since there
        is no name or QoD to report for it.
        """
        oid = choice(vts)
        port = choice(tcp)
        vt = vthelper.get_single_vt(oid)
        if not vt:
            logger.debug("oid %s not found", oid)
            return

        rname = vt.get('name')

        # Always bind the QoD, so a VT carrying neither 'qod_type' nor 'qod'
        # does not leave it undefined or reuse the value of a previous VT.
        # Presumably an empty string is what ResultList uses when no QoD is
        # given - TODO confirm.
        rqod = ''
        if vt.get('qod_type'):
            rqod = nvti.QOD_TYPES[vt.get('qod_type')]
        elif vt.get('qod'):
            rqod = vt.get('qod')

        hostname = current_host + ".hostname.net"

        # uniform(1, 5) truncated to int yields 1, 2, 3 or 4.
        res_type = int(uniform(1, 5))
        # Error
        if res_type == 1:
            res_list.add_scan_error_to_list(
                host=current_host,
                hostname=hostname,
                name=rname,
                value="error running the script " + oid,
                port=port,
                test_id=oid,
                uri="No location",
            )
        # Log
        elif res_type == 2:
            res_list.add_scan_log_to_list(
                host=current_host,
                hostname=hostname,
                name=rname,
                value="Log generate from a dry run scan for the script "
                + oid,
                port=port,
                qod=rqod,
                test_id=oid,
                uri="No location",
            )
        # Alarm
        else:
            r_severity = vthelper.get_severity_score(vt)
            res_list.add_scan_alarm_to_list(
                host=current_host,
                hostname=hostname,
                name=rname,
                value="Log generate from a dry run scan for the script "
                + oid,
                port=port,
                test_id=oid,
                severity=r_severity,
                qod=rqod,
                uri="No location",
            )
194