1 | # -*- coding: utf-8 -*- |
||
2 | # Copyright (C) 2018 Henning Häcker |
||
3 | # Copyright (C) 2019-2021 Greenbone Networks GmbH |
||
4 | # |
||
5 | # SPDX-License-Identifier: GPL-3.0-or-later |
||
6 | # |
||
7 | # This program is free software: you can redistribute it and/or modify |
||
8 | # it under the terms of the GNU General Public License as published by |
||
9 | # the Free Software Foundation, either version 3 of the License, or |
||
10 | # (at your option) any later version. |
||
11 | # |
||
12 | # This program is distributed in the hope that it will be useful, |
||
13 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
14 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
15 | # GNU General Public License for more details. |
||
16 | # |
||
17 | # You should have received a copy of the GNU General Public License |
||
18 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
||
19 | |||
20 | from typing import List |
||
21 | from argparse import ArgumentParser, RawTextHelpFormatter |
||
22 | |||
23 | |||
# Description shown by the ``+h``/``++help`` option. The backslashes at the end
# of the wrapped example lines are escaped: in a regular (non-raw) string a
# bare backslash followed by a newline is a line continuation and would be
# dropped from the displayed text together with the line break.
HELP_TEXT = """
        This script makes an E-Mail alert scan.

        Usage examples:
            $ gvm-script --gmp-username name --gmp-password pass ssh --hostname
            ... start-alert-scan.gmp.py +h
            ... start-alert-scan.gmp.py ++target-name ++hosts ++ports \\
                    ++port-list-name +C +R +S
            ... start-alert-scan.gmp.py ++target-name ++hosts ++port-list-id \\
                    +C ++recipient ++sender
            ... start-alert-scan.gmp.py ++target-id +C ++recipient ++sender
    """
||
36 | |||
37 | |||
def get_config(gmp, config, debug=False):
    """Return the id of one of the predefined scan configs.

    Arguments:
        gmp: Protocol object; only its ``get_configs`` method is called.
        config: Abbreviation of the wanted scan config, a number in [0, 4]
            (0: Full and fast, 1: Full and fast ultimate,
            2: Full and very deep, 3: Full and very deep ultimate,
            4: System Discovery).
        debug: When true, print the name and id of the selected config.

    Returns:
        The ``id`` attribute of the first returned config whose name equals
        the name selected by ``config``.

    Raises:
        ValueError: If ``config`` is outside [0, 4], or if the response
            contains no config with the selected name.
    """
    # Reject a bad abbreviation before talking to the server at all.
    if config < 0 or config > 4:
        raise ValueError("Wrong config identifier. Choose between [0,4].")

    # match the config abbreviation to accepted config names
    template_abbreviation_mapper = {
        0: 'Full and fast',
        1: 'Full and fast ultimate',
        2: 'Full and very deep',
        3: 'Full and very deep ultimate',
        4: 'System Discovery',
    }
    wanted_name = template_abbreviation_mapper.get(config)

    # get all configs of the openvas instance
    # filter for all rows!
    res = gmp.get_configs(filter="rows=-1")

    for conf in res.xpath('config'):
        name = conf.xpath('name/text()')[0]

        # get the config id of the desired template
        if name == wanted_name:
            config_id = conf.xpath('@id')[0]
            if debug:
                print(name + ": " + config_id)
            return config_id

    # Previously this fell through to an unbound local variable; report the
    # actual problem instead.
    raise ValueError("Scan config '{}' not found.".format(wanted_name))
||
73 | |||
74 | |||
def get_target(
    gmp,
    target_name: str = None,
    hosts: List[str] = None,
    ports: str = None,
    port_list_name: str = None,
    port_list_id: str = None,
    debug: bool = True,
):
    """Create a new target and return its id.

    A port list is created first unless ``port_list_id`` is given. Names that
    are already taken get a numeric suffix such as ``"name (0)"``.

    Arguments:
        gmp: Protocol object; ``get_targets``, ``get_port_lists``,
            ``create_port_list`` and ``create_target`` are called on it.
        target_name: Wanted target name, "target" when None.
        hosts: Hosts passed on to ``create_target``.
        ports: Port range passed on to ``create_port_list``.
        port_list_name: Wanted port list name, "portlist" when None.
        port_list_id: Id of a port list to use instead of creating one.
        debug: When true, print the chosen names and the new port list id.

    Returns:
        The ``id`` attribute of the ``create_target`` response.
    """

    def _vacant_name(wanted, taken):
        # Keep `wanted` if it is free, otherwise take the first unused
        # "wanted (N)" counting up from 0.
        if wanted not in taken:
            return wanted
        suffix = 0
        candidate = "{} ({})".format(wanted, str(suffix))
        while candidate in taken:
            suffix += 1
            candidate = "{} ({})".format(wanted, str(suffix))
        return candidate

    if target_name is None:
        target_name = "target"

    # The empty string always counts as taken.
    known_targets = [""] + [
        str(entry.find('name').text)
        for entry in gmp.get_targets(filter=target_name).findall("target")
    ]
    target_name = _vacant_name(target_name, known_targets)

    if debug:
        print("target name: {}".format(target_name))

    if not port_list_id:
        known_port_lists = [""]
        known_port_lists.extend(
            str(entry.find('name').text)
            for entry in gmp.get_port_lists().findall("port_list")
        )

        print(known_port_lists)

        if port_list_name is None:
            port_list_name = "portlist"
        port_list_name = _vacant_name(port_list_name, known_port_lists)

        # create port list
        created_port_list = gmp.create_port_list(port_list_name, ports)
        port_list_id = created_port_list.xpath('@id')[0]
        if debug:
            print("New Portlist-name:\t{}".format(str(port_list_name)))
            print("New Portlist-id:\t{}".format(str(port_list_id)))

    # integrate port list id into create_target
    response = gmp.create_target(
        target_name, hosts=hosts, port_list_id=port_list_id
    )
    print("New target '{}' created.".format(target_name))
    return response.xpath('@id')[0]
||
136 | |||
137 | |||
def get_alert(
    gmp,
    sender_email: str,
    recipient_email: str,
    alert_name: str = None,
    debug=False,
):
    """Return the id of the e-mail alert called ``alert_name``.

    The alert is created first if no alert with that name is found.

    Arguments:
        gmp: Protocol object; ``get_alerts``, ``create_alert`` and the
            ``types`` enums are used.
        sender_email: Address stored under "from_address".
        recipient_email: Address stored under "to_address".
        alert_name: Name of the alert; falls back to ``recipient_email``
            when None.
        debug: When true, print the resulting alert id.

    Returns:
        The ``id`` attribute of the first matching alert, or the string
        'no id found' if that element has no ``id`` attribute.

    Raises:
        IndexError: If the alert still cannot be found after creating it.
    """
    if alert_name is None:
        alert_name = recipient_email

    name_filter = 'name={}'.format(alert_name)

    # create alert if necessary
    alert = gmp.get_alerts(filter=name_filter).xpath('alert')

    if not alert:
        print("creating new alert {}".format(alert_name))
        gmp.create_alert(
            alert_name,
            event=gmp.types.AlertEvent.TASK_RUN_STATUS_CHANGED,
            event_data={"status": "Done"},
            condition=gmp.types.AlertCondition.ALWAYS,
            method=gmp.types.AlertMethod.EMAIL,
            # NOTE(review): entries are written as {value: field name};
            # presumably this is the orientation create_alert expects in the
            # targeted python-gvm version - confirm before changing it.
            method_data={
                """Task '$n': $e

After the event $e,
the following condition was met: $c

This email escalation is configured to attach report format '$r'.
Full details and other report formats are available on the scan engine.

$t

Note:
This email was sent to you as a configured security scan escalation.
Please contact your local system administrator if you think you
should not have received it.
""": "message",
                "2": "notice",
                sender_email: "from_address",
                "[OpenVAS-Manager] Task": "subject",
                "c402cc3e-b531-11e1-9163-406186ea4fc5": "notice_attach_format",
                recipient_email: "to_address",
            },
        )

        # Look the alert up under the name it was just created with. The
        # lookup used to filter on the recipient address, which only matched
        # when no custom alert name had been given.
        alert = gmp.get_alerts(filter=name_filter).xpath('alert')

    alert_id = alert[0].get('id', 'no id found')
    if debug:
        print("alert_id: {}".format(str(alert_id)))

    return alert_id
||
190 | |||
191 | |||
def get_scanner(gmp):
    """Return the id of the second scanner listed by ``gmp.get_scanners()``.

    The script treats that entry as the "default scanner".

    Raises:
        IndexError: If fewer than two scanner ids are returned.
    """
    return gmp.get_scanners().xpath('scanner/@id')[1]  # "default scanner"
||
196 | |||
197 | |||
def create_and_start_task(
    gmp,
    config_id: str,
    target_id: str,
    scanner_id: str,
    alert_id: str,
    alert_name: str,
    debug: bool = False,
):
    """Create a task that triggers the given alert, start it, return its name.

    The task is named "Alert Scan for Alert <alert_name>". If tasks with that
    name are already found, the number of found tasks is appended in
    parentheses.

    Arguments:
        gmp: Protocol object; ``get_tasks``, ``create_task``, ``start_task``
            and, in debug mode, ``stop_task`` are called on it.
        config_id: Scan config id for the new task.
        target_id: Target id for the new task.
        scanner_id: Scanner id for the new task.
        alert_id: Id of the alert attached to the new task.
        alert_name: Alert name used to build the task name.
        debug: When true, stop the task again right after starting it.

    Returns:
        The name of the created task.
    """
    base_name = "Alert Scan for Alert {}".format(alert_name)
    same_named = gmp.get_tasks(filter='name="{}"'.format(base_name)).findall(
        'task'
    )
    print(same_named)

    task_name = (
        "Alert Scan for Alert {} ({})".format(alert_name, len(same_named))
        if same_named
        else base_name
    )

    # Create the task
    response = gmp.create_task(
        task_name,
        config_id,
        target_id,
        scanner_id,
        alert_ids=[alert_id],
        comment="Alert Scan",
    )

    # Start the task
    new_task_id = response.xpath('@id')[0]
    gmp.start_task(new_task_id)

    if debug:
        # Stop the task (for performance reasons)
        gmp.stop_task(new_task_id)
        print('Task stopped')

    return task_name
||
237 | |||
238 | |||
def parse_args(args):  # pylint: disable=unused-argument
    """Parse the script's own ``+``/``++`` prefixed command line options.

    ``args`` is not read; ``parse_known_args()`` is called without arguments,
    so the options are taken from the process command line and unrecognised
    arguments are ignored.

    Returns:
        The namespace holding the parsed script options.
    """
    cli = ArgumentParser(
        prefix_chars="+",
        add_help=False,
        formatter_class=RawTextHelpFormatter,
        description=HELP_TEXT,
    )

    cli.add_argument(
        "+h", "++help", action="help", help="Show this help message and exit."
    )

    # Exactly one way of selecting the target has to be given.
    target_group = cli.add_mutually_exclusive_group(required=True)
    target_group.add_argument(
        "++target-id",
        type=str,
        dest="target_id",
        help="Use an existing target by target id",
    )
    target_group.add_argument(
        "++target-name",
        type=str,
        dest="target_name",
        help="Create a target by name",
    )

    cli.add_argument(
        "++hosts", nargs="+", dest="hosts", help="Host(s) for the new target"
    )

    # Either reuse a port list or describe the ports of a new one.
    port_group = cli.add_mutually_exclusive_group()
    port_group.add_argument(
        "++port-list-id",
        type=str,
        dest="port_list_id",
        help="An existing portlist id for the new target",
    )
    port_group.add_argument(
        "++ports",
        type=str,
        dest="ports",
        help="Ports in the new target: e.g. T:80-80,8080",
    )

    cli.add_argument(
        "++port-list-name",
        type=str,
        dest="port_list_name",
        help="Name for the new portlist in the new target",
    )

    # Scan config by abbreviation or by id, not both.
    scan_config_help = (
        "Choose from existing scan config:"
        "\n 0: Full and fast"
        "\n 1: Full and fast ultimate"
        "\n 2: Full and very deep"
        "\n 3: Full and very deep ultimate"
        "\n 4: System Discovery"
    )
    config_group = cli.add_mutually_exclusive_group()
    config_group.add_argument(
        "+C",
        "++scan-config",
        default=0,
        type=int,
        dest="config",
        help=scan_config_help,
    )
    config_group.add_argument(
        "++scan-config-id",
        type=str,
        dest="scan_config_id",
        help="Use existing scan config by id",
    )

    cli.add_argument(
        "++scanner-id",
        type=str,
        dest="scanner_id",
        help="Use existing scanner by id",
    )

    cli.add_argument(
        "+R",
        "++recipient",
        required=True,
        dest="recipient_email",
        type=str,
        help="Alert recipient E-Mail address",
    )

    cli.add_argument(
        "+S",
        "++sender",
        required=True,
        dest="sender_email",
        type=str,
        help="Alert senders E-Mail address",
    )

    cli.add_argument(
        "++alert-name",
        dest="alert_name",
        type=str,
        help="Optional Alert name",
    )

    # Only the recognised options are of interest; drop the leftovers.
    return cli.parse_known_args()[0]
||
356 | |||
357 | |||
def main(gmp, args):
    """Run the alert scan: resolve config, target, alert and scanner, then
    create and start the task.

    Arguments:
        gmp: Protocol object handed on to the helper functions.
        args: Handed on to ``parse_args``.
    """
    script_args = parse_args(args)

    # set alert_name to recipient email if no other name
    # is given
    if script_args.alert_name is None:
        script_args.alert_name = script_args.recipient_email

    # use existing config from argument
    if script_args.scan_config_id:
        config_id = script_args.scan_config_id
    else:
        config_id = get_config(gmp, script_args.config)

    # create new target or use existing one from id
    if script_args.target_id:
        target_id = script_args.target_id
    else:
        target_id = get_target(
            gmp,
            target_name=script_args.target_name,
            hosts=script_args.hosts,
            ports=script_args.ports,
            port_list_name=script_args.port_list_name,
            port_list_id=script_args.port_list_id,
        )

    alert_id = get_alert(
        gmp,
        script_args.sender_email,
        script_args.recipient_email,
        script_args.alert_name,
    )

    if script_args.scanner_id:
        scanner_id = script_args.scanner_id
    else:
        scanner_id = get_scanner(gmp)

    # Keep the returned task name: it used to be discarded, so the print
    # below failed with a NameError after the task had been started.
    task_name = create_and_start_task(
        gmp, config_id, target_id, scanner_id, alert_id, script_args.alert_name
    )

    print('Task started: ' + task_name)

    print("\nScript finished\n")
||
404 | |||
405 | |||
# Entry point: only runs when the module name is '__gmp__'.
# NOTE(review): `gmp` and `args` are not defined in this file; presumably the
# gvm-script runner named in HELP_TEXT provides them as globals - confirm.
if __name__ == '__gmp__':
    main(gmp, args)
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
![]() Comprehensibility
Best Practice
introduced
by
|
|||
408 |