scan.print_logs()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 0
dl 0
loc 10
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
"""Automation script for Greenbone scanner 20.08."""
3
4
import subprocess
5
import argparse
6
import base64
7
import time
8
import os
9
from lxml import etree
10
from typing import Optional
11
from typing import Union
12
from typing import Dict
13
from typing import List
14
from typing import Set
15
from typing import IO
16
17
DEBUG: bool = False
18
19
scan_profiles: Dict[str, str] = {
20
    "Base": "d21f6c81-2b88-4ac1-b7b4-a2a9f2ad4663",
21
    "Discovery": "8715c877-47a0-438d-98a3-27c7a6ab2196",
22
    "Empty": "085569ce-73ed-11df-83c3-002264764cea",
23
    "Full and fast": "daba56c8-73ec-11df-a475-002264764cea",
24
    "Host Discovery": "2d3f051c-55ba-11e3-bf43-406186ea4fc5",
25
    "System Discovery": "bbca7412-a950-11e3-9109-406186ea4fc5",
26
    "GaussDB 100 V300R001C00 Security Hardening Guide (Standalone)": "61327f09-8a54-4854-9e1c-16798285fb28",
27
    "EulerOS Linux Security Configuration": "9f822ad3-9208-4e02-ac03-78dce3ca9a23",
28
    "Huawei Datacom Product Security Configuration Audit Guide": "aab5c4a1-eab1-4f4e-acac8c36d08de6bc",
29
    "IT-Grundschutz": "c4b7c0cb-6502-4809-b034-8e635311b3e6"
30
}
31
32
report_formats: Dict[str, str] = {
33
    "Anonymous XML": "5057e5cc-b825-11e4-9d0e-28d24461215b",
34
    "CSV Results": "c1645568-627a-11e3-a660-406186ea4fc5",
35
    "ITG": "77bd6c4a-1f62-11e1-abf0-406186ea4fc5",
36
    "PDF": "c402cc3e-b531-11e1-9163-406186ea4fc5",
37
    "TXT": "a3810a62-1f62-11e1-9219-406186ea4fc5",
38
    "XML": "a994b278-1f62-11e1-96ac-406186ea4fc5"
39
}
40
41
scan_ports: Dict[str, str] = {
42
    "All IANA Assigned TCP": "33d0cd82-57c6-11e1-8ed1-406186ea4fc5",
43
    "All IANA Assigned TCP and UDP": "4a4717fe-57d2-11e1-9a26-406186ea4fc5",
44
    "All TCP and Nmap top 100 UDP": "730ef368-57e2-11e1-a90f-406186ea4fc5",
45
}
46
47
alive_tests: Set[str] = {
48
    "Scan Config Default",
49
    "ICMP, TCP-ACK Service & ARP Ping",
50
    "TCP-ACK Service & ARP Ping",
51
    "ICMP & ARP Ping",
52
    "ICMP & TCP-ACK Service Ping",
53
    "ARP Ping",
54
    "TCP-ACK Service Ping",
55
    "TCP-SYN Service Ping",
56
    "ICMP Ping",
57
    "Consider Alive",
58
}
59
60
61
def check_error(error: str):
62
    """Print exception error and exit. Ignore OpenVAS temporary authentication error."""
63
    if 'Failed to authenticate.' not in error:
64
        print("[ERROR] Response: {}".format(error))
65
        exit(1)
66
67
68
def execute_command(command: str, xpath: Optional[str] = None) -> Union[str, float, bool, List]:
69
    """Execute GVMD command and return its output (optionally xpath can be used to get nested XML element)."""
70
    global DEBUG
71
72
    command: str = "su - service -c \"gvm-cli --gmp-username admin --gmp-password admin " \
73
                   "socket --socketpath /usr/local/var/run/gvmd.sock --xml \'{}\'\"".format(command)
74
75
    if DEBUG:
76
        print("[DEBUG] Command: {}".format(command))
77
78
    response: str = ''
79
80
    try:
81
        response = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode().strip()
82
    except subprocess.CalledProcessError as e:
83
        check_error(e.output.decode('utf-8'))
84
85
    if DEBUG:
86
        print("[DEBUG] Response: {}".format(response))
87
88
    return etree.XML(response).xpath(xpath) if xpath else response
89
90
91
def perform_cleanup() -> None:
92
    """Remove all existing tasks and targets."""
93
    existing_tasks: List = execute_command(r"<get_tasks/>", "//get_tasks_response/task")
94
95
    for task in existing_tasks:
96
        execute_command(r"<delete_task task_id=\"{}\" ultimate=\"true\"/>".format(task.get("id")))
97
98
    existing_targets: List = execute_command(r"<get_targets/>", "//get_targets_response/target")
99
100
    for target in existing_targets:
101
       execute_command(r"<delete_target target_id=\"{}\" ultimate=\"true\"/>".format(target.get("id")))
102
103
104
def print_logs() -> None:
105
    """Show logs from OpenVAS and GVMD."""
106
    if DEBUG:
107
        logs: str = open("/usr/local/var/log/gvm/openvas.log", "r").read()
108
109
        print("[DEBUG] OpenVAS Logs: {}".format(logs))
110
111
        logs: str = open("/usr/local/var/log/gvm/gvmd.log", "r").read()
112
113
        print("[DEBUG] GVMD Logs: {}".format(logs))
114
115
116
def save_report(path: str, report: str) -> None:
117
    """Save report to specified file."""
118
    file: IO[str] = open(path, 'wb')
119
    file.write(report)
120
    file.close()
121
122
123
def get_report(report_id: str, output_format: str) -> Optional[str]:
124
    """Get generated report. Decode from Base64 if not XML."""
125
    command: str = r"<get_reports report_id=\"{}\" format_id=\"{}\" ".format(report_id, output_format) + \
126
                   r"filter=\"apply_overrides=1 overrides=1 notes=1 levels=hmlg\" " + \
127
                   r"details=\"1\" notes_details=\"1\" result_tags=\"1\" ignore_pagination=\"1\"/>"
128
129
    try:
130
        if output_format == 'a994b278-1f62-11e1-96ac-406186ea4fc5':
131
            report: etree.Element = execute_command(command, '//get_reports_response/report')[0]
132
        else:
133
            report: str = execute_command(command, 'string(//get_reports_response/report/text())')
134
    except etree.XMLSyntaxError:
135
        print("Generated report is empty!")
136
137
        return None
138
139
    return base64.b64decode(report) if isinstance(report, str) else etree.tostring(report).strip()
140
141
142
def process_task(task_id: str) -> str:
143
    """Wait for task to finish and return report id."""
144
    status: Optional[str] = None
145
    task: Optional[str] = None
146
147
    while status != "Done":
148
        try:
149
            time.sleep(10)
150
151
            task = execute_command(r"<get_tasks task_id=\"{}\"/>".format(task_id))
152
            status = etree.XML(task).xpath("string(//status/text())")
153
            progress: int = int(etree.XML(task).xpath("string(//progress/text())"))
154
155
            os.system("clear")
156
157
            if status != "Done":
158
                print("Task status: {} {}%".format(status, progress))
159
            else:
160
                print("Task status: Complete")
161
        except subprocess.CalledProcessError as exception:
162
            print("ERROR: ", exception.output)
163
        except etree.XMLSyntaxError:
164
            print("ERROR: Cannot get task status")
165
166
    return etree.XML(task).xpath("string(//report/@id)")
167
168
169
def start_task(task_id) -> None:
170
    """Start task with specified id."""
171
    execute_command(r"<start_task task_id=\"{}\"/>".format(task_id))
172
173
174
def create_task(profile, target_id) -> str:
175
    """Create new scan task for target."""
176
    command: str = r"<create_task><name>scan</name>" + \
177
                   r"<target id=\"{}\"></target>".format(target_id) + \
178
                   r"<config id=\"{}\"></config></create_task>".format(profile)
179
180
    return execute_command(command, "string(//create_task_response/@id)")
181
182
183
def create_target(scan) -> str:
184
    """Create new target."""
185
    command: str = r"<create_target><name>scan</name><hosts>{0}</hosts>".format(scan['target']) + \
186
                   r"<port_list id=\"{}\"></port_list>".format(scan['port_list_id']) + \
187
                   r"<exclude_hosts>{}</exclude_hosts>".format(scan['exclude']) + \
188
                   r"<live_tests>{}</live_tests></create_target>".format(scan['tests'])
189
190
    return execute_command(command, "string(//create_target_response/@id)")
191
192
193
def make_scan(scan: Dict[str, str]) -> None:
194
    """Make automated OpenVAS scan and save generated report."""
195
    perform_cleanup()
196
    print("Performed initial cleanup.")
197
198
    target_id = create_target(scan)
199
    print("Created target with id: {}.".format(target_id))
200
201
    task_id = create_task(scan['profile'], target_id)
202
    print("Created task with id: {}.".format(task_id))
203
204
    start_task(task_id)
205
    print("Started task.")
206
207
    print("Waiting for task to finish...")
208
    report_id = process_task(task_id)
209
    print("Finished processing task.")
210
211
    report = get_report(report_id, scan['format'])
212
    print("Generated report.")
213
214
    if report:
215
        save_report(scan['output'], report)
216
        print("Saved report to {}.".format(scan['output']))
217
218
    print_logs()
219
    perform_cleanup()
220
    print("Done!")
221
222
223
def start_scan(args: argparse.Namespace) -> None:
224
    """Override default settings and start scan."""
225
    global DEBUG
226
227
    if args.debug:
228
        DEBUG = True
229
230
    subprocess.check_call(
231
        ["sed -i 's/max_hosts.*/max_hosts = " + str(args.hosts) + "/' /usr/local/etc/openvas/openvas.conf"],
232
        shell=True,
233
        stdout=subprocess.DEVNULL
234
    )
235
    subprocess.check_call(
236
        ["sed -i 's/max_checks.*/max_checks = " + str(args.checks) + "/' /usr/local/etc/openvas/openvas.conf"],
237
        shell=True,
238
        stdout=subprocess.DEVNULL
239
    )
240
241
    if args.update is True:
242
        print("Starting and updating OpenVAS...")
243
        subprocess.check_call(["update-scanner"], shell=True, stdout=subprocess.DEVNULL)
244
    else:
245
        print("Starting OpenVAS...")
246
        subprocess.check_call(["start-scanner"], shell=True, stdout=subprocess.DEVNULL)
247
248
    print("Starting scan with settings:")
249
    print("* Target: {}".format(args.target))
250
    print("* Excluded hosts: {}".format(args.exclude))
251
    print("* Scan profile: {}".format(args.profile))
252
    print("* Scan ports: {}".format(args.ports))
253
    print("* Alive tests: {}".format(args.tests))
254
    print("* Max hosts: {}".format(args.hosts))
255
    print("* Max checks: {}".format(args.checks))
256
    print("* Report format: {}".format(args.format))
257
    print("* Output file: {}\n".format(args.output))
258
259
    make_scan({'target': args.target, 'exclude': args.exclude, 'tests': args.tests.replace("&", "&amp;"),
260
               'profile': scan_profiles[args.profile], 'port_list_id': scan_ports[args.ports],
261
               'format': report_formats[args.format], 'output': "/reports/" + args.output})
262
263
264
def report_format(arg: Optional[str]) -> str:
265
    """Check if report format value is valid."""
266
    if arg not in report_formats:
267
        raise argparse.ArgumentTypeError("Specified report format is invalid!")
268
269
    return arg
270
271
272
def scan_profile(arg: Optional[str]) -> str:
273
    """Check if scan profile value is valid."""
274
    if arg not in scan_profiles:
275
        raise argparse.ArgumentTypeError("Specified scan profile is invalid!")
276
277
    return arg
278
279
def scan_ports_option(arg: Optional[str]) -> str:
280
    """Check if scan ports value is valid."""
281
    if arg not in scan_ports:
282
        raise argparse.ArgumentTypeError("Specified scan ports option is invalid!")
283
284
    return arg
285
286
287
def alive_test(arg: Optional[str]) -> str:
288
    """Check if alive test value is valid."""
289
    if arg not in alive_tests:
290
        raise argparse.ArgumentTypeError("Specified alive tests are invalid!")
291
292
    return arg
293
294
295
def max_hosts(arg: Optional[str]) -> int:
296
    """Check if max hosts value is valid."""
297
    try:
298
        value = int(arg)
299
300
        if value <= 0:
301
            raise ValueError
302
    except ValueError:
303
        raise argparse.ArgumentTypeError("Specified maximum number of simultaneous tested hosts is invalid!")
304
305
    return value
306
307
308
def max_checks(arg: Optional[str]) -> int:
309
    """Check if max checks value is valid."""
310
    try:
311
        value = int(arg)
312
313
        if value <= 0:
314
            raise ValueError
315
    except ValueError:
316
        raise argparse.ArgumentTypeError("Specified maximum number of simultaneous checks against hosts is invalid!")
317
318
    return value
319
320
321
def parse_arguments():
322
    """Add and parse script arguments."""
323
    parser: argparse.ArgumentParser = argparse.ArgumentParser(
324
        description='Run OpenVAS scan with specified target and save report.')
325
    parser.add_argument('target', help='scan target')
326
    parser.add_argument('-o', '--output', help='output file (default: openvas.report)',
327
                        default="openvas.report", required=False)
328
    parser.add_argument('-f', '--format', help='format for report (default: XML)',
329
                        default="XML", type=report_format, required=False)
330
    parser.add_argument('-p', '--profile', help='scan profile (default: )',
331
                        default="Full and fast", type=scan_profile, required=False)
332
    parser.add_argument('-P', '--ports', help='scan ports (default: All TCP and Nmap top 100 UDP)',
333
                        default="All TCP and Nmap top 100 UDP", type=scan_ports_option, required=False)
334
    parser.add_argument('-t', '--tests', help='alive tests (default: ICMP, TCP-ACK Service & ARP Ping)',
335
                        default="ICMP, TCP-ACK Service & ARP Ping", type=alive_test, required=False)
336
    parser.add_argument('-e', '--exclude', help='hosts excluded from scan target (Default: "")',
337
                        default="", required=False)
338
    parser.add_argument('-m', '--hosts', help='maximum number of simultaneous tested hosts (Default: 15)',
339
                        type=max_hosts, default=10, required=False)
340
    parser.add_argument('-c', '--checks', help='maximum number of simultaneous checks against each host (Default: 5)',
341
                        type=max_checks, default=3, required=False)
342
    parser.add_argument('--update', help='synchronize feeds before scanning',
343
                        nargs='?', const=True, default=False, required=False)
344
    parser.add_argument('--debug', help='enable command responses printing',
345
                        nargs='?', const=True, default=False, required=False)
346
347
    return parser.parse_args()
348
349
350
if __name__ == '__main__':
351
    arguments: argparse.Namespace = parse_arguments()
352
353
    start_scan(arguments)
354