Passed
Pull Request — master (#202)
by
unknown
01:23
created

ospd.ospd.OSPDaemon.get_help_text()   A

Complexity

Conditions 5

Size

Total Lines 26
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 17
nop 1
dl 0
loc 26
rs 9.0833
c 0
b 0
f 0
1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
# pylint: disable=too-many-lines
20
21
""" OSP Daemon core class.
22
"""
23
24
import logging
25
import socket
26
import ssl
27
import multiprocessing
28
import re
29
import time
30
import os
31
32
from typing import List, Any, Dict, Optional
33
from xml.etree.ElementTree import Element, SubElement
34
35
import defusedxml.ElementTree as secET
36
37
from deprecated import deprecated
38
39
from ospd import __version__
40
from ospd.command import get_commands
41
from ospd.errors import OspdCommandError, OspdError
42
from ospd.misc import ScanCollection, ResultType, ScanStatus, create_process
43
from ospd.network import resolve_hostname, target_str_to_list
44
from ospd.protocol import OspRequest, OspResponse
45
from ospd.server import BaseServer
46
from ospd.vtfilter import VtsFilter
47
from ospd.xml import (
48
    elements_as_text,
49
    get_result_xml,
50
    get_elements_from_dict,
51
)
52
53
logger = logging.getLogger(__name__)
54
55
PROTOCOL_VERSION = "1.2"
56
57
SCHEDULER_CHECK_PERIOD = 5  # in seconds
58
59
BASE_SCANNER_PARAMS = {
60
    'debug_mode': {
61
        'type': 'boolean',
62
        'name': 'Debug Mode',
63
        'default': 0,
64
        'mandatory': 0,
65
        'description': 'Whether to get extra scan debug information.',
66
    },
67
    'dry_run': {
68
        'type': 'boolean',
69
        'name': 'Dry Run',
70
        'default': 0,
71
        'mandatory': 0,
72
        'description': 'Whether to dry run scan.',
73
    },
74
}  # type: Dict
75
76
77
def _terminate_process_group(process: multiprocessing.Process) -> None:
78
    os.killpg(os.getpgid(process.pid), 15)
79
80
81
class OSPDaemon:
82
83
    """ Daemon class for OSP traffic handling.
84
85
    Every scanner wrapper should subclass it and make necessary additions and
86
    changes.
87
88
    * Add any needed parameters in __init__.
89
    * Implement check() method which verifies scanner availability and other
90
      environment related conditions.
91
    * Implement process_scan_params and exec_scan methods which are
92
      specific to handling the <start_scan> command, executing the wrapped
93
      scanner and storing the results.
94
    * exec_scan() should return 0 if host is dead or not reached, 1 if host is
95
      alive and 2 if scan error or status is unknown.
96
    * Implement other methods that assert to False such as get_scanner_name,
97
      get_scanner_version.
98
    * Use Call set_command_attributes at init time to add scanner command
99
      specific options eg. the w3af profile for w3af wrapper.
100
    """
101
102
    def __init__(
103
        self, *, customvtfilter=None, **kwargs
104
    ):  # pylint: disable=unused-argument
105
        """ Initializes the daemon's internal data. """
106
        self.scan_collection = ScanCollection()
107
        self.scan_processes = dict()
108
109
        self.daemon_info = dict()
110
        self.daemon_info['name'] = "OSPd"
111
        self.daemon_info['version'] = __version__
112
        self.daemon_info['description'] = "No description"
113
114
        self.scanner_info = dict()
115
        self.scanner_info['name'] = 'No name'
116
        self.scanner_info['version'] = 'No version'
117
        self.scanner_info['description'] = 'No description'
118
119
        self.server_version = None  # Set by the subclass.
120
121
        self.scaninfo_store_time = kwargs.get('scaninfo_store_time')
122
123
        self.protocol_version = PROTOCOL_VERSION
124
125
        self.commands = {}
126
127
        for command_class in get_commands():
128
            command = command_class(self)
129
            self.commands[command.get_name()] = command
130
131
        self.scanner_params = dict()
132
133
        for name, params in BASE_SCANNER_PARAMS.items():
134
            self.set_scanner_param(name, params)
135
136
        self.vts = None
137
        self.vt_id_pattern = re.compile("[0-9a-zA-Z_\\-:.]{1,80}")
138
        self.vts_version = None
139
140
        if customvtfilter:
141
            self.vts_filter = customvtfilter
142
        else:
143
            self.vts_filter = VtsFilter()
144
145
    def init(self) -> None:
146
        """ Should be overridden by a subclass if the initialization is costly.
147
148
            Will be called after check.
149
        """
150
151
    def set_command_attributes(self, name: str, attributes: Dict) -> None:
152
        """ Sets the xml attributes of a specified command. """
153
        if self.command_exists(name):
154
            command = self.commands.get(name)
155
            command.attributes = attributes
156
157
    @deprecated(version="20.4", reason="Use set_scanner_param instead")
158
    def add_scanner_param(self, name: str, scanner_params: Dict) -> None:
159
        """ Set a scanner parameter. """
160
        self.set_scanner_param(name, scanner_params)
161
162
    def set_scanner_param(self, name: str, scanner_params: Dict) -> None:
163
        """ Set a scanner parameter. """
164
165
        assert name
166
        assert scanner_params
167
168
        self.scanner_params[name] = scanner_params
169
170
    def get_scanner_params(self) -> Dict:
171
        return self.scanner_params
172
173
    def add_vt(
174
        self,
175
        vt_id: str,
176
        name: str = None,
177
        vt_params: str = None,
178
        vt_refs: str = None,
179
        custom: str = None,
180
        vt_creation_time: str = None,
181
        vt_modification_time: str = None,
182
        vt_dependencies: str = None,
183
        summary: str = None,
184
        impact: str = None,
185
        affected: str = None,
186
        insight: str = None,
187
        solution: str = None,
188
        solution_t: str = None,
189
        solution_m: str = None,
190
        detection: str = None,
191
        qod_t: str = None,
192
        qod_v: str = None,
193
        severities: str = None,
194
    ) -> None:
195
        """ Add a vulnerability test information.
196
197
        IMPORTANT: The VT's Data Manager will store the vts collection.
198
        If the collection is considerably big and it will be consultated
199
        intensible during a routine, consider to do a deepcopy(), since
200
        accessing the shared memory in the data manager is very expensive.
201
        At the end of the routine, the temporal copy must be set to None
202
        and deleted.
203
        """
204
        if self.vts is None:
205
            self.vts = multiprocessing.Manager().dict()
206
207
        if not vt_id:
208
            raise OspdError('Invalid vt_id {}'.format(vt_id))
209
210
        if self.vt_id_pattern.fullmatch(vt_id) is None:
211
            raise OspdError('Invalid vt_id {}'.format(vt_id))
212
213
        if vt_id in self.vts:
214
            raise OspdError('vt_id {} already exists'.format(vt_id))
215
216
        if name is None:
217
            name = ''
218
219
        vt = {'name': name}
220
        if custom is not None:
221
            vt["custom"] = custom
222
        if vt_params is not None:
223
            vt["vt_params"] = vt_params
224
        if vt_refs is not None:
225
            vt["vt_refs"] = vt_refs
226
        if vt_dependencies is not None:
227
            vt["vt_dependencies"] = vt_dependencies
228
        if vt_creation_time is not None:
229
            vt["creation_time"] = vt_creation_time
230
        if vt_modification_time is not None:
231
            vt["modification_time"] = vt_modification_time
232
        if summary is not None:
233
            vt["summary"] = summary
234
        if impact is not None:
235
            vt["impact"] = impact
236
        if affected is not None:
237
            vt["affected"] = affected
238
        if insight is not None:
239
            vt["insight"] = insight
240
241
        if solution is not None:
242
            vt["solution"] = solution
243
            if solution_t is not None:
244
                vt["solution_type"] = solution_t
245
            if solution_m is not None:
246
                vt["solution_method"] = solution_m
247
248
        if detection is not None:
249
            vt["detection"] = detection
250
251
        if qod_t is not None:
252
            vt["qod_type"] = qod_t
253
        elif qod_v is not None:
254
            vt["qod"] = qod_v
255
256
        if severities is not None:
257
            vt["severities"] = severities
258
259
        self.vts[vt_id] = vt
260
261
    def set_vts_version(self, vts_version: str) -> None:
262
        """ Add into the vts dictionary an entry to identify the
263
        vts version.
264
265
        Parameters:
266
            vts_version (str): Identifies a unique vts version.
267
        """
268
        if not vts_version:
269
            raise OspdCommandError(
270
                'A vts_version parameter is required', 'set_vts_version'
271
            )
272
        self.vts_version = vts_version
273
274
    def get_vts_version(self) -> Optional[str]:
275
        """Return the vts version.
276
        """
277
        return self.vts_version
278
279
    def command_exists(self, name: str) -> bool:
280
        """ Checks if a commands exists. """
281
        return name in self.commands
282
283
    def get_scanner_name(self) -> str:
284
        """ Gives the wrapped scanner's name. """
285
        return self.scanner_info['name']
286
287
    def get_scanner_version(self) -> str:
288
        """ Gives the wrapped scanner's version. """
289
        return self.scanner_info['version']
290
291
    def get_scanner_description(self) -> str:
292
        """ Gives the wrapped scanner's description. """
293
        return self.scanner_info['description']
294
295
    def get_server_version(self) -> str:
296
        """ Gives the specific OSP server's version. """
297
        assert self.server_version
298
        return self.server_version
299
300
    def get_protocol_version(self) -> str:
301
        """ Gives the OSP's version. """
302
        return self.protocol_version
303
304
    def preprocess_scan_params(self, xml_params):
305
        """ Processes the scan parameters. """
306
        params = {}
307
308
        for param in xml_params:
309
            params[param.tag] = param.text or ''
310
311
        # Set default values.
312
        for key in self.scanner_params:
313
            if key not in params:
314
                params[key] = self.get_scanner_param_default(key)
315
                if self.get_scanner_param_type(key) == 'selection':
316
                    params[key] = params[key].split('|')[0]
317
318
        # Validate values.
319
        for key in params:
320
            param_type = self.get_scanner_param_type(key)
321
            if not param_type:
322
                continue
323
324
            if param_type in ['integer', 'boolean']:
325
                try:
326
                    params[key] = int(params[key])
327
                except ValueError:
328
                    raise OspdCommandError(
329
                        'Invalid %s value' % key, 'start_scan'
330
                    )
331
332
            if param_type == 'boolean':
333
                if params[key] not in [0, 1]:
334
                    raise OspdCommandError(
335
                        'Invalid %s value' % key, 'start_scan'
336
                    )
337
            elif param_type == 'selection':
338
                selection = self.get_scanner_param_default(key).split('|')
339
                if params[key] not in selection:
340
                    raise OspdCommandError(
341
                        'Invalid %s value' % key, 'start_scan'
342
                    )
343
            if self.get_scanner_param_mandatory(key) and params[key] == '':
344
                raise OspdCommandError(
345
                    'Mandatory %s value is missing' % key, 'start_scan'
346
                )
347
348
        return params
349
350
    def process_scan_params(self, params: Dict) -> Dict:
351
        """ This method is to be overridden by the child classes if necessary
352
        """
353
        return params
354
355
    @staticmethod
356
    @deprecated(
357
        version="20.4",
358
        reason="Please use OspRequest.process_vt_params instead.",
359
    )
360
    def process_vts_params(scanner_vts) -> Dict:
361
        return OspRequest.process_vts_params(scanner_vts)
362
363
    @staticmethod
364
    @deprecated(
365
        version="20.4",
366
        reason="Please use OspRequest.process_credential_elements instead.",
367
    )
368
    def process_credentials_elements(cred_tree) -> Dict:
369
        return OspRequest.process_credentials_elements(cred_tree)
370
371
    @staticmethod
372
    @deprecated(
373
        version="20.4",
374
        reason="Please use OspRequest.process_targets_elements instead.",
375
    )
376
    def process_targets_element(scanner_target) -> List:
377
        return OspRequest.process_targets_element(scanner_target)
378
379
    def stop_scan(self, scan_id: str) -> None:
380
        scan_process = self.scan_processes.get(scan_id)
381
        if not scan_process:
382
            raise OspdCommandError(
383
                'Scan not found {0}.'.format(scan_id), 'stop_scan'
384
            )
385
        if not scan_process.is_alive():
386
            raise OspdCommandError(
387
                'Scan already stopped or finished.', 'stop_scan'
388
            )
389
390
        self.set_scan_status(scan_id, ScanStatus.STOPPED)
391
392
        logger.info('%s: Scan stopping %s.', scan_id, scan_process.ident)
393
394
        self.stop_scan_cleanup(scan_id)
395
396
        try:
397
            scan_process.terminate()
398
        except AttributeError:
399
            logger.debug('%s: The scanner task stopped unexpectedly.', scan_id)
400
401
        try:
402
            _terminate_process_group(scan_process)
403
        except ProcessLookupError as e:
404
            logger.info(
405
                '%s: Scan already stopped %s.', scan_id, scan_process.pid
406
            )
407
408
        if scan_process.ident != os.getpid():
409
            scan_process.join(0)
410
411
        logger.info('%s: Scan stopped.', scan_id)
412
413
    @staticmethod
414
    def stop_scan_cleanup(scan_id: str):
415
        """ Should be implemented by subclass in case of a clean up before
416
        terminating is needed. """
417
418
    @staticmethod
419
    def target_is_finished(scan_id: str):
420
        """ Should be implemented by subclass in case of a check before
421
        stopping is needed. """
422
423
    def exec_scan(self, scan_id: str, target):
424
        """ Asserts to False. Should be implemented by subclass. """
425
        raise NotImplementedError
426
427
    def finish_scan(self, scan_id: str) -> None:
428
        """ Sets a scan as finished. """
429
        self.set_scan_progress(scan_id, 100)
430
        self.set_scan_status(scan_id, ScanStatus.FINISHED)
431
        logger.info("%s: Scan finished.", scan_id)
432
433
    def get_daemon_name(self) -> str:
434
        """ Gives osp daemon's name. """
435
        return self.daemon_info['name']
436
437
    def get_daemon_version(self) -> str:
438
        """ Gives osp daemon's version. """
439
        return self.daemon_info['version']
440
441
    def get_scanner_param_type(self, param: str):
442
        """ Returns type of a scanner parameter. """
443
        assert isinstance(param, str)
444
        entry = self.scanner_params.get(param)
445
        if not entry:
446
            return None
447
        return entry.get('type')
448
449
    def get_scanner_param_mandatory(self, param: str):
450
        """ Returns if a scanner parameter is mandatory. """
451
        assert isinstance(param, str)
452
        entry = self.scanner_params.get(param)
453
        if not entry:
454
            return False
455
        return entry.get('mandatory')
456
457
    def get_scanner_param_default(self, param: str):
458
        """ Returns default value of a scanner parameter. """
459
        assert isinstance(param, str)
460
        entry = self.scanner_params.get(param)
461
        if not entry:
462
            return None
463
        return entry.get('default')
464
465
    @deprecated(
466
        version="20.4",
467
        reason="Please use OspResponse.create_scanner_params_xml instead.",
468
    )
469
    def get_scanner_params_xml(self):
470
        """ Returns the OSP Daemon's scanner params in xml format. """
471
        return OspResponse.create_scanner_params_xml(self.scanner_params)
472
473
    def handle_client_stream(self, stream) -> None:
474
        """ Handles stream of data received from client. """
475
476
        data = b''
477
478
        while True:
479
            try:
480
                buf = stream.read()
481
                if not buf:
482
                    break
483
484
                data += buf
485
            except (AttributeError, ValueError) as message:
486
                logger.error(message)
487
                return
488
            except (ssl.SSLError) as exception:
489
                logger.debug('Error: %s', exception)
490
                break
491
            except (socket.timeout) as exception:
492
                break
493
494
        if len(data) <= 0:
495
            logger.debug("Empty client stream")
496
            return
497
498
        try:
499
            response = self.handle_command(data)
500
        except OspdCommandError as exception:
501
            response = exception.as_xml()
502
            logger.debug('Command error: %s', exception.message)
503
        except Exception:  # pylint: disable=broad-except
504
            logger.exception('While handling client command:')
505
            exception = OspdCommandError('Fatal error', 'error')
506
            response = exception.as_xml()
507
508
        stream.write(response)
509
        stream.close()
510
511
    def parallel_scan(self, scan_id: str, target: str) -> None:
512
        """ Starts the scan with scan_id. """
513
        try:
514
            ret = self.exec_scan(scan_id, target)
515
            if ret == 0:
516
                logger.info("%s: Host scan dead.", target)
517
            elif ret == 1:
518
                logger.info("%s: Host scan alived.", target)
519
            elif ret == 2:
520
                logger.info("%s: Scan error or status unknown.", target)
521
            else:
522
                logger.debug('%s: No host status returned', target)
523
        except Exception as e:  # pylint: disable=broad-except
524
            self.add_scan_error(
525
                scan_id,
526
                name='',
527
                host=target,
528
                value='Host process failure (%s).' % e,
529
            )
530
            logger.exception('While scanning %s:', target)
531
        else:
532
            logger.info("%s: Host scan finished.", target)
533
534
    def check_pending_target(self, scan_id: str, multiscan_proc: List) -> List:
535
        """ Check if a scan process is still alive. In case the process
536
        finished or is stopped, removes the process from the multiscan
537
        _process list.
538
        Processes dead and with progress < 100% are considered stopped
539
        or with failures. Then will try to stop the other runnings (target)
540
        scans owned by the same task.
541
542
        @input scan_id        Scan_id of the whole scan.
543
        @input multiscan_proc A list with the scan process which
544
                              may still be alive.
545
546
        @return Actualized list with current running scan processes.
547
        """
548
        for running_target_proc, running_target_id in multiscan_proc:
549
            if not running_target_proc.is_alive():
550
                target_prog = self.get_scan_target_progress(
551
                    scan_id, running_target_id
552
                )
553
554
                _not_finished_clean = target_prog < 100
555
                _not_stopped = (
556
                    self.get_scan_status(scan_id) != ScanStatus.STOPPED
557
                )
558
559
                if _not_finished_clean and _not_stopped:
560
                    if not self.target_is_finished(scan_id):
561
                        self.stop_scan(scan_id)
562
563
                running_target = (running_target_proc, running_target_id)
564
                multiscan_proc.remove(running_target)
565
566
        return multiscan_proc
567
568
    def calculate_progress(self, scan_id: str) -> float:
569
        """ Calculate the total scan progress from the
570
        partial target progress. """
571
572
        t_prog = dict()
573
        for target in self.get_scan_target(scan_id):
574
            t_prog[target] = self.get_scan_target_progress(scan_id, target)
575
        return sum(t_prog.values()) / len(t_prog)
576
577
    def process_exclude_hosts(self, scan_id: str, target_list: List) -> None:
578
        """ Process the exclude hosts before launching the scans."""
579
580
        for target, _, _, exclude_hosts, _, _ in target_list:
581
            exc_hosts_list = ''
582
            if not exclude_hosts:
583
                continue
584
            exc_hosts_list = target_str_to_list(exclude_hosts)
585
            self.remove_scan_hosts_from_target_progress(
586
                scan_id, target, exc_hosts_list
587
            )
588
589
    def process_finished_hosts(self, scan_id: str, target_list: List) -> None:
590
        """ Process the finished hosts before launching the scans.
591
        Set finished hosts as finished with 100% to calculate
592
        the scan progress."""
593
594
        for target, _, _, _, finished_hosts, _ in target_list:
595
            exc_hosts_list = ''
596
            if not finished_hosts:
597
                continue
598
            exc_hosts_list = target_str_to_list(finished_hosts)
599
600
            for host in exc_hosts_list:
601
                self.set_scan_host_finished(scan_id, target, host)
602
                self.set_scan_host_progress(scan_id, target, host, 100)
603
604
    def start_scan(self, scan_id: str, targets: List, parallel=1) -> None:
605
        """ Handle N parallel scans if 'parallel' is greater than 1. """
606
607
        os.setsid()
608
609
        multiscan_proc = []
610
        logger.info("%s: Scan started.", scan_id)
611
        target_list = targets
612
        if target_list is None or not target_list:
613
            raise OspdCommandError('Erroneous targets list', 'start_scan')
614
615
        self.process_exclude_hosts(scan_id, target_list)
616
        self.process_finished_hosts(scan_id, target_list)
617
618
        for _index, target in enumerate(target_list):
619
            while len(multiscan_proc) >= parallel:
620
                progress = self.calculate_progress(scan_id)
621
                self.set_scan_progress(scan_id, progress)
622
                multiscan_proc = self.check_pending_target(
623
                    scan_id, multiscan_proc
624
                )
625
                time.sleep(1)
626
627
            # If the scan status is stopped, does not launch anymore target
628
            # scans
629
            if self.get_scan_status(scan_id) == ScanStatus.STOPPED:
630
                return
631
632
            logger.debug(
633
                "%s: Host scan started on ports %s.", target[0], target[1]
634
            )
635
            scan_process = create_process(
636
                func=self.parallel_scan, args=(scan_id, target[0])
637
            )
638
            multiscan_proc.append((scan_process, target[0]))
639
            scan_process.start()
640
            self.set_scan_status(scan_id, ScanStatus.RUNNING)
641
642
        # Wait until all single target were scanned
643
        while multiscan_proc:
644
            multiscan_proc = self.check_pending_target(scan_id, multiscan_proc)
645
            if multiscan_proc:
646
                progress = self.calculate_progress(scan_id)
647
                self.set_scan_progress(scan_id, progress)
648
            time.sleep(1)
649
650
        # Only set the scan as finished if the scan was not stopped.
651
        if self.get_scan_status(scan_id) != ScanStatus.STOPPED:
652
            self.finish_scan(scan_id)
653
654
    def dry_run_scan(  # pylint: disable=unused-argument
655
        self, scan_id: str, targets: List, parallel: int
656
    ) -> None:
657
        """ Dry runs a scan. """
658
659
        os.setsid()
660
661
        for _, target in enumerate(targets):
662
            host = resolve_hostname(target[0])
663
            if host is None:
664
                logger.info("Couldn't resolve %s.", target[0])
665
                continue
666
667
            port = self.get_scan_ports(scan_id, target=target[0])
668
669
            logger.info("%s:%s: Dry run mode.", host, port)
670
671
            self.add_scan_log(
672
                scan_id, name='', host=host, value='Dry run result'
673
            )
674
675
        self.finish_scan(scan_id)
676
677
    def handle_timeout(self, scan_id: str, host: str) -> None:
678
        """ Handles scanner reaching timeout error. """
679
        self.add_scan_error(
680
            scan_id,
681
            host=host,
682
            name="Timeout",
683
            value="{0} exec timeout.".format(self.get_scanner_name()),
684
        )
685
686
    def remove_scan_hosts_from_target_progress(
687
        self, scan_id: str, target: str, exc_hosts_list: List
688
    ) -> None:
689
        """ Remove a list of hosts from the main scan progress table."""
690
        self.scan_collection.remove_hosts_from_target_progress(
691
            scan_id, target, exc_hosts_list
692
        )
693
694
    def set_scan_host_finished(
695
        self, scan_id: str, target: str, host: str
696
    ) -> None:
697
        """ Add the host in a list of finished hosts """
698
        self.scan_collection.set_host_finished(scan_id, target, host)
699
700
    def set_scan_progress(self, scan_id: str, progress: int) -> None:
701
        """ Sets scan_id scan's progress which is a number
702
        between 0 and 100. """
703
        self.scan_collection.set_progress(scan_id, progress)
704
705
    def set_scan_host_progress(
706
        self, scan_id: str, target: str, host: str, progress: int
707
    ) -> None:
708
        """ Sets host's progress which is part of target. """
709
        self.scan_collection.set_host_progress(scan_id, target, host, progress)
710
711
    def set_scan_status(self, scan_id: str, status: ScanStatus) -> None:
712
        """ Set the scan's status."""
713
        self.scan_collection.set_status(scan_id, status)
714
715
    def get_scan_status(self, scan_id: str) -> ScanStatus:
716
        """ Get scan_id scans's status."""
717
        return self.scan_collection.get_status(scan_id)
718
719
    def scan_exists(self, scan_id: str) -> bool:
720
        """ Checks if a scan with ID scan_id is in collection.
721
722
        @return: 1 if scan exists, 0 otherwise.
723
        """
724
        return self.scan_collection.id_exists(scan_id)
725
726
    def get_help_text(self) -> str:
727
        """ Returns the help output in plain text format."""
728
729
        txt = ''
730
        for name, info in self.commands.items():
731
            description = info.get_description()
732
            attributes = info.get_attributes()
733
            elements = info.get_elements()
734
735
            command_txt = "\t{0: <22} {1}\n".format(name, description)
736
737
            if attributes:
738
                command_txt = ''.join([command_txt, "\t Attributes:\n"])
739
740
                for attrname, attrdesc in attributes.items():
741
                    attr_txt = "\t  {0: <22} {1}\n".format(attrname, attrdesc)
742
                    command_txt = ''.join([command_txt, attr_txt])
743
744
            if elements:
745
                command_txt = ''.join(
746
                    [command_txt, "\t Elements:\n", elements_as_text(elements),]
747
                )
748
749
            txt += command_txt
750
751
        return txt
752
753
    @deprecated(version="20.4", reason="Use ospd.xml.elements_as_text instead.")
754
    def elements_as_text(self, elems: Dict, indent: int = 2) -> str:
755
        """ Returns the elems dictionary as formatted plain text. """
756
        return elements_as_text(elems, indent)
757
758
    def delete_scan(self, scan_id: str) -> int:
759
        """ Deletes scan_id scan from collection.
760
761
        @return: 1 if scan deleted, 0 otherwise.
762
        """
763
        if self.get_scan_status(scan_id) == ScanStatus.RUNNING:
764
            return 0
765
766
        try:
767
            del self.scan_processes[scan_id]
768
        except KeyError:
769
            logger.debug('Scan process for %s not found', scan_id)
770
        return self.scan_collection.delete_scan(scan_id)
771
772
    def get_scan_results_xml(
773
        self, scan_id: str, pop_res: bool, max_res: Optional[int]
774
    ):
775
        """ Gets scan_id scan's results in XML format.
776
777
        @return: String of scan results in xml.
778
        """
779
        results = Element('results')
780
        for result in self.scan_collection.results_iterator(
781
            scan_id, pop_res, max_res
782
        ):
783
            results.append(get_result_xml(result))
784
785
        logger.debug('Returning %d results', len(results))
786
        return results
787
788
    @deprecated(
789
        version="20.4",
790
        reason="Please use ospd.xml.get_elements_from_dict instead.",
791
    )
792
    def get_xml_str(self, data: Dict) -> List:
793
        """ Creates a string in XML Format using the provided data structure.
794
795
        @param: Dictionary of xml tags and their elements.
796
797
        @return: String of data in xml format.
798
        """
799
        return get_elements_from_dict(data)
800
801
    def get_scan_xml(
802
        self,
803
        scan_id: str,
804
        detailed: bool = True,
805
        pop_res: bool = False,
806
        max_res: int = 0,
807
    ):
808
        """ Gets scan in XML format.
809
810
        @return: String of scan in XML format.
811
        """
812
        if not scan_id:
813
            return Element('scan')
814
815
        target = ','.join(self.get_scan_target(scan_id))
816
        progress = self.get_scan_progress(scan_id)
817
        status = self.get_scan_status(scan_id)
818
        start_time = self.get_scan_start_time(scan_id)
819
        end_time = self.get_scan_end_time(scan_id)
820
        response = Element('scan')
821
        for name, value in [
822
            ('id', scan_id),
823
            ('target', target),
824
            ('progress', progress),
825
            ('status', status.name.lower()),
826
            ('start_time', start_time),
827
            ('end_time', end_time),
828
        ]:
829
            response.set(name, str(value))
830
        if detailed:
831
            response.append(
832
                self.get_scan_results_xml(scan_id, pop_res, max_res)
833
            )
834
        return response
835
836
    @staticmethod
837
    def get_custom_vt_as_xml_str(  # pylint: disable=unused-argument
838
        vt_id: str, custom: Dict
839
    ) -> str:
840
        """ Create a string representation of the XML object from the
841
        custom data object.
842
        This needs to be implemented by each ospd wrapper, in case
843
        custom elements for VTs are used.
844
845
        The custom XML object which is returned will be embedded
846
        into a <custom></custom> element.
847
848
        @return: XML object as string for custom data.
849
        """
850
        return ''
851
852
    @staticmethod
853
    def get_params_vt_as_xml_str(  # pylint: disable=unused-argument
854
        vt_id: str, vt_params
855
    ) -> str:
856
        """ Create a string representation of the XML object from the
857
        vt_params data object.
858
        This needs to be implemented by each ospd wrapper, in case
859
        vt_params elements for VTs are used.
860
861
        The params XML object which is returned will be embedded
862
        into a <params></params> element.
863
864
        @return: XML object as string for vt parameters data.
865
        """
866
        return ''
867
868
    @staticmethod
869
    def get_refs_vt_as_xml_str(  # pylint: disable=unused-argument
870
        vt_id: str, vt_refs
871
    ) -> str:
872
        """ Create a string representation of the XML object from the
873
        refs data object.
874
        This needs to be implemented by each ospd wrapper, in case
875
        refs elements for VTs are used.
876
877
        The refs XML object which is returned will be embedded
878
        into a <refs></refs> element.
879
880
        @return: XML object as string for vt references data.
881
        """
882
        return ''
883
884
    @staticmethod
885
    def get_dependencies_vt_as_xml_str(  # pylint: disable=unused-argument
886
        vt_id: str, vt_dependencies
887
    ) -> str:
888
        """ Create a string representation of the XML object from the
889
        vt_dependencies data object.
890
        This needs to be implemented by each ospd wrapper, in case
891
        vt_dependencies elements for VTs are used.
892
893
        The vt_dependencies XML object which is returned will be embedded
894
        into a <dependencies></dependencies> element.
895
896
        @return: XML object as string for vt dependencies data.
897
        """
898
        return ''
899
900
    @staticmethod
901
    def get_creation_time_vt_as_xml_str(  # pylint: disable=unused-argument
902
        vt_id: str, vt_creation_time
903
    ) -> str:
904
        """ Create a string representation of the XML object from the
905
        vt_creation_time data object.
906
        This needs to be implemented by each ospd wrapper, in case
907
        vt_creation_time elements for VTs are used.
908
909
        The vt_creation_time XML object which is returned will be embedded
910
        into a <vt_creation_time></vt_creation_time> element.
911
912
        @return: XML object as string for vt creation time data.
913
        """
914
        return ''
915
916
    @staticmethod
917
    def get_modification_time_vt_as_xml_str(  # pylint: disable=unused-argument
918
        vt_id: str, vt_modification_time
919
    ) -> str:
920
        """ Create a string representation of the XML object from the
921
        vt_modification_time data object.
922
        This needs to be implemented by each ospd wrapper, in case
923
        vt_modification_time elements for VTs are used.
924
925
        The vt_modification_time XML object which is returned will be embedded
926
        into a <vt_modification_time></vt_modification_time> element.
927
928
        @return: XML object as string for vt references data.
929
        """
930
        return ''
931
932
    @staticmethod
933
    def get_summary_vt_as_xml_str(  # pylint: disable=unused-argument
934
        vt_id: str, summary
935
    ) -> str:
936
        """ Create a string representation of the XML object from the
937
        summary data object.
938
        This needs to be implemented by each ospd wrapper, in case
939
        summary elements for VTs are used.
940
941
        The summary XML object which is returned will be embedded
942
        into a <summary></summary> element.
943
944
        @return: XML object as string for summary data.
945
        """
946
        return ''
947
948
    @staticmethod
949
    def get_impact_vt_as_xml_str(  # pylint: disable=unused-argument
950
        vt_id: str, impact
951
    ) -> str:
952
        """ Create a string representation of the XML object from the
953
        impact data object.
954
        This needs to be implemented by each ospd wrapper, in case
955
        impact elements for VTs are used.
956
957
        The impact XML object which is returned will be embedded
958
        into a <impact></impact> element.
959
960
        @return: XML object as string for impact data.
961
        """
962
        return ''
963
964
    @staticmethod
965
    def get_affected_vt_as_xml_str(  # pylint: disable=unused-argument
966
        vt_id: str, affected
967
    ) -> str:
968
        """ Create a string representation of the XML object from the
969
        affected data object.
970
        This needs to be implemented by each ospd wrapper, in case
971
        affected elements for VTs are used.
972
973
        The affected XML object which is returned will be embedded
974
        into a <affected></affected> element.
975
976
        @return: XML object as string for affected data.
977
        """
978
        return ''
979
980
    @staticmethod
981
    def get_insight_vt_as_xml_str(  # pylint: disable=unused-argument
982
        vt_id: str, insight
983
    ) -> str:
984
        """ Create a string representation of the XML object from the
985
        insight data object.
986
        This needs to be implemented by each ospd wrapper, in case
987
        insight elements for VTs are used.
988
989
        The insight XML object which is returned will be embedded
990
        into a <insight></insight> element.
991
992
        @return: XML object as string for insight data.
993
        """
994
        return ''
995
996
    @staticmethod
997
    def get_solution_vt_as_xml_str(  # pylint: disable=unused-argument
998
        vt_id: str, solution, solution_type=None, solution_method=None
999
    ) -> str:
1000
        """ Create a string representation of the XML object from the
1001
        solution data object.
1002
        This needs to be implemented by each ospd wrapper, in case
1003
        solution elements for VTs are used.
1004
1005
        The solution XML object which is returned will be embedded
1006
        into a <solution></solution> element.
1007
1008
        @return: XML object as string for solution data.
1009
        """
1010
        return ''
1011
1012
    @staticmethod
1013
    def get_detection_vt_as_xml_str(  # pylint: disable=unused-argument
1014
        vt_id: str, detection=None, qod_type=None, qod=None
1015
    ) -> str:
1016
        """ Create a string representation of the XML object from the
1017
        detection data object.
1018
        This needs to be implemented by each ospd wrapper, in case
1019
        detection elements for VTs are used.
1020
1021
        The detection XML object which is returned is an element with
1022
        tag <detection></detection> element
1023
1024
        @return: XML object as string for detection data.
1025
        """
1026
        return ''
1027
1028
    @staticmethod
1029
    def get_severities_vt_as_xml_str(  # pylint: disable=unused-argument
1030
        vt_id: str, severities
1031
    ) -> str:
1032
        """ Create a string representation of the XML object from the
1033
        severities data object.
1034
        This needs to be implemented by each ospd wrapper, in case
1035
        severities elements for VTs are used.
1036
1037
        The severities XML objects which are returned will be embedded
1038
        into a <severities></severities> element.
1039
1040
        @return: XML object as string for severities data.
1041
        """
1042
        return ''
1043
1044
    def get_vt_xml(self, vt_id: str):
1045
        """ Gets a single vulnerability test information in XML format.
1046
1047
        @return: String of single vulnerability test information in XML format.
1048
        """
1049
        if not vt_id:
1050
            return Element('vt')
1051
1052
        vt = self.vts.get(vt_id)
1053
1054
        name = vt.get('name')
1055
        vt_xml = Element('vt')
1056
        vt_xml.set('id', vt_id)
1057
1058
        for name, value in [('name', name)]:
1059
            elem = SubElement(vt_xml, name)
1060
            elem.text = str(value)
1061
1062
        if vt.get('vt_params'):
1063
            params_xml_str = self.get_params_vt_as_xml_str(
1064
                vt_id, vt.get('vt_params')
1065
            )
1066
            vt_xml.append(secET.fromstring(params_xml_str))
1067
1068
        if vt.get('vt_refs'):
1069
            refs_xml_str = self.get_refs_vt_as_xml_str(vt_id, vt.get('vt_refs'))
1070
            vt_xml.append(secET.fromstring(refs_xml_str))
1071
1072
        if vt.get('vt_dependencies'):
1073
            dependencies = self.get_dependencies_vt_as_xml_str(
1074
                vt_id, vt.get('vt_dependencies')
1075
            )
1076
            vt_xml.append(secET.fromstring(dependencies))
1077
1078
        if vt.get('creation_time'):
1079
            vt_ctime = self.get_creation_time_vt_as_xml_str(
1080
                vt_id, vt.get('creation_time')
1081
            )
1082
            vt_xml.append(secET.fromstring(vt_ctime))
1083
1084
        if vt.get('modification_time'):
1085
            vt_mtime = self.get_modification_time_vt_as_xml_str(
1086
                vt_id, vt.get('modification_time')
1087
            )
1088
            vt_xml.append(secET.fromstring(vt_mtime))
1089
1090
        if vt.get('summary'):
1091
            summary_xml_str = self.get_summary_vt_as_xml_str(
1092
                vt_id, vt.get('summary')
1093
            )
1094
            vt_xml.append(secET.fromstring(summary_xml_str))
1095
1096
        if vt.get('impact'):
1097
            impact_xml_str = self.get_impact_vt_as_xml_str(
1098
                vt_id, vt.get('impact')
1099
            )
1100
            vt_xml.append(secET.fromstring(impact_xml_str))
1101
1102
        if vt.get('affected'):
1103
            affected_xml_str = self.get_affected_vt_as_xml_str(
1104
                vt_id, vt.get('affected')
1105
            )
1106
            vt_xml.append(secET.fromstring(affected_xml_str))
1107
1108
        if vt.get('insight'):
1109
            insight_xml_str = self.get_insight_vt_as_xml_str(
1110
                vt_id, vt.get('insight')
1111
            )
1112
            vt_xml.append(secET.fromstring(insight_xml_str))
1113
1114
        if vt.get('solution'):
1115
            solution_xml_str = self.get_solution_vt_as_xml_str(
1116
                vt_id,
1117
                vt.get('solution'),
1118
                vt.get('solution_type'),
1119
                vt.get('solution_method'),
1120
            )
1121
            vt_xml.append(secET.fromstring(solution_xml_str))
1122
1123
        if vt.get('detection') or vt.get('qod_type') or vt.get('qod'):
1124
            detection_xml_str = self.get_detection_vt_as_xml_str(
1125
                vt_id, vt.get('detection'), vt.get('qod_type'), vt.get('qod')
1126
            )
1127
            vt_xml.append(secET.fromstring(detection_xml_str))
1128
1129
        if vt.get('severities'):
1130
            severities_xml_str = self.get_severities_vt_as_xml_str(
1131
                vt_id, vt.get('severities')
1132
            )
1133
            vt_xml.append(secET.fromstring(severities_xml_str))
1134
1135
        if vt.get('custom'):
1136
            custom_xml_str = self.get_custom_vt_as_xml_str(
1137
                vt_id, vt.get('custom')
1138
            )
1139
            vt_xml.append(secET.fromstring(custom_xml_str))
1140
1141
        return vt_xml
1142
1143
    def get_vts_xml(self, vt_id: str = None, filtered_vts: Dict = None):
1144
        """ Gets collection of vulnerability test information in XML format.
1145
        If vt_id is specified, the collection will contain only this vt, if
1146
        found.
1147
        If no vt_id is specified or filtered_vts is None (default), the
1148
        collection will contain all vts. Otherwise those vts passed
1149
        in filtered_vts or vt_id are returned. In case of both vt_id and
1150
        filtered_vts are given, filtered_vts has priority.
1151
1152
        Arguments:
1153
            vt_id (vt_id, optional): ID of the vt to get.
1154
            filtered_vts (dict, optional): Filtered VTs collection.
1155
1156
        Return:
1157
            String of collection of vulnerability test information in
1158
            XML format.
1159
        """
1160
1161
        vts_xml = Element('vts')
1162
1163
        if not self.vts:
1164
            return vts_xml
1165
1166
        if filtered_vts is not None and len(filtered_vts) == 0:
1167
            return vts_xml
1168
1169
        if filtered_vts:
1170
            for vt_id in filtered_vts:
1171
                vts_xml.append(self.get_vt_xml(vt_id))
1172
        elif vt_id:
1173
            vts_xml.append(self.get_vt_xml(vt_id))
1174
        else:
1175
            # Because DictProxy for python3.5 doesn't support
1176
            # iterkeys(), itervalues(), or iteritems() either, the iteration
1177
            # must be done as follow.
1178
            for vt_id in iter(self.vts.keys()):
1179
                vts_xml.append(self.get_vt_xml(vt_id))
1180
1181
        return vts_xml
1182
1183
    def handle_command(self, command: str) -> str:
1184
        """ Handles an osp command in a string.
1185
1186
        @return: OSP Response to command.
1187
        """
1188
        try:
1189
            tree = secET.fromstring(command)
1190
        except secET.ParseError:
1191
            logger.debug("Erroneous client input: %s", command)
1192
            raise OspdCommandError('Invalid data')
1193
1194
        command = self.commands.get(tree.tag, None)
1195
        if not command and tree.tag != "authenticate":
1196
            raise OspdCommandError('Bogus command name')
1197
1198
        return command.handle_xml(tree)
1199
1200
    def check(self):
1201
        """ Asserts to False. Should be implemented by subclass. """
1202
        raise NotImplementedError
1203
1204
    def run(self, server: BaseServer) -> None:
1205
        """ Starts the Daemon, handling commands until interrupted.
1206
        """
1207
1208
        server.start(self.handle_client_stream)
1209
1210
        try:
1211
            while True:
1212
                time.sleep(10)
1213
                self.scheduler()
1214
                self.clean_forgotten_scans()
1215
                self.wait_for_children()
1216
        except KeyboardInterrupt:
1217
            logger.info("Received Ctrl-C shutting-down ...")
1218
        finally:
1219
            logger.info("Shutting-down server ...")
1220
            server.close()
1221
1222
    def scheduler(self):
1223
        """ Should be implemented by subclass in case of need
1224
        to run tasks periodically. """
1225
1226
    def wait_for_children(self):
1227
        """ Join the zombie process to releases resources."""
1228
        for scan_id in self.scan_processes:
1229
            self.scan_processes[scan_id].join(0)
1230
1231
    def create_scan(
1232
        self, scan_id: str, targets: List, options: Optional[Dict], vts: Dict
1233
    ) -> Optional[str]:
1234
        """ Creates a new scan.
1235
1236
        @target: Target to scan.
1237
        @options: Miscellaneous scan options.
1238
1239
        @return: New scan's ID. None if the scan_id already exists and the
1240
                 scan status is RUNNING or FINISHED.
1241
        """
1242
        status = None
1243
        scan_exists = self.scan_exists(scan_id)
1244
        if scan_id and scan_exists:
1245
            status = self.get_scan_status(scan_id)
1246
1247
        if scan_exists and status == ScanStatus.STOPPED:
1248
            logger.info("Scan %s exists. Resuming scan.", scan_id)
1249
        elif scan_exists and (
1250
            status == ScanStatus.RUNNING or status == ScanStatus.FINISHED
1251
        ):
1252
            logger.info(
1253
                "Scan %s exists with status %s.", scan_id, status.name.lower()
1254
            )
1255
            return
1256
        return self.scan_collection.create_scan(scan_id, targets, options, vts)
1257
1258
    def get_scan_options(self, scan_id: str) -> str:
1259
        """ Gives a scan's list of options. """
1260
        return self.scan_collection.get_options(scan_id)
1261
1262
    def set_scan_option(self, scan_id: str, name: str, value: Any) -> None:
1263
        """ Sets a scan's option to a provided value. """
1264
        return self.scan_collection.set_option(scan_id, name, value)
1265
1266
    def clean_forgotten_scans(self) -> None:
1267
        """ Check for old stopped or finished scans which have not been
1268
        deleted and delete them if the are older than the set value."""
1269
1270
        if not self.scaninfo_store_time:
1271
            return
1272
1273
        for scan_id in list(self.scan_collection.ids_iterator()):
1274
            end_time = int(self.get_scan_end_time(scan_id))
1275
            scan_status = self.get_scan_status(scan_id)
1276
1277
            if (
1278
                scan_status == ScanStatus.STOPPED
1279
                or scan_status == ScanStatus.FINISHED
1280
            ) and end_time:
1281
                stored_time = int(time.time()) - end_time
1282
                if stored_time > self.scaninfo_store_time * 3600:
1283
                    logger.debug(
1284
                        'Scan %s is older than %d hours and seems have been '
1285
                        'forgotten. Scan info will be deleted from the '
1286
                        'scan table',
1287
                        scan_id,
1288
                        self.scaninfo_store_time,
1289
                    )
1290
                    self.delete_scan(scan_id)
1291
1292
    def check_scan_process(self, scan_id: str) -> None:
1293
        """ Check the scan's process, and terminate the scan if not alive. """
1294
        scan_process = self.scan_processes[scan_id]
1295
        progress = self.get_scan_progress(scan_id)
1296
1297
        if progress < 100 and not scan_process.is_alive():
1298
            if not self.get_scan_status(scan_id) == ScanStatus.STOPPED:
1299
                self.set_scan_status(scan_id, ScanStatus.STOPPED)
1300
                self.add_scan_error(
1301
                    scan_id, name="", host="", value="Scan process failure."
1302
                )
1303
1304
                logger.info("%s: Scan stopped with errors.", scan_id)
1305
1306
        elif progress == 100:
1307
            scan_process.join(0)
1308
1309
    def get_scan_progress(self, scan_id: str):
1310
        """ Gives a scan's current progress value. """
1311
        return self.scan_collection.get_progress(scan_id)
1312
1313
    def get_scan_target_progress(self, scan_id: str, target: str) -> float:
1314
        """ Gives a list with scan's current progress value of each target. """
1315
        return self.scan_collection.get_target_progress(scan_id, target)
1316
1317
    def get_scan_target(self, scan_id: str) -> List:
1318
        """ Gives a scan's target. """
1319
        return self.scan_collection.get_target_list(scan_id)
1320
1321
    def get_scan_ports(self, scan_id: str, target: str = '') -> str:
1322
        """ Gives a scan's ports list. """
1323
        return self.scan_collection.get_ports(scan_id, target)
1324
1325
    def get_scan_exclude_hosts(self, scan_id: str, target: str = ''):
1326
        """ Gives a scan's exclude host list. If a target is passed gives
1327
        the exclude host list for the given target. """
1328
        return self.scan_collection.get_exclude_hosts(scan_id, target)
1329
1330
    def get_scan_credentials(self, scan_id: str, target: str = '') -> Dict:
1331
        """ Gives a scan's credential list. If a target is passed gives
1332
        the credential list for the given target. """
1333
        return self.scan_collection.get_credentials(scan_id, target)
1334
1335
    def get_scan_target_options(self, scan_id: str, target: str = '') -> Dict:
1336
        """ Gives a scan's target option dict. If a target is passed gives
1337
        the credential list for the given target. """
1338
        return self.scan_collection.get_target_options(scan_id, target)
1339
1340
    def get_scan_vts(self, scan_id: str) -> Dict:
1341
        """ Gives a scan's vts list. """
1342
        return self.scan_collection.get_vts(scan_id)
1343
1344
    def get_scan_unfinished_hosts(self, scan_id: str) -> List:
1345
        """ Get a list of unfinished hosts."""
1346
        return self.scan_collection.get_hosts_unfinished(scan_id)
1347
1348
    def get_scan_finished_hosts(self, scan_id: str) -> List:
1349
        """ Get a list of unfinished hosts."""
1350
        return self.scan_collection.get_hosts_finished(scan_id)
1351
1352
    def get_scan_start_time(self, scan_id: str) -> str:
1353
        """ Gives a scan's start time. """
1354
        return self.scan_collection.get_start_time(scan_id)
1355
1356
    def get_scan_end_time(self, scan_id: str) -> str:
1357
        """ Gives a scan's end time. """
1358
        return self.scan_collection.get_end_time(scan_id)
1359
1360
    def add_scan_log(
1361
        self,
1362
        scan_id: str,
1363
        host: str = '',
1364
        hostname: str = '',
1365
        name: str = '',
1366
        value: str = '',
1367
        port: str = '',
1368
        test_id: str = '',
1369
        qod: str = '',
1370
    ):
1371
        """ Adds a log result to scan_id scan. """
1372
        self.scan_collection.add_result(
1373
            scan_id,
1374
            ResultType.LOG,
1375
            host,
1376
            hostname,
1377
            name,
1378
            value,
1379
            port,
1380
            test_id,
1381
            '0.0',
1382
            qod,
1383
        )
1384
1385
    def add_scan_error(
1386
        self,
1387
        scan_id: str,
1388
        host: str = '',
1389
        hostname: str = '',
1390
        name: str = '',
1391
        value: str = '',
1392
        port: str = '',
1393
    ) -> None:
1394
        """ Adds an error result to scan_id scan. """
1395
        self.scan_collection.add_result(
1396
            scan_id, ResultType.ERROR, host, hostname, name, value, port
1397
        )
1398
1399
    def add_scan_host_detail(
1400
        self,
1401
        scan_id: str,
1402
        host: str = '',
1403
        hostname: str = '',
1404
        name: str = '',
1405
        value: str = '',
1406
    ) -> None:
1407
        """ Adds a host detail result to scan_id scan. """
1408
        self.scan_collection.add_result(
1409
            scan_id, ResultType.HOST_DETAIL, host, hostname, name, value
1410
        )
1411
1412
    def add_scan_alarm(
1413
        self,
1414
        scan_id: str,
1415
        host: str = '',
1416
        hostname: str = '',
1417
        name: str = '',
1418
        value: str = '',
1419
        port: str = '',
1420
        test_id: str = '',
1421
        severity: str = '',
1422
        qod: str = '',
1423
    ):
1424
        """ Adds an alarm result to scan_id scan. """
1425
        self.scan_collection.add_result(
1426
            scan_id,
1427
            ResultType.ALARM,
1428
            host,
1429
            hostname,
1430
            name,
1431
            value,
1432
            port,
1433
            test_id,
1434
            severity,
1435
            qod,
1436
        )
1437