GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Test Failed
Push — develop-v1.6.0 ( 9d5181...7efb31 )
by
unknown
04:49
created

BaseParallelSSHRunner.pre_run()   F

Complexity

Conditions 14

Size

Total Lines 73

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 14
dl 0
loc 73
rs 2.3293
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like BaseParallelSSHRunner.pre_run() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
18
from oslo_config import cfg
19
import six
20
21
from st2actions.runners import ShellRunnerMixin
22
from st2actions.runners import ActionRunner
23
from st2common.constants.runners import REMOTE_RUNNER_PRIVATE_KEY_HEADER
24
from st2actions.runners.ssh.parallel_ssh import ParallelSSHClient
25
from st2common import log as logging
26
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
27
from st2common.constants.action import LIVEACTION_STATUS_TIMED_OUT
28
from st2common.constants.action import LIVEACTION_STATUS_FAILED
29
from st2common.constants.runners import REMOTE_RUNNER_DEFAULT_ACTION_TIMEOUT
30
from st2common.exceptions.actionrunner import ActionRunnerPreRunError
31
from st2common.exceptions.ssh import InvalidCredentialsException
32
33
__all__ = [
34
    'BaseParallelSSHRunner'
35
]
36
37
LOG = logging.getLogger(__name__)
38
39
# constants to lookup in runner_parameters.
40
RUNNER_HOSTS = 'hosts'
41
RUNNER_USERNAME = 'username'
42
RUNNER_PASSWORD = 'password'
43
RUNNER_PRIVATE_KEY = 'private_key'
44
RUNNER_PARALLEL = 'parallel'
45
RUNNER_SUDO = 'sudo'
46
RUNNER_ON_BEHALF_USER = 'user'
47
RUNNER_REMOTE_DIR = 'dir'
48
RUNNER_COMMAND = 'cmd'
49
RUNNER_CWD = 'cwd'
50
RUNNER_ENV = 'env'
51
RUNNER_KWARG_OP = 'kwarg_op'
52
RUNNER_TIMEOUT = 'timeout'
53
RUNNER_SSH_PORT = 'port'
54
RUNNER_BASTION_HOST = 'bastion_host'
55
RUNNER_PASSPHRASE = 'passphrase'
56
57
58
class BaseParallelSSHRunner(ActionRunner, ShellRunnerMixin):
59
60
    def __init__(self, runner_id):
61
        super(BaseParallelSSHRunner, self).__init__(runner_id=runner_id)
62
        self._hosts = None
63
        self._parallel = True
64
        self._sudo = False
65
        self._on_behalf_user = None
66
        self._username = None
67
        self._password = None
68
        self._private_key = None
69
        self._passphrase = None
70
        self._kwarg_op = '--'
71
        self._cwd = None
72
        self._env = None
73
        self._timeout = None
74
        self._bastion_host = None
75
        self._on_behalf_user = cfg.CONF.system_user.user
76
77
        self._ssh_key_file = None
78
        ssh_key_file = cfg.CONF.system_user.ssh_key_file
79
        if ssh_key_file:
80
            ssh_key_file = os.path.expanduser(ssh_key_file)
81
            if os.path.exists(ssh_key_file):
82
                self._ssh_key_file = ssh_key_file
83
84
        self._parallel_ssh_client = None
85
        self._max_concurrency = cfg.CONF.ssh_runner.max_parallel_actions
86
87
    def pre_run(self):
88
        super(BaseParallelSSHRunner, self).pre_run()
89
90
        LOG.debug('Entering BaseParallelSSHRunner.pre_run() for liveaction_id="%s"',
91
                  self.liveaction_id)
92
        hosts = self.runner_parameters.get(RUNNER_HOSTS, '').split(',')
93
        self._hosts = [h.strip() for h in hosts if len(h) > 0]
94
        if len(self._hosts) < 1:
95
            raise ActionRunnerPreRunError('No hosts specified to run action for action %s.',
96
                                          self.liveaction_id)
97
        self._username = self.runner_parameters.get(RUNNER_USERNAME, None)
98
        self._password = self.runner_parameters.get(RUNNER_PASSWORD, None)
99
        self._private_key = self.runner_parameters.get(RUNNER_PRIVATE_KEY, None)
100
        self._passphrase = self.runner_parameters.get(RUNNER_PASSPHRASE, None)
101
102
        if self._username:
103
            if not self._password and not self._private_key:
104
                msg = ('Either password or private_key data needs to be supplied for user: %s' %
105
                       self._username)
106
                raise InvalidCredentialsException(msg)
107
108
        self._username = self._username or cfg.CONF.system_user.user
109
        self._ssh_port = self.runner_parameters.get(RUNNER_SSH_PORT, 22)
110
        self._ssh_key_file = self._private_key or self._ssh_key_file
111
        self._parallel = self.runner_parameters.get(RUNNER_PARALLEL, True)
112
        self._sudo = self.runner_parameters.get(RUNNER_SUDO, False)
113
        self._sudo = self._sudo if self._sudo else False
114
        self._on_behalf_user = self.context.get(RUNNER_ON_BEHALF_USER, self._on_behalf_user)
115
        self._cwd = self.runner_parameters.get(RUNNER_CWD, None)
116
        self._env = self.runner_parameters.get(RUNNER_ENV, {})
117
        self._kwarg_op = self.runner_parameters.get(RUNNER_KWARG_OP, '--')
118
        self._timeout = self.runner_parameters.get(RUNNER_TIMEOUT,
119
                                                   REMOTE_RUNNER_DEFAULT_ACTION_TIMEOUT)
120
        self._bastion_host = self.runner_parameters.get(RUNNER_BASTION_HOST, None)
121
122
        LOG.info('[BaseParallelSSHRunner="%s", liveaction_id="%s"] Finished pre_run.',
123
                 self.runner_id, self.liveaction_id)
124
125
        concurrency = int(len(self._hosts) / 3) + 1 if self._parallel else 1
126
        if concurrency > self._max_concurrency:
127
            LOG.debug('Limiting parallel SSH concurrency to %d.', concurrency)
128
            concurrency = self._max_concurrency
129
130
        client_kwargs = {
131
            'hosts': self._hosts,
132
            'user': self._username,
133
            'port': self._ssh_port,
134
            'concurrency': concurrency,
135
            'bastion_host': self._bastion_host,
136
            'raise_on_any_error': False,
137
            'connect': True
138
        }
139
140
        if self._password:
141
            client_kwargs['password'] = self._password
142
        elif self._private_key:
143
            # Determine if the private_key is a path to the key file or the raw key material
144
            is_key_material = self._is_private_key_material(private_key=self._private_key)
145
146
            if is_key_material:
147
                # Raw key material
148
                client_kwargs['pkey_material'] = self._private_key
149
            else:
150
                # Assume it's a path to the key file, verify the file exists
151
                client_kwargs['pkey_file'] = self._private_key
152
153
            if self._passphrase:
154
                client_kwargs['passphrase'] = self._passphrase
155
        else:
156
            # Default to stanley key file specified in the config
157
            client_kwargs['pkey_file'] = self._ssh_key_file
158
159
        self._parallel_ssh_client = ParallelSSHClient(**client_kwargs)
160
161
    def _is_private_key_material(self, private_key):
162
        return private_key and REMOTE_RUNNER_PRIVATE_KEY_HEADER in private_key.lower()
163
164
    def _get_env_vars(self):
165
        """
166
        :rtype: ``dict``
167
        """
168
        env_vars = {}
169
170
        if self._env:
171
            env_vars.update(self._env)
172
173
        # Include common st2 env vars
174
        st2_env_vars = self._get_common_action_env_variables()
175
        env_vars.update(st2_env_vars)
176
177
        return env_vars
178
179
    @staticmethod
180
    def _get_result_status(result, allow_partial_failure):
181
182
        if 'error' in result and 'traceback' in result:
183
            # Assume this is a global failure where the result dictionary doesn't contain entry
184
            # per host
185
            timeout = False
186
            success = result.get('succeeded', False)
187
            status = BaseParallelSSHRunner._get_status_for_success_and_timeout(success=success,
188
                                                                               timeout=timeout)
189
            return status
190
191
        success = not allow_partial_failure
192
        timeout = True
193
194
        for r in six.itervalues(result):
195
            r_succeess = r.get('succeeded', False) if r else False
196
            r_timeout = r.get('timeout', False) if r else False
197
198
            timeout &= r_timeout
199
200
            if allow_partial_failure:
201
                success |= r_succeess
202
                if success:
203
                    break
204
            else:
205
                success &= r_succeess
206
                if not success:
207
                    break
208
209
        status = BaseParallelSSHRunner._get_status_for_success_and_timeout(success=success,
210
                                                                           timeout=timeout)
211
212
        return status
213
214
    @staticmethod
215
    def _get_status_for_success_and_timeout(success, timeout):
216
        if success:
217
            status = LIVEACTION_STATUS_SUCCEEDED
218
        elif timeout:
219
            # Note: Right now we only set status to timeout if all the hosts have timed out
220
            status = LIVEACTION_STATUS_TIMED_OUT
221
        else:
222
            status = LIVEACTION_STATUS_FAILED
223
        return status
224