Passed
Pull Request — master (#251)
by
unknown
01:29
created

tests.test_ssh_daemon.DummyWrapper.check()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# Copyright (C) 2014-2020 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
""" Test module for ospd ssh support.
19
"""
20
21
import unittest
22
23
from ospd import ospd_ssh
24
from ospd.ospd_ssh import OSPDaemonSimpleSSH
25
26
27
class FakeFile(object):
28
    def __init__(self, content):
29
        self.content = content
30
31
    def readlines(self):
32
        return self.content.split('\n')
33
34
35
commands = None  # pylint: disable=invalid-name
36
37
38
class FakeSSHClient(object):
39
    def __init__(self):
40
        global commands  # pylint: disable=global-statement,invalid-name
41
        commands = []
42
43
    def set_missing_host_key_policy(self, policy):
44
        pass
45
46
    def connect(self, **kwargs):
47
        pass
48
49
    def exec_command(self, cmd):
50
        commands.append(cmd)
51
        return None, FakeFile(''), None
52
53
    def close(self):
54
        pass
55
56
57
class FakeExceptions(object):
58
    AuthenticationException = None  # pylint: disable=invalid-name
59
60
61
class fakeparamiko(object):  # pylint: disable=invalid-name
62
    @staticmethod
63
    def SSHClient(*args):  # pylint: disable=invalid-name
64
        return FakeSSHClient(*args)
65
66
    @staticmethod
67
    def AutoAddPolicy():  # pylint: disable=invalid-name
68
        pass
69
70
    ssh_exception = FakeExceptions
71
72
73
class DummyWrapper(OSPDaemonSimpleSSH):
74
    def __init__(self, niceness=10):
75
        super().__init__(niceness=niceness)
76
77
    def check(self):
78
        return True
79
80
    def exec_scan(self, scan_id: str):
81
        return
82
83
84
class SSHDaemonTestCase(unittest.TestCase):
85
    def test_no_paramiko(self):
86
        ospd_ssh.paramiko = None
87
88
        with self.assertRaises(ImportError):
89
            OSPDaemonSimpleSSH()
90
91 View Code Duplication
    def test_run_command(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
92
        ospd_ssh.paramiko = fakeparamiko
93
94
        daemon = DummyWrapper(niceness=10)
95
        scanid = daemon.create_scan(
96
            None,
97
            {
98
                'target': 'host.example.com',
99
                'ports': '80, 443',
100
                'credentials': {},
101
                'exclude_hosts': '',
102
                'finished_hosts': '',
103
                'options': {},
104
            },
105
            dict(port=5, ssh_timeout=15, username_password='dummy:pw'),
106
            '',
107
        )
108
109
        res = daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd')
110
111
        self.assertIsInstance(res, list)
112
        self.assertEqual(commands, ['nice -n 10 cat /etc/passwd'])
113
114 View Code Duplication
    def test_run_command_legacy_credential(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
115
        ospd_ssh.paramiko = fakeparamiko
116
117
        daemon = DummyWrapper(niceness=10)
118
        scanid = daemon.create_scan(
119
            None,
120
            {
121
                'target': 'host.example.com',
122
                'ports': '80, 443',
123
                'credentials': {},
124
                'exclude_hosts': '',
125
                'finished_hosts': '',
126
                'options': {},
127
            },
128
            dict(port=5, ssh_timeout=15, username='dummy', password='pw'),
129
            '',
130
        )
131
132
        res = daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd')
133
134
        self.assertIsInstance(res, list)
135
        self.assertEqual(commands, ['nice -n 10 cat /etc/passwd'])
136
137
    def test_run_command_new_credential(self):
138
        ospd_ssh.paramiko = fakeparamiko
139
140
        daemon = DummyWrapper(niceness=10)
141
142
        cred_dict = {
143
            'ssh': {
144
                'type': 'up',
145
                'password': 'mypass',
146
                'port': '22',
147
                'username': 'scanuser',
148
            },
149
            'smb': {'type': 'up', 'password': 'mypass', 'username': 'smbuser'},
150
        }
151
152
        scanid = daemon.create_scan(
153
            None,
154
            {
155
                'target': 'host.example.com',
156
                'ports': '80, 443',
157
                'credentials': cred_dict,
158
                'exclude_hosts': '',
159
                'finished_hosts': '',
160
                'options': {},
161
            },
162
            dict(port=5, ssh_timeout=15),
163
            '',
164
        )
165
        res = daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd')
166
167
        self.assertIsInstance(res, list)
168
        self.assertEqual(commands, ['nice -n 10 cat /etc/passwd'])
169
170
    def test_run_command_no_credential(self):
171
        ospd_ssh.paramiko = fakeparamiko
172
173
        daemon = DummyWrapper(niceness=10)
174
        scanid = daemon.create_scan(
175
            None,
176
            {
177
                'target': 'host.example.com',
178
                'ports': '80, 443',
179
                'credentials': {},
180
                'exclude_hosts': '',
181
                'finished_hosts': '',
182
                'options': {},
183
            },
184
            dict(port=5, ssh_timeout=15),
185
            '',
186
        )
187
188
        with self.assertRaises(ValueError):
189
            daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd')
190