1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
0 ignored issues
–
show
There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.liveaction -> st2common.transport -> st2common.transport.reactor -> st2common.models.api.trace).
Cyclic imports may cause partly loaded modules to be returned. This might lead to unexpected runtime behavior which is hard to debug.
Loading history...
There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.execution -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace).
Cyclic imports may cause partly loaded modules to be returned. This might lead to unexpected runtime behavior which is hard to debug.
Loading history...
There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.liveaction -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace).
Cyclic imports may cause partly loaded modules to be returned. This might lead to unexpected runtime behavior which is hard to debug.
Loading history...
There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.actionalias -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace).
Cyclic imports may cause partly loaded modules to be returned. This might lead to unexpected runtime behavior which is hard to debug.
Loading history...
There seems to be a cyclic import (st2common.models.db.action -> st2common.models.db.liveaction -> st2common.util.action_db -> st2common.persistence.action).
Cyclic imports may cause partly loaded modules to be returned. This might lead to unexpected runtime behavior which is hard to debug.
Loading history...
There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.executionstate -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace).
Cyclic imports may cause partly loaded modules to be returned. This might lead to unexpected runtime behavior which is hard to debug.
Loading history...
There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.executionstate -> st2common.transport -> st2common.transport.reactor -> st2common.models.api.trace).
Cyclic imports may cause partly loaded modules to be returned. This might lead to unexpected runtime behavior which is hard to debug.
Loading history...
There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.runner -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace).
Cyclic imports may cause partly loaded modules to be returned. This might lead to unexpected runtime behavior which is hard to debug.
Loading history...
There seems to be a cyclic import (st2common.models.api.base -> st2common.util.schema -> st2common.util.action_db -> st2common.persistence.action -> st2common.persistence.liveaction -> st2common.persistence.base -> st2common.transport.reactor -> st2common.models.api.trace).
Cyclic imports may cause partly loaded modules to be returned. This might lead to unexpected runtime behavior which is hard to debug.
Loading history...
|
|||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | from __future__ import absolute_import |
||
17 | from oslo_config import cfg |
||
18 | import six |
||
19 | |||
20 | from st2common.runners.base import ShellRunnerMixin |
||
21 | from st2common.runners.base import ActionRunner |
||
22 | from st2common.constants.runners import REMOTE_RUNNER_PRIVATE_KEY_HEADER |
||
23 | from st2common.runners.parallel_ssh import ParallelSSHClient |
||
24 | from st2common import log as logging |
||
25 | from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED |
||
26 | from st2common.constants.action import LIVEACTION_STATUS_TIMED_OUT |
||
27 | from st2common.constants.action import LIVEACTION_STATUS_FAILED |
||
28 | from st2common.constants.runners import REMOTE_RUNNER_DEFAULT_ACTION_TIMEOUT |
||
29 | from st2common.exceptions.actionrunner import ActionRunnerPreRunError |
||
30 | from st2common.services.action import store_execution_output_data |
||
31 | |||
32 | __all__ = [ |
||
33 | 'BaseParallelSSHRunner' |
||
34 | ] |
||
35 | |||
36 | LOG = logging.getLogger(__name__) |
||
37 | |||
38 | # constants to lookup in runner_parameters. |
||
39 | RUNNER_HOSTS = 'hosts' |
||
40 | RUNNER_USERNAME = 'username' |
||
41 | RUNNER_PASSWORD = 'password' |
||
42 | RUNNER_PRIVATE_KEY = 'private_key' |
||
43 | RUNNER_PARALLEL = 'parallel' |
||
44 | RUNNER_SUDO = 'sudo' |
||
45 | RUNNER_SUDO_PASSWORD = 'sudo_password' |
||
46 | RUNNER_ON_BEHALF_USER = 'user' |
||
47 | RUNNER_REMOTE_DIR = 'dir' |
||
48 | RUNNER_COMMAND = 'cmd' |
||
49 | RUNNER_CWD = 'cwd' |
||
50 | RUNNER_ENV = 'env' |
||
51 | RUNNER_KWARG_OP = 'kwarg_op' |
||
52 | RUNNER_TIMEOUT = 'timeout' |
||
53 | RUNNER_SSH_PORT = 'port' |
||
54 | RUNNER_BASTION_HOST = 'bastion_host' |
||
55 | RUNNER_PASSPHRASE = 'passphrase' |
||
56 | |||
57 | |||
58 | class BaseParallelSSHRunner(ActionRunner, ShellRunnerMixin): |
||
59 | |||
60 | def __init__(self, runner_id): |
||
61 | super(BaseParallelSSHRunner, self).__init__(runner_id=runner_id) |
||
62 | self._hosts = None |
||
63 | self._parallel = True |
||
64 | self._sudo = False |
||
65 | self._sudo_password = None |
||
66 | self._on_behalf_user = None |
||
67 | self._username = None |
||
68 | self._password = None |
||
69 | self._private_key = None |
||
70 | self._passphrase = None |
||
71 | self._kwarg_op = '--' |
||
72 | self._cwd = None |
||
73 | self._env = None |
||
74 | self._ssh_port = None |
||
75 | self._timeout = None |
||
76 | self._bastion_host = None |
||
77 | self._on_behalf_user = cfg.CONF.system_user.user |
||
78 | |||
79 | self._ssh_key_file = None |
||
80 | self._parallel_ssh_client = None |
||
81 | self._max_concurrency = cfg.CONF.ssh_runner.max_parallel_actions |
||
82 | |||
83 | def pre_run(self): |
||
84 | super(BaseParallelSSHRunner, self).pre_run() |
||
85 | |||
86 | LOG.debug('Entering BaseParallelSSHRunner.pre_run() for liveaction_id="%s"', |
||
87 | self.liveaction_id) |
||
88 | hosts = self.runner_parameters.get(RUNNER_HOSTS, '').split(',') |
||
89 | self._hosts = [h.strip() for h in hosts if len(h) > 0] |
||
90 | if len(self._hosts) < 1: |
||
91 | raise ActionRunnerPreRunError('No hosts specified to run action for action %s.', |
||
92 | self.liveaction_id) |
||
93 | self._username = self.runner_parameters.get(RUNNER_USERNAME, None) |
||
94 | self._password = self.runner_parameters.get(RUNNER_PASSWORD, None) |
||
95 | self._private_key = self.runner_parameters.get(RUNNER_PRIVATE_KEY, None) |
||
96 | self._passphrase = self.runner_parameters.get(RUNNER_PASSPHRASE, None) |
||
97 | |||
98 | self._ssh_port = self.runner_parameters.get(RUNNER_SSH_PORT, None) |
||
99 | self._ssh_key_file = self._private_key |
||
100 | self._parallel = self.runner_parameters.get(RUNNER_PARALLEL, True) |
||
101 | self._sudo = self.runner_parameters.get(RUNNER_SUDO, False) |
||
102 | self._sudo = self._sudo if self._sudo else False |
||
103 | self._sudo_password = self.runner_parameters.get(RUNNER_SUDO_PASSWORD, None) |
||
104 | |||
105 | if self.context: |
||
106 | self._on_behalf_user = self.context.get(RUNNER_ON_BEHALF_USER, self._on_behalf_user) |
||
107 | |||
108 | self._cwd = self.runner_parameters.get(RUNNER_CWD, None) |
||
109 | self._env = self.runner_parameters.get(RUNNER_ENV, {}) |
||
110 | self._kwarg_op = self.runner_parameters.get(RUNNER_KWARG_OP, '--') |
||
111 | self._timeout = self.runner_parameters.get(RUNNER_TIMEOUT, |
||
112 | REMOTE_RUNNER_DEFAULT_ACTION_TIMEOUT) |
||
113 | self._bastion_host = self.runner_parameters.get(RUNNER_BASTION_HOST, None) |
||
114 | |||
115 | LOG.info('[BaseParallelSSHRunner="%s", liveaction_id="%s"] Finished pre_run.', |
||
116 | self.runner_id, self.liveaction_id) |
||
117 | |||
118 | concurrency = int(len(self._hosts) / 3) + 1 if self._parallel else 1 |
||
119 | if concurrency > self._max_concurrency: |
||
120 | LOG.debug('Limiting parallel SSH concurrency to %d.', concurrency) |
||
121 | concurrency = self._max_concurrency |
||
122 | |||
123 | client_kwargs = { |
||
124 | 'hosts': self._hosts, |
||
125 | 'user': self._username, |
||
126 | 'port': self._ssh_port, |
||
127 | 'concurrency': concurrency, |
||
128 | 'bastion_host': self._bastion_host, |
||
129 | 'raise_on_any_error': False, |
||
130 | 'connect': True |
||
131 | } |
||
132 | |||
133 | def make_store_stdout_line_func(execution_db, action_db): |
||
134 | def store_stdout_line(line): |
||
135 | if cfg.CONF.actionrunner.stream_output: |
||
136 | store_execution_output_data(execution_db=execution_db, action_db=action_db, |
||
137 | data=line, output_type='stdout') |
||
138 | |||
139 | return store_stdout_line |
||
140 | |||
141 | def make_store_stderr_line_func(execution_db, action_db): |
||
142 | def store_stderr_line(line): |
||
143 | if cfg.CONF.actionrunner.stream_output: |
||
144 | store_execution_output_data(execution_db=execution_db, action_db=action_db, |
||
145 | data=line, output_type='stderr') |
||
146 | |||
147 | return store_stderr_line |
||
148 | |||
149 | handle_stdout_line_func = make_store_stdout_line_func(execution_db=self.execution, |
||
150 | action_db=self.action) |
||
151 | handle_stderr_line_func = make_store_stderr_line_func(execution_db=self.execution, |
||
152 | action_db=self.action) |
||
153 | |||
154 | if len(self._hosts) == 1: |
||
155 | # We only support streaming output when running action on one host. That is because |
||
156 | # the action output is tied to a particulat execution. User can still achieve output |
||
157 | # streaming for multiple hosts by running one execution per host. |
||
158 | client_kwargs['handle_stdout_line_func'] = handle_stdout_line_func |
||
159 | client_kwargs['handle_stderr_line_func'] = handle_stderr_line_func |
||
160 | else: |
||
161 | LOG.debug('Real-time action output streaming is disabled, because action is running ' |
||
162 | 'on more than one host') |
||
163 | |||
164 | if self._password: |
||
165 | client_kwargs['password'] = self._password |
||
166 | elif self._private_key: |
||
167 | # Determine if the private_key is a path to the key file or the raw key material |
||
168 | is_key_material = self._is_private_key_material(private_key=self._private_key) |
||
169 | |||
170 | if is_key_material: |
||
171 | # Raw key material |
||
172 | client_kwargs['pkey_material'] = self._private_key |
||
173 | else: |
||
174 | # Assume it's a path to the key file, verify the file exists |
||
175 | client_kwargs['pkey_file'] = self._private_key |
||
176 | |||
177 | if self._passphrase: |
||
178 | client_kwargs['passphrase'] = self._passphrase |
||
179 | else: |
||
180 | # Default to stanley key file specified in the config |
||
181 | client_kwargs['pkey_file'] = self._ssh_key_file |
||
182 | |||
183 | if self._sudo_password: |
||
184 | client_kwargs['sudo_password'] = True |
||
185 | |||
186 | self._parallel_ssh_client = ParallelSSHClient(**client_kwargs) |
||
187 | |||
188 | def post_run(self, status, result): |
||
189 | super(BaseParallelSSHRunner, self).post_run(status=status, result=result) |
||
190 | |||
191 | # Ensure we close the connection when the action execution finishes |
||
192 | if self._parallel_ssh_client: |
||
193 | self._parallel_ssh_client.close() |
||
194 | |||
195 | def _is_private_key_material(self, private_key): |
||
196 | return private_key and REMOTE_RUNNER_PRIVATE_KEY_HEADER in private_key.lower() |
||
197 | |||
198 | def _get_env_vars(self): |
||
199 | """ |
||
200 | :rtype: ``dict`` |
||
201 | """ |
||
202 | env_vars = {} |
||
203 | |||
204 | if self._env: |
||
205 | env_vars.update(self._env) |
||
206 | |||
207 | # Include common st2 env vars |
||
208 | st2_env_vars = self._get_common_action_env_variables() |
||
209 | env_vars.update(st2_env_vars) |
||
210 | |||
211 | return env_vars |
||
212 | |||
213 | @staticmethod |
||
214 | def _get_result_status(result, allow_partial_failure): |
||
215 | |||
216 | if 'error' in result and 'traceback' in result: |
||
217 | # Assume this is a global failure where the result dictionary doesn't contain entry |
||
218 | # per host |
||
219 | timeout = False |
||
220 | success = result.get('succeeded', False) |
||
221 | status = BaseParallelSSHRunner._get_status_for_success_and_timeout(success=success, |
||
222 | timeout=timeout) |
||
223 | return status |
||
224 | |||
225 | success = not allow_partial_failure |
||
226 | timeout = True |
||
227 | |||
228 | for r in six.itervalues(result): |
||
229 | r_succeess = r.get('succeeded', False) if r else False |
||
230 | r_timeout = r.get('timeout', False) if r else False |
||
231 | |||
232 | timeout &= r_timeout |
||
233 | |||
234 | if allow_partial_failure: |
||
235 | success |= r_succeess |
||
236 | if success: |
||
237 | break |
||
238 | else: |
||
239 | success &= r_succeess |
||
240 | if not success: |
||
241 | break |
||
242 | |||
243 | status = BaseParallelSSHRunner._get_status_for_success_and_timeout(success=success, |
||
244 | timeout=timeout) |
||
245 | |||
246 | return status |
||
247 | |||
248 | @staticmethod |
||
249 | def _get_status_for_success_and_timeout(success, timeout): |
||
250 | if success: |
||
251 | status = LIVEACTION_STATUS_SUCCEEDED |
||
252 | elif timeout: |
||
253 | # Note: Right now we only set status to timeout if all the hosts have timed out |
||
254 | status = LIVEACTION_STATUS_TIMED_OUT |
||
255 | else: |
||
256 | status = LIVEACTION_STATUS_FAILED |
||
257 | return status |
||
258 |
Cyclic imports may cause partly loaded modules to be returned. This might lead to unexpected runtime behavior which is hard to debug.