Test Failed
Push — master (21460f...e380d0)
by Tomaz, created 01:48

st2common/st2common/runners/paramiko_ssh_runner.py (11 issues)

All 11 issues are cyclic imports. Cyclic imports may cause partly loaded modules to be returned, which might lead to unexpected runtime behavior that is hard to debug.

Detected import cycles:

- st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.runner -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace
- st2common.models.db.liveaction -> st2common.util.action_db -> st2common.persistence.liveaction
- st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.liveaction -> st2common.transport -> st2common.transport.reactor -> st2common.models.api.trace
- st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.execution -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace
- st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.liveaction -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace
- st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.actionalias -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace
- st2common.models.db.action -> st2common.models.db.liveaction -> st2common.util.action_db -> st2common.persistence.action
- st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.executionstate -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace
- st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.executionstate -> st2common.transport -> st2common.transport.reactor -> st2common.models.api.trace
- st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.runner -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace
- st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.liveaction -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace
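One common way to break cycles like these is to defer one side of the import to call time, or to move the shared definitions into a leaf module that imports nothing from the packages above. The sketch below is purely illustrative and uses hypothetical module names; it is not the actual fix for the st2common modules listed above.

# module_a.py (hypothetical) -- before: a module-level import that participates in the cycle
# from module_b import lookup_thing


def get_thing(name):
    # After: the import is deferred until the function is called, by which point module_b
    # has finished loading, so a partially initialized module is never returned.
    from module_b import lookup_thing
    return lookup_thing(name)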
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from oslo_config import cfg
import six

from st2common.runners.base import ShellRunnerMixin
from st2common.runners.base import ActionRunner
from st2common.constants.runners import REMOTE_RUNNER_PRIVATE_KEY_HEADER
from st2common.runners.parallel_ssh import ParallelSSHClient
from st2common import log as logging
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
from st2common.constants.action import LIVEACTION_STATUS_TIMED_OUT
from st2common.constants.action import LIVEACTION_STATUS_FAILED
from st2common.constants.runners import REMOTE_RUNNER_DEFAULT_ACTION_TIMEOUT
from st2common.exceptions.actionrunner import ActionRunnerPreRunError
from st2common.services.action import store_execution_output_data

__all__ = [
    'BaseParallelSSHRunner'
]

LOG = logging.getLogger(__name__)

# constants to lookup in runner_parameters.
RUNNER_HOSTS = 'hosts'
RUNNER_USERNAME = 'username'
RUNNER_PASSWORD = 'password'
RUNNER_PRIVATE_KEY = 'private_key'
RUNNER_PARALLEL = 'parallel'
RUNNER_SUDO = 'sudo'
RUNNER_SUDO_PASSWORD = 'sudo_password'
RUNNER_ON_BEHALF_USER = 'user'
RUNNER_REMOTE_DIR = 'dir'
RUNNER_COMMAND = 'cmd'
RUNNER_CWD = 'cwd'
RUNNER_ENV = 'env'
RUNNER_KWARG_OP = 'kwarg_op'
RUNNER_TIMEOUT = 'timeout'
RUNNER_SSH_PORT = 'port'
RUNNER_BASTION_HOST = 'bastion_host'
RUNNER_PASSPHRASE = 'passphrase'


class BaseParallelSSHRunner(ActionRunner, ShellRunnerMixin):

    def __init__(self, runner_id):
        super(BaseParallelSSHRunner, self).__init__(runner_id=runner_id)
        self._hosts = None
        self._parallel = True
        self._sudo = False
        self._sudo_password = None
        self._on_behalf_user = None
        self._username = None
        self._password = None
        self._private_key = None
        self._passphrase = None
        self._kwarg_op = '--'
        self._cwd = None
        self._env = None
        self._ssh_port = None
        self._timeout = None
        self._bastion_host = None
        self._on_behalf_user = cfg.CONF.system_user.user

        self._ssh_key_file = None
        self._parallel_ssh_client = None
        self._max_concurrency = cfg.CONF.ssh_runner.max_parallel_actions

    def pre_run(self):
        super(BaseParallelSSHRunner, self).pre_run()

        LOG.debug('Entering BaseParallelSSHRunner.pre_run() for liveaction_id="%s"',
                  self.liveaction_id)
        hosts = self.runner_parameters.get(RUNNER_HOSTS, '').split(',')
        self._hosts = [h.strip() for h in hosts if len(h) > 0]
        if len(self._hosts) < 1:
            raise ActionRunnerPreRunError('No hosts specified to run action for action %s.' %
                                          (self.liveaction_id))
        self._username = self.runner_parameters.get(RUNNER_USERNAME, None)
        self._password = self.runner_parameters.get(RUNNER_PASSWORD, None)
        self._private_key = self.runner_parameters.get(RUNNER_PRIVATE_KEY, None)
        self._passphrase = self.runner_parameters.get(RUNNER_PASSPHRASE, None)

        self._ssh_port = self.runner_parameters.get(RUNNER_SSH_PORT, None)
        self._ssh_key_file = self._private_key
        self._parallel = self.runner_parameters.get(RUNNER_PARALLEL, True)
        self._sudo = self.runner_parameters.get(RUNNER_SUDO, False)
        self._sudo = self._sudo if self._sudo else False
        self._sudo_password = self.runner_parameters.get(RUNNER_SUDO_PASSWORD, None)

        if self.context:
            self._on_behalf_user = self.context.get(RUNNER_ON_BEHALF_USER, self._on_behalf_user)

        self._cwd = self.runner_parameters.get(RUNNER_CWD, None)
        self._env = self.runner_parameters.get(RUNNER_ENV, {})
        self._kwarg_op = self.runner_parameters.get(RUNNER_KWARG_OP, '--')
        self._timeout = self.runner_parameters.get(RUNNER_TIMEOUT,
                                                   REMOTE_RUNNER_DEFAULT_ACTION_TIMEOUT)
        self._bastion_host = self.runner_parameters.get(RUNNER_BASTION_HOST, None)

        LOG.info('[BaseParallelSSHRunner="%s", liveaction_id="%s"] Finished pre_run.',
                 self.runner_id, self.liveaction_id)

        concurrency = int(len(self._hosts) / 3) + 1 if self._parallel else 1
        if concurrency > self._max_concurrency:
            LOG.debug('Limiting parallel SSH concurrency to %d.', self._max_concurrency)
            concurrency = self._max_concurrency

        client_kwargs = {
            'hosts': self._hosts,
            'user': self._username,
            'port': self._ssh_port,
            'concurrency': concurrency,
            'bastion_host': self._bastion_host,
            'raise_on_any_error': False,
            'connect': True
        }

        def make_store_stdout_line_func(execution_db, action_db):
            def store_stdout_line(line):
                if cfg.CONF.actionrunner.stream_output:
                    store_execution_output_data(execution_db=execution_db, action_db=action_db,
                                                data=line, output_type='stdout')

            return store_stdout_line

        def make_store_stderr_line_func(execution_db, action_db):
            def store_stderr_line(line):
                if cfg.CONF.actionrunner.stream_output:
                    store_execution_output_data(execution_db=execution_db, action_db=action_db,
                                                data=line, output_type='stderr')

            return store_stderr_line

        handle_stdout_line_func = make_store_stdout_line_func(execution_db=self.execution,
                                                              action_db=self.action)
        handle_stderr_line_func = make_store_stderr_line_func(execution_db=self.execution,
                                                              action_db=self.action)

        if len(self._hosts) == 1:
            # We only support streaming output when running the action on a single host, because
            # the action output is tied to a particular execution. Users can still achieve output
            # streaming for multiple hosts by running one execution per host.
            client_kwargs['handle_stdout_line_func'] = handle_stdout_line_func
            client_kwargs['handle_stderr_line_func'] = handle_stderr_line_func
        else:
            LOG.debug('Real-time action output streaming is disabled, because action is running '
                      'on more than one host')

        if self._password:
            client_kwargs['password'] = self._password
        elif self._private_key:
            # Determine if the private_key is a path to the key file or the raw key material
            is_key_material = self._is_private_key_material(private_key=self._private_key)

            if is_key_material:
                # Raw key material
                client_kwargs['pkey_material'] = self._private_key
            else:
                # Assume it's a path to the key file, verify the file exists
                client_kwargs['pkey_file'] = self._private_key

            if self._passphrase:
                client_kwargs['passphrase'] = self._passphrase
        else:
            # Default to stanley key file specified in the config
            client_kwargs['pkey_file'] = self._ssh_key_file

        if self._sudo_password:
            client_kwargs['sudo_password'] = True

        self._parallel_ssh_client = ParallelSSHClient(**client_kwargs)

    def post_run(self, status, result):
        super(BaseParallelSSHRunner, self).post_run(status=status, result=result)

        # Ensure we close the connection when the action execution finishes
        if self._parallel_ssh_client:
            self._parallel_ssh_client.close()

    def _is_private_key_material(self, private_key):
        return private_key and REMOTE_RUNNER_PRIVATE_KEY_HEADER in private_key.lower()

    def _get_env_vars(self):
        """
        :rtype: ``dict``
        """
        env_vars = {}

        if self._env:
            env_vars.update(self._env)

        # Include common st2 env vars
        st2_env_vars = self._get_common_action_env_variables()
        env_vars.update(st2_env_vars)

        return env_vars

    @staticmethod
    def _get_result_status(result, allow_partial_failure):

        if 'error' in result and 'traceback' in result:
            # Assume this is a global failure where the result dictionary doesn't contain entry
            # per host
            timeout = False
            success = result.get('succeeded', False)
            status = BaseParallelSSHRunner._get_status_for_success_and_timeout(success=success,
                                                                               timeout=timeout)
            return status

        success = not allow_partial_failure
        timeout = True

        for r in six.itervalues(result):
            r_succeeded = r.get('succeeded', False) if r else False
            r_timeout = r.get('timeout', False) if r else False

            timeout &= r_timeout

            if allow_partial_failure:
                success |= r_succeeded
                if success:
                    break
            else:
                success &= r_succeeded
                if not success:
                    break

        status = BaseParallelSSHRunner._get_status_for_success_and_timeout(success=success,
                                                                           timeout=timeout)

        return status

    @staticmethod
    def _get_status_for_success_and_timeout(success, timeout):
        if success:
            status = LIVEACTION_STATUS_SUCCEEDED
        elif timeout:
            # Note: Right now we only set status to timeout if all the hosts have timed out
            status = LIVEACTION_STATUS_TIMED_OUT
        else:
            status = LIVEACTION_STATUS_FAILED
        return status
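As a side note, the bitwise aggregation in _get_result_status above can be hard to read at a glance. The following standalone sketch (illustrative only, returning plain strings instead of the LIVEACTION_* constants) mirrors that logic and shows how per-host results map to an overall status.

# Illustrative only: mirrors the per-host aggregation in _get_result_status above.
def aggregate(result, allow_partial_failure=False):
    success = not allow_partial_failure
    timeout = True
    for r in result.values():
        succeeded = r.get('succeeded', False) if r else False
        timed_out = r.get('timeout', False) if r else False
        timeout &= timed_out
        if allow_partial_failure:
            success |= succeeded      # any host succeeding is enough
            if success:
                break
        else:
            success &= succeeded      # every host must succeed
            if not success:
                break
    if success:
        return 'succeeded'
    elif timeout:
        return 'timed_out'            # only when all inspected hosts timed out
    return 'failed'

print(aggregate({'web1': {'succeeded': True}, 'web2': {'succeeded': False}}))
# -> 'failed'
print(aggregate({'web1': {'succeeded': True}, 'web2': {'succeeded': False}},
                allow_partial_failure=True))
# -> 'succeeded'
print(aggregate({'web1': {'timeout': True}, 'web2': {'timeout': True}}))
# -> 'timed_out'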