Completed
Push — master ( 1ed431...35dbab )
by
unknown
17s queued 13s
created

ospd.command.command   F

Complexity

Total Complexity 81

Size/Duplication

Total Lines 652
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 407
dl 0
loc 652
rs 2
c 0
b 0
f 0
wmc 81

23 Methods

Rating   Name   Duplication   Size   Complexity  
A BaseCommand.handle_xml() 0 2 1
A BaseCommand.__init_subclass__() 0 7 2
A BaseCommand.get_description() 0 2 1
A BaseCommand.as_dict() 0 6 1
A BaseCommand.__init__() 0 2 1
A BaseCommand.get_name() 0 2 1
A BaseCommand.get_elements() 0 2 1
A BaseCommand.get_attributes() 0 2 1
A BaseCommand.__repr__() 0 3 1
C GetPerformance.handle_xml() 0 49 9
B GetVersion.handle_xml() 0 41 5
A DeleteScan.handle_xml() 0 21 4
A HelpCommand.handle_xml() 0 15 4
A GetScannerDetails.handle_xml() 0 13 1
C GetVts.handle_xml() 0 61 10
A StopScan.handle_xml() 0 19 4
B GetScans.handle_xml() 0 38 7
A StartScan.get_elements() 0 17 2
A GetMemoryUsage.handle_xml() 0 21 2
A GetMemoryUsage._create_process_element() 0 7 1
F StartScan.handle_xml() 0 96 16
A GetMemoryUsage._add_memory_info() 0 25 2
A GetMemoryUsage._get_memory() 0 14 4

How to fix   Complexity   

Complexity

Complex classes like ospd.command.command often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
import multiprocessing
19
import re
20
import logging
21
import subprocess
22
23
from decimal import Decimal
24
from typing import Optional, Dict, Any, Union, Iterator
25
26
from xml.etree.ElementTree import Element, SubElement
27
28
import psutil
29
30
from ospd.errors import OspdCommandError
31
from ospd.misc import valid_uuid, create_process
32
from ospd.protocol import OspRequest, OspResponse
33
from ospd.xml import (
34
    simple_response_str,
35
    get_elements_from_dict,
36
    XmlStringHelper,
37
)
38
39
from .initsubclass import InitSubclassMeta
40
from .registry import register_command
41
42
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
43
44
45
class BaseCommand(metaclass=InitSubclassMeta):
    """ Common base class of the command handlers in this module.

    Subclasses fill in the class attributes below and implement
    handle_xml(). The __init_subclass__ hook hands every new subclass
    to register_command().
    """

    # Command name, as returned by get_name().
    name = None
    # Human readable text, as returned by get_description().
    description = None
    # Optional mapping describing the command's XML attributes.
    attributes = None
    # Optional mapping describing the command's XML child elements.
    elements = None
    # Flag set by subclasses; it is not evaluated inside this module.
    must_be_initialized = None

    def __init_subclass__(cls, **kwargs):
        """ Chain to the parent hook (if any), then register the class. """
        parent_hook = getattr(super(), '__init_subclass__', None)
        if parent_hook is not None:
            parent_hook(**kwargs)

        register_command(cls)

    def __init__(self, daemon):
        """ Store the daemon object the handler operates on. """
        self._daemon = daemon

    def get_name(self) -> str:
        """ Return the command name. """
        return self.name

    def get_description(self) -> str:
        """ Return the command description. """
        return self.description

    def get_attributes(self) -> Optional[Dict[str, Any]]:
        """ Return the attribute descriptions, or None if unset. """
        return self.attributes

    def get_elements(self) -> Optional[Dict[str, Any]]:
        """ Return the element descriptions, or None if unset. """
        return self.elements

    def handle_xml(self, xml: Element) -> Union[bytes, Iterator[bytes]]:
        """ Handle the command element; must be overridden by subclasses.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()

    def as_dict(self):
        """ Return name, attributes, description and elements as a dict. """
        keys = ('name', 'attributes', 'description', 'elements')
        values = (
            self.get_name(),
            self.get_attributes(),
            self.get_description(),
            self.get_elements(),
        )
        return dict(zip(keys, values))

    def __repr__(self):
        template = '<{} description="{}" attributes={} elements={}>'
        return template.format(
            self.name, self.description, self.attributes, self.elements
        )
91
92
93
class HelpCommand(BaseCommand):
    """ Handler for the <help> command. """

    name = "help"
    description = 'Print the commands help.'
    attributes = {'format': 'Help format. Could be text or xml.'}
    must_be_initialized = False

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <help> command.

        The optional 'format' attribute selects "text" (also used when the
        attribute is absent) or "xml".

        Return:
            Response string for <help> command.

        Raises:
            OspdCommandError: for any other 'format' value.
        """
        requested = xml.get('format')

        if requested == "xml":
            # One entry per command known to the daemon, keyed as in
            # the daemon's commands mapping.
            commands = {
                cmd_name: cmd.as_dict()
                for cmd_name, cmd in self._daemon.commands.items()
            }
            text = get_elements_from_dict(commands)
            return simple_response_str('help', 200, 'OK', text)

        if requested not in (None, "text"):
            raise OspdCommandError('Bogus help format', 'help')

        # Default help format is text.
        return simple_response_str(
            'help', 200, 'OK', self._daemon.get_help_text()
        )
114
115
116
class GetVersion(BaseCommand):
    """ Handler for the <get_version> command. """

    name = "get_version"
    description = 'Return various version information'
    must_be_initialized = False

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <get_version> command.

        Builds <protocol>, <daemon> and <scanner> elements (each with
        <name> and <version> children) and, when the daemon reports a
        VTs version, a <vts> element with a <version> child.

        Return:
            Response string for <get_version> command.
        """

        def build_section(tag, fields):
            # One parent element with a text-only child per (name, text).
            section = Element(tag)
            for field_name, field_text in fields:
                SubElement(section, field_name).text = field_text
            return section

        protocol = build_section(
            'protocol',
            (
                ('name', 'OSP'),
                ('version', self._daemon.get_protocol_version()),
            ),
        )
        daemon = build_section(
            'daemon',
            (
                ('name', self._daemon.get_daemon_name()),
                ('version', self._daemon.get_daemon_version()),
            ),
        )
        scanner = build_section(
            'scanner',
            (
                ('name', self._daemon.get_scanner_name()),
                ('version', self._daemon.get_scanner_version()),
            ),
        )

        content = [protocol, daemon, scanner]

        # The <vts> section is only emitted for a truthy VTs version.
        vts_version = self._daemon.get_vts_version()
        if vts_version:
            content.append(build_section('vts', (('version', vts_version),)))

        return simple_response_str('get_version', 200, 'OK', content)
162
163
164
# Report title patterns accepted by GetPerformance.handle_xml (a list of
# str). The entries are joined with "|" into a single pattern and applied
# with re.match(), i.e. they are interpreted as regular expressions
# anchored at the start of the value, not as shell globs.
GVMCG_TITLES = [
    # CPU, processes, memory and load
    'cpu-*',
    'proc',
    'mem',
    'swap',
    'load',
    # File systems and disks
    'df-*',
    'disk-sd[a-z][0-9]-rw',
    'disk-sd[a-z][0-9]-load',
    'disk-sd[a-z][0-9]-io-load',
    # Network interfaces
    'interface-eth*-traffic',
    'interface-eth*-err-rate',
    'interface-eth*-err',
    # Hardware sensors
    'sensors-*_temperature-*',
    'sensors-*_fanspeed-*',
    'sensors-*_voltage-*',
    # The literal value "titles"
    'titles',
]
182
183
184
class GetPerformance(BaseCommand):
    """ Handler for the <get_performance> command (runs 'gvmcg'). """

    name = "get_performance"
    description = 'Return system report'
    attributes = {
        'start': 'Time of first data point in report.',
        'end': 'Time of last data point in report.',
        'title': 'Name of report.',
    }
    must_be_initialized = False

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <get_performance> command.

        Builds a 'gvmcg' command line from the optional 'start', 'end'
        and 'titles' attributes, runs it and returns its decoded output.

        Return:
            Response string for <get_performance> command.

        Raises:
            OspdCommandError: if 'start' or 'end' is not an integer, if
                'titles' is rejected, or if running the command fails.
        """
        attrs = xml.attrib
        start = attrs.get('start')
        end = attrs.get('end')
        # NOTE(review): the help text above documents 'title', but the
        # attribute read here is 'titles' -- confirm which is intended.
        titles = attrs.get('titles')

        cmd = ['gvmcg']

        # Both values are only validated with int(); the original string
        # is what gets appended to the command line.
        for raw_value, complaint in (
            (start, 'Start argument must be integer.'),
            (end, 'End argument must be integer.'),
        ):
            if not raw_value:
                continue

            try:
                int(raw_value)
            except ValueError:
                raise OspdCommandError(complaint, 'get_performance')

            cmd.append(raw_value)

        if titles:
            allowed = "(" + ")|(".join(GVMCG_TITLES) + ")"
            # The whole value must be free of '|', '&' and ';'.
            without_separators = "^[^|&;]+$"
            accepted = re.match(allowed, titles) and re.match(
                without_separators, titles
            )
            if not accepted:
                raise OspdCommandError(
                    'Arguments not allowed', 'get_performance'
                )

            cmd.append(titles)

        try:
            output = subprocess.check_output(cmd)
        except (subprocess.CalledProcessError, OSError) as error:
            raise OspdCommandError(
                'Bogus get_performance format. %s' % error, 'get_performance'
            )

        return simple_response_str(
            'get_performance', 200, 'OK', output.decode()
        )
244
245
246
class GetScannerDetails(BaseCommand):
    """ Handler for the <get_scanner_details> command. """

    name = 'get_scanner_details'
    description = 'Return scanner description and parameters'
    must_be_initialized = True

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <get_scanner_details> command.

        The response carries a <description> element with the daemon's
        scanner description, followed by the XML that
        OspResponse.create_scanner_params_xml() builds from the daemon's
        scanner parameters.

        Return:
            Response string for <get_scanner_details> command.
        """
        description = Element('description')
        description.text = self._daemon.get_scanner_description()

        params_xml = OspResponse.create_scanner_params_xml(
            self._daemon.get_scanner_params()
        )

        return simple_response_str(
            'get_scanner_details', 200, 'OK', [description, params_xml]
        )
264
265
266
class DeleteScan(BaseCommand):
    """ Handler for the <delete_scan> command. """

    name = 'delete_scan'
    description = 'Delete a finished scan.'
    attributes = {'scan_id': 'ID of scan to delete.'}
    must_be_initialized = False

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <delete_scan> command.

        A missing 'scan_id' attribute and an unknown scan are both
        answered with a 404 response rather than an exception.

        Return:
            Response string for <delete_scan> command.

        Raises:
            OspdCommandError: if the daemon's delete_scan() returns a
                falsy value.
        """
        scan_id = xml.get('scan_id')

        not_found_text = None
        if scan_id is None:
            not_found_text = 'No scan_id attribute'
        elif not self._daemon.scan_exists(scan_id):
            not_found_text = "Failed to find scan '{0}'".format(scan_id)

        if not_found_text is not None:
            return simple_response_str('delete_scan', 404, not_found_text)

        self._daemon.check_scan_process(scan_id)

        if not self._daemon.delete_scan(scan_id):
            raise OspdCommandError('Scan in progress', 'delete_scan')

        return simple_response_str('delete_scan', 200, 'OK')
293
294
295
class GetVts(BaseCommand):
    """ Handler for the <get_vts> command; streams the VT collection. """

    name = 'get_vts'
    description = 'List of available vulnerability tests.'
    attributes = {
        'vt_id': 'ID of a specific vulnerability test to get.',
        'filter': 'Optional filter to get an specific vt collection.',
    }
    must_be_initialized = True

    def handle_xml(self, xml: Element) -> Iterator[bytes]:
        """ Handles <get_vts> command.

        Writes the vt collection on the stream, piece by piece.
        The <get_vts> element accepts two optional arguments:
        vt_id receives a single vt id, filter receives a filter selecting
        a sub set of vts. If both arguments are given, the vts which match
        the filter are returned. A 'details' attribute equal to '0'
        disables vt details; any other value (or none) enables them.

        This is a generator: nothing below runs before the first item is
        requested. The daemon's vts.is_cache_available flag is False while
        the generator is active and is set back to True on every exit
        path, including errors raised while streaming and the consumer
        closing the generator early.

        Return:
            Iterator over the pieces of the <get_vts> response.

        Raises:
            OspdCommandError: with status 404 if vt_id is given but not
                part of a non-empty vt collection, or if the filter is
                rejected.
        """
        self._daemon.vts.is_cache_available = False

        try:
            xml_helper = XmlStringHelper()

            vt_id = xml.get('vt_id')
            vt_filter = xml.get('filter')
            vt_details = xml.get('details') != '0'

            if self._daemon.vts and vt_id and vt_id not in self._daemon.vts:
                text = "Failed to find vulnerability test '{0}'".format(vt_id)
                raise OspdCommandError(text, 'get_vts', 404)

            filtered_vts = None
            if vt_filter:
                try:
                    filtered_vts = (
                        self._daemon.vts_filter.get_filtered_vts_list(
                            self._daemon.vts, vt_filter
                        )
                    )
                except OspdCommandError as filter_error:
                    raise OspdCommandError(filter_error)

            vts_selection = self._daemon.get_vts_selection_list(
                vt_id, filtered_vts
            )

            # Opening tag of the response.
            yield xml_helper.create_response('get_vts')

            # Opening <vts> tag; 'sent' is only added for a non-empty
            # filter result and 'sha256_hash' only when one is available.
            begin_vts_tag = xml_helper.create_element('vts')
            begin_vts_tag = xml_helper.add_attr(
                begin_vts_tag, "total", len(self._daemon.vts)
            )
            if filtered_vts:
                begin_vts_tag = xml_helper.add_attr(
                    begin_vts_tag, "sent", len(filtered_vts)
                )

            if self._daemon.vts.sha256_hash is not None:
                begin_vts_tag = xml_helper.add_attr(
                    begin_vts_tag, "sha256_hash", self._daemon.vts.sha256_hash
                )

            yield begin_vts_tag

            for vt in self._daemon.get_vt_iterator(vts_selection, vt_details):
                yield xml_helper.add_element(self._daemon.get_vt_xml(vt))

            yield xml_helper.create_element('vts', end=True)
            yield xml_helper.create_response('get_vts', end=True)
        finally:
            # Always hand the cache back, whatever ended the generator.
            self._daemon.vts.is_cache_available = True
365
366
367
class StopScan(BaseCommand):
    """ Handler for the <stop_scan> command. """

    name = 'stop_scan'
    description = 'Stop a currently running scan.'
    attributes = {'scan_id': 'ID of scan to stop.'}
    must_be_initialized = True

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <stop_scan> command.

        Return:
            Response string for <stop_scan> command.

        Raises:
            OspdCommandError: if the 'scan_id' attribute is missing or
                empty.
        """
        scan_id = xml.get('scan_id')
        if not scan_id:
            raise OspdCommandError('No scan_id attribute', 'stop_scan')

        self._daemon.stop_scan(scan_id)

        # Don't send response until the scan is stopped. A scan without
        # an entry in scan_processes is simply not waited for.
        try:
            scan_process = self._daemon.scan_processes[scan_id]
            scan_process.join()
        except KeyError:
            pass

        return simple_response_str('stop_scan', 200, 'OK')
392
393
394
class GetScans(BaseCommand):
    """ Handler for the <get_scans> command. """

    name = 'get_scans'
    description = 'List the scans in buffer.'
    attributes = {
        'scan_id': 'ID of a specific scan to get.',
        'details': 'Whether to return the full scan report.',
        'pop_results': 'Whether to remove the fetched results.',
        'max_results': 'Maximum number of results to fetch.',
    }
    must_be_initialized = False

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <get_scans> command.

        Besides the documented attributes a 'progress' attribute is read;
        it is forwarded as enabled only when it equals '1'.

        Return:
            Response string for <get_scans> command. An unknown scan_id
            is answered with a 404 response.

        Raises:
            OspdCommandError: if 'scan_id' is missing or empty, or if
                'max_results' is given but is not an integer.
        """
        scan_id = xml.get('scan_id')
        if scan_id is None or scan_id == '':
            raise OspdCommandError('No scan_id attribute', 'get_scans')

        details = xml.get('details')
        pop_res = xml.get('pop_results')
        max_res = xml.get('max_results')
        progress = xml.get('progress')

        if details == '0':
            # Without details pop_results/max_results are passed on as
            # received, without conversion.
            details = False
        else:
            details = True
            pop_res = pop_res and pop_res == '1'

            if max_res:
                # Report a malformed value as a command error instead of
                # letting a bare ValueError escape the handler.
                try:
                    max_res = int(max_res)
                except ValueError:
                    raise OspdCommandError(
                        'max_results argument must be integer.', 'get_scans'
                    )

        progress = progress and progress == '1'

        if scan_id not in self._daemon.scan_collection.ids_iterator():
            text = "Failed to find scan '{0}'".format(scan_id)
            return simple_response_str('get_scans', 404, text)

        self._daemon.check_scan_process(scan_id)
        scan = self._daemon.get_scan_xml(
            scan_id, details, pop_res, max_res, progress
        )

        return simple_response_str('get_scans', 200, 'OK', [scan])
443
444
445
class StartScan(BaseCommand):
    """ Handler for the <start_scan> command. """

    name = 'start_scan'
    description = 'Start a new scan.'
    attributes = {
        'target': 'Target host to scan',
        'ports': 'Ports list to scan',
        'scan_id': 'Optional UUID value to use as scan ID',
        'parallel': 'Optional nummer of parallel target to scan',
    }
    must_be_initialized = True

    def get_elements(self):
        """ Return the element descriptions including scanner parameters.

        The result is a new dict based on the class-level 'elements';
        its 'scanner_params' entry is a copy extended with the
        description of every parameter in the daemon's scanner_params.
        """
        elements = {}

        if self.elements:
            elements.update(self.elements)

        # Work on a copy so the class-level mapping is never modified.
        scanner_params = elements.get('scanner_params', {}).copy()
        elements['scanner_params'] = scanner_params

        scanner_params.update(
            {
                k: v['description']
                for k, v in self._daemon.scanner_params.items()
            }
        )

        return elements

    def _check_queue_capacity(self):
        """ Raise OspdCommandError if the scan queue limit is reached. """
        max_queued = self._daemon.max_queued_scans
        # A falsy limit means the queue size is not checked at all.
        if max_queued and self._daemon.get_count_queued_scans() >= max_queued:
            raise OspdCommandError(
                'Maximum number of queued scans reached.', 'start_scan'
            )

    @staticmethod
    def _get_scan_target(xml: Element):
        """ Build the scan target from legacy attributes or <targets>. """
        target_str = xml.get('target')
        ports_str = xml.get('ports')

        # For backward compatibility, if target and ports attributes are
        # set, <targets> element is ignored.
        if target_str is None or ports_str is None:
            target_element = xml.find('targets/target')
            if target_element is None:
                raise OspdCommandError('No targets or ports', 'start_scan')

            return OspRequest.process_target_element(target_element)

        logger.warning(
            "Legacy start scan command format is being used, which "
            "is deprecated since 20.08. Please read the documentation "
            "for start scan command."
        )
        return {
            'hosts': target_str,
            'ports': ports_str,
            'credentials': {},
            'exclude_hosts': '',
            'finished_hosts': '',
            'options': {},
        }

    @staticmethod
    def _get_vt_selection(xml: Element):
        """ Return the processed <vt_selection>, or {} when it is absent. """
        # VTS is an optional element. If present should not be empty.
        scanner_vts = xml.find('vt_selection')
        if scanner_vts is None:
            return {}

        if len(scanner_vts) == 0:
            raise OspdCommandError('VTs list is empty', 'start_scan')

        return OspRequest.process_vts_params(scanner_vts)

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <start_scan> command.

        Checks run in this order: queue limit, target, scan_id,
        scanner_params, vt_selection. The scan is then created through
        the daemon; only for a dry run is a scan process created and
        started here.

        Return:
            Response string for <start_scan> command. If the daemon's
            create_scan() returns a falsy id, a '100 Continue' response
            carrying the requested scan_id is returned instead.

        Raises:
            OspdCommandError: if the queue limit is reached, no target is
                given, scan_id is not a valid UUID, <scanner_params> is
                missing or <vt_selection> is present but empty.
        """
        self._check_queue_capacity()

        scan_target = self._get_scan_target(xml)

        scan_id = xml.get('scan_id')
        if scan_id and not valid_uuid(scan_id):
            raise OspdCommandError('Invalid scan_id UUID', 'start_scan')

        if xml.get('parallel'):
            logger.warning(
                "parallel attribute of start_scan will be ignored, sice "
                "parallel scan is not supported by OSPd."
            )

        scanner_params = xml.find('scanner_params')
        if scanner_params is None:
            raise OspdCommandError('No scanner_params element', 'start_scan')

        params = self._daemon.preprocess_scan_params(scanner_params)

        vt_selection = self._get_vt_selection(xml)

        # Dry run case: the scan parameters are not processed.
        dry_run = 'dry_run' in params and int(params['dry_run'])
        if dry_run:
            scan_params = None
        else:
            scan_params = self._daemon.process_scan_params(params)

        requested_scan_id = scan_id
        scan_id = self._daemon.create_scan(
            scan_id, scan_target, scan_params, vt_selection
        )

        if not scan_id:
            id_ = Element('id')
            id_.text = requested_scan_id
            return simple_response_str('start_scan', 100, 'Continue', id_)

        if dry_run:
            scan_process = create_process(
                func=self._daemon.dry_run_scan, args=(scan_id, scan_target)
            )
            self._daemon.scan_processes[scan_id] = scan_process
            scan_process.start()

        id_ = Element('id')
        id_.text = scan_id

        return simple_response_str('start_scan', 200, 'OK', id_)
570
571
572
class GetMemoryUsage(BaseCommand):
    """ Handler for the <get_memory_usage> command. """

    name = "get_memory_usage"
    description = "print the memory consumption of all processes"
    attributes = {
        'unit': 'Unit for displaying memory consumption (b = bytes, '
        'kb = kilobytes, mb = megabytes). Defaults to b.'
    }
    must_be_initialized = False

    @staticmethod
    def _get_memory(value: int, unit: Optional[str] = None) -> str:
        """ Convert a byte count to a string in the requested unit.

        'kb' divides by 1024 and 'mb' by 1024 * 1024 (case-insensitive);
        a missing, empty or unrecognized unit returns the value unchanged.
        """
        divisor = None
        if unit:
            divisor = {'kb': 1024, 'mb': 1024 * 1024}.get(unit.lower())

        if divisor is None:
            return str(value)

        return str(Decimal(value) / divisor)

    @staticmethod
    def _create_process_element(name: str, pid: int):
        """ Create a <process> element with 'name' and 'pid' attributes. """
        process_element = Element('process')
        process_element.set('name', name)
        process_element.set('pid', str(pid))

        return process_element

    @classmethod
    def _add_memory_info(
        cls, process_element: Element, pid: int, unit: Optional[str] = None
    ):
        """ Append <rss>, <vms> and <shared> children for process 'pid'.

        Nothing is appended if the process no longer exists. A field the
        platform's memory_info() result does not provide is skipped.
        """
        try:
            # Both calls stay inside the try: the process may exit between
            # the lookup and the query.
            memory = psutil.Process(pid).memory_info()
        except psutil.NoSuchProcess:
            return

        for field in ('rss', 'vms', 'shared'):
            amount = getattr(memory, field, None)
            if amount is None:
                continue

            child = SubElement(process_element, field)
            child.text = cls._get_memory(amount, unit)

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <get_memory_usage> command.

        Reports the current process first, followed by every process
        returned by multiprocessing.active_children().

        Return:
            Response string containing a <processes> element.
        """
        unit = xml.get('unit')
        processes_element = Element('processes')

        reported = [multiprocessing.current_process()]
        reported.extend(multiprocessing.active_children())

        for proc in reported:
            process_element = self._create_process_element(proc.name, proc.pid)
            self._add_memory_info(process_element, proc.pid, unit)
            processes_element.append(process_element)

        # NOTE(review): the response is named 'get_memory' while the
        # command is 'get_memory_usage'; kept as is because clients may
        # depend on the existing response name -- confirm.
        return simple_response_str('get_memory', 200, 'OK', processes_element)
652