1
|
|
|
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils |
2
|
|
|
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils |
3
|
|
|
# SPDX-License-Identifier: Apache-2.0 |
4
|
|
|
|
5
|
|
|
import contextlib |
6
|
|
|
import logging |
7
|
|
|
import subprocess # noqa: S404 |
8
|
|
|
from collections.abc import Callable, Generator, Mapping, Sequence |
9
|
|
|
from copy import copy |
10
|
|
|
from dataclasses import dataclass |
11
|
|
|
from pathlib import PurePath |
12
|
|
|
from queue import Queue |
13
|
|
|
from subprocess import CalledProcessError, CompletedProcess |
14
|
|
|
from threading import Thread |
15
|
|
|
from typing import IO, Any, AnyStr, Self, Unpack |
16
|
|
|
|
17
|
|
|
from pocketutils.core.input_output import DevNull |
18
|
|
|
|
19
|
|
|
# Public API of this module: the utility class and the module-level instance of it.
__all__ = ["CallUtils", "CallTools"]

# Logger named "pocketutils"; used as the default log target by the helpers in this module.
logger = logging.getLogger("pocketutils")
22
|
|
|
|
23
|
|
|
|
24
|
|
|
@contextlib.contextmanager
def null_context() -> Generator[None, None, None]:
    """
    Context manager that does nothing on entry or exit.

    Used as a stand-in where a `with` statement needs a manager but no
    redirection should happen. The value bound by `as` is `None`.
    """
    yield None
27
|
|
|
|
28
|
|
|
|
29
|
|
|
@dataclass(slots=True, frozen=True)
class CallUtils:
    """
    Helpers for running external commands and routing their output to logging.

    The class has no fields; its methods only use their arguments and the
    module-level `logger`.
    """

    @contextlib.contextmanager
    def silenced(self: Self, no_stdout: bool = True, no_stderr: bool = True) -> Generator[None, None, None]:
        """
        Context manager that suppresses stdout and stderr.

        Args:
            no_stdout: If true, redirect stdout to a `DevNull` instance inside the block
            no_stderr: If true, redirect stderr to a `DevNull` instance inside the block
        """
        # An ExitStack keeps the two optional redirections flat; they are undone in reverse order on exit.
        with contextlib.ExitStack() as stack:
            if no_stdout:
                # noinspection PyTypeChecker
                stack.enter_context(contextlib.redirect_stdout(DevNull()))
            if no_stderr:
                # noinspection PyTypeChecker
                stack.enter_context(contextlib.redirect_stderr(DevNull()))
            yield

    def call_cmd_utf(
        self: Self,
        *cmd: str,
        log_fn: Callable[[str], Any] | None = logger.debug,
        **kwargs: Unpack[Mapping[str, Any]],
    ) -> CompletedProcess:
        """
        Runs a command via `subprocess.run`, capturing output as UTF-8 text.

        Sets `capture_output=True`, `check=True`, `text=True`, and `encoding="utf-8"`.
        Logs the command and the raw (unstripped) stdout and stderr via `log_fn`,
        then strips leading/trailing whitespace from both before returning.

        Args:
            cmd: The command and its arguments; each item is converted with `str`
            log_fn: Receives each log message; pass `None` to disable logging
            kwargs: Passed to `subprocess.run`. A `cwd` given as a path object is converted to `str`.
                    Do not pass `capture_output`, `check`, `text`, or `encoding`
                    (a duplicate keyword raises `TypeError`).

        Returns:
            The `CompletedProcess`, with `stdout` and `stderr` stripped

        Raises:
            CalledProcessError: If the exit code is nonzero (because `check=True`);
                                see `log_called_process_error` to log its output

        See Also:
            `subprocess.check_output`, which only returns stdout
        """

        def emit(message: str) -> None:
            # log_fn is declared optional, so tolerate None instead of failing with "NoneType is not callable"
            if log_fn is not None:
                log_fn(message)

        args = [str(c) for c in cmd]
        emit("Calling '{}'".format(" ".join(args)))
        # Copy so the caller's mapping is never mutated
        run_kwargs = dict(kwargs)
        if isinstance(run_kwargs.get("cwd"), PurePath):
            run_kwargs["cwd"] = str(run_kwargs["cwd"])
        # The argument list goes in positionally; the fixed options are explicit keywords
        x = subprocess.run(  # noqa: S603,S607
            args,
            capture_output=True,
            check=True,
            text=True,
            encoding="utf-8",
            **run_kwargs,
        )
        emit(f"stdout: '{x.stdout}'")
        emit(f"stderr: '{x.stderr}'")
        x.stdout = x.stdout.strip()
        x.stderr = x.stderr.strip()
        return x

    def log_called_process_error(self: Self, e: CalledProcessError, *, log_fn: Callable[[str], None]) -> None:
        """
        Outputs some formatted text describing the error with its full stdout and stderr.

        Emits three messages: the failed command, then stdout, then stderr.
        Missing streams are reported as `<no stdout>` / `<no stderr>`.

        Args:
            e: The error
            log_fn: For example, `logger.warning`
        """

        def as_text(data: Any) -> Any:
            # Bytes are decoded leniently: an undecodable byte must not mask the error being reported
            if isinstance(data, bytes):
                return data.decode(encoding="utf-8", errors="replace")
            return data

        raw_cmd = e.cmd
        if isinstance(raw_cmd, bytes):
            cmd_text = as_text(raw_cmd)
        elif isinstance(raw_cmd, (str, PurePath)):
            # Joining a plain string would insert a space between every character
            cmd_text = str(raw_cmd)
        else:
            cmd_text = " ".join(str(c) for c in raw_cmd)
        log_fn(f"Failed on command: {cmd_text}")
        out = as_text(e.stdout)
        log_fn("<no stdout>" if out is None else f"stdout: {out}")
        err = as_text(e.stderr)
        log_fn("<no stderr>" if err is None else f"stderr: {err}")

    def stream_cmd_call(
        self: Self,
        cmd: Sequence[str],
        *,
        callback: Callable[[bool, bytes], None] | None = None,
        timeout_secs: float | None = None,
        **kwargs: Unpack[Mapping[str, Any]],
    ) -> None:
        """
        Processes stdout and stderr on separate threads.
        Streamed -- can avoid filling a stdout or stderr buffer.
        Calls an external command, waits, and raises a
        `CalledProcessError` for nonzero exit codes.

        Args:
            cmd: The command args; each item is converted with `str`
            callback: A function that processes (is_stderr, piped line).
                      If `None`, uses :meth:`smart_log`.
            timeout_secs: Max seconds to wait for the process to exit after both pipes
                          have reached end-of-file; reading lines is not time-limited
            kwargs: Passed to `subprocess.Popen`; do not pass `stdout` or `stderr`.

        Raises:
            CalledProcessError: If the exit code is nonzero; its output and stderr
                                are the placeholder `"<unknown>"` because lines were streamed
            subprocess.TimeoutExpired: If the final wait exceeds `timeout_secs`
        """
        if callback is None:
            callback = self.smart_log
        cmd = [str(p) for p in cmd]
        logger.debug("Streaming '{}'".format(" ".join(cmd)))
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)  # noqa: S603,S607
        try:
            q = Queue()
            Thread(target=self._reader, args=[False, p.stdout, q]).start()
            Thread(target=self._reader, args=[True, p.stderr, q]).start()
            # Each reader thread puts one None sentinel when its pipe is exhausted, so drain until two are seen
            for _ in range(2):
                for source, line in iter(q.get, None):
                    callback(source, line)
            exit_code = p.wait(timeout=timeout_secs)
        finally:
            # Only reached with a live process if something above raised; kill and reap it
            if p.poll() is None:
                p.kill()
                p.wait()
        if exit_code != 0:
            raise CalledProcessError(exit_code, " ".join(cmd), "<unknown>", "<unknown>")

    def smart_log(
        self: Self,
        is_stderr: bool,
        line: bytes,
        *,
        prefix: str = "",
        log: logging.Logger = logger,
    ) -> None:
        """
        Maps (is_stderr, piped line) pairs to logging statements.
        The data must be utf-8-encoded (already-decoded `str` lines are accepted as-is).

        Two records are emitted on `log` per line:

        1. `"<LEVEL> [stdout|stderr]: <line>"`, using the logging method named by the text
           before the first `:` (case-insensitive), e.g. `warning:` uses `log.warning`.
           Only the level methods `debug`, `info`, `warning`, `warn`, `error`, `exception`,
           `critical`, and `fatal` are recognized.
        2. `prefix` plus the line with a leading `FATAL:`, `ERROR:`, `WARNING:`, `INFO:`,
           or `DEBUG:` tag (case-insensitive) removed, at the matching level.

        Both fall back to DEBUG if no valid prefix is found.
        This is useful if you wrote an external application (e.g. in C)
        and want those logging statements mapped into your calling Python code.

        Args:
            is_stderr: Whether the line came from stderr (otherwise stdout)
            line: One line of output
            prefix: Text prepended to the second record's message
            log: The logger to emit on
        """
        # NOTE(review): emitting two records per line is kept from the original; confirm it is intended.
        text = line.decode("utf-8") if isinstance(line, bytes) else line
        head, colon, rest = text.partition(":")
        # Restrict to real level methods: arbitrary attributes (e.g. `name`, `level`, `log`)
        # are not callable with a single message and would raise.
        level_methods = ("debug", "info", "warning", "warn", "error", "exception", "critical", "fatal")
        method_name = head.lower()
        fn = getattr(log, method_name, None) if method_name in level_methods else None
        if not callable(fn):
            fn = log.debug
        source = "stderr" if is_stderr else "stdout"
        fn(f"{fn.__name__.upper()} [{source}]: {text}")
        tagged = {
            "FATAL": log.critical,  # same level as the deprecated `fatal` alias
            "ERROR": log.error,
            "WARNING": log.warning,
            "INFO": log.info,
            "DEBUG": log.debug,
        }
        tag = head.upper()
        if colon and tag in tagged:
            tagged[tag](prefix + rest)
        else:
            log.debug(prefix + text)

    def _reader(self: Self, is_stderr: bool, pipe: IO[AnyStr], queue: Queue) -> None:
        """
        Forwards every line of `pipe` to `queue` as `(is_stderr, line)`, then puts `None`.

        The `None` sentinel is always enqueued, even on error, so the consumer never blocks forever.
        The pipe is closed on exit.
        """
        try:
            with pipe:
                # Iterating the file object stops at EOF in both binary and text mode;
                # comparing `readline()` against b"" never terminates for text pipes.
                for line in pipe:
                    queue.put((is_stderr, line))
        finally:
            queue.put(None)
182
|
|
|
|
183
|
|
|
|
184
|
|
|
# Module-level instance of CallUtils, exported via __all__.
CallTools = CallUtils()
185
|
|
|
|