Passed
Push — master ( 575b00...4815b6 )
by Dawid
46s
created

scan.check_error()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 1
dl 0
loc 5
rs 10
c 0
b 0
f 0
#!/usr/bin/env python3
"""Automation script for OpenVAS 9."""

import argparse
import base64
import os
import subprocess
import sys
import time
from typing import Dict
from typing import IO
from typing import List
from typing import Optional
from typing import Set
from typing import Union

from lxml import etree
16
17
# Module-wide verbosity switch. start_scan() turns it on from --debug; while set,
# execute_command() prints each omp command/response and print_logs() dumps the scanner log.
DEBUG: bool = False
18
19
# Human-readable scan profile name -> OpenVAS scan config id (used as <config id="..."> in create_task).
scan_profiles: Dict[str, str] = {
    "Discovery": "8715c877-47a0-438d-98a3-27c7a6ab2196",
    "Empty": "085569ce-73ed-11df-83c3-002264764cea",
    "Full and fast": "daba56c8-73ec-11df-a475-002264764cea",
    "Full and fast ultimate": "698f691e-7489-11df-9d8c-002264764cea",
    "Full and very deep": "708f25c4-7489-11df-8094-002264764cea",
    "Full and very deep ultimate": "74db13d6-7489-11df-91b9-002264764cea",
    "Host Discovery": "2d3f051c-55ba-11e3-bf43-406186ea4fc5",
    "System Discovery": "bbca7412-a950-11e3-9109-406186ea4fc5",
}
29
30
# Human-readable report format name -> OpenVAS report format id (sent as format_id in get_reports).
report_formats: Dict[str, str] = {
    "Anonymous XML": "5057e5cc-b825-11e4-9d0e-28d24461215b",
    "ARF": "910200ca-dc05-11e1-954f-406186ea4fc5",
    "CPE": "5ceff8ba-1f62-11e1-ab9f-406186ea4fc5",
    "CSV Hosts": "9087b18c-626c-11e3-8892-406186ea4fc5",
    "CSV Results": "c1645568-627a-11e3-a660-406186ea4fc5",
    "HTML": "6c248850-1f62-11e1-b082-406186ea4fc5",
    "ITG": "77bd6c4a-1f62-11e1-abf0-406186ea4fc5",
    "LaTeX": "a684c02c-b531-11e1-bdc2-406186ea4fc5",
    "NBE": "9ca6fe72-1f62-11e1-9e7c-406186ea4fc5",
    "PDF": "c402cc3e-b531-11e1-9163-406186ea4fc5",
    "Topology SVG": "9e5e5deb-879e-4ecc-8be6-a71cd0875cdd",
    "TXT": "a3810a62-1f62-11e1-9219-406186ea4fc5",
    "Verinice ISM": "c15ad349-bd8d-457a-880a-c7056532ee15",
    "Verinice ITG": "50c9950a-f326-11e4-800c-28d24461215b",
    "XML": "a994b278-1f62-11e1-96ac-406186ea4fc5",
}
47
48
# Alive-test names accepted by the --tests option (validated in alive_test()).
alive_tests: Set[str] = {
    "Scan Config Default",
    "ICMP, TCP-ACK Service & ARP Ping",
    "TCP-ACK Service & ARP Ping",
    "ICMP & ARP Ping",
    "ICMP & TCP-ACK Service Ping",
    "ARP Ping",
    "TCP-ACK Service Ping",
    "TCP-SYN Service Ping",
    "ICMP Ping",
    "Consider Alive",
}
60
61
62
def check_error(error: str) -> None:
    """Print exception error and exit. Ignore OpenVAS temporary authentication error.

    :param error: decoded output of the failed omp invocation.
    :raises SystemExit: with status 1 for every error except the authentication one.
    """
    if 'Failed to authenticate.' in error:
        # Treated as transient: let the caller carry on.
        return

    print("[ERROR] Response: {}".format(error))
    # sys.exit instead of the bare exit(), which only exists when the site module is loaded.
    sys.exit(1)
67
68
69
def execute_command(command: str, xpath: Optional[str] = None) -> Union[str, float, bool, List]:
    """Execute omp command and return its output (optionally xpath can be used to get nested XML element).

    :param command: OMP XML request handed to ``omp --xml``.
    :param xpath: optional XPath expression evaluated against the parsed response.
    :return: the XPath result when ``xpath`` is given, otherwise the raw response text.
    """
    # Pass the XML as one argv entry instead of interpolating it into a quoted shell
    # string: quotes or shell metacharacters in the request can no longer break the
    # invocation or be interpreted by a shell.
    argv: List[str] = ["omp", "-u", "admin", "-w", "admin", "-h", "127.0.0.1", "-p", "9390", "--xml", command]

    if DEBUG:
        print("[DEBUG] Command: {}".format(" ".join(argv)))

    response: str = ''

    try:
        response = subprocess.check_output(argv, stderr=subprocess.STDOUT).decode().strip()
    except subprocess.CalledProcessError as e:
        # Exits the process unless the failure is the tolerated authentication error.
        check_error(e.output.decode('utf-8'))

    if DEBUG:
        print("[DEBUG] Response: {}".format(response))

    # NOTE(review): after an ignored authentication failure `response` is still empty,
    # so a call with `xpath` asks lxml to parse an empty document - confirm whether a
    # retry is wanted here.
    return etree.XML(response).xpath(xpath) if xpath else response
89
90
91
def perform_cleanup() -> None:
    """Remove all existing tasks and targets."""
    # Tasks are deleted before targets, same order as they are queried.
    existing_tasks: List = execute_command("<get_tasks/>", "//get_tasks_response/task")

    for task in existing_tasks:
        execute_command("<delete_task task_id=\"{}\"/>".format(task.get("id")))

    existing_targets: List = execute_command("<get_targets/>", "//get_targets_response/target")

    for target in existing_targets:
        execute_command("<delete_target target_id=\"{}\"/>".format(target.get("id")))
102
103
104
def print_logs() -> None:
    """Show logs from OpenVAS (only when DEBUG is enabled)."""
    if not DEBUG:
        return

    # Context manager so the log file handle is closed even if reading fails.
    with open("/var/log/openvas/openvassd.messages", "r") as log_file:
        logs: str = log_file.read()

    print("[DEBUG] OpenVAS Logs: {}".format(logs))
110
111
112
def save_report(path: str, report: Union[bytes, str]) -> None:
    """Save report to specified file.

    :param path: destination file; it is created or truncated.
    :param report: report content; bytes are written as-is, text is encoded as UTF-8.
    """
    # The file is opened in binary mode, so text has to be encoded before writing.
    payload: bytes = report.encode("utf-8") if isinstance(report, str) else report

    # Context manager closes the file even when the write fails.
    with open(path, 'wb') as report_file:
        report_file.write(payload)
117
118
119
def get_report(report_id: str, output_format: str) -> bytes:
    """Get generated report. Decode from Base64 if not XML.

    :param report_id: id of the report to download.
    :param output_format: report format id sent as ``format_id``.
    :return: report content as bytes.
    """
    # Every attribute needs separating whitespace; the space after the filter
    # attribute was missing, which produced malformed XML.
    command: str = "<get_reports report_id=\"{}\" format_id=\"{}\" ".format(report_id, output_format) + \
                   "filter=\"apply_overrides=1 overrides=1 notes=1 levels=hmlg\" " + \
                   "details=\"1\" notes_details=\"1\" result_tags=\"1\" ignore_pagination=\"1\"/>"

    if output_format == 'a994b278-1f62-11e1-96ac-406186ea4fc5':
        # XML format id: the report is the element itself, serialised back to bytes.
        report_element = execute_command(command, '//get_reports_response/report')[0]
        return etree.tostring(report_element).strip()

    # Any other format arrives as Base64 text inside the <report> element.
    encoded_report: str = execute_command(command, 'string(//get_reports_response/report/text())')
    return base64.b64decode(encoded_report)
131
132
133
def process_task(task_id: str) -> str:
    """Wait for task to finish and return report id.

    Polls the task every 10 seconds until its status is "Done", printing the
    current status on a cleared terminal after each poll.

    :param task_id: id of the started task.
    :return: value of the first ``report/@id`` attribute in the final task response.
    """
    status: Optional[str] = None
    task = None

    while status != "Done":
        time.sleep(10)

        response = execute_command("<get_tasks task_id=\"{}\"/>".format(task_id))

        if not response:
            # execute_command() returns an empty string when check_error() ignored an
            # authentication failure; there is nothing to parse, so poll again.
            continue

        # Parse once and reuse the tree for every lookup.
        task = etree.XML(response)
        status = task.xpath("string(//status/text())")

        try:
            progress: int = int(task.xpath("string(//progress/text())"))
        except ValueError:
            # No numeric progress in the response: treat it as "not reported".
            progress = -1

        os.system("clear")

        if progress > 0:
            print("Task status: {} {}%".format(status, progress))
        elif status == "Done":
            print("Task status: Complete")
        else:
            # Do not claim completion while the task is still in another state.
            print("Task status: {}".format(status))

    # The loop only ends after a response with status "Done" was parsed, so task is set.
    return task.xpath("string(//report/@id)")
156
157
158
def start_task(task_id: str) -> None:
    """Start task with specified id."""
    execute_command("<start_task task_id=\"{}\"/>".format(task_id))
161
162
163
def create_task(profile: str, target_id: str) -> str:
    """Create new scan task for target.

    :param profile: scan config id placed in ``<config id="...">``.
    :param target_id: id of the target to scan.
    :return: id attribute of the create_task response.
    """
    command: str = "<create_task><name>scan</name>" + \
                   "<target id=\"{}\"></target>".format(target_id) + \
                   "<config id=\"{}\"></config></create_task>".format(profile)

    return execute_command(command, "string(//create_task_response/@id)")
170
171
172
def create_target(scan: Dict[str, str]) -> str:
    """Create new target.

    :param scan: scan settings; the 'target', 'exclude' and 'tests' keys are read.
    :return: id attribute of the create_target response.
    """
    from xml.sax.saxutils import escape

    # Target and excluded hosts come straight from the command line, so escape
    # XML-special characters before embedding them in the request.
    hosts: str = escape(scan['target'])
    excluded: str = escape(scan['exclude'])

    # scan['tests'] is left as-is: start_scan() already replaces "&" with "&amp;".
    command: str = "<create_target><name>{0}</name><hosts>{0}</hosts>".format(hosts) + \
                   "<exclude_hosts>{}</exclude_hosts>".format(excluded) + \
                   "<alive_tests>{}</alive_tests></create_target>".format(scan['tests'])

    return execute_command(command, "string(//create_target_response/@id)")
179
180
181
def make_scan(scan: Dict[str, str]) -> None:
    """Make automated OpenVAS scan and save generated report.

    :param scan: scan settings; the 'profile', 'format' and 'output' keys are read
        here and the whole mapping is forwarded to create_target().
    """
    # Start from a clean state: drop every existing task and target.
    perform_cleanup()
    print("Performed initial cleanup.")

    target_id = create_target(scan)
    print("Created target with id: {}.".format(target_id))

    task_id = create_task(scan['profile'], target_id)
    print("Created task with id: {}.".format(task_id))

    start_task(task_id)
    print("Started task.")

    print("Waiting for task to finish...")
    report_id = process_task(task_id)
    print("Finished processing task.")

    report = get_report(report_id, scan['format'])
    print("Generated report.")

    save_report(scan['output'], report)
    print("Saved report to {}.".format(scan['output']))

    print_logs()
    perform_cleanup()
    print("Done!")
208
209
210
def start_scan(args: argparse.Namespace) -> None:
    """Override default settings and start scan.

    :param args: parsed command line arguments as produced by parse_arguments().
    :raises subprocess.CalledProcessError: when sed or the start/update script fails.
    """
    global DEBUG

    if args.debug:
        DEBUG = True

    # Rewrite the scanner limits in place. sed is called with an argv list (no shell),
    # one call per option instead of two copies of the same statement.
    for option, value in (("max_hosts", args.hosts), ("max_checks", args.checks)):
        subprocess.check_call(
            ["sed", "-i", "s/{0}.*/{0} = {1}/".format(option, value), "/etc/openvas/openvassd.conf"],
            stdout=subprocess.DEVNULL
        )

    # /update and /start are still run through the shell, exactly as before.
    if args.update is True:
        print("Starting and updating OpenVAS...")
        subprocess.check_call("/update", shell=True, stdout=subprocess.DEVNULL)
    else:
        print("Starting OpenVAS...")
        subprocess.check_call("/start", shell=True, stdout=subprocess.DEVNULL)

    print("Starting scan with settings:")
    print("* Target: {}".format(args.target))
    print("* Excluded hosts: {}".format(args.exclude))
    print("* Scan profile: {}".format(args.profile))
    print("* Alive tests: {}".format(args.tests))
    print("* Max hosts: {}".format(args.hosts))
    print("* Max checks: {}".format(args.checks))
    print("* Report format: {}".format(args.format))
    print("* Output file: {}\n".format(args.output))

    # Names are translated to ids here; "&" in the alive test name is escaped for XML.
    make_scan({'target': args.target, 'exclude': args.exclude, 'tests': args.tests.replace("&", "&amp;"),
               'profile': scan_profiles[args.profile], 'format': report_formats[args.format],
               'output': "/reports/" + args.output})
248
249
250
def report_format(arg: Optional[str]) -> str:
    """Check if report format value is valid.

    :raises argparse.ArgumentTypeError: when arg is not a key of report_formats.
    """
    if arg not in report_formats:
        raise argparse.ArgumentTypeError("Specified report format is invalid!")

    return arg
256
257
258
def scan_profile(arg: Optional[str]) -> str:
    """Check if scan profile value is valid.

    :raises argparse.ArgumentTypeError: when arg is not a key of scan_profiles.
    """
    if arg not in scan_profiles:
        raise argparse.ArgumentTypeError("Specified scan profile is invalid!")

    return arg
264
265
266
def alive_test(arg: Optional[str]) -> str:
    """Check if alive test value is valid.

    :raises argparse.ArgumentTypeError: when arg is not a member of alive_tests.
    """
    if arg not in alive_tests:
        raise argparse.ArgumentTypeError("Specified alive tests are invalid!")

    return arg
272
273
274
def max_hosts(arg: Optional[str]) -> int:
    """Check if max hosts value is valid.

    :param arg: raw option value.
    :return: the value as a positive integer.
    :raises argparse.ArgumentTypeError: when arg is missing, not an integer, or not positive.
    """
    try:
        value = int(arg)
    except (TypeError, ValueError):
        # TypeError covers None; both cases fall through to the "invalid" branch below.
        value = 0

    if value <= 0:
        raise argparse.ArgumentTypeError("Specified maximum number of simultaneous tested hosts is invalid!")

    return value
285
286
287
def max_checks(arg: Optional[str]) -> int:
    """Check if max checks value is valid.

    :param arg: raw option value.
    :return: the value as a positive integer.
    :raises argparse.ArgumentTypeError: when arg is missing, not an integer, or not positive.
    """
    try:
        value = int(arg)
    except (TypeError, ValueError):
        # TypeError covers None; both cases fall through to the "invalid" branch below.
        value = 0

    if value <= 0:
        raise argparse.ArgumentTypeError("Specified maximum number of simultaneous checks against hosts is invalid!")

    return value
298
299
300
def parse_arguments() -> argparse.Namespace:
    """Add and parse script arguments.

    :return: namespace with target, output, format, profile, tests, exclude,
        hosts, checks, update and debug attributes.
    """
    parser: argparse.ArgumentParser = argparse.ArgumentParser(
        description='Run OpenVAS scan with specified target and save report.')
    parser.add_argument('target', help='scan target')
    parser.add_argument('-o', '--output', help='output file (default: openvas.report)',
                        default="openvas.report", required=False)
    parser.add_argument('-f', '--format', help='format for report (default: ARF)',
                        default="ARF", type=report_format, required=False)
    parser.add_argument('-p', '--profile', help='scan profile (default: Full and fast)',
                        default="Full and fast", type=scan_profile, required=False)
    parser.add_argument('-t', '--tests', help='alive tests (default: ICMP, TCP-ACK Service & ARP Ping)',
                        default="ICMP, TCP-ACK Service & ARP Ping", type=alive_test, required=False)
    parser.add_argument('-e', '--exclude', help='hosts excluded from scan target (Default: "")',
                        default="", required=False)
    parser.add_argument('-m', '--hosts', help='maximum number of simultaneous tested hosts (Default: 10)',
                        type=max_hosts, default=10, required=False)
    parser.add_argument('-c', '--checks', help='maximum number of simultaneous checks against each host (Default: 3)',
                        type=max_checks, default=3, required=False)
    # NOTE: with nargs='?' a token written directly after --update/--debug is taken as
    # that flag's value, so these flags should not be placed right before the target.
    parser.add_argument('--update', help='synchronize feeds before scanning',
                        nargs='?', const=True, default=False, required=False)
    parser.add_argument('--debug', help='enable command responses printing',
                        nargs='?', const=True, default=False, required=False)

    return parser.parse_args()
325
326
327
# Script entry point: parse the command line, then run the scan.
if __name__ == '__main__':
    arguments: argparse.Namespace = parse_arguments()

    start_scan(arguments)
331