Completed
Push — master ( 8082e9...516ce7 )
by
unknown
17s queued 13s
created

SSHDaemonTestCase.test_run_command_new_credential()   A

Complexity

Conditions 1

Size

Total Lines 23
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 18
nop 1
dl 0
loc 23
rs 9.5
c 0
b 0
f 0
1
# Copyright (C) 2015-2018 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: GPL-2.0-or-later
4
#
5
# This program is free software; you can redistribute it and/or
6
# modify it under the terms of the GNU General Public License
7
# as published by the Free Software Foundation; either version 2
8
# of the License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU General Public License for more details.
14
#
15
# You should have received a copy of the GNU General Public License
16
# along with this program; if not, write to the Free Software
17
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18
19
""" Test module for ospd ssh support.
20
"""
21
22
import unittest
23
24
from ospd import ospd_ssh
25
from ospd.ospd_ssh import OSPDaemonSimpleSSH
26
27
28
class FakeFile(object):
    """Minimal file-like object that only supports ``readlines``."""

    def __init__(self, content):
        """Remember the text that ``readlines`` will split.

        Arguments:
            content (str): Text backing this fake file.
        """
        self.content = content

    def readlines(self):
        """Return the stored text split on newline characters.

        Unlike a real file object the line endings are dropped, and an
        empty content yields ``['']`` rather than an empty list.
        """
        return self.content.split('\n')
34
35
36
# Commands handed to FakeSSHClient.exec_command since the most recent client
# was created. Stays None until the first FakeSSHClient is instantiated.
commands = None  # pylint: disable=invalid-name


class FakeSSHClient(object):
    """SSH client double that records commands instead of running them."""

    def __init__(self):
        """Reset the module-level ``commands`` log to an empty list."""
        global commands
        commands = []

    def set_missing_host_key_policy(self, policy):
        """Accept and ignore the host key policy."""

    def connect(self, **kwargs):
        """Accept any connection keyword arguments and do nothing."""

    def exec_command(self, cmd):
        """Record ``cmd`` and return a canned three-item result.

        Arguments:
            cmd (str): Command line that would have been executed.

        Returns:
            tuple: ``(None, FakeFile(''), None)``; only the middle item is
            a usable object, backed by empty content.
        """
        commands.append(cmd)
        return None, FakeFile(''), None

    def close(self):
        """Do nothing; there is no connection to close."""
56
57
58
class FakeExceptions(object):
    """Stand-in for the ``ssh_exception`` namespace of the fake paramiko."""

    # A real exception class rather than None: Python 3 raises TypeError
    # when a non-exception object is matched in an ``except`` clause.
    # NOTE(review): presumably ospd_ssh catches this around connect();
    # confirm against the module under test.
    class AuthenticationException(Exception):  # pylint: disable=invalid-name
        """Placeholder authentication failure; never raised by the fakes."""
60
61
62
class fakeparamiko(object):  # pylint: disable=invalid-name
    """Drop-in replacement assigned to ``ospd_ssh.paramiko`` by the tests.

    Only the names used by the tests' fakes are provided: ``SSHClient``,
    ``AutoAddPolicy`` and ``ssh_exception``.
    """

    @staticmethod
    def SSHClient(*args):  # pylint: disable=invalid-name
        """Create a ``FakeSSHClient``, forwarding any positional arguments."""
        return FakeSSHClient(*args)

    @staticmethod
    def AutoAddPolicy():  # pylint: disable=invalid-name
        """Return None; the fake client ignores the host key policy."""

    # Namespace holding the fake exception types.
    ssh_exception = FakeExceptions
72
73
74
class SSHDaemonTestCase(unittest.TestCase):
    """Tests for ``OSPDaemonSimpleSSH`` with paramiko replaced by fakes."""

    def setUp(self):
        """Arrange for ``ospd_ssh.paramiko`` to be restored after each test.

        Every test overwrites that module attribute; without the cleanup the
        replacement would leak into tests that run afterwards.
        """
        self.addCleanup(
            setattr, ospd_ssh, 'paramiko', getattr(ospd_ssh, 'paramiko', None)
        )

    def _start_scan(self, options, credentials=''):
        """Install the fake paramiko and create a scan for one target.

        Arguments:
            options (dict): Scan options forwarded to ``create_scan``.
            credentials: Value placed in the credentials slot of the target
                entry; an empty string by default.

        Returns:
            tuple: The daemon and the scan id returned by ``create_scan``.
        """
        ospd_ssh.paramiko = fakeparamiko
        daemon = OSPDaemonSimpleSSH('cert', 'key', 'ca', '10')
        scanid = daemon.create_scan(
            None,
            [['host.example.com', '80, 443', credentials, '']],
            options,
            '',
        )
        return daemon, scanid

    def _assert_command_ran(self, daemon, scanid):
        """Run ``cat /etc/passwd`` and check the result and recorded command."""
        res = daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd')
        self.assertIsInstance(res, list)
        self.assertEqual(commands, ['nice -n 10 cat /etc/passwd'])

    def test_no_paramiko(self):
        """The daemon cannot be constructed when paramiko is unavailable."""
        ospd_ssh.paramiko = None
        with self.assertRaises(ImportError):
            OSPDaemonSimpleSSH('cert', 'key', 'ca', '10')

    def test_run_command(self):
        """Credentials given as a combined ``username_password`` option."""
        daemon, scanid = self._start_scan(
            {'port': 5, 'ssh_timeout': 15, 'username_password': 'dummy:pw'}
        )
        self._assert_command_ran(daemon, scanid)

    def test_run_command_legacy_credential(self):
        """Credentials given as separate ``username``/``password`` options."""
        daemon, scanid = self._start_scan(
            {
                'port': 5,
                'ssh_timeout': 15,
                'username': 'dummy',
                'password': 'pw',
            }
        )
        self._assert_command_ran(daemon, scanid)

    def test_run_command_new_credential(self):
        """Credentials given as a per-target credential dictionary."""
        cred_dict = {
            'ssh': {
                'type': 'up',
                'password': 'mypass',
                'port': '22',
                'username': 'scanuser',
            },
            'smb': {'type': 'up', 'password': 'mypass', 'username': 'smbuser'},
        }
        daemon, scanid = self._start_scan(
            {'port': 5, 'ssh_timeout': 15}, credentials=cred_dict
        )
        self._assert_command_ran(daemon, scanid)

    def test_run_command_no_credential(self):
        """Running a command without any credentials raises ValueError."""
        daemon, scanid = self._start_scan({'port': 5, 'ssh_timeout': 15})
        with self.assertRaises(ValueError):
            daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd')
147