Passed
Pull Request — master (#424)
by Juan José
01:39
created

ospd_openvas.dryrun.DryRun.__init__()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Methods for dry run """
23
24
import logging
25
import time
26
27
from random import uniform, choice
28
29
from ospd.scan import ScanProgress, ScanStatus
30
from ospd.network import target_str_to_list, ports_as_list
31
from ospd.resultlist import ResultList
32
from ospd_openvas.vthelper import VtHelper
33
34
logger = logging.getLogger(__name__)
35
36
37
class DryRun:
    """Simulate a scan by generating random results for every target host.

    No scanner is involved: the results are fabricated from the VTs
    selected for the scan and reported through the daemon passed to the
    constructor.
    """

    def __init__(self, daemon):
        """Keep a reference to the daemon used to read and report scan data.

        Arguments:
            daemon: Object providing get_scanner_params(), get_scan_status(),
                set_scan_total_hosts(), set_scan_progress_batch(),
                sort_host_finished() and a scan_collection attribute.
        """
        self._daemon = daemon

    @staticmethod
    def _vt_name_and_qod(vt, nvti):
        """Return the (name, qod) pair to report for a VT.

        Both values are always defined, so a VT missing from the cache, or
        one carrying neither 'qod_type' nor 'qod', can neither raise an
        UnboundLocalError nor inherit the values of a previously picked VT.

        Arguments:
            vt: VT dictionary, or a falsy value if the VT was not found.
            nvti: Object whose QOD_TYPES mapping translates a 'qod_type'
                into a QoD value.

        Return:
            Tuple (name, qod). Empty strings stand in for unknown values.
        """
        # NOTE(review): '' is assumed to be an acceptable "unknown" value
        # for the ResultList name/qod arguments -- confirm against ospd.
        name = ''
        qod = ''
        if vt:
            name = vt.get('name')
            qod_type = vt.get('qod_type')
            if qod_type:
                qod = nvti.QOD_TYPES[qod_type]
            elif vt.get('qod'):
                qod = vt.get('qod')
        return name, qod

    def _add_random_result(self, res_list, current_host, vts, tcp, vthelper, nvti):
        """Append one randomly chosen error, log or alarm result to res_list.

        Arguments:
            res_list: Result list receiving the generated result.
            current_host: Host the result is reported for.
            vts: Non-empty list of VT OIDs to pick from.
            tcp: Non-empty list of TCP ports to pick from.
            vthelper: Helper used to look up the VT and its severity score.
            nvti: Object providing the QOD_TYPES mapping.
        """
        oid = choice(vts)
        port = choice(tcp)
        vt = vthelper.get_single_vt(oid)
        if not vt:
            logger.debug("oid %s not found", oid)
        rname, rqod = self._vt_name_and_qod(vt, nvti)
        hostname = current_host + ".hostname.net"

        # int() truncates, so this yields 1 (error), 2 (log) or 3-4 (alarm).
        res_type = int(uniform(1, 5))
        if res_type == 1:
            # Error
            res_list.add_scan_error_to_list(
                host=current_host,
                hostname=hostname,
                name=rname,
                value="error running the script " + oid,
                port=port,
                test_id=oid,
                uri="No location",
            )
        elif res_type == 2:
            # Log
            res_list.add_scan_log_to_list(
                host=current_host,
                hostname=hostname,
                name=rname,
                value="Log generate from a dry run scan for the script "
                + oid,
                port=port,
                qod=rqod,
                test_id=oid,
                uri="No location",
            )
        else:
            # Alarm
            r_severity = vthelper.get_severity_score(vt)
            res_list.add_scan_alarm_to_list(
                host=current_host,
                hostname=hostname,
                name=rname,
                value="Log generate from a dry run scan for the script "
                + oid,
                port=port,
                test_id=oid,
                severity=r_severity,
                qod=rqod,
                uri="No location",
            )

    def exec_dry_run_scan(self, scan_id, nvti):
        """Run a simulated scan and store the generated results.

        For each target host a HOST_START log, "results_per_host" random
        results (default 10) and a HOST_END log are added to the scan
        collection; the host is then reported as finished. The loop stops
        early if the scan status becomes STOPPED or FINISHED, and sleeps
        one second after each host.

        Arguments:
            scan_id: Identifier of the scan to simulate.
            nvti: VT cache handed to VtHelper; its QOD_TYPES mapping is
                used to translate a VT's 'qod_type'.
        """
        params = self._daemon.get_scanner_params()
        results_per_host = params.get("results_per_host", 10)

        # Get the host list
        target = self._daemon.scan_collection.get_host_list(scan_id)
        logger.info("The target list %s", target)
        host_list = target_str_to_list(target)

        # Get the port list. Only the TCP ports are used for the results.
        ports = self._daemon.scan_collection.get_ports(scan_id)
        logger.info("The port list %s", ports)
        tcp, _ = ports_as_list(ports)

        # Get exclude hosts list.
        # NOTE(review): the list is only logged here; it is not removed
        # from host_list, so excluded hosts still get simulated results.
        exclude_hosts = self._daemon.scan_collection.get_exclude_hosts(scan_id)
        logger.info("The exclude hosts list %s", exclude_hosts)

        self._daemon.set_scan_total_hosts(
            scan_id,
            count_total=len(host_list),
        )
        self._daemon.scan_collection.set_amount_dead_hosts(
            scan_id, total_dead=0
        )

        # Get list of VTS. Ignore script params
        vts = list(self._daemon.scan_collection.get_vts(scan_id))
        if "vt_groups" in vts:
            vts.remove("vt_groups")
        vthelper = VtHelper(nvti)

        # random.choice() fails on an empty (or missing) sequence, so
        # results can only be generated with at least one VT and one port.
        can_generate = bool(vts) and bool(tcp)
        if not can_generate:
            logger.warning(
                '%s: No VTs or no TCP ports to pick from. '
                'No results will be generated.',
                scan_id,
            )

        # Run the scan.
        # Scan simulation for each single host.
        # Run the scan against the host, and generates results.
        while host_list:
            # Get a host from the list
            current_host = host_list.pop()

            # Check if the scan was stopped.
            status = self._daemon.get_scan_status(scan_id)
            if status in (ScanStatus.STOPPED, ScanStatus.FINISHED):
                logger.debug(
                    'Task %s stopped or finished.',
                    scan_id,
                )
                return

            res_list = ResultList()

            res_list.add_scan_log_to_list(
                host=current_host,
                name="HOST_START",
                value=str(int(time.time())),
            )

            # Generate N results per host. Default 10 results
            if can_generate:
                for _ in range(0, results_per_host):
                    self._add_random_result(
                        res_list, current_host, vts, tcp, vthelper, nvti
                    )

            res_list.add_scan_log_to_list(
                host=current_host,
                name="HOST_END",
                value=str(int(time.time())),
            )

            # Add the result to the scan collection
            if len(res_list):
                logger.debug(
                    '%s: Inserting %d results into scan '
                    'scan collection table',
                    scan_id,
                    len(res_list),
                )
                self._daemon.scan_collection.add_result_list(scan_id, res_list)

            # Set the host scan progress as finished
            host_progress = {current_host: ScanProgress.FINISHED}
            self._daemon.set_scan_progress_batch(
                scan_id, host_progress=host_progress
            )

            # Update the host status, Finished host. So ospd can
            # calculate the scan progress.
            # This is quite important, since the final scan status depends on
            # the progress calculation.
            finished_host = [current_host]
            self._daemon.sort_host_finished(scan_id, finished_host)

            time.sleep(1)
        logger.debug('%s: End task', scan_id)
186