Completed
Pull Request — master (#2151)
by Mischa
01:55
created

prepare_string_argument()   A

Complexity

Conditions 2

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 17
rs 9.4285
1
from contextlib import contextmanager
2
import shlex
3
from subprocess import PIPE, Popen
4
5
6
@contextmanager
def run_interactive_shell_command(command, **kwargs):
    """
    Runs a single command in shell and provides stdout, stderr and stdin
    streams.

    This function creates a context manager that sets up the process (using
    ``subprocess.Popen()``), returns to caller and waits for process to exit on
    leaving.

    By default the process is opened in ``universal_newlines`` mode and creates
    pipes for all streams (stdout, stderr and stdin) using ``subprocess.PIPE``
    special value. These pipes are closed automatically, so if you want to get
    the contents of the streams you should retrieve them before the context
    manager exits.

    >>> with run_interactive_shell_command(["echo", "TEXT"]) as p:
    ...     stdout = p.stdout
    ...     stdout_text = stdout.read()
    >>> stdout_text
    'TEXT\\n'
    >>> stdout.closed
    True

    Custom streams provided are not closed except of ``subprocess.PIPE``.

    >>> from tempfile import TemporaryFile
    >>> stream = TemporaryFile()
    >>> with run_interactive_shell_command(["echo", "TEXT"],
    ...                                    stdout=stream) as p:
    ...     stderr = p.stderr
    >>> stderr.closed
    True
    >>> stream.closed
    False

    The process is always waited for when the context is left, even if
    closing one of the pipes raises (for example a broken pipe while flushing
    pending stdin data). The exception still propagates to the caller.

    :param command: The command to run on shell. This parameter can either
                    be a sequence of arguments that are directly passed to
                    the process or a string. A string is split beforehand
                    using ``shlex.split()``. If providing ``shell=True`` as a
                    keyword-argument, no ``shlex.split()`` is performed and the
                    command string goes directly to ``subprocess.Popen()``.
    :param kwargs:  Additional keyword arguments to pass to
                    ``subprocess.Popen`` that are used to spawn the process.
    :return:        A context manager yielding the process started from the
                    command.
    """
    # Without a shell, Popen expects an argument vector, so tokenize strings.
    if not kwargs.get("shell", False) and isinstance(command, str):
        command = shlex.split(command)

    # Defaults first; anything supplied by the caller overrides them.
    args = {"stdout": PIPE,
            "stderr": PIPE,
            "stdin": PIPE,
            "universal_newlines": True}
    args.update(kwargs)

    process = Popen(command, **args)
    try:
        yield process
    finally:
        try:
            # Only pipes created by Popen are closed here; streams handed in
            # by the caller remain the caller's responsibility.
            if args["stdout"] is PIPE:
                process.stdout.close()
            if args["stderr"] is PIPE:
                process.stderr.close()
            if args["stdin"] is PIPE:
                # Closing stdin flushes buffered writes and may raise.
                process.stdin.close()
        finally:
            # Reap the child unconditionally so a failing close() cannot
            # leave an un-waited process behind.
            process.wait()
74
75
76
def run_shell_command(command, stdin=None, **kwargs):
    """
    Executes one command and hands back everything it wrote to stdout and
    stderr.

    The call blocks until the process (spawned through
    ``run_interactive_shell_command()``, i.e. ``subprocess.Popen()``) has
    finished; the data exchange is done with ``communicate()``.

    See also ``run_interactive_shell_command()``.

    :param command: The command to run on shell. Either a sequence of
                    arguments passed directly to the process, or a string.
                    A string is split beforehand using ``shlex.split()``.
    :param stdin:   Initial input to send to the process.
    :param kwargs:  Additional keyword arguments forwarded to
                    ``subprocess.Popen`` when spawning the process.
    :return:        A tuple with ``(stdoutstring, stderrstring)``.
    """
    with run_interactive_shell_command(command, **kwargs) as process:
        # communicate() feeds the input, drains both output pipes and waits
        # for the process; the context manager cleans up on the way out.
        return process.communicate(stdin)
98
99
100
def get_shell_type():  # pragma: no cover
    """
    Determines which kind of shell commands are executed in, by echoing
    variables that the common shells expand differently. Knowing the shell
    type tells which sort of string escaping is required.

    :return: The shell type: "powershell" if Windows Powershell is detected,
             "cmd" if the command prompt is detected, or "sh" if it's
             neither of these.
    """
    # First probe: an output of "ConsoleHost" is taken to mean Powershell.
    host_probe = run_shell_command("echo $host.name", shell=True)[0].strip()
    if host_probe == "ConsoleHost":
        return "powershell"

    # Second probe: if "$0" comes back literally (unexpanded), report "cmd".
    arg0_probe = run_shell_command("echo $0", shell=True)[0].strip()
    return "cmd" if arg0_probe == "$0" else "sh"
117