| Conditions | 8 |
| Total Lines | 66 |
| Code Lines | 40 |
| Lines | 0 |
| Ratio | 0 % |
| Changes | 0 | ||
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include:
If many parameters/temporary variables are present:
| 1 | # Copyright (C) 2014-2021 Greenbone Networks GmbH |
||
| 97 | def run_command(self, scan_id: str, host: str, cmd: str) -> Optional[str]: |
||
| 98 | """ |
||
| 99 | Run a single command via SSH and return the content of stdout or |
||
| 100 | None in case of an Error. A scan error is issued in the latter |
||
| 101 | case. |
||
| 102 | |||
| 103 | For logging into 'host', the scan options 'port', 'username', |
||
| 104 | 'password' and 'ssh_timeout' are used. |
||
| 105 | """ |
||
| 106 | |||
| 107 | ssh = paramiko.SSHClient() |
||
| 108 | ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) |
||
| 109 | |||
| 110 | options = self.get_scan_options(scan_id) |
||
| 111 | |||
| 112 | port = int(options['port']) |
||
| 113 | timeout = int(options['ssh_timeout']) |
||
| 114 | |||
| 115 | # For backward compatibility, consider the legacy mode to get |
||
| 116 | # credentials as scan_option. |
||
| 117 | # First and second modes should be removed in future releases. |
||
| 118 | # On the third case it receives the credentials as a subelement of |
||
| 119 | # the <target>. |
||
| 120 | credentials = self.get_scan_credentials(scan_id) |
||
| 121 | if ( |
||
| 122 | 'username_password' in options |
||
| 123 | and ':' in options['username_password'] |
||
| 124 | ): |
||
| 125 | username, password = options['username_password'].split(':', 1) |
||
| 126 | elif 'username' in options and options['username']: |
||
| 127 | username = options['username'] |
||
| 128 | password = options['password'] |
||
| 129 | elif credentials: |
||
| 130 | cred_params = credentials.get('ssh') |
||
| 131 | username = cred_params.get('username', '') |
||
| 132 | password = cred_params.get('password', '') |
||
| 133 | else: |
||
| 134 | self.add_scan_error( |
||
| 135 | scan_id, host=host, value='Erroneous username_password value' |
||
| 136 | ) |
||
| 137 | raise ValueError('Erroneous username_password value') |
||
| 138 | |||
| 139 | try: |
||
| 140 | ssh.connect( |
||
| 141 | hostname=host, |
||
| 142 | username=username, |
||
| 143 | password=password, |
||
| 144 | timeout=timeout, |
||
| 145 | port=port, |
||
| 146 | ) |
||
| 147 | except ( |
||
| 148 | paramiko.ssh_exception.AuthenticationException, |
||
| 149 | socket.error, |
||
| 150 | ) as err: |
||
| 151 | # Errors: No route to host, connection timeout, authentication |
||
| 152 | # failure etc,. |
||
| 153 | self.add_scan_error(scan_id, host=host, value=str(err)) |
||
| 154 | return None |
||
| 155 | |||
| 156 | if self._niceness is not None: |
||
| 157 | cmd = "nice -n %s %s" % (self._niceness, cmd) |
||
| 158 | _, stdout, _ = ssh.exec_command(cmd) |
||
| 159 | result = stdout.readlines() |
||
| 160 | ssh.close() |
||
| 161 | |||
| 162 | return result |
||
| 163 |