Test Failed
Push: develop-v1.6.0 (9d5181...7efb31), created 04:49 by unknown

ParallelSSHClient   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 340
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 340
rs 8.3396
c 0
b 0
f 0
wmc 44

19 Methods

Rating  Name                        Duplication  Size  Complexity
B       __init__()                  0            25    3
B       connect()                   0            29    4
A       _sanitize_command_string()  0            13    2
A       delete_file()               0            14    1
A       _run_command()              0            14    2
A       delete_dir()                0            15    1
A       close()                     0            10    3
A       _get_host_port_info()       0            6     2
A       mkdir()                     0            14    1
A       _mkdir()                    0            8     2
A       run()                       0            23    1
A       _delete_dir()               0            8     2
B       _connect()                  0            34    6
A       __repr__()                  0            3     1
B       _generate_error_result()    0            34    2
B       put()                       0            31    2
A       _put_files()                0            16    3
A       _delete_file()              0            8     2
A       _execute_in_pool()          0            13    4

How to fix   Complexity   

Complex Class

Complex classes like ParallelSSHClient often do a lot of different things. To break such a class down, identify a cohesive component within it. A common way to find one is to look for fields or methods that share a prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
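
As a rough illustration of the Extract Class advice applied to this file: the `_ssh_user`, `_ssh_password`, `_ssh_key_file`, `_ssh_key_material` and `_ssh_port` fields share the `_ssh_` prefix, so they are one candidate for a cohesive component. The sketch below is hypothetical. `SSHCredentials`, `auth_summary` and `SlimParallelClient` are names invented here for illustration and are not part of st2actions; the host names and key path are made up as well.

```python
class SSHCredentials(object):
    """Hypothetical value object grouping the fields that share the _ssh_ prefix."""

    def __init__(self, user=None, password=None, key_file=None,
                 key_material=None, port=22):
        self.user = user
        self.password = password
        self.key_file = key_file
        self.key_material = key_material
        self.port = port

    def auth_summary(self):
        """Return a dict that is safe to log: secrets are replaced by a marker."""
        summary = {'user': self.user, 'port': self.port}
        if self.password:
            summary['password'] = '<redacted>'
        elif self.key_file:
            summary['key_file_path'] = self.key_file
        else:
            summary['private_key'] = '<redacted>'
        return summary


class SlimParallelClient(object):
    """Hypothetical stand-in for the refactored client; it only stores state."""

    def __init__(self, hosts, credentials):
        if not hosts:
            raise ValueError('Need a non-empty list of hosts to talk to.')
        self._hosts = hosts
        self._credentials = credentials


# Made-up example values, for illustration only.
creds = SSHCredentials(user='stanley', key_file='/home/stanley/.ssh/id_rsa')
client = SlimParallelClient(hosts=['host1', 'host2:2222'], credentials=creds)
print(creds.auth_summary())
```

With the credentials in their own class, the redaction logic that currently sits at the top of `_connect()` would move next to the data it protects, and `__init__()` would take one argument instead of five.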

# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import re
import os
import traceback

import eventlet

from st2common.constants.secrets import MASKED_ATTRIBUTE_VALUE
# Flagged by the inspection for both imports below: "The name paramiko_ssh does
# not seem to exist in module st2actions.runners.ssh."
from st2actions.runners.ssh.paramiko_ssh import ParamikoSSHClient
from st2actions.runners.ssh.paramiko_ssh import SSHCommandTimeoutError
from st2common import log as logging
from st2common.exceptions.ssh import NoHostsConnectedToException
import st2common.util.jsonify as jsonify
from st2common.util import ip_utils

LOG = logging.getLogger(__name__)


class ParallelSSHClient(object):
    KEYS_TO_TRANSFORM = ['stdout', 'stderr']
    CONNECT_ERROR = 'Cannot connect to host.'

    def __init__(self, hosts, user=None, password=None, pkey_file=None, pkey_material=None, port=22,
                 bastion_host=None, concurrency=10, raise_on_any_error=False, connect=True,
                 passphrase=None):
        self._ssh_user = user
        self._ssh_key_file = pkey_file
        self._ssh_key_material = pkey_material
        self._ssh_password = password
        self._hosts = hosts
        self._successful_connects = 0
        self._ssh_port = port
        self._bastion_host = bastion_host
        self._passphrase = passphrase

        if not hosts:
            raise Exception('Need a non-empty list of hosts to talk to.')

        self._pool = eventlet.GreenPool(concurrency)
        self._hosts_client = {}
        self._bad_hosts = {}
        self._scan_interval = 0.1

        if connect:
            connect_results = self.connect(raise_on_any_error=raise_on_any_error)
            extra = {'_connect_results': connect_results}
            LOG.debug('Connect to hosts complete.', extra=extra)

    def connect(self, raise_on_any_error=False):
        """
        Connect to hosts in hosts list. Returns status of connect as a dict.

        :param raise_on_any_error: Optional. Raise an exception if connecting to
                                   any one of the hosts fails.
        :type raise_on_any_error: ``boolean``

        :rtype: ``dict`` of ``str`` to ``dict``
        """
        results = {}

        for host in self._hosts:
            # Wait for a free slot in the pool before spawning the next connect.
            while not self._pool.free():
                eventlet.sleep(self._scan_interval)
            self._pool.spawn(self._connect, host=host, results=results,
                             raise_on_any_error=raise_on_any_error)

        self._pool.waitall()

        if self._successful_connects < 1:
            # We definitely have to raise an exception in this case.
            LOG.error('Unable to connect to any of the hosts.',
                      extra={'connect_results': results})
            msg = ('Unable to connect to any one of the hosts: %s.\n\n connect_errors=%s' %
                   (self._hosts, json.dumps(results, indent=2)))
            raise NoHostsConnectedToException(msg)

        return results

    def run(self, cmd, timeout=None):
        """
        Run a command on remote hosts. Returns a dict containing results
        of execution from all hosts.

        :param cmd: Command to run. Must be shlex quoted.
        :type cmd: ``str``

        :param timeout: Optional. Timeout for the command.
        :type timeout: ``int``

        :rtype: ``dict`` of ``str`` to ``dict``
        """

        options = {
            'cmd': cmd,
            'timeout': timeout
        }
        results = self._execute_in_pool(self._run_command, **options)
        return results

    def put(self, local_path, remote_path, mode=None, mirror_local_mode=False):
        """
        Copy a file or folder to remote host.

        :param local_path: Path to local file or dir. Must be shlex quoted.
        :type local_path: ``str``

        :param remote_path: Path to remote file or dir. Must be shlex quoted.
        :type remote_path: ``str``

        :param mode: Optional mode to use for the file or dir.
        :type mode: ``int``

        :param mirror_local_mode: Optional. Flag to mirror the mode
                                  on local file/dir on remote host.
        :type mirror_local_mode: ``boolean``

        :rtype: ``dict`` of ``str`` to ``dict``
        """

        if not os.path.exists(local_path):
            raise Exception('Local path %s does not exist.' % local_path)

        options = {
            'local_path': local_path,
            'remote_path': remote_path,
            'mode': mode,
            'mirror_local_mode': mirror_local_mode
        }

        return self._execute_in_pool(self._put_files, **options)

    def mkdir(self, path):
        """
        Create a directory on remote hosts.

        :param path: Path to remote dir that must be created. Must be shlex quoted.
        :type path: ``str``

        :rtype: ``dict`` of ``str`` to ``dict``
        """

        options = {
            'path': path
        }
        return self._execute_in_pool(self._mkdir, **options)

    def delete_file(self, path):
        """
        Delete a file on remote hosts.

        :param path: Path to remote file that must be deleted. Must be shlex quoted.
        :type path: ``str``

        :rtype: ``dict`` of ``str`` to ``dict``
        """

        options = {
            'path': path
        }
        return self._execute_in_pool(self._delete_file, **options)

    def delete_dir(self, path, force=False, timeout=None):
        """
        Delete a dir on remote hosts.

        :param path: Path to remote dir that must be deleted. Must be shlex quoted.
        :type path: ``str``

        :param force: Optional. Passed through to the per-host client's ``delete_dir``.
        :type force: ``boolean``

        :param timeout: Optional. Passed through to the per-host client's ``delete_dir``.
        :type timeout: ``int``

        :rtype: ``dict`` of ``str`` to ``dict``
        """

        options = {
            'path': path,
            'force': force,
            # Previously accepted but never forwarded to _delete_dir().
            'timeout': timeout
        }
        return self._execute_in_pool(self._delete_dir, **options)

    def close(self):
        """
        Close all open SSH connections to hosts.
        """

        for host in self._hosts_client.keys():
            try:
                self._hosts_client[host].close()
            except Exception:
                # Keep closing the remaining connections even if one fails.
                LOG.exception('Failed shutting down SSH connection to host: %s', host)

    def _execute_in_pool(self, execute_method, **kwargs):
        results = {}

        # Hosts that failed to connect report their stored connect error.
        for host in self._bad_hosts.keys():
            results[host] = self._bad_hosts[host]

        for host in self._hosts_client.keys():
            while not self._pool.free():
                eventlet.sleep(self._scan_interval)
            self._pool.spawn(execute_method, host=host, results=results, **kwargs)

        self._pool.waitall()
        return results

    def _connect(self, host, results, raise_on_any_error=False):
        (hostname, port) = self._get_host_port_info(host)

        extra = {'host': host, 'port': port, 'user': self._ssh_user}
        if self._ssh_password:
            extra['password'] = '<redacted>'
        elif self._ssh_key_file:
            extra['key_file_path'] = self._ssh_key_file
        else:
            extra['private_key'] = '<redacted>'

        LOG.debug('Connecting to host.', extra=extra)

        client = ParamikoSSHClient(hostname, username=self._ssh_user,
                                   password=self._ssh_password,
                                   bastion_host=self._bastion_host,
                                   key_files=self._ssh_key_file,
                                   key_material=self._ssh_key_material,
                                   passphrase=self._passphrase,
                                   port=port)
        try:
            client.connect()
        except Exception as ex:
            error = 'Failed connecting to host %s.' % hostname
            LOG.exception(error)
            if raise_on_any_error:
                raise
            error_dict = self._generate_error_result(exc=ex, message=error)
            self._bad_hosts[hostname] = error_dict
            results[hostname] = error_dict
        else:
            self._successful_connects += 1
            self._hosts_client[hostname] = client
            results[hostname] = {'message': 'Connected to host.'}

    def _run_command(self, host, cmd, results, timeout=None):
        try:
            LOG.debug('Running command: %s on host: %s.', cmd, host)
            client = self._hosts_client[host]
            (stdout, stderr, exit_code) = client.run(cmd, timeout=timeout)
            is_succeeded = (exit_code == 0)
            result_dict = {'stdout': stdout, 'stderr': stderr, 'return_code': exit_code,
                           'succeeded': is_succeeded, 'failed': not is_succeeded}
            results[host] = jsonify.json_loads(result_dict, ParallelSSHClient.KEYS_TO_TRANSFORM)
        except Exception as ex:
            cmd = self._sanitize_command_string(cmd=cmd)
            error = 'Failed executing command "%s" on host "%s"' % (cmd, host)
            LOG.exception(error)
            results[host] = self._generate_error_result(exc=ex, message=error)

    def _put_files(self, local_path, remote_path, host, results, mode=None,
                   mirror_local_mode=False):
        try:
            LOG.debug('Copying file to host: %s' % host)
            if os.path.isdir(local_path):
                result = self._hosts_client[host].put_dir(local_path, remote_path)
            else:
                result = self._hosts_client[host].put(local_path, remote_path,
                                                      mirror_local_mode=mirror_local_mode,
                                                      mode=mode)
            LOG.debug('Result of copy: %s' % result)
            results[host] = result
        except Exception as ex:
            error = 'Failed sending file(s) in path %s to host %s' % (local_path, host)
            LOG.exception(error)
            results[host] = self._generate_error_result(exc=ex, message=error)

    def _mkdir(self, host, path, results):
        try:
            result = self._hosts_client[host].mkdir(path)
            results[host] = result
        except Exception as ex:
            error = 'Failed "mkdir %s" on host %s.' % (path, host)
            LOG.exception(error)
            results[host] = self._generate_error_result(exc=ex, message=error)

    def _delete_file(self, host, path, results):
        try:
            result = self._hosts_client[host].delete_file(path)
            results[host] = result
        except Exception as ex:
            error = 'Failed deleting file %s on host %s.' % (path, host)
            LOG.exception(error)
            results[host] = self._generate_error_result(exc=ex, message=error)

    def _delete_dir(self, host, path, results, force=False, timeout=None):
        try:
            result = self._hosts_client[host].delete_dir(path, force=force, timeout=timeout)
            results[host] = result
        except Exception as ex:
            error = 'Failed deleting dir %s on host %s.' % (path, host)
            LOG.exception(error)
            results[host] = self._generate_error_result(exc=ex, message=error)

    def _get_host_port_info(self, host_str):
        (hostname, port) = ip_utils.split_host_port(host_str)
        if not port:
            # Fall back to the port given to the constructor.
            port = self._ssh_port

        return (hostname, port)

    @staticmethod
    def _sanitize_command_string(cmd):
        """
        Remove any potentially sensitive information from the command string.

        For now we only mask the values of the sensitive environment variables.
        """
        if not cmd:
            return cmd

        # The inspection flagged a suspicious "\s" escape in a plain string literal here.
        # The r prefix makes this a raw string, so the backslash reaches the regex
        # engine unchanged instead of being treated as a string escape sequence.
        result = re.sub(r'ST2_ACTION_AUTH_TOKEN=(.+?)\s+?', 'ST2_ACTION_AUTH_TOKEN=%s ' %
                        (MASKED_ATTRIBUTE_VALUE), cmd)
        return result

    @staticmethod
    def _generate_error_result(exc, message):
        """
        :param exc: Raised exception.
        :type exc: ``Exception``

        :param message: Error message which will be prefixed to the exception message.
        :type message: ``str``
        """
        exc_message = getattr(exc, 'message', str(exc))
        error_message = '%s: %s' % (message, exc_message)
        traceback_message = traceback.format_exc()

        if isinstance(exc, SSHCommandTimeoutError):
            return_code = -9
            timeout = True
        else:
            timeout = False
            return_code = 255

        stdout = getattr(exc, 'stdout', None) or ''
        stderr = getattr(exc, 'stderr', None) or ''

        error_dict = {
            'failed': True,
            'succeeded': False,
            'timeout': timeout,
            'return_code': return_code,
            'stdout': stdout,
            'stderr': stderr,
            'error': error_message,
            'traceback': traceback_message,
        }
        return error_dict

    def __repr__(self):
        return ('<ParallelSSHClient hosts=%s,user=%s,id=%s>' %
                (repr(self._hosts), self._ssh_user, id(self)))
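
The masking done by `_sanitize_command_string()` can be tried in isolation. The sketch below is self-contained and rests on two assumptions: `MASKED_VALUE` is a stand-in for `MASKED_ATTRIBUTE_VALUE` (its real value lives in st2common and is not shown in this file), and `sanitize` is a hypothetical free-function copy of the static method. The pattern is written as a raw string so that `\s` reaches the regex engine as written.

```python
import re

# Stand-in for st2common's MASKED_ATTRIBUTE_VALUE; the real value is not shown in this file.
MASKED_VALUE = '********'


def sanitize(cmd):
    """Mask the value of ST2_ACTION_AUTH_TOKEN in a command string."""
    if not cmd:
        return cmd
    # (.+?) lazily captures the token; \s+? consumes one whitespace character
    # after it, which the replacement puts back as a single space.
    return re.sub(r'ST2_ACTION_AUTH_TOKEN=(.+?)\s+?',
                  'ST2_ACTION_AUTH_TOKEN=%s ' % MASKED_VALUE, cmd)


print(sanitize('ST2_ACTION_AUTH_TOKEN=abc123 echo hello'))
# The pattern requires whitespace after the token, so a token at the very end
# of the string is left unmasked.
print(sanitize('export ST2_ACTION_AUTH_TOKEN=abc123'))
```

The second call shows a limitation worth knowing about when reviewing this method: because the pattern requires trailing whitespace, a token that ends the command string is not masked.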