|
1
|
|
|
from __future__ import annotations

import inspect
import logging
import os
import sys
from dataclasses import dataclass
from inspect import cleandoc
from pathlib import Path
from typing import Mapping, Optional, Union, Any, Callable, TypeVar, Generic, TextIO, AbstractSet

# noinspection PyProtectedMember
import loguru._defaults as _defaults
import regex
from loguru import logger

# noinspection PyProtectedMember
from loguru._logger import Logger
from pocketutils.core.exceptions import IllegalStateError, XValueError
|
|
|
|
|
|
18
|
|
|
|
|
19
|
|
|
|
|
20
|
|
|
# A loguru ``format`` argument: either a template string or a callable that
# receives the record mapping and returns a template string.
Formatter = Union[str, Callable[[Mapping[str, Any]], str]]

# Default single-line template. ``[EXTRA]`` is a placeholder that the
# FormatFactory formatters substitute before loguru sees the template.
_FMT = cleandoc(
    r"""
    <bold>{time:YYYY-MM-DD HH:mm:ss.SSS}</bold> |
    <level>{level: <8}</level> |
    <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan>
    — <level>{message}[EXTRA]</level>
    {exception}
    """
).replace("\n", " ")


class FormatFactory:
    """
    Builds callable formatters that fill in the ``[EXTRA]`` placeholder of a template.
    """

    @classmethod
    def with_extras(
        cls,
        *,
        fmt: str = _FMT,
        sep: str = "; ",
        eq_sign: str = " ",
    ) -> Callable[[Mapping[str, Any]], str]:
        """
        Returns a formatter that lists the record's ``extra`` entries.

        Args:
            fmt: The template; every ``[EXTRA]`` occurrence is replaced
            sep: Text placed between consecutive extra entries
            eq_sign: Text placed between an extra key and its value field

        Returns:
            A function mapping a record to a template string ending in ``os.linesep``
        """

        def formatter(record: Mapping[str, Any]) -> str:
            # Emit ``{extra[key]}`` fields rather than the values themselves,
            # so the values are substituted when the template is formatted.
            fields = sep.join(f"{key}{eq_sign}{{extra[{key}]}}" for key in record["extra"])
            if fields:
                fields = f" [ {fields} ]"
            return fmt.replace("[EXTRA]", fields) + os.linesep

        return formatter

    @classmethod
    def plain(cls, *, fmt: str = _FMT) -> Callable[[Mapping[str, Any]], str]:
        """
        Returns a formatter that drops the ``[EXTRA]`` placeholder entirely.

        Args:
            fmt: The template; every ``[EXTRA]`` occurrence is removed

        Returns:
            A function mapping a record to a template string ending in ``os.linesep``
        """

        def formatter(record: Mapping[str, Any]) -> str:
            # loguru does not append a line ending when the format is a callable,
            # so add one here exactly as with_extras() does.
            return fmt.replace("[EXTRA]", "") + os.linesep

        return formatter
|
55
|
|
|
|
|
56
|
|
|
|
|
57
|
|
|
class _SENTINEL:
    """
    Marker for "argument not supplied"; compared by identity (``is _SENTINEL``).

    The class object itself is used as the default value, never an instance,
    which lets ``None`` remain a meaningful explicit argument.
    """
|
59
|
|
|
|
|
60
|
|
|
|
|
61
|
|
|
# Type of the logger wrapped by FancyLoguru; bounded by loguru's Logger class.
T = TypeVar("T", covariant=True, bound=Logger)

# Splits a logging argument into an optional alphabetic level prefix (group 1)
# and the remaining text (group 2). Both ``LEVEL:rest`` and ``:LEVEL:rest``
# are accepted; without a prefix, group 1 is None and group 2 is the whole text.
_LOGGER_ARG_PATTERN = regex.compile(r"(?::?([a-zA-Z]+):)?(.*)", flags=regex.V1)
|
65
|
|
|
# Compression / archive filename endings recognized on log paths.
log_compressions = {
    ".xz",
    ".lzma",
    ".gz",
    ".zip",
    ".bz2",
    ".tar",
    ".tar.gz",
    ".tar.bz2",
    ".tar.xz",
}

# Every accepted log filename ending: .log, .txt, or .json, either plain
# (the empty compression) or followed by one of the endings above.
valid_log_suffixes = {
    f"{kind}{compression}"
    for kind in (".log", ".txt", ".json")
    for compression in ("", *log_compressions)
}
|
81
|
|
|
|
|
82
|
|
|
|
|
83
|
|
|
class Defaults:
    """
    Default levels, colors, icons, format, and aliases for :class:`FancyLoguru`.
    """

    # Level numbers read straight from loguru's own defaults.
    # NOTE(review): loguru's SUCCESS level is not listed here; it only appears
    # in ``levels_extended`` below -- confirm that this is intentional.
    levels_built_in = dict(
        TRACE=_defaults.LOGURU_TRACE_NO,
        DEBUG=_defaults.LOGURU_DEBUG_NO,
        INFO=_defaults.LOGURU_INFO_NO,
        WARNING=_defaults.LOGURU_WARNING_NO,
        ERROR=_defaults.LOGURU_ERROR_NO,
        CRITICAL=_defaults.LOGURU_CRITICAL_NO,
    )

    # the levels for caution and notice are DEFINED here
    # trace and success must match loguru's
    # and the rest must match logging's
    # note that most of these alternate between informative and problematic
    # i.e. info (ok), caution (bad), success (ok), warning (bad), notice (ok), error (bad)
    levels_extended = {
        **levels_built_in,
        **dict(
            CAUTION=23,
            SUCCESS=25,
            NOTICE=35,
        ),
    }

    # loguru color markup per level name
    colors = dict(
        TRACE="<dim>",
        DEBUG="<dim>",
        INFO="<bold>",
        CAUTION="<yellow>",
        SUCCESS="<blue>",
        WARNING="<yellow>",
        NOTICE="<blue>",
        ERROR="<red>",
        CRITICAL="<red>",
    )

    # icon per level name; loguru's own icon wherever loguru defines the level
    icons = dict(
        TRACE=_defaults.LOGURU_TRACE_ICON,
        DEBUG=_defaults.LOGURU_DEBUG_ICON,
        INFO=_defaults.LOGURU_INFO_ICON,
        CAUTION="⚐",  # or ☡
        SUCCESS=_defaults.LOGURU_SUCCESS_ICON,
        WARNING=_defaults.LOGURU_WARNING_ICON,
        NOTICE="★",
        ERROR=_defaults.LOGURU_ERROR_ICON,
        CRITICAL=_defaults.LOGURU_CRITICAL_ICON,
    )

    # level name used when the caller does not choose one
    level: str = "INFO"

    # default formatter: the standard template with extras appended
    fmt = FormatFactory.with_extras()

    # alternative names for levels; None means "no level"
    aliases = dict(NONE=None, NO=None, OFF=None, VERBOSE="INFO", QUIET="ERROR")
|
136
|
|
|
|
|
137
|
|
|
|
|
138
|
|
|
@dataclass(frozen=True, repr=True, order=True)
class HandlerInfo:
    """
    Immutable, public description of one registered log handler.
    """

    # handler id
    hid: int
    # file the handler writes to, or None when it is not a file handler
    path: Optional[Path]
    # level name, or None
    level: Optional[str]
    # format template or formatter callable
    fmt: Formatter
|
144
|
|
|
|
|
145
|
|
|
|
|
146
|
|
|
@dataclass(frozen=False, repr=True)
class _HandlerInfo:
    """
    Mutable, internal bookkeeping record for one log handler.
    """

    # handler id; -1 is used before a handler has been added
    hid: int
    # where the handler writes: a stream or a path
    sink: Any
    # level as a number or a name; None when no level is set
    level: Union[None, int, str]
    # format template or formatter callable
    fmt: Formatter
|
152
|
|
|
|
|
153
|
|
|
|
|
154
|
|
|
@dataclass(frozen=True, repr=True, order=True)
class LogSinkInfo:
    """
    What can be determined about a log file sink from its filename alone.
    """

    # the path exactly as supplied
    path: Path
    # the path with any compression ending removed
    base: Path
    # NOTE(review): guess() fills this with the compression ending (possibly
    # None), not with .log/.txt/.json -- confirm which meaning is intended.
    suffix: str
    # True when the base name ends in .json
    serialize: bool
    # the compression ending that was found (e.g. ".gz"), or None
    compression: Optional[str]

    @classmethod
    def guess(cls, path: Union[str, Path]) -> LogSinkInfo:
        """
        Infers the compression and serialization settings from a filename.

        Args:
            path: A log file path ending in .json, .log, or .txt,
                  optionally followed by an ending from ``log_compressions``

        Returns:
            The inferred settings

        Raises:
            XValueError: If the name without its compression ending
                         is not .json, .log, or .txt
        """
        path = Path(path)
        base, compression = path.name, None
        # Try longer endings first so that ".tar.gz" wins over ".gz";
        # unordered set iteration would make the result unpredictable.
        for ending in sorted(log_compressions, key=len, reverse=True):
            if path.name.endswith(ending):
                base, compression = path.name[: -len(ending)], ending
                break
        if not base.endswith((".json", ".log", ".txt")):
            raise XValueError(
                f"Log filename {path.name} is not .json, .log, .txt, or a compressed variant"
            )
        return cls(
            path=path,
            base=path.parent / base,
            suffix=compression,
            serialize=base.endswith(".json"),
            compression=compression,
        )
|
180
|
|
|
|
|
181
|
|
|
|
|
182
|
|
|
class InterceptHandler(logging.Handler):
    """
    Redirects standard logging to loguru.
    """

    def emit(self, record: logging.LogRecord) -> None:
        """
        Forwards one standard-library log record to loguru.

        Args:
            record: The record produced by the ``logging`` module
        """
        # Get corresponding Loguru level if it exists
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno
        # Find caller from where originated the logged message:
        # start at this frame (depth 0) and step outward past every frame that
        # belongs to the logging module. Stopping when the stack runs out
        # avoids dereferencing None.
        frame, depth = inspect.currentframe(), 0
        while frame is not None and (depth == 0 or frame.f_code.co_filename == logging.__file__):
            frame = frame.f_back
            depth += 1
        logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
|
199
|
|
|
|
|
200
|
|
|
|
|
201
|
|
|
class FancyLoguru(Generic[T]):
    """
    Tracks and reconfigures the handlers and levels of a loguru logger.

    One "main" handler (normally stderr) and any number of file handlers are
    managed. The configuration methods return ``self`` so calls can be chained.
    """

    def __init__(self, log: T):
        """
        Args:
            log: The loguru logger to manage
        """
        # level name -> number, for every level known to this instance
        self._levels = dict(Defaults.levels_built_in)
        self._logger = log
        # bookkeeping for the main handler; None until config_main() is called
        self._main: Optional[_HandlerInfo] = None
        # handler id -> bookkeeping for each file handler
        self._paths = {}
        self._aliases = dict(Defaults.aliases)

    @property
    def logger(self) -> T:
        """The managed logger."""
        return self._logger

    @property
    def levels(self) -> Mapping[str, int]:
        """A copy of the known level names mapped to their numbers."""
        return dict(self._levels)

    @property
    def aliases(self) -> Mapping[str, Optional[str]]:
        """A copy of the alias names mapped to level names (None means no level)."""
        return dict(self._aliases)

    @property
    def main(self) -> Optional[HandlerInfo]:
        """A snapshot of the main handler, or None if it was never configured."""
        if self._main is None:
            return None
        return HandlerInfo(
            hid=self._main.hid, level=self._main.level, fmt=self._main.fmt, path=None
        )

    @property
    def paths(self) -> AbstractSet[HandlerInfo]:
        """Snapshots of all file handlers added through :meth:`add_path`."""
        # iterate the stored records, not the handler-id keys
        return {
            HandlerInfo(hid=h.hid, level=h.level, fmt=h.fmt, path=h.sink)
            for h in self._paths.values()
        }

    def config_levels(
        self,
        *,
        levels: Mapping[str, int] = _SENTINEL,
        colors: Mapping[str, str] = _SENTINEL,
        icons: Mapping[str, str] = _SENTINEL,
        aliases: Mapping[str, str] = _SENTINEL,
    ) -> FancyLoguru[T]:
        """
        Registers a whole set of levels and replaces the aliases.

        Args:
            levels: Level name to number; defaults to ``Defaults.levels_extended``
            colors: Level name to color markup; defaults to ``Defaults.colors``
            icons: Level name to icon; defaults to ``Defaults.icons``
            aliases: Alias to level name; defaults to ``Defaults.aliases``

        Raises:
            IllegalStateError: If a level already exists with a different number
        """
        levels = Defaults.levels_extended if levels is _SENTINEL else levels
        colors = Defaults.colors if colors is _SENTINEL else colors
        icons = Defaults.icons if icons is _SENTINEL else icons
        aliases = Defaults.aliases if aliases is _SENTINEL else aliases
        for name, number in levels.items():
            self.config_level(
                name,
                number,
                color=colors.get(name, _SENTINEL),
                icon=icons.get(name, _SENTINEL),
            )
        self._aliases = dict(aliases)
        return self

    def init(
        self,
        *,
        level: str = Defaults.level,
        sink=sys.stderr,
        fmt: Formatter = Defaults.fmt,
        intercept: bool = True,
    ) -> FancyLoguru[T]:
        """
        Sets an initial configuration.

        Removes every handler currently attached to the logger, then adds the main one.

        Args:
            level: Level name for the main handler
            sink: Where the main handler writes
            fmt: Format for the main handler
            intercept: If true, route the standard ``logging`` module to loguru
        """
        if intercept:
            # basicConfig's ``encoding`` only applies together with ``filename``,
            # and Python 3.8 rejects it outright, so it is not passed here.
            logging.basicConfig(handlers=[InterceptHandler()], level=0)
        self._logger.remove(None)  # get rid of the built-in handler
        # remove(None) dropped every handler, so the ids tracked here are stale
        if self._main is not None:
            self._main.hid = -1
        self._paths.clear()
        self.config_main(level=level, sink=sink, fmt=fmt)
        return self

    def config_level(
        self,
        name: str,
        level: int,
        *,
        color: Union[None, str, _SENTINEL] = _SENTINEL,
        icon: Union[None, str, _SENTINEL] = _SENTINEL,
        replace: bool = True,
    ) -> FancyLoguru[T]:
        """
        Registers one level, or updates the color and icon of an existing one.

        Args:
            name: Level name
            level: Level number
            color: Color markup; if omitted, an existing level keeps its color
            icon: Icon; if omitted, an existing level keeps its icon
            replace: If false, an already-registered level is left untouched

        Raises:
            IllegalStateError: If ``replace`` is true and the level
                               already exists with a different number
        """
        try:
            data = self._logger.level(name)
        except ValueError:
            data = None
        if data is None:
            self._logger.level(
                name,
                no=level,
                color=None if color is _SENTINEL else color,
                icon=None if icon is _SENTINEL else icon,
            )
        elif replace:
            if level != data.no:  # loguru doesn't check whether they're eq; it just errors
                raise IllegalStateError(f"Cannot set level={level}!={data.no} for {name}")
            self._logger.level(
                name,
                color=data.color if color is _SENTINEL else color,
                icon=data.icon if icon is _SENTINEL else icon,
            )
        # remember the number actually in effect so add_path() can look it up
        self._levels[name] = level if data is None else data.no
        return self

    def config_main(
        self,
        *,
        sink: TextIO = _SENTINEL,
        level: Optional[str] = _SENTINEL,
        fmt: Formatter = _SENTINEL,
    ) -> FancyLoguru[T]:
        """
        Sets the logging level for the main handler (normally stderr).

        The existing main handler, if any, is removed and a new one is added
        with the updated settings. Omitted arguments keep their previous values.

        Args:
            sink: Where the main handler writes
            level: Level name; None leaves the logger without a main handler
            fmt: Format for the main handler
        """
        if level is not None and level is not _SENTINEL:
            level = level.upper()
        if self._main is None:
            self._main = _HandlerInfo(
                hid=-1, sink=sys.stderr, level=Defaults.level, fmt=Defaults.fmt
            )
        elif self._main.hid >= 0:
            try:
                self._logger.remove(self._main.hid)
            except ValueError:
                self._logger.error(f"Cannot remove handler {self._main.hid}")
        if level is not _SENTINEL:
            self._main.level = level
        if sink is not _SENTINEL:
            self._main.sink = sink
        if fmt is not _SENTINEL:
            self._main.fmt = fmt
        if self._main.level is None:
            # nothing to add; -1 marks "no handler currently attached"
            self._main.hid = -1
        else:
            self._main.hid = self._logger.add(
                self._main.sink, level=self._main.level, format=self._main.fmt
            )
        return self

    def remove_path(self, path: Path) -> FancyLoguru[T]:
        """
        Removes every file handler that was added for ``path``.

        Args:
            path: The path as given to :meth:`add_path`,
                  or the same path without its compression ending
        """
        path = Path(path)
        targets = {path.resolve()}
        try:
            # add_path() records the path without its compression ending
            targets.add(LogSinkInfo.guess(path).base.resolve())
        except XValueError:
            pass  # not a recognized log filename; only the literal path can match
        # collect first: the dict must not change size while it is iterated
        matching = [hid for hid, h in self._paths.items() if Path(h.sink).resolve() in targets]
        for hid in matching:
            del self._paths[hid]
            try:
                self._logger.remove(hid)
            except ValueError:
                self._logger.error(f"Cannot remove handler {hid} to {path}")
        return self

    def add_path(
        self, path: Path, level: str = Defaults.level, *, fmt: Formatter = Defaults.fmt
    ) -> FancyLoguru[T]:
        """
        Adds a file handler.

        Compression and JSON serialization are inferred from the filename
        by ``LogSinkInfo.guess``.

        Args:
            path: The log file path
            level: Level name for this handler
            fmt: Format for this handler

        Raises:
            KeyError: If the level is not known to this instance
        """
        level = level.upper()
        self._levels[level]  # fail before touching the logger if the level is unknown
        info = LogSinkInfo.guess(path)
        hid = self._logger.add(
            str(info.base),
            format=fmt,
            level=level,
            compression=info.compression,
            serialize=info.serialize,
            backtrace=True,
            diagnose=True,
            enqueue=True,
            encoding="utf-8",
        )
        self._paths[hid] = _HandlerInfo(hid=hid, sink=info.base, level=level, fmt=fmt)
        return self

    def __call__(
        self,
        path: Union[None, str, Path] = None,
        main: Optional[str] = Defaults.level,
    ) -> FancyLoguru[T]:
        """
        This function controls logging set via command-line.

        Args:
            main: The level for stderr, or an alias of one
            path: If set, the path to a file. Can be prefixed with ``level:`` to set the level
                  (e.g. ``INFO:mandos-run.log.gz``). Can serialize to JSON if .json is used
                  instead of .log or .txt.

        Raises:
            XValueError: If ``main`` is neither a known level nor an alias
        """
        if main is None:
            main = Defaults.level
        main = self._aliases.get(main.upper(), main.upper())
        # an alias may resolve to None ("no level"), which needs no validation
        if main is not None:
            known = {**Defaults.levels_extended, **self._levels}
            if main not in known:
                _permitted = ", ".join([*known, *self._aliases.keys()])
                raise XValueError(
                    f"{main.lower()} not a permitted log level (allowed: {_permitted})"
                )
        self.config_main(level=main)
        if path is not None and len(str(path)) > 0:
            match = _LOGGER_ARG_PATTERN.match(str(path))
            path_level = "DEBUG" if match.group(1) is None else match.group(1)
            path = Path(match.group(2))
            self.add_path(path, path_level)
            self.logger.info(f"Added logger to {path} at level {path_level}")
        self.logger.info(f"Set main log level to {main}")
        return self
|
384
|
|
|
|
|
385
|
|
|
|
|
386
|
|
|
class LoguruUtils:
    """
    Miscellaneous helpers related to logging output.
    """

    @classmethod
    def force_streams_to_utf8(cls) -> None:
        """
        Switches stderr, stdout, and stdin to UTF-8.

        A stream that offers no ``reconfigure`` method (for example one that
        was replaced by another object, or that is None) is left untouched.
        """
        # we warn the user about this in the docs!
        for stream in (sys.stderr, sys.stdout, sys.stdin):
            reconfigure = getattr(stream, "reconfigure", None)
            if reconfigure is not None:
                reconfigure(encoding="utf-8")
|
393
|
|
|
|
|
394
|
|
|
|
|
395
|
|
|
# names exported by ``from <module> import *``
__all__ = ["Defaults", "FancyLoguru", "LoguruUtils", "HandlerInfo"]
|
396
|
|
|
|