Passed
Push — master ( f1fe9e...5c5de8 )
by
unknown
03:44
created

on_timeout_expired()   B

Complexity

Conditions 5

Size

Total Lines 23

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 5
c 3
b 0
f 0
dl 0
loc 23
rs 8.2508
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
"""
17
Shell utility functions which use non-blocking and eventlet friendly code.
18
"""
19
20
import os
21
22
import six
23
import eventlet
24
from st2common import log as logging
25
from eventlet.green import subprocess
26
27
__all__ = [
28
    'run_command'
29
]
30
31
TIMEOUT_EXIT_CODE = -9
32
33
LOG = logging.getLogger(__name__)
34
35
36
def run_command(cmd, stdin=None, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False,
37
                cwd=None, env=None, timeout=60, preexec_func=None, kill_func=None,
38
                read_stdout_func=None, read_stderr_func=None,
39
                read_stdout_buffer=None, read_stderr_buffer=None):
40
    """
41
    Run the provided command in a subprocess and wait until it completes.
42
43
    :param cmd: Command to run.
44
    :type cmd: ``str`` or ``list``
45
46
    :param stdin: Process stdin.
47
    :type stdin: ``object``
48
49
    :param stdout: Process stdout.
50
    :type stdout: ``object``
51
52
    :param stderr: Process stderr.
53
    :type stderr: ``object``
54
55
    :param shell: True to use a shell.
56
    :type shell ``boolean``
57
58
    :param cwd: Optional working directory.
59
    :type cwd: ``str``
60
61
    :param env: Optional environment to use with the command. If not provided,
62
                environment from the current process is inherited.
63
    :type env: ``dict``
64
65
    :param timeout: How long to wait before timing out.
66
    :type timeout: ``float``
67
68
    :param preexec_func: Optional pre-exec function.
69
    :type preexec_func: ``callable``
70
71
    :param kill_func: Optional function which will be called on timeout to kill the process.
72
                      If not provided, it defaults to `process.kill`
73
    :type kill_func: ``callable``
74
75
    :param read_stdout_func: Function which is responsible for reading process stdout when
76
                                 using live read mode.
77
    :type read_stdout_func: ``func``
78
79
    :param read_stdout_func: Function which is responsible for reading process stderr when
80
                                 using live read mode.
81
    :type read_stdout_func: ``func``
82
83
84
    :rtype: ``tuple`` (exit_code, stdout, stderr, timed_out)
85
    """
86
    LOG.debug('Entering st2common.util.green.run_command.')
87
88
    assert isinstance(cmd, (list, tuple) + six.string_types)
89
90
    if (read_stdout_func and not read_stderr_func) or (read_stderr_func and not read_stdout_func):
91
        raise ValueError('Both read_stdout_func and read_stderr_func arguments need '
92
                         'to be provided.')
93
94
    if read_stdout_func and not (read_stdout_buffer or read_stderr_buffer):
95
        raise ValueError('read_stdout_buffer and read_stderr_buffer arguments need to be provided '
96
                         'when read_stdout_func is provided')
97
98
    if not env:
99
        LOG.debug('env argument not provided. using process env (os.environ).')
100
        env = os.environ.copy()
101
102
    # Note: We are using eventlet friendly implementation of subprocess
103
    # which uses GreenPipe so it doesn't block
104
    LOG.debug('Creating subprocess.')
105
    process = subprocess.Popen(args=cmd, stdin=stdin, stdout=stdout, stderr=stderr,
106
                               env=env, cwd=cwd, shell=shell, preexec_fn=preexec_func)
107
108
    if read_stdout_func:
109
        LOG.debug('Spawning read_stdout_func function')
110
        read_stdout_thread = eventlet.spawn(read_stdout_func, process.stdout, read_stdout_buffer)
111
112
    if read_stderr_func:
113
        LOG.debug('Spawning read_stderr_func function')
114
        read_stderr_thread = eventlet.spawn(read_stderr_func, process.stderr, read_stderr_buffer)
115
116
    def on_timeout_expired(timeout):
117
        global timed_out
0 ignored issues
show
Unused Code introduced by
The variable timed_out was imported from global scope, but was never written to.
Loading history...
118
119
        try:
120
            LOG.debug('Starting process wait inside timeout handler.')
121
            process.wait(timeout=timeout)
122
        except subprocess.TimeoutExpired:
123
            # Command has timed out, kill the process and propagate the error.
124
            # Note: We explicitly set the returncode to indicate the timeout.
125
            LOG.debug('Command execution timeout reached.')
126
            process.returncode = TIMEOUT_EXIT_CODE
127
128
            if kill_func:
129
                LOG.debug('Calling kill_func.')
130
                kill_func(process=process)
131
            else:
132
                LOG.debug('Killing process.')
133
                process.kill()
134
135
            if read_stdout_func and read_stderr_func:
136
                LOG.debug('Killing read_stdout_thread and read_stderr_thread')
137
                read_stdout_thread.kill()
138
                read_stderr_thread.kill()
139
140
    LOG.debug('Spawning timeout handler thread.')
141
    timeout_thread = eventlet.spawn(on_timeout_expired, timeout)
142
    LOG.debug('Attaching to process.')
143
144
    if read_stdout_func and read_stderr_func:
145
        LOG.debug('Using real-time stdout and stderr read mode, calling process.wait()')
146
        process.wait()
147
    else:
148
        LOG.debug('Using delayed stdout and stderr read mode, calling process.communicate()')
149
        stdout, stderr = process.communicate()
150
151
    timeout_thread.cancel()
152
    exit_code = process.returncode
153
154
    if read_stdout_func and read_stderr_func:
155
        # Wait on those green threads to finish reading from stdout and stderr before continuing
156
        read_stdout_thread.wait()
157
        read_stderr_thread.wait()
158
159
        stdout = read_stdout_buffer.getvalue()
160
        stderr = read_stderr_buffer.getvalue()
161
162
    if exit_code == TIMEOUT_EXIT_CODE:
163
        LOG.debug('Timeout.')
164
        timed_out = True
165
    else:
166
        LOG.debug('No timeout.')
167
        timed_out = False
168
169
    LOG.debug('Returning.')
170
    return (exit_code, stdout, stderr, timed_out)
171