Passed
Pull Request — master (#253)
by Juan José
01:24
created

ospd.command.command.GetVts.handle_xml()   C

Complexity

Conditions 9

Size

Total Lines 51
Code Lines 29

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 29
nop 2
dl 0
loc 51
rs 6.6666
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the method body into a new, well-named method.

1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
import multiprocessing
19
import re
20
import logging
21
import subprocess
22
23
from decimal import Decimal
24
from typing import Optional, Dict, Any, Union, Iterator
25
26
from xml.etree.ElementTree import Element, SubElement
27
28
import psutil
29
30
from ospd.errors import OspdCommandError
31
from ospd.misc import valid_uuid, create_process
32
from ospd.protocol import OspRequest, OspResponse
33
from ospd.xml import (
34
    simple_response_str,
35
    get_elements_from_dict,
36
    XmlStringHelper,
37
)
38
39
from .initsubclass import InitSubclassMeta
40
from .registry import register_command
41
42
logger = logging.getLogger(__name__)
43
44
45
class BaseCommand(metaclass=InitSubclassMeta):
    """ Common base class of the OSP command handlers.

    A concrete command overrides the class attributes below to describe
    itself and implements handle_xml(). Each subclass is handed to
    register_command() at class-creation time.
    """

    # Descriptive metadata; every value is overridden per command.
    name = None
    description = None
    attributes = None
    elements = None
    # NOTE(review): presumably tells the daemon whether it has to be fully
    # initialized before the command may run -- not consumed in this file.
    must_be_initialized = None

    def __init_subclass__(cls, **kwargs):
        """ Forward to the parent hook (if any), then register the class. """
        parent = super()

        if hasattr(parent, '__init_subclass__'):
            parent.__init_subclass__(**kwargs)

        register_command(cls)

    def __init__(self, daemon):
        # The daemon instance the command delegates its work to.
        self._daemon = daemon

    def get_name(self) -> str:
        """ Return the command name. """
        return self.name

    def get_description(self) -> str:
        """ Return the human readable command description. """
        return self.description

    def get_attributes(self) -> Optional[Dict[str, Any]]:
        """ Return the description of the supported XML attributes. """
        return self.attributes

    def get_elements(self) -> Optional[Dict[str, Any]]:
        """ Return the description of the supported XML child elements. """
        return self.elements

    def handle_xml(self, xml: Element) -> Union[bytes, Iterator[bytes]]:
        """ Process the command element. Must be implemented by subclasses.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError()

    def as_dict(self):
        """ Return the command metadata as a dictionary. """
        return dict(
            name=self.get_name(),
            attributes=self.get_attributes(),
            description=self.get_description(),
            elements=self.get_elements(),
        )

    def __repr__(self):
        template = '<{} description="{}" attributes={} elements={}>'
        return template.format(
            self.name, self.description, self.attributes, self.elements
        )
91
92
93
class HelpCommand(BaseCommand):
    """ Handler of the <help> command. """

    name = "help"
    description = 'Print the commands help.'
    attributes = {'format': 'Help format. Could be text or xml.'}
    must_be_initialized = False

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <help> command.

        Return:
            Response string for <help> command.

        Raises:
            OspdCommandError: if the format attribute is neither "text"
                nor "xml".
        """
        requested = xml.get('format')

        # A missing format attribute means the default, plain text.
        if requested in (None, "text"):
            body = self._daemon.get_help_text()
        elif requested == "xml":
            commands = {
                key: command.as_dict()
                for key, command in self._daemon.commands.items()
            }
            body = get_elements_from_dict(commands)
        else:
            raise OspdCommandError('Bogus help format', 'help')

        return simple_response_str('help', 200, 'OK', body)
114
115
116
class GetVersion(BaseCommand):
    """ Handler of the <get_version> command. """

    name = "get_version"
    description = 'Return various version information'
    must_be_initialized = False

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <get_version> command.

        Return:
            Response string for <get_version> command.
        """

        def versioned(tag, name, version):
            # <tag><name>..</name><version>..</version></tag>
            parent = Element(tag)
            SubElement(parent, 'name').text = name
            SubElement(parent, 'version').text = version
            return parent

        content = [
            versioned('protocol', 'OSP', self._daemon.get_protocol_version()),
            versioned(
                'daemon',
                self._daemon.get_daemon_name(),
                self._daemon.get_daemon_version(),
            ),
            versioned(
                'scanner',
                self._daemon.get_scanner_name(),
                self._daemon.get_scanner_version(),
            ),
        ]

        # The <vts> element is only reported when a VTs version is known.
        vts_version = self._daemon.get_vts_version()
        if vts_version:
            vts = Element('vts')
            SubElement(vts, 'version').text = vts_version
            content.append(vts)

        return simple_response_str('get_version', 200, 'OK', content)
162
163
164
# Report titles accepted for the `titles` attribute of <get_performance>.
# The entries are joined into one regular expression by the handler.
# NOTE(review): `List` in the type comment is not among the names imported
# from `typing` in the visible import block.
GVMCG_TITLES = (
    # General system load
    ['cpu-*', 'proc', 'mem', 'swap', 'load', 'df-*']
    # Disks
    + [
        'disk-sd[a-z][0-9]-rw',
        'disk-sd[a-z][0-9]-load',
        'disk-sd[a-z][0-9]-io-load',
    ]
    # Network interfaces
    + [
        'interface-eth*-traffic',
        'interface-eth*-err-rate',
        'interface-eth*-err',
    ]
    # Hardware sensors
    + [
        'sensors-*_temperature-*',
        'sensors-*_fanspeed-*',
        'sensors-*_voltage-*',
    ]
    # List of the titles themselves
    + ['titles']
)  # type: List[str]
182
183
184
class GetPerformance(BaseCommand):
    """ Handler of the <get_performance> command.

    Runs the external `gvmcg` program with the validated request
    attributes and returns its output.
    """

    name = "get_performance"
    description = 'Return system report'
    # The key must match the attribute read in handle_xml(), which is
    # `titles` (plural).
    attributes = {
        'start': 'Time of first data point in report.',
        'end': 'Time of last data point in report.',
        'titles': 'Name of report.',
    }
    must_be_initialized = False

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <get_performance> command.

        Return:
            Response string for <get_performance> command.

        Raises:
            OspdCommandError: if `start` or `end` is not an integer, if
                `titles` is not allowed, or if running gvmcg fails.
        """
        start = xml.attrib.get('start')
        end = xml.attrib.get('end')
        titles = xml.attrib.get('titles')

        cmd = ['gvmcg']

        # start and end are only validated as integers; the original string
        # is what gets passed on to gvmcg.
        for value, error_text in (
            (start, 'Start argument must be integer.'),
            (end, 'End argument must be integer.'),
        ):
            if not value:
                continue

            try:
                int(value)
            except ValueError as e:
                raise OspdCommandError(error_text, 'get_performance') from e

            cmd.append(value)

        if titles:
            # The entries of GVMCG_TITLES are used as regex alternatives and
            # re.match() only anchors at the start of the string.
            # NOTE(review): entries such as 'cpu-*' look like glob patterns
            # but are interpreted as regular expressions here -- confirm.
            combined = "(" + ")|(".join(GVMCG_TITLES) + ")"
            # Reject shell control characters anywhere in the value.
            forbidden = "^[^|&;]+$"
            if re.match(combined, titles) and re.match(forbidden, titles):
                cmd.append(titles)
            else:
                raise OspdCommandError(
                    'Arguments not allowed', 'get_performance'
                )

        try:
            # Argument list without a shell: values are never interpreted
            # by a command interpreter.
            output = subprocess.check_output(cmd)
        except (subprocess.CalledProcessError, OSError) as e:
            raise OspdCommandError(
                'Bogus get_performance format. %s' % e, 'get_performance'
            ) from e

        return simple_response_str(
            'get_performance', 200, 'OK', output.decode()
        )
244
245
246
class GetScannerDetails(BaseCommand):
    """ Handler of the <get_scanner_details> command. """

    name = 'get_scanner_details'
    description = 'Return scanner description and parameters'
    must_be_initialized = True

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <get_scanner_details> command.

        Return:
            Response string for <get_scanner_details> command, carrying
            the scanner description and the scanner parameters.
        """
        description_element = Element('description')
        description_element.text = self._daemon.get_scanner_description()

        params_element = OspResponse.create_scanner_params_xml(
            self._daemon.get_scanner_params()
        )

        return simple_response_str(
            'get_scanner_details',
            200,
            'OK',
            [description_element, params_element],
        )
264
265
266
class DeleteScan(BaseCommand):
    """ Handler of the <delete_scan> command. """

    name = 'delete_scan'
    description = 'Delete a finished scan.'
    attributes = {'scan_id': 'ID of scan to delete.'}
    must_be_initialized = False

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <delete_scan> command.

        Return:
            Response string for <delete_scan> command. A 404 response is
            returned when the scan_id attribute is missing or unknown.

        Raises:
            OspdCommandError: if the daemon refuses to delete the scan.
        """
        scan_id = xml.get('scan_id')

        if scan_id is None:
            return simple_response_str(
                'delete_scan', 404, 'No scan_id attribute'
            )

        if not self._daemon.scan_exists(scan_id):
            not_found = "Failed to find scan '{0}'".format(scan_id)
            return simple_response_str('delete_scan', 404, not_found)

        # Let the daemon check the scan process before trying to delete.
        self._daemon.check_scan_process(scan_id)

        if not self._daemon.delete_scan(scan_id):
            raise OspdCommandError('Scan in progress', 'delete_scan')

        return simple_response_str('delete_scan', 200, 'OK')
293
294
295
class GetVts(BaseCommand):
    """ Handler of the <get_vts> command. """

    name = 'get_vts'
    description = 'List of available vulnerability tests.'
    attributes = {
        'vt_id': 'ID of a specific vulnerability test to get.',
        'filter': 'Optional filter to get an specific vt collection.',
    }
    must_be_initialized = True

    def _create_vts_start_tag(self, xml_helper, filtered_vts):
        """ Build the opening <vts> tag with its summary attributes.

        Arguments:
            xml_helper: The XmlStringHelper used to build the response.
            filtered_vts: The filtered vts, or None when no filter was
                given.

        Return:
            The opening <vts> tag as produced by the xml helper.
        """
        start_tag = xml_helper.create_element('vts')
        start_tag = xml_helper.add_attr(
            start_tag, "total", len(self._daemon.vts)
        )

        # "sent" is only added for a non-empty filter result.
        if filtered_vts:
            start_tag = xml_helper.add_attr(
                start_tag, "sent", len(filtered_vts)
            )

        sha256_hash = self._daemon.vts.sha256_hash
        if sha256_hash is not None:
            start_tag = xml_helper.add_attr(
                start_tag, "sha256_hash", sha256_hash
            )

        return start_tag

    def handle_xml(self, xml: Element) -> Iterator[bytes]:
        """ Handles <get_vts> command.
        Writes the vt collection on the stream.
        The <get_vts> element accepts two optional arguments.
        vt_id argument receives a single vt id.
        filter argument receives a filter selecting a sub set of vts.
        If both arguments are given, the vts which match with the filter
        are returned.
        A details attribute equal to '0' disables the vt details; any other
        value, or its absence, enables them.

        This is a generator: nothing, including the vt_id check, runs before
        the first piece is requested.

        Return:
            Iterator over the pieces of the <get_vts> response.

        Raises:
            OspdCommandError: with status 404 if the given vt_id is not
                part of the loaded vts.
        """
        xml_helper = XmlStringHelper()

        vt_id = xml.get('vt_id')
        vt_filter = xml.get('filter')
        vt_details = xml.get('details') != '0'

        if self._daemon.vts and vt_id and vt_id not in self._daemon.vts:
            text = "Failed to find vulnerability test '{0}'".format(vt_id)
            raise OspdCommandError(text, 'get_vts', 404)

        filtered_vts = None
        if vt_filter:
            filtered_vts = self._daemon.vts_filter.get_filtered_vts_list(
                self._daemon.vts, vt_filter
            )

        vts_selection = self._daemon.get_vts_selection_list(vt_id, filtered_vts)

        # The response is streamed piece by piece instead of being built
        # in memory as a whole.
        yield xml_helper.create_response('get_vts')
        yield self._create_vts_start_tag(xml_helper, filtered_vts)

        for vt in self._daemon.get_vt_iterator(vts_selection, vt_details):
            yield xml_helper.add_element(self._daemon.get_vt_xml(vt))

        yield xml_helper.create_element('vts', end=True)
        yield xml_helper.create_response('get_vts', end=True)
355
356
357
class StopScan(BaseCommand):
    """ Handler of the <stop_scan> command. """

    name = 'stop_scan'
    description = 'Stop a currently running scan.'
    attributes = {'scan_id': 'ID of scan to stop.'}
    must_be_initialized = True

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <stop_scan> command.

        Return:
            Response string for <stop_scan> command.

        Raises:
            OspdCommandError: if the scan_id attribute is missing or empty.
        """
        scan_id = xml.get('scan_id')

        # Covers both a missing attribute (None) and an empty one.
        if not scan_id:
            raise OspdCommandError('No scan_id attribute', 'stop_scan')

        self._daemon.stop_scan(scan_id)

        # The response is held back until the scan process has ended. A
        # scan without an entry in scan_processes is silently accepted.
        try:
            scan_processes = self._daemon.scan_processes
            scan_processes[scan_id].join()
        except KeyError:
            pass

        return simple_response_str('stop_scan', 200, 'OK')
382
383
384
class GetScans(BaseCommand):
    """ Handler of the <get_scans> command. """

    name = 'get_scans'
    description = 'List the scans in buffer.'
    attributes = {
        'scan_id': 'ID of a specific scan to get.',
        'details': 'Whether to return the full scan report.',
        'pop_results': 'Whether to remove the fetched results.',
        'max_results': 'Maximum number of results to fetch.',
    }
    must_be_initialized = False

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <get_scans> command.

        Without a scan_id attribute all scans of the collection are
        returned; with one, only that scan.

        Return:
            Response string for <get_scans> command. A 404 response is
            returned when the given scan_id is unknown.

        Raises:
            OspdCommandError: if details are requested and max_results is
                not an integer.
        """
        scan_id = xml.get('scan_id')
        # Details are on unless explicitly disabled with '0'.
        details = xml.get('details') != '0'
        pop_res = xml.get('pop_results')
        max_res = xml.get('max_results')

        # pop_results and max_results are only normalized when details are
        # requested; otherwise they are handed over unchanged.
        if details:
            pop_res = pop_res == '1'
            if max_res:
                try:
                    max_res = int(max_res)
                except ValueError as e:
                    # Report bad client input as a command error, like the
                    # integer checks of <get_performance>.
                    raise OspdCommandError(
                        'max_results argument must be integer.', 'get_scans'
                    ) from e

        if scan_id:
            if scan_id not in self._daemon.scan_collection.ids_iterator():
                text = "Failed to find scan '{0}'".format(scan_id)
                return simple_response_str('get_scans', 404, text)
            selected_ids = [scan_id]
        else:
            selected_ids = self._daemon.scan_collection.ids_iterator()

        responses = []
        for current_id in selected_ids:
            self._daemon.check_scan_process(current_id)
            scan = self._daemon.get_scan_xml(
                current_id, details, pop_res, max_res
            )
            responses.append(scan)

        return simple_response_str('get_scans', 200, 'OK', responses)
434
435
436
class StartScan(BaseCommand):
    """ Handler of the <start_scan> command. """

    name = 'start_scan'
    description = 'Start a new scan.'
    attributes = {
        'target': 'Target host to scan',
        'ports': 'Ports list to scan',
        'scan_id': 'Optional UUID value to use as scan ID',
        'parallel': 'Optional nummer of parallel target to scan',
    }
    must_be_initialized = True

    def get_elements(self):
        """ Return the element descriptions, extended by the descriptions
        of the daemon's scanner parameters.
        """
        elements = {}

        if self.elements:
            elements.update(self.elements)

        # Work on a copy so a class level 'scanner_params' dict is never
        # modified.
        scanner_params = elements.get('scanner_params', {}).copy()
        elements['scanner_params'] = scanner_params

        scanner_params.update(
            {
                k: v['description']
                for k, v in self._daemon.scanner_params.items()
            }
        )

        return elements

    def is_new_scan_allowed(self) -> bool:
        """ Check if max_scans has been reached.

        Return:
            True if a new scan can be launch. A max_scans of 0 means that
            there is no limit.
        """
        max_scans = self._daemon.max_scans
        return max_scans == 0 or len(self._daemon.scan_processes) < max_scans

    def is_enough_free_memory(self) -> bool:
        """ Check if there is enough free memory in the system to run
        a new scan. The necessary memory is a rough calculation and very
        conservative.

        Return:
            True if there is enough memory for a new scan.
        """
        proc_memory = psutil.Process().memory_info().rss

        # Use `available`, not `free`: psutil documents `free` as not
        # reflecting the memory that can actually be given to processes.
        available_mem = psutil.virtual_memory().available

        return available_mem > (4 * proc_memory)

    @staticmethod
    def _get_scan_target(xml: Element) -> Dict:
        """ Extract the scan target from the <start_scan> element.

        Return:
            The scan target dictionary.

        Raises:
            OspdCommandError: if neither the legacy attributes nor a
                <targets>/<target> element are given.
        """
        target_str = xml.get('target')
        ports_str = xml.get('ports')

        # For backward compatibility, if target and ports attributes are set,
        # <targets> element is ignored.
        if target_str is None or ports_str is None:
            target_element = xml.find('targets/target')
            if target_element is None:
                raise OspdCommandError('No targets or ports', 'start_scan')

            return OspRequest.process_target_element(target_element)

        logger.warning(
            "Legacy start scan command format is being used, which "
            "is deprecated since 20.04. Please read the documentation "
            "for start scan command."
        )

        return {
            'hosts': target_str,
            'ports': ports_str,
            'credentials': {},
            'exclude_hosts': '',
            'finished_hosts': '',
            'options': {},
        }

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <start_scan> command.

        Return:
            Response string for <start_scan> command.

        Raises:
            OspdCommandError: if there is not enough memory, the scan limit
                is reached, or the request is missing or carries invalid
                data.
        """

        if self._daemon.check_free_memory and not self.is_enough_free_memory():
            raise OspdCommandError(
                'Not possible to run a new scan. Not enough free memory.',
                'start_scan',
            )

        if not self.is_new_scan_allowed():
            raise OspdCommandError(
                'Not possible to run a new scan. Max scan limit reached.',
                'start_scan',
            )

        scan_target = self._get_scan_target(xml)

        # A missing or empty scan_id is accepted and passed on as is.
        scan_id = xml.get('scan_id')
        if scan_id and not valid_uuid(scan_id):
            raise OspdCommandError('Invalid scan_id UUID', 'start_scan')

        if xml.get('parallel'):
            logger.warning(
                "parallel attribute of start_scan will be ignored, sice "
                "parallel scan is not supported by OSPd."
            )

        scanner_params = xml.find('scanner_params')
        if scanner_params is None:
            raise OspdCommandError('No scanner_params element', 'start_scan')

        params = self._daemon.preprocess_scan_params(scanner_params)

        # VTS is an optional element. If present should not be empty.
        vt_selection = {}  # type: Dict
        scanner_vts = xml.find('vt_selection')
        if scanner_vts is not None:
            if len(scanner_vts) == 0:
                raise OspdCommandError('VTs list is empty', 'start_scan')

            vt_selection = OspRequest.process_vts_params(scanner_vts)

        # Dry run case.
        if 'dry_run' in params and int(params['dry_run']):
            scan_func = self._daemon.dry_run_scan
            scan_params = None
        else:
            scan_func = self._daemon.start_scan
            scan_params = self._daemon.process_scan_params(params)

        requested_scan_id = scan_id
        scan_id = self._daemon.create_scan(
            scan_id, scan_target, scan_params, vt_selection
        )

        id_ = Element('id')

        # A falsy result of create_scan() is answered with status 100 and
        # the scan id from the request; no process is started then.
        if not scan_id:
            id_.text = requested_scan_id
            return simple_response_str('start_scan', 100, 'Continue', id_)

        scan_process = create_process(
            func=scan_func, args=(scan_id, scan_target)
        )

        # Register the process before starting it.
        self._daemon.scan_processes[scan_id] = scan_process

        scan_process.start()

        id_.text = scan_id

        return simple_response_str('start_scan', 200, 'OK', id_)
597
598
599
class GetMemoryUsage(BaseCommand):
    """ Handler of the <get_memory_usage> command.

    Reports the memory consumption of the current process and of its
    active child processes.
    """

    name = "get_memory_usage"
    description = "print the memory consumption of all processes"
    attributes = {
        'unit': 'Unit for displaying memory consumption (b = bytes, '
        'kb = kilobytes, mb = megabytes). Defaults to b.'
    }
    must_be_initialized = False

    @staticmethod
    def _get_memory(value: int, unit: Optional[str] = None) -> str:
        """ Convert a number of bytes to the requested unit.

        Arguments:
            value: Amount of memory in bytes.
            unit: 'kb' or 'mb' (case insensitive). Any other value,
                including None, leaves the value in bytes.

        Return:
            The converted amount as string.
        """
        if not unit:
            return str(value)

        unit = unit.lower()

        if unit == 'kb':
            return str(Decimal(value) / 1024)

        if unit == 'mb':
            return str(Decimal(value) / (1024 * 1024))

        return str(value)

    @staticmethod
    def _create_process_element(name: str, pid: int):
        """ Create a <process> element carrying name and pid attributes. """
        process_element = Element('process')
        process_element.set('name', name)
        process_element.set('pid', str(pid))

        return process_element

    @classmethod
    def _add_memory_info(
        cls, process_element: Element, pid: int, unit: Optional[str] = None
    ):
        """ Append <rss>, <vms> and <shared> children to process_element.

        Nothing is added if the process does not exist (anymore).
        """
        try:
            # The process may end between the lookup and the query, so
            # both have to be guarded.
            memory = psutil.Process(pid).memory_info()
        except psutil.NoSuchProcess:
            return

        for field in ('rss', 'vms', 'shared'):
            # `shared` is not reported on every platform; skip fields
            # that are absent instead of failing.
            value = getattr(memory, field, None)
            if value is None:
                continue

            element = SubElement(process_element, field)
            element.text = cls._get_memory(value, unit)

    def handle_xml(self, xml: Element) -> bytes:
        """ Handles <get_memory_usage> command.

        Return:
            Response string listing the current process first, followed by
            its active children.
        """
        processes_element = Element('processes')
        unit = xml.get('unit')

        processes = [multiprocessing.current_process()]
        processes.extend(multiprocessing.active_children())

        for proc in processes:
            process_element = self._create_process_element(proc.name, proc.pid)
            self._add_memory_info(process_element, proc.pid, unit)
            processes_element.append(process_element)

        # NOTE(review): the response is named 'get_memory' although the
        # command is 'get_memory_usage'; kept as is because clients may
        # rely on the existing response name.
        return simple_response_str('get_memory', 200, 'OK', processes_element)
679