GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Passed
Push — develop ( 406a70...58fbab )
by Plexxi
14:12 queued 07:28
created

BaseParallelSSHRunner.__init__()   B

Complexity

Conditions 3

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 3
dl 0
loc 26
rs 8.8571
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
18
from oslo_config import cfg
19
import six
20
21
from st2actions.runners import ShellRunnerMixin
22
from st2actions.runners import ActionRunner
23
from st2common.constants.runners import REMOTE_RUNNER_PRIVATE_KEY_HEADER
24
from st2actions.runners.ssh.parallel_ssh import ParallelSSHClient
25
from st2common import log as logging
26
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
27
from st2common.constants.action import LIVEACTION_STATUS_TIMED_OUT
28
from st2common.constants.action import LIVEACTION_STATUS_FAILED
29
from st2common.constants.runners import REMOTE_RUNNER_DEFAULT_ACTION_TIMEOUT
30
from st2common.exceptions.actionrunner import ActionRunnerPreRunError
31
from st2common.exceptions.ssh import InvalidCredentialsException
32
33
# Public names exported by this module.
__all__ = [
34
    'BaseParallelSSHRunner'
35
]
36
37
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
38
39
# Keys looked up in runner_parameters by BaseParallelSSHRunner.pre_run().
# Exception: RUNNER_ON_BEHALF_USER is looked up in ``self.context`` instead.
40
RUNNER_HOSTS = 'hosts'  # comma-separated host list, split on ',' in pre_run()
41
RUNNER_USERNAME = 'username'  # falls back to cfg.CONF.system_user.user when not set
42
RUNNER_PASSWORD = 'password'  # takes precedence over the private key when both are given
43
RUNNER_PRIVATE_KEY = 'private_key'  # either raw key material or a path to a key file
44
RUNNER_PARALLEL = 'parallel'  # defaults to True
45
RUNNER_SUDO = 'sudo'  # defaults to False
46
RUNNER_ON_BEHALF_USER = 'user'  # read from self.context, not from runner_parameters
47
RUNNER_REMOTE_DIR = 'dir'  # not referenced by the code visible in this section
48
RUNNER_COMMAND = 'cmd'  # not referenced by the code visible in this section
49
RUNNER_CWD = 'cwd'  # defaults to None
50
RUNNER_ENV = 'env'  # defaults to an empty dict
51
RUNNER_KWARG_OP = 'kwarg_op'  # defaults to '--'
52
RUNNER_TIMEOUT = 'timeout'  # defaults to REMOTE_RUNNER_DEFAULT_ACTION_TIMEOUT
53
RUNNER_SSH_PORT = 'port'  # defaults to 22
54
RUNNER_BASTION_HOST = 'bastion_host'  # defaults to None
55
RUNNER_PASSPHRASE = 'passphrase'  # only forwarded to the client together with a private key
56
57
58
class BaseParallelSSHRunner(ActionRunner, ShellRunnerMixin):
59
60
    def __init__(self, runner_id):
        """
        Set up the runner with its default connection settings.

        :param runner_id: Runner identifier, passed through to the parent class initializer.
        """
        super(BaseParallelSSHRunner, self).__init__(runner_id=runner_id)

        # Target hosts and execution options; the real values are assigned in pre_run().
        self._hosts = None
        self._parallel = True
        self._sudo = False
        self._kwarg_op = '--'
        self._cwd = None
        self._env = None
        self._timeout = None
        self._bastion_host = None

        # Credentials supplied through the runner parameters.
        self._username = None
        self._password = None
        self._private_key = None
        self._passphrase = None

        # By default the action runs on behalf of the configured system user.
        self._on_behalf_user = cfg.CONF.system_user.user

        # Remember the configured system user key file only if one is set and it exists on disk
        # (after "~" expansion); otherwise leave it as None.
        key_path = cfg.CONF.system_user.ssh_key_file
        if key_path:
            key_path = os.path.expanduser(key_path)
        key_usable = bool(key_path) and os.path.exists(key_path)
        self._ssh_key_file = key_path if key_usable else None

        self._parallel_ssh_client = None
        self._max_concurrency = cfg.CONF.ssh_runner.max_parallel_actions
86
87
    def pre_run(self):
        """
        Read the runner parameters and create the parallel SSH client.

        Populates the connection related attributes (hosts, credentials, port, sudo, cwd, env,
        kwarg operator, timeout and bastion host) from ``self.runner_parameters`` and then
        instantiates ``ParallelSSHClient`` with them.

        :raises ActionRunnerPreRunError: If no hosts were specified.
        :raises InvalidCredentialsException: If a username is supplied without a password or
                                             private key.
        """
        LOG.debug('Entering BaseParallelSSHRunner.pre_run() for liveaction_id="%s"',
                  self.liveaction_id)

        # Tolerate an explicit ``None`` value and drop entries which are empty once the
        # surrounding whitespace is removed (e.g. "host1, ,host2" or a trailing comma).
        raw_hosts = self.runner_parameters.get(RUNNER_HOSTS, None) or ''
        self._hosts = [h.strip() for h in raw_hosts.split(',') if h.strip()]
        if not self._hosts:
            # Interpolate here; exceptions do not apply %-formatting to their arguments.
            raise ActionRunnerPreRunError('No hosts specified to run action for action %s.' %
                                          (self.liveaction_id))

        self._username = self.runner_parameters.get(RUNNER_USERNAME, None)
        self._password = self.runner_parameters.get(RUNNER_PASSWORD, None)
        self._private_key = self.runner_parameters.get(RUNNER_PRIVATE_KEY, None)
        self._passphrase = self.runner_parameters.get(RUNNER_PASSPHRASE, None)

        # An explicitly supplied user must come with some means of authentication.
        if self._username and not self._password and not self._private_key:
            msg = ('Either password or private_key data needs to be supplied for user: %s' %
                   self._username)
            raise InvalidCredentialsException(msg)

        self._username = self._username or cfg.CONF.system_user.user
        self._ssh_port = self.runner_parameters.get(RUNNER_SSH_PORT, 22)
        self._ssh_key_file = self._private_key or self._ssh_key_file
        self._parallel = self.runner_parameters.get(RUNNER_PARALLEL, True)
        # Normalize falsy values (None, '', 0) to False, keep truthy values as supplied.
        self._sudo = self.runner_parameters.get(RUNNER_SUDO, False) or False
        self._on_behalf_user = self.context.get(RUNNER_ON_BEHALF_USER, self._on_behalf_user)
        self._cwd = self.runner_parameters.get(RUNNER_CWD, None)
        self._env = self.runner_parameters.get(RUNNER_ENV, {})
        self._kwarg_op = self.runner_parameters.get(RUNNER_KWARG_OP, '--')
        self._timeout = self.runner_parameters.get(RUNNER_TIMEOUT,
                                                   REMOTE_RUNNER_DEFAULT_ACTION_TIMEOUT)
        self._bastion_host = self.runner_parameters.get(RUNNER_BASTION_HOST, None)

        # Parameter parsing is done at this point; the SSH client is created below.
        LOG.info('[BaseParallelSSHRunner="%s", liveaction_id="%s"] Finished pre_run.',
                 self.runner_id, self.liveaction_id)

        # One worker per three hosts (plus one) when running in parallel, capped by the
        # configured maximum.
        concurrency = len(self._hosts) // 3 + 1 if self._parallel else 1
        if concurrency > self._max_concurrency:
            LOG.debug('Limiting parallel SSH concurrency to %d (requested %d).',
                      self._max_concurrency, concurrency)
            concurrency = self._max_concurrency

        client_kwargs = {
            'hosts': self._hosts,
            'user': self._username,
            'port': self._ssh_port,
            'concurrency': concurrency,
            'bastion_host': self._bastion_host,
            'raise_on_any_error': False,
            'connect': True
        }

        if self._password:
            # A password takes precedence over a private key when both are supplied.
            client_kwargs['password'] = self._password
        elif self._private_key:
            # Determine if the private_key is a path to the key file or the raw key material
            is_key_material = self._is_private_key_material(private_key=self._private_key)

            if is_key_material:
                # Raw key material
                client_kwargs['pkey_material'] = self._private_key
            else:
                # Assume it's a path to the key file; its existence is not checked here.
                client_kwargs['pkey_file'] = self._private_key

            if self._passphrase:
                client_kwargs['passphrase'] = self._passphrase
        else:
            # Default to stanley key file specified in the config
            client_kwargs['pkey_file'] = self._ssh_key_file

        self._parallel_ssh_client = ParallelSSHClient(**client_kwargs)
158
159
    def _is_private_key_material(self, private_key):
160
        return private_key and REMOTE_RUNNER_PRIVATE_KEY_HEADER in private_key.lower()
161
162
    def _get_env_vars(self):
163
        """
164
        :rtype: ``dict``
165
        """
166
        env_vars = {}
167
168
        if self._env:
169
            env_vars.update(self._env)
170
171
        # Include common st2 env vars
172
        st2_env_vars = self._get_common_action_env_variables()
173
        env_vars.update(st2_env_vars)
174
175
        return env_vars
176
177
    @staticmethod
    def _get_result_status(result, allow_partial_failure):
        """
        Derive the overall liveaction status from the result dictionary.

        :param result: Either a global failure dictionary (contains both ``error`` and
                       ``traceback`` keys) or a dictionary with one entry per host.
        :type result: ``dict``

        :param allow_partial_failure: When true, one succeeded host is enough for overall
                                      success; otherwise every host needs to succeed.
        :type allow_partial_failure: ``bool``

        :return: Status returned by ``_get_status_for_success_and_timeout``.
        """
        if 'error' in result and 'traceback' in result:
            # Assume this is a global failure where the result dictionary doesn't contain entry
            # per host
            return BaseParallelSSHRunner._get_status_for_success_and_timeout(
                success=result.get('succeeded', False), timeout=False)

        # Look at every host before deciding. Stopping at the first failed host would leave
        # "timeout" reflecting only the hosts seen so far and make the outcome depend on the
        # dictionary iteration order.
        host_results = list(six.itervalues(result))
        succeeded = [bool(r.get('succeeded', False)) if r else False for r in host_results]
        timed_out = [bool(r.get('timeout', False)) if r else False for r in host_results]

        # With no host entries at all this keeps the previous behaviour: success unless partial
        # failure is allowed, and timeout set.
        success = any(succeeded) if allow_partial_failure else all(succeeded)
        timeout = all(timed_out)

        return BaseParallelSSHRunner._get_status_for_success_and_timeout(success=success,
                                                                         timeout=timeout)
211
212
    @staticmethod
    def _get_status_for_success_and_timeout(success, timeout):
        """
        Map the success and timeout flags to a liveaction status constant.

        Success takes precedence over timeout.

        :param success: True if the run is considered successful.
        :type success: ``bool``

        :param timeout: True if the run is considered timed out.
        :type timeout: ``bool``

        :return: ``LIVEACTION_STATUS_SUCCEEDED``, ``LIVEACTION_STATUS_TIMED_OUT`` or
                 ``LIVEACTION_STATUS_FAILED``.
        """
        if success:
            return LIVEACTION_STATUS_SUCCEEDED

        # Note: Right now we only set status to timeout if all the hosts have timed out
        return LIVEACTION_STATUS_TIMED_OUT if timeout else LIVEACTION_STATUS_FAILED
222