1 | # Copyright (C) 2014-2021 Greenbone Networks GmbH |
||
2 | # |
||
3 | # SPDX-License-Identifier: AGPL-3.0-or-later |
||
4 | # |
||
5 | # This program is free software: you can redistribute it and/or modify |
||
6 | # it under the terms of the GNU Affero General Public License as |
||
7 | # published by the Free Software Foundation, either version 3 of the |
||
8 | # License, or (at your option) any later version. |
||
9 | # |
||
10 | # This program is distributed in the hope that it will be useful, |
||
11 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
12 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
13 | # GNU Affero General Public License for more details. |
||
14 | # |
||
15 | # You should have received a copy of the GNU Affero General Public License |
||
16 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
||
17 | |||
18 | """ Test module for ospd ssh support. |
||
19 | """ |
||
20 | |||
21 | import unittest |
||
22 | |||
23 | from ospd import ospd_ssh |
||
24 | from ospd.ospd_ssh import OSPDaemonSimpleSSH |
||
25 | from .helper import FakeDataManager |
||
26 | |||
27 | |||
28 | class FakeFile(object): |
||
29 | def __init__(self, content): |
||
30 | self.content = content |
||
31 | |||
32 | def readlines(self): |
||
33 | return self.content.split('\n') |
||
34 | |||
35 | |||
36 | commands = None # pylint: disable=invalid-name |
||
37 | |||
38 | |||
39 | class FakeSSHClient(object): |
||
40 | def __init__(self): |
||
41 | global commands # pylint: disable=global-statement,invalid-name |
||
42 | commands = [] |
||
43 | |||
44 | def set_missing_host_key_policy(self, policy): |
||
45 | pass |
||
46 | |||
47 | def connect(self, **kwargs): |
||
48 | pass |
||
49 | |||
50 | def exec_command(self, cmd): |
||
51 | commands.append(cmd) |
||
52 | return None, FakeFile(''), None |
||
53 | |||
54 | def close(self): |
||
55 | pass |
||
56 | |||
57 | |||
58 | class FakeExceptions(object): |
||
59 | AuthenticationException = None # pylint: disable=invalid-name |
||
60 | |||
61 | |||
62 | class fakeparamiko(object): # pylint: disable=invalid-name |
||
63 | @staticmethod |
||
64 | def SSHClient(*args): # pylint: disable=invalid-name |
||
65 | return FakeSSHClient(*args) |
||
66 | |||
67 | @staticmethod |
||
68 | def AutoAddPolicy(): # pylint: disable=invalid-name |
||
69 | pass |
||
70 | |||
71 | ssh_exception = FakeExceptions |
||
72 | |||
73 | |||
74 | class DummyWrapper(OSPDaemonSimpleSSH): |
||
75 | def __init__(self, niceness=10): |
||
76 | super().__init__(niceness=niceness) |
||
77 | self.scan_collection.data_manager = FakeDataManager() |
||
78 | self.scan_collection.file_storage_dir = '/tmp' |
||
79 | self.initialized = True |
||
80 | |||
81 | def check(self): |
||
82 | return True |
||
83 | |||
84 | def exec_scan(self, scan_id: str): |
||
85 | return |
||
86 | |||
87 | |||
88 | class SSHDaemonTestCase(unittest.TestCase): |
||
89 | def test_no_paramiko(self): |
||
90 | ospd_ssh.paramiko = None |
||
91 | |||
92 | with self.assertRaises(ImportError): |
||
93 | OSPDaemonSimpleSSH() |
||
94 | |||
95 | View Code Duplication | def test_run_command(self): |
|
0 ignored issues
–
show
Duplication
introduced
by
![]() |
|||
96 | ospd_ssh.paramiko = fakeparamiko |
||
97 | |||
98 | daemon = DummyWrapper(niceness=10) |
||
99 | scanid = daemon.create_scan( |
||
100 | None, |
||
101 | { |
||
102 | 'target': 'host.example.com', |
||
103 | 'ports': '80, 443', |
||
104 | 'credentials': {}, |
||
105 | 'exclude_hosts': '', |
||
106 | 'finished_hosts': '', |
||
107 | 'options': {}, |
||
108 | }, |
||
109 | dict(port=5, ssh_timeout=15, username_password='dummy:pw'), |
||
110 | '', |
||
111 | ) |
||
112 | daemon.start_queued_scans() |
||
113 | res = daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd') |
||
114 | |||
115 | self.assertIsInstance(res, list) |
||
116 | self.assertEqual(commands, ['nice -n 10 cat /etc/passwd']) |
||
117 | |||
118 | View Code Duplication | def test_run_command_legacy_credential(self): |
|
0 ignored issues
–
show
|
|||
119 | ospd_ssh.paramiko = fakeparamiko |
||
120 | |||
121 | daemon = DummyWrapper(niceness=10) |
||
122 | scanid = daemon.create_scan( |
||
123 | None, |
||
124 | { |
||
125 | 'target': 'host.example.com', |
||
126 | 'ports': '80, 443', |
||
127 | 'credentials': {}, |
||
128 | 'exclude_hosts': '', |
||
129 | 'finished_hosts': '', |
||
130 | 'options': {}, |
||
131 | }, |
||
132 | dict(port=5, ssh_timeout=15, username='dummy', password='pw'), |
||
133 | '', |
||
134 | ) |
||
135 | daemon.start_queued_scans() |
||
136 | res = daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd') |
||
137 | |||
138 | self.assertIsInstance(res, list) |
||
139 | self.assertEqual(commands, ['nice -n 10 cat /etc/passwd']) |
||
140 | |||
141 | def test_run_command_new_credential(self): |
||
142 | ospd_ssh.paramiko = fakeparamiko |
||
143 | |||
144 | daemon = DummyWrapper(niceness=10) |
||
145 | |||
146 | cred_dict = { |
||
147 | 'ssh': { |
||
148 | 'type': 'up', |
||
149 | 'password': 'mypass', |
||
150 | 'port': '22', |
||
151 | 'username': 'scanuser', |
||
152 | }, |
||
153 | 'smb': {'type': 'up', 'password': 'mypass', 'username': 'smbuser'}, |
||
154 | } |
||
155 | |||
156 | scanid = daemon.create_scan( |
||
157 | None, |
||
158 | { |
||
159 | 'target': 'host.example.com', |
||
160 | 'ports': '80, 443', |
||
161 | 'credentials': cred_dict, |
||
162 | 'exclude_hosts': '', |
||
163 | 'finished_hosts': '', |
||
164 | 'options': {}, |
||
165 | }, |
||
166 | dict(port=5, ssh_timeout=15), |
||
167 | '', |
||
168 | ) |
||
169 | daemon.start_queued_scans() |
||
170 | res = daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd') |
||
171 | |||
172 | self.assertIsInstance(res, list) |
||
173 | self.assertEqual(commands, ['nice -n 10 cat /etc/passwd']) |
||
174 | |||
175 | def test_run_command_no_credential(self): |
||
176 | ospd_ssh.paramiko = fakeparamiko |
||
177 | |||
178 | daemon = DummyWrapper(niceness=10) |
||
179 | scanid = daemon.create_scan( |
||
180 | None, |
||
181 | { |
||
182 | 'target': 'host.example.com', |
||
183 | 'ports': '80, 443', |
||
184 | 'credentials': {}, |
||
185 | 'exclude_hosts': '', |
||
186 | 'finished_hosts': '', |
||
187 | 'options': {}, |
||
188 | }, |
||
189 | dict(port=5, ssh_timeout=15), |
||
190 | '', |
||
191 | ) |
||
192 | daemon.start_queued_scans() |
||
193 | |||
194 | with self.assertRaises(ValueError): |
||
195 | daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd') |
||
196 |