Passed
Push — master ( b83bb4...170235 )
by Dawid
20:07
created

scan.execute_command()   A

Complexity

Conditions 4

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 9
nop 2
dl 0
loc 15
rs 9.95
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
"""Automation script for OpenVAS 10."""
3
4
import subprocess
5
import argparse
6
import base64
7
import time
8
import os
9
from lxml import etree
10
from typing import Optional
11
from typing import Union
12
from typing import Dict
13
from typing import List
14
from typing import Set
15
from typing import IO
16
17
DEBUG: bool = False
18
19
scan_profiles: Dict[str, str] = {
20
    "Discovery": "8715c877-47a0-438d-98a3-27c7a6ab2196",
21
    "Empty": "085569ce-73ed-11df-83c3-002264764cea",
22
    "Full and fast": "daba56c8-73ec-11df-a475-002264764cea",
23
    "Full and fast ultimate": "698f691e-7489-11df-9d8c-002264764cea",
24
    "Full and very deep": "708f25c4-7489-11df-8094-002264764cea",
25
    "Full and very deep ultimate": "74db13d6-7489-11df-91b9-002264764cea",
26
    "Host Discovery": "2d3f051c-55ba-11e3-bf43-406186ea4fc5",
27
    "System Discovery": "bbca7412-a950-11e3-9109-406186ea4fc5"
28
}
29
30
report_formats: Dict[str, str] = {
31
    "Anonymous XML": "5057e5cc-b825-11e4-9d0e-28d24461215b",
32
    "ARF": "910200ca-dc05-11e1-954f-406186ea4fc5",
33
    "CPE": "5ceff8ba-1f62-11e1-ab9f-406186ea4fc5",
34
    "CSV Hosts": "9087b18c-626c-11e3-8892-406186ea4fc5",
35
    "CSV Results": "c1645568-627a-11e3-a660-406186ea4fc5",
36
    "HTML": "6c248850-1f62-11e1-b082-406186ea4fc5",
37
    "ITG": "77bd6c4a-1f62-11e1-abf0-406186ea4fc5",
38
    "LaTeX": "a684c02c-b531-11e1-bdc2-406186ea4fc5",
39
    "NBE": "9ca6fe72-1f62-11e1-9e7c-406186ea4fc5",
40
    "PDF": "c402cc3e-b531-11e1-9163-406186ea4fc5",
41
    "Topology SVG": "9e5e5deb-879e-4ecc-8be6-a71cd0875cdd",
42
    "TXT": "a3810a62-1f62-11e1-9219-406186ea4fc5",
43
    "Verinice ISM": "c15ad349-bd8d-457a-880a-c7056532ee15",
44
    "Verinice ITG": "50c9950a-f326-11e4-800c-28d24461215b",
45
    "XML": "a994b278-1f62-11e1-96ac-406186ea4fc5"
46
}
47
48
alive_tests: Set[str] = {
49
    "Scan Config Default",
50
    "ICMP, TCP-ACK Service & ARP Ping",
51
    "TCP-ACK Service & ARP Ping",
52
    "ICMP & ARP Ping",
53
    "ICMP & TCP-ACK Service Ping",
54
    "ARP Ping",
55
    "TCP-ACK Service Ping",
56
    "TCP-SYN Service Ping",
57
    "ICMP Ping",
58
    "Consider Alive",
59
}
60
61
62
def execute_command(command: str, xpath: Optional[str] = None) -> Union[str, float, bool, List]:
63
    """Execute omp command and return its output (optionally xpath can be used to get nested XML element)."""
64
    global DEBUG
65
66
    command: str = "omp -u admin -w admin -h 127.0.0.1 -p 9390 --xml \'{}\'".format(command)
67
68
    if DEBUG:
69
        print("[DEBUG] Command: {}".format(command))
70
71
    response: str = subprocess.check_output(command, stderr=subprocess.STDOUT, shell=True).decode().strip()
72
73
    if DEBUG:
74
        print("[DEBUG] Response: {}".format(response))
75
76
    return etree.XML(response).xpath(xpath) if xpath else response
77
78
79
def perform_cleanup() -> None:
80
    """Remove all existing tasks and targets."""
81
    existing_tasks: List = execute_command("<get_tasks/>", "//get_tasks_response/task")
82
83
    for task in existing_tasks:
84
        execute_command("<delete_task task_id=\"{}\"/>".format(task.get("id")))
85
86
    existing_targets: List = execute_command("<get_targets/>", "//get_targets_response/target")
87
88
    for target in existing_targets:
89
        execute_command("<delete_target target_id=\"{}\"/>".format(target.get("id")))
90
91
92
def print_logs() -> None:
93
    """Show logs from OpenVAS."""
94
    if DEBUG:
95
        logs: str = open("/var/log/openvas/openvassd.messages", "r").read()
96
97
        print("OpenVAS Logs: {}".format(logs))
98
99
100
def save_report(path: str, report: str) -> None:
101
    """Save report to specified file."""
102
    file: IO[str] = open(path, 'wb')
103
    file.write(report)
104
    file.close()
105
106
107
def get_report(report_id: str, output_format: str) -> str:
108
    """Get generated report. Decode from Base64 if not XML."""
109
    command: str = "<get_reports report_id=\"{}\" format_id=\"{}\" ".format(report_id, output_format) + \
110
                   "filter=\"levels=hmlg\" type=\"scan\" details=\"1\" note_details=\"1\" ignore_pagination=\"1\"/>"
111
112
    if output_format == 'a994b278-1f62-11e1-96ac-406186ea4fc5':
113
        report: etree.Element = execute_command(command, '//get_reports_response/report')[0]
114
    else:
115
        report: str = execute_command(command, 'string(//get_reports_response/report/text())')
116
117
    return base64.b64decode(report) if isinstance(report, str) else etree.tostring(report).strip()
118
119
120
def process_task(task_id: str) -> str:
121
    """Wait for task to finish and return report id."""
122
    status: Optional[str] = None
123
    task: Optional[str] = None
124
125
    while status != "Done":
126
        try:
127
            time.sleep(10)
128
129
            task = execute_command("<get_tasks task_id=\"{}\"/>".format(task_id))
130
            status = etree.XML(task).xpath("string(//status/text())")
131
            progress: int = int(etree.XML(task).xpath("string(//progress/text())"))
132
133
            os.system("clear")
134
135
            if progress > 0:
136
                print("Task status: {} {}%".format(status, progress))
137
            else:
138
                print("Task status: Complete")
139
        except subprocess.CalledProcessError as exception:
140
            print("ERROR: ", exception.output)
141
142
    return etree.XML(task).xpath("string(//report/@id)")
143
144
145
def start_task(task_id) -> None:
146
    """Start task with specified id."""
147
    execute_command("<start_task task_id=\"{}\"/>".format(task_id))
148
149
150
def create_task(profile, target_id) -> str:
151
    """Create new scan task for target."""
152
    command: str = "<create_task><name>scan</name>" + \
153
                   "<target id=\"{}\"></target>".format(target_id) + \
154
                   "<config id=\"{}\"></config></create_task>".format(profile)
155
156
    return execute_command(command, "string(//create_task_response/@id)")
157
158
159
def create_target(scan) -> str:
160
    """Create new target."""
161
    command: str = "<create_target><name>scan</name><hosts>{}</hosts>".format(scan['target']) + \
162
                   "<exclude_hosts>{}</exclude_hosts>".format(scan['exclude']) + \
163
                   "<alive_tests>{}</alive_tests></create_target>".format(scan['tests'])
164
165
    return execute_command(command, "string(//create_target_response/@id)")
166
167
168
def make_scan(scan: Dict[str, str]) -> None:
169
    """Make automated OpenVAS scan and save generated report."""
170
    perform_cleanup()
171
    print("Performed initial cleanup.")
172
173
    target_id = create_target(scan)
174
    print("Created target with id: {}.".format(target_id))
175
176
    task_id = create_task(scan['profile'], target_id)
177
    print("Created task with id: {}.".format(task_id))
178
179
    start_task(task_id)
180
    print("Started task.")
181
182
    print("Waiting for task to finish...")
183
    report_id = process_task(task_id)
184
    print("Finished processing task.")
185
186
    report = get_report(report_id, scan['format'])
187
    print("Generated report.")
188
189
    save_report(scan['output'], report)
190
    print("Saved report to {}.".format(scan['output']))
191
192
    print_logs()
193
    perform_cleanup()
194
    print("Done!")
195
196
197
def start_scan(args: argparse.Namespace) -> None:
198
    """Override default settings and start scan."""
199
    global DEBUG
200
201
    if args.debug:
202
        DEBUG = True
203
204
    subprocess.check_call(
205
        ["sed -i 's/max_hosts.*/max_hosts = " + str(args.hosts) + "/' /etc/openvas/openvassd.conf"],
206
        shell=True,
207
        stdout=subprocess.DEVNULL
208
    )
209
    subprocess.check_call(
210
        ["sed -i 's/max_checks.*/max_checks = " + str(args.checks) + "/' /etc/openvas/openvassd.conf"],
211
        shell=True,
212
        stdout=subprocess.DEVNULL
213
    )
214
215
    if args.update is True:
216
        print("Starting and updating OpenVAS...")
217
        subprocess.check_call(["/update"], shell=True, stdout=subprocess.DEVNULL)
218
    else:
219
        print("Starting OpenVAS...")
220
        subprocess.check_call(["/start"], shell=True, stdout=subprocess.DEVNULL)
221
222
    print("Starting scan with settings:")
223
    print("* Target: {}".format(args.target))
224
    print("* Excluded hosts: {}".format(args.exclude))
225
    print("* Scan profile: {}".format(args.profile))
226
    print("* Alive tests: {}".format(args.tests))
227
    print("* Max hosts: {}".format(args.hosts))
228
    print("* Max checks: {}".format(args.checks))
229
    print("* Report format: {}".format(args.format))
230
    print("* Output file: {}\n".format(args.output))
231
232
    make_scan({'target': args.target, 'exclude': args.exclude, 'tests': args.tests.replace("&", "&amp;"),
233
               'profile': scan_profiles[args.profile], 'format': report_formats[args.format],
234
               'output': "/reports/" + args.output})
235
236
237
def report_format(arg: Optional[str]) -> str:
238
    """Check if report format value is valid."""
239
    if arg not in report_formats:
240
        raise argparse.ArgumentTypeError("Specified report format is invalid!")
241
242
    return arg
243
244
245
def scan_profile(arg: Optional[str]) -> str:
246
    """Check if scan profile value is valid."""
247
    if arg not in scan_profiles:
248
        raise argparse.ArgumentTypeError("Specified scan profile is invalid!")
249
250
    return arg
251
252
253
def alive_test(arg: Optional[str]) -> str:
254
    """Check if alive test value is valid."""
255
    if arg not in alive_tests:
256
        raise argparse.ArgumentTypeError("Specified alive tests are invalid!")
257
258
    return arg
259
260
261
def max_hosts(arg: Optional[str]) -> int:
262
    """Check if max hosts value is valid."""
263
    try:
264
        value = int(arg)
265
266
        if value <= 0:
267
            raise ValueError
268
    except ValueError:
269
        raise argparse.ArgumentTypeError("Specified maximum number of simultaneous tested hosts is invalid!")
270
271
    return value
272
273
274
def max_checks(arg: Optional[str]) -> int:
275
    """Check if max checks value is valid."""
276
    try:
277
        value = int(arg)
278
279
        if value <= 0:
280
            raise ValueError
281
    except ValueError:
282
        raise argparse.ArgumentTypeError("Specified maximum number of simultaneous checks against hosts is invalid!")
283
284
    return value
285
286
287
def parse_arguments():
288
    """Add and parse script arguments."""
289
    parser: argparse.ArgumentParser = argparse.ArgumentParser(
290
        description='Run OpenVAS scan with specified target and save report.')
291
    parser.add_argument('target', help='scan target')
292
    parser.add_argument('-o', '--output', help='output file (default: openvas.report)',
293
                        default="openvas.report", required=False)
294
    parser.add_argument('-f', '--format', help='format for report (default: ARF)',
295
                        default="ARF", type=report_format, required=False)
296
    parser.add_argument('-p', '--profile', help='scan profile (default: Full and fast)',
297
                        default="Full and fast", type=scan_profile, required=False)
298
    parser.add_argument('-t', '--tests', help='alive tests (default: ICMP, TCP-ACK Service & ARP Ping)',
299
                        default="ICMP, TCP-ACK Service & ARP Ping", type=alive_test, required=False)
300
    parser.add_argument('-e', '--exclude', help='hosts excluded from scan target (Default: "")',
301
                        default="", required=False)
302
    parser.add_argument('-m', '--hosts', help='maximum number of simultaneous tested hosts (Default: 3)',
303
                        type=max_hosts, default=3, required=False)
304
    parser.add_argument('-c', '--checks', help='maximum number of simultaneous checks against each host (Default: 10)',
305
                        type=max_checks, default=10, required=False)
306
    parser.add_argument('--update', help='synchronize feeds before scanning',
307
                        nargs='?', const=True, default=False, required=False)
308
    parser.add_argument('--debug', help='enable command responses printing',
309
                        nargs='?', const=True, default=False, required=False)
310
311
    return parser.parse_args()
312
313
314
if __name__ == '__main__':
315
    arguments: argparse.Namespace = parse_arguments()
316
317
    start_scan(arguments)
318