Passed
Pull Request — master (#424)
by Juan José
02:08
created

ospd_openvas.dryrun.DryRun.exec_dry_run_scan()   F

Complexity

Conditions 12

Size

Total Lines 147
Code Lines 99

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
eloc 99
nop 3
dl 0
loc 147
rs 3.3927
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

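Commonly applied refactorings include Extract Method and, when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.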

Complexity

Complex methods like ospd_openvas.dryrun.DryRun.exec_dry_run_scan(), and the classes that contain them, often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2021 Greenbone Networks GmbH
3
#
4
# SPDX-License-Identifier: AGPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU Affero General Public License as
8
# published by the Free Software Foundation, either version 3 of the
9
# License, or (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU Affero General Public License for more details.
15
#
16
# You should have received a copy of the GNU Affero General Public License
17
# along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19
20
# pylint: disable=too-many-lines
21
22
""" Methods for dry run """
23
24
import logging
25
import time
26
27
from random import uniform, choice
28
29
from ospd.scan import ScanProgress, ScanStatus
30
from ospd.network import target_str_to_list, ports_as_list
31
from ospd.resultlist import ResultList
32
from ospd_openvas.vthelper import VtHelper
33
34
logger = logging.getLogger(__name__)
35
36
37
class DryRun:
    """Simulate a scan without running real tests.

    For every host of the scan target, randomly chosen results (errors,
    logs and alarms) are generated and stored through the daemon, and the
    host is then reported as finished.
    """

    # Results generated per host when the scanner parameters provide no
    # usable "results_per_host" value.
    DEFAULT_RESULTS_PER_HOST = 10

    def __init__(self, daemon):
        """Keep a reference to the daemon used to read and update the scan.

        Arguments:
            daemon: Object providing ``get_scanner_params()``,
                ``get_scan_status()``, ``set_scan_total_hosts()``,
                ``set_scan_progress_batch()``, ``sort_host_finished()`` and
                a ``scan_collection`` attribute.
        """
        self._daemon = daemon

    def exec_dry_run_scan(self, scan_id, nvti):
        """Run a simulated scan for ``scan_id`` and store random results.

        Hosts are processed one by one, with a one second pause after
        each host.

        Arguments:
            scan_id: Identifier of the scan to simulate.
            nvti: Object handed to ``VtHelper``. Its ``QOD_TYPES`` mapping
                is used to translate a VT's ``qod_type`` into a QoD value.

        Returns:
            None. The method returns early, before handling the next host,
            when the scan status is ``STOPPED`` or ``FINISHED``.
        """
        results_per_host = self._get_results_per_host()
        scan_collection = self._daemon.scan_collection

        # Get the host list
        target = scan_collection.get_host_list(scan_id)
        logger.info("The target list %s", target)
        host_list = target_str_to_list(target)

        # Get the port list. Only the TCP ports are used.
        ports = scan_collection.get_ports(scan_id)
        logger.info("The port list %s", ports)
        tcp, _ = ports_as_list(ports)

        # Get exclude hosts list.
        # NOTE(review): this list is only logged here; it is not used to
        # filter host_list. Confirm whether the exclusion is applied
        # elsewhere before changing this.
        exclude_hosts = scan_collection.get_exclude_hosts(scan_id)
        logger.info("The exclude hosts list %s", exclude_hosts)

        self._daemon.set_scan_total_hosts(
            scan_id,
            count_total=len(host_list),
        )
        scan_collection.set_amount_dead_hosts(scan_id, total_dead=0)

        # Get list of VTS. Ignore script params
        vts = list(scan_collection.get_vts(scan_id))
        if "vt_groups" in vts:
            vts.remove("vt_groups")
        vthelper = VtHelper(nvti)

        # random.choice() fails on an empty sequence, so results can only
        # be generated when there is something to choose from.
        can_generate = bool(vts) and bool(tcp)
        if not can_generate:
            logger.warning(
                '%s: No VTs or no TCP ports available. '
                'No dry run results will be generated.',
                scan_id,
            )

        # Run the scan.
        # Scan simulation for each single host.
        while host_list:
            current_host = host_list.pop()

            # Check if the scan was stopped.
            status = self._daemon.get_scan_status(scan_id)
            if status in (ScanStatus.STOPPED, ScanStatus.FINISHED):
                logger.debug(
                    'Task %s stopped or finished.',
                    scan_id,
                )
                return

            res_list = self._build_host_results(
                current_host,
                vts if can_generate else [],
                tcp,
                vthelper,
                nvti,
                results_per_host,
            )

            # Add the result to the scan collection
            if len(res_list):
                logger.debug(
                    '%s: Inserting %d results into scan '
                    'scan collection table',
                    scan_id,
                    len(res_list),
                )
                scan_collection.add_result_list(scan_id, res_list)

            self._mark_host_finished(scan_id, current_host)

            time.sleep(1)
        logger.debug('%s: End task', scan_id)

    def _get_results_per_host(self):
        """Return the number of results to generate for each host."""
        default = self.DEFAULT_RESULTS_PER_HOST
        value = self._daemon.get_scanner_params().get(
            "results_per_host", default
        )
        # NOTE(review): the stored value might be a parameter descriptor
        # (a dict carrying a "default" entry) rather than a bare number;
        # confirm against get_scanner_params().
        if isinstance(value, dict):
            value = value.get("default", default)
        if isinstance(value, (int, float)):
            return value
        try:
            return int(value)
        except (TypeError, ValueError):
            logger.warning(
                "Invalid results_per_host value %r. Using %d.",
                value,
                default,
            )
            return default

    def _build_host_results(
        self, current_host, vts, tcp, vthelper, nvti, results_per_host
    ):
        """Build the result list for one host.

        The list always starts with a HOST_START log and ends with a
        HOST_END log. Random results are added in between only when
        ``vts`` is not empty.
        """
        res_list = ResultList()

        res_list.add_scan_log_to_list(
            host=current_host,
            name="HOST_START",
            value=str(int(time.time())),
        )

        # Generate N results per host. Default 10 results
        if vts:
            res_count = 0
            while res_count < results_per_host:
                res_count += 1
                self._add_random_result(
                    res_list,
                    current_host,
                    choice(vts),
                    choice(tcp),
                    vthelper,
                    nvti,
                )

        res_list.add_scan_log_to_list(
            host=current_host,
            name="HOST_END",
            value=str(int(time.time())),
        )
        return res_list

    @staticmethod
    def _add_random_result(res_list, current_host, oid, port, vthelper, nvti):
        """Add one error, log or alarm result for ``oid`` to ``res_list``.

        Nothing is added when the VT for ``oid`` cannot be found, because
        there is then no name, QoD or severity to report.
        """
        vt = vthelper.get_single_vt(oid)
        if not vt:
            logger.debug("oid %s not found", oid)
            return

        # Always bind the QoD, so a VT without 'qod_type' and 'qod' can
        # neither raise nor reuse the value of a previously handled VT.
        rqod = ''
        if vt.get('qod_type'):
            rqod = nvti.QOD_TYPES[vt.get('qod_type')]
        elif vt.get('qod'):
            rqod = vt.get('qod')
        rname = vt.get('name')

        hostname = current_host + ".hostname.net"
        res_type = int(uniform(1, 5))
        # Error
        if res_type == 1:
            res_list.add_scan_error_to_list(
                host=current_host,
                hostname=hostname,
                name=rname,
                value="error running the script " + oid,
                port=port,
                test_id=oid,
                uri="No location",
            )
        # Log
        elif res_type == 2:
            res_list.add_scan_log_to_list(
                host=current_host,
                hostname=hostname,
                name=rname,
                value="Log generate from a dry run scan for the script "
                + oid,
                port=port,
                qod=rqod,
                test_id=oid,
                uri="No location",
            )
        # Alarm
        else:
            r_severity = vthelper.get_severity_score(vt)
            res_list.add_scan_alarm_to_list(
                host=current_host,
                hostname=hostname,
                name=rname,
                value="Log generate from a dry run scan for the script "
                + oid,
                port=port,
                test_id=oid,
                severity=r_severity,
                qod=rqod,
                uri="No location",
            )

    def _mark_host_finished(self, scan_id, current_host):
        """Report ``current_host`` as finished to the daemon."""
        # Set the host scan progress as finished
        self._daemon.set_scan_progress_batch(
            scan_id, host_progress={current_host: ScanProgress.FINISHED}
        )

        # Update the host status, Finished host. So ospd can
        # calculate the scan progress.
        # This is quite important, since the final scan status depends on
        # the progress calculation.
        self._daemon.sort_host_finished(scan_id, [current_host])
188