Passed
Pull Request — master (#3658)
by Lakshmi
05:23
created

PythonRunner   B

Complexity

Total Complexity 37

Size/Duplication

Total Lines 263
Duplicated Lines 0 %

Importance

Changes 4
Bugs 0 Features 2
Metric Value
c 4
b 0
f 2
dl 0
loc 263
rs 8.6
wmc 37

7 Methods

Rating   Name   Duplication   Size   Complexity  
F run() 0 102 10
D _get_output_values() 0 62 10
A _get_datastore_access_env_vars() 0 13 2
B _get_env_vars() 0 24 5
D _get_final_status() 0 26 8
A __init__() 0 20 1
A pre_run() 0 8 1
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import re
18
import sys
19
import json
20
import uuid
21
import functools
22
from StringIO import StringIO
23
from subprocess import list2cmdline
24
25
from eventlet.green import subprocess
26
from oslo_config import cfg
27
28
from st2common import log as logging
29
from st2common.persistence.pack import Pack
30
from st2common.runners.base import ActionRunner
31
from st2common.runners.base import get_metadata as get_runner_metadata
32
from st2common.util.green.shell import run_command
33
from st2common.constants.action import ACTION_OUTPUT_RESULT_DELIMITER
34
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
35
from st2common.constants.action import LIVEACTION_STATUS_FAILED
36
from st2common.constants.action import LIVEACTION_STATUS_TIMED_OUT
37
from st2common.constants.runners import PYTHON_RUNNER_INVALID_ACTION_STATUS_EXIT_CODE
38
from st2common.constants.error_messages import PACK_VIRTUALENV_DOESNT_EXIST
39
from st2common.constants.runners import PYTHON_RUNNER_DEFAULT_ACTION_TIMEOUT
40
from st2common.constants.system import API_URL_ENV_VARIABLE_NAME
41
from st2common.constants.system import AUTH_TOKEN_ENV_VARIABLE_NAME
42
from st2common.util.api import get_full_public_api_url
43
from st2common.util.pack import get_pack_common_libs_path
44
from st2common.util.sandboxing import get_sandbox_path
45
from st2common.util.sandboxing import get_sandbox_python_path
46
from st2common.util.sandboxing import get_sandbox_python_binary_path
47
from st2common.util.sandboxing import get_sandbox_virtualenv_path
48
from st2common.runners import python_action_wrapper
49
from st2common.services.action import store_execution_output_data
50
from st2common.runners.utils import make_read_and_store_stream_func
51
52
__all__ = [
53
    'PythonRunner',
54
55
    'get_runner',
56
    'get_runner_metadata',
57
]
58
59
LOG = logging.getLogger(__name__)
60
61
# constants to lookup in runner_parameters.
62
RUNNER_ENV = 'env'
63
RUNNER_TIMEOUT = 'timeout'
64
RUNNER_LOG_LEVEL = 'log_level'
65
66
# Environment variables which can't be specified by the user
67
BLACKLISTED_ENV_VARS = [
68
    # We don't allow user to override PYTHONPATH since this would break things
69
    'pythonpath'
70
]
71
72
BASE_DIR = os.path.dirname(os.path.abspath(python_action_wrapper.__file__))
73
WRAPPER_SCRIPT_NAME = 'python_action_wrapper.py'
74
WRAPPER_SCRIPT_PATH = os.path.join(BASE_DIR, WRAPPER_SCRIPT_NAME)
75
76
77
class PythonRunner(ActionRunner):
78
79
    def __init__(self, runner_id, config=None, timeout=PYTHON_RUNNER_DEFAULT_ACTION_TIMEOUT,
80
                 log_level='debug', sandbox=True):
81
82
        """
83
        :param timeout: Action execution timeout in seconds.
84
        :type timeout: ``int``
85
86
        :param log_level: Log level to use for the child actions.
87
        :type log_level: ``str``
88
89
        :param sandbox: True to use python binary from pack-specific virtual environment for the
90
                        child action False to use a default system python binary from PATH.
91
        :type sandbox: ``bool``
92
        """
93
        super(PythonRunner, self).__init__(runner_id=runner_id)
94
        self._config = config
95
        self._timeout = timeout
96
        self._enable_common_pack_libs = cfg.CONF.packs.enable_common_libs or False
97
        self._log_level = log_level
98
        self._sandbox = sandbox
99
100
    def pre_run(self):
101
        super(PythonRunner, self).pre_run()
102
103
        # TODO: This is awful, but the way "runner_parameters" and other variables get assigned on
104
        # the runner instance is even worse. Those arguments should be passed to the constructor.
105
        self._env = self.runner_parameters.get(RUNNER_ENV, {})
106
        self._timeout = self.runner_parameters.get(RUNNER_TIMEOUT, self._timeout)
107
        self._log_level = self.runner_parameters.get(RUNNER_LOG_LEVEL, self._log_level)
108
109
    def run(self, action_parameters):
110
        LOG.debug('Running pythonrunner.')
111
        LOG.debug('Getting pack name.')
112
        pack = self.get_pack_ref()
113
        pack_db = Pack.get_by_ref(pack)
114
        LOG.debug('Getting user.')
115
        user = self.get_user()
116
        LOG.debug('Serializing parameters.')
117
        serialized_parameters = json.dumps(action_parameters) if action_parameters else ''
118
        LOG.debug('Getting virtualenv_path.')
119
        virtualenv_path = get_sandbox_virtualenv_path(pack=pack)
120
        LOG.debug('Getting python path.')
121
        if self._sandbox:
122
            python_path = get_sandbox_python_binary_path(pack=pack)
123
        else:
124
            python_path = sys.executable
125
126
        LOG.debug('Checking virtualenv path.')
127
        if virtualenv_path and not os.path.isdir(virtualenv_path):
128
            format_values = {'pack': pack, 'virtualenv_path': virtualenv_path}
129
            msg = PACK_VIRTUALENV_DOESNT_EXIST % format_values
130
            LOG.error('virtualenv_path set but not a directory: %s', msg)
131
            raise Exception(msg)
132
133
        LOG.debug('Checking entry_point.')
134
        if not self.entry_point:
135
            LOG.error('Action "%s" is missing entry_point attribute' % (self.action.name))
136
            raise Exception('Action "%s" is missing entry_point attribute' % (self.action.name))
137
138
        # Note: We pass config as command line args so the actual wrapper process is standalone
139
        # and doesn't need access to db
140
        LOG.debug('Setting args.')
141
        args = [
142
            python_path,
143
            '-u',  # unbuffered mode so streaming mode works as expected
144
            WRAPPER_SCRIPT_PATH,
145
            '--pack=%s' % (pack),
146
            '--file-path=%s' % (self.entry_point),
147
            '--parameters=%s' % (serialized_parameters),
148
            '--user=%s' % (user),
149
            '--parent-args=%s' % (json.dumps(sys.argv[1:])),
150
        ]
151
152
        if self._config:
153
            args.append('--config=%s' % (json.dumps(self._config)))
154
155
        if self._log_level != 'debug':
156
            # We only pass --log-level parameter if non default log level value is specified
157
            args.append('--log-level=%s' % (self._log_level))
158
159
        # We need to ensure all the st2 dependencies are also available to the
160
        # subprocess
161
        LOG.debug('Setting env.')
162
        env = os.environ.copy()
163
        env['PATH'] = get_sandbox_path(virtualenv_path=virtualenv_path)
164
165
        sandbox_python_path = get_sandbox_python_path(inherit_from_parent=True,
166
                                                      inherit_parent_virtualenv=True)
167
        pack_common_libs_path = get_pack_common_libs_path(pack_db=pack_db)
168
169
        if self._enable_common_pack_libs and pack_common_libs_path:
170
            env['PYTHONPATH'] = pack_common_libs_path + ':' + sandbox_python_path
171
        else:
172
            env['PYTHONPATH'] = sandbox_python_path
173
174
        # Include user provided environment variables (if any)
175
        user_env_vars = self._get_env_vars()
176
        env.update(user_env_vars)
177
178
        # Include common st2 environment variables
179
        st2_env_vars = self._get_common_action_env_variables()
180
        env.update(st2_env_vars)
181
        datastore_env_vars = self._get_datastore_access_env_vars()
182
        env.update(datastore_env_vars)
183
184
        stdout = StringIO()
185
        stderr = StringIO()
186
187
        store_execution_stdout_line = functools.partial(store_execution_output_data,
188
                                                        output_type='stdout')
189
        store_execution_stderr_line = functools.partial(store_execution_output_data,
190
                                                        output_type='stderr')
191
192
        read_and_store_stdout = make_read_and_store_stream_func(execution_db=self.execution,
193
            action_db=self.action, store_data_func=store_execution_stdout_line)
194
        read_and_store_stderr = make_read_and_store_stream_func(execution_db=self.execution,
195
            action_db=self.action, store_data_func=store_execution_stderr_line)
196
197
        command_string = list2cmdline(args)
198
        LOG.debug('Running command: PATH=%s PYTHONPATH=%s %s' % (env['PATH'], env['PYTHONPATH'],
199
                                                                 command_string))
200
        exit_code, stdout, stderr, timed_out = run_command(cmd=args, stdout=subprocess.PIPE,
201
                                                           stderr=subprocess.PIPE, shell=False,
202
                                                           env=env,
203
                                                           timeout=self._timeout,
204
                                                           read_stdout_func=read_and_store_stdout,
205
                                                           read_stderr_func=read_and_store_stderr,
206
                                                           read_stdout_buffer=stdout,
207
                                                           read_stderr_buffer=stderr)
208
        LOG.debug('Returning values: %s, %s, %s, %s' % (exit_code, stdout, stderr, timed_out))
209
        LOG.debug('Returning.')
210
        return self._get_output_values(exit_code, stdout, stderr, timed_out)
211
212
    def _get_output_values(self, exit_code, stdout, stderr, timed_out):
213
        """
214
        Return sanitized output values.
215
216
        :return: Tuple with status, output and None
217
218
        :rtype: ``tuple``
219
        """
220
        if timed_out:
221
            error = 'Action failed to complete in %s seconds' % (self._timeout)
222
        else:
223
            error = None
224
225
        if exit_code == PYTHON_RUNNER_INVALID_ACTION_STATUS_EXIT_CODE:
226
            # TODO: Mark as failed instead
227
            raise ValueError(stderr)
228
229
        if ACTION_OUTPUT_RESULT_DELIMITER in stdout:
230
            split = stdout.split(ACTION_OUTPUT_RESULT_DELIMITER)
231
            assert len(split) == 3
232
            action_result = split[1].strip()
233
            stdout = split[0] + split[2]
234
        else:
235
            action_result = None
236
237
        # Parse the serialized action result object
238
        try:
239
            action_result = json.loads(action_result)
240
        except Exception as e:
241
            # Failed to de-serialize the result, probably it contains non-simple type or similar
242
            LOG.warning('Failed to de-serialize result "%s": %s' % (str(action_result), str(e)))
243
244
        if action_result:
245
            if isinstance(action_result, dict):
246
                result = action_result.get('result', None)
247
                status = action_result.get('status', None)
248
            else:
249
                # Failed to de-serialize action result aka result is a string
250
                match = re.search("'result': (.*?)$", action_result or '')
251
252
                if match:
253
                    action_result = match.groups()[0]
254
255
                result = action_result
256
                status = None
257
        else:
258
            result = 'None'
259
            status = None
260
261
        output = {
262
            'stdout': stdout,
263
            'stderr': stderr,
264
            'exit_code': exit_code,
265
            'result': result
266
        }
267
268
        if error:
269
            output['error'] = error
270
271
        status = self._get_final_status(action_status=status, timed_out=timed_out,
272
                                        exit_code=exit_code)
273
        return (status, output, None)
274
275
    def _get_final_status(self, action_status, timed_out, exit_code):
276
        """
277
        Return final status based on action's status, time out value and
278
        exit code. Example: succeeded, failed, timeout.
279
280
        :return: status
281
282
        :rtype: ``str``
283
        """
284
        if action_status is not None:
285
            if exit_code == 0 and action_status is True:
286
                status = LIVEACTION_STATUS_SUCCEEDED
287
            elif exit_code == 0 and action_status is False:
288
                status = LIVEACTION_STATUS_FAILED
289
            else:
290
                status = LIVEACTION_STATUS_FAILED
291
        else:
292
            if exit_code == 0:
293
                status = LIVEACTION_STATUS_SUCCEEDED
294
            else:
295
                status = LIVEACTION_STATUS_FAILED
296
297
        if timed_out:
298
            status = LIVEACTION_STATUS_TIMED_OUT
299
300
        return status
301
302
    def _get_env_vars(self):
303
        """
304
        Return sanitized environment variables which will be used when launching
305
        a subprocess.
306
307
        :rtype: ``dict``
308
        """
309
        env_vars = {}
310
311
        if self._env:
312
            env_vars.update(self._env)
313
314
        # Remove "blacklisted" environment variables
315
        to_delete = []
316
        for key, value in env_vars.items():
317
            if key.lower() in BLACKLISTED_ENV_VARS:
318
                to_delete.append(key)
319
320
        for key in to_delete:
321
            LOG.debug('User specified environment variable "%s" which is being ignored...' %
322
                      (key))
323
            del env_vars[key]
324
325
        return env_vars
326
327
    def _get_datastore_access_env_vars(self):
328
        """
329
        Return environment variables so datastore access using client (from st2client)
330
        is possible with actions. This is done to be compatible with sensors.
331
332
        :rtype: ``dict``
333
        """
334
        env_vars = {}
335
        if self.auth_token:
336
            env_vars[AUTH_TOKEN_ENV_VARIABLE_NAME] = self.auth_token.token
337
        env_vars[API_URL_ENV_VARIABLE_NAME] = get_full_public_api_url()
338
339
        return env_vars
340
341
342
def get_runner(config=None):
343
    return PythonRunner(runner_id=str(uuid.uuid4()), config=config)
344
345
346
def get_metadata():
347
    return get_runner_metadata('python_runner')
348