1
|
|
|
from __future__ import annotations |
|
|
|
|
2
|
|
|
import logging |
3
|
|
|
import os |
4
|
|
|
import sys |
5
|
|
|
from dataclasses import dataclass |
6
|
|
|
from inspect import cleandoc |
7
|
|
|
from pathlib import Path |
8
|
|
|
from typing import Mapping, Optional, Union, Any, Callable, TypeVar, Generic, TextIO, AbstractSet |
9
|
|
|
|
10
|
|
|
# noinspection PyProtectedMember |
11
|
|
|
import loguru._defaults as _defaults |
|
|
|
|
12
|
|
|
import regex |
|
|
|
|
13
|
|
|
from loguru import logger |
|
|
|
|
14
|
|
|
|
15
|
|
|
# noinspection PyProtectedMember |
16
|
|
|
from loguru._logger import Logger |
|
|
|
|
17
|
|
|
from pocketutils.core.exceptions import IllegalStateError, XValueError |
|
|
|
|
18
|
|
|
|
19
|
|
|
|
20
|
|
|
Formatter = Union[str, Callable[[Mapping[str, Any]], str]] |
21
|
|
|
_FMT = cleandoc( |
22
|
|
|
r""" |
23
|
|
|
<bold>{time:YYYY-MM-DD HH:mm:ss.SSS}</bold> | |
24
|
|
|
<level>{level: <8}</level> | |
25
|
|
|
<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> |
26
|
|
|
— <level>{message}[EXTRA]</level> |
27
|
|
|
{exception} |
28
|
|
|
""" |
29
|
|
|
).replace("\n", " ") |
30
|
|
|
|
31
|
|
|
|
32
|
|
|
class FormatFactory: |
|
|
|
|
33
|
|
|
@classmethod |
34
|
|
|
def with_extras( |
|
|
|
|
35
|
|
|
cls, |
|
|
|
|
36
|
|
|
*, |
|
|
|
|
37
|
|
|
fmt: str = _FMT, |
|
|
|
|
38
|
|
|
sep: str = "; ", |
|
|
|
|
39
|
|
|
eq_sign: str = " ", |
|
|
|
|
40
|
|
|
) -> Callable[[Mapping[str, Any]], str]: |
41
|
|
|
def FMT(record: Mapping[str, Any]) -> str: |
|
|
|
|
42
|
|
|
extra = sep.join([e + eq_sign + "{extra[" + e + "]}" for e in record["extra"].keys()]) |
43
|
|
|
if len(extra) > 0: |
44
|
|
|
extra = f" [ {extra} ]" |
45
|
|
|
return fmt.replace("[EXTRA]", extra) + os.linesep |
46
|
|
|
|
47
|
|
|
return FMT |
48
|
|
|
|
49
|
|
|
@classmethod |
50
|
|
|
def plain(cls, *, fmt: str = _FMT) -> Callable[[Mapping[str, Any]], str]: |
|
|
|
|
51
|
|
|
def FMT(record: Mapping[str, Any]) -> str: |
|
|
|
|
52
|
|
|
return fmt.replace("[EXTRA]", "") |
53
|
|
|
|
54
|
|
|
return FMT |
55
|
|
|
|
56
|
|
|
|
57
|
|
|
class _SENTINEL: |
58
|
|
|
pass |
59
|
|
|
|
60
|
|
|
|
61
|
|
|
T = TypeVar("T", covariant=True, bound=Logger) |
|
|
|
|
62
|
|
|
|
63
|
|
|
|
64
|
|
|
_LOGGER_ARG_PATTERN = regex.compile(r"(?:([a-zA-Z]+):)?(.*)", flags=regex.V1) |
65
|
|
|
log_compressions = { |
66
|
|
|
".xz", |
67
|
|
|
".lzma", |
68
|
|
|
".gz", |
69
|
|
|
".zip", |
70
|
|
|
".bz2", |
71
|
|
|
".tar", |
72
|
|
|
".tar.gz", |
73
|
|
|
".tar.bz2", |
74
|
|
|
".tar.xz", |
75
|
|
|
} |
76
|
|
|
valid_log_suffixes = { |
77
|
|
|
*{f".log{c}" for c in log_compressions}, |
78
|
|
|
*{f".txt{c}" for c in log_compressions}, |
79
|
|
|
*{f".json{c}" for c in log_compressions}, |
80
|
|
|
} |
81
|
|
|
|
82
|
|
|
|
83
|
|
|
class Defaults: |
|
|
|
|
84
|
|
|
|
85
|
|
|
levels_built_in = dict( |
86
|
|
|
TRACE=_defaults.LOGURU_TRACE_NO, |
87
|
|
|
DEBUG=_defaults.LOGURU_DEBUG_NO, |
88
|
|
|
INFO=_defaults.LOGURU_INFO_NO, |
89
|
|
|
WARNING=_defaults.LOGURU_WARNING_NO, |
90
|
|
|
ERROR=_defaults.LOGURU_ERROR_NO, |
91
|
|
|
CRITICAL=_defaults.LOGURU_CRITICAL_NO, |
92
|
|
|
) |
93
|
|
|
|
94
|
|
|
# the levels for caution and notice are DEFINED here |
95
|
|
|
# trace and success must match loguru's |
96
|
|
|
# and the rest must match logging's |
97
|
|
|
# note that most of these alternate between informative and problematic |
98
|
|
|
# i.e. info (ok), caution (bad), success (ok), warning (bad), notice (ok), error (bad) |
99
|
|
|
levels_extended = { |
100
|
|
|
**levels_built_in, |
101
|
|
|
**dict( |
102
|
|
|
CAUTION=23, |
103
|
|
|
SUCCESS=25, |
104
|
|
|
NOTICE=35, |
105
|
|
|
), |
106
|
|
|
} |
107
|
|
|
|
108
|
|
|
colors = dict( |
109
|
|
|
TRACE="<dim>", |
110
|
|
|
DEBUG="<dim>", |
111
|
|
|
INFO="<bold>", |
112
|
|
|
CAUTION="<yellow>", |
113
|
|
|
SUCCESS="<blue>", |
114
|
|
|
WARNING="<yellow>", |
115
|
|
|
NOTICE="<blue>", |
116
|
|
|
ERROR="<red>", |
117
|
|
|
CRITICAL="<red>", |
118
|
|
|
) |
119
|
|
|
icons = dict( |
120
|
|
|
TRACE=_defaults.LOGURU_TRACE_ICON, |
121
|
|
|
DEBUG=_defaults.LOGURU_DEBUG_ICON, |
122
|
|
|
INFO=_defaults.LOGURU_INFO_ICON, |
123
|
|
|
CAUTION="⚐", # or ☡ |
124
|
|
|
SUCCESS=_defaults.LOGURU_SUCCESS_ICON, |
125
|
|
|
WARNING=_defaults.LOGURU_WARNING_ICON, |
126
|
|
|
NOTICE="★", |
127
|
|
|
ERROR=_defaults.LOGURU_ERROR_ICON, |
128
|
|
|
CRITICAL=_defaults.LOGURU_CRITICAL_ICON, |
129
|
|
|
) |
130
|
|
|
|
131
|
|
|
level: str = "INFO" |
132
|
|
|
|
133
|
|
|
fmt = FormatFactory.with_extras() |
134
|
|
|
|
135
|
|
|
aliases = dict(NONE=None, NO=None, OFF=None, VERBOSE="INFO", QUIET="ERROR") |
136
|
|
|
|
137
|
|
|
|
138
|
|
|
@dataclass(frozen=True, repr=True, order=True) |
|
|
|
|
139
|
|
|
class HandlerInfo: |
140
|
|
|
hid: int |
141
|
|
|
path: Optional[Path] |
142
|
|
|
level: Optional[str] |
143
|
|
|
fmt: Formatter |
144
|
|
|
|
145
|
|
|
|
146
|
|
|
@dataclass(frozen=False, repr=True) |
147
|
|
|
class _HandlerInfo: |
148
|
|
|
hid: int |
149
|
|
|
sink: Any |
150
|
|
|
level: Optional[int] |
151
|
|
|
fmt: Formatter |
152
|
|
|
|
153
|
|
|
|
154
|
|
|
@dataclass(frozen=True, repr=True, order=True) |
|
|
|
|
155
|
|
|
class LogSinkInfo: |
156
|
|
|
path: Path |
157
|
|
|
base: Path |
158
|
|
|
suffix: str |
159
|
|
|
serialize: bool |
160
|
|
|
compression: Optional[str] |
161
|
|
|
|
162
|
|
|
@classmethod |
163
|
|
|
def guess(cls, path: Union[str, Path]) -> LogSinkInfo: |
|
|
|
|
164
|
|
|
path = Path(path) |
165
|
|
|
base, compression = path.name, None |
166
|
|
|
for c in log_compressions: |
|
|
|
|
167
|
|
|
if path.name.endswith(c): |
168
|
|
|
base, compression = path.name[: -len(c)], c |
169
|
|
|
if not [base.endswith(s) for s in [".json", ".log", ".txt"]]: |
170
|
|
|
raise XValueError( |
171
|
|
|
f"Log filename {path.name} is not .json, .log, .txt, or a compressed variant" |
172
|
|
|
) |
173
|
|
|
return LogSinkInfo( |
174
|
|
|
path=path, |
175
|
|
|
base=path.parent / base, |
176
|
|
|
suffix=compression, |
177
|
|
|
serialize=base.endswith(".json"), |
178
|
|
|
compression=compression, |
179
|
|
|
) |
180
|
|
|
|
181
|
|
|
|
182
|
|
|
class InterceptHandler(logging.Handler): |
183
|
|
|
""" |
184
|
|
|
Redirects standard logging to loguru. |
185
|
|
|
""" |
186
|
|
|
|
187
|
|
|
def emit(self, record): |
188
|
|
|
# Get corresponding Loguru level if it exists |
189
|
|
|
try: |
190
|
|
|
level = logger.level(record.levelname).name |
191
|
|
|
except ValueError: |
192
|
|
|
level = record.levelno |
193
|
|
|
# Find caller from where originated the logged message |
194
|
|
|
frame, depth = logging.currentframe(), 2 |
195
|
|
|
while frame.f_code.co_filename == logging.__file__: |
196
|
|
|
frame = frame.f_back |
197
|
|
|
depth += 1 |
198
|
|
|
logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) |
199
|
|
|
|
200
|
|
|
|
201
|
|
|
class FancyLoguru(Generic[T]): |
|
|
|
|
202
|
|
|
def __init__(self, log: T): |
203
|
|
|
self._levels = dict(Defaults.levels_built_in) |
204
|
|
|
self._logger = log |
205
|
|
|
self._main = None |
206
|
|
|
self._paths = {} |
207
|
|
|
self._aliases = dict(Defaults.aliases) |
208
|
|
|
|
209
|
|
|
@property |
210
|
|
|
def logger(self) -> T: |
|
|
|
|
211
|
|
|
return self._logger |
212
|
|
|
|
213
|
|
|
@property |
214
|
|
|
def levels(self) -> Mapping[str, int]: |
|
|
|
|
215
|
|
|
return dict(self._levels) |
216
|
|
|
|
217
|
|
|
@property |
218
|
|
|
def aliases(self) -> Mapping[str, str]: |
|
|
|
|
219
|
|
|
return self._aliases |
220
|
|
|
|
221
|
|
|
@property |
222
|
|
|
def main(self) -> Optional[HandlerInfo]: |
|
|
|
|
223
|
|
|
if self._main is None: |
224
|
|
|
return None |
225
|
|
|
return HandlerInfo( |
226
|
|
|
hid=self._main.hid, level=self._main.level, fmt=self._main.fmt, path=None |
227
|
|
|
) |
228
|
|
|
|
229
|
|
|
@property |
230
|
|
|
def paths(self) -> AbstractSet[HandlerInfo]: |
|
|
|
|
231
|
|
|
if self._main is None: |
232
|
|
|
return set() |
233
|
|
|
return {HandlerInfo(hid=h.hid, level=h.level, fmt=h.fmt, path=h.sink) for h in self._paths} |
234
|
|
|
|
235
|
|
|
def config_levels( |
|
|
|
|
236
|
|
|
self, |
|
|
|
|
237
|
|
|
*, |
|
|
|
|
238
|
|
|
levels: Mapping[str, int] = _SENTINEL, |
|
|
|
|
239
|
|
|
colors: Mapping[str, str] = _SENTINEL, |
|
|
|
|
240
|
|
|
icons: Mapping[str, str] = _SENTINEL, |
|
|
|
|
241
|
|
|
aliases: Mapping[str, str] = _SENTINEL, |
|
|
|
|
242
|
|
|
) -> __qualname__: |
|
|
|
|
243
|
|
|
levels = Defaults.levels_extended if levels is _SENTINEL else levels |
244
|
|
|
colors = Defaults.colors if colors is _SENTINEL else colors |
245
|
|
|
icons = Defaults.icons if icons is _SENTINEL else icons |
246
|
|
|
aliases = Defaults.aliases if aliases is _SENTINEL else aliases |
247
|
|
|
for k, v in levels.items(): |
|
|
|
|
248
|
|
|
self.config_level(k, v, color=colors.get(k, _SENTINEL), icon=icons.get(k, _SENTINEL)) |
249
|
|
|
self._aliases = dict(aliases) |
250
|
|
|
return self |
251
|
|
|
|
252
|
|
|
def init( |
253
|
|
|
self, |
|
|
|
|
254
|
|
|
*, |
|
|
|
|
255
|
|
|
level: str = Defaults.level, |
|
|
|
|
256
|
|
|
sink=sys.stderr, |
|
|
|
|
257
|
|
|
fmt: Formatter = Defaults.fmt, |
|
|
|
|
258
|
|
|
intercept: bool = True, |
|
|
|
|
259
|
|
|
) -> __qualname__: |
260
|
|
|
""" |
261
|
|
|
Sets an initial configuration. |
262
|
|
|
""" |
263
|
|
|
if intercept: |
264
|
|
|
logging.basicConfig(handlers=[InterceptHandler()], level=0, encoding="utf-8") |
265
|
|
|
self.logger.remove(None) # get rid of the built-in handler |
266
|
|
|
self.config_main(level=level, sink=sink, fmt=fmt) |
267
|
|
|
return self |
268
|
|
|
|
269
|
|
|
def config_level( |
|
|
|
|
270
|
|
|
self, |
|
|
|
|
271
|
|
|
name: str, |
|
|
|
|
272
|
|
|
level: int, |
|
|
|
|
273
|
|
|
*, |
|
|
|
|
274
|
|
|
color: Union[None, str, _SENTINEL] = _SENTINEL, |
|
|
|
|
275
|
|
|
icon: Union[None, str, _SENTINEL] = _SENTINEL, |
|
|
|
|
276
|
|
|
replace: bool = True, |
|
|
|
|
277
|
|
|
) -> __qualname__: |
278
|
|
|
try: |
279
|
|
|
data = logger.level(name) |
280
|
|
|
except ValueError: |
281
|
|
|
data = None |
282
|
|
|
if data is None: |
283
|
|
|
logger.level( |
284
|
|
|
name, |
285
|
|
|
no=level, |
286
|
|
|
color=None if color is _SENTINEL else color, |
287
|
|
|
icon=None if icon is _SENTINEL else icon, |
288
|
|
|
) |
289
|
|
|
elif replace: |
290
|
|
|
if level != data.no: # loguru doesn't check whether they're eq; it just errors |
291
|
|
|
raise IllegalStateError(f"Cannot set level={level}!={data.no} for {name}") |
292
|
|
|
logger.level( |
293
|
|
|
name, |
294
|
|
|
color=data.color if color is _SENTINEL else color, |
295
|
|
|
icon=data.icon if icon is _SENTINEL else icon, |
296
|
|
|
) |
297
|
|
|
return self |
298
|
|
|
|
299
|
|
|
def config_main( |
300
|
|
|
self, |
|
|
|
|
301
|
|
|
*, |
|
|
|
|
302
|
|
|
sink: TextIO = _SENTINEL, |
|
|
|
|
303
|
|
|
level: Optional[str] = _SENTINEL, |
|
|
|
|
304
|
|
|
fmt: Formatter = _SENTINEL, |
|
|
|
|
305
|
|
|
) -> __qualname__: |
306
|
|
|
""" |
307
|
|
|
Sets the logging level for the main handler (normally stderr). |
308
|
|
|
""" |
309
|
|
|
if level is not None and level is not _SENTINEL: |
310
|
|
|
level = level.upper() |
311
|
|
|
if self._main is None: |
312
|
|
|
self._main = _HandlerInfo( |
313
|
|
|
hid=-1, sink=sys.stderr, level=self._levels[Defaults.level], fmt=Defaults.fmt |
314
|
|
|
) |
315
|
|
|
else: |
316
|
|
|
try: |
317
|
|
|
logger.remove(self._main.hid) |
318
|
|
|
except ValueError: |
319
|
|
|
logger.error(f"Cannot remove handler {self._main.hid}") |
320
|
|
|
self._main.level = self._main.level if level is _SENTINEL else level |
321
|
|
|
self._main.sink = self._main.sink if sink is _SENTINEL else sink |
322
|
|
|
self._main.fmt = self._main.fmt if fmt is _SENTINEL else fmt |
323
|
|
|
self._main.hid = logger.add(self._main.sink, level=self._main.level, format=self._main.fmt) |
324
|
|
|
return self |
325
|
|
|
|
326
|
|
|
def remove_path(self, path: Path) -> __qualname__: |
|
|
|
|
327
|
|
|
for k, h in self._paths.items(): |
|
|
|
|
328
|
|
|
if h.path.resolve() == path.resolve(): |
329
|
|
|
h.level = None |
330
|
|
|
try: |
331
|
|
|
logger.remove(k) |
332
|
|
|
except ValueError: |
333
|
|
|
logger.error(f"Cannot remove handler {k} to {path}") |
334
|
|
|
|
335
|
|
|
def add_path( |
|
|
|
|
336
|
|
|
self, path: Path, level: str = Defaults.level, *, fmt: str = Defaults.fmt |
|
|
|
|
337
|
|
|
) -> __qualname__: |
338
|
|
|
level = level.upper() |
339
|
|
|
ell = self._levels[level] |
340
|
|
|
info = LogSinkInfo.guess(path) |
341
|
|
|
x = logger.add( |
|
|
|
|
342
|
|
|
str(info.base), |
343
|
|
|
format=fmt, |
344
|
|
|
level=level, |
345
|
|
|
compression=info.compression, |
346
|
|
|
serialize=info.serialize, |
347
|
|
|
backtrace=True, |
348
|
|
|
diagnose=True, |
349
|
|
|
enqueue=True, |
350
|
|
|
encoding="utf-8", |
351
|
|
|
) |
352
|
|
|
self._paths[x] = _HandlerInfo(hid=x, sink=info.base, level=ell, fmt=fmt) |
353
|
|
|
return self |
354
|
|
|
|
355
|
|
|
def __call__( |
356
|
|
|
self, |
|
|
|
|
357
|
|
|
path: Union[None, str, Path] = None, |
|
|
|
|
358
|
|
|
main: Optional[str] = Defaults.level, |
|
|
|
|
359
|
|
|
) -> __qualname__: |
360
|
|
|
""" |
361
|
|
|
This function controls logging set via command-line. |
362
|
|
|
|
363
|
|
|
Args: |
364
|
|
|
main: The level for stderr |
365
|
|
|
path: If set, the path to a file. Can be prefixed with ``:level:`` to set the level |
366
|
|
|
(e.g. ``:INFO:mandos-run.log.gz``). Can serialize to JSON if .json is used |
367
|
|
|
instead of .log or .txt. |
368
|
|
|
""" |
369
|
|
|
if main is None: |
370
|
|
|
main = Defaults.level |
371
|
|
|
main = self._aliases.get(main.upper(), main.upper()) |
372
|
|
|
if main not in Defaults.levels_extended: |
373
|
|
|
_permitted = ", ".join([*Defaults.levels_extended, *Defaults.aliases.keys()]) |
374
|
|
|
raise XValueError(f"{main.lower()} not a permitted log level (allowed: {_permitted}") |
375
|
|
|
self.config_main(level=main) |
376
|
|
|
if path is not None or len(str(path)) == 0: |
377
|
|
|
match = _LOGGER_ARG_PATTERN.match(str(path)) |
378
|
|
|
path_level = "DEBUG" if match.group(1) is None else match.group(1) |
379
|
|
|
path = Path(match.group(2)) |
380
|
|
|
self.add_path(path, path_level) |
381
|
|
|
self.logger.info(f"Added logger to {path} at level {path_level}") |
382
|
|
|
self.logger.info(f"Set main log level to {main}") |
383
|
|
|
return self |
384
|
|
|
|
385
|
|
|
|
386
|
|
|
class LoguruUtils: |
|
|
|
|
387
|
|
|
@classmethod |
388
|
|
|
def force_streams_to_utf8(cls) -> None: |
|
|
|
|
389
|
|
|
# we warn the user about this in the docs! |
390
|
|
|
sys.stderr.reconfigure(encoding="utf-8") |
391
|
|
|
sys.stdout.reconfigure(encoding="utf-8") |
392
|
|
|
sys.stdin.reconfigure(encoding="utf-8") |
393
|
|
|
|
394
|
|
|
|
395
|
|
|
__all__ = ["Defaults", "FancyLoguru", "LoguruUtils", "HandlerInfo"] |
396
|
|
|
|