pocketutils.tools.call_tools   A
last analyzed

Complexity

Total Complexity 32

Size/Duplication

Total Lines 185
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 32
eloc 114
dl 0
loc 185
rs 9.84
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
B CallUtils.stream_cmd_call() 0 42 5
A CallUtils.call_cmd_utf() 0 34 3
B CallUtils.smart_log() 0 36 8
A CallUtils.silenced() 0 10 5
B CallUtils.log_called_process_error() 0 17 7
A CallUtils._reader() 0 7 3

1 Function

Rating   Name   Duplication   Size   Complexity  
A null_context() 0 3 1
1
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils
2
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils
3
# SPDX-License-Identifier: Apache-2.0
4
5
import contextlib
6
import logging
7
import subprocess  # noqa: S404
8
from collections.abc import Callable, Generator, Mapping, Sequence
9
from copy import copy
10
from dataclasses import dataclass
11
from pathlib import PurePath
12
from queue import Queue
13
from subprocess import CalledProcessError, CompletedProcess
14
from threading import Thread
15
from typing import IO, Any, AnyStr, Self, Unpack
16
17
from pocketutils.core.input_output import DevNull
18
19
# Public API of this module: the utility class and its ready-made module-level instance.
__all__ = ["CallUtils", "CallTools"]
20
21
# Module-level logger registered under the "pocketutils" name; used as the default log target below.
logger = logging.getLogger("pocketutils")
22
23
24
@contextlib.contextmanager
def null_context() -> Generator[None, None, None]:
    """
    Context manager that does nothing.

    Yields `None` once and performs no setup or cleanup; useful as a
    placeholder where a context manager is only conditionally needed.
    """
    yield
27
28
29
@dataclass(slots=True, frozen=True)
30
class CallUtils:
31
    @contextlib.contextmanager
32
    def silenced(self: Self, no_stdout: bool = True, no_stderr: bool = True) -> Generator[None, None, None]:
33
        """
34
        Context manager that suppresses stdout and stderr.
35
        """
36
        # noinspection PyTypeChecker
37
        with contextlib.redirect_stdout(DevNull()) if no_stdout else null_context():
38
            # noinspection PyTypeChecker
39
            with contextlib.redirect_stderr(DevNull()) if no_stderr else null_context():
40
                yield
41
42
    def call_cmd_utf(
43
        self: Self,
44
        *cmd: str,
45
        log_fn: Callable[[str], Any] | None = logger.debug,
46
        **kwargs: Unpack[Mapping[str, Any]],
47
    ) -> CompletedProcess:
48
        """
49
        Like `call_cmd` for utf-8 only.
50
        Set `text=True` and `encoding=utf-8`,
51
        and strips stdout and stderr of start/end whitespace before returning.
52
        Can also log formatted stdout and stderr on failure.
53
        Otherwise, logs the output, unformatted and unstripped, as DEBUG
54
55
        See Also:
56
            `subprocess.check_output`, which only returns stdout
57
        """
58
        log_fn("Calling '{}'".format(" ".join(cmd)))
59
        kwargs = copy(kwargs)
60
        if "cwd" in kwargs and isinstance(kwargs["path"], PurePath):
61
            kwargs["cwd"] = str(kwargs["cwd"])
62
        calling = dict(
63
            *[str(c) for c in cmd],
64
            capture_output=True,
65
            check=True,
66
            text=True,
67
            encoding="utf-8",
68
            **kwargs,
69
        )
70
        x = subprocess.run(**calling)  # noqa: S603,S607
71
        log_fn(f"stdout: '{x.stdout}'")
72
        log_fn(f"stderr: '{x.stderr}'")
73
        x.stdout = x.stdout.strip()
74
        x.stderr = x.stderr.strip()
75
        return x
76
77
    def log_called_process_error(self: Self, e: CalledProcessError, *, log_fn: Callable[[str], None]) -> None:
78
        """
79
        Outputs some formatted text describing the error with its full stdout and stderr.
80
81
        Args:
82
            e: The error
83
            log_fn: For example, `logger.warning`
84
        """
85
        log_fn(f'Failed on command: {" ".join(e.cmd)}')
86
        out = None
87
        if e.stdout is not None:
88
            out = e.stdout.decode(encoding="utf-8") if isinstance(e.stdout, bytes) else e.stdout
89
        log_fn("<no stdout>" if out is None else f"stdout: {out}")
90
        err = None
91
        if e.stderr is not None:
92
            err = e.stderr.decode(encoding="utf-8") if isinstance(e.stderr, bytes) else e.stderr
93
        log_fn("<no stderr>" if err is None else f"stderr: {err}")
94
95
    def stream_cmd_call(
96
        self: Self,
97
        cmd: Sequence[str],
98
        *,
99
        callback: Callable[[bool, bytes], None] | None = None,
100
        timeout_secs: float | None = None,
101
        **kwargs: Unpack[Mapping[str, Any]],
102
    ) -> None:
103
        """
104
        Processes stdout and stderr on separate threads.
105
        Streamed -- can avoid filling a stdout or stderr buffer.
106
        Calls an external command, waits, and throws a
107
        ExternalCommandFailed for nonzero exit codes.
108
109
        Args:
110
            cmd: The command args
111
            callback: A function that processes (is_stderr, piped line).
112
                      If `None`, uses :meth:`smart_log`.
113
            timeout_secs: Max seconds to wait for the next line
114
            kwargs: Passed to `subprocess.Popen`; do not pass `stdout` or `stderr`.
115
116
        Raises:
117
            CalledProcessError: If the exit code is nonzero
118
        """
119
        if callback is None:
120
            callback = self.smart_log
121
        cmd = [str(p) for p in cmd]
122
        logger.debug("Streaming '{}'".format(" ".join(cmd)))
123
        calling = dict(stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
124
        p = subprocess.Popen(cmd, **calling)  # noqa: S603,S607
125
        try:
126
            q = Queue()
127
            Thread(target=self._reader, args=[False, p.stdout, q]).start()
128
            Thread(target=self._reader, args=[True, p.stderr, q]).start()
129
            for _ in range(2):
130
                for source, line in iter(q.get, None):
131
                    callback(source, line)
132
            exit_code = p.wait(timeout=timeout_secs)
133
        finally:
134
            p.kill()
135
        if exit_code != 0:
136
            raise CalledProcessError(exit_code, " ".join(cmd), "<unknown>", "<unknown>")
137
138
    def smart_log(
139
        self: Self,
140
        is_stderr: bool,
141
        line: bytes,
142
        *,
143
        prefix: str = "",
144
        log: logging.Logger = logger,
145
    ) -> None:
146
        """
147
        Maps (is_stderr, piped line) pairs to logging statements.
148
        The data must be utf-8-encoded.
149
        If the line starts with `warning:` (case-insensitive), uses `log.warning`.
150
        The same is true for any other method attached to `log`.
151
        Falls back to DEBUG if no valid prefix is found.
152
        This is useful if you wrote an external application (e.g. in C)
153
        and want those logging statements mapped into your calling Python code.
154
        """
155
        line = line.decode("utf-8")
156
        try:
157
            fn = getattr(log, line.split(":")[0].lower())
158
        except AttributeError:
159
            fn = log.debug
160
        source = "stderr" if is_stderr else "stdout"
161
        fn(f"{fn.__name__.upper()} [{source}]: {line}")
162
        if line.lower().startswith("FATAL:"):
163
            logger.fatal(prefix + line[6:])
164
        elif line.startswith("ERROR:"):
165
            logger.error(prefix + line[6:])
166
        elif line.startswith("WARNING:"):
167
            logger.warning(prefix + line[8:])
168
        elif line.startswith("INFO:"):
169
            logger.info(prefix + line[5:])
170
        elif line.startswith("DEBUG:"):
171
            logger.debug(prefix + line[6:])
172
        else:
173
            logger.debug(prefix + line)
174
175
    def _reader(self: Self, is_stderr: bool, pipe: IO[AnyStr], queue: Queue):
176
        try:
177
            with pipe:
178
                for line in iter(pipe.readline, b""):
179
                    queue.put((is_stderr, line))
180
        finally:
181
            queue.put(None)
182
183
184
# Ready-to-use module-level instance of CallUtils (exported via __all__).
CallTools = CallUtils()
185