Completed
Push — master ( 494987...e33655 )
by Juan José
23s queued 10s
created

ospd.command.command   F

Complexity

Total Complexity 70

Size/Duplication

Total Lines 497
Duplicated Lines 8.25 %

Importance

Changes 0
Metric Value
eloc 303
dl 41
loc 497
rs 2.8
c 0
b 0
f 0
wmc 70

19 Methods

Rating   Name   Duplication   Size   Complexity  
A BaseCommand.handle_xml() 0 2 1
A BaseCommand.__init_subclass__() 0 7 2
A BaseCommand.get_description() 0 2 1
A GetVts.handle_xml() 0 31 4
A BaseCommand.as_dict() 0 6 1
C GetPerformance.handle_xml() 0 49 9
B GetVersion.handle_xml() 41 41 5
A BaseCommand.__init__() 0 2 1
A BaseCommand.get_name() 0 2 1
A BaseCommand.get_elements() 0 2 1
C GetScans.handle_xml() 0 39 10
A DeleteScan.handle_xml() 0 21 4
A StopScan.handle_xml() 0 13 3
F StartScan.handle_xml() 0 81 18
A BaseCommand.get_attributes() 0 2 1
A StartScan.get_elements() 0 17 2
A BaseCommand.__repr__() 0 3 1
A HelpCommand.handle_xml() 0 15 4
A GetScannerDetails.handle_xml() 0 9 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like ospd.command.command often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright (C) 2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
import re
20
import subprocess
21
22
from typing import Optional, Dict, Any
23
24
from xml.etree.ElementTree import Element, SubElement
25
26
from ospd.errors import OspdCommandError
27
from ospd.misc import valid_uuid, create_process
28
from ospd.network import target_str_to_list
29
from ospd.xml import simple_response_str, get_elements_from_dict
30
31
from .initsubclass import InitSubclassMeta
32
33
COMMANDS = []
34
35
36
class BaseCommand(metaclass=InitSubclassMeta):
37
38
    name = None
39
    description = None
40
    attributes = None
41
    elements = None
42
43
    def __init_subclass__(cls, **kwargs):
44
        super_cls = super()
45
46
        if hasattr(super_cls, '__init_subclass__'):
47
            super_cls.__init_subclass__(**kwargs)
48
49
        COMMANDS.append(cls)
50
51
    def __init__(self, daemon):
52
        self._daemon = daemon
53
54
    def get_name(self) -> str:
55
        return self.name
56
57
    def get_description(self) -> str:
58
        return self.description
59
60
    def get_attributes(self) -> Optional[Dict[str, Any]]:
61
        return self.attributes
62
63
    def get_elements(self) -> Optional[Dict[str, Any]]:
64
        return self.elements
65
66
    def handle_xml(self, xml: Element) -> str:
67
        raise NotImplementedError()
68
69
    def as_dict(self):
70
        return {
71
            'name': self.name,
72
            'attributes': self.attributes,
73
            'description': self.description,
74
            'element': self.elements,
75
        }
76
77
    def __repr__(self):
78
        return '<{} description="{}" attributes={} elements={}>'.format(
79
            self.name, self.description, self.attributes, self.elements
80
        )
81
82
83
class HelpCommand(BaseCommand):
84
    name = "help"
85
    description = 'Print the commands help.'
86
    attributes = {'format': 'Help format. Could be text or xml.'}
87
88
    def handle_xml(self, xml: Element) -> str:
89
        help_format = xml.get('format')
90
91
        if help_format is None or help_format == "text":
92
            # Default help format is text.
93
            return simple_response_str(
94
                'help', 200, 'OK', self._daemon.get_help_text()
95
            )
96
        elif help_format == "xml":
97
            text = get_elements_from_dict(
98
                {k: v.as_dict() for k, v in self._daemon.commands.items()}
99
            )
100
            return simple_response_str('help', 200, 'OK', text)
101
102
        raise OspdCommandError('Bogus help format', 'help')
103
104
105
class GetVersion(BaseCommand):
106
    name = "get_version"
107
    description = 'Return various version information'
108
109 View Code Duplication
    def handle_xml(self, xml: Element) -> str:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
110
        """ Handles <get_version> command.
111
112
        Return:
113
            Response string for <get_version> command.
114
        """
115
        protocol = Element('protocol')
116
117
        for name, value in [
118
            ('name', 'OSP'),
119
            ('version', self._daemon.get_protocol_version()),
120
        ]:
121
            elem = SubElement(protocol, name)
122
            elem.text = value
123
124
        daemon = Element('daemon')
125
        for name, value in [
126
            ('name', self._daemon.get_daemon_name()),
127
            ('version', self._daemon.get_daemon_version()),
128
        ]:
129
            elem = SubElement(daemon, name)
130
            elem.text = value
131
132
        scanner = Element('scanner')
133
        for name, value in [
134
            ('name', self._daemon.get_scanner_name()),
135
            ('version', self._daemon.get_scanner_version()),
136
        ]:
137
            elem = SubElement(scanner, name)
138
            elem.text = value
139
140
        content = [protocol, daemon, scanner]
141
142
        vts_version = self._daemon.get_vts_version()
143
        if vts_version:
144
            vts = Element('vts')
145
            elem = SubElement(vts, 'version')
146
            elem.text = vts_version
147
            content.append(vts)
148
149
        return simple_response_str('get_version', 200, 'OK', content)
150
151
152
GVMCG_TITLES = [
153
    'cpu-*',
154
    'proc',
155
    'mem',
156
    'swap',
157
    'load',
158
    'df-*',
159
    'disk-sd[a-z][0-9]-rw',
160
    'disk-sd[a-z][0-9]-load',
161
    'disk-sd[a-z][0-9]-io-load',
162
    'interface-eth*-traffic',
163
    'interface-eth*-err-rate',
164
    'interface-eth*-err',
165
    'sensors-*_temperature-*',
166
    'sensors-*_fanspeed-*',
167
    'sensors-*_voltage-*',
168
    'titles',
169
]  # type: List[str]
170
171
172
class GetPerformance(BaseCommand):
173
    name = "get_performance"
174
    description = 'Return system report'
175
    attributes = {
176
        'start': 'Time of first data point in report.',
177
        'end': 'Time of last data point in report.',
178
        'title': 'Name of report.',
179
    }
180
181
    def handle_xml(self, xml: Element) -> str:
182
        """ Handles <get_performance> command.
183
184
        @return: Response string for <get_performance> command.
185
        """
186
        start = xml.attrib.get('start')
187
        end = xml.attrib.get('end')
188
        titles = xml.attrib.get('titles')
189
190
        cmd = ['gvmcg']
191
        if start:
192
            try:
193
                int(start)
194
            except ValueError:
195
                raise OspdCommandError(
196
                    'Start argument must be integer.', 'get_performance'
197
                )
198
199
            cmd.append(start)
200
201
        if end:
202
            try:
203
                int(end)
204
            except ValueError:
205
                raise OspdCommandError(
206
                    'End argument must be integer.', 'get_performance'
207
                )
208
209
            cmd.append(end)
210
211
        if titles:
212
            combined = "(" + ")|(".join(GVMCG_TITLES) + ")"
213
            forbidden = "^[^|&;]+$"
214
            if re.match(combined, titles) and re.match(forbidden, titles):
215
                cmd.append(titles)
216
            else:
217
                raise OspdCommandError(
218
                    'Arguments not allowed', 'get_performance'
219
                )
220
221
        try:
222
            output = subprocess.check_output(cmd)
223
        except (subprocess.CalledProcessError, OSError) as e:
224
            raise OspdCommandError(
225
                'Bogus get_performance format. %s' % e, 'get_performance'
226
            )
227
228
        return simple_response_str(
229
            'get_performance', 200, 'OK', output.decode()
230
        )
231
232
233
class GetScannerDetails(BaseCommand):
234
    name = 'get_scanner_details'
235
    description = 'Return scanner description and parameters'
236
237
    def handle_xml(self, xml: Element) -> str:
238
        """ Handles <get_scanner_details> command.
239
240
        @return: Response string for <get_scanner_details> command.
241
        """
242
        desc_xml = Element('description')
243
        desc_xml.text = self._daemon.get_scanner_description()
244
        details = [desc_xml, self._daemon.get_scanner_params_xml()]
245
        return simple_response_str('get_scanner_details', 200, 'OK', details)
246
247
248
class DeleteScan(BaseCommand):
249
    name = 'delete_scan'
250
    description = 'Delete a finished scan.'
251
    attributes = {'scan_id': 'ID of scan to delete.'}
252
253
    def handle_xml(self, xml: Element) -> str:
254
        """ Handles <delete_scan> command.
255
256
        @return: Response string for <delete_scan> command.
257
        """
258
        scan_id = xml.get('scan_id')
259
        if scan_id is None:
260
            return simple_response_str(
261
                'delete_scan', 404, 'No scan_id attribute'
262
            )
263
264
        if not self._daemon.scan_exists(scan_id):
265
            text = "Failed to find scan '{0}'".format(scan_id)
266
            return simple_response_str('delete_scan', 404, text)
267
268
        self._daemon.check_scan_process(scan_id)
269
270
        if self._daemon.delete_scan(scan_id):
271
            return simple_response_str('delete_scan', 200, 'OK')
272
273
        raise OspdCommandError('Scan in progress', 'delete_scan')
274
275
276
class GetVts(BaseCommand):
277
    name = 'get_vts'
278
    description = 'List of available vulnerability tests.'
279
    attributes = {
280
        'vt_id': 'ID of a specific vulnerability test to get.',
281
        'filter': 'Optional filter to get an specific vt collection.',
282
    }
283
284
    def handle_xml(self, xml: Element) -> str:
285
        """ Handles <get_vts> command.
286
        The <get_vts> element accept two optional arguments.
287
        vt_id argument receives a single vt id.
288
        filter argument receives a filter selecting a sub set of vts.
289
        If both arguments are given, the vts which match with the filter
290
        are return.
291
292
        @return: Response string for <get_vts> command.
293
        """
294
295
        vt_id = xml.get('vt_id')
296
        vt_filter = xml.get('filter')
297
298
        if vt_id and vt_id not in self._daemon.vts:
299
            text = "Failed to find vulnerability test '{0}'".format(vt_id)
300
            return simple_response_str('get_vts', 404, text)
301
302
        filtered_vts = None
303
        if vt_filter:
304
            filtered_vts = self._daemon.vts_filter.get_filtered_vts_list(
305
                self._daemon.vts, vt_filter
306
            )
307
308
        responses = []
309
310
        vts_xml = self._daemon.get_vts_xml(vt_id, filtered_vts)
311
312
        responses.append(vts_xml)
313
314
        return simple_response_str('get_vts', 200, 'OK', responses)
315
316
317
class StopScan(BaseCommand):
318
    name = 'stop_scan'
319
    description = 'Stop a currently running scan.'
320
    attributes = {'scan_id': 'ID of scan to stop.'}
321
322
    def handle_xml(self, xml: Element) -> str:
323
        """ Handles <stop_scan> command.
324
325
        @return: Response string for <stop_scan> command.
326
        """
327
328
        scan_id = xml.get('scan_id')
329
        if scan_id is None or scan_id == '':
330
            raise OspdCommandError('No scan_id attribute', 'stop_scan')
331
332
        self._daemon.stop_scan(scan_id)
333
334
        return simple_response_str('stop_scan', 200, 'OK')
335
336
337
class GetScans(BaseCommand):
338
    name = 'get_scans'
339
    description = 'List the scans in buffer.'
340
    attributes = {
341
        'scan_id': 'ID of a specific scan to get.',
342
        'details': 'Whether to return the full scan report.',
343
        'pop_results': 'Whether to remove the fetched results.',
344
        'max_results': 'Maximum number of results to fetch.',
345
    }
346
347
    def handle_xml(self, xml: Element) -> str:
348
        """ Handles <get_scans> command.
349
350
        @return: Response string for <get_scans> command.
351
        """
352
353
        scan_id = xml.get('scan_id')
354
        details = xml.get('details')
355
        pop_res = xml.get('pop_results')
356
        max_res = xml.get('max_results')
357
358
        if details and details == '0':
359
            details = False
360
        else:
361
            details = True
362
            if pop_res and pop_res == '1':
363
                pop_res = True
364
            else:
365
                pop_res = False
366
            if max_res:
367
                max_res = int(max_res)
368
369
        responses = []
370
        if scan_id and scan_id in self._daemon.scan_collection.ids_iterator():
371
            self._daemon.check_scan_process(scan_id)
372
            scan = self._daemon.get_scan_xml(scan_id, details, pop_res, max_res)
373
            responses.append(scan)
374
        elif scan_id:
375
            text = "Failed to find scan '{0}'".format(scan_id)
376
            return simple_response_str('get_scans', 404, text)
377
        else:
378
            for scan_id in self._daemon.scan_collection.ids_iterator():
379
                self._daemon.check_scan_process(scan_id)
380
                scan = self._daemon.get_scan_xml(
381
                    scan_id, details, pop_res, max_res
382
                )
383
                responses.append(scan)
384
385
        return simple_response_str('get_scans', 200, 'OK', responses)
386
387
388
class StartScan(BaseCommand):
389
    name = 'start_scan'
390
    description = 'Start a new scan.'
391
    attributes = {
392
        'target': 'Target host to scan',
393
        'ports': 'Ports list to scan',
394
        'scan_id': 'Optional UUID value to use as scan ID',
395
        'parallel': 'Optional nummer of parallel target to scan',
396
    }
397
398
    def get_elements(self):
399
        elements = {}
400
401
        if self.elements:
402
            elements.update(self.elements)
403
404
        scanner_params = elements.get('scanner_params', {}).copy()
405
        elements['scanner_params'] = scanner_params
406
407
        scanner_params.update(
408
            {
409
                k: v['description']
410
                for k, v in self._daemon.scanner_params.items()
411
            }
412
        )
413
414
        return elements
415
416
    def handle_xml(self, xml: Element) -> str:
417
        """ Handles <start_scan> command.
418
419
        @return: Response string for <start_scan> command.
420
        """
421
422
        target_str = xml.get('target')
423
        ports_str = xml.get('ports')
424
425
        # For backward compatibility, if target and ports attributes are set,
426
        # <targets> element is ignored.
427
        if target_str is None or ports_str is None:
428
            target_list = xml.find('targets')
429
            if target_list is None or len(target_list) == 0:
430
                raise OspdCommandError('No targets or ports', 'start_scan')
431
            else:
432
                scan_targets = self._daemon.process_targets_element(target_list)
433
        else:
434
            scan_targets = []
435
            for single_target in target_str_to_list(target_str):
436
                scan_targets.append([single_target, ports_str, '', '', '', ''])
437
438
        scan_id = xml.get('scan_id')
439
        if scan_id is not None and scan_id != '' and not valid_uuid(scan_id):
440
            raise OspdCommandError('Invalid scan_id UUID', 'start_scan')
441
442
        try:
443
            parallel = int(xml.get('parallel', '1'))
444
            if parallel < 1 or parallel > 20:
445
                parallel = 1
446
        except ValueError:
447
            raise OspdCommandError(
448
                'Invalid value for parallel scans. It must be a number',
449
                'start_scan',
450
            )
451
452
        scanner_params = xml.find('scanner_params')
453
        if scanner_params is None:
454
            raise OspdCommandError('No scanner_params element', 'start_scan')
455
456
        params = self._daemon.preprocess_scan_params(scanner_params)
457
458
        # VTS is an optional element. If present should not be empty.
459
        vt_selection = {}  # type: Dict
460
        scanner_vts = xml.find('vt_selection')
461
        if scanner_vts is not None:
462
            if len(scanner_vts) == 0:
463
                raise OspdCommandError('VTs list is empty', 'start_scan')
464
            else:
465
                vt_selection = self._daemon.process_vts_params(scanner_vts)
466
467
        # Dry run case.
468
        if 'dry_run' in params and int(params['dry_run']):
469
            scan_func = self._daemon.dry_run_scan
470
            scan_params = None
471
        else:
472
            scan_func = self._daemon.start_scan
473
            scan_params = self._daemon.process_scan_params(params)
474
475
        scan_id_aux = scan_id
476
        scan_id = self._daemon.create_scan(
477
            scan_id, scan_targets, scan_params, vt_selection
478
        )
479
480
        if not scan_id:
481
            id_ = Element('id')
482
            id_.text = scan_id_aux
483
            return simple_response_str('start_scan', 100, 'Continue', id_)
484
485
        scan_process = create_process(
486
            func=scan_func, args=(scan_id, scan_targets, parallel)
487
        )
488
489
        self._daemon.scan_processes[scan_id] = scan_process
490
491
        scan_process.start()
492
493
        id_ = Element('id')
494
        id_.text = scan_id
495
496
        return simple_response_str('start_scan', 200, 'OK', id_)
497