Passed
Pull Request — master (#216)
by Juan José
02:01
created

ospd.ospd.OSPDaemon.get_vt_iterator()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
# pylint: disable=too-many-lines
20
21
""" OSP Daemon core class.
22
"""
23
24
import logging
25
import socket
26
import ssl
27
import multiprocessing
28
import time
29
import os
30
31
from typing import (
32
    List,
33
    Any,
34
    Iterator,
35
    Dict,
36
    Optional,
37
    Iterable,
38
    Tuple,
39
)
40
from xml.etree.ElementTree import Element, SubElement
41
42
import defusedxml.ElementTree as secET
43
44
from deprecated import deprecated
45
46
from ospd import __version__
47
from ospd.command import get_commands
48
from ospd.errors import OspdCommandError
49
from ospd.misc import ResultType
50
from ospd.network import resolve_hostname, target_str_to_list
51
from ospd.protocol import OspRequest, OspResponse
52
from ospd.scan import ScanCollection, ScanStatus
53
from ospd.server import BaseServer
54
from ospd.vtfilter import VtsFilter
55
from ospd.vts import Vts
56
from ospd.xml import (
57
    elements_as_text,
58
    get_result_xml,
59
    get_elements_from_dict,
60
)
61
62
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# OSP protocol version; OSPDaemon copies it into `protocol_version`.
PROTOCOL_VERSION = "1.2"

SCHEDULER_CHECK_PERIOD = 10  # in seconds

# Scanner parameters registered on every daemon at construction time.
# Each entry describes one parameter: its type, display name, default,
# whether it is mandatory, and a human-readable description.
BASE_SCANNER_PARAMS = {
    'debug_mode': dict(
        type='boolean',
        name='Debug Mode',
        default=0,
        mandatory=0,
        description='Whether to get extra scan debug information.',
    ),
    'dry_run': dict(
        type='boolean',
        name='Dry Run',
        default=0,
        mandatory=0,
        description='Whether to dry run scan.',
    ),
}  # type: Dict
84
85
86
def _terminate_process_group(process: multiprocessing.Process) -> None:
    """ Send SIGTERM to the whole process group of the given process.

    The group id is looked up from ``process.pid`` with os.getpgid() and
    the signal is delivered with os.killpg().

    Raises:
        ProcessLookupError: propagated from the os calls when the process
            (or its group) no longer exists.
    """
    # Imported locally so this helper does not depend on a module-level
    # import; signal.SIGTERM is the named form of the former literal 15.
    import signal

    os.killpg(os.getpgid(process.pid), signal.SIGTERM)
88
89
90
class OSPDaemon:
91
92
    """ Daemon class for OSP traffic handling.
93
94
    Every scanner wrapper should subclass it and make necessary additions and
95
    changes.
96
97
    * Add any needed parameters in __init__.
98
    * Implement check() method which verifies scanner availability and other
99
      environment related conditions.
100
    * Implement process_scan_params and exec_scan methods which are
101
      specific to handling the <start_scan> command, executing the wrapped
102
      scanner and storing the results.
103
    * exec_scan() should return 0 if host is dead or not reached, 1 if host is
104
      alive and 2 if scan error or status is unknown.
105
    * Implement other methods that assert to False such as get_scanner_name,
106
      get_scanner_version.
107
    * Call set_command_attributes at init time to add scanner command
108
      specific options eg. the w3af profile for w3af wrapper.
109
    """
110
111
    def __init__(
        self, *, customvtfilter=None, **kwargs
    ):  # pylint: disable=unused-argument
        """ Set up the daemon's in-memory state.

        Arguments:
            customvtfilter: Optional VT filter object; when falsy a default
                VtsFilter instance is created instead.
            **kwargs: Only 'scaninfo_store_time' is read here.
        """
        self.scan_collection = ScanCollection()
        self.scan_processes = {}

        self.daemon_info = {
            'name': "OSPd",
            'version': __version__,
            'description': "No description",
        }

        # Placeholder scanner details, served by the get_scanner_* getters.
        self.scanner_info = {
            'name': 'No name',
            'version': 'No version',
            'description': 'No description',
        }

        self.server_version = None  # Set by the subclass.

        self.initialized = None  # Set after initialization finished

        self.scaninfo_store_time = kwargs.get('scaninfo_store_time')

        self.protocol_version = PROTOCOL_VERSION

        # One command object per available command class, keyed by name.
        self.commands = {}
        for command_class in get_commands():
            instance = command_class(self)
            self.commands[instance.get_name()] = instance

        # Register the parameters every scanner wrapper shares.
        self.scanner_params = {}
        for param_name, param_spec in BASE_SCANNER_PARAMS.items():
            self.set_scanner_param(param_name, param_spec)

        self.vts = Vts()
        self.vts_version = None

        self.vts_filter = customvtfilter if customvtfilter else VtsFilter()
154
155
    def init(self, server: BaseServer) -> None:
156
        """ Should be overridden by a subclass if the initialization is costly.
157
158
            Will be called after check.
159
        """
160
        server.start(self.handle_client_stream)
161
        self.initialized = True
162
163
    def set_command_attributes(self, name: str, attributes: Dict) -> None:
164
        """ Sets the xml attributes of a specified command. """
165
        if self.command_exists(name):
166
            command = self.commands.get(name)
167
            command.attributes = attributes
168
169
    @deprecated(version="20.4", reason="Use set_scanner_param instead")
    def add_scanner_param(self, name: str, scanner_params: Dict) -> None:
        """ Deprecated alias that forwards to set_scanner_param(). """
        self.set_scanner_param(name, scanner_params)
173
174
    def set_scanner_param(self, name: str, scanner_params: Dict) -> None:
175
        """ Set a scanner parameter. """
176
177
        assert name
178
        assert scanner_params
179
180
        self.scanner_params[name] = scanner_params
181
182
    def get_scanner_params(self) -> Dict:
183
        return self.scanner_params
184
185
    def add_vt(
186
        self,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
187
        vt_id: str,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
188
        name: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
189
        vt_params: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
190
        vt_refs: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
191
        custom: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
192
        vt_creation_time: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
193
        vt_modification_time: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
194
        vt_dependencies: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
195
        summary: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
196
        impact: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
197
        affected: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
198
        insight: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
199
        solution: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
200
        solution_t: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
201
        solution_m: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
202
        detection: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
203
        qod_t: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
204
        qod_v: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
205
        severities: str = None,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
206
    ) -> None:
207
        """ Add a vulnerability test information.
208
209
        IMPORTANT: The VT's Data Manager will store the vts collection.
210
        If the collection is considerably big and it will be consultated
211
        intensible during a routine, consider to do a deepcopy(), since
212
        accessing the shared memory in the data manager is very expensive.
213
        At the end of the routine, the temporal copy must be set to None
214
        and deleted.
215
        """
216
        self.vts.add(
217
            vt_id,
218
            name=name,
219
            vt_params=vt_params,
220
            vt_refs=vt_refs,
221
            custom=custom,
222
            vt_creation_time=vt_creation_time,
223
            vt_modification_time=vt_modification_time,
224
            vt_dependencies=vt_dependencies,
225
            summary=summary,
226
            impact=impact,
227
            affected=affected,
228
            insight=insight,
229
            solution=solution,
230
            solution_t=solution_t,
231
            solution_m=solution_m,
232
            detection=detection,
233
            qod_t=qod_t,
234
            qod_v=qod_v,
235
            severities=severities,
236
        )
237
238
    def set_vts_version(self, vts_version: str) -> None:
239
        """ Add into the vts dictionary an entry to identify the
240
        vts version.
241
242
        Parameters:
243
            vts_version (str): Identifies a unique vts version.
244
        """
245
        if not vts_version:
246
            raise OspdCommandError(
247
                'A vts_version parameter is required', 'set_vts_version'
248
            )
249
        self.vts_version = vts_version
250
251
    def get_vts_version(self) -> Optional[str]:
252
        """Return the vts version.
253
        """
254
        return self.vts_version
255
256
    def command_exists(self, name: str) -> bool:
257
        """ Checks if a commands exists. """
258
        return name in self.commands
259
260
    def get_scanner_name(self) -> str:
261
        """ Gives the wrapped scanner's name. """
262
        return self.scanner_info['name']
263
264
    def get_scanner_version(self) -> str:
265
        """ Gives the wrapped scanner's version. """
266
        return self.scanner_info['version']
267
268
    def get_scanner_description(self) -> str:
269
        """ Gives the wrapped scanner's description. """
270
        return self.scanner_info['description']
271
272
    def get_server_version(self) -> str:
273
        """ Gives the specific OSP server's version. """
274
        assert self.server_version
275
        return self.server_version
276
277
    def get_protocol_version(self) -> str:
278
        """ Gives the OSP's version. """
279
        return self.protocol_version
280
281
    def preprocess_scan_params(self, xml_params):
282
        """ Processes the scan parameters. """
283
        params = {}
284
285
        for param in xml_params:
286
            params[param.tag] = param.text or ''
287
288
        # Set default values.
289
        for key in self.scanner_params:
290
            if key not in params:
291
                params[key] = self.get_scanner_param_default(key)
292
                if self.get_scanner_param_type(key) == 'selection':
293
                    params[key] = params[key].split('|')[0]
294
295
        # Validate values.
296
        for key in params:
297
            param_type = self.get_scanner_param_type(key)
298
            if not param_type:
299
                continue
300
301
            if param_type in ['integer', 'boolean']:
302
                try:
303
                    params[key] = int(params[key])
304
                except ValueError:
305
                    raise OspdCommandError(
306
                        'Invalid %s value' % key, 'start_scan'
307
                    )
308
309
            if param_type == 'boolean':
310
                if params[key] not in [0, 1]:
311
                    raise OspdCommandError(
312
                        'Invalid %s value' % key, 'start_scan'
313
                    )
314
            elif param_type == 'selection':
315
                selection = self.get_scanner_param_default(key).split('|')
316
                if params[key] not in selection:
317
                    raise OspdCommandError(
318
                        'Invalid %s value' % key, 'start_scan'
319
                    )
320
            if self.get_scanner_param_mandatory(key) and params[key] == '':
321
                raise OspdCommandError(
322
                    'Mandatory %s value is missing' % key, 'start_scan'
323
                )
324
325
        return params
326
327
    def process_scan_params(self, params: Dict) -> Dict:
        """ Hook for child classes to adjust the scan parameters.

        This default implementation returns params unchanged.
        """
        unchanged = params
        return unchanged
331
332
    @staticmethod
    @deprecated(
        version="20.4",
        reason="Please use OspRequest.process_vts_params instead.",
    )
    def process_vts_params(scanner_vts) -> Dict:
        """ Deprecated wrapper around OspRequest.process_vts_params().

        The argument is forwarded unchanged and the callee's result is
        returned as is.
        """
        return OspRequest.process_vts_params(scanner_vts)
339
340
    @staticmethod
    @deprecated(
        version="20.4",
        reason="Please use OspRequest.process_credentials_elements instead.",
    )
    def process_credentials_elements(cred_tree) -> Dict:
        """ Deprecated wrapper around
        OspRequest.process_credentials_elements().

        The argument is forwarded unchanged and the callee's result is
        returned as is.
        """
        return OspRequest.process_credentials_elements(cred_tree)
347
348
    @staticmethod
    @deprecated(
        version="20.4",
        reason="Please use OspRequest.process_targets_element instead.",
    )
    def process_targets_element(scanner_target) -> List:
        """ Deprecated wrapper around OspRequest.process_targets_element().

        The argument is forwarded unchanged and the callee's result is
        returned as is.
        """
        return OspRequest.process_targets_element(scanner_target)
355
356
    def stop_scan(self, scan_id: str) -> None:
        """ Stop the running scan identified by scan_id.

        The scan is marked as stopped, the subclass clean-up hook is run,
        and then the scan process and its process group are terminated.

        Raises:
            OspdCommandError: if no process is registered for scan_id or
                the process is no longer alive.
        """
        scan_process = self.scan_processes.get(scan_id)
        if not scan_process:
            raise OspdCommandError(
                'Scan not found {0}.'.format(scan_id), 'stop_scan'
            )
        if not scan_process.is_alive():
            raise OspdCommandError(
                'Scan already stopped or finished.', 'stop_scan'
            )

        # The status is set before terminating anything.
        self.set_scan_status(scan_id, ScanStatus.STOPPED)

        logger.info('%s: Scan stopping %s.', scan_id, scan_process.ident)

        self.stop_scan_cleanup(scan_id)

        try:
            scan_process.terminate()
        except AttributeError:
            logger.debug('%s: The scanner task stopped unexpectedly.', scan_id)

        # Also signal the rest of the scan's process group; a missing
        # process just means there is nothing left to stop.
        try:
            _terminate_process_group(scan_process)
        except ProcessLookupError:
            logger.info(
                '%s: Scan already stopped %s.', scan_id, scan_process.pid
            )

        # Never join the current process itself.
        if scan_process.ident != os.getpid():
            scan_process.join(0)

        logger.info('%s: Scan stopped.', scan_id)
389
390
    @staticmethod
391
    def stop_scan_cleanup(scan_id: str):
392
        """ Should be implemented by subclass in case of a clean up before
393
        terminating is needed. """
394
395
    @staticmethod
396
    def target_is_finished(scan_id: str):
397
        """ Should be implemented by subclass in case of a check before
398
        stopping is needed. """
399
400
    def exec_scan(self, scan_id: str):
        """ Run the wrapped scanner for scan_id.

        Must be implemented by the subclass; this base version always
        raises NotImplementedError.
        """
        raise NotImplementedError
403
404
    def finish_scan(self, scan_id: str) -> None:
        """ Mark the scan as finished.

        The progress is set to 100 first, then the status is switched to
        FINISHED and the event is logged.
        """
        full_progress = 100
        self.set_scan_progress(scan_id, full_progress)
        self.set_scan_status(scan_id, ScanStatus.FINISHED)
        logger.info("%s: Scan finished.", scan_id)
409
410
    def get_daemon_name(self) -> str:
411
        """ Gives osp daemon's name. """
412
        return self.daemon_info['name']
413
414
    def get_daemon_version(self) -> str:
415
        """ Gives osp daemon's version. """
416
        return self.daemon_info['version']
417
418
    def get_scanner_param_type(self, param: str):
419
        """ Returns type of a scanner parameter. """
420
        assert isinstance(param, str)
421
        entry = self.scanner_params.get(param)
422
        if not entry:
423
            return None
424
        return entry.get('type')
425
426
    def get_scanner_param_mandatory(self, param: str):
427
        """ Returns if a scanner parameter is mandatory. """
428
        assert isinstance(param, str)
429
        entry = self.scanner_params.get(param)
430
        if not entry:
431
            return False
432
        return entry.get('mandatory')
433
434
    def get_scanner_param_default(self, param: str):
435
        """ Returns default value of a scanner parameter. """
436
        assert isinstance(param, str)
437
        entry = self.scanner_params.get(param)
438
        if not entry:
439
            return None
440
        return entry.get('default')
441
442
    @deprecated(
        version="20.4",
        reason="Please use OspResponse.create_scanner_params_xml instead.",
    )
    def get_scanner_params_xml(self):
        """ Deprecated: return the daemon's scanner params in xml format.

        Forwards self.scanner_params to
        OspResponse.create_scanner_params_xml().
        """
        registered = self.scanner_params
        return OspResponse.create_scanner_params_xml(registered)
449
450
    def handle_client_stream(self, stream) -> None:
        """ Read one client request from stream and answer it.

        Data is read until the stream returns nothing more, a timeout
        occurs or an SSL error is raised. The collected bytes are passed to
        handle_command(); when that raises, an error response is written
        back instead. The stream is closed once a request was handled.

        Nothing is written (and the stream is not closed here) when reading
        fails with AttributeError/ValueError or when no data was received.
        """
        # Collect the chunks and join once, rather than growing a bytes
        # object with += on every read.
        chunks = []

        while True:
            try:
                buf = stream.read()
            except (AttributeError, ValueError) as message:
                logger.error(message)
                return
            except ssl.SSLError as exception:
                logger.debug('Error: %s', exception)
                break
            except socket.timeout:
                break

            if not buf:
                break
            chunks.append(buf)

        data = b''.join(chunks)

        if not data:
            logger.debug("Empty client stream")
            return

        if not self.initialized:
            # Refuse commands while init() has not finished yet.
            exception = OspdCommandError(
                '%s is still starting' % self.daemon_info['name'], 'error'
            )
            response = exception.as_xml()
            stream.write(response)
            stream.close()
            return

        response = None
        try:
            self.handle_command(data, stream)
        except OspdCommandError as exception:
            response = exception.as_xml()
            logger.debug('Command error: %s', exception.message)
        except Exception:  # pylint: disable=broad-except
            # Boundary handler: log the traceback and report a generic
            # error to the client instead of dropping the connection.
            logger.exception('While handling client command:')
            exception = OspdCommandError('Fatal error', 'error')
            response = exception.as_xml()

        if response:
            stream.write(response)
        stream.close()
498
499
    def calculate_progress(self, scan_id: str) -> float:
500
        """ Calculate the total scan progress. """
501
502
        return self.scan_collection.calculate_target_progress(scan_id)
503
504
    def process_exclude_hosts(self, scan_id: str, exclude_hosts: str) -> None:
        """ Process the exclude hosts before launching the scans.

        The hosts string is expanded with target_str_to_list() and the
        resulting hosts are removed from the scan's target progress table.
        Nothing is done when exclude_hosts is empty or None.
        """
        if not exclude_hosts:
            return

        exc_hosts_list = target_str_to_list(exclude_hosts)
        self.remove_scan_hosts_from_target_progress(scan_id, exc_hosts_list)
512
513
    def process_finished_hosts(self, scan_id: str, finished_hosts: str) -> None:
        """ Process the finished hosts before launching the scans.

        The hosts string is expanded with target_str_to_list(); each host
        is then marked as finished and its progress set to 100, so that it
        counts towards the scan progress. Nothing is done when
        finished_hosts is empty or None.
        """
        if not finished_hosts:
            return

        for host in target_str_to_list(finished_hosts):
            self.set_scan_host_finished(scan_id, host)
            self.set_scan_host_progress(scan_id, host, 100)
527
528
    def start_scan(self, scan_id: str, target: Dict) -> None:
        """ Starts the scan with scan_id.

        Excluded and already finished hosts from target are processed
        first, then the status is set to RUNNING and exec_scan() is run.
        Any exception from that step is recorded as a scan error and
        logged instead of being propagated. Unless the scan was stopped in
        the meantime it is marked as finished afterwards.

        Raises:
            OspdCommandError: if target is None or empty.
        """
        # NOTE(review): presumably this runs in a dedicated scan process,
        # so the new session gives the scan its own process group for
        # stop_scan() to terminate - confirm against the caller.
        os.setsid()

        if not target:
            raise OspdCommandError('Erroneous target', 'start_scan')

        logger.info("%s: Scan started.", scan_id)

        self.process_exclude_hosts(scan_id, target.get('exclude_hosts'))
        self.process_finished_hosts(scan_id, target.get('finished_hosts'))

        try:
            self.set_scan_status(scan_id, ScanStatus.RUNNING)
            # The return value of exec_scan() is not used here.
            self.exec_scan(scan_id)
        except Exception as e:  # pylint: disable=broad-except
            self.add_scan_error(
                scan_id,
                name='',
                host=self.get_scan_host(scan_id),
                value='Host process failure (%s).' % e,
            )
            logger.exception('While scanning: %s', scan_id)
        else:
            logger.info("%s: Host scan finished.", scan_id)

        if self.get_scan_status(scan_id) != ScanStatus.STOPPED:
            self.finish_scan(scan_id)
556
557
    def dry_run_scan(self, scan_id: str, target: Dict) -> None:
        """ Dry run a scan: log a dummy result and finish the scan.

        NOTE(review): target is annotated as Dict but indexed with 0 -
        confirm what callers actually pass.
        """
        os.setsid()

        resolved = resolve_hostname(target[0])
        if resolved is None:
            # Only logged; the dry run carries on with host set to None.
            logger.info("Couldn't resolve %s.", self.get_scan_host(scan_id))

        ports = self.get_scan_ports(scan_id)
        logger.info("%s:%s: Dry run mode.", resolved, ports)

        self.add_scan_log(
            scan_id, name='', host=resolved, value='Dry run result'
        )
        self.finish_scan(scan_id)
573
574
    def handle_timeout(self, scan_id: str, host: str) -> None:
575
        """ Handles scanner reaching timeout error. """
576
        self.add_scan_error(
577
            scan_id,
578
            host=host,
579
            name="Timeout",
580
            value="{0} exec timeout.".format(self.get_scanner_name()),
581
        )
582
583
    def remove_scan_hosts_from_target_progress(
584
        self, scan_id: str, exc_hosts_list: List
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
585
    ) -> None:
586
        """ Remove a list of hosts from the main scan progress table."""
587
        self.scan_collection.remove_hosts_from_target_progress(
588
            scan_id, exc_hosts_list
589
        )
590
591
    def set_scan_host_finished(self, scan_id: str, host: str) -> None:
592
        """ Add the host in a list of finished hosts """
593
        self.scan_collection.set_host_finished(scan_id, host)
594
595
    def set_scan_progress(self, scan_id: str, progress: int) -> None:
596
        """ Sets scan_id scan's progress which is a number
597
        between 0 and 100. """
598
        self.scan_collection.set_progress(scan_id, progress)
599
600
    def set_scan_host_progress(
601
        self, scan_id: str, host: str, progress: int
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
602
    ) -> None:
603
        """ Sets host's progress which is part of target.
604
        Each time a host progress is updated, the scan progress
605
        is updated too.
606
        """
607
        self.scan_collection.set_host_progress(scan_id, host, progress)
608
609
        scan_progress = self.calculate_progress(scan_id)
610
        self.set_scan_progress(scan_id, scan_progress)
611
612
    def set_scan_status(self, scan_id: str, status: ScanStatus) -> None:
613
        """ Set the scan's status."""
614
        self.scan_collection.set_status(scan_id, status)
615
616
    def get_scan_status(self, scan_id: str) -> ScanStatus:
617
        """ Get scan_id scans's status."""
618
        return self.scan_collection.get_status(scan_id)
619
620
    def scan_exists(self, scan_id: str) -> bool:
621
        """ Checks if a scan with ID scan_id is in collection.
622
623
        @return: 1 if scan exists, 0 otherwise.
624
        """
625
        return self.scan_collection.id_exists(scan_id)
626
627
    def get_help_text(self) -> str:
628
        """ Returns the help output in plain text format."""
629
630
        txt = ''
631
        for name, info in self.commands.items():
632
            description = info.get_description()
633
            attributes = info.get_attributes()
634
            elements = info.get_elements()
635
636
            command_txt = "\t{0: <22} {1}\n".format(name, description)
637
638
            if attributes:
639
                command_txt = ''.join([command_txt, "\t Attributes:\n"])
640
641
                for attrname, attrdesc in attributes.items():
642
                    attr_txt = "\t  {0: <22} {1}\n".format(attrname, attrdesc)
643
                    command_txt = ''.join([command_txt, attr_txt])
644
645
            if elements:
646
                command_txt = ''.join(
647
                    [command_txt, "\t Elements:\n", elements_as_text(elements),]
648
                )
649
650
            txt += command_txt
651
652
        return txt
653
654
    @deprecated(version="20.4", reason="Use ospd.xml.elements_as_text instead.")
    def elements_as_text(self, elems: Dict, indent: int = 2) -> str:
        """ Deprecated: format the elems dictionary as plain text.

        Forwards both arguments to the module-level elements_as_text().
        """
        formatted = elements_as_text(elems, indent)
        return formatted
658
659
    def delete_scan(self, scan_id: str) -> int:
        """ Remove the scan scan_id from the collection.

        A running scan is never deleted. Otherwise the scan process (if
        one is registered) is joined first, and its entry is dropped once
        it reports an exit code.

        @return: 0 if the scan is running, otherwise the result of the
                 scan collection's delete_scan().
        """
        if self.get_scan_status(scan_id) == ScanStatus.RUNNING:
            return 0

        # Don't delete the scan until the process stops
        exitcode = None
        try:
            process = self.scan_processes[scan_id]
            process.join()
            exitcode = process.exitcode
        except KeyError:
            logger.debug('Scan process for %s not found', scan_id)

        # An exit code of 0 counts as "has exited" too.
        if exitcode is not None:
            del self.scan_processes[scan_id]

        return self.scan_collection.delete_scan(scan_id)
679
680
    def get_scan_results_xml(
        self, scan_id: str, pop_res: bool, max_res: Optional[int]
    ):
        """ Collect the results of the scan in a <results> element.

        scan_id, pop_res and max_res are passed on to the scan collection's
        results_iterator(); every result it yields is converted with
        get_result_xml() and appended.

        @return: The 'results' Element with one child per result.
        """
        results = Element('results')

        result_iter = self.scan_collection.results_iterator(
            scan_id, pop_res, max_res
        )
        results.extend([get_result_xml(item) for item in result_iter])

        logger.debug('Returning %d results', len(results))
        return results
695
696
    @deprecated(
        version="20.4",
        reason="Please use ospd.xml.get_elements_from_dict instead.",
    )
    def get_xml_str(self, data: Dict) -> List:
        """ Deprecated: turn the provided data structure into XML.

        @param: Dictionary of xml tags and their elements.

        @return: Whatever get_elements_from_dict() produces for data.
        """
        converted = get_elements_from_dict(data)
        return converted
708
709
    def get_scan_xml(
710
        self,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
711
        scan_id: str,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
712
        detailed: bool = True,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
713
        pop_res: bool = False,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
714
        max_res: int = 0,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
715
    ):
716
        """ Gets scan in XML format.
717
718
        @return: String of scan in XML format.
719
        """
720
        if not scan_id:
721
            return Element('scan')
722
723
        target = self.get_scan_host(scan_id)
724
        progress = self.get_scan_progress(scan_id)
725
        status = self.get_scan_status(scan_id)
726
        start_time = self.get_scan_start_time(scan_id)
727
        end_time = self.get_scan_end_time(scan_id)
728
        response = Element('scan')
729
        for name, value in [
730
            ('id', scan_id),
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
731
            ('target', target),
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
732
            ('progress', progress),
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
733
            ('status', status.name.lower()),
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
734
            ('start_time', start_time),
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
735
            ('end_time', end_time),
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
736
        ]:
737
            response.set(name, str(value))
738
        if detailed:
739
            response.append(
740
                self.get_scan_results_xml(scan_id, pop_res, max_res)
741
            )
742
        return response
743
744
    @staticmethod
745
    def get_custom_vt_as_xml_str(  # pylint: disable=unused-argument
746
        vt_id: str, custom: Dict
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
747
    ) -> str:
748
        """ Create a string representation of the XML object from the
749
        custom data object.
750
        This needs to be implemented by each ospd wrapper, in case
751
        custom elements for VTs are used.
752
753
        The custom XML object which is returned will be embedded
754
        into a <custom></custom> element.
755
756
        @return: XML object as string for custom data.
757
        """
758
        return ''
759
760
    @staticmethod
761
    def get_params_vt_as_xml_str(  # pylint: disable=unused-argument
762
        vt_id: str, vt_params
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
763
    ) -> str:
764
        """ Create a string representation of the XML object from the
765
        vt_params data object.
766
        This needs to be implemented by each ospd wrapper, in case
767
        vt_params elements for VTs are used.
768
769
        The params XML object which is returned will be embedded
770
        into a <params></params> element.
771
772
        @return: XML object as string for vt parameters data.
773
        """
774
        return ''
775
776
    @staticmethod
777
    def get_refs_vt_as_xml_str(  # pylint: disable=unused-argument
778
        vt_id: str, vt_refs
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
779
    ) -> str:
780
        """ Create a string representation of the XML object from the
781
        refs data object.
782
        This needs to be implemented by each ospd wrapper, in case
783
        refs elements for VTs are used.
784
785
        The refs XML object which is returned will be embedded
786
        into a <refs></refs> element.
787
788
        @return: XML object as string for vt references data.
789
        """
790
        return ''
791
792
    @staticmethod
793
    def get_dependencies_vt_as_xml_str(  # pylint: disable=unused-argument
794
        vt_id: str, vt_dependencies
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
795
    ) -> str:
796
        """ Create a string representation of the XML object from the
797
        vt_dependencies data object.
798
        This needs to be implemented by each ospd wrapper, in case
799
        vt_dependencies elements for VTs are used.
800
801
        The vt_dependencies XML object which is returned will be embedded
802
        into a <dependencies></dependencies> element.
803
804
        @return: XML object as string for vt dependencies data.
805
        """
806
        return ''
807
808
    @staticmethod
809
    def get_creation_time_vt_as_xml_str(  # pylint: disable=unused-argument
810
        vt_id: str, vt_creation_time
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
811
    ) -> str:
812
        """ Create a string representation of the XML object from the
813
        vt_creation_time data object.
814
        This needs to be implemented by each ospd wrapper, in case
815
        vt_creation_time elements for VTs are used.
816
817
        The vt_creation_time XML object which is returned will be embedded
818
        into a <vt_creation_time></vt_creation_time> element.
819
820
        @return: XML object as string for vt creation time data.
821
        """
822
        return ''
823
824
    @staticmethod
825
    def get_modification_time_vt_as_xml_str(  # pylint: disable=unused-argument
826
        vt_id: str, vt_modification_time
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
827
    ) -> str:
828
        """ Create a string representation of the XML object from the
829
        vt_modification_time data object.
830
        This needs to be implemented by each ospd wrapper, in case
831
        vt_modification_time elements for VTs are used.
832
833
        The vt_modification_time XML object which is returned will be embedded
834
        into a <vt_modification_time></vt_modification_time> element.
835
836
        @return: XML object as string for vt references data.
837
        """
838
        return ''
839
840
    @staticmethod
841
    def get_summary_vt_as_xml_str(  # pylint: disable=unused-argument
842
        vt_id: str, summary
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
843
    ) -> str:
844
        """ Create a string representation of the XML object from the
845
        summary data object.
846
        This needs to be implemented by each ospd wrapper, in case
847
        summary elements for VTs are used.
848
849
        The summary XML object which is returned will be embedded
850
        into a <summary></summary> element.
851
852
        @return: XML object as string for summary data.
853
        """
854
        return ''
855
856
    @staticmethod
857
    def get_impact_vt_as_xml_str(  # pylint: disable=unused-argument
858
        vt_id: str, impact
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
859
    ) -> str:
860
        """ Create a string representation of the XML object from the
861
        impact data object.
862
        This needs to be implemented by each ospd wrapper, in case
863
        impact elements for VTs are used.
864
865
        The impact XML object which is returned will be embedded
866
        into a <impact></impact> element.
867
868
        @return: XML object as string for impact data.
869
        """
870
        return ''
871
872
    @staticmethod
873
    def get_affected_vt_as_xml_str(  # pylint: disable=unused-argument
874
        vt_id: str, affected
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
875
    ) -> str:
876
        """ Create a string representation of the XML object from the
877
        affected data object.
878
        This needs to be implemented by each ospd wrapper, in case
879
        affected elements for VTs are used.
880
881
        The affected XML object which is returned will be embedded
882
        into a <affected></affected> element.
883
884
        @return: XML object as string for affected data.
885
        """
886
        return ''
887
888
    @staticmethod
889
    def get_insight_vt_as_xml_str(  # pylint: disable=unused-argument
890
        vt_id: str, insight
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
891
    ) -> str:
892
        """ Create a string representation of the XML object from the
893
        insight data object.
894
        This needs to be implemented by each ospd wrapper, in case
895
        insight elements for VTs are used.
896
897
        The insight XML object which is returned will be embedded
898
        into a <insight></insight> element.
899
900
        @return: XML object as string for insight data.
901
        """
902
        return ''
903
904
    @staticmethod
905
    def get_solution_vt_as_xml_str(  # pylint: disable=unused-argument
906
        vt_id: str, solution, solution_type=None, solution_method=None
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
907
    ) -> str:
908
        """ Create a string representation of the XML object from the
909
        solution data object.
910
        This needs to be implemented by each ospd wrapper, in case
911
        solution elements for VTs are used.
912
913
        The solution XML object which is returned will be embedded
914
        into a <solution></solution> element.
915
916
        @return: XML object as string for solution data.
917
        """
918
        return ''
919
920
    @staticmethod
921
    def get_detection_vt_as_xml_str(  # pylint: disable=unused-argument
922
        vt_id: str, detection=None, qod_type=None, qod=None
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
923
    ) -> str:
924
        """ Create a string representation of the XML object from the
925
        detection data object.
926
        This needs to be implemented by each ospd wrapper, in case
927
        detection elements for VTs are used.
928
929
        The detection XML object which is returned is an element with
930
        tag <detection></detection> element
931
932
        @return: XML object as string for detection data.
933
        """
934
        return ''
935
936
    @staticmethod
937
    def get_severities_vt_as_xml_str(  # pylint: disable=unused-argument
938
        vt_id: str, severities
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
939
    ) -> str:
940
        """ Create a string representation of the XML object from the
941
        severities data object.
942
        This needs to be implemented by each ospd wrapper, in case
943
        severities elements for VTs are used.
944
945
        The severities XML objects which are returned will be embedded
946
        into a <severities></severities> element.
947
948
        @return: XML object as string for severities data.
949
        """
950
        return ''
951
952
    def get_vt_iterator(self) -> Iterator[Tuple[str, Dict]]:
953
        """ Return iterator object for getting elements
954
        from the VTs dictionary. """
955
        return self.vts.items()
956
957
    def get_vt_xml(self, single_vt: Tuple[str, Dict]) -> Element:
958
        """ Gets a single vulnerability test information in XML format.
959
960
        @return: String of single vulnerability test information in XML format.
961
        """
962
        if not single_vt:
963
            return Element('vt')
964
965
        vt_id, vt = single_vt
966
967
        name = vt.get('name')
968
        vt_xml = Element('vt')
969
        vt_xml.set('id', vt_id)
970
971
        for name, value in [('name', name)]:
972
            elem = SubElement(vt_xml, name)
973
            elem.text = str(value)
974
975
        if vt.get('vt_params'):
976
            params_xml_str = self.get_params_vt_as_xml_str(
977
                vt_id, vt.get('vt_params')
978
            )
979
            vt_xml.append(secET.fromstring(params_xml_str))
980
981
        if vt.get('vt_refs'):
982
            refs_xml_str = self.get_refs_vt_as_xml_str(vt_id, vt.get('vt_refs'))
983
            vt_xml.append(secET.fromstring(refs_xml_str))
984
985
        if vt.get('vt_dependencies'):
986
            dependencies = self.get_dependencies_vt_as_xml_str(
987
                vt_id, vt.get('vt_dependencies')
988
            )
989
            vt_xml.append(secET.fromstring(dependencies))
990
991
        if vt.get('creation_time'):
992
            vt_ctime = self.get_creation_time_vt_as_xml_str(
993
                vt_id, vt.get('creation_time')
994
            )
995
            vt_xml.append(secET.fromstring(vt_ctime))
996
997
        if vt.get('modification_time'):
998
            vt_mtime = self.get_modification_time_vt_as_xml_str(
999
                vt_id, vt.get('modification_time')
1000
            )
1001
            vt_xml.append(secET.fromstring(vt_mtime))
1002
1003
        if vt.get('summary'):
1004
            summary_xml_str = self.get_summary_vt_as_xml_str(
1005
                vt_id, vt.get('summary')
1006
            )
1007
            vt_xml.append(secET.fromstring(summary_xml_str))
1008
1009
        if vt.get('impact'):
1010
            impact_xml_str = self.get_impact_vt_as_xml_str(
1011
                vt_id, vt.get('impact')
1012
            )
1013
            vt_xml.append(secET.fromstring(impact_xml_str))
1014
1015
        if vt.get('affected'):
1016
            affected_xml_str = self.get_affected_vt_as_xml_str(
1017
                vt_id, vt.get('affected')
1018
            )
1019
            vt_xml.append(secET.fromstring(affected_xml_str))
1020
1021
        if vt.get('insight'):
1022
            insight_xml_str = self.get_insight_vt_as_xml_str(
1023
                vt_id, vt.get('insight')
1024
            )
1025
            vt_xml.append(secET.fromstring(insight_xml_str))
1026
1027
        if vt.get('solution'):
1028
            solution_xml_str = self.get_solution_vt_as_xml_str(
1029
                vt_id,
1030
                vt.get('solution'),
1031
                vt.get('solution_type'),
1032
                vt.get('solution_method'),
1033
            )
1034
            vt_xml.append(secET.fromstring(solution_xml_str))
1035
1036
        if vt.get('detection') or vt.get('qod_type') or vt.get('qod'):
1037
            detection_xml_str = self.get_detection_vt_as_xml_str(
1038
                vt_id, vt.get('detection'), vt.get('qod_type'), vt.get('qod')
1039
            )
1040
            vt_xml.append(secET.fromstring(detection_xml_str))
1041
1042
        if vt.get('severities'):
1043
            severities_xml_str = self.get_severities_vt_as_xml_str(
1044
                vt_id, vt.get('severities')
1045
            )
1046
            vt_xml.append(secET.fromstring(severities_xml_str))
1047
1048
        if vt.get('custom'):
1049
            custom_xml_str = self.get_custom_vt_as_xml_str(
1050
                vt_id, vt.get('custom')
1051
            )
1052
            vt_xml.append(secET.fromstring(custom_xml_str))
1053
1054
        return vt_xml
1055
1056
    def get_vts_selection_list(
1057
        self, vt_id: str = None, filtered_vts: Dict = None
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
1058
    ) -> Iterable[str]:
1059
        """
1060
        Get list of VT's OID.
1061
        If vt_id is specified, the collection will contain only this vt, if
1062
        found.
1063
        If no vt_id is specified or filtered_vts is None (default), the
1064
        collection will contain all vts. Otherwise those vts passed
1065
        in filtered_vts or vt_id are returned. In case of both vt_id and
1066
        filtered_vts are given, filtered_vts has priority.
1067
1068
        Arguments:
1069
            vt_id (vt_id, optional): ID of the vt to get.
1070
            filtered_vts (list, optional): Filtered VTs collection.
1071
1072
        Return:
1073
            List of selected VT's OID.
1074
        """
1075
        vts_xml = []
1076
        if not self.vts:
1077
            return vts_xml
1078
1079
        # No match for the filter
1080
        if filtered_vts is not None and len(filtered_vts) == 0:
1081
            return vts_xml
1082
1083
        if filtered_vts:
1084
            vts_list = filtered_vts
1085
        elif vt_id:
1086
            vts_list = [vt_id]
1087
        else:
1088
            vts_list = self.vts.keys()
1089
1090
        return vts_list
1091
1092
    def handle_command(self, command: str, stream) -> str:
1093
        """ Handles an osp command in a string.
1094
1095
        @return: OSP Response to command.
1096
        """
1097
        try:
1098
            tree = secET.fromstring(command)
1099
        except secET.ParseError:
1100
            logger.debug("Erroneous client input: %s", command)
1101
            raise OspdCommandError('Invalid data')
1102
1103
        command = self.commands.get(tree.tag, None)
1104
        if not command and tree.tag != "authenticate":
1105
            raise OspdCommandError('Bogus command name')
1106
1107
        response = command.handle_xml(tree)
1108
1109
        if isinstance(response, bytes):
1110
            stream.write(response)
1111
        else:
1112
            for data in response:
1113
                stream.write(data)
1114
1115
    def check(self):
1116
        """ Asserts to False. Should be implemented by subclass. """
1117
        raise NotImplementedError
1118
1119
    def run(self) -> None:
        """ Starts the Daemon and keeps it running until interrupted.

        Every iteration of the loop sleeps for SCHEDULER_CHECK_PERIOD and
        then calls scheduler(), clean_forgotten_scans() and
        wait_for_children(), in this order. A KeyboardInterrupt ends the
        loop and is logged.
        """
        try:
            while True:
                time.sleep(SCHEDULER_CHECK_PERIOD)
                for periodic_task in (
                    self.scheduler,
                    self.clean_forgotten_scans,
                    self.wait_for_children,
                ):
                    periodic_task()
        except KeyboardInterrupt:
            logger.info("Received Ctrl-C shutting-down ...")
1131
1132
    def scheduler(self):
        """ Hook called by run() on every iteration of its loop.

        Does nothing in this class. Should be implemented by the subclass
        in case of need to run tasks periodically.
        """
1135
1136
    def wait_for_children(self):
1137
        """ Join the zombie process to releases resources."""
1138
        for scan_id in self.scan_processes:
1139
            self.scan_processes[scan_id].join(0)
1140
1141
    def create_scan(
1142
        self,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
1143
        scan_id: str,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
1144
        targets: Dict,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
1145
        options: Optional[Dict],
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
1146
        vt_selection: Dict,
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
1147
    ) -> Optional[str]:
1148
        """ Creates a new scan.
1149
1150
        @target: Target to scan.
1151
        @options: Miscellaneous scan options.
1152
1153
        @return: New scan's ID. None if the scan_id already exists and the
1154
                 scan status is RUNNING or FINISHED.
1155
        """
1156
        status = None
1157
        scan_exists = self.scan_exists(scan_id)
1158
        if scan_id and scan_exists:
1159
            status = self.get_scan_status(scan_id)
1160
1161
        if scan_exists and status == ScanStatus.STOPPED:
1162
            logger.info("Scan %s exists. Resuming scan.", scan_id)
1163
        elif scan_exists and (
1164
            status == ScanStatus.RUNNING or status == ScanStatus.FINISHED
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
1165
        ):
1166
            logger.info(
1167
                "Scan %s exists with status %s.", scan_id, status.name.lower()
1168
            )
1169
            return
1170
        return self.scan_collection.create_scan(
1171
            scan_id, targets, options, vt_selection
1172
        )
1173
1174
    def get_scan_options(self, scan_id: str) -> str:
1175
        """ Gives a scan's list of options. """
1176
        return self.scan_collection.get_options(scan_id)
1177
1178
    def set_scan_option(self, scan_id: str, name: str, value: Any) -> None:
1179
        """ Sets a scan's option to a provided value. """
1180
        return self.scan_collection.set_option(scan_id, name, value)
1181
1182
    def clean_forgotten_scans(self) -> None:
1183
        """ Check for old stopped or finished scans which have not been
1184
        deleted and delete them if the are older than the set value."""
1185
1186
        if not self.scaninfo_store_time:
1187
            return
1188
1189
        for scan_id in list(self.scan_collection.ids_iterator()):
1190
            end_time = int(self.get_scan_end_time(scan_id))
1191
            scan_status = self.get_scan_status(scan_id)
1192
1193
            if (
1194
                scan_status == ScanStatus.STOPPED
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
1195
                or scan_status == ScanStatus.FINISHED
0 ignored issues
show
Coding Style introduced by
Wrong hanging indentation before block (add 4 spaces).
Loading history...
1196
            ) and end_time:
1197
                stored_time = int(time.time()) - end_time
1198
                if stored_time > self.scaninfo_store_time * 3600:
1199
                    logger.debug(
1200
                        'Scan %s is older than %d hours and seems have been '
1201
                        'forgotten. Scan info will be deleted from the '
1202
                        'scan table',
1203
                        scan_id,
1204
                        self.scaninfo_store_time,
1205
                    )
1206
                    self.delete_scan(scan_id)
1207
1208
    def check_scan_process(self, scan_id: str) -> None:
1209
        """ Check the scan's process, and terminate the scan if not alive. """
1210
        scan_process = self.scan_processes[scan_id]
1211
        progress = self.get_scan_progress(scan_id)
1212
1213
        if progress < 100 and not scan_process.is_alive():
1214
            if not self.get_scan_status(scan_id) == ScanStatus.STOPPED:
1215
                self.set_scan_status(scan_id, ScanStatus.STOPPED)
1216
                self.add_scan_error(
1217
                    scan_id, name="", host="", value="Scan process failure."
1218
                )
1219
1220
                logger.info("%s: Scan stopped with errors.", scan_id)
1221
1222
        elif progress == 100:
1223
            scan_process.join(0)
1224
1225
    def get_scan_progress(self, scan_id: str):
1226
        """ Gives a scan's current progress value. """
1227
        return self.scan_collection.get_progress(scan_id)
1228
1229
    def get_scan_host(self, scan_id: str) -> str:
1230
        """ Gives a scan's target. """
1231
        return self.scan_collection.get_host_list(scan_id)
1232
1233
    def get_scan_ports(self, scan_id: str) -> str:
1234
        """ Gives a scan's ports list. """
1235
        return self.scan_collection.get_ports(scan_id)
1236
1237
    def get_scan_exclude_hosts(self, scan_id: str):
1238
        """ Gives a scan's exclude host list. If a target is passed gives
1239
        the exclude host list for the given target. """
1240
        return self.scan_collection.get_exclude_hosts(scan_id)
1241
1242
    def get_scan_credentials(self, scan_id: str) -> Dict:
1243
        """ Gives a scan's credential list. If a target is passed gives
1244
        the credential list for the given target. """
1245
        return self.scan_collection.get_credentials(scan_id)
1246
1247
    def get_scan_target_options(self, scan_id: str) -> Dict:
1248
        """ Gives a scan's target option dict. If a target is passed gives
1249
        the credential list for the given target. """
1250
        return self.scan_collection.get_target_options(scan_id)
1251
1252
    def get_scan_vts(self, scan_id: str) -> Dict:
1253
        """ Gives a scan's vts. """
1254
        return self.scan_collection.get_vts(scan_id)
1255
1256
    def get_scan_unfinished_hosts(self, scan_id: str) -> List:
1257
        """ Get a list of unfinished hosts."""
1258
        return self.scan_collection.get_hosts_unfinished(scan_id)
1259
1260
    def get_scan_finished_hosts(self, scan_id: str) -> List:
1261
        """ Get a list of unfinished hosts."""
1262
        return self.scan_collection.get_hosts_finished(scan_id)
1263
1264
    def get_scan_start_time(self, scan_id: str) -> str:
1265
        """ Gives a scan's start time. """
1266
        return self.scan_collection.get_start_time(scan_id)
1267
1268
    def get_scan_end_time(self, scan_id: str) -> str:
1269
        """ Gives a scan's end time. """
1270
        return self.scan_collection.get_end_time(scan_id)
1271
1272
    def add_scan_log(
        self,
        scan_id: str,
        host: str = '',
        hostname: str = '',
        name: str = '',
        value: str = '',
        port: str = '',
        test_id: str = '',
        qod: str = '',
    ):
        """ Adds a log result to scan_id scan.

        The values are forwarded to scan_collection.add_result() with
        ResultType.LOG as result type and the fixed severity '0.0'.
        """
        log_severity = '0.0'
        result_fields = (
            host,
            hostname,
            name,
            value,
            port,
            test_id,
            log_severity,
            qod,
        )
        self.scan_collection.add_result(
            scan_id, ResultType.LOG, *result_fields
        )
1296
1297
    def add_scan_error(
        self,
        scan_id: str,
        host: str = '',
        hostname: str = '',
        name: str = '',
        value: str = '',
        port: str = '',
    ) -> None:
        """ Adds an error result to scan_id scan.

        The values are forwarded to scan_collection.add_result() with
        ResultType.ERROR as result type.
        """
        result_fields = (host, hostname, name, value, port)
        self.scan_collection.add_result(
            scan_id, ResultType.ERROR, *result_fields
        )
1310
1311
    def add_scan_host_detail(
        self,
        scan_id: str,
        host: str = '',
        hostname: str = '',
        name: str = '',
        value: str = '',
    ) -> None:
        """ Adds a host detail result to scan_id scan.

        The values are forwarded to scan_collection.add_result() with
        ResultType.HOST_DETAIL as result type.
        """
        result_fields = (host, hostname, name, value)
        self.scan_collection.add_result(
            scan_id, ResultType.HOST_DETAIL, *result_fields
        )
1323
1324
    def add_scan_alarm(
        self,
        scan_id: str,
        host: str = '',
        hostname: str = '',
        name: str = '',
        value: str = '',
        port: str = '',
        test_id: str = '',
        severity: str = '',
        qod: str = '',
    ):
        """ Adds an alarm result to scan_id scan.

        The values are forwarded to scan_collection.add_result() with
        ResultType.ALARM as result type.
        """
        result_fields = (
            host,
            hostname,
            name,
            value,
            port,
            test_id,
            severity,
            qod,
        )
        self.scan_collection.add_result(
            scan_id, ResultType.ALARM, *result_fields
        )
1349