Passed
Push — master ( 6aeca2...dec5f2 )
by
unknown
03:57
created

ParallelSSHClient._mkdir()   A

Complexity

Conditions 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
dl 0
loc 8
rs 9.4285
c 0
b 0
f 0
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import json
17
import re
18
import os
19
import traceback
20
21
import eventlet
22
from paramiko.ssh_exception import SSHException
23
24
from st2common.constants.secrets import MASKED_ATTRIBUTE_VALUE
25
from st2common.runners.paramiko_ssh import ParamikoSSHClient
0 ignored issues
show
Bug introduced by
The name paramiko_ssh does not seem to exist in module st2common.runners.
Loading history...
26
from st2common.runners.paramiko_ssh import SSHCommandTimeoutError
0 ignored issues
show
Bug introduced by
The name paramiko_ssh does not seem to exist in module st2common.runners.
Loading history...
27
from st2common import log as logging
28
from st2common.exceptions.ssh import NoHostsConnectedToException
29
import st2common.util.jsonify as jsonify
30
from st2common.util import ip_utils
31
32
LOG = logging.getLogger(__name__)
33
34
35
class ParallelSSHClient(object):
36
    KEYS_TO_TRANSFORM = ['stdout', 'stderr']
37
    CONNECT_ERROR = 'Cannot connect to host.'
38
39
    def __init__(self, hosts, user=None, password=None, pkey_file=None, pkey_material=None, port=22,
40
                 bastion_host=None, concurrency=10, raise_on_any_error=False, connect=True,
41
                 passphrase=None, handle_stdout_line_func=None, handle_stderr_line_func=None,
42
                 sudo_password=False):
43
        """
44
        :param handle_stdout_line_func: Callback function which is called dynamically each time a
45
                                        new stdout line is received.
46
        :type handle_stdout_line_func: ``func``
47
48
        :param handle_stderr_line_func: Callback function which is called dynamically each time a
49
                                        new stderr line is received.
50
        :type handle_stderr_line_func: ``func``
51
        """
52
        self._ssh_user = user
53
54
        self._ssh_user = user
55
        self._ssh_key_file = pkey_file
56
        self._ssh_key_material = pkey_material
57
        self._ssh_password = password
58
        self._hosts = hosts
59
        self._successful_connects = 0
60
        self._ssh_port = port
61
        self._bastion_host = bastion_host
62
        self._passphrase = passphrase
63
        self._handle_stdout_line_func = handle_stdout_line_func
64
        self._handle_stderr_line_func = handle_stderr_line_func
65
        self._sudo_password = sudo_password
66
67
        if not hosts:
68
            raise Exception('Need an non-empty list of hosts to talk to.')
69
70
        self._pool = eventlet.GreenPool(concurrency)
71
        self._hosts_client = {}
72
        self._bad_hosts = {}
73
        self._scan_interval = 0.1
74
75
        if connect:
76
            connect_results = self.connect(raise_on_any_error=raise_on_any_error)
77
            extra = {'_connect_results': connect_results}
78
            LOG.debug('Connect to hosts complete.', extra=extra)
79
80
    def connect(self, raise_on_any_error=False):
81
        """
82
        Connect to hosts in hosts list. Returns status of connect as a dict.
83
84
        :param raise_on_any_error: Optional Raise an exception even if connecting to one
85
                                   of the hosts fails.
86
        :type raise_on_any_error: ``boolean``
87
88
        :rtype: ``dict`` of ``str`` to ``dict``
89
        """
90
        results = {}
91
92
        for host in self._hosts:
93
            while not self._pool.free():
94
                eventlet.sleep(self._scan_interval)
95
            self._pool.spawn(self._connect, host=host, results=results,
96
                             raise_on_any_error=raise_on_any_error)
97
98
        self._pool.waitall()
99
100
        if self._successful_connects < 1:
101
            # We definitely have to raise an exception in this case.
102
            LOG.error('Unable to connect to any of the hosts.',
103
                      extra={'connect_results': results})
104
            msg = ('Unable to connect to any one of the hosts: %s.\n\n connect_errors=%s' %
105
                   (self._hosts, json.dumps(results, indent=2)))
106
            raise NoHostsConnectedToException(msg)
107
108
        return results
109
110
    def run(self, cmd, timeout=None):
111
        """
112
        Run a command on remote hosts. Returns a dict containing results
113
        of execution from all hosts.
114
115
        :param cmd: Command to run. Must be shlex quoted.
116
        :type cmd: ``str``
117
118
        :param timeout: Optional Timeout for the command.
119
        :type timeout: ``int``
120
121
        :param cwd: Optional Current working directory. Must be shlex quoted.
122
        :type cwd: ``str``
123
124
        :rtype: ``dict`` of ``str`` to ``dict``
125
        """
126
127
        options = {
128
            'cmd': cmd,
129
            'timeout': timeout
130
        }
131
        results = self._execute_in_pool(self._run_command, **options)
132
        return results
133
134
    def put(self, local_path, remote_path, mode=None, mirror_local_mode=False):
135
        """
136
        Copy a file or folder to remote host.
137
138
        :param local_path: Path to local file or dir. Must be shlex quoted.
139
        :type local_path: ``str``
140
141
        :param remote_path: Path to remote file or dir. Must be shlex quoted.
142
        :type remote_path: ``str``
143
144
        :param mode: Optional mode to use for the file or dir.
145
        :type mode: ``int``
146
147
        :param mirror_local_mode: Optional Flag to mirror the mode
148
                                           on local file/dir on remote host.
149
        :type mirror_local_mode: ``boolean``
150
151
        :rtype: ``dict`` of ``str`` to ``dict``
152
        """
153
154
        if not os.path.exists(local_path):
155
            raise Exception('Local path %s does not exist.' % local_path)
156
157
        options = {
158
            'local_path': local_path,
159
            'remote_path': remote_path,
160
            'mode': mode,
161
            'mirror_local_mode': mirror_local_mode
162
        }
163
164
        return self._execute_in_pool(self._put_files, **options)
165
166
    def mkdir(self, path):
167
        """
168
        Create a directory on remote hosts.
169
170
        :param path: Path to remote dir that must be created. Must be shlex quoted.
171
        :type path: ``str``
172
173
        :rtype path: ``dict`` of ``str`` to ``dict``
174
        """
175
176
        options = {
177
            'path': path
178
        }
179
        return self._execute_in_pool(self._mkdir, **options)
180
181
    def delete_file(self, path):
182
        """
183
        Delete a file on remote hosts.
184
185
        :param path: Path to remote file that must be deleted. Must be shlex quoted.
186
        :type path: ``str``
187
188
        :rtype path: ``dict`` of ``str`` to ``dict``
189
        """
190
191
        options = {
192
            'path': path
193
        }
194
        return self._execute_in_pool(self._delete_file, **options)
195
196
    def delete_dir(self, path, force=False, timeout=None):
197
        """
198
        Delete a dir on remote hosts.
199
200
        :param path: Path to remote dir that must be deleted. Must be shlex quoted.
201
        :type path: ``str``
202
203
        :rtype path: ``dict`` of ``str`` to ``dict``
204
        """
205
206
        options = {
207
            'path': path,
208
            'force': force
209
        }
210
        return self._execute_in_pool(self._delete_dir, **options)
211
212
    def close(self):
213
        """
214
        Close all open SSH connections to hosts.
215
        """
216
217
        for host in self._hosts_client.keys():
218
            try:
219
                self._hosts_client[host].close()
220
            except:
221
                LOG.exception('Failed shutting down SSH connection to host: %s', host)
222
223
    def _execute_in_pool(self, execute_method, **kwargs):
224
        results = {}
225
226
        for host in self._bad_hosts.keys():
227
            results[host] = self._bad_hosts[host]
228
229
        for host in self._hosts_client.keys():
230
            while not self._pool.free():
231
                eventlet.sleep(self._scan_interval)
232
            self._pool.spawn(execute_method, host=host, results=results, **kwargs)
233
234
        self._pool.waitall()
235
        return results
236
237
    def _connect(self, host, results, raise_on_any_error=False):
238
        (hostname, port) = self._get_host_port_info(host)
239
240
        extra = {'host': host, 'port': port, 'user': self._ssh_user}
241
        if self._ssh_password:
242
            extra['password'] = '<redacted>'
243
        elif self._ssh_key_file:
244
            extra['key_file_path'] = self._ssh_key_file
245
        else:
246
            extra['private_key'] = '<redacted>'
247
248
        LOG.debug('Connecting to host.', extra=extra)
249
250
        client = ParamikoSSHClient(hostname=hostname, port=port,
251
                                   username=self._ssh_user,
252
                                   password=self._ssh_password,
253
                                   bastion_host=self._bastion_host,
254
                                   key_files=self._ssh_key_file,
255
                                   key_material=self._ssh_key_material,
256
                                   passphrase=self._passphrase,
257
                                   handle_stdout_line_func=self._handle_stdout_line_func,
258
                                   handle_stderr_line_func=self._handle_stderr_line_func)
259
        try:
260
            client.connect()
261
        except SSHException as ex:
262
            LOG.exception(ex)
263
            if raise_on_any_error:
264
                raise
265
            error_dict = self._generate_error_result(exc=ex, message='Connection error.')
266
            self._bad_hosts[hostname] = error_dict
267
            results[hostname] = error_dict
268
        except Exception as ex:
269
            error = 'Failed connecting to host %s.' % hostname
270
            LOG.exception(error)
271
            if raise_on_any_error:
272
                raise
273
            error_dict = self._generate_error_result(exc=ex, message=error)
274
            self._bad_hosts[hostname] = error_dict
275
            results[hostname] = error_dict
276
        else:
277
            self._successful_connects += 1
278
            self._hosts_client[hostname] = client
279
            results[hostname] = {'message': 'Connected to host.'}
280
281
    def _run_command(self, host, cmd, results, timeout=None):
282
        try:
283
            LOG.debug('Running command: %s on host: %s.', cmd, host)
284
            client = self._hosts_client[host]
285
            (stdout, stderr, exit_code) = client.run(cmd, timeout=timeout,
286
                                                     call_line_handler_func=True)
287
288
            result = self._handle_command_result(stdout=stdout, stderr=stderr, exit_code=exit_code)
289
            results[host] = result
290
        except Exception as ex:
291
            cmd = self._sanitize_command_string(cmd=cmd)
292
            error = 'Failed executing command "%s" on host "%s"' % (cmd, host)
293
            LOG.exception(error)
294
            results[host] = self._generate_error_result(exc=ex, message=error)
295
296
    def _put_files(self, local_path, remote_path, host, results, mode=None,
297
                   mirror_local_mode=False):
298
        try:
299
            LOG.debug('Copying file to host: %s' % host)
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
300
            if os.path.isdir(local_path):
301
                result = self._hosts_client[host].put_dir(local_path, remote_path)
302
            else:
303
                result = self._hosts_client[host].put(local_path, remote_path,
304
                                                      mirror_local_mode=mirror_local_mode,
305
                                                      mode=mode)
306
            LOG.debug('Result of copy: %s' % result)
0 ignored issues
show
Coding Style Best Practice introduced by
Specify string format arguments as logging function parameters
Loading history...
307
            results[host] = result
308
        except Exception as ex:
309
            error = 'Failed sending file(s) in path %s to host %s' % (local_path, host)
310
            LOG.exception(error)
311
            results[host] = self._generate_error_result(exc=ex, message=error)
312
313
    def _mkdir(self, host, path, results):
314
        try:
315
            result = self._hosts_client[host].mkdir(path)
316
            results[host] = result
317
        except Exception as ex:
318
            error = 'Failed "mkdir %s" on host %s.' % (path, host)
319
            LOG.exception(error)
320
            results[host] = self._generate_error_result(exc=ex, message=error)
321
322
    def _delete_file(self, host, path, results):
323
        try:
324
            result = self._hosts_client[host].delete_file(path)
325
            results[host] = result
326
        except Exception as ex:
327
            error = 'Failed deleting file %s on host %s.' % (path, host)
328
            LOG.exception(error)
329
            results[host] = self._generate_error_result(exc=ex, message=error)
330
331
    def _delete_dir(self, host, path, results, force=False, timeout=None):
332
        try:
333
            result = self._hosts_client[host].delete_dir(path, force=force, timeout=timeout)
334
            results[host] = result
335
        except Exception as ex:
336
            error = 'Failed deleting dir %s on host %s.' % (path, host)
337
            LOG.exception(error)
338
            results[host] = self._generate_error_result(exc=ex, message=error)
339
340
    def _get_host_port_info(self, host_str):
341
        (hostname, port) = ip_utils.split_host_port(host_str)
342
        if not port:
343
            port = self._ssh_port
344
345
        return (hostname, port)
346
347
    def _handle_command_result(self, stdout, stderr, exit_code):
348
        # Detect if user provided an invalid sudo password or sudo is not configured for that user
349
        if self._sudo_password:
350
            if re.search('sudo: \d+ incorrect password attempts', stderr):
0 ignored issues
show
Bug introduced by
A suspicious escape sequence \d was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
351
                match = re.search('\[sudo\] password for (.+?)\:', stderr)
0 ignored issues
show
Bug introduced by
A suspicious escape sequence \[ was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
Bug introduced by
A suspicious escape sequence \] was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
Bug introduced by
A suspicious escape sequence \: was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
352
353
                if match:
354
                    username = match.groups()[0]
355
                else:
356
                    username = 'unknown'
357
358
                error = ('Invalid sudo password provided or sudo is not configured for this user '
359
                        '(%s)' % (username))
360
                raise ValueError(error)
361
        is_succeeded = (exit_code == 0)
362
        result_dict = {'stdout': stdout, 'stderr': stderr, 'return_code': exit_code,
363
                       'succeeded': is_succeeded, 'failed': not is_succeeded}
364
365
        result = jsonify.json_loads(result_dict, ParallelSSHClient.KEYS_TO_TRANSFORM)
366
        return result
367
368
    @staticmethod
369
    def _sanitize_command_string(cmd):
370
        """
371
        Remove any potentially sensitive information from the command string.
372
373
        For now we only mask the values of the sensitive environment variables.
374
        """
375
        if not cmd:
376
            return cmd
377
378
        result = re.sub('ST2_ACTION_AUTH_TOKEN=(.+?)\s+?', 'ST2_ACTION_AUTH_TOKEN=%s ' %
0 ignored issues
show
Bug introduced by
A suspicious escape sequence \s was found. Did you maybe forget to add an r prefix?

Escape sequences in Python are generally interpreted according to rules similar to standard C. Only if strings are prefixed with r or R are they interpreted as regular expressions.

The escape sequence that was used indicates that you might have intended to write a regular expression.

Learn more about the available escape sequences. in the Python documentation.

Loading history...
379
                        (MASKED_ATTRIBUTE_VALUE), cmd)
380
        return result
381
382
    @staticmethod
383
    def _generate_error_result(exc, message):
384
        """
385
        :param exc: Raised exception.
386
        :type exc: Exception.
387
388
        :param message: Error message which will be prefixed to the exception exception message.
389
        :type message: ``str``
390
        """
391
        exc_message = getattr(exc, 'message', str(exc))
392
        error_message = '%s %s' % (message, exc_message)
393
        traceback_message = traceback.format_exc()
394
395
        if isinstance(exc, SSHCommandTimeoutError):
396
            return_code = -9
397
            timeout = True
398
        else:
399
            timeout = False
400
            return_code = 255
401
402
        stdout = getattr(exc, 'stdout', None) or ''
403
        stderr = getattr(exc, 'stderr', None) or ''
404
405
        error_dict = {
406
            'failed': True,
407
            'succeeded': False,
408
            'timeout': timeout,
409
            'return_code': return_code,
410
            'stdout': stdout,
411
            'stderr': stderr,
412
            'error': error_message,
413
            'traceback': traceback_message,
414
        }
415
        return error_dict
416
417
    def __repr__(self):
418
        return ('<ParallelSSHClient hosts=%s,user=%s,id=%s>' %
419
                (repr(self._hosts), self._ssh_user, id(self)))
420