1
|
|
|
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils |
2
|
|
|
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils |
3
|
|
|
# SPDX-License-Identifier: Apache-2.0 |
4
|
|
|
""" |
5
|
|
|
Low-level tools (e.g. memory management). |
6
|
|
|
""" |
7
|
|
|
|
8
|
|
|
import atexit |
9
|
|
|
import importlib |
10
|
|
|
import importlib.metadata |
11
|
|
|
import locale |
12
|
|
|
import logging |
13
|
|
|
import os |
14
|
|
|
import platform |
15
|
|
|
import signal |
16
|
|
|
import socket |
17
|
|
|
import struct |
18
|
|
|
import sys |
19
|
|
|
import traceback |
20
|
|
|
from collections.abc import Callable, Mapping, Sequence |
21
|
|
|
from dataclasses import asdict, dataclass |
22
|
|
|
from datetime import UTC, datetime |
23
|
|
|
from getpass import getuser |
24
|
|
|
from typing import Any, NamedTuple, Self |
25
|
|
|
|
26
|
|
|
from pocketutils.core.input_output import Writeable |
27
|
|
|
|
28
|
|
|
__all__ = ["Frame", "SerializedException", "SignalHandler", "ExitHandler", "SystemUtils", "SystemTools"] |
29
|
|
|
|
30
|
|
|
logger = logging.getLogger("pocketutils") |
31
|
|
|
|
32
|
|
|
|
33
|
|
|
@dataclass(frozen=True, slots=True, order=True, kw_only=True) |
34
|
|
|
class Frame: |
35
|
|
|
depth: int |
36
|
|
|
filename: str |
37
|
|
|
line: int |
38
|
|
|
name: str |
39
|
|
|
repeats: int |
40
|
|
|
|
41
|
|
|
def as_dict(self: Self) -> Mapping[str, int | str]: |
42
|
|
|
return asdict(self) |
43
|
|
|
|
44
|
|
|
|
45
|
|
|
class SerializedException(NamedTuple): |
46
|
|
|
message: Sequence[str] |
47
|
|
|
stacktrace: Sequence[Frame] |
48
|
|
|
|
49
|
|
|
|
50
|
|
|
@dataclass(frozen=True, slots=True) |
51
|
|
|
class SignalHandler: |
52
|
|
|
name: str |
53
|
|
|
code: int |
54
|
|
|
desc: str |
55
|
|
|
sink: Writeable | Callable[[str], Any] |
56
|
|
|
|
57
|
|
|
def __call__(self: Self) -> None: |
58
|
|
|
self.sink.write(f"~~{self.name}[{self.code}] ({self.desc})~~") |
59
|
|
|
traceback.print_stack(file=self.sink) |
60
|
|
|
for line in traceback.format_stack(): |
61
|
|
|
self.sink.write(line) |
62
|
|
|
|
63
|
|
|
|
64
|
|
|
@dataclass(frozen=True, slots=True) |
65
|
|
|
class ExitHandler: |
66
|
|
|
sink: Writeable |
67
|
|
|
|
68
|
|
|
def __call__(self: Self) -> None: |
69
|
|
|
self.sink.write("~~EXIT~~") |
70
|
|
|
traceback.print_stack(file=self.sink) |
71
|
|
|
for line in traceback.format_stack(): |
72
|
|
|
self.sink.write(line) |
73
|
|
|
|
74
|
|
|
|
75
|
|
|
@dataclass(slots=True, frozen=True) |
76
|
|
|
class SystemUtils: |
77
|
|
|
def is_linux(self: Self) -> bool: |
78
|
|
|
return sys.platform == "linux" |
79
|
|
|
|
80
|
|
|
def is_windows(self: Self) -> bool: |
81
|
|
|
return sys.platform == "win32" |
82
|
|
|
|
83
|
|
|
def is_macos(self: Self) -> bool: |
84
|
|
|
return sys.platform == "darwin" |
85
|
|
|
|
86
|
|
|
def get_env_info(self: Self, *, extended: bool = False, insecure: bool = False) -> Mapping[str, str]: |
87
|
|
|
""" |
88
|
|
|
Get a dictionary of some system and environment information. |
89
|
|
|
Includes os_release, hostname, username, mem + disk, shell, etc. |
90
|
|
|
|
91
|
|
|
Args: |
92
|
|
|
extended: Get info from psutil |
93
|
|
|
insecure: Include data like hostname and username |
94
|
|
|
|
95
|
|
|
Warning: |
96
|
|
|
Even with `include_insecure=False`, avoid exposing this data to untrusted |
97
|
|
|
sources. For example, this includes the specific OS release, which could |
98
|
|
|
be used in attack. |
99
|
|
|
""" |
100
|
|
|
|
101
|
|
|
now = datetime.now(UTC).astimezone().isoformat() |
102
|
|
|
uname = platform.uname() |
103
|
|
|
lang_code, encoding = locale.getlocale() |
104
|
|
|
# build up this dict: |
105
|
|
|
data = {} |
106
|
|
|
|
107
|
|
|
def _try(os_fn, k: str, *args): |
108
|
|
|
if any(a is None for a in args): |
109
|
|
|
return None |
110
|
|
|
try: |
111
|
|
|
v = os_fn(*args) |
112
|
|
|
data[k] = v |
113
|
|
|
return v |
114
|
|
|
except (OSError, ImportError): |
115
|
|
|
return None |
116
|
|
|
|
117
|
|
|
data.update( |
118
|
|
|
{ |
119
|
|
|
"platform": platform.platform(), |
120
|
|
|
"python": ".".join(str(i) for i in sys.version_info), |
121
|
|
|
"os": uname.system, |
122
|
|
|
"os_release": uname.release, |
123
|
|
|
"os_version": uname.version, |
124
|
|
|
"machine": uname.machine, |
125
|
|
|
"byte_order": sys.byteorder, |
126
|
|
|
"processor": uname.processor, |
127
|
|
|
"build": sys.version, |
128
|
|
|
"python_bits": 8 * struct.calcsize("P"), |
129
|
|
|
"environment_info_capture_datetime": now, |
130
|
|
|
"encoding": encoding, |
131
|
|
|
"lang_code": lang_code, |
132
|
|
|
"recursion_limit": sys.getrecursionlimit(), |
133
|
|
|
"float_info": sys.float_info, |
134
|
|
|
"int_info": sys.int_info, |
135
|
|
|
"flags": sys.flags, |
136
|
|
|
"hash_info": sys.hash_info, |
137
|
|
|
"implementation": sys.implementation, |
138
|
|
|
"switch_interval": sys.getswitchinterval(), |
139
|
|
|
"filesystem_encoding": sys.getfilesystemencoding(), |
140
|
|
|
}, |
141
|
|
|
) |
142
|
|
|
if "LANG" in os.environ: |
143
|
|
|
data["lang"] = os.environ["LANG"] |
144
|
|
|
if "SHELL" in os.environ: |
145
|
|
|
data["shell"] = os.environ["SHELL"] |
146
|
|
|
if "LC_ALL" in os.environ: |
147
|
|
|
data["lc_all"] = os.environ["LC_ALL"] |
148
|
|
|
if hasattr(sys, "winver"): |
149
|
|
|
data["win_ver"] = sys.getwindowsversion() |
150
|
|
|
if hasattr(sys, "mac_ver"): |
151
|
|
|
data["mac_ver"] = sys.mac_ver() |
152
|
|
|
if hasattr(sys, "linux_distribution"): |
153
|
|
|
data["linux_distribution"] = sys.linux_distribution() |
154
|
|
|
if insecure: |
155
|
|
|
_try(getuser, "username") |
156
|
|
|
_try(os.getlogin, "login") |
157
|
|
|
_try(socket.gethostname, "hostname") |
158
|
|
|
_try(os.getcwd, "cwd") |
159
|
|
|
pid = _try(os.getpid, "pid") |
160
|
|
|
ppid = _try(os.getppid, "parent_pid") |
161
|
|
|
if hasattr(os, "getpriority"): |
162
|
|
|
_try(os.getpriority, "priority", os.PRIO_PROCESS, pid) |
163
|
|
|
_try(os.getpriority, "parent_priority", os.PRIO_PROCESS, ppid) |
164
|
|
|
if extended: |
165
|
|
|
import psutil |
166
|
|
|
|
167
|
|
|
data.update( |
168
|
|
|
{ |
169
|
|
|
"disk_used": psutil.disk_usage(".").used, |
170
|
|
|
"disk_free": psutil.disk_usage(".").free, |
171
|
|
|
"memory_used": psutil.virtual_memory().used, |
172
|
|
|
"memory_available": psutil.virtual_memory().available, |
173
|
|
|
}, |
174
|
|
|
) |
175
|
|
|
return {k: str(v) for k, v in dict(data).items()} |
176
|
|
|
|
177
|
|
|
def list_package_versions(self: Self) -> Mapping[str, str]: |
178
|
|
|
""" |
179
|
|
|
Returns installed packages and their version numbers. |
180
|
|
|
Reliable; uses importlib (Python 3.8+). |
181
|
|
|
""" |
182
|
|
|
# calling .metadata reads the metadata file |
183
|
|
|
# and .version is an alias to .metadata["version"] |
184
|
|
|
# so make sure to only read once |
185
|
|
|
dct = {} |
186
|
|
|
for d in importlib.metadata.distributions(): |
187
|
|
|
meta = d.metadata |
188
|
|
|
dct[meta["name"]] = meta["version"] |
189
|
|
|
return dct |
190
|
|
|
|
191
|
|
|
def serialize_exception(self: Self, e: BaseException) -> SerializedException: |
192
|
|
|
tbe = traceback.TracebackException.from_exception(e) |
193
|
|
|
msg = list(tbe.format_exception_only()) |
194
|
|
|
tb = self.build_traceback(e) |
195
|
|
|
return SerializedException(msg, tb) |
196
|
|
|
|
197
|
|
|
def serialize_exception_msg(self: Self, e: BaseException) -> Sequence[str]: |
198
|
|
|
tbe = traceback.TracebackException.from_exception(e) |
199
|
|
|
return list(tbe.format_exception_only()) |
200
|
|
|
|
201
|
|
|
def build_traceback(self: Self, e: BaseException) -> Sequence[Frame]: |
202
|
|
|
tb = [] |
203
|
|
|
current = None |
204
|
|
|
tbe = traceback.TracebackException.from_exception(e) |
205
|
|
|
last, repeats = None, 0 |
206
|
|
|
for i, s in enumerate(tbe.stack): |
207
|
|
|
current = Frame(depth=i, filename=s.filename, line=int(s.line), name=s.name, repeats=-1) |
208
|
|
|
if current == last: |
209
|
|
|
repeats += 1 |
210
|
|
|
else: |
211
|
|
|
current = Frame( |
212
|
|
|
depth=current.depth, |
213
|
|
|
filename=current.filename, |
214
|
|
|
line=current.line, |
215
|
|
|
name=current.name, |
216
|
|
|
repeats=repeats, |
217
|
|
|
) |
218
|
|
|
tb.append(current) |
219
|
|
|
repeats = 0 |
220
|
|
|
last = current |
221
|
|
|
if current is not None and current == last: |
222
|
|
|
tb.append(current) |
223
|
|
|
return tb |
224
|
|
|
|
225
|
|
|
def trace_signals(self: Self, sink: Writeable = sys.stderr) -> None: |
226
|
|
|
""" |
227
|
|
|
Registers signal handlers for all signals that log the traceback. |
228
|
|
|
Uses `signal.signal`. |
229
|
|
|
""" |
230
|
|
|
for sig in signal.valid_signals(): |
231
|
|
|
handler = SignalHandler(sig.name, sig.value, signal.strsignal(sig), sink) |
232
|
|
|
signal.signal(sig.value, handler) |
233
|
|
|
|
234
|
|
|
def trace_exit(self: Self, sink: Writeable = sys.stderr) -> None: |
235
|
|
|
""" |
236
|
|
|
Registers an exit handler via `atexit.register` that logs the traceback. |
237
|
|
|
""" |
238
|
|
|
atexit.register(ExitHandler(sink)) |
239
|
|
|
|
240
|
|
|
|
241
|
|
|
SystemTools = SystemUtils() |
242
|
|
|
|