Completed
Pull Request — master (#2334)
by Edward
06:02
created

_get_env_vars()   A

Complexity

Conditions 2

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 14
rs 9.4286
cc 2
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
18
from oslo_config import cfg
19
import six
20
21
from st2actions.runners import ShellRunnerMixin
22
from st2actions.runners import ActionRunner
23
from st2actions.runners.ssh.parallel_ssh import ParallelSSHClient
24
from st2common import log as logging
25
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
26
from st2common.constants.action import LIVEACTION_STATUS_TIMED_OUT
27
from st2common.constants.action import LIVEACTION_STATUS_FAILED
28
from st2common.constants.runners import FABRIC_RUNNER_DEFAULT_ACTION_TIMEOUT
29
from st2common.exceptions.actionrunner import ActionRunnerPreRunError
30
from st2common.exceptions.ssh import InvalidCredentialsException
31
32
LOG = logging.getLogger(__name__)
33
34
# constants to lookup in runner_parameters.
35
RUNNER_HOSTS = 'hosts'
36
RUNNER_USERNAME = 'username'
37
RUNNER_PASSWORD = 'password'
38
RUNNER_PRIVATE_KEY = 'private_key'
39
RUNNER_PARALLEL = 'parallel'
40
RUNNER_SUDO = 'sudo'
41
RUNNER_ON_BEHALF_USER = 'user'
42
RUNNER_REMOTE_DIR = 'dir'
43
RUNNER_COMMAND = 'cmd'
44
RUNNER_CWD = 'cwd'
45
RUNNER_ENV = 'env'
46
RUNNER_KWARG_OP = 'kwarg_op'
47
RUNNER_TIMEOUT = 'timeout'
48
RUNNER_SSH_PORT = 'port'
49
RUNNER_BASTION_HOST = 'bastion_host'
50
51
52
class BaseParallelSSHRunner(ActionRunner, ShellRunnerMixin):
53
54
    def __init__(self, runner_id):
55
        super(BaseParallelSSHRunner, self).__init__(runner_id=runner_id)
56
        self._hosts = None
57
        self._parallel = True
58
        self._sudo = False
59
        self._on_behalf_user = None
60
        self._username = None
61
        self._password = None
62
        self._private_key = None
63
        self._kwarg_op = '--'
64
        self._cwd = None
65
        self._env = None
66
        self._timeout = None
67
        self._bastion_host = None
68
        self._on_behalf_user = cfg.CONF.system_user.user
69
70
        self._ssh_key_file = None
71
        ssh_key_file = cfg.CONF.system_user.ssh_key_file
72
        if ssh_key_file:
73
            ssh_key_file = os.path.expanduser(ssh_key_file)
74
            if os.path.exists(ssh_key_file):
75
                self._ssh_key_file = ssh_key_file
76
77
        self._parallel_ssh_client = None
78
        self._max_concurrency = cfg.CONF.ssh_runner.max_parallel_actions
79
80
    def pre_run(self):
81
        LOG.debug('Entering BaseParallelSSHRunner.pre_run() for liveaction_id="%s"',
82
                  self.liveaction_id)
83
        hosts = self.runner_parameters.get(RUNNER_HOSTS, '').split(',')
84
        self._hosts = [h.strip() for h in hosts if len(h) > 0]
85
        if len(self._hosts) < 1:
86
            raise ActionRunnerPreRunError('No hosts specified to run action for action %s.',
87
                                          self.liveaction_id)
88
        self._username = self.runner_parameters.get(RUNNER_USERNAME, None)
89
        self._password = self.runner_parameters.get(RUNNER_PASSWORD, None)
90
        self._private_key = self.runner_parameters.get(RUNNER_PRIVATE_KEY, None)
91
92
        if self._username:
93
            if not self._password and not self._private_key:
94
                msg = ('Either password or private_key data needs to be supplied for user: %s' %
95
                       self._username)
96
                raise InvalidCredentialsException(msg)
97
98
        self._username = self._username or cfg.CONF.system_user.user
99
        self._ssh_port = self.runner_parameters.get(RUNNER_SSH_PORT, 22)
100
        self._ssh_key_file = self._private_key or self._ssh_key_file
101
        self._parallel = self.runner_parameters.get(RUNNER_PARALLEL, True)
102
        self._sudo = self.runner_parameters.get(RUNNER_SUDO, False)
103
        self._sudo = self._sudo if self._sudo else False
104
        self._on_behalf_user = self.context.get(RUNNER_ON_BEHALF_USER, self._on_behalf_user)
105
        self._cwd = self.runner_parameters.get(RUNNER_CWD, None)
106
        self._env = self.runner_parameters.get(RUNNER_ENV, {})
107
        self._kwarg_op = self.runner_parameters.get(RUNNER_KWARG_OP, '--')
108
        self._timeout = self.runner_parameters.get(RUNNER_TIMEOUT,
109
                                                   FABRIC_RUNNER_DEFAULT_ACTION_TIMEOUT)
110
        self._bastion_host = self.runner_parameters.get(RUNNER_BASTION_HOST, None)
111
112
        LOG.info('[BaseParallelSSHRunner="%s", liveaction_id="%s"] Finished pre_run.',
113
                 self.runner_id, self.liveaction_id)
114
115
        concurrency = int(len(self._hosts) / 3) + 1 if self._parallel else 1
116
        if concurrency > self._max_concurrency:
117
            LOG.debug('Limiting parallel SSH concurrency to %d.', concurrency)
118
            concurrency = self._max_concurrency
119
120
        if self._password:
121
            self._parallel_ssh_client = ParallelSSHClient(
122
                hosts=self._hosts,
123
                user=self._username, password=self._password,
124
                port=self._ssh_port, concurrency=concurrency,
125
                bastion_host=self._bastion_host,
126
                raise_on_any_error=False,
127
                connect=True
128
            )
129
        elif self._private_key:
130
            self._parallel_ssh_client = ParallelSSHClient(
131
                hosts=self._hosts,
132
                user=self._username, pkey_material=self._private_key,
133
                port=self._ssh_port, concurrency=concurrency,
134
                bastion_host=self._bastion_host,
135
                raise_on_any_error=False,
136
                connect=True
137
            )
138
        else:
139
            self._parallel_ssh_client = ParallelSSHClient(
140
                hosts=self._hosts,
141
                user=self._username, pkey_file=self._ssh_key_file,
142
                port=self._ssh_port, concurrency=concurrency,
143
                bastion_host=self._bastion_host,
144
                raise_on_any_error=False,
145
                connect=True
146
            )
147
148
    def _get_env_vars(self):
149
        """
150
        :rtype: ``dict``
151
        """
152
        env_vars = {}
153
154
        if self._env:
155
            env_vars.update(self._env)
156
157
        # Include common st2 env vars
158
        st2_env_vars = self._get_common_action_env_variables()
159
        env_vars.update(st2_env_vars)
160
161
        return env_vars
162
163
    @staticmethod
164
    def _get_result_status(result, allow_partial_failure):
165
        success = not allow_partial_failure
166
        timeout = True
167
168
        for r in six.itervalues(result):
169
            r_succeess = r.get('succeeded', False) if r else False
170
            r_timeout = r.get('timeout', False) if r else False
171
172
            timeout &= r_timeout
173
174
            if allow_partial_failure:
175
                success |= r_succeess
176
                if success:
177
                    break
178
            else:
179
                success &= r_succeess
180
                if not success:
181
                    break
182
183
        if success:
184
            status = LIVEACTION_STATUS_SUCCEEDED
185
        elif timeout:
186
            # Note: Right now we only set status to timeout if all the hosts have timed out
187
            status = LIVEACTION_STATUS_TIMED_OUT
188
        else:
189
            status = LIVEACTION_STATUS_FAILED
190
        return status
191