Completed
Push — master ( d58222...cd4f3f )
by Juan José
16s queued 12s
created

ospd.command.command.GetVts.handle_xml()   B

Complexity

Conditions 6

Size

Total Lines 43
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 23
nop 2
dl 0
loc 43
rs 8.3946
c 0
b 0
f 0
1
# Copyright (C) 2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
import re
20
import subprocess
21
22
from typing import Optional, Dict, Any, Union, Iterator
23
24
from xml.etree.ElementTree import Element, SubElement
25
26
from ospd.errors import OspdCommandError
27
from ospd.misc import valid_uuid, create_process
28
from ospd.network import target_str_to_list
29
from ospd.protocol import OspRequest, OspResponse
30
from ospd.xml import (
31
    simple_response_str,
32
    get_elements_from_dict,
33
    XmlStringHelper,
34
)
35
36
from .initsubclass import InitSubclassMeta
37
from .registry import register_command
38
39
40
class BaseCommand(metaclass=InitSubclassMeta):
41
42
    name = None
43
    description = None
44
    attributes = None
45
    elements = None
46
47
    def __init_subclass__(cls, **kwargs):
48
        super_cls = super()
49
50
        if hasattr(super_cls, '__init_subclass__'):
51
            super_cls.__init_subclass__(**kwargs)
52
53
        register_command(cls)
54
55
    def __init__(self, daemon):
56
        self._daemon = daemon
57
58
    def get_name(self) -> str:
59
        return self.name
60
61
    def get_description(self) -> str:
62
        return self.description
63
64
    def get_attributes(self) -> Optional[Dict[str, Any]]:
65
        return self.attributes
66
67
    def get_elements(self) -> Optional[Dict[str, Any]]:
68
        return self.elements
69
70
    def handle_xml(self, xml: Element) -> Union[bytes, Iterator[bytes]]:
71
        raise NotImplementedError()
72
73
    def as_dict(self):
74
        return {
75
            'name': self.get_name(),
76
            'attributes': self.get_attributes(),
77
            'description': self.get_description(),
78
            'elements': self.get_elements(),
79
        }
80
81
    def __repr__(self):
82
        return '<{} description="{}" attributes={} elements={}>'.format(
83
            self.name, self.description, self.attributes, self.elements
84
        )
85
86
87
class HelpCommand(BaseCommand):
88
    name = "help"
89
    description = 'Print the commands help.'
90
    attributes = {'format': 'Help format. Could be text or xml.'}
91
92
    def handle_xml(self, xml: Element) -> bytes:
93
        help_format = xml.get('format')
94
95
        if help_format is None or help_format == "text":
96
            # Default help format is text.
97
            return simple_response_str(
98
                'help', 200, 'OK', self._daemon.get_help_text()
99
            )
100
        elif help_format == "xml":
101
            text = get_elements_from_dict(
102
                {k: v.as_dict() for k, v in self._daemon.commands.items()}
103
            )
104
            return simple_response_str('help', 200, 'OK', text)
105
106
        raise OspdCommandError('Bogus help format', 'help')
107
108
109
class GetVersion(BaseCommand):
110
    name = "get_version"
111
    description = 'Return various version information'
112
113
    def handle_xml(self, xml: Element) -> bytes:
114
        """ Handles <get_version> command.
115
116
        Return:
117
            Response string for <get_version> command.
118
        """
119
        protocol = Element('protocol')
120
121
        for name, value in [
122
            ('name', 'OSP'),
123
            ('version', self._daemon.get_protocol_version()),
124
        ]:
125
            elem = SubElement(protocol, name)
126
            elem.text = value
127
128
        daemon = Element('daemon')
129
        for name, value in [
130
            ('name', self._daemon.get_daemon_name()),
131
            ('version', self._daemon.get_daemon_version()),
132
        ]:
133
            elem = SubElement(daemon, name)
134
            elem.text = value
135
136
        scanner = Element('scanner')
137
        for name, value in [
138
            ('name', self._daemon.get_scanner_name()),
139
            ('version', self._daemon.get_scanner_version()),
140
        ]:
141
            elem = SubElement(scanner, name)
142
            elem.text = value
143
144
        content = [protocol, daemon, scanner]
145
146
        vts_version = self._daemon.get_vts_version()
147
        if vts_version:
148
            vts = Element('vts')
149
            elem = SubElement(vts, 'version')
150
            elem.text = vts_version
151
            content.append(vts)
152
153
        return simple_response_str('get_version', 200, 'OK', content)
154
155
156
GVMCG_TITLES = [
157
    'cpu-*',
158
    'proc',
159
    'mem',
160
    'swap',
161
    'load',
162
    'df-*',
163
    'disk-sd[a-z][0-9]-rw',
164
    'disk-sd[a-z][0-9]-load',
165
    'disk-sd[a-z][0-9]-io-load',
166
    'interface-eth*-traffic',
167
    'interface-eth*-err-rate',
168
    'interface-eth*-err',
169
    'sensors-*_temperature-*',
170
    'sensors-*_fanspeed-*',
171
    'sensors-*_voltage-*',
172
    'titles',
173
]  # type: List[str]
174
175
176
class GetPerformance(BaseCommand):
177
    name = "get_performance"
178
    description = 'Return system report'
179
    attributes = {
180
        'start': 'Time of first data point in report.',
181
        'end': 'Time of last data point in report.',
182
        'title': 'Name of report.',
183
    }
184
185
    def handle_xml(self, xml: Element) -> bytes:
186
        """ Handles <get_performance> command.
187
188
        @return: Response string for <get_performance> command.
189
        """
190
        start = xml.attrib.get('start')
191
        end = xml.attrib.get('end')
192
        titles = xml.attrib.get('titles')
193
194
        cmd = ['gvmcg']
195
        if start:
196
            try:
197
                int(start)
198
            except ValueError:
199
                raise OspdCommandError(
200
                    'Start argument must be integer.', 'get_performance'
201
                )
202
203
            cmd.append(start)
204
205
        if end:
206
            try:
207
                int(end)
208
            except ValueError:
209
                raise OspdCommandError(
210
                    'End argument must be integer.', 'get_performance'
211
                )
212
213
            cmd.append(end)
214
215
        if titles:
216
            combined = "(" + ")|(".join(GVMCG_TITLES) + ")"
217
            forbidden = "^[^|&;]+$"
218
            if re.match(combined, titles) and re.match(forbidden, titles):
219
                cmd.append(titles)
220
            else:
221
                raise OspdCommandError(
222
                    'Arguments not allowed', 'get_performance'
223
                )
224
225
        try:
226
            output = subprocess.check_output(cmd)
227
        except (subprocess.CalledProcessError, OSError) as e:
228
            raise OspdCommandError(
229
                'Bogus get_performance format. %s' % e, 'get_performance'
230
            )
231
232
        return simple_response_str(
233
            'get_performance', 200, 'OK', output.decode()
234
        )
235
236
237
class GetScannerDetails(BaseCommand):
238
    name = 'get_scanner_details'
239
    description = 'Return scanner description and parameters'
240
241
    def handle_xml(self, xml: Element) -> bytes:
242
        """ Handles <get_scanner_details> command.
243
244
        @return: Response string for <get_scanner_details> command.
245
        """
246
        desc_xml = Element('description')
247
        desc_xml.text = self._daemon.get_scanner_description()
248
        scanner_params = self._daemon.get_scanner_params()
249
        details = [
250
            desc_xml,
251
            OspResponse.create_scanner_params_xml(scanner_params),
252
        ]
253
        return simple_response_str('get_scanner_details', 200, 'OK', details)
254
255
256
class DeleteScan(BaseCommand):
257
    name = 'delete_scan'
258
    description = 'Delete a finished scan.'
259
    attributes = {'scan_id': 'ID of scan to delete.'}
260
261
    def handle_xml(self, xml: Element) -> bytes:
262
        """ Handles <delete_scan> command.
263
264
        @return: Response string for <delete_scan> command.
265
        """
266
        scan_id = xml.get('scan_id')
267
        if scan_id is None:
268
            return simple_response_str(
269
                'delete_scan', 404, 'No scan_id attribute'
270
            )
271
272
        if not self._daemon.scan_exists(scan_id):
273
            text = "Failed to find scan '{0}'".format(scan_id)
274
            return simple_response_str('delete_scan', 404, text)
275
276
        self._daemon.check_scan_process(scan_id)
277
278
        if self._daemon.delete_scan(scan_id):
279
            return simple_response_str('delete_scan', 200, 'OK')
280
281
        raise OspdCommandError('Scan in progress', 'delete_scan')
282
283
284
class GetVts(BaseCommand):
285
    name = 'get_vts'
286
    description = 'List of available vulnerability tests.'
287
    attributes = {
288
        'vt_id': 'ID of a specific vulnerability test to get.',
289
        'filter': 'Optional filter to get an specific vt collection.',
290
    }
291
292
    def handle_xml(self, xml: Element) -> Iterator[bytes]:
293
        """ Handles <get_vts> command.
294
        Writes the vt collection on the stream.
295
        The <get_vts> element accept two optional arguments.
296
        vt_id argument receives a single vt id.
297
        filter argument receives a filter selecting a sub set of vts.
298
        If both arguments are given, the vts which match with the filter
299
        are return.
300
301
        @return: Response string for <get_vts> command on fail.
302
        """
303
        xml_helper = XmlStringHelper()
304
305
        vt_id = xml.get('vt_id')
306
        vt_filter = xml.get('filter')
307
308
        if vt_id and vt_id not in self._daemon.vts:
309
            text = "Failed to find vulnerability test '{0}'".format(vt_id)
310
            raise OspdCommandError(text, 'get_vts', 404)
311
312
        filtered_vts = None
313
        if vt_filter:
314
            filtered_vts = self._daemon.vts_filter.get_filtered_vts_list(
315
                self._daemon.vts, vt_filter
316
            )
317
318
        # List of xml pieces with the generator to be iterated
319
        yield xml_helper.create_response('get_vts')
320
321
        begin_vts_tag = xml_helper.create_element('vts')
322
        val = len(self._daemon.vts)
323
        begin_vts_tag = xml_helper.add_attr(begin_vts_tag, "total", val)
324
        if filtered_vts:
325
            val = len(filtered_vts)
326
            begin_vts_tag = xml_helper.add_attr(begin_vts_tag, "sent", val)
327
328
        yield begin_vts_tag
329
330
        for vt in self._daemon.get_vts_selection_list(vt_id, filtered_vts):
331
            yield xml_helper.add_element(self._daemon.get_vt_xml(vt))
332
333
        yield xml_helper.create_element('vts', end=True)
334
        yield xml_helper.create_response('get_vts', end=True)
335
336
337
class StopScan(BaseCommand):
338
    name = 'stop_scan'
339
    description = 'Stop a currently running scan.'
340
    attributes = {'scan_id': 'ID of scan to stop.'}
341
342
    def handle_xml(self, xml: Element) -> bytes:
343
        """ Handles <stop_scan> command.
344
345
        @return: Response string for <stop_scan> command.
346
        """
347
348
        scan_id = xml.get('scan_id')
349
        if scan_id is None or scan_id == '':
350
            raise OspdCommandError('No scan_id attribute', 'stop_scan')
351
352
        self._daemon.stop_scan(scan_id)
353
354
        # Don't send response until the scan is stopped.
355
        try:
356
            self._daemon.scan_processes[scan_id].join()
357
            exitcode = self._daemon.scan_processes[scan_id].exitcode
358
        except KeyError:
359
            pass
360
361
        return simple_response_str('stop_scan', 200, 'OK')
362
363
364
class GetScans(BaseCommand):
365
    name = 'get_scans'
366
    description = 'List the scans in buffer.'
367
    attributes = {
368
        'scan_id': 'ID of a specific scan to get.',
369
        'details': 'Whether to return the full scan report.',
370
        'pop_results': 'Whether to remove the fetched results.',
371
        'max_results': 'Maximum number of results to fetch.',
372
    }
373
374
    def handle_xml(self, xml: Element) -> bytes:
375
        """ Handles <get_scans> command.
376
377
        @return: Response string for <get_scans> command.
378
        """
379
380
        scan_id = xml.get('scan_id')
381
        details = xml.get('details')
382
        pop_res = xml.get('pop_results')
383
        max_res = xml.get('max_results')
384
385
        if details and details == '0':
386
            details = False
387
        else:
388
            details = True
389
            if pop_res and pop_res == '1':
390
                pop_res = True
391
            else:
392
                pop_res = False
393
            if max_res:
394
                max_res = int(max_res)
395
396
        responses = []
397
        if scan_id and scan_id in self._daemon.scan_collection.ids_iterator():
398
            self._daemon.check_scan_process(scan_id)
399
            scan = self._daemon.get_scan_xml(scan_id, details, pop_res, max_res)
400
            responses.append(scan)
401
        elif scan_id:
402
            text = "Failed to find scan '{0}'".format(scan_id)
403
            return simple_response_str('get_scans', 404, text)
404
        else:
405
            for scan_id in self._daemon.scan_collection.ids_iterator():
406
                self._daemon.check_scan_process(scan_id)
407
                scan = self._daemon.get_scan_xml(
408
                    scan_id, details, pop_res, max_res
409
                )
410
                responses.append(scan)
411
412
        return simple_response_str('get_scans', 200, 'OK', responses)
413
414
415
class StartScan(BaseCommand):
416
    name = 'start_scan'
417
    description = 'Start a new scan.'
418
    attributes = {
419
        'target': 'Target host to scan',
420
        'ports': 'Ports list to scan',
421
        'scan_id': 'Optional UUID value to use as scan ID',
422
        'parallel': 'Optional nummer of parallel target to scan',
423
    }
424
425
    def get_elements(self):
426
        elements = {}
427
428
        if self.elements:
429
            elements.update(self.elements)
430
431
        scanner_params = elements.get('scanner_params', {}).copy()
432
        elements['scanner_params'] = scanner_params
433
434
        scanner_params.update(
435
            {
436
                k: v['description']
437
                for k, v in self._daemon.scanner_params.items()
438
            }
439
        )
440
441
        return elements
442
443
    def handle_xml(self, xml: Element) -> bytes:
444
        """ Handles <start_scan> command.
445
446
        @return: Response string for <start_scan> command.
447
        """
448
449
        target_str = xml.get('target')
450
        ports_str = xml.get('ports')
451
452
        # For backward compatibility, if target and ports attributes are set,
453
        # <targets> element is ignored.
454
        if target_str is None or ports_str is None:
455
            target_list = xml.find('targets')
456
            if target_list is None or len(target_list) == 0:
457
                raise OspdCommandError('No targets or ports', 'start_scan')
458
            else:
459
                scan_targets = OspRequest.process_targets_element(target_list)
460
        else:
461
            scan_targets = []
462
            for single_target in target_str_to_list(target_str):
463
                scan_targets.append([single_target, ports_str, '', '', '', ''])
464
465
        scan_id = xml.get('scan_id')
466
        if scan_id is not None and scan_id != '' and not valid_uuid(scan_id):
467
            raise OspdCommandError('Invalid scan_id UUID', 'start_scan')
468
469
        try:
470
            parallel = int(xml.get('parallel', '1'))
471
        except ValueError:
472
            raise OspdCommandError(
473
                'Invalid value for parallel scans. It must be a number',
474
                'start_scan',
475
            )
476
477
        if parallel < 1 or parallel > 20:
478
            parallel = 1
479
480
        scanner_params = xml.find('scanner_params')
481
        if scanner_params is None:
482
            raise OspdCommandError('No scanner_params element', 'start_scan')
483
484
        params = self._daemon.preprocess_scan_params(scanner_params)
485
486
        # VTS is an optional element. If present should not be empty.
487
        vt_selection = {}  # type: Dict
488
        scanner_vts = xml.find('vt_selection')
489
        if scanner_vts is not None:
490
            if len(scanner_vts) == 0:
491
                raise OspdCommandError('VTs list is empty', 'start_scan')
492
            else:
493
                vt_selection = OspRequest.process_vts_params(scanner_vts)
494
495
        # Dry run case.
496
        if 'dry_run' in params and int(params['dry_run']):
497
            scan_func = self._daemon.dry_run_scan
498
            scan_params = None
499
        else:
500
            scan_func = self._daemon.start_scan
501
            scan_params = self._daemon.process_scan_params(params)
502
503
        scan_id_aux = scan_id
504
        scan_id = self._daemon.create_scan(
505
            scan_id, scan_targets, scan_params, vt_selection
506
        )
507
508
        if not scan_id:
509
            id_ = Element('id')
510
            id_.text = scan_id_aux
511
            return simple_response_str('start_scan', 100, 'Continue', id_)
512
513
        scan_process = create_process(
514
            func=scan_func, args=(scan_id, scan_targets, parallel)
515
        )
516
517
        self._daemon.scan_processes[scan_id] = scan_process
518
519
        scan_process.start()
520
521
        id_ = Element('id')
522
        id_.text = scan_id
523
524
        return simple_response_str('start_scan', 200, 'OK', id_)
525