GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — develop-v1.3.1 ( 969842...8fa207 )
by
unknown
05:56
created

ParamikoSSHClient.put_dir()   C

Complexity

Conditions 7

Size

Total Lines 55

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 7
dl 0
loc 55
rs 6.822

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# Licensed to the Apache Software Foundation (ASF) under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import posixpath
18
from StringIO import StringIO
19
import time
20
21
import eventlet
22
from oslo_config import cfg
23
24
import paramiko
25
26
# Depending on your version of Paramiko, it may cause a deprecation
27
# warning on Python 2.6.
28
# Ref: https://bugs.launchpad.net/paramiko/+bug/392973
29
30
from st2common.log import logging
31
from st2common.util.misc import strip_shell_chars
32
from st2common.util.shell import quote_unix
33
34
# Public names exported by this module.
__all__ = [
    'ParamikoSSHClient',

    'SSHCommandTimeoutError'
]

# Lower-cased marker which ParamikoSSHClient._get_pkey_object() looks for in the
# lower-cased key material to tell private key content apart from a file path.
PRIVATE_KEY_HEADER = 'PRIVATE KEY-----'.lower()
41
42
43
class SSHCommandTimeoutError(Exception):
    """
    Exception which is raised when an SSH command times out.

    The command, the timeout and the output captured up to the timeout are kept
    as attributes so callers can report partial results.
    """

    def __init__(self, cmd, timeout, stdout=None, stderr=None):
        """
        :param cmd: Command which timed out.
        :type cmd: ``str``

        :param timeout: Timeout (in seconds) which was exceeded.
        :type timeout: ``float``

        :param stdout: Stdout which was consumed until the timeout occurred.
        :type stdout: ``str``

        :param stderr: Stderr which was consumed until the timeout occurred.
        :type stderr: ``str``
        """
        self.cmd = cmd
        self.timeout = timeout
        self.stdout = stdout
        self.stderr = stderr
        message = 'Command didn\'t finish in %s seconds' % (timeout)
        super(SSHCommandTimeoutError, self).__init__(message)
        # Stored explicitly because __str__ reads it: ``BaseException.message`` is
        # deprecated on Python 2.6+ and does not exist at all on Python 3.
        self.message = message

    def __repr__(self):
        return ('<SSHCommandTimeoutError: cmd="%s",timeout=%s)>' %
                (self.cmd, self.timeout))

    def __str__(self):
        return self.message
69
70
71
class ParamikoSSHClient(object):
    """
    A SSH Client powered by Paramiko.
    """

    # Maximum number of bytes to read at once from a socket
    # (passed to chan.recv() / chan.recv_stderr()).
    CHUNK_SIZE = 1024
    # How long to sleep while waiting for command to finish
    # (passed to eventlet.sleep() between polls in run()).
    SLEEP_DELAY = 1.5
    # Connect socket timeout; used when no ``timeout`` is given to __init__.
    CONNECT_TIMEOUT = 60
82
83
    def __init__(self, hostname, port=22, username=None, password=None, bastion_host=None,
84
                 key=None, key_files=None, key_material=None, timeout=None):
85
        """
86
        Authentication is always attempted in the following order:
87
88
        - The key passed in (if key is provided)
89
        - Any key we can find through an SSH agent (only if no password and
90
          key is provided)
91
        - Any "id_rsa" or "id_dsa" key discoverable in ~/.ssh/ (only if no
92
          password and key is provided)
93
        - Plain username/password auth, if a password was given (if password is
94
          provided)
95
        """
96
        if key_files and key_material:
97
            raise ValueError(('key_files and key_material arguments are '
98
                              'mutually exclusive'))
99
100
        self.hostname = hostname
101
        self.port = port
102
        self.username = username if username else cfg.CONF.system_user
103
        self.password = password
104
        self.key = key if key else cfg.CONF.system_user.ssh_key_file
105
        self.key_files = key_files
106
        if not self.key_files and self.key:
107
            self.key_files = key  # `key` arg is deprecated.
108
        self.timeout = timeout or ParamikoSSHClient.CONNECT_TIMEOUT
109
        self.key_material = key_material
110
        self.client = None
111
        self.logger = logging.getLogger(__name__)
112
        self.sftp = None
113
        self.bastion_host = bastion_host
114
        self.bastion_client = None
115
        self.bastion_socket = None
116
117
    def connect(self):
118
        """
119
        Connect to the remote node over SSH.
120
121
        :return: True if the connection has been successfully established,
122
                 False otherwise.
123
        :rtype: ``bool``
124
        """
125
        if self.bastion_host:
126
            self.logger.debug('Bastion host specified, connecting')
127
            self.bastion_client = self._connect(host=self.bastion_host)
128
            transport = self.bastion_client.get_transport()
129
            real_addr = (self.hostname, self.port)
130
            # fabric uses ('', 0) for direct-tcpip, this duplicates that behaviour
131
            # see https://github.com/fabric/fabric/commit/c2a9bbfd50f560df6c6f9675603fb405c4071cad
132
            local_addr = ('', 0)
133
            self.bastion_socket = transport.open_channel('direct-tcpip', real_addr, local_addr)
134
135
        self.client = self._connect(host=self.hostname, socket=self.bastion_socket)
136
        self.sftp = self.client.open_sftp()
137
        return True
138
139
    def put(self, local_path, remote_path, mode=None, mirror_local_mode=False):
140
        """
141
        Upload a file to the remote node.
142
143
        :type local_path: ``st``
144
        :param local_path: File path on the local node.
145
146
        :type remote_path: ``str``
147
        :param remote_path: File path on the remote node.
148
149
        :type mode: ``int``
150
        :param mode: Permissions mode for the file. E.g. 0744.
151
152
        :type mirror_local_mode: ``int``
153
        :param mirror_local_mode: Should remote file mirror local mode.
154
155
        :return: Attributes of the remote file.
156
        :rtype: :class:`posix.stat_result` or ``None``
157
        """
158
159
        if not local_path or not remote_path:
160
            raise Exception('Need both local_path and remote_path. local: %s, remote: %s' %
161
                            local_path, remote_path)
162
        local_path = quote_unix(local_path)
163
        remote_path = quote_unix(remote_path)
164
165
        extra = {'_local_path': local_path, '_remote_path': remote_path, '_mode': mode,
166
                 '_mirror_local_mode': mirror_local_mode}
167
        self.logger.debug('Uploading file', extra=extra)
168
169
        if not os.path.exists(local_path):
170
            raise Exception('Path %s does not exist locally.' % local_path)
171
172
        rattrs = self.sftp.put(local_path, remote_path)
173
174
        if mode or mirror_local_mode:
175
            local_mode = mode
176
            if not mode or mirror_local_mode:
177
                local_mode = os.stat(local_path).st_mode
178
179
            # Cast to octal integer in case of string
180
            if isinstance(local_mode, basestring):
181
                local_mode = int(local_mode, 8)
182
            local_mode = local_mode & 07777
183
            remote_mode = rattrs.st_mode
184
            # Only bitshift if we actually got an remote_mode
185
            if remote_mode is not None:
186
                remote_mode = (remote_mode & 07777)
187
            if local_mode != remote_mode:
188
                self.sftp.chmod(remote_path, local_mode)
189
190
        return rattrs
191
192
    def put_dir(self, local_path, remote_path, mode=None, mirror_local_mode=False):
193
        """
194
        Upload a dir to the remote node.
195
196
        :type local_path: ``str``
197
        :param local_path: Dir path on the local node.
198
199
        :type remote_path: ``str``
200
        :param remote_path: Base dir path on the remote node.
201
202
        :type mode: ``int``
203
        :param mode: Permissions mode for the file. E.g. 0744.
204
205
        :type mirror_local_mode: ``int``
206
        :param mirror_local_mode: Should remote file mirror local mode.
207
208
        :return: List of files created on remote node.
209
        :rtype: ``list`` of ``str``
210
        """
211
212
        extra = {'_local_path': local_path, '_remote_path': remote_path, '_mode': mode,
213
                 '_mirror_local_mode': mirror_local_mode}
214
        self.logger.debug('Uploading dir', extra=extra)
215
216
        if os.path.basename(local_path):
217
            strip = os.path.dirname(local_path)
218
        else:
219
            strip = os.path.dirname(os.path.dirname(local_path))
220
221
        remote_paths = []
222
223
        for context, dirs, files in os.walk(local_path):
224
            rcontext = context.replace(strip, '', 1)
225
            # normalize pathname separators with POSIX separator
226
            rcontext = rcontext.replace(os.sep, '/')
227
            rcontext = rcontext.lstrip('/')
228
            rcontext = posixpath.join(remote_path, rcontext)
229
230
            if not self.exists(rcontext):
231
                self.sftp.mkdir(rcontext)
232
233
            for d in dirs:
234
                n = posixpath.join(rcontext, d)
235
                if not self.exists(n):
236
                    self.sftp.mkdir(n)
237
238
            for f in files:
239
                local_path = os.path.join(context, f)
240
                n = posixpath.join(rcontext, f)
241
                # Note that quote_unix is done by put anyways.
242
                p = self.put(local_path=local_path, remote_path=n,
243
                             mirror_local_mode=mirror_local_mode, mode=mode)
244
                remote_paths.append(p)
245
246
        return remote_paths
247
248
    def exists(self, remote_path):
249
        """
250
        Validate whether a remote file or directory exists.
251
252
        :param remote_path: Path to remote file.
253
        :type remote_path: ``str``
254
255
        :rtype: ``bool``
256
        """
257
        try:
258
            self.sftp.lstat(remote_path).st_mode
259
        except IOError:
260
            return False
261
262
        return True
263
264
    def mkdir(self, dir_path):
        """
        Create a directory on the remote box via SFTP.

        :param dir_path: Path to remote directory to be created.
        :type dir_path: ``str``

        :return: Whatever the SFTP ``mkdir`` call returns; errors raised by it are
                 not caught here.
        """
        quoted_path = quote_unix(dir_path)
        self.logger.debug('mkdir', extra={'_dir_path': quoted_path})
        return self.sftp.mkdir(quoted_path)
280
281
    def delete_file(self, path):
        """
        Remove a file on the remote box via SFTP.

        :param path: Path to remote file to be deleted.
        :type path: ``str``

        :return: True once the SFTP ``unlink`` call has returned; errors raised by
                 it are not caught here.
        :rtype: ``bool``
        """
        quoted_path = quote_unix(path)
        self.logger.debug('Deleting file', extra={'_path': quoted_path})
        self.sftp.unlink(quoted_path)
        return True
298
299
    def delete_dir(self, path, force=False, timeout=None):
        """
        Remove a dir on the remote box.

        Without ``force`` the directory is removed with the SFTP ``rmdir`` call; with
        ``force`` an ``rm -rf`` command is executed through run() instead.

        :param path: Path to remote dir to be deleted.
        :type path: ``str``

        :param force: Optional Forcefully remove dir.
        :type force: ``bool``

        :param timeout: Optional Time to wait for dir to be deleted. Only relevant for force.
        :type timeout: ``int``

        :return: The result of run() when ``force`` is set, otherwise the result of
                 the SFTP ``rmdir`` call.
        """
        path = quote_unix(path)
        extra = {'_path': path}

        if not force:
            self.logger.debug('Deleting dir', extra=extra)
            return self.sftp.rmdir(path)

        command = 'rm -rf %s' % path
        extra['_command'] = command
        extra['_force'] = force
        self.logger.debug('Deleting dir', extra=extra)
        return self.run(command, timeout=timeout)
328
329
    def run(self, cmd, timeout=None, quote=False):
        """
        Run a command on the remote node and collect its output.

        Note: This function is based on paramiko's exec_command()
        method.

        :param cmd: Command to run.
        :type cmd: ``str``

        :param timeout: How long to wait (in seconds) for the command to
                        finish (optional).
        :type timeout: ``float``

        :param quote: If True, the whole command is passed through quote_unix
                      before it is executed.
        :type quote: ``bool``

        :return: ``[stdout, stderr, exit_status]``
        :rtype: ``list``

        :raises SSHCommandTimeoutError: If the command is still running once
                                        ``timeout`` seconds have elapsed. The output
                                        read so far is attached to the exception.
        """

        if quote:
            cmd = quote_unix(cmd)

        extra = {'_cmd': cmd}
        self.logger.info('Executing command', extra=extra)

        # Use the system default buffer size
        bufsize = -1

        transport = self.client.get_transport()
        chan = transport.open_session()

        start_time = time.time()
        if cmd.startswith('sudo'):
            # Note that fabric does this as well. If you set pty, stdout and stderr
            # streams will be combined into one.
            chan.get_pty()
        chan.exec_command(cmd)

        stdout = StringIO()
        stderr = StringIO()

        # Create a stdin file and immediately close it to prevent any
        # interactive script from hanging the process.
        stdin = chan.makefile('wb', bufsize)
        stdin.close()

        # Receive all the output
        # Note #1: This is used instead of chan.makefile approach to prevent
        # buffering issues and hanging if the executed command produces a lot
        # of output.
        #
        # Note #2: If you are going to remove "ready" checks inside the loop
        # you are going to have a bad time. Trying to consume from a channel
        # which is not ready will block indefinitely.
        exit_status_ready = chan.exit_status_ready()

        while not exit_status_ready:
            current_time = time.time()
            elapsed_time = (current_time - start_time)

            if timeout and (elapsed_time > timeout):
                # TODO: Is this the right way to clean up?
                chan.close()

                stdout = strip_shell_chars(stdout.getvalue())
                stderr = strip_shell_chars(stderr.getvalue())
                raise SSHCommandTimeoutError(cmd=cmd, timeout=timeout, stdout=stdout,
                                             stderr=stderr)

            stdout.write(self._consume_stdout(chan).getvalue())
            stderr.write(self._consume_stderr(chan).getvalue())

            # We need to check the exit status here, because the command could
            # print some output and exit during the sleep below.
            exit_status_ready = chan.exit_status_ready()

            if exit_status_ready:
                break

            # Short sleep to prevent busy waiting
            eventlet.sleep(self.SLEEP_DELAY)

        # The command has finished. Drain whatever is still buffered: this covers a
        # command which had already exited before the loop started as well as output
        # which arrived between the last read inside the loop and the exit status
        # becoming available. The _consume_* helpers only read when the channel
        # reports data as ready, so this does not block.
        stdout.write(self._consume_stdout(chan).getvalue())
        stderr.write(self._consume_stderr(chan).getvalue())

        # Receive the exit status code of the command we ran.
        status = chan.recv_exit_status()

        stdout = strip_shell_chars(stdout.getvalue())
        stderr = strip_shell_chars(stderr.getvalue())

        extra = {'_status': status, '_stdout': stdout, '_stderr': stderr}
        self.logger.debug('Command finished', extra=extra)

        return [stdout, stderr, status]
417
418
    def close(self):
419
        self.logger.debug('Closing server connection')
420
421
        self.client.close()
422
        if self.sftp:
423
            self.sftp.close()
424
        if self.bastion_client:
425
            self.bastion_client.close()
426
        return True
427
428
    def _consume_stdout(self, chan):
429
        """
430
        Try to consume stdout data from chan if it's receive ready.
431
        """
432
433
        out = bytearray()
434
        stdout = StringIO()
435
        if chan.recv_ready():
436
            data = chan.recv(self.CHUNK_SIZE)
437
            out += data
438
439
            while data:
440
                ready = chan.recv_ready()
441
442
                if not ready:
443
                    break
444
445
                data = chan.recv(self.CHUNK_SIZE)
446
                out += data
447
448
        stdout.write(self._get_decoded_data(out))
449
        return stdout
450
451
    def _consume_stderr(self, chan):
452
        """
453
        Try to consume stderr data from chan if it's receive ready.
454
        """
455
456
        out = bytearray()
457
        stderr = StringIO()
458
        if chan.recv_stderr_ready():
459
            data = chan.recv_stderr(self.CHUNK_SIZE)
460
            out += data
461
462
            while data:
463
                ready = chan.recv_stderr_ready()
464
465
                if not ready:
466
                    break
467
468
                data = chan.recv_stderr(self.CHUNK_SIZE)
469
                out += data
470
471
        stderr.write(self._get_decoded_data(out))
472
        return stderr
473
474
    def _get_decoded_data(self, data):
475
        try:
476
            return data.decode('utf-8')
477
        except:
478
            self.logger.exception('Non UTF-8 character found in data: %s', data)
479
            raise
480
481
    def _get_pkey_object(self, key_material):
        """
        Build a paramiko key object from private key content, trying the RSA, DSS
        and ECDSA key classes in that order.

        :raises paramiko.ssh_exception.SSHException: If none of the key classes
                                                     accepts the content.
        """
        for key_cls in (paramiko.RSAKey, paramiko.DSSKey, paramiko.ECDSAKey):
            try:
                return key_cls.from_private_key(StringIO(key_material))
            except paramiko.ssh_exception.SSHException:
                # Not a key of this type, try the next one
                continue

        # If the value has no private key header but contains a path separator, it
        # is most likely a file path, so a friendlier error is raised telling the
        # user that the key contents are expected and not a path.
        # Note: We do it here and not up the stack to avoid false positives.
        has_header = PRIVATE_KEY_HEADER in key_material.lower()
        has_separator = '/' in key_material or '\\' in key_material

        if has_header or not has_separator:
            msg = 'Invalid or unsupported key type'
        else:
            msg = ('"private_key" parameter needs to contain private key data / content and not '
                   'a path')

        raise paramiko.ssh_exception.SSHException(msg)
506
507
    def _connect(self, host, socket=None):
        """
        Create a paramiko client and connect it to ``host`` using the port,
        credentials and timeout stored on this object.

        :type host: ``str``
        :param host: Host to connect to

        :type socket: :class:`paramiko.Channel` or an opened :class:`socket.socket`
        :param socket: If specified, won't open a socket for communication to the specified host
                       and will use this instead

        :return: A connected SSHClient
        :rtype: :class:`paramiko.SSHClient`
        """
        # Agent and ~/.ssh key discovery are only enabled when no explicit
        # credential (password, key file or key material) was supplied.
        discover_keys = not (self.password or self.key_files or self.key_material)

        conninfo = {
            'hostname': host,
            'port': self.port,
            'username': self.username,
            'allow_agent': discover_keys,
            'look_for_keys': discover_keys,
            'timeout': self.timeout,
        }

        if self.password:
            conninfo['password'] = self.password

        if self.key_files:
            conninfo['key_filename'] = self.key_files

        if self.key_material:
            conninfo['pkey'] = self._get_pkey_object(key_material=self.key_material)

        self.logger.debug('Connecting to server',
                          extra={'_hostname': host, '_port': self.port,
                                 '_username': self.username, '_timeout': self.timeout})

        if socket:
            conninfo['sock'] = socket

        ssh_client = paramiko.SSHClient()
        # Host keys which are not known yet are accepted automatically.
        ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh_client.connect(**conninfo)

        return ssh_client
552
553
    def __repr__(self):
554
        return ('<ParamikoSSHClient hostname=%s,port=%s,username=%s,id=%s>' %
555
                (self.hostname, self.port, self.username, id(self)))
556