| Total Complexity | 46 | 
| Total Lines | 340 | 
| Duplicated Lines | 0 % | 
Complex classes like st2actions.runners.ssh.ParallelSSHClient often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more | ||
| 34 | class ParallelSSHClient(object): | ||
| 35 | KEYS_TO_TRANSFORM = ['stdout', 'stderr'] | ||
| 36 | CONNECT_ERROR = 'Cannot connect to host.' | ||
| 37 | |||
| 38 | def __init__(self, hosts, user=None, password=None, pkey_file=None, pkey_material=None, port=22, | ||
| 39 | bastion_host=None, concurrency=10, raise_on_any_error=False, connect=True, | ||
| 40 | passphrase=None): | ||
| 41 | self._ssh_user = user | ||
| 42 | self._ssh_key_file = pkey_file | ||
| 43 | self._ssh_key_material = pkey_material | ||
| 44 | self._ssh_password = password | ||
| 45 | self._hosts = hosts | ||
| 46 | self._successful_connects = 0 | ||
| 47 | self._ssh_port = port | ||
| 48 | self._bastion_host = bastion_host | ||
| 49 | self._passphrase = passphrase | ||
| 50 | |||
| 51 | if not hosts: | ||
| 52 |             raise Exception('Need an non-empty list of hosts to talk to.') | ||
| 53 | |||
| 54 | self._pool = eventlet.GreenPool(concurrency) | ||
| 55 |         self._hosts_client = {} | ||
| 56 |         self._bad_hosts = {} | ||
| 57 | self._scan_interval = 0.1 | ||
| 58 | |||
| 59 | if connect: | ||
| 60 | connect_results = self.connect(raise_on_any_error=raise_on_any_error) | ||
| 61 |             extra = {'_connect_results': connect_results} | ||
| 62 |             LOG.debug('Connect to hosts complete.', extra=extra) | ||
| 63 | |||
| 64 | def connect(self, raise_on_any_error=False): | ||
| 65 | """ | ||
| 66 | Connect to hosts in hosts list. Returns status of connect as a dict. | ||
| 67 | |||
| 68 | :param raise_on_any_error: Optional Raise an exception even if connecting to one | ||
| 69 | of the hosts fails. | ||
| 70 | :type raise_on_any_error: ``boolean`` | ||
| 71 | |||
| 72 | :rtype: ``dict`` of ``str`` to ``dict`` | ||
| 73 | """ | ||
| 74 |         results = {} | ||
| 75 | |||
| 76 | for host in self._hosts: | ||
| 77 | while not self._pool.free(): | ||
| 78 | eventlet.sleep(self._scan_interval) | ||
| 79 | self._pool.spawn(self._connect, host=host, results=results, | ||
| 80 | raise_on_any_error=raise_on_any_error) | ||
| 81 | |||
| 82 | self._pool.waitall() | ||
| 83 | |||
| 84 | if self._successful_connects < 1: | ||
| 85 | # We definitely have to raise an exception in this case. | ||
| 86 |             LOG.error('Unable to connect to any of the hosts.', | ||
| 87 |                       extra={'connect_results': results}) | ||
| 88 |             msg = ('Unable to connect to any one of the hosts: %s.\n\n connect_errors=%s' % | ||
| 89 | (self._hosts, json.dumps(results, indent=2))) | ||
| 90 | raise NoHostsConnectedToException(msg) | ||
| 91 | |||
| 92 | return results | ||
| 93 | |||
| 94 | def run(self, cmd, timeout=None): | ||
| 95 | """ | ||
| 96 | Run a command on remote hosts. Returns a dict containing results | ||
| 97 | of execution from all hosts. | ||
| 98 | |||
| 99 | :param cmd: Command to run. Must be shlex quoted. | ||
| 100 | :type cmd: ``str`` | ||
| 101 | |||
| 102 | :param timeout: Optional Timeout for the command. | ||
| 103 | :type timeout: ``int`` | ||
| 104 | |||
| 105 | :param cwd: Optional Current working directory. Must be shlex quoted. | ||
| 106 | :type cwd: ``str`` | ||
| 107 | |||
| 108 | :rtype: ``dict`` of ``str`` to ``dict`` | ||
| 109 | """ | ||
| 110 | |||
| 111 |         options = { | ||
| 112 | 'cmd': cmd, | ||
| 113 | 'timeout': timeout | ||
| 114 | } | ||
| 115 | results = self._execute_in_pool(self._run_command, **options) | ||
| 116 | return results | ||
| 117 | |||
| 118 | def put(self, local_path, remote_path, mode=None, mirror_local_mode=False): | ||
| 119 | """ | ||
| 120 | Copy a file or folder to remote host. | ||
| 121 | |||
| 122 | :param local_path: Path to local file or dir. Must be shlex quoted. | ||
| 123 | :type local_path: ``str`` | ||
| 124 | |||
| 125 | :param remote_path: Path to remote file or dir. Must be shlex quoted. | ||
| 126 | :type remote_path: ``str`` | ||
| 127 | |||
| 128 | :param mode: Optional mode to use for the file or dir. | ||
| 129 | :type mode: ``int`` | ||
| 130 | |||
| 131 | :param mirror_local_mode: Optional Flag to mirror the mode | ||
| 132 | on local file/dir on remote host. | ||
| 133 | :type mirror_local_mode: ``boolean`` | ||
| 134 | |||
| 135 | :rtype: ``dict`` of ``str`` to ``dict`` | ||
| 136 | """ | ||
| 137 | |||
| 138 | if not os.path.exists(local_path): | ||
| 139 |             raise Exception('Local path %s does not exist.' % local_path) | ||
| 140 | |||
| 141 |         options = { | ||
| 142 | 'local_path': local_path, | ||
| 143 | 'remote_path': remote_path, | ||
| 144 | 'mode': mode, | ||
| 145 | 'mirror_local_mode': mirror_local_mode | ||
| 146 | } | ||
| 147 | |||
| 148 | return self._execute_in_pool(self._put_files, **options) | ||
| 149 | |||
| 150 | def mkdir(self, path): | ||
| 151 | """ | ||
| 152 | Create a directory on remote hosts. | ||
| 153 | |||
| 154 | :param path: Path to remote dir that must be created. Must be shlex quoted. | ||
| 155 | :type path: ``str`` | ||
| 156 | |||
| 157 | :rtype path: ``dict`` of ``str`` to ``dict`` | ||
| 158 | """ | ||
| 159 | |||
| 160 |         options = { | ||
| 161 | 'path': path | ||
| 162 | } | ||
| 163 | return self._execute_in_pool(self._mkdir, **options) | ||
| 164 | |||
| 165 | def delete_file(self, path): | ||
| 166 | """ | ||
| 167 | Delete a file on remote hosts. | ||
| 168 | |||
| 169 | :param path: Path to remote file that must be deleted. Must be shlex quoted. | ||
| 170 | :type path: ``str`` | ||
| 171 | |||
| 172 | :rtype path: ``dict`` of ``str`` to ``dict`` | ||
| 173 | """ | ||
| 174 | |||
| 175 |         options = { | ||
| 176 | 'path': path | ||
| 177 | } | ||
| 178 | return self._execute_in_pool(self._delete_file, **options) | ||
| 179 | |||
| 180 | def delete_dir(self, path, force=False, timeout=None): | ||
| 181 | """ | ||
| 182 | Delete a dir on remote hosts. | ||
| 183 | |||
| 184 | :param path: Path to remote dir that must be deleted. Must be shlex quoted. | ||
| 185 | :type path: ``str`` | ||
| 186 | |||
| 187 | :rtype path: ``dict`` of ``str`` to ``dict`` | ||
| 188 | """ | ||
| 189 | |||
| 190 |         options = { | ||
| 191 | 'path': path, | ||
| 192 | 'force': force | ||
| 193 | } | ||
| 194 | return self._execute_in_pool(self._delete_dir, **options) | ||
| 195 | |||
| 196 | def close(self): | ||
| 197 | """ | ||
| 198 | Close all open SSH connections to hosts. | ||
| 199 | """ | ||
| 200 | |||
| 201 | for host in self._hosts_client.keys(): | ||
| 202 | try: | ||
| 203 | self._hosts_client[host].close() | ||
| 204 | except: | ||
| 205 |                 LOG.exception('Failed shutting down SSH connection to host: %s', host) | ||
| 206 | |||
| 207 | def _execute_in_pool(self, execute_method, **kwargs): | ||
| 208 |         results = {} | ||
| 209 | |||
| 210 | for host in self._bad_hosts.keys(): | ||
| 211 | results[host] = self._bad_hosts[host] | ||
| 212 | |||
| 213 | for host in self._hosts_client.keys(): | ||
| 214 | while not self._pool.free(): | ||
| 215 | eventlet.sleep(self._scan_interval) | ||
| 216 | self._pool.spawn(execute_method, host=host, results=results, **kwargs) | ||
| 217 | |||
| 218 | self._pool.waitall() | ||
| 219 | return results | ||
| 220 | |||
| 221 | def _connect(self, host, results, raise_on_any_error=False): | ||
| 222 | (hostname, port) = self._get_host_port_info(host) | ||
| 223 | |||
| 224 |         extra = {'host': host, 'port': port, 'user': self._ssh_user} | ||
| 225 | if self._ssh_password: | ||
| 226 | extra['password'] = '<redacted>' | ||
| 227 | elif self._ssh_key_file: | ||
| 228 | extra['key_file_path'] = self._ssh_key_file | ||
| 229 | else: | ||
| 230 | extra['private_key'] = '<redacted>' | ||
| 231 | |||
| 232 |         LOG.debug('Connecting to host.', extra=extra) | ||
| 233 | |||
| 234 | client = ParamikoSSHClient(hostname, username=self._ssh_user, | ||
| 235 | password=self._ssh_password, | ||
| 236 | key=self._ssh_key_file, | ||
| 237 | key_material=self._ssh_key_material, | ||
| 238 | passphrase=self._passphrase, | ||
| 239 | port=port) | ||
| 240 | try: | ||
| 241 | client.connect() | ||
| 242 | except Exception as ex: | ||
| 243 | error = 'Failed connecting to host %s.' % hostname | ||
| 244 | LOG.exception(error) | ||
| 245 | if raise_on_any_error: | ||
| 246 | raise | ||
| 247 | error = ' '.join([self.CONNECT_ERROR, str(ex)]) | ||
| 248 | error_dict = self._generate_error_result(exc=ex, message=error) | ||
| 249 | self._bad_hosts[hostname] = error_dict | ||
| 250 | results[hostname] = error_dict | ||
| 251 | else: | ||
| 252 | self._successful_connects += 1 | ||
| 253 | self._hosts_client[hostname] = client | ||
| 254 |             results[hostname] = {'message': 'Connected to host.'} | ||
| 255 | |||
| 256 | def _run_command(self, host, cmd, results, timeout=None): | ||
| 257 | try: | ||
| 258 |             LOG.debug('Running command: %s on host: %s.', cmd, host) | ||
| 259 | client = self._hosts_client[host] | ||
| 260 | (stdout, stderr, exit_code) = client.run(cmd, timeout=timeout) | ||
| 261 | is_succeeded = (exit_code == 0) | ||
| 262 |             result_dict = {'stdout': stdout, 'stderr': stderr, 'return_code': exit_code, | ||
| 263 | 'succeeded': is_succeeded, 'failed': not is_succeeded} | ||
| 264 | results[host] = jsonify.json_loads(result_dict, ParallelSSHClient.KEYS_TO_TRANSFORM) | ||
| 265 | except Exception as ex: | ||
| 266 | cmd = self._sanitize_command_string(cmd=cmd) | ||
| 267 | error = 'Failed executing command "%s" on host "%s"' % (cmd, host) | ||
| 268 | LOG.exception(error) | ||
| 269 | results[host] = self._generate_error_result(exc=ex, message=error) | ||
| 270 | |||
| 271 | def _put_files(self, local_path, remote_path, host, results, mode=None, | ||
| 272 | mirror_local_mode=False): | ||
| 273 | try: | ||
| 274 |             LOG.debug('Copying file to host: %s' % host) | ||
| 275 | if os.path.isdir(local_path): | ||
| 276 | result = self._hosts_client[host].put_dir(local_path, remote_path) | ||
| 277 | else: | ||
| 278 | result = self._hosts_client[host].put(local_path, remote_path, | ||
| 279 | mirror_local_mode=mirror_local_mode, | ||
| 280 | mode=mode) | ||
| 281 |             LOG.debug('Result of copy: %s' % result) | ||
| 282 | results[host] = result | ||
| 283 | except Exception as ex: | ||
| 284 | error = 'Failed sending file(s) in path %s to host %s' % (local_path, host) | ||
| 285 | LOG.exception(error) | ||
| 286 | results[host] = self._generate_error_result(exc=ex, message=error) | ||
| 287 | |||
| 288 | def _mkdir(self, host, path, results): | ||
| 289 | try: | ||
| 290 | result = self._hosts_client[host].mkdir(path) | ||
| 291 | results[host] = result | ||
| 292 | except Exception as ex: | ||
| 293 | error = 'Failed "mkdir %s" on host %s.' % (path, host) | ||
| 294 | LOG.exception(error) | ||
| 295 | results[host] = self._generate_error_result(exc=ex, message=error) | ||
| 296 | |||
| 297 | def _delete_file(self, host, path, results): | ||
| 298 | try: | ||
| 299 | result = self._hosts_client[host].delete_file(path) | ||
| 300 | results[host] = result | ||
| 301 | except Exception as ex: | ||
| 302 | error = 'Failed deleting file %s on host %s.' % (path, host) | ||
| 303 | LOG.exception(error) | ||
| 304 | results[host] = self._generate_error_result(exc=ex, message=error) | ||
| 305 | |||
| 306 | def _delete_dir(self, host, path, results, force=False, timeout=None): | ||
| 307 | try: | ||
| 308 | result = self._hosts_client[host].delete_dir(path, force=force, timeout=timeout) | ||
| 309 | results[host] = result | ||
| 310 | except Exception as ex: | ||
| 311 | error = 'Failed deleting dir %s on host %s.' % (path, host) | ||
| 312 | LOG.exception(error) | ||
| 313 | results[host] = self._generate_error_result(exc=ex, message=error) | ||
| 314 | |||
| 315 | def _get_host_port_info(self, host_str): | ||
| 316 | (hostname, port) = ip_utils.split_host_port(host_str) | ||
| 317 | if not port: | ||
| 318 | port = self._ssh_port | ||
| 319 | |||
| 320 | return (hostname, port) | ||
| 321 | |||
| 322 | @staticmethod | ||
| 323 | def _sanitize_command_string(cmd): | ||
| 324 | """ | ||
| 325 | Remove any potentially sensitive information from the command string. | ||
| 326 | |||
| 327 | For now we only mask the values of the sensitive environment variables. | ||
| 328 | """ | ||
| 329 | if not cmd: | ||
| 330 | return cmd | ||
| 331 | |||
| 332 |         result = re.sub('ST2_ACTION_AUTH_TOKEN=(.+?)\s+?', 'ST2_ACTION_AUTH_TOKEN=%s ' % | ||
| 333 | (MASKED_ATTRIBUTE_VALUE), cmd) | ||
| 334 | return result | ||
| 335 | |||
| 336 | @staticmethod | ||
| 337 | def _generate_error_result(exc, message): | ||
| 338 | """ | ||
| 339 | :param exc: Raised exception. | ||
| 340 | :type exc: Exception. | ||
| 341 | |||
| 342 | :param message: Error message which will be prefixed to the exception exception message. | ||
| 343 | :type message: ``str`` | ||
| 344 | """ | ||
| 345 | exc_message = getattr(exc, 'message', str(exc)) | ||
| 346 | error_message = '%s: %s' % (message, exc_message) | ||
| 347 | traceback_message = traceback.format_exc() | ||
| 348 | |||
| 349 | if isinstance(exc, SSHCommandTimeoutError): | ||
| 350 | return_code = -9 | ||
| 351 | timeout = True | ||
| 352 | else: | ||
| 353 | timeout = False | ||
| 354 | return_code = 255 | ||
| 355 | |||
| 356 | stdout = getattr(exc, 'stdout', None) or '' | ||
| 357 | stderr = getattr(exc, 'stderr', None) or '' | ||
| 358 | |||
| 359 |         error_dict = { | ||
| 360 | 'failed': True, | ||
| 361 | 'succeeded': False, | ||
| 362 | 'timeout': timeout, | ||
| 363 | 'return_code': return_code, | ||
| 364 | 'stdout': stdout, | ||
| 365 | 'stderr': stderr, | ||
| 366 | 'error': error_message, | ||
| 367 | 'traceback': traceback_message, | ||
| 368 | } | ||
| 369 | return error_dict | ||
| 370 | |||
| 371 | def __repr__(self): | ||
| 372 |         return ('<ParallelSSHClient hosts=%s,user=%s,id=%s>' % | ||
| 373 | (repr(self._hosts), self._ssh_user, id(self))) | ||
| 374 |