GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — develop ( 8ec028...a6b0a6 )
by Plexxi
19:39 queued 10:44
created

ParallelSSHClient._connect()   D

Complexity

Conditions 8

Size

Total Lines 41

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
c 0
b 0
f 0
dl 0
loc 41
rs 4
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
import re
18
import os
19
import traceback
20
21
import eventlet
22
from paramiko.ssh_exception import SSHException
23
24
from st2common.constants.secrets import MASKED_ATTRIBUTE_VALUE
25
from st2common.runners.paramiko_ssh import ParamikoSSHClient
0 ignored issues
show
Bug introduced by
The name paramiko_ssh does not seem to exist in module st2common.runners.
Loading history...
26
from st2common.runners.paramiko_ssh import SSHCommandTimeoutError
0 ignored issues
show
Bug introduced by
The name paramiko_ssh does not seem to exist in module st2common.runners.
Loading history...
27
from st2common import log as logging
28
from st2common.exceptions.ssh import NoHostsConnectedToException
29
import st2common.util.jsonify as jsonify
30
from st2common.util import ip_utils
31
32
LOG = logging.getLogger(__name__)
33
34
35
class ParallelSSHClient(object):
36
    KEYS_TO_TRANSFORM = ['stdout', 'stderr']
37
    CONNECT_ERROR = 'Cannot connect to host.'
38
39
    def __init__(self, hosts, user=None, password=None, pkey_file=None, pkey_material=None, port=22,
40
                 bastion_host=None, concurrency=10, raise_on_any_error=False, connect=True,
41
                 passphrase=None):
42
        self._ssh_user = user
43
        self._ssh_key_file = pkey_file
44
        self._ssh_key_material = pkey_material
45
        self._ssh_password = password
46
        self._hosts = hosts
47
        self._successful_connects = 0
48
        self._ssh_port = port
49
        self._bastion_host = bastion_host
50
        self._passphrase = passphrase
51
52
        if not hosts:
53
            raise Exception('Need an non-empty list of hosts to talk to.')
54
55
        self._pool = eventlet.GreenPool(concurrency)
56
        self._hosts_client = {}
57
        self._bad_hosts = {}
58
        self._scan_interval = 0.1
59
60
        if connect:
61
            connect_results = self.connect(raise_on_any_error=raise_on_any_error)
62
            extra = {'_connect_results': connect_results}
63
            LOG.debug('Connect to hosts complete.', extra=extra)
64
65
    def connect(self, raise_on_any_error=False):
66
        """
67
        Connect to hosts in hosts list. Returns status of connect as a dict.
68
69
        :param raise_on_any_error: Optional Raise an exception even if connecting to one
70
                                   of the hosts fails.
71
        :type raise_on_any_error: ``boolean``
72
73
        :rtype: ``dict`` of ``str`` to ``dict``
74
        """
75
        results = {}
76
77
        for host in self._hosts:
78
            while not self._pool.free():
79
                eventlet.sleep(self._scan_interval)
80
            self._pool.spawn(self._connect, host=host, results=results,
81
                             raise_on_any_error=raise_on_any_error)
82
83
        self._pool.waitall()
84
85
        if self._successful_connects < 1:
86
            # We definitely have to raise an exception in this case.
87
            LOG.error('Unable to connect to any of the hosts.',
88
                      extra={'connect_results': results})
89
            msg = ('Unable to connect to any one of the hosts: %s.\n\n connect_errors=%s' %
90
                   (self._hosts, json.dumps(results, indent=2)))
91
            raise NoHostsConnectedToException(msg)
92
93
        return results
94
95
    def run(self, cmd, timeout=None):
96
        """
97
        Run a command on remote hosts. Returns a dict containing results
98
        of execution from all hosts.
99
100
        :param cmd: Command to run. Must be shlex quoted.
101
        :type cmd: ``str``
102
103
        :param timeout: Optional Timeout for the command.
104
        :type timeout: ``int``
105
106
        :param cwd: Optional Current working directory. Must be shlex quoted.
107
        :type cwd: ``str``
108
109
        :rtype: ``dict`` of ``str`` to ``dict``
110
        """
111
112
        options = {
113
            'cmd': cmd,
114
            'timeout': timeout
115
        }
116
        results = self._execute_in_pool(self._run_command, **options)
117
        return results
118
119
    def put(self, local_path, remote_path, mode=None, mirror_local_mode=False):
120
        """
121
        Copy a file or folder to remote host.
122
123
        :param local_path: Path to local file or dir. Must be shlex quoted.
124
        :type local_path: ``str``
125
126
        :param remote_path: Path to remote file or dir. Must be shlex quoted.
127
        :type remote_path: ``str``
128
129
        :param mode: Optional mode to use for the file or dir.
130
        :type mode: ``int``
131
132
        :param mirror_local_mode: Optional Flag to mirror the mode
133
                                           on local file/dir on remote host.
134
        :type mirror_local_mode: ``boolean``
135
136
        :rtype: ``dict`` of ``str`` to ``dict``
137
        """
138
139
        if not os.path.exists(local_path):
140
            raise Exception('Local path %s does not exist.' % local_path)
141
142
        options = {
143
            'local_path': local_path,
144
            'remote_path': remote_path,
145
            'mode': mode,
146
            'mirror_local_mode': mirror_local_mode
147
        }
148
149
        return self._execute_in_pool(self._put_files, **options)
150
151
    def mkdir(self, path):
152
        """
153
        Create a directory on remote hosts.
154
155
        :param path: Path to remote dir that must be created. Must be shlex quoted.
156
        :type path: ``str``
157
158
        :rtype path: ``dict`` of ``str`` to ``dict``
159
        """
160
161
        options = {
162
            'path': path
163
        }
164
        return self._execute_in_pool(self._mkdir, **options)
165
166
    def delete_file(self, path):
167
        """
168
        Delete a file on remote hosts.
169
170
        :param path: Path to remote file that must be deleted. Must be shlex quoted.
171
        :type path: ``str``
172
173
        :rtype path: ``dict`` of ``str`` to ``dict``
174
        """
175
176
        options = {
177
            'path': path
178
        }
179
        return self._execute_in_pool(self._delete_file, **options)
180
181
    def delete_dir(self, path, force=False, timeout=None):
182
        """
183
        Delete a dir on remote hosts.
184
185
        :param path: Path to remote dir that must be deleted. Must be shlex quoted.
186
        :type path: ``str``
187
188
        :rtype path: ``dict`` of ``str`` to ``dict``
189
        """
190
191
        options = {
192
            'path': path,
193
            'force': force
194
        }
195
        return self._execute_in_pool(self._delete_dir, **options)
196
197
    def close(self):
198
        """
199
        Close all open SSH connections to hosts.
200
        """
201
202
        for host in self._hosts_client.keys():
203
            try:
204
                self._hosts_client[host].close()
205
            except:
206
                LOG.exception('Failed shutting down SSH connection to host: %s', host)
207
208
    def _execute_in_pool(self, execute_method, **kwargs):
209
        results = {}
210
211
        for host in self._bad_hosts.keys():
212
            results[host] = self._bad_hosts[host]
213
214
        for host in self._hosts_client.keys():
215
            while not self._pool.free():
216
                eventlet.sleep(self._scan_interval)
217
            self._pool.spawn(execute_method, host=host, results=results, **kwargs)
218
219
        self._pool.waitall()
220
        return results
221
222
    def _connect(self, host, results, raise_on_any_error=False):
        """
        Connect to a single host and record the outcome.

        On success the client is stored in ``self._hosts_client``; on failure the error
        dict is stored in ``self._bad_hosts``. Either way ``results`` gets an entry. All
        three are keyed by the hostname part of ``host``.

        :param host: Host string; it is split into hostname and port.
        :type host: ``str``

        :param results: Dict to store the outcome for this host in.
        :type results: ``dict``

        :param raise_on_any_error: When true, a connect failure is re-raised instead of
                                   being recorded.
        :type raise_on_any_error: ``boolean``
        """
        (hostname, port) = self._get_host_port_info(host)

        # Logging context; secrets are never logged verbatim.
        log_context = {'host': host, 'port': port, 'user': self._ssh_user}
        if self._ssh_password:
            log_context['password'] = '<redacted>'
        elif self._ssh_key_file:
            log_context['key_file_path'] = self._ssh_key_file
        else:
            log_context['private_key'] = '<redacted>'

        LOG.debug('Connecting to host.', extra=log_context)

        client = ParamikoSSHClient(hostname, username=self._ssh_user,
                                   password=self._ssh_password,
                                   bastion_host=self._bastion_host,
                                   key_files=self._ssh_key_file,
                                   key_material=self._ssh_key_material,
                                   passphrase=self._passphrase,
                                   port=port)
        try:
            client.connect()
        except SSHException as ex:
            LOG.exception(ex)
            if raise_on_any_error:
                raise
            failure = self._generate_error_result(exc=ex, message='Connection error.')
        except Exception as ex:
            error = 'Failed connecting to host %s.' % hostname
            LOG.exception(error)
            if raise_on_any_error:
                raise
            failure = self._generate_error_result(exc=ex, message=error)
        else:
            self._successful_connects += 1
            self._hosts_client[hostname] = client
            results[hostname] = {'message': 'Connected to host.'}
            return

        # Both failure branches end up here with the error dict they built.
        self._bad_hosts[hostname] = failure
        results[hostname] = failure
263
264
    def _run_command(self, host, cmd, results, timeout=None):
        """
        Run ``cmd`` on one connected host and store the outcome in ``results[host]``.

        Failures are not raised; they are logged and stored as an error dict. The command
        is sanitized before it is included in the error message.
        """
        try:
            LOG.debug('Running command: %s on host: %s.', cmd, host)
            ssh_client = self._hosts_client[host]
            (out, err, code) = ssh_client.run(cmd, timeout=timeout)
            succeeded = (code == 0)
            raw_result = {'stdout': out, 'stderr': err, 'return_code': code,
                          'succeeded': succeeded, 'failed': not succeeded}
            results[host] = jsonify.json_loads(raw_result, ParallelSSHClient.KEYS_TO_TRANSFORM)
        except Exception as ex:
            safe_cmd = self._sanitize_command_string(cmd=cmd)
            error = 'Failed executing command "%s" on host "%s"' % (safe_cmd, host)
            LOG.exception(error)
            results[host] = self._generate_error_result(exc=ex, message=error)
278
279
    def _put_files(self, local_path, remote_path, host, results, mode=None,
                   mirror_local_mode=False):
        """
        Copy a local file or directory to one connected host and record the outcome.

        Failures are not raised; they are logged and stored in ``results[host]`` as an
        error dict.

        :param local_path: Path to local file or dir.
        :type local_path: ``str``

        :param remote_path: Path to remote file or dir.
        :type remote_path: ``str``

        :param host: Key of the connected client in ``self._hosts_client``.
        :type host: ``str``

        :param results: Dict to store the outcome for this host in.
        :type results: ``dict``

        :param mode: Optional mode, forwarded when ``local_path`` is not a directory.

        :param mirror_local_mode: Optional flag, forwarded when ``local_path`` is not a
                                  directory.
        :type mirror_local_mode: ``boolean``
        """
        try:
            # Pass values as logging arguments: formatting is deferred until the record is
            # emitted, and a tuple value cannot break the "%" operator.
            LOG.debug('Copying file to host: %s', host)
            if os.path.isdir(local_path):
                # NOTE(review): mode / mirror_local_mode are not forwarded for directories;
                # confirm whether put_dir() accepts them.
                result = self._hosts_client[host].put_dir(local_path, remote_path)
            else:
                result = self._hosts_client[host].put(local_path, remote_path,
                                                      mirror_local_mode=mirror_local_mode,
                                                      mode=mode)
            LOG.debug('Result of copy: %s', result)
            results[host] = result
        except Exception as ex:
            error = 'Failed sending file(s) in path %s to host %s' % (local_path, host)
            LOG.exception(error)
            results[host] = self._generate_error_result(exc=ex, message=error)
295
296
    def _mkdir(self, host, path, results):
297
        try:
298
            result = self._hosts_client[host].mkdir(path)
299
            results[host] = result
300
        except Exception as ex:
301
            error = 'Failed "mkdir %s" on host %s.' % (path, host)
302
            LOG.exception(error)
303
            results[host] = self._generate_error_result(exc=ex, message=error)
304
305
    def _delete_file(self, host, path, results):
306
        try:
307
            result = self._hosts_client[host].delete_file(path)
308
            results[host] = result
309
        except Exception as ex:
310
            error = 'Failed deleting file %s on host %s.' % (path, host)
311
            LOG.exception(error)
312
            results[host] = self._generate_error_result(exc=ex, message=error)
313
314
    def _delete_dir(self, host, path, results, force=False, timeout=None):
315
        try:
316
            result = self._hosts_client[host].delete_dir(path, force=force, timeout=timeout)
317
            results[host] = result
318
        except Exception as ex:
319
            error = 'Failed deleting dir %s on host %s.' % (path, host)
320
            LOG.exception(error)
321
            results[host] = self._generate_error_result(exc=ex, message=error)
322
323
    def _get_host_port_info(self, host_str):
        """
        Split ``host_str`` into hostname and port.

        When the split yields no port, the port given to the constructor is used.

        :rtype: ``tuple`` of (hostname, port)
        """
        (hostname, explicit_port) = ip_utils.split_host_port(host_str)
        return (hostname, explicit_port or self._ssh_port)
329
330
    @staticmethod
331
    def _sanitize_command_string(cmd):
332
        """
333
        Remove any potentially sensitive information from the command string.
334
335
        For now we only mask the values of the sensitive environment variables.
336
        """
337
        if not cmd:
338
            return cmd
339
340
        result = re.sub('ST2_ACTION_AUTH_TOKEN=(.+?)\s+?', 'ST2_ACTION_AUTH_TOKEN=%s ' %
0 ignored issues
show
Bug introduced by
A suspicious escape sequence \s was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
341
                        (MASKED_ATTRIBUTE_VALUE), cmd)
342
        return result
343
344
    @staticmethod
    def _generate_error_result(exc, message):
        """
        Build the result dict which describes a failed operation.

        Uses the traceback of the exception currently being handled, so this is meant to
        be called from inside an ``except`` block.

        :param exc: Raised exception.
        :type exc: Exception.

        :param message: Error message which will be prefixed to the exception message.
        :type message: ``str``

        :rtype: ``dict``
        """
        error_message = '%s %s' % (message, getattr(exc, 'message', str(exc)))
        traceback_message = traceback.format_exc()

        # Command timeouts are reported with their own return code.
        timed_out = isinstance(exc, SSHCommandTimeoutError)

        return {
            'failed': True,
            'succeeded': False,
            'timeout': timed_out,
            'return_code': -9 if timed_out else 255,
            # The exception may carry partial output; fall back to empty strings.
            'stdout': getattr(exc, 'stdout', None) or '',
            'stderr': getattr(exc, 'stderr', None) or '',
            'error': error_message,
            'traceback': traceback_message,
        }
378
379
    def __repr__(self):
380
        return ('<ParallelSSHClient hosts=%s,user=%s,id=%s>' %
381
                (repr(self._hosts), self._ssh_user, id(self)))
382