Total Complexity | 66 |
Total Lines | 485 |
Duplicated Lines | 0 % |
Complex classes like st2actions.runners.ssh.ParamikoSSHClient often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # Licensed to the Apache Software Foundation (ASF) under one or more |
||
class ParamikoSSHClient(object):
    """
    A SSH Client powered by Paramiko.

    Wraps a ``paramiko.SSHClient`` (optionally reached through a bastion host) and offers
    helpers to run commands and to upload / delete files through the SFTP session which is
    opened by :meth:`connect`.
    """

    # Maximum number of bytes to read at once from a socket
    CHUNK_SIZE = 1024
    # How long to sleep while waiting for command to finish
    SLEEP_DELAY = 1.5
    # Connect socket timeout
    CONNECT_TIMEOUT = 60

    def __init__(self, hostname, port=22, username=None, password=None, bastion_host=None,
                 key=None, key_files=None, key_material=None, timeout=None):
        """
        Authentication is always attempted in the following order:

        - The key passed in (if key is provided)
        - Any key we can find through an SSH agent (only if no password and
          key is provided)
        - Any "id_rsa" or "id_dsa" key discoverable in ~/.ssh/ (only if no
          password and key is provided)
        - Plain username/password auth, if a password was given (if password is
          provided)

        :param key: Deprecated, use ``key_files`` instead.

        :param timeout: Connect timeout. Falls back to ``CONNECT_TIMEOUT`` when not provided.

        :raises ValueError: If both ``key_files`` and ``key_material`` are provided.
        """
        if key_files and key_material:
            raise ValueError(('key_files and key_material arguments are '
                              'mutually exclusive'))

        self.hostname = hostname
        self.port = port
        # NOTE(review): the fallback is the ``cfg.CONF.system_user`` object itself while the key
        # fallback below reads an attribute of it (``.ssh_key_file``) - confirm that this is
        # not supposed to be an attribute holding the user name.
        self.username = username if username else cfg.CONF.system_user
        self.password = password
        self.key = key if key else cfg.CONF.system_user.ssh_key_file
        self.key_files = key_files
        if not self.key_files and self.key:
            # NOTE(review): the raw ``key`` argument is copied (not ``self.key``), so the
            # configured default key file never ends up in ``key_files`` - confirm intended.
            self.key_files = key  # `key` arg is deprecated.
        self.timeout = timeout or ParamikoSSHClient.CONNECT_TIMEOUT
        self.key_material = key_material
        self.client = None
        self.logger = logging.getLogger(__name__)
        self.sftp = None
        self.bastion_host = bastion_host
        self.bastion_client = None
        self.bastion_socket = None

    def connect(self):
        """
        Connect to the remote node over SSH and open an SFTP session on top of it.

        If a bastion host was specified, a connection to it is established first and a
        ``direct-tcpip`` channel opened through it is used as a socket for the real connection.

        :return: True if the connection has been successfully established. Errors raised
                 while connecting are not caught here.
        :rtype: ``bool``
        """
        if self.bastion_host:
            self.logger.debug('Bastion host specified, connecting')
            self.bastion_client = self._connect(host=self.bastion_host)
            transport = self.bastion_client.get_transport()
            real_addr = (self.hostname, self.port)
            # fabric uses ('', 0) for direct-tcpip, this duplicates that behaviour
            # see https://github.com/fabric/fabric/commit/c2a9bbfd50f560df6c6f9675603fb405c4071cad
            local_addr = ('', 0)
            self.bastion_socket = transport.open_channel('direct-tcpip', real_addr, local_addr)

        self.client = self._connect(host=self.hostname, socket=self.bastion_socket)
        self.sftp = self.client.open_sftp()
        return True

    def put(self, local_path, remote_path, mode=None, mirror_local_mode=False):
        """
        Upload a file to the remote node.

        :type local_path: ``str``
        :param local_path: File path on the local node.

        :type remote_path: ``str``
        :param remote_path: File path on the remote node.

        :type mode: ``int`` or ``str``
        :param mode: Permissions mode for the file. E.g. 0744. A string is parsed as an octal
                     number.

        :type mirror_local_mode: ``bool``
        :param mirror_local_mode: Should remote file mirror local mode. Takes precedence over
                                  ``mode`` when both are provided.

        :return: Attributes of the remote file (the value returned by ``self.sftp.put``).

        :raises Exception: If one of the paths is empty or the local path doesn't exist.
        """

        if not local_path or not remote_path:
            # The values need to be passed as a tuple, otherwise "%" only receives local_path
            # and the formatting itself fails with a TypeError.
            raise Exception('Need both local_path and remote_path. local: %s, remote: %s' %
                            (local_path, remote_path))
        # NOTE(review): the quoted values are also used for the local existence check and for
        # the SFTP calls below - confirm quote_unix can't alter an otherwise valid path.
        local_path = quote_unix(local_path)
        remote_path = quote_unix(remote_path)

        extra = {'_local_path': local_path, '_remote_path': remote_path, '_mode': mode,
                 '_mirror_local_mode': mirror_local_mode}
        self.logger.debug('Uploading file', extra=extra)

        if not os.path.exists(local_path):
            raise Exception('Path %s does not exist locally.' % local_path)

        rattrs = self.sftp.put(local_path, remote_path)

        if mode or mirror_local_mode:
            local_mode = mode
            # Inside this branch "not mode or mirror_local_mode" reduces to mirror_local_mode.
            if mirror_local_mode:
                local_mode = os.stat(local_path).st_mode

            # Cast to octal integer in case of string
            if isinstance(local_mode, basestring):
                local_mode = int(local_mode, 8)
            # Only the permission bits are compared ("0o" literal works on Python 2.6+ and 3).
            local_mode = local_mode & 0o7777
            remote_mode = rattrs.st_mode
            # Only mask if we actually got a remote_mode
            if remote_mode is not None:
                remote_mode = (remote_mode & 0o7777)
            if local_mode != remote_mode:
                self.sftp.chmod(remote_path, local_mode)

        return rattrs

    def put_dir(self, local_path, remote_path, mode=None, mirror_local_mode=False):
        """
        Upload a dir to the remote node.

        The local tree is walked and re-created below ``remote_path``, keeping the name of the
        uploaded directory itself as the first path component.

        :type local_path: ``str``
        :param local_path: Dir path on the local node.

        :type remote_path: ``str``
        :param remote_path: Base dir path on the remote node.

        :type mode: ``int``
        :param mode: Permissions mode for the files. E.g. 0744. Passed to :meth:`put`.

        :type mirror_local_mode: ``bool``
        :param mirror_local_mode: Should remote files mirror local mode. Passed to :meth:`put`.

        :return: Values returned by :meth:`put`, one for each uploaded file.
        :rtype: ``list``
        """

        extra = {'_local_path': local_path, '_remote_path': remote_path, '_mode': mode,
                 '_mirror_local_mode': mirror_local_mode}
        self.logger.debug('Uploading dir', extra=extra)

        # Prefix which is removed from every walked path. A trailing separator yields an empty
        # basename, so one more level needs to be removed in that case.
        if os.path.basename(local_path):
            strip = os.path.dirname(local_path)
        else:
            strip = os.path.dirname(os.path.dirname(local_path))

        remote_paths = []

        for context, dirs, files in os.walk(local_path):
            rcontext = context.replace(strip, '', 1)
            # normalize pathname separators with POSIX separator
            rcontext = rcontext.replace(os.sep, '/')
            rcontext = rcontext.lstrip('/')
            rcontext = posixpath.join(remote_path, rcontext)

            if not self.exists(rcontext):
                self.sftp.mkdir(rcontext)

            for dir_name in dirs:
                remote_dir = posixpath.join(rcontext, dir_name)
                if not self.exists(remote_dir):
                    self.sftp.mkdir(remote_dir)

            for file_name in files:
                # A dedicated name is used so the "local_path" argument isn't overwritten.
                local_file = os.path.join(context, file_name)
                remote_file = posixpath.join(rcontext, file_name)
                # Note that quote_unix is done by put anyways.
                result = self.put(local_path=local_file, remote_path=remote_file,
                                  mirror_local_mode=mirror_local_mode, mode=mode)
                remote_paths.append(result)

        return remote_paths

    def exists(self, remote_path):
        """
        Validate whether a remote file or directory exists.

        :param remote_path: Path to remote file.
        :type remote_path: ``str``

        :return: False if ``lstat`` on the path raises ``IOError``, True otherwise.
        :rtype: ``bool``
        """
        try:
            self.sftp.lstat(remote_path).st_mode
        except IOError:
            return False

        return True

    def mkdir(self, dir_path):
        """
        Create a directory on remote box.

        :param dir_path: Path to remote directory to be created.
        :type dir_path: ``str``

        :return: The value returned by ``self.sftp.mkdir``. Errors raised by it are not caught.
        """

        dir_path = quote_unix(dir_path)
        extra = {'_dir_path': dir_path}
        self.logger.debug('mkdir', extra=extra)
        return self.sftp.mkdir(dir_path)

    def delete_file(self, path):
        """
        Delete a file on remote box.

        :param path: Path to remote file to be deleted.
        :type path: ``str``

        :return: True if the file has been successfully deleted. Errors raised by
                 ``self.sftp.unlink`` are not caught.
        :rtype: ``bool``
        """

        path = quote_unix(path)
        extra = {'_path': path}
        self.logger.debug('Deleting file', extra=extra)
        self.sftp.unlink(path)
        return True

    def delete_dir(self, path, force=False, timeout=None):
        """
        Delete a dir on remote box.

        :param path: Path to remote dir to be deleted.
        :type path: ``str``

        :param force: Optional Forcefully remove dir (runs ``rm -rf`` on the remote box).
        :type force: ``bool``

        :param timeout: Optional Time to wait for dir to be deleted. Only relevant for force.
        :type timeout: ``int``

        :return: With ``force`` the ``[stdout, stderr, status]`` list returned by :meth:`run`,
                 otherwise the value returned by ``self.sftp.rmdir``.
        """

        path = quote_unix(path)
        extra = {'_path': path}
        if force:
            command = 'rm -rf %s' % path
            extra['_command'] = command
            extra['_force'] = force
            self.logger.debug('Deleting dir', extra=extra)
            return self.run(command, timeout=timeout)

        self.logger.debug('Deleting dir', extra=extra)
        return self.sftp.rmdir(path)

    def run(self, cmd, timeout=None, quote=False):
        """
        Run a command on the remote node and wait for it to finish.

        Note: This function is based on paramiko's exec_command()
        method.

        :param cmd: Command to run.
        :type cmd: ``str``

        :param timeout: How long to wait (in seconds) for the command to
                        finish (optional). A false value disables the check.
        :type timeout: ``float``

        :param quote: If True, the whole command is passed through ``quote_unix`` first.
        :type quote: ``bool``

        :return: ``[stdout, stderr, status]``, both streams passed through
                 ``strip_shell_chars``.
        :rtype: ``list``

        :raises SSHCommandTimeoutError: If the command doesn't finish in time. The output
                                        collected so far is attached to the exception.
        """

        if quote:
            cmd = quote_unix(cmd)

        extra = {'_cmd': cmd}
        self.logger.info('Executing command', extra=extra)

        # Use the system default buffer size
        bufsize = -1

        transport = self.client.get_transport()
        chan = transport.open_session()

        start_time = time.time()
        if cmd.startswith('sudo'):
            # Note that fabric does this as well. If you set pty, stdout and stderr
            # streams will be combined into one.
            chan.get_pty()
        chan.exec_command(cmd)

        stdout = StringIO()
        stderr = StringIO()

        # Create a stdin file and immediately close it to prevent any
        # interactive script from hanging the process.
        stdin = chan.makefile('wb', bufsize)
        stdin.close()

        # Receive all the output
        # Note #1: This is used instead of chan.makefile approach to prevent
        # buffering issues and hanging if the executed command produces a lot
        # of output.
        #
        # Note #2: If you are going to remove "ready" checks inside the loop
        # you are going to have a bad time. Trying to consume from a channel
        # which is not ready will block indefinitely.
        exit_status_ready = chan.exit_status_ready()

        if exit_status_ready:
            stdout.write(self._consume_stdout(chan).getvalue())
            stderr.write(self._consume_stderr(chan).getvalue())

        while not exit_status_ready:
            current_time = time.time()
            elapsed_time = (current_time - start_time)

            if timeout and (elapsed_time > timeout):
                # TODO: Is this the right way to clean up?
                chan.close()

                stdout = strip_shell_chars(stdout.getvalue())
                stderr = strip_shell_chars(stderr.getvalue())
                raise SSHCommandTimeoutError(cmd=cmd, timeout=timeout, stdout=stdout,
                                             stderr=stderr)

            stdout.write(self._consume_stdout(chan).getvalue())
            stderr.write(self._consume_stderr(chan).getvalue())

            # We need to check the exit status here, because the command could
            # print some output and exit during the sleep below.
            # NOTE(review): nothing is consumed after the exit status turns ready here -
            # confirm that output which arrives after the reads above can't be lost.
            exit_status_ready = chan.exit_status_ready()

            if exit_status_ready:
                break

            # Short sleep to prevent busy waiting
            eventlet.sleep(self.SLEEP_DELAY)

        # Receive the exit status code of the command we ran.
        status = chan.recv_exit_status()

        stdout = strip_shell_chars(stdout.getvalue())
        stderr = strip_shell_chars(stderr.getvalue())

        extra = {'_status': status, '_stdout': stdout, '_stderr': stderr}
        self.logger.debug('Command finished', extra=extra)

        return [stdout, stderr, status]

    def close(self):
        """
        Close the connection to the server, the SFTP session and the bastion connection.

        Safe to call when :meth:`connect` was never called or didn't succeed - members which
        were never set are skipped.

        :return: Always True.
        :rtype: ``bool``
        """
        self.logger.debug('Closing server connection')

        # "client" stays None until connect() succeeds.
        if self.client is not None:
            self.client.close()
        if self.sftp:
            self.sftp.close()
        if self.bastion_client:
            self.bastion_client.close()
        return True

    def _consume_stdout(self, chan):
        """
        Try to consume stdout data from chan if it's receive ready.

        :return: File-like object which contains the decoded data.
        """
        return self._consume_stream(ready=chan.recv_ready, recv=chan.recv)

    def _consume_stderr(self, chan):
        """
        Try to consume stderr data from chan if it's receive ready.

        :return: File-like object which contains the decoded data.
        """
        return self._consume_stream(ready=chan.recv_stderr_ready, recv=chan.recv_stderr)

    def _consume_stream(self, ready, recv):
        """
        Read chunks using ``recv`` for as long as ``ready`` reports available data.

        :param ready: Callable without arguments which tells if data can be received.
        :param recv: Callable which takes the number of bytes to read and returns the data.

        :return: File-like object which contains the decoded data (empty if nothing was ready).
        """
        out = bytearray()

        if ready():
            data = recv(self.CHUNK_SIZE)
            out += data

            # Stop on an empty read or as soon as the stream isn't ready anymore, whichever
            # comes first. "recv" is never called without a preceding positive "ready" check.
            while data:
                if not ready():
                    break

                data = recv(self.CHUNK_SIZE)
                out += data

        result = StringIO()
        result.write(self._get_decoded_data(out))
        return result

    def _get_decoded_data(self, data):
        """
        Decode the provided data as UTF-8. Decoding errors are logged and re-raised.
        """
        try:
            return data.decode('utf-8')
        except Exception:
            self.logger.exception('Non UTF-8 character found in data: %s', data)
            raise

    def _get_pkey_object(self, key_material):
        """
        Try to detect private key type and return paramiko.PKey object.

        :param key_material: Private key content (not a path to a key file).
        :type key_material: ``str``

        :raises paramiko.ssh_exception.SSHException: If none of the key types can parse the
                                                     provided content.
        """

        for cls in (paramiko.RSAKey, paramiko.DSSKey, paramiko.ECDSAKey):
            try:
                key = cls.from_private_key(StringIO(key_material))
            except paramiko.ssh_exception.SSHException:
                # Invalid key, try other key type
                pass
            else:
                return key

        # If a user passes in something which looks like file path we throw a more friendly
        # exception letting the user know we expect the contents and not a path.
        # Note: We do it here and not up the stack to avoid false positives.
        contains_header = PRIVATE_KEY_HEADER in key_material.lower()
        if not contains_header and (key_material.count('/') >= 1 or key_material.count('\\') >= 1):
            msg = ('"private_key" parameter needs to contain private key data / content and not '
                   'a path')
        else:
            msg = 'Invalid or unsupported key type'

        raise paramiko.ssh_exception.SSHException(msg)

    def _connect(self, host, socket=None):
        """
        Create a new ``paramiko.SSHClient`` and connect it to the provided host.

        The port, user name, timeout and credentials stored on this instance are used. Agent
        and key discovery are only enabled when neither a password nor a key was provided.

        :type host: ``str``
        :param host: Host to connect to

        :type socket: :class:`paramiko.Channel` or an opened :class:`socket.socket`
        :param socket: If specified, won't open a socket for communication to the specified host
                       and will use this instead

        :return: A connected SSHClient
        :rtype: :class:`paramiko.SSHClient`
        """
        conninfo = {'hostname': host,
                    'port': self.port,
                    'username': self.username,
                    'allow_agent': False,
                    'look_for_keys': False,
                    'timeout': self.timeout}

        if self.password:
            conninfo['password'] = self.password

        if self.key_files:
            conninfo['key_filename'] = self.key_files

        if self.key_material:
            conninfo['pkey'] = self._get_pkey_object(key_material=self.key_material)

        if not self.password and not (self.key_files or self.key_material):
            conninfo['allow_agent'] = True
            conninfo['look_for_keys'] = True

        extra = {'_hostname': host, '_port': self.port,
                 '_username': self.username, '_timeout': self.timeout}
        self.logger.debug('Connecting to server', extra=extra)

        if socket:
            conninfo['sock'] = socket

        client = paramiko.SSHClient()
        # Unknown host keys are accepted and added automatically.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(**conninfo)

        return client

    def __repr__(self):
        return ('<ParamikoSSHClient hostname=%s,port=%s,username=%s,id=%s>' %
                (self.hostname, self.port, self.username, id(self)))
||
556 |