GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — develop-v1.3.1 ( 969842...8fa207 )
by
unknown
05:56
created

ParallelSSHClient._mkdir()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 8
rs 9.4285
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
import re
18
import os
19
import traceback
20
21
import eventlet
22
23
from st2common.constants.secrets import MASKED_ATTRIBUTE_VALUE
24
from st2actions.runners.ssh.paramiko_ssh import ParamikoSSHClient
0 ignored issues
show
Bug introduced by
The name paramiko_ssh does not seem to exist in module st2actions.runners.ssh.
Loading history...
25
from st2actions.runners.ssh.paramiko_ssh import SSHCommandTimeoutError
0 ignored issues
show
Bug introduced by
The name paramiko_ssh does not seem to exist in module st2actions.runners.ssh.
Loading history...
26
from st2common import log as logging
27
from st2common.exceptions.ssh import NoHostsConnectedToException
28
import st2common.util.jsonify as jsonify
29
from st2common.util import ip_utils
30
31
LOG = logging.getLogger(__name__)
32
33
34
class ParallelSSHClient(object):
35
    KEYS_TO_TRANSFORM = ['stdout', 'stderr']
36
    CONNECT_ERROR = 'Cannot connect to host.'
37
38
    def __init__(self, hosts, user=None, password=None, pkey_file=None, pkey_material=None, port=22,
39
                 bastion_host=None, concurrency=10, raise_on_any_error=False, connect=True):
40
        self._ssh_user = user
41
        self._ssh_key_file = pkey_file
42
        self._ssh_key_material = pkey_material
43
        self._ssh_password = password
44
        self._hosts = hosts
45
        self._successful_connects = 0
46
        self._ssh_port = port
47
        self._bastion_host = bastion_host
48
49
        if not hosts:
50
            raise Exception('Need an non-empty list of hosts to talk to.')
51
52
        self._pool = eventlet.GreenPool(concurrency)
53
        self._hosts_client = {}
54
        self._bad_hosts = {}
55
        self._scan_interval = 0.1
56
57
        if connect:
58
            connect_results = self.connect(raise_on_any_error=raise_on_any_error)
59
            extra = {'_connect_results': connect_results}
60
            LOG.debug('Connect to hosts complete.', extra=extra)
61
62
    def connect(self, raise_on_any_error=False):
        """
        Connect to every host in the hosts list and return the per-host connect status.

        :param raise_on_any_error: Optional flag forwarded to the per-host connect
                                   routine so it re-raises its connect error instead
                                   of recording it.
        :type raise_on_any_error: ``boolean``

        :raises NoHostsConnectedToException: If not a single host could be connected to.

        :rtype: ``dict`` of ``str`` to ``dict``
        """
        connect_status = {}
        pool = self._pool

        for target in self._hosts:
            # Hold off while the pool reports no free slot.
            while not pool.free():
                eventlet.sleep(self._scan_interval)
            pool.spawn(self._connect, host=target, results=connect_status,
                       raise_on_any_error=raise_on_any_error)

        pool.waitall()

        if self._successful_connects >= 1:
            return connect_status

        # Nothing connected - there is no host left to work with, so this is always fatal.
        LOG.error('Unable to connect to any of the hosts.',
                  extra={'connect_results': connect_status})
        msg = ('Unable to connect to any one of the hosts: %s.\n\n connect_errors=%s' %
               (self._hosts, json.dumps(connect_status, indent=2)))
        raise NoHostsConnectedToException(msg)
91
92
    def run(self, cmd, timeout=None):
93
        """
94
        Run a command on remote hosts. Returns a dict containing results
95
        of execution from all hosts.
96
97
        :param cmd: Command to run. Must be shlex quoted.
98
        :type cmd: ``str``
99
100
        :param timeout: Optional Timeout for the command.
101
        :type timeout: ``int``
102
103
        :param cwd: Optional Current working directory. Must be shlex quoted.
104
        :type cwd: ``str``
105
106
        :rtype: ``dict`` of ``str`` to ``dict``
107
        """
108
109
        options = {
110
            'cmd': cmd,
111
            'timeout': timeout
112
        }
113
        results = self._execute_in_pool(self._run_command, **options)
114
        return results
115
116
    def put(self, local_path, remote_path, mode=None, mirror_local_mode=False):
117
        """
118
        Copy a file or folder to remote host.
119
120
        :param local_path: Path to local file or dir. Must be shlex quoted.
121
        :type local_path: ``str``
122
123
        :param remote_path: Path to remote file or dir. Must be shlex quoted.
124
        :type remote_path: ``str``
125
126
        :param mode: Optional mode to use for the file or dir.
127
        :type mode: ``int``
128
129
        :param mirror_local_mode: Optional Flag to mirror the mode
130
                                           on local file/dir on remote host.
131
        :type mirror_local_mode: ``boolean``
132
133
        :rtype: ``dict`` of ``str`` to ``dict``
134
        """
135
136
        if not os.path.exists(local_path):
137
            raise Exception('Local path %s does not exist.' % local_path)
138
139
        options = {
140
            'local_path': local_path,
141
            'remote_path': remote_path,
142
            'mode': mode,
143
            'mirror_local_mode': mirror_local_mode
144
        }
145
146
        return self._execute_in_pool(self._put_files, **options)
147
148
    def mkdir(self, path):
149
        """
150
        Create a directory on remote hosts.
151
152
        :param path: Path to remote dir that must be created. Must be shlex quoted.
153
        :type path: ``str``
154
155
        :rtype path: ``dict`` of ``str`` to ``dict``
156
        """
157
158
        options = {
159
            'path': path
160
        }
161
        return self._execute_in_pool(self._mkdir, **options)
162
163
    def delete_file(self, path):
164
        """
165
        Delete a file on remote hosts.
166
167
        :param path: Path to remote file that must be deleted. Must be shlex quoted.
168
        :type path: ``str``
169
170
        :rtype path: ``dict`` of ``str`` to ``dict``
171
        """
172
173
        options = {
174
            'path': path
175
        }
176
        return self._execute_in_pool(self._delete_file, **options)
177
178
    def delete_dir(self, path, force=False, timeout=None):
179
        """
180
        Delete a dir on remote hosts.
181
182
        :param path: Path to remote dir that must be deleted. Must be shlex quoted.
183
        :type path: ``str``
184
185
        :rtype path: ``dict`` of ``str`` to ``dict``
186
        """
187
188
        options = {
189
            'path': path,
190
            'force': force
191
        }
192
        return self._execute_in_pool(self._delete_dir, **options)
193
194
    def close(self):
195
        """
196
        Close all open SSH connections to hosts.
197
        """
198
199
        for host in self._hosts_client.keys():
200
            try:
201
                self._hosts_client[host].close()
202
            except:
203
                LOG.exception('Failed shutting down SSH connection to host: %s', host)
204
205
    def _execute_in_pool(self, execute_method, **kwargs):
        """
        Run ``execute_method`` once per connected host using the pool and return the
        combined results.

        :param execute_method: Callable spawned for every connected host. It is called
                               with ``host`` and ``results`` keyword arguments plus
                               ``kwargs``.

        :rtype: ``dict`` of ``str`` to ``dict``
        """
        # Start from the errors recorded for the hosts which could not be connected to.
        collected = dict(self._bad_hosts)
        pool = self._pool

        for connected_host in self._hosts_client:
            # Hold off while the pool reports no free slot.
            while not pool.free():
                eventlet.sleep(self._scan_interval)
            pool.spawn(execute_method, host=connected_host, results=collected, **kwargs)

        pool.waitall()
        return collected
218
219
    def _connect(self, host, results, raise_on_any_error=False):
        """
        Connect to a single host and record the outcome in ``results``.

        On success the client is stored in ``self._hosts_client``; on failure the error
        result is stored in ``self._bad_hosts``. Both are keyed by the hostname part of
        ``host``.

        :param host: Host string, optionally including a port.
        :type host: ``str``

        :param results: Dict which is updated in place with the outcome for this host.
        :type results: ``dict``

        :param raise_on_any_error: True to re-raise the connect error instead of
                                   recording it.
        :type raise_on_any_error: ``boolean``
        """
        hostname, port = self._get_host_port_info(host)

        # Secrets never go to the log - only a marker for the kind of credential in use.
        log_context = {'host': host, 'port': port, 'user': self._ssh_user}
        if self._ssh_password:
            log_context['password'] = '<redacted>'
        elif self._ssh_key_file:
            log_context['key_file_path'] = self._ssh_key_file
        else:
            log_context['private_key'] = '<redacted>'

        LOG.debug('Connecting to host.', extra=log_context)

        # NOTE(review): self._bastion_host is not handed to the client here - confirm
        # whether that is intended.
        ssh_client = ParamikoSSHClient(hostname, username=self._ssh_user,
                                       password=self._ssh_password,
                                       key=self._ssh_key_file,
                                       key_material=self._ssh_key_material,
                                       port=port)
        try:
            ssh_client.connect()
        except Exception as ex:
            LOG.exception('Failed connecting to host %s.' % hostname)
            if raise_on_any_error:
                raise
            failure = self._generate_error_result(
                exc=ex, message=' '.join([self.CONNECT_ERROR, str(ex)]))
            self._bad_hosts[hostname] = failure
            results[hostname] = failure
            return

        self._successful_connects += 1
        self._hosts_client[hostname] = ssh_client
        results[hostname] = {'message': 'Connected to host.'}
252
253
    def _run_command(self, host, cmd, results, timeout=None):
        """
        Run ``cmd`` on a single host and store the outcome in ``results[host]``.

        Any exception is logged and converted to an error result; the command string
        is sanitized before it is included in the error message.
        """
        try:
            LOG.debug('Running command: %s on host: %s.', cmd, host)
            stdout, stderr, exit_code = self._hosts_client[host].run(cmd, timeout=timeout)
            succeeded = (exit_code == 0)
            outcome = {
                'stdout': stdout,
                'stderr': stderr,
                'return_code': exit_code,
                'succeeded': succeeded,
                'failed': not succeeded
            }
            results[host] = jsonify.json_loads(outcome, ParallelSSHClient.KEYS_TO_TRANSFORM)
        except Exception as ex:
            safe_cmd = self._sanitize_command_string(cmd=cmd)
            error = 'Failed executing command "%s" on host "%s"' % (safe_cmd, host)
            LOG.exception(error)
            results[host] = self._generate_error_result(exc=ex, message=error)
267
268
    def _put_files(self, local_path, remote_path, host, results, mode=None,
                   mirror_local_mode=False):
        """
        Copy ``local_path`` to ``remote_path`` on a single host and store the outcome in
        ``results[host]``.

        A local directory is uploaded with the client's ``put_dir`` and anything else
        with ``put``. ``mode`` and ``mirror_local_mode`` are only passed to ``put``.
        """
        try:
            LOG.debug('Copying file to host: %s' % host)
            client = self._hosts_client[host]
            if os.path.isdir(local_path):
                copy_result = client.put_dir(local_path, remote_path)
            else:
                copy_result = client.put(local_path, remote_path,
                                         mirror_local_mode=mirror_local_mode,
                                         mode=mode)
            LOG.debug('Result of copy: %s' % copy_result)
            results[host] = copy_result
        except Exception as ex:
            error = 'Failed sending file(s) in path %s to host %s' % (local_path, host)
            LOG.exception(error)
            results[host] = self._generate_error_result(exc=ex, message=error)
284
285
    def _mkdir(self, host, path, results):
286
        try:
287
            result = self._hosts_client[host].mkdir(path)
288
            results[host] = result
289
        except Exception as ex:
290
            error = 'Failed "mkdir %s" on host %s.' % (path, host)
291
            LOG.exception(error)
292
            results[host] = self._generate_error_result(exc=ex, message=error)
293
294
    def _delete_file(self, host, path, results):
295
        try:
296
            result = self._hosts_client[host].delete_file(path)
297
            results[host] = result
298
        except Exception as ex:
299
            error = 'Failed deleting file %s on host %s.' % (path, host)
300
            LOG.exception(error)
301
            results[host] = self._generate_error_result(exc=ex, message=error)
302
303
    def _delete_dir(self, host, path, results, force=False, timeout=None):
304
        try:
305
            result = self._hosts_client[host].delete_dir(path, force=force, timeout=timeout)
306
            results[host] = result
307
        except Exception as ex:
308
            error = 'Failed deleting dir %s on host %s.' % (path, host)
309
            LOG.exception(error)
310
            results[host] = self._generate_error_result(exc=ex, message=error)
311
312
    def _get_host_port_info(self, host_str):
        """
        Split ``host_str`` into hostname and port.

        The port configured on this client is used when the split yields no port.

        :rtype: ``tuple`` of hostname and port
        """
        hostname, port = ip_utils.split_host_port(host_str)
        return (hostname, port if port else self._ssh_port)
318
319
    @staticmethod
320
    def _sanitize_command_string(cmd):
321
        """
322
        Remove any potentially sensitive information from the command string.
323
324
        For now we only mask the values of the sensitive environment variables.
325
        """
326
        if not cmd:
327
            return cmd
328
329
        result = re.sub('ST2_ACTION_AUTH_TOKEN=(.+?)\s+?', 'ST2_ACTION_AUTH_TOKEN=%s ' %
0 ignored issues
show
Bug introduced by
A suspicious escape sequence \s was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
330
                        (MASKED_ATTRIBUTE_VALUE), cmd)
331
        return result
332
333
    @staticmethod
    def _generate_error_result(exc, message):
        """
        Build the result dict which describes a failed operation.

        Must be called while the exception is being handled since the traceback of the
        exception currently being handled is included in the result.

        :param exc: Raised exception.
        :type exc: Exception.

        :param message: Error message which will be prefixed to the exception message.
        :type message: ``str``

        :rtype: ``dict``
        """
        exc_message = getattr(exc, 'message', str(exc))
        error_message = '%s: %s' % (message, exc_message)
        traceback_message = traceback.format_exc()

        # A command timeout is reported with its own return code and flag.
        timed_out = isinstance(exc, SSHCommandTimeoutError)
        return_code = -9 if timed_out else 255

        # Output attached to the exception is kept when there is any.
        stdout = getattr(exc, 'stdout', None) or ''
        stderr = getattr(exc, 'stderr', None) or ''

        return {
            'failed': True,
            'succeeded': False,
            'timeout': timed_out,
            'return_code': return_code,
            'stdout': stdout,
            'stderr': stderr,
            'error': error_message,
            'traceback': traceback_message,
        }
367
368
    def __repr__(self):
369
        return ('<ParallelSSHClient hosts=%s,user=%s,id=%s>' %
370
                (repr(self._hosts), self._ssh_user, id(self)))
371