Test Failed
Pull Request — master (#3658)
by Lakshmi
06:41
created

PythonRunner.run()   C

Complexity

Conditions 7

Size

Total Lines 90

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
dl 0
loc 90
rs 5.3392
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

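Commonly applied refactorings include Extract Method; when many parameters or temporary variables are involved, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object can also help.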

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import os
17
import re
18
import sys
19
import json
20
import uuid
21
import functools
22
from StringIO import StringIO
23
from subprocess import list2cmdline
24
25
from eventlet.green import subprocess
26
from oslo_config import cfg
27
28
from st2common import log as logging
29
from st2common.persistence.pack import Pack
30
from st2common.runners.base import ActionRunner
31
from st2common.util.green.shell import run_command
32
from st2common.constants.action import ACTION_OUTPUT_RESULT_DELIMITER
33
from st2common.constants.action import LIVEACTION_STATUS_SUCCEEDED
34
from st2common.constants.action import LIVEACTION_STATUS_FAILED
35
from st2common.constants.action import LIVEACTION_STATUS_TIMED_OUT
36
from st2common.constants.runners import PYTHON_RUNNER_INVALID_ACTION_STATUS_EXIT_CODE
37
from st2common.constants.error_messages import PACK_VIRTUALENV_DOESNT_EXIST
38
from st2common.constants.runners import PYTHON_RUNNER_DEFAULT_ACTION_TIMEOUT
39
from st2common.constants.system import API_URL_ENV_VARIABLE_NAME
40
from st2common.constants.system import AUTH_TOKEN_ENV_VARIABLE_NAME
41
from st2common.util.api import get_full_public_api_url
42
from st2common.util.pack import get_pack_common_libs_path
43
from st2common.util.sandboxing import get_sandbox_path
44
from st2common.util.sandboxing import get_sandbox_python_path
45
from st2common.util.sandboxing import get_sandbox_python_binary_path
46
from st2common.util.sandboxing import get_sandbox_virtualenv_path
47
from st2common.runners import python_action_wrapper
48
from st2common.services.action import store_execution_output_data
49
from st2common.runners.utils import make_read_and_store_stream_func
50
51
# Module level logger.
LOG = logging.getLogger(__name__)

# Public names exported by this module.
__all__ = [
    'get_runner',
    'PythonRunner',
]

# Keys looked up in the runner's ``runner_parameters`` dictionary.
RUNNER_ENV = 'env'
RUNNER_TIMEOUT = 'timeout'

# Environment variables which can't be specified by the user. Entries are
# lower case because user supplied names are lower-cased before the lookup.
BLACKLISTED_ENV_VARS = [
    # We don't allow user to override PYTHONPATH since this would break things
    'pythonpath'
]

# Absolute path to the wrapper script which is executed in the subprocess. It
# is resolved relative to the directory of the python_action_wrapper module.
BASE_DIR = os.path.dirname(os.path.abspath(python_action_wrapper.__file__))
WRAPPER_SCRIPT_NAME = 'python_action_wrapper.py'
WRAPPER_SCRIPT_PATH = os.path.join(BASE_DIR, WRAPPER_SCRIPT_NAME)
71
72
73
def get_runner():
    """
    Return a new ``PythonRunner`` instance which uses a freshly generated
    random (uuid4) string as the runner id.

    :rtype: :class:`PythonRunner`
    """
    return PythonRunner(str(uuid.uuid4()))
76
77
78
class PythonRunner(ActionRunner):
    """
    Runner which executes a Python action in a subprocess by invoking the
    ``python_action_wrapper.py`` script with the pack's sandbox Python binary.
    """

    def __init__(self, runner_id, timeout=PYTHON_RUNNER_DEFAULT_ACTION_TIMEOUT):
        """
        :param runner_id: Runner id which is passed to the base class constructor.

        :param timeout: Action execution timeout in seconds.
        :type timeout: ``int``
        """
        super(PythonRunner, self).__init__(runner_id=runner_id)
        self._timeout = timeout
        # Overridden with the user supplied value in pre_run(). Initialized here
        # so _get_env_vars() doesn't fail with AttributeError if pre_run() has
        # not been called.
        self._env = {}
        self._enable_common_pack_libs = cfg.CONF.packs.enable_common_libs or False

    def pre_run(self):
        """
        Read the user supplied environment variables and the timeout from
        ``runner_parameters``. If no timeout is supplied, the value passed to
        the constructor is kept.
        """
        super(PythonRunner, self).pre_run()

        # TODO :This is awful, but the way "runner_parameters" and other variables get
        # assigned on the runner instance is even worse. Those arguments should
        # be passed to the constructor.
        self._env = self.runner_parameters.get(RUNNER_ENV, {})
        self._timeout = self.runner_parameters.get(RUNNER_TIMEOUT, self._timeout)

    def run(self, action_parameters):
        """
        Run the action entry point in a subprocess via the wrapper script and
        return the processed result.

        :param action_parameters: Action parameters. They are serialized to JSON
                                  and passed to the wrapper script (an empty
                                  string is passed if the value is falsy).

        :return: Tuple with status, output and None

        :rtype: ``tuple``

        :raises Exception: If the pack virtualenv path is set but is not a
                           directory or if the action has no entry point.
        """
        LOG.debug('Running pythonrunner.')
        LOG.debug('Getting pack name.')
        pack = self.get_pack_ref()
        pack_db = Pack.get_by_ref(pack)
        LOG.debug('Getting user.')
        user = self.get_user()
        LOG.debug('Serializing parameters.')
        serialized_parameters = json.dumps(action_parameters) if action_parameters else ''
        LOG.debug('Getting virtualenv_path.')
        virtualenv_path = get_sandbox_virtualenv_path(pack=pack)
        LOG.debug('Getting python path.')
        python_path = get_sandbox_python_binary_path(pack=pack)

        self._validate_action(pack=pack, virtualenv_path=virtualenv_path)

        LOG.debug('Setting args.')
        args = self._get_command_args(pack=pack, python_path=python_path, user=user,
                                      serialized_parameters=serialized_parameters)

        # We need to ensure all the st2 dependencies are also available to the
        # subprocess
        LOG.debug('Setting env.')
        env = self._get_subprocess_env(pack_db=pack_db, virtualenv_path=virtualenv_path)

        # Buffers which are handed to run_command() for the stream read functions
        stdout_buffer = StringIO()
        stderr_buffer = StringIO()

        store_execution_stdout_line = functools.partial(store_execution_output_data,
                                                        output_type='stdout')
        store_execution_stderr_line = functools.partial(store_execution_output_data,
                                                        output_type='stderr')

        read_and_store_stdout = make_read_and_store_stream_func(
            execution_db=self.execution, action_db=self.action,
            store_data_func=store_execution_stdout_line)
        read_and_store_stderr = make_read_and_store_stream_func(
            execution_db=self.execution, action_db=self.action,
            store_data_func=store_execution_stderr_line)

        command_string = list2cmdline(args)
        # Values are passed as logger arguments so the message is only formatted
        # when debug logging is actually enabled.
        LOG.debug('Running command: PATH=%s PYTHONPATH=%s %s', env['PATH'], env['PYTHONPATH'],
                  command_string)
        exit_code, stdout, stderr, timed_out = run_command(cmd=args, stdout=subprocess.PIPE,
                                                           stderr=subprocess.PIPE, shell=False,
                                                           env=env,
                                                           timeout=self._timeout,
                                                           read_stdout_func=read_and_store_stdout,
                                                           read_stderr_func=read_and_store_stderr,
                                                           read_stdout_buffer=stdout_buffer,
                                                           read_stderr_buffer=stderr_buffer)
        LOG.debug('Returning values: %s, %s, %s, %s', exit_code, stdout, stderr, timed_out)
        LOG.debug('Returning.')
        return self._get_output_values(exit_code, stdout, stderr, timed_out)

    def _validate_action(self, pack, virtualenv_path):
        """
        Verify that the pack virtualenv (if any) exists and that the action
        has an entry point.

        :raises Exception: If either of the checks fails.
        """
        LOG.debug('Checking virtualenv path.')
        if virtualenv_path and not os.path.isdir(virtualenv_path):
            format_values = {'pack': pack, 'virtualenv_path': virtualenv_path}
            msg = PACK_VIRTUALENV_DOESNT_EXIST % format_values
            LOG.error('virtualenv_path set but not a directory: %s', msg)
            raise Exception(msg)

        LOG.debug('Checking entry_point.')
        if not self.entry_point:
            msg = 'Action "%s" is missing entry_point attribute' % (self.action.name)
            LOG.error(msg)
            raise Exception(msg)

    def _get_command_args(self, pack, python_path, user, serialized_parameters):
        """
        Return the argument list used to invoke the wrapper script.

        :rtype: ``list``
        """
        return [
            python_path,
            '-u',
            WRAPPER_SCRIPT_PATH,
            '--pack=%s' % (pack),
            '--file-path=%s' % (self.entry_point),
            '--parameters=%s' % (serialized_parameters),
            '--user=%s' % (user),
            '--parent-args=%s' % (json.dumps(sys.argv[1:]))
        ]

    def _get_subprocess_env(self, pack_db, virtualenv_path):
        """
        Return the environment for the subprocess: a copy of the current
        process environment with PATH and PYTHONPATH replaced, followed by the
        user supplied, common st2 and datastore access variables (later ones
        override earlier ones).

        :rtype: ``dict``
        """
        env = os.environ.copy()
        env['PATH'] = get_sandbox_path(virtualenv_path=virtualenv_path)

        sandbox_python_path = get_sandbox_python_path(inherit_from_parent=True,
                                                      inherit_parent_virtualenv=True)
        pack_common_libs_path = get_pack_common_libs_path(pack_db=pack_db)

        if self._enable_common_pack_libs and pack_common_libs_path:
            env['PYTHONPATH'] = pack_common_libs_path + ':' + sandbox_python_path
        else:
            env['PYTHONPATH'] = sandbox_python_path

        # Include user provided environment variables (if any)
        env.update(self._get_env_vars())

        # Include common st2 environment variables
        env.update(self._get_common_action_env_variables())
        env.update(self._get_datastore_access_env_vars())

        return env

    def _get_output_values(self, exit_code, stdout, stderr, timed_out):
        """
        Return sanitized output values.

        The serialized action result is expected in stdout between two
        occurrences of the result delimiter. It is removed from stdout and
        parsed as JSON.

        :return: Tuple with status, output and None

        :rtype: ``tuple``

        :raises ValueError: If the process exited with the "invalid action
                            status" exit code or if stdout doesn't contain
                            exactly two result delimiters when any is present.
        """
        if timed_out:
            error = 'Action failed to complete in %s seconds' % (self._timeout)
        else:
            error = None

        if exit_code == PYTHON_RUNNER_INVALID_ACTION_STATUS_EXIT_CODE:
            # TODO: Mark as failed instead
            raise ValueError(stderr)

        if ACTION_OUTPUT_RESULT_DELIMITER in stdout:
            split = stdout.split(ACTION_OUTPUT_RESULT_DELIMITER)

            # Explicit check instead of an assert since asserts are stripped
            # when Python runs with -O
            if len(split) != 3:
                raise ValueError('Expected exactly 2 result delimiters in the action output, '
                                 'found %s' % (len(split) - 1))

            action_result = split[1].strip()
            stdout = split[0] + split[2]
        else:
            action_result = None

        # Parse the serialized action result object. There is nothing to parse
        # (and nothing to warn about) when the output contained no result.
        if action_result is not None:
            try:
                action_result = json.loads(action_result)
            except Exception as e:
                # Failed to de-serialize the result, probably it contains non-simple type or
                # similar. This is deliberately best effort - the raw value is used instead.
                LOG.warning('Failed to de-serialize result "%s": %s', str(action_result), str(e))

        if action_result:
            if isinstance(action_result, dict):
                result = action_result.get('result', None)
                status = action_result.get('status', None)
            else:
                # Failed to de-serialize action result aka result is a string. The regex
                # is only applied to strings since other types would raise TypeError.
                if isinstance(action_result, (str, type(u''))):
                    match = re.search("'result': (.*?)$", action_result)

                    if match:
                        action_result = match.groups()[0]

                result = action_result
                status = None
        else:
            result = 'None'
            status = None

        output = {
            'stdout': stdout,
            'stderr': stderr,
            'exit_code': exit_code,
            'result': result
        }

        if error:
            output['error'] = error

        status = self._get_final_status(action_status=status, timed_out=timed_out,
                                        exit_code=exit_code)
        return (status, output, None)

    def _get_final_status(self, action_status, timed_out, exit_code):
        """
        Return final status based on action's status, time out value and
        exit code. Example: succeeded, failed, timeout.

        A time out takes precedence over everything else. Otherwise the action
        succeeded only if the exit code is 0 and the action either reported no
        status (``None``) or reported ``True``.

        :return: status

        :rtype: ``str``
        """
        if timed_out:
            return LIVEACTION_STATUS_TIMED_OUT

        action_succeeded = action_status is None or action_status is True

        if exit_code == 0 and action_succeeded:
            return LIVEACTION_STATUS_SUCCEEDED

        return LIVEACTION_STATUS_FAILED

    def _get_env_vars(self):
        """
        Return sanitized environment variables which will be used when launching
        a subprocess.

        User supplied variables whose lower-cased name is blacklisted are
        dropped.

        :rtype: ``dict``
        """
        env_vars = {}

        for key, value in (self._env or {}).items():
            if key.lower() in BLACKLISTED_ENV_VARS:
                LOG.debug('User specified environment variable "%s" which is being ignored...',
                          key)
                continue

            env_vars[key] = value

        return env_vars

    def _get_datastore_access_env_vars(self):
        """
        Return environment variables so datastore access using client (from st2client)
        is possible with actions. This is done to be compatible with sensors.

        The auth token variable is only included when an auth token is set on
        the runner.

        :rtype: ``dict``
        """
        env_vars = {}
        if self.auth_token:
            env_vars[AUTH_TOKEN_ENV_VARIABLE_NAME] = self.auth_token.token
        env_vars[API_URL_ENV_VARIABLE_NAME] = get_full_public_api_url()

        return env_vars
317