| Total Complexity | 66 |
| Total Lines | 485 |
| Duplicated Lines | 0 % |
Complex classes like st2actions.runners.ssh.ParamikoSSHClient often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # Licensed to the Apache Software Foundation (ASF) under one or more |
||
class ParamikoSSHClient(object):
    """
    A SSH Client powered by Paramiko.

    Wraps a ``paramiko.SSHClient`` (optionally reached through a bastion host) and exposes
    helpers for uploading files / directories over SFTP and for running remote commands.
    """

    # Maximum number of bytes to read at once from a socket
    CHUNK_SIZE = 1024
    # How long to sleep while waiting for command to finish
    SLEEP_DELAY = 1.5
    # Connect socket timeout
    CONNECT_TIMEOUT = 60

    def __init__(self, hostname, port=22, username=None, password=None, bastion_host=None,
                 key=None, key_files=None, key_material=None, timeout=None):
        """
        Authentication is always attempted in the following order:

        - The key passed in (if key is provided)
        - Any key we can find through an SSH agent (only if no password and
          key is provided)
        - Any "id_rsa" or "id_dsa" key discoverable in ~/.ssh/ (only if no
          password and key is provided)
        - Plain username/password auth, if a password was given (if password is
          provided)

        :raises ValueError: If both ``key_files`` and ``key_material`` are provided.
        """
        if key_files and key_material:
            raise ValueError(('key_files and key_material arguments are '
                              'mutually exclusive'))

        self.hostname = hostname
        self.port = port
        # ``cfg.CONF.system_user`` is an option group (it also carries ``ssh_key_file``
        # below), so the default username must be read from its ``user`` option rather
        # than being the group object itself.
        self.username = username if username else cfg.CONF.system_user.user
        self.password = password
        self.key = key if key else cfg.CONF.system_user.ssh_key_file
        self.key_files = key_files
        if not self.key_files and self.key:
            # NOTE(review): this assigns the ``key`` argument, not ``self.key``, so the
            # configured ``ssh_key_file`` default is never used as a key file - confirm
            # whether that is intentional before changing it.
            self.key_files = key  # `key` arg is deprecated.
        self.timeout = timeout or ParamikoSSHClient.CONNECT_TIMEOUT
        self.key_material = key_material
        self.client = None
        self.logger = logging.getLogger(__name__)
        self.sftp = None
        self.bastion_host = bastion_host
        self.bastion_client = None
        self.bastion_socket = None

    def connect(self):
        """
        Connect to the remote node over SSH.

        If a bastion host was specified, a connection to it is established first and a
        ``direct-tcpip`` channel to the target node is opened through it. An SFTP session
        is opened on the resulting connection.

        :return: True if the connection has been successfully established,
                 False otherwise.
        :rtype: ``bool``
        """
        if self.bastion_host:
            self.logger.debug('Bastion host specified, connecting')
            self.bastion_client = self._connect(host=self.bastion_host)
            transport = self.bastion_client.get_transport()
            real_addr = (self.hostname, self.port)
            # fabric uses ('', 0) for direct-tcpip, this duplicates that behaviour
            # see https://github.com/fabric/fabric/commit/c2a9bbfd50f560df6c6f9675603fb405c4071cad
            local_addr = ('', 0)
            self.bastion_socket = transport.open_channel('direct-tcpip', real_addr, local_addr)

        self.client = self._connect(host=self.hostname, socket=self.bastion_socket)
        self.sftp = self.client.open_sftp()
        return True

    def put(self, local_path, remote_path, mode=None, mirror_local_mode=False):
        """
        Upload a file to the remote node.

        :type local_path: ``str``
        :param local_path: File path on the local node.

        :type remote_path: ``str``
        :param remote_path: File path on the remote node.

        :type mode: ``int``
        :param mode: Permissions mode for the file. E.g. 0744. An octal string is also
                     accepted.

        :type mirror_local_mode: ``int``
        :param mirror_local_mode: Should remote file mirror local mode. Takes precedence
                                  over ``mode`` when both are given.

        :return: Attributes of the remote file, as returned by ``self.sftp.put``.

        :raises Exception: If either path is empty or the local path does not exist.
        """

        if not local_path or not remote_path:
            # Both values must be passed as a tuple, otherwise the formatting itself fails.
            raise Exception('Need both local_path and remote_path. local: %s, remote: %s' %
                            (local_path, remote_path))
        # NOTE(review): the quoted local path is what gets checked with os.path.exists
        # below - confirm quote_unix leaves ordinary paths untouched.
        local_path = quote_unix(local_path)
        remote_path = quote_unix(remote_path)

        extra = {'_local_path': local_path, '_remote_path': remote_path, '_mode': mode,
                 '_mirror_local_mode': mirror_local_mode}
        self.logger.debug('Uploading file', extra=extra)

        if not os.path.exists(local_path):
            raise Exception('Path %s does not exist locally.' % local_path)

        rattrs = self.sftp.put(local_path, remote_path)

        if mode or mirror_local_mode:
            local_mode = mode
            if not mode or mirror_local_mode:
                local_mode = os.stat(local_path).st_mode

            # ``basestring`` only exists on Python 2; fall back to ``str`` elsewhere.
            try:
                string_types = basestring  # noqa
            except NameError:
                string_types = str

            # Cast to octal integer in case of string
            if isinstance(local_mode, string_types):
                local_mode = int(local_mode, 8)
            # Keep only the permission bits
            local_mode = local_mode & 0o7777
            remote_mode = rattrs.st_mode
            # Only mask if we actually got a remote_mode
            if remote_mode is not None:
                remote_mode = (remote_mode & 0o7777)
            if local_mode != remote_mode:
                self.sftp.chmod(remote_path, local_mode)

        return rattrs

    def put_dir(self, local_path, remote_path, mode=None, mirror_local_mode=False):
        """
        Upload a dir to the remote node.

        :type local_path: ``str``
        :param local_path: Dir path on the local node.

        :type remote_path: ``str``
        :param remote_path: Base dir path on the remote node.

        :type mode: ``int``
        :param mode: Permissions mode for the file. E.g. 0744.

        :type mirror_local_mode: ``int``
        :param mirror_local_mode: Should remote file mirror local mode.

        :return: List with the value returned by :meth:`put` for every uploaded file.
        :rtype: ``list``
        """

        extra = {'_local_path': local_path, '_remote_path': remote_path, '_mode': mode,
                 '_mirror_local_mode': mirror_local_mode}
        self.logger.debug('Uploading dir', extra=extra)

        # Prefix to strip from every walked path so that the uploaded tree is rooted at
        # the last component of local_path (with or without a trailing separator).
        if os.path.basename(local_path):
            strip = os.path.dirname(local_path)
        else:
            strip = os.path.dirname(os.path.dirname(local_path))

        remote_paths = []

        for context, dirs, files in os.walk(local_path):
            rcontext = context.replace(strip, '', 1)
            # normalize pathname separators with POSIX separator
            rcontext = rcontext.replace(os.sep, '/')
            rcontext = rcontext.lstrip('/')
            rcontext = posixpath.join(remote_path, rcontext)

            if not self.exists(rcontext):
                self.sftp.mkdir(rcontext)

            for d in dirs:
                n = posixpath.join(rcontext, d)
                if not self.exists(n):
                    self.sftp.mkdir(n)

            for f in files:
                local_file = os.path.join(context, f)
                n = posixpath.join(rcontext, f)
                # Note that quote_unix is done by put anyways.
                p = self.put(local_path=local_file, remote_path=n,
                             mirror_local_mode=mirror_local_mode, mode=mode)
                remote_paths.append(p)

        return remote_paths

    def exists(self, remote_path):
        """
        Validate whether a remote file or directory exists.

        :param remote_path: Path to remote file.
        :type remote_path: ``str``

        :return: False if ``lstat`` on the path raises ``IOError``, True otherwise.
        :rtype: ``bool``
        """
        try:
            self.sftp.lstat(remote_path).st_mode
        except IOError:
            return False

        return True

    def mkdir(self, dir_path):
        """
        Create a directory on remote box.

        :param dir_path: Path to remote directory to be created.
        :type dir_path: ``str``

        :return: Returns nothing if successful else raises IOError exception.

        :rtype: ``None``
        """

        dir_path = quote_unix(dir_path)
        extra = {'_dir_path': dir_path}
        self.logger.debug('mkdir', extra=extra)
        return self.sftp.mkdir(dir_path)

    def delete_file(self, path):
        """
        Delete a file on remote box.

        :param path: Path to remote file to be deleted.
        :type path: ``str``

        :return: True if the file has been successfully deleted; errors from the SFTP
                 ``unlink`` call propagate as exceptions.
        :rtype: ``bool``
        """

        path = quote_unix(path)
        extra = {'_path': path}
        self.logger.debug('Deleting file', extra=extra)
        self.sftp.unlink(path)
        return True

    def delete_dir(self, path, force=False, timeout=None):
        """
        Delete a dir on remote box.

        :param path: Path to remote dir to be deleted.
        :type path: ``str``

        :param force: Optional Forcefully remove dir.
        :type force: ``bool``

        :param timeout: Optional Time to wait for dir to be deleted. Only relevant for force.
        :type timeout: ``int``

        :return: With ``force``, the result of :meth:`run` for ``rm -rf <path>``;
                 otherwise whatever ``self.sftp.rmdir`` returns.
        """

        path = quote_unix(path)
        extra = {'_path': path}
        if force:
            command = 'rm -rf %s' % path
            extra['_command'] = command
            extra['_force'] = force
            self.logger.debug('Deleting dir', extra=extra)
            return self.run(command, timeout=timeout)

        self.logger.debug('Deleting dir', extra=extra)
        return self.sftp.rmdir(path)

    def run(self, cmd, timeout=None, quote=False):
        """
        Run a command on the remote node and collect its output.

        Note: This function is based on paramiko's exec_command()
        method.

        :param cmd: Command to execute.
        :type cmd: ``str``

        :param timeout: How long to wait (in seconds) for the command to
                        finish (optional).
        :type timeout: ``float``

        :param quote: If True, ``cmd`` is passed through ``quote_unix`` first.
        :type quote: ``bool``

        :return: ``[stdout, stderr, status]``
        :rtype: ``list``

        :raises SSHCommandTimeoutError: If ``timeout`` is set and exceeded.
        """

        if quote:
            cmd = quote_unix(cmd)

        extra = {'_cmd': cmd}
        self.logger.info('Executing command', extra=extra)

        # Use the system default buffer size
        bufsize = -1

        transport = self.client.get_transport()
        chan = transport.open_session()

        start_time = time.time()
        if cmd.startswith('sudo'):
            # Note that fabric does this as well. If you set pty, stdout and stderr
            # streams will be combined into one.
            chan.get_pty()
        chan.exec_command(cmd)

        stdout = StringIO()
        stderr = StringIO()

        # Create a stdin file and immediately close it to prevent any
        # interactive script from hanging the process.
        stdin = chan.makefile('wb', bufsize)
        stdin.close()

        # Receive all the output
        # Note #1: This is used instead of chan.makefile approach to prevent
        # buffering issues and hanging if the executed command produces a lot
        # of output.
        #
        # Note #2: If you are going to remove "ready" checks inside the loop
        # you are going to have a bad time. Trying to consume from a channel
        # which is not ready will block indefinitely.
        exit_status_ready = chan.exit_status_ready()

        if exit_status_ready:
            stdout.write(self._consume_stdout(chan).getvalue())
            stderr.write(self._consume_stderr(chan).getvalue())

        while not exit_status_ready:
            current_time = time.time()
            elapsed_time = (current_time - start_time)

            if timeout and (elapsed_time > timeout):
                # TODO: Is this the right way to clean up?
                chan.close()

                stdout = strip_shell_chars(stdout.getvalue())
                stderr = strip_shell_chars(stderr.getvalue())
                raise SSHCommandTimeoutError(cmd=cmd, timeout=timeout, stdout=stdout,
                                             stderr=stderr)

            stdout.write(self._consume_stdout(chan).getvalue())
            stderr.write(self._consume_stderr(chan).getvalue())

            # We need to check the exit status here, because the command could
            # print some output and exit during the sleep below.
            exit_status_ready = chan.exit_status_ready()

            if exit_status_ready:
                break

            # Short sleep to prevent busy waiting
            eventlet.sleep(self.SLEEP_DELAY)

        # Receive the exit status code of the command we ran.
        status = chan.recv_exit_status()

        stdout = strip_shell_chars(stdout.getvalue())
        stderr = strip_shell_chars(stderr.getvalue())

        extra = {'_status': status, '_stdout': stdout, '_stderr': stderr}
        self.logger.debug('Command finished', extra=extra)

        return [stdout, stderr, status]

    def close(self):
        """
        Close the SSH connection, the SFTP session and the bastion connection (if any).

        Safe to call on a client which was never connected.

        :return: Always True.
        :rtype: ``bool``
        """
        self.logger.debug('Closing server connection')

        # ``client`` stays None until connect() succeeds, so guard it like the others.
        if self.client:
            self.client.close()
        if self.sftp:
            self.sftp.close()
        if self.bastion_client:
            self.bastion_client.close()
        return True

    def _consume_stream(self, is_ready, receive):
        """
        Read chunks via ``receive`` for as long as ``is_ready`` reports pending data.

        :param is_ready: Callable returning True when data can be read without blocking.
        :param receive: Callable taking a maximum byte count and returning the data read.

        :return: Buffer containing the decoded data (empty if nothing was ready).
        """
        out = bytearray()

        # Never call receive() unless the channel reports it is ready - reading from a
        # channel which is not ready would block.
        while is_ready():
            data = receive(self.CHUNK_SIZE)
            if not data:
                break
            out += data

        buff = StringIO()
        buff.write(self._get_decoded_data(out))
        return buff

    def _consume_stdout(self, chan):
        """
        Try to consume stdout data from chan if it's receive ready.
        """
        return self._consume_stream(chan.recv_ready, chan.recv)

    def _consume_stderr(self, chan):
        """
        Try to consume stderr data from chan if it's receive ready.
        """
        return self._consume_stream(chan.recv_stderr_ready, chan.recv_stderr)

    def _get_decoded_data(self, data):
        """
        Decode ``data`` as UTF-8, logging and re-raising on failure.
        """
        try:
            return data.decode('utf-8')
        except Exception:
            self.logger.exception('Non UTF-8 character found in data: %s', data)
            raise

    def _get_pkey_object(self, key_material):
        """
        Try to detect private key type and return paramiko.PKey object.

        :param key_material: Private key contents.
        :type key_material: ``str``

        :raises paramiko.ssh_exception.SSHException: If the key can't be loaded as any of
                                                     the attempted key types.
        """

        for cls in [paramiko.RSAKey, paramiko.DSSKey, paramiko.ECDSAKey]:
            try:
                key = cls.from_private_key(StringIO(key_material))
            except paramiko.ssh_exception.SSHException:
                # Invalid key, try other key type
                pass
            else:
                return key

        # If a user passes in something which looks like file path we throw a more friendly
        # exception letting the user know we expect the contents and not a path.
        # Note: We do it here and not up the stack to avoid false positives.
        contains_header = PRIVATE_KEY_HEADER in key_material.lower()
        if not contains_header and (key_material.count('/') >= 1 or key_material.count('\\') >= 1):
            msg = ('"private_key" parameter needs to contain private key data / content and not '
                   'a path')
        else:
            msg = 'Invalid or unsupported key type'

        raise paramiko.ssh_exception.SSHException(msg)

    def _connect(self, host, socket=None):
        """
        Open an SSH connection to ``host`` using the credentials stored on this instance.

        :type host: ``str``
        :param host: Host to connect to

        :type socket: :class:`paramiko.Channel` or an opened :class:`socket.socket`
        :param socket: If specified, won't open a socket for communication to the specified host
                       and will use this instead

        :return: A connected SSHClient
        :rtype: :class:`paramiko.SSHClient`
        """
        conninfo = {'hostname': host,
                    'port': self.port,
                    'username': self.username,
                    'allow_agent': False,
                    'look_for_keys': False,
                    'timeout': self.timeout}

        if self.password:
            conninfo['password'] = self.password

        if self.key_files:
            conninfo['key_filename'] = self.key_files

        if self.key_material:
            conninfo['pkey'] = self._get_pkey_object(key_material=self.key_material)

        # Only fall back to the SSH agent / default key discovery when no explicit
        # credentials were supplied.
        if not self.password and not (self.key_files or self.key_material):
            conninfo['allow_agent'] = True
            conninfo['look_for_keys'] = True

        extra = {'_hostname': host, '_port': self.port,
                 '_username': self.username, '_timeout': self.timeout}
        self.logger.debug('Connecting to server', extra=extra)

        if socket:
            conninfo['sock'] = socket

        client = paramiko.SSHClient()
        # Unknown host keys are accepted automatically.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(**conninfo)

        return client

    def __repr__(self):
        return ('<ParamikoSSHClient hostname=%s,port=%s,username=%s,id=%s>' %
                (self.hostname, self.port, self.username, id(self)))
||
| 556 |