ospd.ospd_ssh   A
last analyzed

Complexity

Total Complexity 11

Size/Duplication

Total Lines 163
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 11
eloc 79
dl 0
loc 163
rs 10
c 0
b 0
f 0

2 Methods

Rating   Name   Duplication   Size   Complexity  
A OSPDaemonSimpleSSH.__init__() 0 15 3
B OSPDaemonSimpleSSH.run_command() 0 66 8
1
# Copyright (C) 2014-2021 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
""" OSP Daemon class for simple remote SSH-based command execution.
19
"""
20
21
22
# This is needed for older pythons as our current module is called the same
23
# as the ospd package
24
# Another solution would be to rename that file.
25
from __future__ import absolute_import
26
27
import socket
28
29
from typing import Optional, Dict
30
from ospd.ospd import OSPDaemon
31
32
try:
33
    import paramiko
34
except ImportError:
35
    paramiko = None
36
37
SSH_SCANNER_PARAMS = {
38
    'username_password': {
39
        'type': 'credential_up',
40
        'name': 'SSH credentials',
41
        'default': '',
42
        'mandatory': 0,
43
        'description': 'The SSH credentials in username:password format. Used'
44
        ' to log into the target and to run the commands on'
45
        ' that target. This should not be a privileged user'
46
        ' like "root", a regular privileged user account'
47
        ' should be sufficient in most cases.',
48
    },
49
    'port': {
50
        'type': 'integer',
51
        'name': 'SSH Port',
52
        'default': 22,
53
        'mandatory': 0,
54
        'description': 'The SSH port which to use for logging in with the'
55
        ' given username_password.',
56
    },
57
    'ssh_timeout': {
58
        'type': 'integer',
59
        'name': 'SSH timeout',
60
        'default': 30,
61
        'mandatory': 0,
62
        'description': 'Timeout when communicating with the target via SSH.',
63
    },
64
}  # type: Dict
65
66
# pylint: disable=abstract-method
67
class OSPDaemonSimpleSSH(OSPDaemon):
68
69
    """
70
    OSP Daemon class for simple remote SSH-based command execution.
71
72
    This class automatically adds scanner parameters to handle remote
73
    ssh login into the target systems: username, password, port and
74
    ssh_timout
75
76
    The method run_command can be used to execute a single command
77
    on the given remote system. The stdout result is returned as
78
    an array.
79
    """
80
81
    def __init__(self, **kwargs):
82
        """Initializes the daemon and add parameters needed to remote SSH
83
        execution."""
84
        super().__init__(**kwargs)
85
86
        self._niceness = kwargs.get('niceness', None)
87
88
        if paramiko is None:
89
            raise ImportError(
90
                'paramiko needs to be installed in order to use'
91
                ' the %s class.' % self.__class__.__name__
92
            )
93
94
        for name, param in SSH_SCANNER_PARAMS.items():
95
            self.set_scanner_param(name, param)
96
97
    def run_command(self, scan_id: str, host: str, cmd: str) -> Optional[str]:
98
        """
99
        Run a single command via SSH and return the content of stdout or
100
        None in case of an Error. A scan error is issued in the latter
101
        case.
102
103
        For logging into 'host', the scan options 'port', 'username',
104
        'password' and 'ssh_timeout' are used.
105
        """
106
107
        ssh = paramiko.SSHClient()
108
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
109
110
        options = self.get_scan_options(scan_id)
111
112
        port = int(options['port'])
113
        timeout = int(options['ssh_timeout'])
114
115
        # For backward compatibility, consider the legacy mode to get
116
        # credentials as scan_option.
117
        # First and second modes should be removed in future releases.
118
        # On the third case it receives the credentials as a subelement of
119
        # the <target>.
120
        credentials = self.get_scan_credentials(scan_id)
121
        if (
122
            'username_password' in options
123
            and ':' in options['username_password']
124
        ):
125
            username, password = options['username_password'].split(':', 1)
126
        elif 'username' in options and options['username']:
127
            username = options['username']
128
            password = options['password']
129
        elif credentials:
130
            cred_params = credentials.get('ssh')
131
            username = cred_params.get('username', '')
132
            password = cred_params.get('password', '')
133
        else:
134
            self.add_scan_error(
135
                scan_id, host=host, value='Erroneous username_password value'
136
            )
137
            raise ValueError('Erroneous username_password value')
138
139
        try:
140
            ssh.connect(
141
                hostname=host,
142
                username=username,
143
                password=password,
144
                timeout=timeout,
145
                port=port,
146
            )
147
        except (
148
            paramiko.ssh_exception.AuthenticationException,
149
            socket.error,
150
        ) as err:
151
            # Errors: No route to host, connection timeout, authentication
152
            # failure etc,.
153
            self.add_scan_error(scan_id, host=host, value=str(err))
154
            return None
155
156
        if self._niceness is not None:
157
            cmd = "nice -n %s %s" % (self._niceness, cmd)
158
        _, stdout, _ = ssh.exec_command(cmd)
159
        result = stdout.readlines()
160
        ssh.close()
161
162
        return result
163