Conditions | 15 |
Total Lines | 153 |
Code Lines | 104 |
Lines | 0 |
Ratio | 0 % |
Changes | 0 |
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
Complex classes like the one containing ospd_openvas.dryrun.DryRun.exec_dry_run_scan() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # -*- coding: utf-8 -*- |
||
def exec_dry_run_scan(self, scan_id, nvti, ospd_params):
    """Simulate a scan of every target host and store random results.

    For each host of the scan a HOST_START log, up to ``results_per_host``
    randomly chosen error/log/alarm results and a HOST_END log are added
    to the scan collection, and the host is reported as finished to the
    daemon. The method returns early as soon as the scan status is
    STOPPED or FINISHED.

    Arguments:
        scan_id: Identifier of the scan to simulate.
        nvti: Object providing the ``QOD_TYPES`` mapping; it is also
            handed to ``VtHelper`` to look up VT metadata.
        ospd_params: Mapping whose ``"results_per_host"`` entry supplies
            the ``"default"`` number of results per host.
    """
    options = self._daemon.scan_collection.get_options(scan_id)
    results_per_host = options.get("results_per_host")

    # Missing, zero or non-integer values fall back to the daemon default.
    if not results_per_host or not isinstance(results_per_host, int):
        logger.debug("Using default value for results_per_host options")
        results_per_host = ospd_params["results_per_host"].get("default")

    # Get the host list
    target = self._daemon.scan_collection.get_host_list(scan_id)
    logger.info("The target list %s", target)
    host_list = target_str_to_list(target)

    # Get the port list. Only the first element (TCP) is used.
    ports = self._daemon.scan_collection.get_ports(scan_id)
    logger.info("The port list %s", ports)
    tcp, _ = ports_as_list(ports)

    # Get exclude hosts list. It is only logged here.
    exclude_hosts = self._daemon.scan_collection.get_exclude_hosts(scan_id)
    logger.info("The exclude hosts list %s", exclude_hosts)

    self._daemon.set_scan_total_hosts(
        scan_id,
        count_total=len(host_list),
    )
    self._daemon.scan_collection.set_amount_dead_hosts(
        scan_id, total_dead=0
    )

    # Get list of VTS. Ignore script params
    vts = list(self._daemon.scan_collection.get_vts(scan_id))
    if "vt_groups" in vts:
        vts.remove("vt_groups")
    vthelper = VtHelper(nvti)

    # choice() raises on an empty sequence. Without VTs or TCP ports no
    # random result can be built, so only the HOST_START/HOST_END logs are
    # produced and every host is still marked as finished.
    can_generate = bool(vts) and bool(tcp)
    if not can_generate:
        logger.warning(
            '%s: No VTs or TCP ports available, '
            'no dry run results will be generated',
            scan_id,
        )

    # Run the scan.
    # Scan simulation for each single host.
    # Run the scan against the host, and generates results.
    while host_list:
        # Get a host from the list
        current_host = host_list.pop()

        # Check if the scan was stopped.
        status = self._daemon.get_scan_status(scan_id)
        if status in (ScanStatus.STOPPED, ScanStatus.FINISHED):
            logger.debug(
                'Task %s stopped or finished.',
                scan_id,
            )
            return

        fake_hostname = current_host + ".hostname.net"
        res_list = ResultList()

        res_list.add_scan_log_to_list(
            host=current_host,
            name="HOST_START",
            value=str(int(time.time())),
        )

        # Generate N results per host. Default 10 results
        res_count = 0
        while can_generate and res_count < results_per_host:
            res_count += 1
            oid = choice(vts)
            port = choice(tcp)
            vt = vthelper.get_single_vt(oid)
            if not vt:
                # Without VT metadata there is no name or QoD to report.
                # Skip this attempt instead of failing on unset values or
                # reusing the values of the previously picked VT.
                logger.debug("oid %s not found", oid)
                continue

            rname = vt.get('name')
            qod_type = vt.get('qod_type')
            if qod_type:
                rqod = nvti.QOD_TYPES[qod_type]
            elif vt.get('qod'):
                rqod = vt.get('qod')
            else:
                # NOTE(review): the VT carries no QoD information. An empty
                # string is assumed to be acceptable to ResultList - confirm.
                rqod = ''

            res_type = int(uniform(1, 5))
            # Error
            if res_type == 1:
                res_list.add_scan_error_to_list(
                    host=current_host,
                    hostname=fake_hostname,
                    name=rname,
                    value="error running the script " + oid,
                    port=port,
                    test_id=oid,
                    uri="No location",
                )
            # Log
            elif res_type == 2:
                res_list.add_scan_log_to_list(
                    host=current_host,
                    hostname=fake_hostname,
                    name=rname,
                    value="Log generate from a dry run scan for the script "
                    + oid,
                    port=port,
                    qod=rqod,
                    test_id=oid,
                    uri="No location",
                )
            # Alarm
            else:
                r_severity = vthelper.get_severity_score(vt)
                res_list.add_scan_alarm_to_list(
                    host=current_host,
                    hostname=fake_hostname,
                    name=rname,
                    value="Log generate from a dry run scan for the script "
                    + oid,
                    port=port,
                    test_id=oid,
                    severity=r_severity,
                    qod=rqod,
                    uri="No location",
                )

        res_list.add_scan_log_to_list(
            host=current_host,
            name="HOST_END",
            value=str(int(time.time())),
        )

        # Add the result to the scan collection
        if len(res_list):
            logger.debug(
                '%s: Inserting %d results into scan '
                'scan collection table',
                scan_id,
                len(res_list),
            )
            self._daemon.scan_collection.add_result_list(scan_id, res_list)

        # Set the host scan progress as finished
        self._daemon.set_scan_progress_batch(
            scan_id, host_progress={current_host: ScanProgress.FINISHED}
        )

        # Update the host status, Finished host. So ospd can
        # calculate the scan progress.
        # This is quite important, since the final scan status depends on
        # the progress calculation.
        self._daemon.sort_host_finished(scan_id, [current_host])

        time.sleep(1)
    logger.debug('%s: End task', scan_id)
||
194 |