Completed
Pull Request — master (#2334)
by Edward
06:02
created

st2actions.runners.ssh.ParamikoSSHClient   F

Complexity

Total Complexity 66

Size/Duplication

Total Lines 485
Duplicated Lines 0 %
Metric Value
wmc 66
dl 0
loc 485
rs 3.1915

16 Methods

Rating   Name   Duplication   Size   Complexity  
A exists() 0 15 1
B delete_dir() 0 29 2
B _consume_stderr() 0 22 4
B _get_pkey_object() 0 25 5
A __repr__() 0 3 1
D _connect() 0 45 8
F put() 0 52 11
A delete_file() 0 17 1
C put_dir() 0 55 7
A connect() 0 21 2
A close() 0 9 3
B _consume_stdout() 0 22 4
C __init__() 0 33 7
A _get_decoded_data() 0 6 1
D run() 0 88 8
A mkdir() 0 16 1

How to fix   Complexity   

Complex Class

Complex classes like st2actions.runners.ssh.ParamikoSSHClient often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the Apache Software Foundation (ASF) under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import posixpath
18
from StringIO import StringIO
19
import time
20
21
import eventlet
22
from oslo_config import cfg
23
24
import paramiko
25
26
# Depending on your version of Paramiko, it may cause a deprecation
27
# warning on Python 2.6.
28
# Ref: https://bugs.launchpad.net/paramiko/+bug/392973
29
30
from st2common.log import logging
31
from st2common.util.misc import strip_shell_chars
32
from st2common.util.shell import quote_unix
33
34
# Names exported by ``from <module> import *``.
__all__ = ['ParamikoSSHClient', 'SSHCommandTimeoutError']

# Lower-cased marker used to recognise private key content; it is compared against
# lower-cased key material.
PRIVATE_KEY_HEADER = 'PRIVATE KEY-----'.lower()
41
42
43
class SSHCommandTimeoutError(Exception):
    """
    Exception which is raised when an SSH command times out.
    """

    def __init__(self, cmd, timeout, stdout=None, stderr=None):
        """
        :param cmd: Command which timed out.
        :type cmd: ``str``

        :param timeout: Timeout (in seconds) which has been exceeded.
        :type timeout: ``float``

        :param stdout: Stdout which was consumed until the timeout occurred.
        :type stdout: ``str``

        :param stderr: Stderr which was consumed until the timeout occurred.
        :type stderr: ``str``
        """
        self.cmd = cmd
        self.timeout = timeout
        self.stdout = stdout
        self.stderr = stderr
        # Keep the message on the instance explicitly. ``BaseException.message`` is deprecated
        # on Python 2.6+ and absent on Python 3, and __str__ below depends on it.
        self.message = 'Command didn\'t finish in %s seconds' % (timeout,)
        super(SSHCommandTimeoutError, self).__init__(self.message)

    def __repr__(self):
        return ('<SSHCommandTimeoutError: cmd="%s",timeout=%s)>' %
                (self.cmd, self.timeout))

    def __str__(self):
        return self.message
69
70
71
class ParamikoSSHClient(object):
    """
    A SSH Client powered by Paramiko.

    Wraps a :class:`paramiko.SSHClient` (optionally reached through a bastion host) and
    offers helpers for running commands and for managing remote files over SFTP.
    """

    # Maximum number of bytes to read at once from a socket
    CHUNK_SIZE = 1024
    # How long to sleep while waiting for command to finish
    SLEEP_DELAY = 1.5
    # Connect socket timeout
    CONNECT_TIMEOUT = 60

    def __init__(self, hostname, port=22, username=None, password=None, bastion_host=None,
                 key=None, key_files=None, key_material=None, timeout=None):
        """
        Authentication is always attempted in the following order:

        - The key passed in (if key is provided)
        - Any key we can find through an SSH agent (only if no password and
          key is provided)
        - Any "id_rsa" or "id_dsa" key discoverable in ~/.ssh/ (only if no
          password and key is provided)
        - Plain username/password auth, if a password was given (if password is
          provided)

        :raises ValueError: If both ``key_files`` and ``key_material`` are provided.
        """
        if key_files and key_material:
            raise ValueError(('key_files and key_material arguments are '
                              'mutually exclusive'))

        self.hostname = hostname
        self.port = port
        # NOTE(review): ``cfg.CONF.system_user`` is dereferenced as a group on the ``self.key``
        # line below (``.ssh_key_file``), so this fallback probably yields a config group and
        # not a user name. Confirm and point it at the user option.
        self.username = username if username else cfg.CONF.system_user
        self.password = password
        self.key = key if key else cfg.CONF.system_user.ssh_key_file
        self.key_files = key_files
        if not self.key_files and self.key:
            # NOTE(review): the raw ``key`` argument is assigned here, not ``self.key``, so the
            # configured default key file never reaches ``key_files``. Confirm the intent.
            self.key_files = key  # `key` arg is deprecated.
        self.timeout = timeout or ParamikoSSHClient.CONNECT_TIMEOUT
        self.key_material = key_material
        self.client = None
        self.logger = logging.getLogger(__name__)
        self.sftp = None
        self.bastion_host = bastion_host
        self.bastion_client = None
        self.bastion_socket = None

    def connect(self):
        """
        Connect to the remote node over SSH.

        If a bastion host was configured, a connection to it is established first and a
        ``direct-tcpip`` channel through it is used as the socket for the real connection.

        :return: True if the connection has been successfully established. Connection
                 failures propagate as exceptions from the underlying calls.
        :rtype: ``bool``
        """
        if self.bastion_host:
            self.logger.debug('Bastion host specified, connecting')
            self.bastion_client = self._connect(host=self.bastion_host)
            transport = self.bastion_client.get_transport()
            real_addr = (self.hostname, self.port)
            # fabric uses ('', 0) for direct-tcpip, this duplicates that behaviour
            # see https://github.com/fabric/fabric/commit/c2a9bbfd50f560df6c6f9675603fb405c4071cad
            local_addr = ('', 0)
            self.bastion_socket = transport.open_channel('direct-tcpip', real_addr, local_addr)

        self.client = self._connect(host=self.hostname, socket=self.bastion_socket)
        self.sftp = self.client.open_sftp()
        return True

    def put(self, local_path, remote_path, mode=None, mirror_local_mode=False):
        """
        Upload a file to the remote node.

        :type local_path: ``str``
        :param local_path: File path on the local node.

        :type remote_path: ``str``
        :param remote_path: File path on the remote node.

        :type mode: ``int`` or octal ``str``
        :param mode: Permissions mode for the file. E.g. 0744.

        :type mirror_local_mode: ``bool``
        :param mirror_local_mode: Should remote file mirror local mode.

        :return: Attributes of the remote file.
        :rtype: :class:`posix.stat_result` or ``None``

        :raises Exception: If either path is empty or the local path does not exist.
        """

        if not local_path or not remote_path:
            # Both values have to be passed as a tuple, otherwise the format operator only
            # receives ``local_path`` and fails with a TypeError.
            raise Exception('Need both local_path and remote_path. local: %s, remote: %s' %
                            (local_path, remote_path))
        # NOTE(review): the quoted values are used below for local filesystem and SFTP calls,
        # not only for logging. Confirm quote_unix leaves such paths usable as-is.
        local_path = quote_unix(local_path)
        remote_path = quote_unix(remote_path)

        extra = {'_local_path': local_path, '_remote_path': remote_path, '_mode': mode,
                 '_mirror_local_mode': mirror_local_mode}
        self.logger.debug('Uploading file', extra=extra)

        if not os.path.exists(local_path):
            raise Exception('Path %s does not exist locally.' % local_path)

        rattrs = self.sftp.put(local_path, remote_path)

        if mode or mirror_local_mode:
            local_mode = mode
            # The mode of the local file wins when mirroring was requested.
            if not mode or mirror_local_mode:
                local_mode = os.stat(local_path).st_mode

            # Cast to octal integer in case of string
            if isinstance(local_mode, basestring):
                local_mode = int(local_mode, 8)
            # Only the permission bits are compared / applied.
            local_mode = local_mode & 0o7777
            remote_mode = rattrs.st_mode
            # Only mask if we actually got a remote_mode
            if remote_mode is not None:
                remote_mode = (remote_mode & 0o7777)
            if local_mode != remote_mode:
                self.sftp.chmod(remote_path, local_mode)

        return rattrs

    def put_dir(self, local_path, remote_path, mode=None, mirror_local_mode=False):
        """
        Upload a dir to the remote node.

        :type local_path: ``str``
        :param local_path: Dir path on the local node.

        :type remote_path: ``str``
        :param remote_path: Base dir path on the remote node.

        :type mode: ``int``
        :param mode: Permissions mode for the file. E.g. 0744.

        :type mirror_local_mode: ``bool``
        :param mirror_local_mode: Should remote file mirror local mode.

        :return: One entry per uploaded file, as returned by :meth:`put`.
        :rtype: ``list``
        """

        extra = {'_local_path': local_path, '_remote_path': remote_path, '_mode': mode,
                 '_mirror_local_mode': mirror_local_mode}
        self.logger.debug('Uploading dir', extra=extra)

        # Prefix which is removed from every walked path, so that the last component of
        # local_path is re-created under remote_path (a trailing slash is tolerated).
        if os.path.basename(local_path):
            strip = os.path.dirname(local_path)
        else:
            strip = os.path.dirname(os.path.dirname(local_path))

        remote_paths = []

        for context, dirs, files in os.walk(local_path):
            rcontext = context.replace(strip, '', 1)
            # normalize pathname separators with POSIX separator
            rcontext = rcontext.replace(os.sep, '/')
            rcontext = rcontext.lstrip('/')
            rcontext = posixpath.join(remote_path, rcontext)

            if not self.exists(rcontext):
                self.sftp.mkdir(rcontext)

            for dir_name in dirs:
                remote_dir = posixpath.join(rcontext, dir_name)
                if not self.exists(remote_dir):
                    self.sftp.mkdir(remote_dir)

            for file_name in files:
                local_file = os.path.join(context, file_name)
                remote_file = posixpath.join(rcontext, file_name)
                # Note that quote_unix is done by put anyways.
                result = self.put(local_path=local_file, remote_path=remote_file,
                                  mirror_local_mode=mirror_local_mode, mode=mode)
                remote_paths.append(result)

        return remote_paths

    def exists(self, remote_path):
        """
        Validate whether a remote file or directory exists.

        :param remote_path: Path to remote file.
        :type remote_path: ``str``

        :return: False if ``lstat`` on the path raises IOError, True otherwise.
        :rtype: ``bool``
        """
        try:
            self.sftp.lstat(remote_path).st_mode
        except IOError:
            return False

        return True

    def mkdir(self, dir_path):
        """
        Create a directory on remote box.

        :param dir_path: Path to remote directory to be created.
        :type dir_path: ``str``

        :return: Whatever ``sftp.mkdir`` returns; errors from it propagate to the caller.
        """

        dir_path = quote_unix(dir_path)
        extra = {'_dir_path': dir_path}
        self.logger.debug('mkdir', extra=extra)
        return self.sftp.mkdir(dir_path)

    def delete_file(self, path):
        """
        Delete a file on remote box.

        :param path: Path to remote file to be deleted.
        :type path: ``str``

        :return: True if the file has been successfully deleted. Errors from
                 ``sftp.unlink`` propagate to the caller.
        :rtype: ``bool``
        """

        path = quote_unix(path)
        extra = {'_path': path}
        self.logger.debug('Deleting file', extra=extra)
        self.sftp.unlink(path)
        return True

    def delete_dir(self, path, force=False, timeout=None):
        """
        Delete a dir on remote box.

        :param path: Path to remote dir to be deleted.
        :type path: ``str``

        :param force: Optional Forcefully remove dir.
        :type force: ``bool``

        :param timeout: Optional Time to wait for dir to be deleted. Only relevant for force.
        :type timeout: ``int``

        :return: With ``force``, the result of :meth:`run` for ``rm -rf`` (a
                 ``[stdout, stderr, status]`` list); otherwise whatever ``sftp.rmdir``
                 returns.
        """

        path = quote_unix(path)
        extra = {'_path': path}
        if force:
            command = 'rm -rf %s' % path
            extra['_command'] = command
            extra['_force'] = force
            self.logger.debug('Deleting dir', extra=extra)
            return self.run(command, timeout=timeout)

        self.logger.debug('Deleting dir', extra=extra)
        return self.sftp.rmdir(path)

    def run(self, cmd, timeout=None, quote=False):
        """
        Run a command on the remote node and collect its output.

        Note: This function is based on paramiko's exec_command()
        method.

        :param cmd: Command to run.
        :type cmd: ``str``

        :param timeout: How long to wait (in seconds) for the command to
                        finish (optional).
        :type timeout: ``float``

        :param quote: If True, the whole command is passed through quote_unix first.
        :type quote: ``bool``

        :return: ``[stdout, stderr, status]``
        :rtype: ``list``

        :raises SSHCommandTimeoutError: If the command doesn't finish within ``timeout``.
        """

        if quote:
            cmd = quote_unix(cmd)

        extra = {'_cmd': cmd}
        self.logger.info('Executing command', extra=extra)

        # Use the system default buffer size
        bufsize = -1

        transport = self.client.get_transport()
        chan = transport.open_session()

        start_time = time.time()
        if cmd.startswith('sudo'):
            # Note that fabric does this as well. If you set pty, stdout and stderr
            # streams will be combined into one.
            chan.get_pty()
        chan.exec_command(cmd)

        stdout = StringIO()
        stderr = StringIO()

        # Create a stdin file and immediately close it to prevent any
        # interactive script from hanging the process.
        stdin = chan.makefile('wb', bufsize)
        stdin.close()

        # Receive all the output
        # Note #1: This is used instead of chan.makefile approach to prevent
        # buffering issues and hanging if the executed command produces a lot
        # of output.
        #
        # Note #2: If you are going to remove "ready" checks inside the loop
        # you are going to have a bad time. Trying to consume from a channel
        # which is not ready will block for indefinitely.
        exit_status_ready = chan.exit_status_ready()

        while not exit_status_ready:
            current_time = time.time()
            elapsed_time = (current_time - start_time)

            if timeout and (elapsed_time > timeout):
                # TODO: Is this the right way to clean up?
                chan.close()

                stdout = strip_shell_chars(stdout.getvalue())
                stderr = strip_shell_chars(stderr.getvalue())
                raise SSHCommandTimeoutError(cmd=cmd, timeout=timeout, stdout=stdout,
                                             stderr=stderr)

            stdout.write(self._consume_stdout(chan).getvalue())
            stderr.write(self._consume_stderr(chan).getvalue())

            # We need to check the exit status here, because the command could
            # print some output and exit during the sleep below.
            exit_status_ready = chan.exit_status_ready()

            if exit_status_ready:
                break

            # Short sleep to prevent busy waiting
            eventlet.sleep(self.SLEEP_DELAY)

        # Final drain. Output which arrived between the last consume and the moment the exit
        # status became ready would otherwise be lost. The consume helpers only read when the
        # channel reports data is ready, so this never blocks.
        stdout.write(self._consume_stdout(chan).getvalue())
        stderr.write(self._consume_stderr(chan).getvalue())

        # Receive the exit status code of the command we ran.
        status = chan.recv_exit_status()

        stdout = strip_shell_chars(stdout.getvalue())
        stderr = strip_shell_chars(stderr.getvalue())

        extra = {'_status': status, '_stdout': stdout, '_stderr': stderr}
        self.logger.debug('Command finished', extra=extra)

        return [stdout, stderr, status]

    def close(self):
        """
        Close the SSH client, the SFTP session and the bastion client (whichever exist).

        Safe to call when :meth:`connect` was never called or did not complete.

        :rtype: ``bool``
        """
        self.logger.debug('Closing server connection')

        # ``client`` stays None until connect() succeeds.
        if self.client:
            self.client.close()
        if self.sftp:
            self.sftp.close()
        if self.bastion_client:
            self.bastion_client.close()
        return True

    def _consume_channel(self, is_ready, receive):
        """
        Read from a channel stream for as long as it reports data is ready.

        :param is_ready: Callable which returns True if ``receive`` has data available.
        :param receive: Callable which takes a max byte count and returns the data read.

        :return: Buffer holding the UTF-8 decoded data which has been read.
        :rtype: ``StringIO``
        """
        out = bytearray()

        # Never call receive() without a preceding positive ready check, see Note #2 in run().
        while is_ready():
            data = receive(self.CHUNK_SIZE)
            if not data:
                break
            out += data

        result = StringIO()
        result.write(self._get_decoded_data(out))
        return result

    def _consume_stdout(self, chan):
        """
        Try to consume stdout data from chan if it's receive ready.
        """
        return self._consume_channel(chan.recv_ready, chan.recv)

    def _consume_stderr(self, chan):
        """
        Try to consume stderr data from chan if it's receive ready.
        """
        return self._consume_channel(chan.recv_stderr_ready, chan.recv_stderr)

    def _get_decoded_data(self, data):
        """
        Decode ``data`` as UTF-8; a decoding failure is logged and then re-raised.
        """
        try:
            return data.decode('utf-8')
        except Exception:
            self.logger.exception('Non UTF-8 character found in data: %s', data)
            raise

    def _get_pkey_object(self, key_material):
        """
        Try to detect private key type and return paramiko.PKey object.

        :param key_material: Private key content (not a path).
        :type key_material: ``str``

        :raises paramiko.ssh_exception.SSHException: If no supported key class can load it.
        """

        for cls in [paramiko.RSAKey, paramiko.DSSKey, paramiko.ECDSAKey]:
            try:
                key = cls.from_private_key(StringIO(key_material))
            except paramiko.ssh_exception.SSHException:
                # Invalid key, try other key type
                pass
            else:
                return key

        # If a user passes in something which looks like file path we throw a more friendly
        # exception letting the user know we expect the contents and not a path.
        # Note: We do it here and not up the stack to avoid false positives.
        contains_header = PRIVATE_KEY_HEADER in key_material.lower()
        if not contains_header and (key_material.count('/') >= 1 or key_material.count('\\') >= 1):
            msg = ('"private_key" parameter needs to contain private key data / content and not '
                   'a path')
        else:
            msg = 'Invalid or unsupported key type'

        raise paramiko.ssh_exception.SSHException(msg)

    def _connect(self, host, socket=None):
        """
        Open an SSH connection to ``host`` using the credentials stored on this instance.

        :type host: ``str``
        :param host: Host to connect to

        :type socket: :class:`paramiko.Channel` or an opened :class:`socket.socket`
        :param socket: If specified, won't open a socket for communication to the specified host
                       and will use this instead

        :return: A connected SSHClient
        :rtype: :class:`paramiko.SSHClient`
        """
        conninfo = {'hostname': host,
                    'port': self.port,
                    'username': self.username,
                    'allow_agent': False,
                    'look_for_keys': False,
                    'timeout': self.timeout}

        if self.password:
            conninfo['password'] = self.password

        if self.key_files:
            conninfo['key_filename'] = self.key_files

        if self.key_material:
            conninfo['pkey'] = self._get_pkey_object(key_material=self.key_material)

        # No explicit credentials at all - fall back to the SSH agent and default key files.
        if not self.password and not (self.key_files or self.key_material):
            conninfo['allow_agent'] = True
            conninfo['look_for_keys'] = True

        extra = {'_hostname': host, '_port': self.port,
                 '_username': self.username, '_timeout': self.timeout}
        self.logger.debug('Connecting to server', extra=extra)

        if socket:
            conninfo['sock'] = socket

        client = paramiko.SSHClient()
        # Unknown host keys are accepted automatically.
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(**conninfo)

        return client

    def __repr__(self):
        return ('<ParamikoSSHClient hostname=%s,port=%s,username=%s,id=%s>' %
                (self.hostname, self.port, self.username, id(self)))
556