Passed
Push — master ( 1051e6...1c5840 )
by Dawid
51s queued 11s
created

scan.start_task()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
"""Automation script for Greenbone scanner 20.08."""
3
4
import subprocess
5
import argparse
6
import base64
7
import time
8
import os
9
from lxml import etree
10
from typing import Optional
11
from typing import Union
12
from typing import Dict
13
from typing import List
14
from typing import Set
15
from typing import IO
16
17
DEBUG: bool = False
18
19
scan_profiles: Dict[str, str] = {
20
    "Discovery": "8715c877-47a0-438d-98a3-27c7a6ab2196",
21
    "Empty": "085569ce-73ed-11df-83c3-002264764cea",
22
    "Full and fast": "daba56c8-73ec-11df-a475-002264764cea",
23
    "Full and fast ultimate": "698f691e-7489-11df-9d8c-002264764cea",
24
    "Full and very deep": "708f25c4-7489-11df-8094-002264764cea",
25
    "Full and very deep ultimate": "74db13d6-7489-11df-91b9-002264764cea",
26
    "Host Discovery": "2d3f051c-55ba-11e3-bf43-406186ea4fc5",
27
    "System Discovery": "bbca7412-a950-11e3-9109-406186ea4fc5"
28
}
29
30
report_formats: Dict[str, str] = {
31
    "Anonymous XML": "5057e5cc-b825-11e4-9d0e-28d24461215b",
32
    "CSV Results": "c1645568-627a-11e3-a660-406186ea4fc5",
33
    "ITG": "77bd6c4a-1f62-11e1-abf0-406186ea4fc5",
34
    "PDF": "c402cc3e-b531-11e1-9163-406186ea4fc5",
35
    "TXT": "a3810a62-1f62-11e1-9219-406186ea4fc5",
36
    "XML": "a994b278-1f62-11e1-96ac-406186ea4fc5"
37
}
38
39
scan_ports: Dict[str, str] = {
40
    "All IANA Assigned TCP": "33d0cd82-57c6-11e1-8ed1-406186ea4fc5",
41
    "All IANA Assigned TCP and UDP": "4a4717fe-57d2-11e1-9a26-406186ea4fc5",
42
    "All TCP and Nmap top 100 UDP": "730ef368-57e2-11e1-a90f-406186ea4fc5",
43
}
44
45
alive_tests: Set[str] = {
46
    "Scan Config Default",
47
    "ICMP, TCP-ACK Service & ARP Ping",
48
    "TCP-ACK Service & ARP Ping",
49
    "ICMP & ARP Ping",
50
    "ICMP & TCP-ACK Service Ping",
51
    "ARP Ping",
52
    "TCP-ACK Service Ping",
53
    "TCP-SYN Service Ping",
54
    "ICMP Ping",
55
    "Consider Alive",
56
}
57
58
59
def check_error(error: str):
60
    """Print exception error and exit. Ignore OpenVAS temporary authentication error."""
61
    if 'Failed to authenticate.' not in error:
62
        print("[ERROR] Response: {}".format(error))
63
        exit(1)
64
65
66
def execute_command(command: str, xpath: Optional[str] = None) -> Union[str, float, bool, List]:
67
    """Execute GVMD command and return its output (optionally xpath can be used to get nested XML element)."""
68
    global DEBUG
69
70
    command: str = "su - service -c \"gvm-cli --gmp-username admin --gmp-password admin " \
71
                   "socket --socketpath /usr/local/var/run/gvmd.sock --xml \'{}\'\"".format(command)
72
73
    if DEBUG:
74
        print("[DEBUG] Command: {}".format(command))
75
76
    response: str = ''
77
78
    try:
79
        response = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode().strip()
80
    except subprocess.CalledProcessError as e:
81
        check_error(e.output.decode('utf-8'))
82
83
    if DEBUG:
84
        print("[DEBUG] Response: {}".format(response))
85
86
    return etree.XML(response).xpath(xpath) if xpath else response
87
88
89
def perform_cleanup() -> None:
90
    """Remove all existing tasks and targets."""
91
    existing_tasks: List = execute_command(r"<get_tasks/>", "//get_tasks_response/task")
92
93
    for task in existing_tasks:
94
        execute_command(r"<delete_task task_id=\"{}\" ultimate=\"true\"/>".format(task.get("id")))
95
96
    existing_targets: List = execute_command(r"<get_targets/>", "//get_targets_response/target")
97
98
    for target in existing_targets:
99
       execute_command(r"<delete_target target_id=\"{}\" ultimate=\"true\"/>".format(target.get("id")))
100
101
102
def print_logs() -> None:
103
    """Show logs from OpenVAS and GVMD."""
104
    if DEBUG:
105
        logs: str = open("/usr/local/var/log/gvm/openvas.log", "r").read()
106
107
        print("[DEBUG] OpenVAS Logs: {}".format(logs))
108
109
        logs: str = open("/usr/local/var/log/gvm/gvmd.log", "r").read()
110
111
        print("[DEBUG] GVMD Logs: {}".format(logs))
112
113
114
def save_report(path: str, report: str) -> None:
115
    """Save report to specified file."""
116
    file: IO[str] = open(path, 'wb')
117
    file.write(report)
118
    file.close()
119
120
121
def get_report(report_id: str, output_format: str) -> Optional[str]:
122
    """Get generated report. Decode from Base64 if not XML."""
123
    command: str = r"<get_reports report_id=\"{}\" format_id=\"{}\" ".format(report_id, output_format) + \
124
                   r"filter=\"apply_overrides=1 overrides=1 notes=1 levels=hmlg\" " + \
125
                   r"details=\"1\" notes_details=\"1\" result_tags=\"1\" ignore_pagination=\"1\"/>"
126
127
    try:
128
        if output_format == 'a994b278-1f62-11e1-96ac-406186ea4fc5':
129
            report: etree.Element = execute_command(command, '//get_reports_response/report')[0]
130
        else:
131
            report: str = execute_command(command, 'string(//get_reports_response/report/text())')
132
    except etree.XMLSyntaxError:
133
        print("Generated report is empty!")
134
135
        return None
136
137
    return base64.b64decode(report) if isinstance(report, str) else etree.tostring(report).strip()
138
139
140
def process_task(task_id: str) -> str:
141
    """Wait for task to finish and return report id."""
142
    status: Optional[str] = None
143
    task: Optional[str] = None
144
145
    while status != "Done":
146
        try:
147
            time.sleep(10)
148
149
            task = execute_command(r"<get_tasks task_id=\"{}\"/>".format(task_id))
150
            status = etree.XML(task).xpath("string(//status/text())")
151
            progress: int = int(etree.XML(task).xpath("string(//progress/text())"))
152
153
            os.system("clear")
154
155
            if progress > 0:
156
                print("Task status: {} {}%".format(status, progress))
157
            else:
158
                print("Task status: Complete")
159
        except subprocess.CalledProcessError as exception:
160
            print("ERROR: ", exception.output)
161
        except etree.XMLSyntaxError:
162
            print("ERROR: Cannot get task status")
163
164
    return etree.XML(task).xpath("string(//report/@id)")
165
166
167
def start_task(task_id) -> None:
168
    """Start task with specified id."""
169
    execute_command(r"<start_task task_id=\"{}\"/>".format(task_id))
170
171
172
def create_task(profile, target_id) -> str:
173
    """Create new scan task for target."""
174
    command: str = r"<create_task><name>scan</name>" + \
175
                   r"<target id=\"{}\"></target>".format(target_id) + \
176
                   r"<config id=\"{}\"></config></create_task>".format(profile)
177
178
    return execute_command(command, "string(//create_task_response/@id)")
179
180
181
def create_target(scan) -> str:
182
    """Create new target."""
183
    command: str = r"<create_target><name>scan</name><hosts>{0}</hosts>".format(scan['target']) + \
184
                   r"<port_list id=\"{}\"></port_list>".format(scan['port_list_id']) + \
185
                   r"<exclude_hosts>{}</exclude_hosts>".format(scan['exclude']) + \
186
                   r"<live_tests>{}</live_tests></create_target>".format(scan['tests'])
187
188
    return execute_command(command, "string(//create_target_response/@id)")
189
190
191
def make_scan(scan: Dict[str, str]) -> None:
192
    """Make automated OpenVAS scan and save generated report."""
193
    perform_cleanup()
194
    print("Performed initial cleanup.")
195
196
    target_id = create_target(scan)
197
    print("Created target with id: {}.".format(target_id))
198
199
    task_id = create_task(scan['profile'], target_id)
200
    print("Created task with id: {}.".format(task_id))
201
202
    start_task(task_id)
203
    print("Started task.")
204
205
    print("Waiting for task to finish...")
206
    report_id = process_task(task_id)
207
    print("Finished processing task.")
208
209
    report = get_report(report_id, scan['format'])
210
    print("Generated report.")
211
212
    if report:
213
        save_report(scan['output'], report)
214
        print("Saved report to {}.".format(scan['output']))
215
216
    print_logs()
217
    perform_cleanup()
218
    print("Done!")
219
220
221
def start_scan(args: argparse.Namespace) -> None:
222
    """Override default settings and start scan."""
223
    global DEBUG
224
225
    if args.debug:
226
        DEBUG = True
227
228
    subprocess.check_call(
229
        ["sed -i 's/max_hosts.*/max_hosts = " + str(args.hosts) + "/' /usr/local/etc/openvas/openvas.conf"],
230
        shell=True,
231
        stdout=subprocess.DEVNULL
232
    )
233
    subprocess.check_call(
234
        ["sed -i 's/max_checks.*/max_checks = " + str(args.checks) + "/' /usr/local/etc/openvas/openvas.conf"],
235
        shell=True,
236
        stdout=subprocess.DEVNULL
237
    )
238
239
    if args.update is True:
240
        print("Starting and updating OpenVAS...")
241
        subprocess.check_call(["update-scanner"], shell=True, stdout=subprocess.DEVNULL)
242
    else:
243
        print("Starting OpenVAS...")
244
        subprocess.check_call(["start-scanner"], shell=True, stdout=subprocess.DEVNULL)
245
246
    print("Starting scan with settings:")
247
    print("* Target: {}".format(args.target))
248
    print("* Excluded hosts: {}".format(args.exclude))
249
    print("* Scan profile: {}".format(args.profile))
250
    print("* Scan ports: {}".format(args.ports))
251
    print("* Alive tests: {}".format(args.tests))
252
    print("* Max hosts: {}".format(args.hosts))
253
    print("* Max checks: {}".format(args.checks))
254
    print("* Report format: {}".format(args.format))
255
    print("* Output file: {}\n".format(args.output))
256
257
    make_scan({'target': args.target, 'exclude': args.exclude, 'tests': args.tests.replace("&", "&amp;"),
258
               'profile': scan_profiles[args.profile], 'port_list_id': scan_ports[args.ports],
259
               'format': report_formats[args.format], 'output': "/reports/" + args.output})
260
261
262
def report_format(arg: Optional[str]) -> str:
263
    """Check if report format value is valid."""
264
    if arg not in report_formats:
265
        raise argparse.ArgumentTypeError("Specified report format is invalid!")
266
267
    return arg
268
269
270
def scan_profile(arg: Optional[str]) -> str:
271
    """Check if scan profile value is valid."""
272
    if arg not in scan_profiles:
273
        raise argparse.ArgumentTypeError("Specified scan profile is invalid!")
274
275
    return arg
276
277
def scan_ports_option(arg: Optional[str]) -> str:
278
    """Check if scan ports value is valid."""
279
    if arg not in scan_ports:
280
        raise argparse.ArgumentTypeError("Specified scan ports option is invalid!")
281
282
    return arg
283
284
285
def alive_test(arg: Optional[str]) -> str:
286
    """Check if alive test value is valid."""
287
    if arg not in alive_tests:
288
        raise argparse.ArgumentTypeError("Specified alive tests are invalid!")
289
290
    return arg
291
292
293
def max_hosts(arg: Optional[str]) -> int:
294
    """Check if max hosts value is valid."""
295
    try:
296
        value = int(arg)
297
298
        if value <= 0:
299
            raise ValueError
300
    except ValueError:
301
        raise argparse.ArgumentTypeError("Specified maximum number of simultaneous tested hosts is invalid!")
302
303
    return value
304
305
306
def max_checks(arg: Optional[str]) -> int:
307
    """Check if max checks value is valid."""
308
    try:
309
        value = int(arg)
310
311
        if value <= 0:
312
            raise ValueError
313
    except ValueError:
314
        raise argparse.ArgumentTypeError("Specified maximum number of simultaneous checks against hosts is invalid!")
315
316
    return value
317
318
319
def parse_arguments():
320
    """Add and parse script arguments."""
321
    parser: argparse.ArgumentParser = argparse.ArgumentParser(
322
        description='Run OpenVAS scan with specified target and save report.')
323
    parser.add_argument('target', help='scan target')
324
    parser.add_argument('-o', '--output', help='output file (default: openvas.report)',
325
                        default="openvas.report", required=False)
326
    parser.add_argument('-f', '--format', help='format for report (default: XML)',
327
                        default="XML", type=report_format, required=False)
328
    parser.add_argument('-p', '--profile', help='scan profile (default: )',
329
                        default="Full and fast", type=scan_profile, required=False)
330
    parser.add_argument('-P', '--ports', help='scan ports (default: All TCP and Nmap top 100 UDP)',
331
                        default="All TCP and Nmap top 100 UDP", type=scan_ports_option, required=False)
332
    parser.add_argument('-t', '--tests', help='alive tests (default: ICMP, TCP-ACK Service & ARP Ping)',
333
                        default="ICMP, TCP-ACK Service & ARP Ping", type=alive_test, required=False)
334
    parser.add_argument('-e', '--exclude', help='hosts excluded from scan target (Default: "")',
335
                        default="", required=False)
336
    parser.add_argument('-m', '--hosts', help='maximum number of simultaneous tested hosts (Default: 15)',
337
                        type=max_hosts, default=10, required=False)
338
    parser.add_argument('-c', '--checks', help='maximum number of simultaneous checks against each host (Default: 5)',
339
                        type=max_checks, default=3, required=False)
340
    parser.add_argument('--update', help='synchronize feeds before scanning',
341
                        nargs='?', const=True, default=False, required=False)
342
    parser.add_argument('--debug', help='enable command responses printing',
343
                        nargs='?', const=True, default=False, required=False)
344
345
    return parser.parse_args()
346
347
348
if __name__ == '__main__':
349
    arguments: argparse.Namespace = parse_arguments()
350
351
    start_scan(arguments)
352