Passed
Push — master ( 6ffcfb...8e7850 )
by Dawid
01:17
created

scan.create_task()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 2
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
"""Automation script for OpenVAS 10."""
3
4
import argparse
import base64
import os
import subprocess
import time
from typing import IO
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Union
from xml.sax.saxutils import escape

from lxml import etree
16
17
# Module-wide debug switch: when True, execute_command() prints every omp
# command line and the raw response. start_scan() turns it on for --debug.
DEBUG: bool = False
18
19
# Scan profile name (as accepted by --profile) -> OpenVAS scan config id
# placed in the <config id="..."> element by create_task().
scan_profiles: Dict[str, str] = {
    "Discovery": "8715c877-47a0-438d-98a3-27c7a6ab2196",
    "Empty": "085569ce-73ed-11df-83c3-002264764cea",
    "Full and fast": "daba56c8-73ec-11df-a475-002264764cea",
    "Full and fast ultimate": "698f691e-7489-11df-9d8c-002264764cea",
    "Full and very deep": "708f25c4-7489-11df-8094-002264764cea",
    "Full and very deep ultimate": "74db13d6-7489-11df-91b9-002264764cea",
    "Host Discovery": "2d3f051c-55ba-11e3-bf43-406186ea4fc5",
    "System Discovery": "bbca7412-a950-11e3-9109-406186ea4fc5",
}
29
30
# Report format name (as accepted by --format) -> OpenVAS report format id
# sent as format_id by get_report().
report_formats: Dict[str, str] = {
    "Anonymous XML": "5057e5cc-b825-11e4-9d0e-28d24461215b",
    "ARF": "910200ca-dc05-11e1-954f-406186ea4fc5",
    "CPE": "5ceff8ba-1f62-11e1-ab9f-406186ea4fc5",
    "CSV Hosts": "9087b18c-626c-11e3-8892-406186ea4fc5",
    "CSV Results": "c1645568-627a-11e3-a660-406186ea4fc5",
    "HTML": "6c248850-1f62-11e1-b082-406186ea4fc5",
    "ITG": "77bd6c4a-1f62-11e1-abf0-406186ea4fc5",
    "LaTeX": "a684c02c-b531-11e1-bdc2-406186ea4fc5",
    "NBE": "9ca6fe72-1f62-11e1-9e7c-406186ea4fc5",
    "PDF": "c402cc3e-b531-11e1-9163-406186ea4fc5",
    "Topology SVG": "9e5e5deb-879e-4ecc-8be6-a71cd0875cdd",
    "TXT": "a3810a62-1f62-11e1-9219-406186ea4fc5",
    "Verinice ISM": "c15ad349-bd8d-457a-880a-c7056532ee15",
    "Verinice ITG": "50c9950a-f326-11e4-800c-28d24461215b",
    "XML": "a994b278-1f62-11e1-96ac-406186ea4fc5",
}
47
48
# Values accepted by --tests; the chosen one is sent verbatim (with "&"
# XML-escaped by start_scan) inside <alive_tests> by create_target().
alive_tests: Set[str] = {
    "Scan Config Default",
    "ICMP, TCP-ACK Service & ARP Ping",
    "TCP-ACK Service & ARP Ping",
    "ICMP & ARP Ping",
    "ICMP & TCP-ACK Service Ping",
    "ARP Ping",
    "TCP-ACK Service Ping",
    "TCP-SYN Service Ping",
    "ICMP Ping",
    "Consider Alive",
}
60
61
62
def execute_command(command: str, xpath: Optional[str] = None) -> Union[str, float, bool, List]:
    """Execute an omp command and return its output.

    The XML ``command`` is sent through the ``omp`` command-line client
    (user/password ``admin``, host 127.0.0.1, port 9390).

    :param command: XML request passed to ``omp --xml``.
    :param xpath: optional XPath expression; when given, the response is parsed
        with lxml and the result of evaluating the expression is returned.
    :return: the raw response text, or the XPath evaluation result.
    :raises subprocess.CalledProcessError: if omp exits with a non-zero status
        (its combined stdout/stderr is available as ``.output``).
    """
    base: List[str] = ["omp", "-u", "admin", "-w", "admin", "-h", "127.0.0.1", "-p", "9390", "--xml"]

    if DEBUG:
        print("[DEBUG] Command: {} '{}'".format(" ".join(base), command))

    # An argument vector (no shell) delivers the XML to omp verbatim, so quotes
    # or shell metacharacters in the payload can neither break nor inject commands.
    response: str = subprocess.check_output(base + [command], stderr=subprocess.STDOUT).decode().strip()

    if DEBUG:
        print("[DEBUG] Response: {}".format(response))

    return etree.XML(response).xpath(xpath) if xpath else response
77
78
79
def perform_cleanup() -> None:
    """Remove all existing tasks and targets.

    Every task returned by ``<get_tasks/>`` is deleted first, then every
    target returned by ``<get_targets/>``.
    """
    existing_tasks: List = execute_command("<get_tasks/>", "//get_tasks_response/task")

    for task in existing_tasks:
        execute_command("<delete_task task_id=\"{}\"/>".format(task.get("id")))

    existing_targets: List = execute_command("<get_targets/>", "//get_targets_response/target")

    for target in existing_targets:
        execute_command("<delete_target target_id=\"{}\"/>".format(target.get("id")))
90
91
92
def save_report(path: str, report: Union[str, bytes]) -> None:
    """Save report to the specified file.

    :param path: destination file path; an existing file is overwritten.
    :param report: report content. Bytes are written as-is; a text report is
        encoded as UTF-8 first, because the file is opened in binary mode.
    """
    payload: bytes = report.encode() if isinstance(report, str) else report

    # The context manager closes the file even when the write fails.
    with open(path, 'wb') as file:
        file.write(payload)
97
98
99
def get_report(report_id: str, output_format: str) -> bytes:
    """Get generated report as bytes.

    For the XML format the ``<report>`` element itself is serialized; for
    every other format the element's text is Base64-decoded.

    :param report_id: id of the report to fetch.
    :param output_format: report format id (a value of ``report_formats``).
    :return: report content as bytes.
    :raises IndexError: if an XML report was requested but the response
        contains no ``<report>`` element.
    """
    # Same id as the "XML" entry of report_formats.
    xml_format_id: str = 'a994b278-1f62-11e1-96ac-406186ea4fc5'

    command: str = "<get_reports report_id=\"{}\" format_id=\"{}\" ".format(report_id, output_format) + \
                   "filter=\"levels=hmlg\" type=\"scan\" details=\"1\" note_details=\"1\" ignore_pagination=\"1\"/>"

    if output_format == xml_format_id:
        reports = execute_command(command, '//get_reports_response/report')

        if not reports:
            raise IndexError("Report {} not found in get_reports response".format(report_id))

        return etree.tostring(reports[0]).strip()

    encoded: str = execute_command(command, 'string(//get_reports_response/report/text())')

    return base64.b64decode(encoded)
110
111
112
def process_task(task_id: str) -> str:
    """Wait for task to finish and return report id.

    The task is polled every 10 seconds until its status is ``Done``. Failed
    omp calls are reported and the poll is retried.

    NOTE(review): the loop only ends on status "Done"; if a task can settle in
    another final state this waits forever - confirm against OpenVAS behaviour.

    :param task_id: id of the task to watch.
    :return: value of the ``id`` attribute of the first ``<report>`` element in
        the last task response.
    """
    status: Optional[str] = None
    task = None  # parsed XML of the most recent successful response

    while status != "Done":
        time.sleep(10)

        try:
            response = execute_command("<get_tasks task_id=\"{}\"/>".format(task_id))
        except subprocess.CalledProcessError as exception:
            print("ERROR: ", exception.output)
            continue

        # Parse once and reuse the tree for every lookup.
        task = etree.XML(response)
        status = task.xpath("string(//status/text())")

        try:
            progress: int = int(task.xpath("string(//progress/text())"))
        except ValueError:
            # Missing or non-numeric <progress>: treat as "no progress to show"
            # instead of aborting the whole wait.
            progress = -1

        os.system("clear")

        if progress > 0:
            print("Task status: {} {}%".format(status, progress))
        elif status == "Done":
            print("Task status: Complete")
        else:
            # Not finished and nothing to show yet: report the real status
            # rather than claiming completion.
            print("Task status: {}".format(status))

    return task.xpath("string(//report/@id)")
135
136
137
def start_task(task_id: str) -> None:
    """Start task with specified id.

    :param task_id: id of the task, as returned by create_task().
    """
    execute_command("<start_task task_id=\"{}\"/>".format(task_id))
140
141
142
def create_task(profile: str, target_id: str) -> str:
    """Create new scan task for target.

    :param profile: scan config id (a value of ``scan_profiles``).
    :param target_id: id of the target to scan.
    :return: ``id`` attribute of the ``create_task_response`` element.
    """
    command: str = "<create_task><name>scan</name>" + \
                   "<target id=\"{}\"></target>".format(target_id) + \
                   "<config id=\"{}\"></config></create_task>".format(profile)

    return execute_command(command, "string(//create_task_response/@id)")
149
150
151
def create_target(scan: Dict[str, str]) -> str:
    """Create new target.

    :param scan: scan settings; the keys ``target``, ``exclude`` and ``tests``
        are read here.
    :return: ``id`` attribute of the ``create_target_response`` element.
    """
    # Hosts come straight from the command line, so XML-escape them before
    # embedding. 'tests' is left untouched: start_scan() already escapes its
    # "&", and escaping again would corrupt the value.
    command: str = "<create_target><name>scan</name><hosts>{}</hosts>".format(escape(scan['target'])) + \
                   "<exclude_hosts>{}</exclude_hosts>".format(escape(scan['exclude'])) + \
                   "<alive_tests>{}</alive_tests></create_target>".format(scan['tests'])

    return execute_command(command, "string(//create_target_response/@id)")
158
159
160
def make_scan(scan: Dict[str, str]) -> None:
    """Make automated OpenVAS scan and save generated report.

    Steps: clean up existing tasks/targets, create target and task, start the
    task, wait for it to finish, fetch the report, save it, clean up again.

    :param scan: scan settings with the keys ``target``, ``exclude``,
        ``tests``, ``profile`` (scan config id), ``format`` (report format id)
        and ``output`` (report file path).
    """
    perform_cleanup()
    print("Performed initial cleanup.")

    target_id = create_target(scan)
    print("Created target with id: {}.".format(target_id))

    task_id = create_task(scan['profile'], target_id)
    print("Created task with id: {}.".format(task_id))

    start_task(task_id)
    print("Started task.")

    print("Waiting for task to finish...")
    report_id = process_task(task_id)
    print("Finished processing task.")

    report = get_report(report_id, scan['format'])
    print("Generated report.")

    save_report(scan['output'], report)
    print("Saved report to {}.".format(scan['output']))

    perform_cleanup()
    print("Done!")
186
187
188
def start_scan(args: argparse.Namespace) -> None:
    """Override default settings and start scan.

    Enables debug output if requested, writes the max_hosts / max_checks
    limits into /etc/openvas/openvassd.conf, runs ``/update`` or ``/start``,
    prints the effective settings and hands over to make_scan().

    :param args: parsed command-line arguments from parse_arguments().
    :raises subprocess.CalledProcessError: if sed or the start/update script
        exits with a non-zero status.
    """
    global DEBUG

    if args.debug:
        DEBUG = True

    # Run sed directly with an argument vector; no shell is needed for it.
    for option, value in (("max_hosts", args.hosts), ("max_checks", args.checks)):
        subprocess.check_call(
            ["sed", "-i", "s/{0}.*/{0} = {1}/".format(option, value), "/etc/openvas/openvassd.conf"],
            stdout=subprocess.DEVNULL
        )

    # Truthiness instead of "is True": any value given to --update enables it.
    if args.update:
        print("Starting and updating OpenVAS...")
        subprocess.check_call("/update", shell=True, stdout=subprocess.DEVNULL)
    else:
        print("Starting OpenVAS...")
        subprocess.check_call("/start", shell=True, stdout=subprocess.DEVNULL)

    print("Starting scan with settings:")
    print("* Target: {}".format(args.target))
    print("* Excluded hosts: {}".format(args.exclude))
    print("* Scan profile: {}".format(args.profile))
    print("* Alive tests: {}".format(args.tests))
    print("* Max hosts: {}".format(args.hosts))
    print("* Max checks: {}".format(args.checks))
    print("* Report format: {}".format(args.format))
    print("* Output file: {}\n".format(args.output))

    # "&" in the alive test name is escaped here because create_target()
    # embeds the value in XML as-is.
    make_scan({'target': args.target, 'exclude': args.exclude, 'tests': args.tests.replace("&", "&amp;"),
               'profile': scan_profiles[args.profile], 'format': report_formats[args.format],
               'output': "/reports/" + args.output})
226
227
228
def report_format(arg: Optional[str]) -> str:
    """Check if report format value is valid.

    :param arg: report format name given on the command line.
    :return: ``arg`` unchanged when it is a key of ``report_formats``.
    :raises argparse.ArgumentTypeError: if the name is unknown.
    """
    if arg not in report_formats:
        raise argparse.ArgumentTypeError("Specified report format is invalid!")

    return arg
234
235
236
def scan_profile(arg: Optional[str]) -> str:
    """Check if scan profile value is valid.

    :param arg: scan profile name given on the command line.
    :return: ``arg`` unchanged when it is a key of ``scan_profiles``.
    :raises argparse.ArgumentTypeError: if the name is unknown.
    """
    if arg not in scan_profiles:
        raise argparse.ArgumentTypeError("Specified scan profile is invalid!")

    return arg
242
243
244
def alive_test(arg: Optional[str]) -> str:
    """Check if alive test value is valid.

    :param arg: alive test name given on the command line.
    :return: ``arg`` unchanged when it is a member of ``alive_tests``.
    :raises argparse.ArgumentTypeError: if the name is unknown.
    """
    if arg not in alive_tests:
        raise argparse.ArgumentTypeError("Specified alive tests are invalid!")

    return arg
250
251
252
def max_hosts(arg: Optional[str]) -> int:
    """Check if max hosts value is valid.

    :param arg: value given on the command line.
    :return: the value as a positive integer.
    :raises argparse.ArgumentTypeError: if the value is missing, not an
        integer, or not greater than zero.
    """
    try:
        value = int(arg)
    except (TypeError, ValueError):
        # TypeError covers arg=None, which int() rejects differently from a
        # malformed string; both are simply "invalid" for the caller.
        value = 0

    if value <= 0:
        raise argparse.ArgumentTypeError("Specified maximum number of simultaneous tested hosts is invalid!")

    return value
263
264
265
def max_checks(arg: Optional[str]) -> int:
    """Check if max checks value is valid.

    :param arg: value given on the command line.
    :return: the value as a positive integer.
    :raises argparse.ArgumentTypeError: if the value is missing, not an
        integer, or not greater than zero.
    """
    try:
        value = int(arg)
    except (TypeError, ValueError):
        # TypeError covers arg=None, which int() rejects differently from a
        # malformed string; both are simply "invalid" for the caller.
        value = 0

    if value <= 0:
        raise argparse.ArgumentTypeError("Specified maximum number of simultaneous checks against hosts is invalid!")

    return value
276
277
278
def parse_arguments() -> argparse.Namespace:
    """Add and parse script arguments.

    :return: namespace with ``target``, ``output``, ``format``, ``profile``,
        ``tests``, ``exclude``, ``hosts``, ``checks``, ``update`` and ``debug``.
    """
    parser: argparse.ArgumentParser = argparse.ArgumentParser(
        description='Run OpenVAS scan with specified target and save report.')
    parser.add_argument('target', help='scan target')
    parser.add_argument('-o', '--output', help='output file (default: openvas.report)',
                        default="openvas.report", required=False)
    parser.add_argument('-f', '--format', help='format for report (default: ARF)',
                        default="ARF", type=report_format, required=False)
    parser.add_argument('-p', '--profile', help='scan profile (default: Full and fast)',
                        default="Full and fast", type=scan_profile, required=False)
    parser.add_argument('-t', '--tests', help='alive tests (default: ICMP, TCP-ACK Service & ARP Ping)',
                        default="ICMP, TCP-ACK Service & ARP Ping", type=alive_test, required=False)
    parser.add_argument('-e', '--exclude', help='hosts excluded from scan target (Default: "")',
                        default="", required=False)
    parser.add_argument('-m', '--hosts', help='maximum number of simultaneous tested hosts (Default: 3)',
                        type=max_hosts, default=3, required=False)
    parser.add_argument('-c', '--checks', help='maximum number of simultaneous checks against each host (Default: 10)',
                        type=max_checks, default=10, required=False)
    # Plain boolean switches: with nargs='?' a flag placed before the target
    # (e.g. "--debug 10.0.0.1") would swallow the target as its own value.
    parser.add_argument('--update', help='synchronize feeds before scanning',
                        action='store_true', default=False, required=False)
    parser.add_argument('--debug', help='enable command responses printing',
                        action='store_true', default=False, required=False)

    return parser.parse_args()
303
304
305
# Script entry point: parse the command line, then run the scan.
if __name__ == '__main__':
    arguments: argparse.Namespace = parse_arguments()

    start_scan(arguments)
309