| Total Complexity | 69 |
| Total Lines | 492 |
| Duplicated Lines | 0 % |
Complex classes like st2actions.runners.ssh.ParamikoSSHClient often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # Licensed to the Apache Software Foundation (ASF) under one or more |
||
| 71 | class ParamikoSSHClient(object): |
||
| 72 | """ |
||
| 73 | A SSH Client powered by Paramiko. |
||
| 74 | """ |
||
| 75 | |||
| 76 | # Maximum number of bytes to read at once from a socket |
||
| 77 | CHUNK_SIZE = 1024 |
||
| 78 | # How long to sleep while waiting for command to finish |
||
| 79 | SLEEP_DELAY = 1.5 |
||
| 80 | # Connect socket timeout |
||
| 81 | CONNECT_TIMEOUT = 60 |
||
| 82 | |||
| 83 | def __init__(self, hostname, port=22, username=None, password=None, bastion_host=None, |
||
| 84 | key=None, key_files=None, key_material=None, timeout=None, passphrase=None): |
||
| 85 | """ |
||
| 86 | Authentication is always attempted in the following order: |
||
| 87 | |||
| 88 | - The key passed in (if key is provided) |
||
| 89 | - Any key we can find through an SSH agent (only if no password and |
||
| 90 | key is provided) |
||
| 91 | - Any "id_rsa" or "id_dsa" key discoverable in ~/.ssh/ (only if no |
||
| 92 | password and key is provided) |
||
| 93 | - Plain username/password auth, if a password was given (if password is |
||
| 94 | provided) |
||
| 95 | """ |
||
| 96 | if key_files and key_material: |
||
| 97 | raise ValueError(('key_files and key_material arguments are ' |
||
| 98 | 'mutually exclusive')) |
||
| 99 | |||
| 100 | if passphrase and not key_material: |
||
| 101 | raise ValueError('passphrase should accompany private key material') |
||
| 102 | |||
| 103 | self.hostname = hostname |
||
| 104 | self.port = port |
||
| 105 | self.username = username if username else cfg.CONF.system_user |
||
| 106 | self.password = password |
||
| 107 | self.key = key if key else cfg.CONF.system_user.ssh_key_file |
||
| 108 | self.key_files = key_files |
||
| 109 | if not self.key_files and self.key: |
||
| 110 | self.key_files = key # `key` arg is deprecated. |
||
| 111 | self.timeout = timeout or ParamikoSSHClient.CONNECT_TIMEOUT |
||
| 112 | self.key_material = key_material |
||
| 113 | self.client = None |
||
| 114 | self.logger = logging.getLogger(__name__) |
||
| 115 | self.sftp = None |
||
| 116 | self.bastion_host = bastion_host |
||
| 117 | self.bastion_client = None |
||
| 118 | self.bastion_socket = None |
||
| 119 | self.passphrase = passphrase |
||
| 120 | |||
| 121 | def connect(self): |
||
| 122 | """ |
||
| 123 | Connect to the remote node over SSH. |
||
| 124 | |||
| 125 | :return: True if the connection has been successfully established, |
||
| 126 | False otherwise. |
||
| 127 | :rtype: ``bool`` |
||
| 128 | """ |
||
| 129 | if self.bastion_host: |
||
| 130 | self.logger.debug('Bastion host specified, connecting') |
||
| 131 | self.bastion_client = self._connect(host=self.bastion_host) |
||
| 132 | transport = self.bastion_client.get_transport() |
||
| 133 | real_addr = (self.hostname, self.port) |
||
| 134 | # fabric uses ('', 0) for direct-tcpip, this duplicates that behaviour |
||
| 135 | # see https://github.com/fabric/fabric/commit/c2a9bbfd50f560df6c6f9675603fb405c4071cad |
||
| 136 | local_addr = ('', 0) |
||
| 137 | self.bastion_socket = transport.open_channel('direct-tcpip', real_addr, local_addr) |
||
| 138 | |||
| 139 | self.client = self._connect(host=self.hostname, socket=self.bastion_socket) |
||
| 140 | self.sftp = self.client.open_sftp() |
||
| 141 | return True |
||
| 142 | |||
| 143 | def put(self, local_path, remote_path, mode=None, mirror_local_mode=False): |
||
| 144 | """ |
||
| 145 | Upload a file to the remote node. |
||
| 146 | |||
| 147 | :type local_path: ``st`` |
||
| 148 | :param local_path: File path on the local node. |
||
| 149 | |||
| 150 | :type remote_path: ``str`` |
||
| 151 | :param remote_path: File path on the remote node. |
||
| 152 | |||
| 153 | :type mode: ``int`` |
||
| 154 | :param mode: Permissions mode for the file. E.g. 0744. |
||
| 155 | |||
| 156 | :type mirror_local_mode: ``int`` |
||
| 157 | :param mirror_local_mode: Should remote file mirror local mode. |
||
| 158 | |||
| 159 | :return: Attributes of the remote file. |
||
| 160 | :rtype: :class:`posix.stat_result` or ``None`` |
||
| 161 | """ |
||
| 162 | |||
| 163 | if not local_path or not remote_path: |
||
| 164 | raise Exception('Need both local_path and remote_path. local: %s, remote: %s' % |
||
| 165 | local_path, remote_path) |
||
| 166 | local_path = quote_unix(local_path) |
||
| 167 | remote_path = quote_unix(remote_path) |
||
| 168 | |||
| 169 | extra = {'_local_path': local_path, '_remote_path': remote_path, '_mode': mode, |
||
| 170 | '_mirror_local_mode': mirror_local_mode} |
||
| 171 | self.logger.debug('Uploading file', extra=extra) |
||
| 172 | |||
| 173 | if not os.path.exists(local_path): |
||
| 174 | raise Exception('Path %s does not exist locally.' % local_path) |
||
| 175 | |||
| 176 | rattrs = self.sftp.put(local_path, remote_path) |
||
| 177 | |||
| 178 | if mode or mirror_local_mode: |
||
| 179 | local_mode = mode |
||
| 180 | if not mode or mirror_local_mode: |
||
| 181 | local_mode = os.stat(local_path).st_mode |
||
| 182 | |||
| 183 | # Cast to octal integer in case of string |
||
| 184 | if isinstance(local_mode, basestring): |
||
| 185 | local_mode = int(local_mode, 8) |
||
| 186 | local_mode = local_mode & 07777 |
||
| 187 | remote_mode = rattrs.st_mode |
||
| 188 | # Only bitshift if we actually got an remote_mode |
||
| 189 | if remote_mode is not None: |
||
| 190 | remote_mode = (remote_mode & 07777) |
||
| 191 | if local_mode != remote_mode: |
||
| 192 | self.sftp.chmod(remote_path, local_mode) |
||
| 193 | |||
| 194 | return rattrs |
||
| 195 | |||
| 196 | def put_dir(self, local_path, remote_path, mode=None, mirror_local_mode=False): |
||
| 197 | """ |
||
| 198 | Upload a dir to the remote node. |
||
| 199 | |||
| 200 | :type local_path: ``str`` |
||
| 201 | :param local_path: Dir path on the local node. |
||
| 202 | |||
| 203 | :type remote_path: ``str`` |
||
| 204 | :param remote_path: Base dir path on the remote node. |
||
| 205 | |||
| 206 | :type mode: ``int`` |
||
| 207 | :param mode: Permissions mode for the file. E.g. 0744. |
||
| 208 | |||
| 209 | :type mirror_local_mode: ``int`` |
||
| 210 | :param mirror_local_mode: Should remote file mirror local mode. |
||
| 211 | |||
| 212 | :return: List of files created on remote node. |
||
| 213 | :rtype: ``list`` of ``str`` |
||
| 214 | """ |
||
| 215 | |||
| 216 | extra = {'_local_path': local_path, '_remote_path': remote_path, '_mode': mode, |
||
| 217 | '_mirror_local_mode': mirror_local_mode} |
||
| 218 | self.logger.debug('Uploading dir', extra=extra) |
||
| 219 | |||
| 220 | if os.path.basename(local_path): |
||
| 221 | strip = os.path.dirname(local_path) |
||
| 222 | else: |
||
| 223 | strip = os.path.dirname(os.path.dirname(local_path)) |
||
| 224 | |||
| 225 | remote_paths = [] |
||
| 226 | |||
| 227 | for context, dirs, files in os.walk(local_path): |
||
| 228 | rcontext = context.replace(strip, '', 1) |
||
| 229 | # normalize pathname separators with POSIX separator |
||
| 230 | rcontext = rcontext.replace(os.sep, '/') |
||
| 231 | rcontext = rcontext.lstrip('/') |
||
| 232 | rcontext = posixpath.join(remote_path, rcontext) |
||
| 233 | |||
| 234 | if not self.exists(rcontext): |
||
| 235 | self.sftp.mkdir(rcontext) |
||
| 236 | |||
| 237 | for d in dirs: |
||
| 238 | n = posixpath.join(rcontext, d) |
||
| 239 | if not self.exists(n): |
||
| 240 | self.sftp.mkdir(n) |
||
| 241 | |||
| 242 | for f in files: |
||
| 243 | local_path = os.path.join(context, f) |
||
| 244 | n = posixpath.join(rcontext, f) |
||
| 245 | # Note that quote_unix is done by put anyways. |
||
| 246 | p = self.put(local_path=local_path, remote_path=n, |
||
| 247 | mirror_local_mode=mirror_local_mode, mode=mode) |
||
| 248 | remote_paths.append(p) |
||
| 249 | |||
| 250 | return remote_paths |
||
| 251 | |||
| 252 | def exists(self, remote_path): |
||
| 253 | """ |
||
| 254 | Validate whether a remote file or directory exists. |
||
| 255 | |||
| 256 | :param remote_path: Path to remote file. |
||
| 257 | :type remote_path: ``str`` |
||
| 258 | |||
| 259 | :rtype: ``bool`` |
||
| 260 | """ |
||
| 261 | try: |
||
| 262 | self.sftp.lstat(remote_path).st_mode |
||
| 263 | except IOError: |
||
| 264 | return False |
||
| 265 | |||
| 266 | return True |
||
| 267 | |||
| 268 | def mkdir(self, dir_path): |
||
| 269 | """ |
||
| 270 | Create a directory on remote box. |
||
| 271 | |||
| 272 | :param dir_path: Path to remote directory to be created. |
||
| 273 | :type dir_path: ``str`` |
||
| 274 | |||
| 275 | :return: Returns nothing if successful else raises IOError exception. |
||
| 276 | |||
| 277 | :rtype: ``None`` |
||
| 278 | """ |
||
| 279 | |||
| 280 | dir_path = quote_unix(dir_path) |
||
| 281 | extra = {'_dir_path': dir_path} |
||
| 282 | self.logger.debug('mkdir', extra=extra) |
||
| 283 | return self.sftp.mkdir(dir_path) |
||
| 284 | |||
| 285 | def delete_file(self, path): |
||
| 286 | """ |
||
| 287 | Delete a file on remote box. |
||
| 288 | |||
| 289 | :param path: Path to remote file to be deleted. |
||
| 290 | :type path: ``str`` |
||
| 291 | |||
| 292 | :return: True if the file has been successfully deleted, False |
||
| 293 | otherwise. |
||
| 294 | :rtype: ``bool`` |
||
| 295 | """ |
||
| 296 | |||
| 297 | path = quote_unix(path) |
||
| 298 | extra = {'_path': path} |
||
| 299 | self.logger.debug('Deleting file', extra=extra) |
||
| 300 | self.sftp.unlink(path) |
||
| 301 | return True |
||
| 302 | |||
| 303 | def delete_dir(self, path, force=False, timeout=None): |
||
| 304 | """ |
||
| 305 | Delete a dir on remote box. |
||
| 306 | |||
| 307 | :param path: Path to remote dir to be deleted. |
||
| 308 | :type path: ``str`` |
||
| 309 | |||
| 310 | :param force: Optional Forcefully remove dir. |
||
| 311 | :type force: ``bool`` |
||
| 312 | |||
| 313 | :param timeout: Optional Time to wait for dir to be deleted. Only relevant for force. |
||
| 314 | :type timeout: ``int`` |
||
| 315 | |||
| 316 | :return: True if the file has been successfully deleted, False |
||
| 317 | otherwise. |
||
| 318 | :rtype: ``bool`` |
||
| 319 | """ |
||
| 320 | |||
| 321 | path = quote_unix(path) |
||
| 322 | extra = {'_path': path} |
||
| 323 | if force: |
||
| 324 | command = 'rm -rf %s' % path |
||
| 325 | extra['_command'] = command |
||
| 326 | extra['_force'] = force |
||
| 327 | self.logger.debug('Deleting dir', extra=extra) |
||
| 328 | return self.run(command, timeout=timeout) |
||
| 329 | |||
| 330 | self.logger.debug('Deleting dir', extra=extra) |
||
| 331 | return self.sftp.rmdir(path) |
||
| 332 | |||
| 333 | def run(self, cmd, timeout=None, quote=False): |
||
| 334 | """ |
||
| 335 | Note: This function is based on paramiko's exec_command() |
||
| 336 | method. |
||
| 337 | |||
| 338 | :param timeout: How long to wait (in seconds) for the command to |
||
| 339 | finish (optional). |
||
| 340 | :type timeout: ``float`` |
||
| 341 | """ |
||
| 342 | |||
| 343 | if quote: |
||
| 344 | cmd = quote_unix(cmd) |
||
| 345 | |||
| 346 | extra = {'_cmd': cmd} |
||
| 347 | self.logger.info('Executing command', extra=extra) |
||
| 348 | |||
| 349 | # Use the system default buffer size |
||
| 350 | bufsize = -1 |
||
| 351 | |||
| 352 | transport = self.client.get_transport() |
||
| 353 | chan = transport.open_session() |
||
| 354 | |||
| 355 | start_time = time.time() |
||
| 356 | if cmd.startswith('sudo'): |
||
| 357 | # Note that fabric does this as well. If you set pty, stdout and stderr |
||
| 358 | # streams will be combined into one. |
||
| 359 | chan.get_pty() |
||
| 360 | chan.exec_command(cmd) |
||
| 361 | |||
| 362 | stdout = StringIO() |
||
| 363 | stderr = StringIO() |
||
| 364 | |||
| 365 | # Create a stdin file and immediately close it to prevent any |
||
| 366 | # interactive script from hanging the process. |
||
| 367 | stdin = chan.makefile('wb', bufsize) |
||
| 368 | stdin.close() |
||
| 369 | |||
| 370 | # Receive all the output |
||
| 371 | # Note #1: This is used instead of chan.makefile approach to prevent |
||
| 372 | # buffering issues and hanging if the executed command produces a lot |
||
| 373 | # of output. |
||
| 374 | # |
||
| 375 | # Note #2: If you are going to remove "ready" checks inside the loop |
||
| 376 | # you are going to have a bad time. Trying to consume from a channel |
||
| 377 | # which is not ready will block for indefinitely. |
||
| 378 | exit_status_ready = chan.exit_status_ready() |
||
| 379 | |||
| 380 | if exit_status_ready: |
||
| 381 | stdout.write(self._consume_stdout(chan).getvalue()) |
||
| 382 | stderr.write(self._consume_stderr(chan).getvalue()) |
||
| 383 | |||
| 384 | while not exit_status_ready: |
||
| 385 | current_time = time.time() |
||
| 386 | elapsed_time = (current_time - start_time) |
||
| 387 | |||
| 388 | if timeout and (elapsed_time > timeout): |
||
| 389 | # TODO: Is this the right way to clean up? |
||
| 390 | chan.close() |
||
| 391 | |||
| 392 | stdout = strip_shell_chars(stdout.getvalue()) |
||
| 393 | stderr = strip_shell_chars(stderr.getvalue()) |
||
| 394 | raise SSHCommandTimeoutError(cmd=cmd, timeout=timeout, stdout=stdout, |
||
| 395 | stderr=stderr) |
||
| 396 | |||
| 397 | stdout.write(self._consume_stdout(chan).getvalue()) |
||
| 398 | stderr.write(self._consume_stderr(chan).getvalue()) |
||
| 399 | |||
| 400 | # We need to check the exist status here, because the command could |
||
| 401 | # print some output and exit during this sleep bellow. |
||
| 402 | exit_status_ready = chan.exit_status_ready() |
||
| 403 | |||
| 404 | if exit_status_ready: |
||
| 405 | break |
||
| 406 | |||
| 407 | # Short sleep to prevent busy waiting |
||
| 408 | eventlet.sleep(self.SLEEP_DELAY) |
||
| 409 | # print('Wait over. Channel must be ready for host: %s' % self.hostname) |
||
| 410 | |||
| 411 | # Receive the exit status code of the command we ran. |
||
| 412 | status = chan.recv_exit_status() |
||
| 413 | |||
| 414 | stdout = strip_shell_chars(stdout.getvalue()) |
||
| 415 | stderr = strip_shell_chars(stderr.getvalue()) |
||
| 416 | |||
| 417 | extra = {'_status': status, '_stdout': stdout, '_stderr': stderr} |
||
| 418 | self.logger.debug('Command finished', extra=extra) |
||
| 419 | |||
| 420 | return [stdout, stderr, status] |
||
| 421 | |||
| 422 | def close(self): |
||
| 423 | self.logger.debug('Closing server connection') |
||
| 424 | |||
| 425 | self.client.close() |
||
| 426 | if self.sftp: |
||
| 427 | self.sftp.close() |
||
| 428 | if self.bastion_client: |
||
| 429 | self.bastion_client.close() |
||
| 430 | return True |
||
| 431 | |||
| 432 | def _consume_stdout(self, chan): |
||
| 433 | """ |
||
| 434 | Try to consume stdout data from chan if it's receive ready. |
||
| 435 | """ |
||
| 436 | |||
| 437 | out = bytearray() |
||
| 438 | stdout = StringIO() |
||
| 439 | if chan.recv_ready(): |
||
| 440 | data = chan.recv(self.CHUNK_SIZE) |
||
| 441 | out += data |
||
| 442 | |||
| 443 | while data: |
||
| 444 | ready = chan.recv_ready() |
||
| 445 | |||
| 446 | if not ready: |
||
| 447 | break |
||
| 448 | |||
| 449 | data = chan.recv(self.CHUNK_SIZE) |
||
| 450 | out += data |
||
| 451 | |||
| 452 | stdout.write(self._get_decoded_data(out)) |
||
| 453 | return stdout |
||
| 454 | |||
| 455 | def _consume_stderr(self, chan): |
||
| 456 | """ |
||
| 457 | Try to consume stderr data from chan if it's receive ready. |
||
| 458 | """ |
||
| 459 | |||
| 460 | out = bytearray() |
||
| 461 | stderr = StringIO() |
||
| 462 | if chan.recv_stderr_ready(): |
||
| 463 | data = chan.recv_stderr(self.CHUNK_SIZE) |
||
| 464 | out += data |
||
| 465 | |||
| 466 | while data: |
||
| 467 | ready = chan.recv_stderr_ready() |
||
| 468 | |||
| 469 | if not ready: |
||
| 470 | break |
||
| 471 | |||
| 472 | data = chan.recv_stderr(self.CHUNK_SIZE) |
||
| 473 | out += data |
||
| 474 | |||
| 475 | stderr.write(self._get_decoded_data(out)) |
||
| 476 | return stderr |
||
| 477 | |||
| 478 | def _get_decoded_data(self, data): |
||
| 479 | try: |
||
| 480 | return data.decode('utf-8') |
||
| 481 | except: |
||
| 482 | self.logger.exception('Non UTF-8 character found in data: %s', data) |
||
| 483 | raise |
||
| 484 | |||
| 485 | def _get_pkey_object(self, key_material, passphrase): |
||
| 486 | """ |
||
| 487 | Try to detect private key type and return paramiko.PKey object. |
||
| 488 | """ |
||
| 489 | |||
| 490 | for cls in [paramiko.RSAKey, paramiko.DSSKey, paramiko.ECDSAKey]: |
||
| 491 | try: |
||
| 492 | key = cls.from_private_key(StringIO(key_material), password=passphrase) |
||
| 493 | except paramiko.ssh_exception.SSHException: |
||
| 494 | # Invalid key, try other key type |
||
| 495 | pass |
||
| 496 | else: |
||
| 497 | return key |
||
| 498 | |||
| 499 | # If a user passes in something which looks like file path we throw a more friendly |
||
| 500 | # exception letting the user know we expect the contents a not a path. |
||
| 501 | # Note: We do it here and not up the stack to avoid false positives. |
||
| 502 | contains_header = PRIVATE_KEY_HEADER in key_material.lower() |
||
| 503 | if not contains_header and (key_material.count('/') >= 1 or key_material.count('\\') >= 1): |
||
| 504 | msg = ('"private_key" parameter needs to contain private key data / content and not ' |
||
| 505 | 'a path') |
||
| 506 | elif passphrase: |
||
| 507 | msg = 'Invalid passphrase or invalid/unsupported key type' |
||
| 508 | else: |
||
| 509 | msg = 'Invalid or unsupported key type' |
||
| 510 | |||
| 511 | raise paramiko.ssh_exception.SSHException(msg) |
||
| 512 | |||
| 513 | def _connect(self, host, socket=None): |
||
| 514 | """ |
||
| 515 | |||
| 516 | :type host: ``str`` |
||
| 517 | :param host: Host to connect to |
||
| 518 | |||
| 519 | :type socket: :class:`paramiko.Channel` or an opened :class:`socket.socket` |
||
| 520 | :param socket: If specified, won't open a socket for communication to the specified host |
||
| 521 | and will use this instead |
||
| 522 | |||
| 523 | :return: A connected SSHClient |
||
| 524 | :rtype: :class:`paramiko.SSHClient` |
||
| 525 | """ |
||
| 526 | conninfo = {'hostname': host, |
||
| 527 | 'port': self.port, |
||
| 528 | 'username': self.username, |
||
| 529 | 'allow_agent': False, |
||
| 530 | 'look_for_keys': False, |
||
| 531 | 'timeout': self.timeout} |
||
| 532 | |||
| 533 | if self.password: |
||
| 534 | conninfo['password'] = self.password |
||
| 535 | |||
| 536 | if self.key_files: |
||
| 537 | conninfo['key_filename'] = self.key_files |
||
| 538 | |||
| 539 | if self.key_material: |
||
| 540 | conninfo['pkey'] = self._get_pkey_object(key_material=self.key_material, |
||
| 541 | passphrase=self.passphrase) |
||
| 542 | |||
| 543 | if not self.password and not (self.key_files or self.key_material): |
||
| 544 | conninfo['allow_agent'] = True |
||
| 545 | conninfo['look_for_keys'] = True |
||
| 546 | |||
| 547 | extra = {'_hostname': host, '_port': self.port, |
||
| 548 | '_username': self.username, '_timeout': self.timeout} |
||
| 549 | self.logger.debug('Connecting to server', extra=extra) |
||
| 550 | |||
| 551 | if socket: |
||
| 552 | conninfo['sock'] = socket |
||
| 553 | |||
| 554 | client = paramiko.SSHClient() |
||
| 555 | client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) |
||
| 556 | client.connect(**conninfo) |
||
| 557 | |||
| 558 | return client |
||
| 559 | |||
| 560 | def __repr__(self): |
||
| 561 | return ('<ParamikoSSHClient hostname=%s,port=%s,username=%s,id=%s>' % |
||
| 562 | (self.hostname, self.port, self.username, id(self))) |
||
| 563 |