| Conditions | 15 |
| Total Lines | 153 |
| Code Lines | 104 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
Complex classes like ospd_openvas.dryrun.DryRun.exec_dry_run_scan() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # -*- coding: utf-8 -*- |
||
| 41 | def exec_dry_run_scan(self, scan_id, nvti, ospd_params): |
||
| 42 | options = self._daemon.scan_collection.get_options(scan_id) |
||
| 43 | results_per_host = None |
||
| 44 | if "results_per_host" in options: |
||
| 45 | results_per_host = options.get("results_per_host") |
||
| 46 | |||
| 47 | if not results_per_host or not isinstance(results_per_host, int): |
||
| 48 | logger.debug("Using default value for results_per_host options") |
||
| 49 | results_per_host = ospd_params["results_per_host"].get("default") |
||
| 50 | |||
| 51 | # Get the host list |
||
| 52 | target = self._daemon.scan_collection.get_host_list(scan_id) |
||
| 53 | logger.info("The target list %s", target) |
||
| 54 | host_list = target_str_to_list(target) |
||
| 55 | |||
| 56 | # Get the port list |
||
| 57 | ports = self._daemon.scan_collection.get_ports(scan_id) |
||
| 58 | logger.info("The port list %s", ports) |
||
| 59 | tcp, _ = ports_as_list(ports) |
||
| 60 | # Get exclude hosts list. It must not be scanned |
||
| 61 | exclude_hosts = self._daemon.scan_collection.get_exclude_hosts(scan_id) |
||
| 62 | logger.info("The exclude hosts list %s", exclude_hosts) |
||
| 63 | |||
| 64 | self._daemon.set_scan_total_hosts( |
||
| 65 | scan_id, |
||
| 66 | count_total=len(host_list), |
||
| 67 | ) |
||
| 68 | self._daemon.scan_collection.set_amount_dead_hosts( |
||
| 69 | scan_id, total_dead=0 |
||
| 70 | ) |
||
| 71 | |||
| 72 | # Get list of VTS. Ignore script params |
||
| 73 | vts = list(self._daemon.scan_collection.get_vts(scan_id)) |
||
| 74 | if "vt_groups" in vts: |
||
| 75 | vts.remove("vt_groups") |
||
| 76 | vthelper = VtHelper(nvti) |
||
| 77 | |||
| 78 | # Run the scan. |
||
| 79 | # Scan simulation for each single host. |
||
| 80 | # Run the scan against the host, and generates results. |
||
| 81 | while host_list: |
||
| 82 | # Get a host from the list |
||
| 83 | current_host = host_list.pop() |
||
| 84 | |||
| 85 | # Check if the scan was stopped. |
||
| 86 | status = self._daemon.get_scan_status(scan_id) |
||
| 87 | if status == ScanStatus.STOPPED or status == ScanStatus.FINISHED: |
||
| 88 | logger.debug( |
||
| 89 | 'Task %s stopped or finished.', |
||
| 90 | scan_id, |
||
| 91 | ) |
||
| 92 | return |
||
| 93 | |||
| 94 | res_list = ResultList() |
||
| 95 | |||
| 96 | res_list.add_scan_log_to_list( |
||
| 97 | host=current_host, |
||
| 98 | name="HOST_START", |
||
| 99 | value=str(int(time.time())), |
||
| 100 | ) |
||
| 101 | |||
| 102 | # Generate N results per host. Default 10 results |
||
| 103 | res_count = 0 |
||
| 104 | while res_count < results_per_host: |
||
| 105 | res_count += 1 |
||
| 106 | oid = choice(vts) |
||
| 107 | port = choice(tcp) |
||
| 108 | vt = vthelper.get_single_vt(oid) |
||
| 109 | if vt: |
||
| 110 | if vt.get('qod_type'): |
||
| 111 | qod_t = vt.get('qod_type') |
||
| 112 | rqod = nvti.QOD_TYPES[qod_t] |
||
| 113 | elif vt.get('qod'): |
||
| 114 | rqod = vt.get('qod') |
||
| 115 | |||
| 116 | rname = vt.get('name') |
||
| 117 | else: |
||
| 118 | logger.debug("oid %s not found", oid) |
||
| 119 | |||
| 120 | res_type = int(uniform(1, 5)) |
||
| 121 | # Error |
||
| 122 | if res_type == 1: |
||
| 123 | res_list.add_scan_error_to_list( |
||
| 124 | host=current_host, |
||
| 125 | hostname=current_host + ".hostname.net", |
||
| 126 | name=rname, |
||
|
|
|||
| 127 | value="error running the script " + oid, |
||
| 128 | port=port, |
||
| 129 | test_id=oid, |
||
| 130 | uri="No location", |
||
| 131 | ) |
||
| 132 | # Log |
||
| 133 | elif res_type == 2: |
||
| 134 | res_list.add_scan_log_to_list( |
||
| 135 | host=current_host, |
||
| 136 | hostname=current_host + ".hostname.net", |
||
| 137 | name=rname, |
||
| 138 | value="Log generate from a dry run scan for the script " |
||
| 139 | + oid, |
||
| 140 | port=port, |
||
| 141 | qod=rqod, |
||
| 142 | test_id=oid, |
||
| 143 | uri="No location", |
||
| 144 | ) |
||
| 145 | # Alarm |
||
| 146 | else: |
||
| 147 | r_severity = vthelper.get_severity_score(vt) |
||
| 148 | res_list.add_scan_alarm_to_list( |
||
| 149 | host=current_host, |
||
| 150 | hostname=current_host + ".hostname.net", |
||
| 151 | name=rname, |
||
| 152 | value="Log generate from a dry run scan for the script " |
||
| 153 | + oid, |
||
| 154 | port=port, |
||
| 155 | test_id=oid, |
||
| 156 | severity=r_severity, |
||
| 157 | qod=rqod, |
||
| 158 | uri="No location", |
||
| 159 | ) |
||
| 160 | |||
| 161 | res_list.add_scan_log_to_list( |
||
| 162 | host=current_host, |
||
| 163 | name="HOST_END", |
||
| 164 | value=str(int(time.time())), |
||
| 165 | ) |
||
| 166 | |||
| 167 | # Add the result to the scan collection |
||
| 168 | if len(res_list): |
||
| 169 | logger.debug( |
||
| 170 | '%s: Inserting %d results into scan ' |
||
| 171 | 'scan collection table', |
||
| 172 | scan_id, |
||
| 173 | len(res_list), |
||
| 174 | ) |
||
| 175 | self._daemon.scan_collection.add_result_list(scan_id, res_list) |
||
| 176 | |||
| 177 | # Set the host scan progress as finished |
||
| 178 | host_progress = dict() |
||
| 179 | host_progress[current_host] = ScanProgress.FINISHED |
||
| 180 | self._daemon.set_scan_progress_batch( |
||
| 181 | scan_id, host_progress=host_progress |
||
| 182 | ) |
||
| 183 | |||
| 184 | # Update the host status, Finished host. So ospd can |
||
| 185 | # calculate the scan progress. |
||
| 186 | # This is quite importan, since the final scan status depends on |
||
| 187 | # the progress calculation. |
||
| 188 | finished_host = list() |
||
| 189 | finished_host.append(current_host) |
||
| 190 | self._daemon.sort_host_finished(scan_id, finished_host) |
||
| 191 | |||
| 192 | time.sleep(1) |
||
| 193 | logger.debug('%s: End task', scan_id) |
||
| 194 |