Conditions | 8 |
Total Lines | 66 |
Code Lines | 40 |
Lines | 0 |
Ratio | 0 % |
Changes | 0 |
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
1 | # Copyright (C) 2014-2021 Greenbone Networks GmbH |
||
97 | def run_command(self, scan_id: str, host: str, cmd: str) -> Optional[str]: |
||
98 | """ |
||
99 | Run a single command via SSH and return the content of stdout or |
||
100 | None in case of an Error. A scan error is issued in the latter |
||
101 | case. |
||
102 | |||
103 | For logging into 'host', the scan options 'port', 'username', |
||
104 | 'password' and 'ssh_timeout' are used. |
||
105 | """ |
||
106 | |||
107 | ssh = paramiko.SSHClient() |
||
108 | ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) |
||
109 | |||
110 | options = self.get_scan_options(scan_id) |
||
111 | |||
112 | port = int(options['port']) |
||
113 | timeout = int(options['ssh_timeout']) |
||
114 | |||
115 | # For backward compatibility, consider the legacy mode to get |
||
116 | # credentials as scan_option. |
||
117 | # First and second modes should be removed in future releases. |
||
118 | # On the third case it receives the credentials as a subelement of |
||
119 | # the <target>. |
||
120 | credentials = self.get_scan_credentials(scan_id) |
||
121 | if ( |
||
122 | 'username_password' in options |
||
123 | and ':' in options['username_password'] |
||
124 | ): |
||
125 | username, password = options['username_password'].split(':', 1) |
||
126 | elif 'username' in options and options['username']: |
||
127 | username = options['username'] |
||
128 | password = options['password'] |
||
129 | elif credentials: |
||
130 | cred_params = credentials.get('ssh') |
||
131 | username = cred_params.get('username', '') |
||
132 | password = cred_params.get('password', '') |
||
133 | else: |
||
134 | self.add_scan_error( |
||
135 | scan_id, host=host, value='Erroneous username_password value' |
||
136 | ) |
||
137 | raise ValueError('Erroneous username_password value') |
||
138 | |||
139 | try: |
||
140 | ssh.connect( |
||
141 | hostname=host, |
||
142 | username=username, |
||
143 | password=password, |
||
144 | timeout=timeout, |
||
145 | port=port, |
||
146 | ) |
||
147 | except ( |
||
148 | paramiko.ssh_exception.AuthenticationException, |
||
149 | socket.error, |
||
150 | ) as err: |
||
151 | # Errors: No route to host, connection timeout, authentication |
||
152 | # failure etc,. |
||
153 | self.add_scan_error(scan_id, host=host, value=str(err)) |
||
154 | return None |
||
155 | |||
156 | if self._niceness is not None: |
||
157 | cmd = "nice -n %s %s" % (self._niceness, cmd) |
||
158 | _, stdout, _ = ssh.exec_command(cmd) |
||
159 | result = stdout.readlines() |
||
160 | ssh.close() |
||
161 | |||
162 | return result |
||
163 |