tests.test_ssh_daemon   A
last analyzed

Complexity

Total Complexity 19

Size/Duplication

Total Lines 196
Duplicated Lines 22.45 %

Importance

Changes 0
Metric Value
eloc 128
dl 44
loc 196
rs 10
c 0
b 0
f 0
wmc 19

17 Methods

Rating   Name   Duplication   Size   Complexity  
A FakeFile.readlines() 0 2 1
A FakeSSHClient.connect() 0 2 1
A fakeparamiko.SSHClient() 0 3 1
A FakeSSHClient.__init__() 0 3 1
A FakeSSHClient.exec_command() 0 3 1
A FakeSSHClient.close() 0 2 1
A FakeSSHClient.set_missing_host_key_policy() 0 2 1
A FakeFile.__init__() 0 2 1
A fakeparamiko.AutoAddPolicy() 0 3 1
A DummyWrapper.check() 0 2 1
A DummyWrapper.__init__() 0 5 1
A SSHDaemonTestCase.test_run_command() 22 22 1
A SSHDaemonTestCase.test_run_command_no_credential() 0 21 2
A DummyWrapper.exec_scan() 0 2 1
A SSHDaemonTestCase.test_run_command_legacy_credential() 22 22 1
A SSHDaemonTestCase.test_no_paramiko() 0 5 2
A SSHDaemonTestCase.test_run_command_new_credential() 0 33 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
# Copyright (C) 2014-2021 Greenbone Networks GmbH
2
#
3
# SPDX-License-Identifier: AGPL-3.0-or-later
4
#
5
# This program is free software: you can redistribute it and/or modify
6
# it under the terms of the GNU Affero General Public License as
7
# published by the Free Software Foundation, either version 3 of the
8
# License, or (at your option) any later version.
9
#
10
# This program is distributed in the hope that it will be useful,
11
# but WITHOUT ANY WARRANTY; without even the implied warranty of
12
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
# GNU Affero General Public License for more details.
14
#
15
# You should have received a copy of the GNU Affero General Public License
16
# along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18
""" Test module for ospd ssh support.
19
"""
20
21
import unittest
22
23
from ospd import ospd_ssh
24
from ospd.ospd_ssh import OSPDaemonSimpleSSH
25
from .helper import FakeDataManager
26
27
28
class FakeFile:
    """In-memory stand-in for a file-like object that supports readlines().

    The wrapped text is kept in ``content`` and split on newlines on demand.
    """

    def __init__(self, content):
        """Store *content*, the full text this fake file should yield."""
        self.content = content

    def readlines(self):
        """Return the stored content split on ``'\\n'``.

        Unlike a real file's ``readlines()``, the line terminators are not
        kept, and an empty content yields ``['']`` rather than ``[]``.
        """
        return self.content.split('\n')
34
35
36
# Module-level log of the command strings passed to
# FakeSSHClient.exec_command(). It is rebound to a fresh empty list every
# time a FakeSSHClient is constructed, and the tests compare it against the
# commands they expect to have been executed.
commands = None  # pylint: disable=invalid-name
37
38
39
class FakeSSHClient:
    """Fake SSH client that records executed commands instead of running them.

    Constructing an instance resets the module-level ``commands`` list, and
    every call to :meth:`exec_command` appends the command string to it.
    All other methods accept their arguments and do nothing.
    """

    def __init__(self):
        """Reset the module-level ``commands`` log to an empty list."""
        global commands  # pylint: disable=global-statement,invalid-name
        commands = []

    def set_missing_host_key_policy(self, policy):
        """Accept and ignore the host key *policy*."""

    def connect(self, **kwargs):
        """Accept and ignore any connection keyword arguments."""

    def exec_command(self, cmd):
        """Record *cmd* and return a fake result triple.

        Returns:
            A 3-tuple ``(None, FakeFile(''), None)``. The middle element is
            an empty FakeFile; presumably the triple mirrors paramiko's
            ``(stdin, stdout, stderr)`` -- confirm against ospd_ssh.
        """
        commands.append(cmd)
        return None, FakeFile(''), None

    def close(self):
        """Do nothing; there is no real connection to close."""
56
57
58
class FakeExceptions:
    """Namespace exposed as ``fakeparamiko.ssh_exception``.

    Only the ``AuthenticationException`` attribute is provided, and it is a
    ``None`` placeholder rather than a real exception class.
    """

    # NOTE(review): presumably ospd_ssh only needs this name to resolve;
    # confirm that a None placeholder is sufficient for every code path.
    AuthenticationException = None  # pylint: disable=invalid-name
60
61
62
class fakeparamiko:  # pylint: disable=invalid-name
    """Stand-in object the tests assign to ``ospd_ssh.paramiko``.

    It exposes the three names used here: ``SSHClient``, ``AutoAddPolicy``
    and ``ssh_exception``.
    """

    @staticmethod
    def SSHClient(*args):  # pylint: disable=invalid-name
        """Return a new FakeSSHClient built from *args*."""
        return FakeSSHClient(*args)

    @staticmethod
    def AutoAddPolicy():  # pylint: disable=invalid-name
        """Return ``None``; no host key policy object is needed."""

    # Exception namespace, looked up as ``paramiko.ssh_exception``.
    ssh_exception = FakeExceptions
72
73
74
class DummyWrapper(OSPDaemonSimpleSSH):
    """Minimal OSPDaemonSimpleSSH subclass used as the daemon under test."""

    def __init__(self, niceness=10):
        """Create the daemon and point its scan collection at test fixtures.

        Args:
            niceness: Forwarded unchanged to OSPDaemonSimpleSSH.
        """
        super().__init__(niceness=niceness)
        # Swap in the fake data manager and a fixed storage directory, then
        # mark the daemon as initialized without a real initialization step.
        self.scan_collection.data_manager = FakeDataManager()
        self.scan_collection.file_storage_dir = '/tmp'
        self.initialized = True

    def check(self) -> bool:
        """Always report the wrapper as usable."""
        return True

    def exec_scan(self, scan_id: str) -> None:
        """Do nothing; the tests call run_command() directly."""
        return
86
87
88
class SSHDaemonTestCase(unittest.TestCase):
    """Tests for OSPDaemonSimpleSSH.run_command() against a fake paramiko."""

    def setUp(self):
        """Arrange for ``ospd_ssh.paramiko`` to be restored after each test.

        Every test overwrites that module attribute (with ``None`` or with
        ``fakeparamiko``); without the cleanup the replacement would leak
        into whatever test runs next.
        """
        original = getattr(ospd_ssh, 'paramiko', None)
        self.addCleanup(setattr, ospd_ssh, 'paramiko', original)

    def _start_scan(self, scanner_params, credentials=None):
        """Install the fake paramiko, create a scan and start it.

        Args:
            scanner_params: Dict passed as the third argument of
                ``create_scan``.
            credentials: Value for the target's ``'credentials'`` key;
                an empty dict is used when omitted.

        Returns:
            A ``(daemon, scanid)`` tuple for the started scan.
        """
        ospd_ssh.paramiko = fakeparamiko

        daemon = DummyWrapper(niceness=10)
        scanid = daemon.create_scan(
            None,
            {
                'target': 'host.example.com',
                'ports': '80, 443',
                'credentials': {} if credentials is None else credentials,
                'exclude_hosts': '',
                'finished_hosts': '',
                'options': {},
            },
            scanner_params,
            '',
        )
        daemon.start_queued_scans()
        return daemon, scanid

    def _assert_command_runs(self, daemon, scanid):
        """Run a command and check its result type and the recorded command.

        The fake client is expected to have recorded exactly one command,
        prefixed with ``nice -n 10``.
        """
        res = daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd')

        self.assertIsInstance(res, list)
        self.assertEqual(commands, ['nice -n 10 cat /etc/passwd'])

    def test_no_paramiko(self):
        """Constructing the daemon without paramiko raises ImportError."""
        ospd_ssh.paramiko = None

        with self.assertRaises(ImportError):
            OSPDaemonSimpleSSH()

    def test_run_command(self):
        """Credentials given as a combined ``username_password`` param."""
        daemon, scanid = self._start_scan(
            {'port': 5, 'ssh_timeout': 15, 'username_password': 'dummy:pw'}
        )

        self._assert_command_runs(daemon, scanid)

    def test_run_command_legacy_credential(self):
        """Credentials given as separate ``username``/``password`` params."""
        daemon, scanid = self._start_scan(
            {
                'port': 5,
                'ssh_timeout': 15,
                'username': 'dummy',
                'password': 'pw',
            }
        )

        self._assert_command_runs(daemon, scanid)

    def test_run_command_new_credential(self):
        """Credentials given through the target's ``credentials`` dict."""
        cred_dict = {
            'ssh': {
                'type': 'up',
                'password': 'mypass',
                'port': '22',
                'username': 'scanuser',
            },
            'smb': {'type': 'up', 'password': 'mypass', 'username': 'smbuser'},
        }

        daemon, scanid = self._start_scan(
            {'port': 5, 'ssh_timeout': 15}, credentials=cred_dict
        )

        self._assert_command_runs(daemon, scanid)

    def test_run_command_no_credential(self):
        """run_command() raises ValueError when no credentials are given."""
        daemon, scanid = self._start_scan({'port': 5, 'ssh_timeout': 15})

        with self.assertRaises(ValueError):
            daemon.run_command(scanid, 'host.example.com', 'cat /etc/passwd')
196