ospd.command.command.StartScan.handle_xml()   F
last analyzed

Complexity

Conditions 14

Size

Total Lines 95
Code Lines 59

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 14
eloc 59
nop 2
dl 0
loc 95
rs 3.6
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like ospd.command.command.StartScan.handle_xml() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright (C) 2014-2021 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
import multiprocessing
19
import re
20
import logging
21
import subprocess
22
23
from decimal import Decimal
24
from typing import Optional, Dict, Any, Union, Iterator
25
26
from xml.etree.ElementTree import Element, SubElement
27
28
import psutil
29
30
from ospd.errors import OspdCommandError
31
from ospd.misc import valid_uuid
32
from ospd.protocol import OspRequest, OspResponse
33
from ospd.xml import (
34
    simple_response_str,
35
    get_elements_from_dict,
36
    XmlStringHelper,
37
)
38
39
from .initsubclass import InitSubclassMeta
40
from .registry import register_command
41
42
logger = logging.getLogger(__name__)
43
44
45
class BaseCommand(metaclass=InitSubclassMeta):
46
47
    name = None
48
    description = None
49
    attributes = None
50
    elements = None
51
    must_be_initialized = None
52
53
    def __init_subclass__(cls, **kwargs):
54
        super_cls = super()
55
56
        if hasattr(super_cls, '__init_subclass__'):
57
            super_cls.__init_subclass__(**kwargs)
58
59
        register_command(cls)
60
61
    def __init__(self, daemon):
62
        self._daemon = daemon
63
64
    def get_name(self) -> str:
65
        return self.name
66
67
    def get_description(self) -> str:
68
        return self.description
69
70
    def get_attributes(self) -> Optional[Dict[str, Any]]:
71
        return self.attributes
72
73
    def get_elements(self) -> Optional[Dict[str, Any]]:
74
        return self.elements
75
76
    def handle_xml(self, xml: Element) -> Union[bytes, Iterator[bytes]]:
77
        raise NotImplementedError()
78
79
    def as_dict(self):
80
        return {
81
            'name': self.get_name(),
82
            'attributes': self.get_attributes(),
83
            'description': self.get_description(),
84
            'elements': self.get_elements(),
85
        }
86
87
    def __repr__(self):
88
        return '<{} description="{}" attributes={} elements={}>'.format(
89
            self.name, self.description, self.attributes, self.elements
90
        )
91
92
93
class HelpCommand(BaseCommand):
94
    name = "help"
95
    description = 'Print the commands help.'
96
    attributes = {'format': 'Help format. Could be text or xml.'}
97
    must_be_initialized = False
98
99
    def handle_xml(self, xml: Element) -> bytes:
100
        help_format = xml.get('format')
101
102
        if help_format is None or help_format == "text":
103
            # Default help format is text.
104
            return simple_response_str(
105
                'help', 200, 'OK', self._daemon.get_help_text()
106
            )
107
        elif help_format == "xml":
108
            text = get_elements_from_dict(
109
                {k: v.as_dict() for k, v in self._daemon.commands.items()}
110
            )
111
            return simple_response_str('help', 200, 'OK', text)
112
113
        raise OspdCommandError('Bogus help format', 'help')
114
115
116
class GetVersion(BaseCommand):
117
    name = "get_version"
118
    description = 'Return various version information'
119
    must_be_initialized = False
120
121
    def handle_xml(self, xml: Element) -> bytes:
122
        """Handles <get_version> command.
123
124
        Return:
125
            Response string for <get_version> command.
126
        """
127
        protocol = Element('protocol')
128
129
        for name, value in [
130
            ('name', 'OSP'),
131
            ('version', self._daemon.get_protocol_version()),
132
        ]:
133
            elem = SubElement(protocol, name)
134
            elem.text = value
135
136
        daemon = Element('daemon')
137
        for name, value in [
138
            ('name', self._daemon.get_daemon_name()),
139
            ('version', self._daemon.get_daemon_version()),
140
        ]:
141
            elem = SubElement(daemon, name)
142
            elem.text = value
143
144
        scanner = Element('scanner')
145
        for name, value in [
146
            ('name', self._daemon.get_scanner_name()),
147
            ('version', self._daemon.get_scanner_version()),
148
        ]:
149
            elem = SubElement(scanner, name)
150
            elem.text = value
151
152
        content = [protocol, daemon, scanner]
153
154
        vts_version = self._daemon.get_vts_version()
155
        if vts_version:
156
            vts = Element('vts')
157
            elem = SubElement(vts, 'version')
158
            elem.text = vts_version
159
            content.append(vts)
160
161
        return simple_response_str('get_version', 200, 'OK', content)
162
163
164
GVMCG_TITLES = [
165
    'cpu-.*',
166
    'proc',
167
    'mem',
168
    'swap',
169
    'load',
170
    'df-.*',
171
    'disk-sd[a-z][0-9]-rw',
172
    'disk-sd[a-z][0-9]-load',
173
    'disk-sd[a-z][0-9]-io-load',
174
    'interface-eth.*-traffic',
175
    'interface-eth.*-err-rate',
176
    'interface-eth.*-err',
177
    'sensors-.*_temperature-.*',
178
    'sensors-.*_fanspeed-.*',
179
    'sensors-.*_voltage-.*',
180
    'titles',
181
]  # type: List[str]
182
183
184
class GetPerformance(BaseCommand):
185
    name = "get_performance"
186
    description = 'Return system report'
187
    attributes = {
188
        'start': 'Time of first data point in report.',
189
        'end': 'Time of last data point in report.',
190
        'title': 'Name of report.',
191
    }
192
    must_be_initialized = False
193
194
    def handle_xml(self, xml: Element) -> bytes:
195
        """Handles <get_performance> command.
196
197
        @return: Response string for <get_performance> command.
198
        """
199
        start = xml.attrib.get('start')
200
        end = xml.attrib.get('end')
201
        titles = xml.attrib.get('titles')
202
203
        cmd = ['gvmcg']
204
        if start:
205
            try:
206
                int(start)
207
            except ValueError:
208
                raise OspdCommandError(
209
                    'Start argument must be integer.', 'get_performance'
210
                ) from None
211
212
            cmd.append(start)
213
214
        if end:
215
            try:
216
                int(end)
217
            except ValueError:
218
                raise OspdCommandError(
219
                    'End argument must be integer.', 'get_performance'
220
                ) from None
221
222
            cmd.append(end)
223
224
        if titles:
225
            combined = "(" + ")|(".join(GVMCG_TITLES) + ")"
226
            forbidden = "^[^|&;]+$"
227
            if re.match(combined, titles) and re.match(forbidden, titles):
228
                cmd.append(titles)
229
            else:
230
                raise OspdCommandError(
231
                    'Arguments not allowed', 'get_performance'
232
                )
233
234
        try:
235
            output = subprocess.check_output(cmd)
236
        except (subprocess.CalledProcessError, OSError) as e:
237
            raise OspdCommandError(
238
                'Bogus get_performance format. %s' % e, 'get_performance'
239
            ) from None
240
241
        return simple_response_str(
242
            'get_performance', 200, 'OK', output.decode()
243
        )
244
245
246
class GetScannerDetails(BaseCommand):
247
    name = 'get_scanner_details'
248
    description = 'Return scanner description and parameters'
249
    must_be_initialized = True
250
251
    def handle_xml(self, xml: Element) -> bytes:
252
        """Handles <get_scanner_details> command.
253
254
        @return: Response string for <get_scanner_details> command.
255
        """
256
        list_all = xml.get('list_all')
257
        list_all = True if list_all == '1' else False
258
259
        desc_xml = Element('description')
260
        desc_xml.text = self._daemon.get_scanner_description()
261
        scanner_params = self._daemon.get_scanner_params()
262
263
        if not list_all:
264
            scanner_params = {
265
                key: value
266
                for (key, value) in scanner_params.items()
267
                if value.get('visible_for_client')
268
            }
269
270
        details = [
271
            desc_xml,
272
            OspResponse.create_scanner_params_xml(scanner_params),
273
        ]
274
        return simple_response_str('get_scanner_details', 200, 'OK', details)
275
276
277
class DeleteScan(BaseCommand):
278
    name = 'delete_scan'
279
    description = 'Delete a finished scan.'
280
    attributes = {'scan_id': 'ID of scan to delete.'}
281
    must_be_initialized = False
282
283
    def handle_xml(self, xml: Element) -> bytes:
284
        """Handles <delete_scan> command.
285
286
        @return: Response string for <delete_scan> command.
287
        """
288
        scan_id = xml.get('scan_id')
289
        if scan_id is None:
290
            return simple_response_str(
291
                'delete_scan', 404, 'No scan_id attribute'
292
            )
293
294
        if not self._daemon.scan_exists(scan_id):
295
            text = "Failed to find scan '{0}'".format(scan_id)
296
            return simple_response_str('delete_scan', 404, text)
297
298
        self._daemon.check_scan_process(scan_id)
299
300
        if self._daemon.delete_scan(scan_id):
301
            return simple_response_str('delete_scan', 200, 'OK')
302
303
        raise OspdCommandError('Scan in progress', 'delete_scan')
304
305
306
class GetVts(BaseCommand):
307
    name = 'get_vts'
308
    description = 'List of available vulnerability tests.'
309
    attributes = {
310
        'vt_id': 'ID of a specific vulnerability test to get.',
311
        'filter': 'Optional filter to get an specific vt collection.',
312
    }
313
    must_be_initialized = True
314
315
    def handle_xml(self, xml: Element) -> Iterator[bytes]:
316
        """Handles <get_vts> command.
317
        Writes the vt collection on the stream.
318
        The <get_vts> element accept two optional arguments.
319
        vt_id argument receives a single vt id.
320
        filter argument receives a filter selecting a sub set of vts.
321
        If both arguments are given, the vts which match with the filter
322
        are return.
323
324
        @return: Response string for <get_vts> command on fail.
325
        """
326
        self._daemon.vts.is_cache_available = False
327
328
        xml_helper = XmlStringHelper()
329
330
        vt_id = xml.get('vt_id')
331
        vt_filter = xml.get('filter')
332
        _details = xml.get('details')
333
        version_only = xml.get('version_only')
334
335
        vt_details = False if _details == '0' else True
336
337
        if self._daemon.vts and vt_id and vt_id not in self._daemon.vts:
338
            self._daemon.vts.is_cache_available = True
339
            text = "Failed to find vulnerability test '{0}'".format(vt_id)
340
            raise OspdCommandError(text, 'get_vts', 404)
341
342
        filtered_vts = None
343
        if vt_filter and not version_only:
344
            try:
345
                filtered_vts = self._daemon.vts_filter.get_filtered_vts_list(
346
                    self._daemon.vts, vt_filter
347
                )
348
            except OspdCommandError as filter_error:
349
                self._daemon.vts.is_cache_available = True
350
                raise filter_error
351
352
        if not version_only:
353
            vts_selection = self._daemon.get_vts_selection_list(
354
                vt_id, filtered_vts
355
            )
356
        # List of xml pieces with the generator to be iterated
357
        yield xml_helper.create_response('get_vts')
358
359
        begin_vts_tag = xml_helper.create_element('vts')
360
        begin_vts_tag = xml_helper.add_attr(
361
            begin_vts_tag, "vts_version", self._daemon.get_vts_version()
362
        )
363
        val = len(self._daemon.vts)
364
        begin_vts_tag = xml_helper.add_attr(begin_vts_tag, "total", val)
365
        if filtered_vts and not version_only:
366
            val = len(filtered_vts)
367
            begin_vts_tag = xml_helper.add_attr(begin_vts_tag, "sent", val)
368
369
        if self._daemon.vts.sha256_hash is not None:
370
            begin_vts_tag = xml_helper.add_attr(
371
                begin_vts_tag, "sha256_hash", self._daemon.vts.sha256_hash
372
            )
373
374
        yield begin_vts_tag
375
        if not version_only:
376
            for vt in self._daemon.get_vt_iterator(vts_selection, vt_details):
0 ignored issues
show
introduced by
The variable vts_selection does not seem to be defined in case BooleanNotNode on line 352 is False. Are you sure this can never be the case?
Loading history...
377
                yield xml_helper.add_element(self._daemon.get_vt_xml(vt))
378
379
        yield xml_helper.create_element('vts', end=True)
380
        yield xml_helper.create_response('get_vts', end=True)
381
382
        self._daemon.vts.is_cache_available = True
383
384
385
class StopScan(BaseCommand):
386
    name = 'stop_scan'
387
    description = 'Stop a currently running scan.'
388
    attributes = {'scan_id': 'ID of scan to stop.'}
389
    must_be_initialized = True
390
391
    def handle_xml(self, xml: Element) -> bytes:
392
        """Handles <stop_scan> command.
393
394
        @return: Response string for <stop_scan> command.
395
        """
396
397
        scan_id = xml.get('scan_id')
398
        if scan_id is None or scan_id == '':
399
            raise OspdCommandError('No scan_id attribute', 'stop_scan')
400
401
        self._daemon.stop_scan(scan_id)
402
403
        # Don't send response until the scan is stopped.
404
        try:
405
            self._daemon.scan_processes[scan_id].join()
406
        except KeyError:
407
            pass
408
409
        return simple_response_str('stop_scan', 200, 'OK')
410
411
412
class GetScans(BaseCommand):
413
    name = 'get_scans'
414
    description = 'Get information about a scan in buffer.'
415
    attributes = {
416
        'scan_id': 'Mandatory ID of a specific scan to get.',
417
        'details': 'Whether to return the full scan report.',
418
        'pop_results': 'Whether to remove the fetched results.',
419
        'max_results': 'Maximum number of results to fetch.',
420
        'progress': 'Whether to return a detailed scan progress',
421
    }
422
    must_be_initialized = False
423
424
    def handle_xml(self, xml: Element) -> bytes:
425
        """Handles <get_scans> command.
426
427
        @return: Response string for <get_scans> command.
428
        """
429
430
        scan_id = xml.get('scan_id')
431
        if scan_id is None or scan_id == '':
432
            raise OspdCommandError('No scan_id attribute', 'get_scans')
433
434
        details = xml.get('details')
435
        pop_res = xml.get('pop_results')
436
        max_res = xml.get('max_results')
437
        progress = xml.get('progress')
438
439
        if details and details == '0':
440
            details = False
441
        else:
442
            details = True
443
            pop_res = pop_res and pop_res == '1'
444
445
            if max_res:
446
                max_res = int(max_res)
447
448
        progress = progress and progress == '1'
449
450
        responses = []
451
        if scan_id in self._daemon.scan_collection.ids_iterator():
452
            self._daemon.check_scan_process(scan_id)
453
            scan = self._daemon.get_scan_xml(
454
                scan_id, details, pop_res, max_res, progress
455
            )
456
            responses.append(scan)
457
        else:
458
            text = "Failed to find scan '{0}'".format(scan_id)
459
            return simple_response_str('get_scans', 404, text)
460
461
        return simple_response_str('get_scans', 200, 'OK', responses)
462
463
464
class StartScan(BaseCommand):
465
    name = 'start_scan'
466
    description = 'Start a new scan.'
467
    attributes = {
468
        'target': 'Target host to scan',
469
        'ports': 'Ports list to scan',
470
        'scan_id': 'Optional UUID value to use as scan ID',
471
        'parallel': 'Optional nummer of parallel target to scan',
472
    }
473
    must_be_initialized = False
474
475
    def get_elements(self):
476
        elements = {}
477
478
        if self.elements:
479
            elements.update(self.elements)
480
481
        scanner_params = elements.get('scanner_params', {}).copy()
482
        elements['scanner_params'] = scanner_params
483
484
        scanner_params.update(
485
            {
486
                k: v['description']
487
                for k, v in self._daemon.scanner_params.items()
488
            }
489
        )
490
491
        return elements
492
493
    def handle_xml(self, xml: Element) -> bytes:
494
        """Handles <start_scan> command.
495
496
        Return:
497
            Response string for <start_scan> command.
498
        """
499
500
        current_queued_scans = self._daemon.get_count_queued_scans()
501
        if (
502
            self._daemon.max_queued_scans
503
            and current_queued_scans >= self._daemon.max_queued_scans
504
        ):
505
            logger.info(
506
                'Maximum number of queued scans set to %d reached.',
507
                self._daemon.max_queued_scans,
508
            )
509
            raise OspdCommandError(
510
                'Maximum number of queued scans set to %d reached.'
511
                % self._daemon.max_queued_scans,
512
                'start_scan',
513
            )
514
515
        target_str = xml.get('target')
516
        ports_str = xml.get('ports')
517
518
        # For backward compatibility, if target and ports attributes are set,
519
        # <targets> element is ignored.
520
        if target_str is None or ports_str is None:
521
            target_element = xml.find('targets/target')
522
            if target_element is None:
523
                raise OspdCommandError('No targets or ports', 'start_scan')
524
            else:
525
                scan_target = OspRequest.process_target_element(target_element)
526
        else:
527
            scan_target = {
528
                'hosts': target_str,
529
                'ports': ports_str,
530
                'credentials': {},
531
                'exclude_hosts': '',
532
                'finished_hosts': '',
533
                'options': {},
534
            }
535
            logger.warning(
536
                "Legacy start scan command format is being used, which "
537
                "is deprecated since 20.08. Please read the documentation "
538
                "for start scan command."
539
            )
540
541
        scan_id = xml.get('scan_id')
542
        if scan_id is not None and scan_id != '' and not valid_uuid(scan_id):
543
            raise OspdCommandError('Invalid scan_id UUID', 'start_scan')
544
545
        if xml.get('parallel'):
546
            logger.warning(
547
                "parallel attribute of start_scan will be ignored, sice "
548
                "parallel scan is not supported by OSPd."
549
            )
550
551
        scanner_params = xml.find('scanner_params')
552
        if scanner_params is None:
553
            raise OspdCommandError('No scanner_params element', 'start_scan')
554
555
        # params are the parameters we got from the <scanner_params> XML.
556
        params = self._daemon.preprocess_scan_params(scanner_params)
557
558
        # VTS is an optional element. If present should not be empty.
559
        vt_selection = {}  # type: Dict
560
        scanner_vts = xml.find('vt_selection')
561
        if scanner_vts is not None:
562
            if len(scanner_vts) == 0:
563
                raise OspdCommandError('VTs list is empty', 'start_scan')
564
            else:
565
                vt_selection = OspRequest.process_vts_params(scanner_vts)
566
567
        scan_params = self._daemon.process_scan_params(params)
568
        scan_id_aux = scan_id
569
        scan_id = self._daemon.create_scan(
570
            scan_id, scan_target, scan_params, vt_selection
571
        )
572
573
        if not scan_id:
574
            id_ = Element('id')
575
            id_.text = scan_id_aux
576
            return simple_response_str('start_scan', 100, 'Continue', id_)
577
578
        logger.info(
579
            'Scan %s added to the queue in position %d.',
580
            scan_id,
581
            current_queued_scans + 1,
582
        )
583
584
        id_ = Element('id')
585
        id_.text = scan_id
586
587
        return simple_response_str('start_scan', 200, 'OK', id_)
588
589
590
class GetMemoryUsage(BaseCommand):
591
592
    name = "get_memory_usage"
593
    description = "print the memory consumption of all processes"
594
    attributes = {
595
        'unit': 'Unit for displaying memory consumption (b = bytes, '
596
        'kb = kilobytes, mb = megabytes). Defaults to b.'
597
    }
598
    must_be_initialized = False
599
600
    @staticmethod
601
    def _get_memory(value: int, unit: str = None) -> str:
602
        if not unit:
603
            return str(value)
604
605
        unit = unit.lower()
606
607
        if unit == 'kb':
608
            return str(Decimal(value) / 1024)
609
610
        if unit == 'mb':
611
            return str(Decimal(value) / (1024 * 1024))
612
613
        return str(value)
614
615
    @staticmethod
616
    def _create_process_element(name: str, pid: int):
617
        process_element = Element('process')
618
        process_element.set('name', name)
619
        process_element.set('pid', str(pid))
620
621
        return process_element
622
623
    @classmethod
624
    def _add_memory_info(
625
        cls, process_element: Element, pid: int, unit: str = None
626
    ):
627
        try:
628
            ps_process = psutil.Process(pid)
629
        except psutil.NoSuchProcess:
630
            return
631
632
        memory = ps_process.memory_info()
633
634
        rss_element = Element('rss')
635
        rss_element.text = cls._get_memory(memory.rss, unit)
636
637
        process_element.append(rss_element)
638
639
        vms_element = Element('vms')
640
        vms_element.text = cls._get_memory(memory.vms, unit)
641
642
        process_element.append(vms_element)
643
644
        shared_element = Element('shared')
645
        shared_element.text = cls._get_memory(memory.shared, unit)
646
647
        process_element.append(shared_element)
648
649
    def handle_xml(self, xml: Element) -> bytes:
650
        processes_element = Element('processes')
651
        unit = xml.get('unit')
652
653
        current_process = multiprocessing.current_process()
654
        process_element = self._create_process_element(
655
            current_process.name, current_process.pid
656
        )
657
658
        self._add_memory_info(process_element, current_process.pid, unit)
659
660
        processes_element.append(process_element)
661
662
        for proc in multiprocessing.active_children():
663
            process_element = self._create_process_element(proc.name, proc.pid)
664
665
            self._add_memory_info(process_element, proc.pid, unit)
666
667
            processes_element.append(process_element)
668
669
        return simple_response_str('get_memory', 200, 'OK', processes_element)
670