Passed
Push — master ( 45fa4e...575b00 )
by Dawid
47s
created

scan.alive_test()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 1
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
"""Automation script for OpenVAS 9."""
3
4
import subprocess
5
import argparse
6
import base64
7
import time
8
import os
9
from lxml import etree
10
from typing import Optional
11
from typing import Union
12
from typing import Dict
13
from typing import List
14
from typing import Set
15
from typing import IO
16
17
DEBUG: bool = False
18
19
scan_profiles: Dict[str, str] = {
20
    "Discovery": "8715c877-47a0-438d-98a3-27c7a6ab2196",
21
    "Empty": "085569ce-73ed-11df-83c3-002264764cea",
22
    "Full and fast": "daba56c8-73ec-11df-a475-002264764cea",
23
    "Full and fast ultimate": "698f691e-7489-11df-9d8c-002264764cea",
24
    "Full and very deep": "708f25c4-7489-11df-8094-002264764cea",
25
    "Full and very deep ultimate": "74db13d6-7489-11df-91b9-002264764cea",
26
    "Host Discovery": "2d3f051c-55ba-11e3-bf43-406186ea4fc5",
27
    "System Discovery": "bbca7412-a950-11e3-9109-406186ea4fc5"
28
}
29
30
report_formats: Dict[str, str] = {
31
    "Anonymous XML": "5057e5cc-b825-11e4-9d0e-28d24461215b",
32
    "ARF": "910200ca-dc05-11e1-954f-406186ea4fc5",
33
    "CPE": "5ceff8ba-1f62-11e1-ab9f-406186ea4fc5",
34
    "CSV Hosts": "9087b18c-626c-11e3-8892-406186ea4fc5",
35
    "CSV Results": "c1645568-627a-11e3-a660-406186ea4fc5",
36
    "HTML": "6c248850-1f62-11e1-b082-406186ea4fc5",
37
    "ITG": "77bd6c4a-1f62-11e1-abf0-406186ea4fc5",
38
    "LaTeX": "a684c02c-b531-11e1-bdc2-406186ea4fc5",
39
    "NBE": "9ca6fe72-1f62-11e1-9e7c-406186ea4fc5",
40
    "PDF": "c402cc3e-b531-11e1-9163-406186ea4fc5",
41
    "Topology SVG": "9e5e5deb-879e-4ecc-8be6-a71cd0875cdd",
42
    "TXT": "a3810a62-1f62-11e1-9219-406186ea4fc5",
43
    "Verinice ISM": "c15ad349-bd8d-457a-880a-c7056532ee15",
44
    "Verinice ITG": "50c9950a-f326-11e4-800c-28d24461215b",
45
    "XML": "a994b278-1f62-11e1-96ac-406186ea4fc5"
46
}
47
48
alive_tests: Set[str] = {
49
    "Scan Config Default",
50
    "ICMP, TCP-ACK Service & ARP Ping",
51
    "TCP-ACK Service & ARP Ping",
52
    "ICMP & ARP Ping",
53
    "ICMP & TCP-ACK Service Ping",
54
    "ARP Ping",
55
    "TCP-ACK Service Ping",
56
    "TCP-SYN Service Ping",
57
    "ICMP Ping",
58
    "Consider Alive",
59
}
60
61
62
def execute_command(command: str, xpath: Optional[str] = None) -> Union[str, float, bool, List]:
63
    """Execute omp command and return its output (optionally xpath can be used to get nested XML element)."""
64
    global DEBUG
65
66
    command: str = "omp -u admin -w admin -h 127.0.0.1 -p 9390 --xml \'{}\'".format(command)
67
68
    if DEBUG:
69
        print("[DEBUG] Command: {}".format(command))
70
71
    response: str = ''
72
73
    try:
74
        response = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode().strip()
75
    except subprocess.CalledProcessError as e:
76
        error = e.output.decode('utf-8')
77
78
        if 'Failed to authenticate.' not in error:
79
            print("[ERROR] Response: {}".format(error))
80
            exit(1)
81
82
    if DEBUG:
83
        print("[DEBUG] Response: {}".format(response))
84
85
    return etree.XML(response).xpath(xpath) if xpath else response
86
87
88
def perform_cleanup() -> None:
89
    """Remove all existing tasks and targets."""
90
    existing_tasks: List = execute_command("<get_tasks/>", "//get_tasks_response/task")
91
92
    for task in existing_tasks:
93
        execute_command("<delete_task task_id=\"{}\"/>".format(task.get("id")))
94
95
    existing_targets: List = execute_command("<get_targets/>", "//get_targets_response/target")
96
97
    for target in existing_targets:
98
        execute_command("<delete_target target_id=\"{}\"/>".format(target.get("id")))
99
100
101
def print_logs() -> None:
102
    """Show logs from OpenVAS."""
103
    if DEBUG:
104
        logs: str = open("/var/log/openvas/openvassd.messages", "r").read()
105
106
        print("[DEBUG] OpenVAS Logs: {}".format(logs))
107
108
109
def save_report(path: str, report: str) -> None:
110
    """Save report to specified file."""
111
    file: IO[str] = open(path, 'wb')
112
    file.write(report)
113
    file.close()
114
115
116
def get_report(report_id: str, output_format: str) -> str:
117
    """Get generated report. Decode from Base64 if not XML."""
118
    command: str = "<get_reports report_id=\"{}\" format_id=\"{}\" ".format(report_id, output_format) + \
119
                   "filter=\"apply_overrides=1 overrides=1 notes=1 levels=hmlg\"" + \
120
                   "details=\"1\" notes_details=\"1\" result_tags=\"1\" ignore_pagination=\"1\"/>"
121
122
    if output_format == 'a994b278-1f62-11e1-96ac-406186ea4fc5':
123
        report: etree.Element = execute_command(command, '//get_reports_response/report')[0]
124
    else:
125
        report: str = execute_command(command, 'string(//get_reports_response/report/text())')
126
127
    return base64.b64decode(report) if isinstance(report, str) else etree.tostring(report).strip()
128
129
130
def process_task(task_id: str) -> str:
131
    """Wait for task to finish and return report id."""
132
    status: Optional[str] = None
133
    task: Optional[str] = None
134
135
    while status != "Done":
136
        try:
137
            time.sleep(10)
138
139
            task = execute_command("<get_tasks task_id=\"{}\"/>".format(task_id))
140
            status = etree.XML(task).xpath("string(//status/text())")
141
            progress: int = int(etree.XML(task).xpath("string(//progress/text())"))
142
143
            os.system("clear")
144
145
            if progress > 0:
146
                print("Task status: {} {}%".format(status, progress))
147
            else:
148
                print("Task status: Complete")
149
        except subprocess.CalledProcessError as exception:
150
            print("ERROR: ", exception.output)
151
152
    return etree.XML(task).xpath("string(//report/@id)")
153
154
155
def start_task(task_id) -> None:
156
    """Start task with specified id."""
157
    execute_command("<start_task task_id=\"{}\"/>".format(task_id))
158
159
160
def create_task(profile, target_id) -> str:
161
    """Create new scan task for target."""
162
    command: str = "<create_task><name>scan</name>" + \
163
                   "<target id=\"{}\"></target>".format(target_id) + \
164
                   "<config id=\"{}\"></config></create_task>".format(profile)
165
166
    return execute_command(command, "string(//create_task_response/@id)")
167
168
169
def create_target(scan) -> str:
170
    """Create new target."""
171
    command: str = "<create_target><name>{0}</name><hosts>{0}</hosts>".format(scan['target']) + \
172
                   "<exclude_hosts>{}</exclude_hosts>".format(scan['exclude']) + \
173
                   "<alive_tests>{}</alive_tests></create_target>".format(scan['tests'])
174
175
    return execute_command(command, "string(//create_target_response/@id)")
176
177
178
def make_scan(scan: Dict[str, str]) -> None:
179
    """Make automated OpenVAS scan and save generated report."""
180
    perform_cleanup()
181
    print("Performed initial cleanup.")
182
183
    target_id = create_target(scan)
184
    print("Created target with id: {}.".format(target_id))
185
186
    task_id = create_task(scan['profile'], target_id)
187
    print("Created task with id: {}.".format(task_id))
188
189
    start_task(task_id)
190
    print("Started task.")
191
192
    print("Waiting for task to finish...")
193
    report_id = process_task(task_id)
194
    print("Finished processing task.")
195
196
    report = get_report(report_id, scan['format'])
197
    print("Generated report.")
198
199
    save_report(scan['output'], report)
200
    print("Saved report to {}.".format(scan['output']))
201
202
    print_logs()
203
    perform_cleanup()
204
    print("Done!")
205
206
207
def start_scan(args: argparse.Namespace) -> None:
208
    """Override default settings and start scan."""
209
    global DEBUG
210
211
    if args.debug:
212
        DEBUG = True
213
214
    subprocess.check_call(
215
        ["sed -i 's/max_hosts.*/max_hosts = " + str(args.hosts) + "/' /etc/openvas/openvassd.conf"],
216
        shell=True,
217
        stdout=subprocess.DEVNULL
218
    )
219
    subprocess.check_call(
220
        ["sed -i 's/max_checks.*/max_checks = " + str(args.checks) + "/' /etc/openvas/openvassd.conf"],
221
        shell=True,
222
        stdout=subprocess.DEVNULL
223
    )
224
225
    if args.update is True:
226
        print("Starting and updating OpenVAS...")
227
        subprocess.check_call(["/update"], shell=True, stdout=subprocess.DEVNULL)
228
    else:
229
        print("Starting OpenVAS...")
230
        subprocess.check_call(["/start"], shell=True, stdout=subprocess.DEVNULL)
231
232
    print("Starting scan with settings:")
233
    print("* Target: {}".format(args.target))
234
    print("* Excluded hosts: {}".format(args.exclude))
235
    print("* Scan profile: {}".format(args.profile))
236
    print("* Alive tests: {}".format(args.tests))
237
    print("* Max hosts: {}".format(args.hosts))
238
    print("* Max checks: {}".format(args.checks))
239
    print("* Report format: {}".format(args.format))
240
    print("* Output file: {}\n".format(args.output))
241
242
    make_scan({'target': args.target, 'exclude': args.exclude, 'tests': args.tests.replace("&", "&amp;"),
243
               'profile': scan_profiles[args.profile], 'format': report_formats[args.format],
244
               'output': "/reports/" + args.output})
245
246
247
def report_format(arg: Optional[str]) -> str:
248
    """Check if report format value is valid."""
249
    if arg not in report_formats:
250
        raise argparse.ArgumentTypeError("Specified report format is invalid!")
251
252
    return arg
253
254
255
def scan_profile(arg: Optional[str]) -> str:
256
    """Check if scan profile value is valid."""
257
    if arg not in scan_profiles:
258
        raise argparse.ArgumentTypeError("Specified scan profile is invalid!")
259
260
    return arg
261
262
263
def alive_test(arg: Optional[str]) -> str:
264
    """Check if alive test value is valid."""
265
    if arg not in alive_tests:
266
        raise argparse.ArgumentTypeError("Specified alive tests are invalid!")
267
268
    return arg
269
270
271
def max_hosts(arg: Optional[str]) -> int:
272
    """Check if max hosts value is valid."""
273
    try:
274
        value = int(arg)
275
276
        if value <= 0:
277
            raise ValueError
278
    except ValueError:
279
        raise argparse.ArgumentTypeError("Specified maximum number of simultaneous tested hosts is invalid!")
280
281
    return value
282
283
284
def max_checks(arg: Optional[str]) -> int:
285
    """Check if max checks value is valid."""
286
    try:
287
        value = int(arg)
288
289
        if value <= 0:
290
            raise ValueError
291
    except ValueError:
292
        raise argparse.ArgumentTypeError("Specified maximum number of simultaneous checks against hosts is invalid!")
293
294
    return value
295
296
297
def parse_arguments():
298
    """Add and parse script arguments."""
299
    parser: argparse.ArgumentParser = argparse.ArgumentParser(
300
        description='Run OpenVAS scan with specified target and save report.')
301
    parser.add_argument('target', help='scan target')
302
    parser.add_argument('-o', '--output', help='output file (default: openvas.report)',
303
                        default="openvas.report", required=False)
304
    parser.add_argument('-f', '--format', help='format for report (default: ARF)',
305
                        default="ARF", type=report_format, required=False)
306
    parser.add_argument('-p', '--profile', help='scan profile (default: Full and fast)',
307
                        default="Full and fast", type=scan_profile, required=False)
308
    parser.add_argument('-t', '--tests', help='alive tests (default: ICMP, TCP-ACK Service & ARP Ping)',
309
                        default="ICMP, TCP-ACK Service & ARP Ping", type=alive_test, required=False)
310
    parser.add_argument('-e', '--exclude', help='hosts excluded from scan target (Default: "")',
311
                        default="", required=False)
312
    parser.add_argument('-m', '--hosts', help='maximum number of simultaneous tested hosts (Default: 10)',
313
                        type=max_hosts, default=10, required=False)
314
    parser.add_argument('-c', '--checks', help='maximum number of simultaneous checks against each host (Default: 3)',
315
                        type=max_checks, default=3, required=False)
316
    parser.add_argument('--update', help='synchronize feeds before scanning',
317
                        nargs='?', const=True, default=False, required=False)
318
    parser.add_argument('--debug', help='enable command responses printing',
319
                        nargs='?', const=True, default=False, required=False)
320
321
    return parser.parse_args()
322
323
324
if __name__ == '__main__':
325
    arguments: argparse.Namespace = parse_arguments()
326
327
    start_scan(arguments)
328