Passed
Pull Request — master (#344)
by Jaspar
01:13
created

start-alert-scan.gmp.get_target()   D

Complexity

Conditions 12

Size

Total Lines 51
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
eloc 41
nop 7
dl 0
loc 51
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like start-alert-scan.gmp.get_target() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# Copyright (C) 2018 Henning Häcker
3
#
4
# SPDX-License-Identifier: GPL-3.0-or-later
5
#
6
# This program is free software: you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation, either version 3 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful,
12
# but WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
# GNU General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19
import sys
20
from typing import List
21
from argparse import ArgumentParser, RawTextHelpFormatter
22
23
24
HELP_TEXT = """
25
        This script makes an E-Mail alert scan.
26
27
        Usage examples: 
28
            $ gvm-script --gmp-username name --gmp-password pass ssh --hostname
29
            ... start-alert-scan.gmp.py +h
30
            ... start-alert-scan.gmp.py ++target-name ++hosts ++ports \
31
                    ++port-list-name +C ++recipient ++sender
32
            ... start-alert-scan.gmp.py ++target-name ++hosts ++port-list-id \
33
                    +C ++recipient ++sender
34
            ... start-alert-scan.gmp.py ++target-id +C ++recipient ++sender
35
    """
36
37
38
def get_config(gmp, config, debug=False):
39
    # get all configs of the openvas instance
40
    # filter for all rows!
41
    res = gmp.get_configs(filter="rows=-1")
42
43
    if config < 0 or config > 4:
44
        raise ValueError("Wrong config identifier. Choose between [0,4].")
45
    # match the config abbreviation to accepted config names
46
    config_list = [
47
        'Full and fast',
48
        'Full and fast ultimate',
49
        'Full and very deep',
50
        'Full and very deep ultimate',
51
        'System Discovery',
52
    ]
53
    template_abbreviation_mapper = {
54
        0: config_list[0],
55
        1: config_list[1],
56
        2: config_list[2],
57
        3: config_list[3],
58
        4: config_list[4],
59
    }
60
61
    for conf in res.xpath('config'):
62
        cid = conf.xpath('@id')[0]
63
        name = conf.xpath('name/text()')[0]
64
65
        # get the config id of the desired template
66
        if template_abbreviation_mapper.get(config) == name:
67
            config_id = cid
68
            if debug:
69
                print(name + ": " + config_id)
70
            break
71
72
    return config_id
0 ignored issues
show
introduced by
The variable config_id does not seem to be defined for all execution paths.
Loading history...
73
74
75
def get_target(
76
    gmp,
77
    target_name: str = None,
78
    hosts: List[str] = None,
79
    ports: str = None,
80
    port_list_name: str = None,
81
    port_list_id: str = None,
82
    debug: bool = False,
83
):
84
    targets = gmp.get_targets()
85
    counter = 0
86
    exists = True
87
    # iterate over existing targets and find a vacant targetName
88
    while exists:
89
        exists = False
90
        if target_name is None:
91
            target_name = "target_{}".format(str(counter))
92
        for target in targets.xpath('target'):
93
            name = target.xpath('name/text()')[0]
94
            if name == target_name:
95
                exists = True
96
                break
97
        counter += 1
98
99
    if debug:
100
        print("target name: {}".format(target_name))
101
102
    if not port_list_id:
103
        # iterate over existing port lists and find a vacant name
104
        existing_port_lists = [""]
105
        for name in gmp.get_port_lists().findall("port_list/name"):
106
            existing_port_lists.append(str(name.text))
107
108
        if port_list_name is None:
109
            port_list_name = "portlist_{}".format(str(counter))
110
        counter = 0
111
        while True:
112
            if port_list_name not in existing_port_lists:
113
                break
114
            counter += 1
115
116
        # create port list
117
        portlist = gmp.create_port_list(port_list_name, ports)
118
        port_list_id = portlist.xpath('@id')[0]
119
        if debug:
120
            print("New Portlist-name:\t{}".format(str(port_list_name)))
121
            print("New Portlist-id:\t{}".format(str(port_list_id)))
122
123
    # integrate port list id into create_target
124
    res = gmp.create_target(target_name, hosts=hosts, port_list_id=port_list_id)
125
    return res.xpath('@id')[0]
126
127
128
def get_alert(
129
    gmp,
130
    sender_email: str,
131
    recipient_email: str,
132
    alert_name: str = None,
133
    debug=False,
134
):
135
136
    # create alert if necessary
137
    alert_object = gmp.get_alerts(filter='name={}'.format(alert_name))
138
    alert = alert_object.xpath('alert')
139
140
    if len(alert) == 0:
141
        print("creating new alert {}".format(alert_name))
142
        gmp.create_alert(
143
            alert_name,
144
            event=gmp.types.AlertEvent.TASK_RUN_STATUS_CHANGED,
145
            event_data={"status": "Done"},
146
            condition=gmp.types.AlertCondition.ALWAYS,
147
            method=gmp.types.AlertMethod.EMAIL,
148
            method_data={
149
                """Task '$n': $e
150
151
After the event $e,
152
the following condition was met: $c
153
154
This email escalation is configured to attach report format '$r'.
155
Full details and other report formats are available on the scan engine.
156
157
$t
158
159
Note:
160
This email was sent to you as a configured security scan escalation.
161
Please contact your local system administrator if you think you
162
should not have received it.
163
""": "message",
164
                "2": "notice",
165
                sender_email: "from_address",
166
                "[OpenVAS-Manager] Task": "subject",
167
                "c402cc3e-b531-11e1-9163-406186ea4fc5": "notice_attach_format",
168
                recipient_email: "to_address",
169
            },
170
        )
171
172
        alert_object = gmp.get_alerts(filter='name={}'.format(recipient_email))
173
        alert = alert_object.xpath('alert')
174
175
    alert_id = alert[0].get('id', 'no id found')
176
    if debug:
177
        print("alert_id: {}".format(str(alert_id)))
178
179
    return alert_id
180
181
182
def get_scanner(gmp):
183
    res = gmp.get_scanners()
184
    scanner_ids = res.xpath('scanner/@id')
185
    return scanner_ids[1]  # default scanner
186
187
188
def create_and_start_task(
189
    gmp,
190
    config_id: str,
191
    target_id: str,
192
    scanner_id: str,
193
    alert_id: str,
194
    alert_name: str,
195
    debug: bool = False,
196
):
197
    # Create the task
198
    tasks = gmp.get_tasks(filter="name~{}".format(alert_name))
199
    task_name = "Alert Scan for Alert {}".format(alert_name)
200
    if tasks:
201
        task_name = "Alert Scan for Alert {} ({})".format(
202
            alert_name, len(tasks.xpath('tasks/@id'))
203
        )
204
    task_comment = "Alert Scan"
205
    res = gmp.create_task(
206
        task_name,
207
        config_id,
208
        target_id,
209
        scanner_id,
210
        alert_ids=[alert_id],
211
        comment=task_comment,
212
    )
213
214
    # Start the task
215
    task_id = res.xpath('@id')[0]
216
    gmp.start_task(task_id)
217
218
    print('Task started: ' + task_name)
219
220
    if debug:
221
        # Stop the task (for performance reasons)
222
        gmp.stop_task(task_id)
223
        print('Task stopped')
224
225
226
def main(gmp, args):
227
    # pylint: disable=undefined-variable
228
229
    parser = ArgumentParser(
230
        prefix_chars="+",
231
        add_help=False,
232
        formatter_class=RawTextHelpFormatter,
233
        description=HELP_TEXT,
234
    )
235
236
    parser.add_argument(
237
        "+h",
238
        "++help",
239
        action="help",
240
        help="Show this help message and exit.",
241
    )
242
243
    target = parser.add_mutually_exclusive_group(required=True)
244
245
    target.add_argument(
246
        "++target-id",
247
        type=str,
248
        dest="target_id",
249
        help="Use an existing by target id",
250
    )
251
252
    target.add_argument(
253
        "++target-name",
254
        type=str,
255
        dest="target_name",
256
        help="Create a target by name",
257
    )
258
259
    parser.add_argument(
260
        "++hosts",
261
        nargs='+',
262
        dest='hosts',
263
        help="Host(s) for the new target",
264
    )
265
266
    ports = parser.add_mutually_exclusive_group()
267
268
    ports.add_argument(
269
        "++port-list-id",
270
        type=str,
271
        dest="port_list_id",
272
        help="An existing portlist id for the new target",
273
    )
274
    ports.add_argument(
275
        "++ports",
276
        type=str,
277
        dest='ports',
278
        help="Ports in the new target: e.g. T:80-80,8080",
279
    )
280
281
    parser.add_argument(
282
        "++port-list-name",
283
        type=str,
284
        dest="port_list_name",
285
        help="Name for the new portlist in the new target",
286
    )
287
288
    parser.add_argument(
289
        "+C",
290
        "++scan_config",
291
        default=0,
292
        type=int,
293
        dest='config',
294
        help="Choose from existing scan config:"
295
        " 0: Full and fast"
296
        " 1: Full and fast ultimate"
297
        " 2: Full and very deep"
298
        " 3: Full and very deep ultimate"
299
        " 4: System Discovery",
300
    )
301
302
    parser.add_argument(
303
        "++recipient",
304
        required=True,
305
        dest='recipient_email',
306
        type=str,
307
        help="Alert recipient E-Mail address",
308
    )
309
310
    parser.add_argument(
311
        "++sender",
312
        required=True,
313
        dest='sender_email',
314
        type=str,
315
        help="Alert senders E-Mail address",
316
    )
317
318
    parser.add_argument(
319
        "++alert-name",
320
        dest='alert_name',
321
        type=str,
322
        help="Optional Alert name",
323
    )
324
325
    script_args, _ = parser.parse_known_args()
326
327
    if script_args.alert_name is None:
328
        script_args.alert_name = script_args.recipient_email
329
330
    config_id = get_config(gmp, script_args.config)
331
    if not script_args.target_id:
332
        target_id = get_target(
333
            gmp,
334
            target_name=script_args.target_id,
335
            hosts=script_args.hosts,
336
            ports=script_args.ports,
337
            port_list_name=script_args.port_list_name,
338
            port_list_id=script_args.port_list_id,
339
        )
340
    else:
341
        target_id = script_args.target_id
342
    alert_id = get_alert(
343
        gmp,
344
        script_args.sender_email,
345
        script_args.recipient_email,
346
        script_args.alert_name,
347
    )
348
    scanner_id = get_scanner(gmp)
349
350
    create_and_start_task(
351
        gmp, config_id, target_id, scanner_id, alert_id, script_args.alert_name
352
    )
353
354
    print("\nScript finished\n")
355
356
357
if __name__ == '__gmp__':
358
    main(gmp, args)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable gmp does not seem to be defined.
Loading history...
Comprehensibility Best Practice introduced by
The variable args does not seem to be defined.
Loading history...
359