Passed
Pull Request — master (#344)
by Jaspar
01:38
created

start-alert-scan.gmp.main()   B

Complexity

Conditions 5

Size

Total Lines 44
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 27
nop 2
dl 0
loc 44
rs 8.7653
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 Henning Häcker
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
from typing import List
20
from argparse import ArgumentParser, RawTextHelpFormatter
21
22
23
# Description shown by ``+h``. The backslashes are escaped on purpose: in a
# normal (non-raw) string a trailing ``\`` swallows the newline, which would
# glue the continued example lines together in the rendered help text.
HELP_TEXT = """
        This script makes an E-Mail alert scan.

        Usage examples: 
            $ gvm-script --gmp-username name --gmp-password pass ssh --hostname
            ... start-alert-scan.gmp.py +h
            ... start-alert-scan.gmp.py ++target-name ++hosts ++ports \\
                    ++port-list-name +C +R +S
            ... start-alert-scan.gmp.py ++target-name ++hosts ++port-list-id \\
                    +C ++recipient ++sender
            ... start-alert-scan.gmp.py ++target-id +C ++recipient ++sender
    """
35
36
37
def get_config(gmp, config, debug=False):
    """Return the id of one of the predefined scan configs.

    Args:
        gmp: GMP protocol object; its ``get_configs`` method is queried.
        config: Integer abbreviation in [0, 4] selecting the scan config name.
        debug: When true, print the matched config name and id.

    Returns:
        The ``id`` attribute of the ``config`` element whose name matches.

    Raises:
        ValueError: If ``config`` is outside [0, 4], or if the instance has no
            scan config with the selected name.
    """
    # Validate first so a bad abbreviation does not cost a round trip.
    if config < 0 or config > 4:
        raise ValueError("Wrong config identifier. Choose between [0,4].")

    # match the config abbreviation to accepted config names
    config_list = [
        'Full and fast',
        'Full and fast ultimate',
        'Full and very deep',
        'Full and very deep ultimate',
        'System Discovery',
    ]
    wanted_name = dict(enumerate(config_list)).get(config)

    # get all configs of the openvas instance
    # filter for all rows!
    res = gmp.get_configs(filter="rows=-1")

    for conf in res.xpath('config'):
        cid = conf.xpath('@id')[0]
        name = conf.xpath('name/text()')[0]

        # get the config id of the desired template
        if name == wanted_name:
            if debug:
                print(name + ": " + cid)
            return cid

    # Previously this fell through to an unbound local variable.
    raise ValueError(
        "Scan config '{}' was not found on this instance.".format(wanted_name)
    )
0 ignored issues
show
introduced by
The variable config_id does not seem to be defined for all execution paths.
Loading history...
72
73
74
def _unique_name(base, taken):
    """Return ``base``, or the first free ``"base (N)"`` if it is taken."""
    if base not in taken:
        return base
    counter = 0
    while "{} ({})".format(base, counter) in taken:
        counter += 1
    return "{} ({})".format(base, counter)


def get_target(
    gmp,
    target_name: str = None,
    hosts: List[str] = None,
    ports: str = None,
    port_list_name: str = None,
    port_list_id: str = None,
    debug: bool = True,
):
    """Create a new target (and, if needed, a new port list) and return its id.

    Args:
        gmp: GMP protocol object used for all requests.
        target_name: Desired target name; defaults to ``"target"``. A
            ``" (N)"`` suffix is appended when the name already exists.
        hosts: Hosts passed on to ``gmp.create_target``.
        ports: Port range string used when a new port list is created.
        port_list_name: Desired name for a new port list; defaults to
            ``"portlist"`` and is made unique the same way as the target name.
        port_list_id: Existing port list id; when given no port list is
            created.
        debug: When true, print the chosen names/ids and the existing port
            list names.

    Returns:
        The ``id`` attribute of the ``create_target`` response.
    """
    if target_name is None:
        target_name = "target"
    targets = gmp.get_targets(filter=target_name)
    # The empty name is always treated as taken.
    existing_targets = [""]
    existing_targets.extend(
        str(target.find('name').text) for target in targets.findall("target")
    )
    # find a vacant target name
    target_name = _unique_name(target_name, existing_targets)

    if debug:
        print("target name: {}".format(target_name))

    if not port_list_id:
        port_lists_tree = gmp.get_port_lists()
        existing_port_lists = [""]
        existing_port_lists.extend(
            str(plist.find('name').text)
            for plist in port_lists_tree.findall("port_list")
        )

        # This dump used to be printed unconditionally; it is debug output.
        if debug:
            print(existing_port_lists)

        if port_list_name is None:
            port_list_name = "portlist"
        port_list_name = _unique_name(port_list_name, existing_port_lists)

        # create port list
        port_list = gmp.create_port_list(port_list_name, ports)
        port_list_id = port_list.xpath('@id')[0]
        if debug:
            print("New Portlist-name:\t{}".format(str(port_list_name)))
            print("New Portlist-id:\t{}".format(str(port_list_id)))

    # integrate port list id into create_target
    res = gmp.create_target(target_name, hosts=hosts, port_list_id=port_list_id)
    print("New target '{}' created.".format(target_name))
    return res.xpath('@id')[0]
135
136
137
def get_alert(
    gmp,
    sender_email: str,
    recipient_email: str,
    alert_name: str = None,
    debug=False,
):
    """Return the id of the e-mail alert named ``alert_name``, creating it if needed.

    Args:
        gmp: GMP protocol object used for all requests.
        sender_email: Address placed in the alert's method data as sender.
        recipient_email: Address placed in the alert's method data as recipient.
        alert_name: Name of the alert to look up or create; defaults to
            ``recipient_email`` when omitted.
        debug: When true, print the resulting alert id.

    Returns:
        The ``id`` attribute of the first matching ``alert`` element.
    """
    # Without a name the lookup would filter on the literal string "None".
    if alert_name is None:
        alert_name = recipient_email

    alert_filter = 'name={}'.format(alert_name)

    # create alert if necessary
    alert = gmp.get_alerts(filter=alert_filter).xpath('alert')

    if not alert:
        print("creating new alert {}".format(alert_name))
        # NOTE(review): this mapping is laid out as value -> field name. If the
        # sender and recipient addresses are identical the two keys collide and
        # one entry is lost. Confirm the orientation gmp.create_alert expects.
        gmp.create_alert(
            alert_name,
            event=gmp.types.AlertEvent.TASK_RUN_STATUS_CHANGED,
            event_data={"status": "Done"},
            condition=gmp.types.AlertCondition.ALWAYS,
            method=gmp.types.AlertMethod.EMAIL,
            method_data={
                """Task '$n': $e

After the event $e,
the following condition was met: $c

This email escalation is configured to attach report format '$r'.
Full details and other report formats are available on the scan engine.

$t

Note:
This email was sent to you as a configured security scan escalation.
Please contact your local system administrator if you think you
should not have received it.
""": "message",
                "2": "notice",
                sender_email: "from_address",
                "[OpenVAS-Manager] Task": "subject",
                "c402cc3e-b531-11e1-9163-406186ea4fc5": "notice_attach_format",
                recipient_email: "to_address",
            },
        )

        # Look the new alert up by the name it was created with; the previous
        # code filtered on the recipient address, which only matched when the
        # alert happened to be named after the recipient.
        alert = gmp.get_alerts(filter=alert_filter).xpath('alert')

    alert_id = alert[0].get('id', 'no id found')
    if debug:
        print("alert_id: {}".format(str(alert_id)))

    return alert_id
189
190
191
def get_scanner(gmp):
    """Return the id of the second scanner reported by ``gmp.get_scanners()``.

    The original code labels that entry the "default scanner".
    NOTE(review): this relies on the order of the response — confirm.

    Raises:
        IndexError: If fewer than two scanners are returned.
    """
    scanner_ids = gmp.get_scanners().xpath('scanner/@id')
    if len(scanner_ids) < 2:
        # Same exception type as the bare index lookup, but with a reason.
        raise IndexError(
            "Expected at least two scanners to pick the default scanner, "
            "found {}.".format(len(scanner_ids))
        )
    return scanner_ids[1]  # "default scanner"
195
196
197
def create_and_start_task(
    gmp,
    config_id: str,
    target_id: str,
    scanner_id: str,
    alert_id: str,
    alert_name: str,
    debug: bool = False,
):
    """Create an alert scan task, start it, and optionally stop it again.

    The task is named ``"Alert Scan for Alert <alert_name> (N)"`` where ``N``
    is the number of tasks returned for the name filter.

    Args:
        gmp: GMP protocol object used for all requests.
        config_id: Scan config id for the new task.
        target_id: Target id for the new task.
        scanner_id: Scanner id for the new task.
        alert_id: Alert id attached to the new task.
        alert_name: Alert name used to build the task name.
        debug: When true, stop the task right after starting it.
    """
    # Create the task
    task_name = "Alert Scan for Alert {}".format(alert_name)
    tasks = gmp.get_tasks(filter="name~{}".format(task_name))
    if tasks is not None:
        # Count the <task> children of the response. The previous expression
        # 'tasks/@id' looked for <tasks> children, so the suffix was always 0.
        task_name = "Alert Scan for Alert {} ({})".format(
            alert_name, len(tasks.xpath('task/@id'))
        )
    task_comment = "Alert Scan"
    res = gmp.create_task(
        task_name,
        config_id,
        target_id,
        scanner_id,
        alert_ids=[alert_id],
        comment=task_comment,
    )

    # Start the task
    task_id = res.xpath('@id')[0]
    gmp.start_task(task_id)

    print('Task started: ' + task_name)

    if debug:
        # Stop the task (for performance reasons)
        gmp.stop_task(task_id)
        print('Task stopped')
233
234
235
def parse_args(args):  # pylint: disable=unused-argument
    """Build the ``+``-prefixed option parser and parse the command line.

    ``args`` is accepted but not used: ``parse_known_args()`` is called without
    arguments, so the options are read from ``sys.argv`` and unrecognised
    options are ignored.

    Returns:
        The namespace holding the recognised script options.
    """
    cli = ArgumentParser(
        description=HELP_TEXT,
        formatter_class=RawTextHelpFormatter,
        prefix_chars="+",
        add_help=False,
    )

    cli.add_argument(
        "+h", "++help", action="help", help="Show this help message and exit."
    )

    # Exactly one way of selecting the target must be given.
    target_group = cli.add_mutually_exclusive_group(required=True)
    target_group.add_argument(
        "++target-id",
        dest="target_id",
        type=str,
        help="Use an existing target by target id",
    )
    target_group.add_argument(
        "++target-name",
        dest="target_name",
        type=str,
        help="Create a target by name",
    )

    cli.add_argument(
        "++hosts", dest="hosts", nargs="+", help="Host(s) for the new target"
    )

    # Either reuse a port list or describe the ports for a new one.
    port_group = cli.add_mutually_exclusive_group()
    port_group.add_argument(
        "++port-list-id",
        dest="port_list_id",
        type=str,
        help="An existing portlist id for the new target",
    )
    port_group.add_argument(
        "++ports",
        dest="ports",
        type=str,
        help="Ports in the new target: e.g. T:80-80,8080",
    )

    cli.add_argument(
        "++port-list-name",
        dest="port_list_name",
        type=str,
        help="Name for the new portlist in the new target",
    )

    # Scan config by abbreviation or by id, not both.
    scan_config_help = "\n".join(
        (
            "Choose from existing scan config:",
            "  0: Full and fast",
            "  1: Full and fast ultimate",
            "  2: Full and very deep",
            "  3: Full and very deep ultimate",
            "  4: System Discovery",
        )
    )
    config_group = cli.add_mutually_exclusive_group()
    config_group.add_argument(
        "+C",
        "++scan-config",
        dest="config",
        type=int,
        default=0,
        help=scan_config_help,
    )
    config_group.add_argument(
        "++scan-config-id",
        dest="scan_config_id",
        type=str,
        help="Use existing scan config by id",
    )

    cli.add_argument(
        "++scanner-id",
        dest="scanner_id",
        type=str,
        help="Use existing scanner by id",
    )

    cli.add_argument(
        "+R",
        "++recipient",
        dest="recipient_email",
        type=str,
        required=True,
        help="Alert recipient E-Mail address",
    )

    cli.add_argument(
        "+S",
        "++sender",
        dest="sender_email",
        type=str,
        required=True,
        help="Alert senders E-Mail address",
    )

    cli.add_argument(
        "++alert-name",
        dest="alert_name",
        type=str,
        help="Optional Alert name",
    )

    # Only the recognised options are of interest; the remainder is dropped.
    return cli.parse_known_args()[0]
352
353
354
def main(gmp, args):
    # pylint: disable=undefined-variable, unused-argument
    """Run the alert scan workflow.

    Resolves the scan config, target, alert and scanner, each either taken
    from the given id option or looked up/created, then creates and starts
    the task.
    """
    opts = parse_args(args)

    # Fall back to the recipient address when no alert name was given.
    if opts.alert_name is None:
        opts.alert_name = opts.recipient_email

    # An explicitly passed id wins; otherwise look the object up or create it.
    config_id = opts.scan_config_id or get_config(gmp, opts.config)

    target_id = opts.target_id or get_target(
        gmp,
        target_name=opts.target_name,
        hosts=opts.hosts,
        ports=opts.ports,
        port_list_name=opts.port_list_name,
        port_list_id=opts.port_list_id,
    )

    alert_id = get_alert(
        gmp, opts.sender_email, opts.recipient_email, opts.alert_name
    )

    scanner_id = opts.scanner_id or get_scanner(gmp)

    create_and_start_task(
        gmp, config_id, target_id, scanner_id, alert_id, opts.alert_name
    )

    print("\nScript finished\n")
398
399
400
# NOTE(review): ``gmp`` and ``args`` are not defined in this file; presumably
# the gvm-script runner (see HELP_TEXT usage) injects them and executes the
# script with ``__name__`` set to '__gmp__' — confirm against the runner.
if __name__ == '__gmp__':
    main(gmp, args)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
402