Total Complexity | 55 |
Total Lines | 459 |
Duplicated Lines | 21.13 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like gvm.protocols.gmpv9.types often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # -*- coding: utf-8 -*- |
||
2 | # Copyright (C) 2019 Greenbone Networks GmbH |
||
3 | # |
||
4 | # SPDX-License-Identifier: GPL-3.0-or-later |
||
5 | # |
||
6 | # This program is free software: you can redistribute it and/or modify |
||
7 | # it under the terms of the GNU General Public License as published by |
||
8 | # the Free Software Foundation, either version 3 of the License, or |
||
9 | # (at your option) any later version. |
||
10 | # |
||
11 | # This program is distributed in the hope that it will be useful, |
||
12 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
13 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
14 | # GNU General Public License for more details. |
||
15 | # |
||
16 | # You should have received a copy of the GNU General Public License |
||
17 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
||
18 | from enum import Enum |
||
19 | |||
20 | from typing import Optional |
||
21 | |||
22 | from gvm.errors import InvalidArgument |
||
23 | |||
24 | from gvm.protocols.gmpv8.types import ( |
||
25 | AliveTest, |
||
26 | AssetType, |
||
27 | CredentialFormat, |
||
28 | CredentialType, |
||
29 | FeedType, |
||
30 | HostsOrdering, |
||
31 | InfoType, |
||
32 | PermissionSubjectType, |
||
33 | PortRangeType, |
||
34 | SeverityLevel, |
||
35 | SnmpAuthAlgorithm, |
||
36 | SnmpPrivacyAlgorithm, |
||
37 | TicketStatus, |
||
38 | TimeUnit, |
||
39 | get_alive_test_from_string, |
||
40 | get_asset_type_from_string, |
||
41 | get_credential_format_from_string, |
||
42 | get_credential_type_from_string, |
||
43 | get_feed_type_from_string, |
||
44 | get_hosts_ordering_from_string, |
||
45 | get_info_type_from_string, |
||
46 | get_permission_subject_type_from_string, |
||
47 | get_port_range_type_from_string, |
||
48 | get_severity_level_from_string, |
||
49 | get_snmp_auth_algorithm_from_string, |
||
50 | get_snmp_privacy_algorithm_from_string, |
||
51 | get_ticket_status_from_string, |
||
52 | get_time_unit_from_string, |
||
53 | ) |
||
54 | |||
55 | |||
56 | __all__ = [ |
||
57 | "AlertCondition", |
||
58 | "AlertEvent", |
||
59 | "AlertMethod", |
||
60 | "AliveTest", |
||
61 | "AssetType", |
||
62 | "CredentialFormat", |
||
63 | "CredentialType", |
||
64 | "EntityType", |
||
65 | "FeedType", |
||
66 | "FilterType", |
||
67 | "HostsOrdering", |
||
68 | "InfoType", |
||
69 | "PermissionSubjectType", |
||
70 | "PortRangeType", |
||
71 | "ScannerType", |
||
72 | "SeverityLevel", |
||
73 | "SnmpAuthAlgorithm", |
||
74 | "SnmpPrivacyAlgorithm", |
||
75 | "TicketStatus", |
||
76 | "TimeUnit", |
||
77 | "get_alert_condition_from_string", |
||
78 | "get_alert_event_from_string", |
||
79 | "get_alert_method_from_string", |
||
80 | "get_alive_test_from_string", |
||
81 | "get_asset_type_from_string", |
||
82 | "get_credential_format_from_string", |
||
83 | "get_credential_type_from_string", |
||
84 | "get_entity_type_from_string", |
||
85 | "get_feed_type_from_string", |
||
86 | "get_filter_type_from_string", |
||
87 | "get_hosts_ordering_from_string", |
||
88 | "get_info_type_from_string", |
||
89 | "get_permission_subject_type_from_string", |
||
90 | "get_port_range_type_from_string", |
||
91 | "get_scanner_type_from_string", |
||
92 | "get_severity_level_from_string", |
||
93 | "get_snmp_auth_algorithm_from_string", |
||
94 | "get_snmp_privacy_algorithm_from_string", |
||
95 | "get_ticket_status_from_string", |
||
96 | "get_time_unit_from_string", |
||
97 | ] |
||
98 | |||
99 | |||
class EntityType(Enum):
    """Enumeration of entity types.

    Every member's value is a lowercase string. For most members it is the
    lowercase member name; OPERATING_SYSTEM, SCAN_CONFIG and VULNERABILITY
    use the shorter values 'os', 'config' and 'vuln'.
    """

    AGENT = 'agent'
    ALERT = 'alert'
    ASSET = 'asset'
    CERT_BUND_ADV = 'cert_bund_adv'
    CPE = 'cpe'
    CREDENTIAL = 'credential'
    CVE = 'cve'
    DFN_CERT_ADV = 'dfn_cert_adv'
    FILTER = 'filter'
    GROUP = 'group'
    HOST = 'host'
    INFO = 'info'
    NOTE = 'note'
    NVT = 'nvt'
    OPERATING_SYSTEM = 'os'
    OVALDEF = 'ovaldef'
    OVERRIDE = 'override'
    PERMISSION = 'permission'
    PORT_LIST = 'port_list'
    REPORT = 'report'
    REPORT_FORMAT = 'report_format'
    RESULT = 'result'
    ROLE = 'role'
    SCAN_CONFIG = 'config'
    SCANNER = 'scanner'
    SCHEDULE = 'schedule'
    TAG = 'tag'
    TARGET = 'target'
    TASK = 'task'
    TICKET = 'ticket'
    TLS_CERTIFICATE = 'tls_certificate'
    USER = 'user'
    VULNERABILITY = 'vuln'
||
136 | |||
137 | |||
def get_entity_type_from_string(
    entity_type: Optional[str],
) -> Optional[EntityType]:
    """Convert an entity type string to an actual EntityType instance.

    The lookup is case-insensitive. A string is accepted if it equals a
    member's value (e.g. ``'os'``, ``'config'``, ``'vuln'``) or a member's
    name (e.g. ``'operating_system'``, ``'scan_config'``,
    ``'vulnerability'``).

    Arguments:
        entity_type: Entity type string to convert to a EntityType

    Returns:
        The matching EntityType, or None if entity_type is None or empty.

    Raises:
        InvalidArgument: If entity_type matches neither a member value nor
            a member name.
    """
    if not entity_type:
        return None

    # All member values are lowercase, so a lowercased value lookup resolves
    # the short aliases ('os', 'config', 'vuln') regardless of input casing.
    try:
        return EntityType(entity_type.lower())
    except ValueError:
        pass

    # Fall back to the member name so that e.g. 'scan_config' keeps working.
    try:
        return EntityType[entity_type.upper()]
    except KeyError:
        # The failed lookup is an implementation detail; do not chain it.
        raise InvalidArgument(
            argument='entity_type',
            function=get_entity_type_from_string.__name__,
        ) from None
||
168 | |||
169 | |||
class AlertEvent(Enum):
    """Enumeration of alert event types.

    Each member's value is the mixed-case, space-separated text form of the
    event.
    """

    TASK_RUN_STATUS_CHANGED = "Task run status changed"
    UPDATED_SECINFO_ARRIVED = "Updated SecInfo arrived"
    NEW_SECINFO_ARRIVED = "New SecInfo arrived"
    TICKET_RECEIVED = "Ticket received"
    ASSIGNED_TICKET_CHANGED = "Assigned ticket changed"
    OWNED_TICKET_CHANGED = "Owned ticket changed"
||
179 | |||
180 | |||
def get_alert_event_from_string(
    alert_event: Optional[str],
) -> Optional[AlertEvent]:
    """Convert an alert event string into an AlertEvent instance.

    The string is lowercased and compared against the known event texts,
    so matching is case-insensitive. Member names are not accepted.

    Arguments:
        alert_event: Alert event string to convert to an AlertEvent

    Returns:
        The matching AlertEvent, or None if alert_event is None or empty.

    Raises:
        InvalidArgument: If the string matches no known event text.
    """
    if not alert_event:
        return None

    events_by_text = {
        'task run status changed': AlertEvent.TASK_RUN_STATUS_CHANGED,
        'updated secinfo arrived': AlertEvent.UPDATED_SECINFO_ARRIVED,
        'new secinfo arrived': AlertEvent.NEW_SECINFO_ARRIVED,
        'ticket received': AlertEvent.TICKET_RECEIVED,
        'assigned ticket changed': AlertEvent.ASSIGNED_TICKET_CHANGED,
        'owned ticket changed': AlertEvent.OWNED_TICKET_CHANGED,
    }

    matched = events_by_text.get(alert_event.lower())
    if matched is None:
        raise InvalidArgument(
            argument='alert_event',
            function=get_alert_event_from_string.__name__,
        )

    return matched
||
211 | |||
212 | |||
class AlertCondition(Enum):
    """Enumeration of alert condition types.

    Each member's value is the capitalised, space-separated text form of
    the condition.
    """

    ALWAYS = "Always"
    ERROR = "Error"
    SEVERITY_AT_LEAST = "Severity at least"
    SEVERITY_CHANGED = "Severity changed"
    FILTER_COUNT_CHANGED = "Filter count changed"
    FILTER_COUNT_AT_LEAST = "Filter count at least"
||
222 | |||
223 | |||
def get_alert_condition_from_string(
    alert_condition: Optional[str],
) -> Optional[AlertCondition]:
    """Convert an alert condition string into an AlertCondition instance.

    The string is lowercased and compared against the known condition
    texts, so matching is case-insensitive. Member names are not accepted.

    Arguments:
        alert_condition: Alert condition string to convert to an
            AlertCondition

    Returns:
        The matching AlertCondition, or None if alert_condition is None or
        empty.

    Raises:
        InvalidArgument: If the string matches no known condition text.
    """
    if not alert_condition:
        return None

    conditions_by_text = {
        'error': AlertCondition.ERROR,
        'always': AlertCondition.ALWAYS,
        'filter count changed': AlertCondition.FILTER_COUNT_CHANGED,
        'filter count at least': AlertCondition.FILTER_COUNT_AT_LEAST,
        'severity at least': AlertCondition.SEVERITY_AT_LEAST,
        'severity changed': AlertCondition.SEVERITY_CHANGED,
    }

    matched = conditions_by_text.get(alert_condition.lower())
    if matched is None:
        raise InvalidArgument(
            argument='alert_condition',
            function=get_alert_condition_from_string.__name__,
        )

    return matched
||
255 | |||
256 | |||
class AlertMethod(Enum):
    """Enumeration of alert method types.

    Each member's value is the text form of the method. Several values
    contain spaces and mixed case (e.g. 'TippingPoint SMS').
    """

    SCP = 'SCP'
    SEND = 'Send'
    SMB = 'SMB'
    SNMP = 'SNMP'
    SYSLOG = 'Syslog'
    EMAIL = 'Email'
    START_TASK = 'Start Task'
    HTTP_GET = 'HTTP Get'
    SOURCEFIRE_CONNECTOR = 'Sourcefire Connector'
    VERINICE_CONNECTOR = 'verinice Connector'
    TIPPINGPOINT = 'TippingPoint SMS'
    ALEMBA_VFIRE = 'Alemba vFire'
||
272 | |||
273 | |||
def get_alert_method_from_string(
    alert_method: Optional[str],
) -> Optional[AlertMethod]:
    """Convert an alert method string into an AlertMethod instance.

    The lookup is case-insensitive. A string is accepted if it equals a
    member's value (e.g. ``'Start Task'``, ``'TippingPoint SMS'``) or a
    member's name (e.g. ``'START_TASK'``, ``'TIPPINGPOINT'``).

    Arguments:
        alert_method: Alert method string to convert to an AlertMethod

    Returns:
        The matching AlertMethod, or None if alert_method is None or empty.

    Raises:
        InvalidArgument: If alert_method matches neither a member value nor
            a member name.
    """
    if not alert_method:
        return None

    normalized = alert_method.upper()

    # Match on the member values first: this covers the multi-word methods
    # whose value differs from the member name (spaces, 'TippingPoint SMS').
    for method in AlertMethod:
        if method.value.upper() == normalized:
            return method

    try:
        return AlertMethod[normalized]
    except KeyError:
        # The failed lookup is an implementation detail; do not chain it.
        raise InvalidArgument(
            argument='alert_method',
            function=get_alert_method_from_string.__name__,
        ) from None
||
308 | |||
309 | |||
class FilterType(Enum):
    """Enumeration of filter types.

    Every member's value is a lowercase string. For most members it is the
    lowercase member name; SCAN_CONFIG, OPERATING_SYSTEM, ALL_SECINFO and
    VULNERABILITY use the shorter values 'config', 'os', 'secinfo' and
    'vuln'.
    """

    AGENT = 'agent'
    ALERT = 'alert'
    ASSET = 'asset'
    SCAN_CONFIG = 'config'
    CREDENTIAL = 'credential'
    FILTER = 'filter'
    GROUP = 'group'
    HOST = 'host'
    NOTE = 'note'
    OPERATING_SYSTEM = 'os'
    OVERRIDE = 'override'
    PERMISSION = 'permission'
    PORT_LIST = 'port_list'
    REPORT = 'report'
    REPORT_FORMAT = 'report_format'
    RESULT = 'result'
    ROLE = 'role'
    SCHEDULE = 'schedule'
    ALL_SECINFO = 'secinfo'
    TAG = 'tag'
    TARGET = 'target'
    TASK = 'task'
    TICKET = 'ticket'
    TLS_CERTIFICATE = 'tls_certificate'
    USER = 'user'
    VULNERABILITY = 'vuln'
||
339 | |||
340 | |||
def get_filter_type_from_string(
    filter_type: Optional[str],
) -> Optional[FilterType]:
    """Convert a filter type string to an actual FilterType instance.

    The lookup is case-insensitive. A string is accepted if it equals a
    member's value (e.g. ``'os'``, ``'config'``, ``'secinfo'``, ``'vuln'``)
    or a member's name (e.g. ``'operating_system'``, ``'scan_config'``,
    ``'all_secinfo'``, ``'vulnerability'``).

    Arguments:
        filter_type: Filter type string to convert to a FilterType

    Returns:
        The matching FilterType, or None if filter_type is None or empty.

    Raises:
        InvalidArgument: If filter_type matches neither a member value nor
            a member name.
    """
    if not filter_type:
        return None

    # All member values are lowercase, so a lowercased value lookup resolves
    # the short aliases ('os', 'config', 'secinfo', 'vuln') regardless of
    # input casing.
    try:
        return FilterType(filter_type.lower())
    except ValueError:
        pass

    # Fall back to the member name so that e.g. 'all_secinfo' keeps working.
    try:
        return FilterType[filter_type.upper()]
    except KeyError:
        # The failed lookup is an implementation detail; do not chain it.
        raise InvalidArgument(
            argument='filter_type',
            function=get_filter_type_from_string.__name__,
        ) from None
||
374 | |||
375 | |||
class ScannerType(Enum):
    """Enumeration of scanner types.

    Each member's value is a numeric identifier stored as a string.
    """

    OSP_SCANNER_TYPE = '1'
    OPENVAS_SCANNER_TYPE = '2'
    CVE_SCANNER_TYPE = '3'
    GMP_SCANNER_TYPE = '4'  # formerly slave scanner
    GREENBONE_SENSOR_SCANNER_TYPE = '5'
||
384 | |||
385 | |||
def get_scanner_type_from_string(
    scanner_type: Optional[str],
) -> Optional[ScannerType]:
    """Convert a scanner type string to an actual ScannerType instance.

    The string is lowercased and then accepted if it equals either a
    member's numeric value (``'1'`` to ``'5'``) or that member's short
    alias (``'osp'``, ``'openvas'``, ``'cve'``, ``'gmp'``, ``'greenbone'``).

    Arguments:
        scanner_type: Scanner type string to convert to a ScannerType

    Returns:
        The matching ScannerType, or None if scanner_type is None or empty.

    Raises:
        InvalidArgument: If the string is neither a known value nor a known
            alias.
    """
    if not scanner_type:
        return None

    wanted = scanner_type.lower()

    # (member, short alias) pairs, checked in declaration order.
    candidates = (
        (ScannerType.OSP_SCANNER_TYPE, 'osp'),
        (ScannerType.OPENVAS_SCANNER_TYPE, 'openvas'),
        (ScannerType.CVE_SCANNER_TYPE, 'cve'),
        (ScannerType.GMP_SCANNER_TYPE, 'gmp'),
        (ScannerType.GREENBONE_SENSOR_SCANNER_TYPE, 'greenbone'),
    )

    for member, alias in candidates:
        if wanted == member.value or wanted == alias:
            return member

    raise InvalidArgument(
        argument='scanner_type', function=get_scanner_type_from_string.__name__
    )
||
432 | |||
433 | |||
class _UsageType(Enum):
    """Enumeration of usage types.

    Each member's value is the lowercase form of the member name.
    """

    AUDIT = 'audit'
    POLICY = 'policy'
    SCAN = 'scan'
||
440 | |||
441 | |||
def __get_usage_type_from_string(
    usage_type: Optional[str],
) -> Optional[_UsageType]:
    """Convert a usage type string to an actual _UsageType instance.

    The string is upper-cased and looked up by member name, so matching is
    case-insensitive.

    Arguments:
        usage_type: Usage type string to convert to a _UsageType

    Returns:
        The matching _UsageType, or None if usage_type is None or empty.

    Raises:
        InvalidArgument: If usage_type is not the name of a _UsageType
            member.
    """
    if not usage_type:
        return None

    try:
        return _UsageType[usage_type.upper()]
    except KeyError:
        # The failed lookup is an implementation detail; do not chain it.
        raise InvalidArgument(
            argument='usage_type',
            function=__get_usage_type_from_string.__name__,
        ) from None
||
460 |