|
1
|
|
|
from __future__ import annotations |
|
2
|
|
|
|
|
3
|
|
|
import abc |
|
4
|
|
|
import enum |
|
5
|
|
|
import logging |
|
6
|
|
|
from collections import UserDict |
|
7
|
|
|
from typing import Any, Callable, Generic, Iterable, Sequence, Type, TypeVar |
|
8
|
|
|
|
|
9
|
|
|
# noinspection PyProtectedMember |
|
10
|
|
|
from pocketutils.core._internal import PathLike, PathLikeUtils |
|
11
|
|
|
from pocketutils.core._internal import look as _look |
|
12
|
|
|
from pocketutils.core.exceptions import ImmutableError |
|
13
|
|
|
|
|
14
|
|
|
# Covariant type variable; used further down to annotate the element type of frozenlist.
T = TypeVar("T", covariant=True)

# Module-level logger registered under the fixed name "pocketutils".
logger = logging.getLogger("pocketutils")
|
17
|
|
|
|
|
18
|
|
|
|
|
19
|
|
|
class Sentinel:
    """
    A sentinel value tied to nothing more than a memory address.

    Instances carry no state and define no ``__eq__``, so two sentinels
    compare equal only when they are the same object.
    """

    @classmethod
    def new(cls) -> Sentinel:
        """
        Creates a fresh sentinel.

        Returns:
            A new instance of the class ``new`` was called on, so subclasses
            get instances of themselves rather than of the base class.
        """
        return cls()
|
30
|
|
|
|
|
31
|
|
|
|
|
32
|
|
|
V = TypeVar("V")


class LazyWrapped(Generic[V], metaclass=abc.ABCMeta):
    """
    A value that is generated on the first call to ``get`` and cached afterwards.

    Subclasses provide ``_name`` (the label used by ``repr``/``str``) and
    ``_generate`` (which produces the value); the base versions raise
    ``NotImplementedError``.
    """

    def __init__(self):
        self._v, self._exists = None, False

    def get(self) -> V:
        """
        Returns the wrapped value, generating and caching it on first use.
        """
        if not self._exists:
            self._v = self._generate()
            self._exists = True
        return self._v

    @property
    def raw_value(self):
        """
        The value currently stored, without triggering generation.
        """
        return self._v

    @property
    def is_defined(self):
        """
        Whether a value has been generated (and not discarded since).
        """
        return self._exists

    @property
    def _name(self):
        raise NotImplementedError()

    def _generate(self):
        raise NotImplementedError()

    def _render(self, formatter) -> str:
        # Shared by __repr__ and __str__; only the value formatter differs.
        inner = formatter(self._v) if self.is_defined else "⌀"
        return self._name + "[" + inner + "]"

    def __repr__(self):
        return self._render(repr)

    def __str__(self):
        return self._render(str)

    def __eq__(self, other):
        if type(self) is not type(other):
            return False
        if self.is_defined != other.is_defined:
            return False
        # Two undefined wrappers are equal whatever is left in the value slot;
        # a stale value must not leak into the comparison.
        return not self.is_defined or self.raw_value == other.raw_value

    # Defining __eq__ already makes instances unhashable; spelled out because
    # the object's state changes when the value is generated.
    __hash__ = None
|
72
|
|
|
|
|
73
|
|
|
|
|
74
|
|
|
class PlainLazyWrapped(LazyWrapped, metaclass=abc.ABCMeta):
    """
    A ``LazyWrapped`` with no additional behavior.

    ``LazyWrap.new_type`` uses this as the superclass of the types it creates.
    """
|
76
|
|
|
|
|
77
|
|
|
|
|
78
|
|
|
class ClearableLazyWrapped(LazyWrapped, metaclass=abc.ABCMeta):
    """
    A ``LazyWrapped`` whose cached value can be discarded with ``clear``,
    so that the next ``get`` generates it again.
    """

    def clear(self) -> None:
        """
        Discards the cached value and marks the wrapper as undefined.
        """
        # Drop the reference too: otherwise ``raw_value`` keeps returning the
        # old value and the object it points to cannot be garbage-collected.
        self._v = None
        self._exists = False
|
81
|
|
|
|
|
82
|
|
|
|
|
83
|
|
|
class LazyWrap:
    """
    Factory for ``LazyWrapped`` subclasses that are bound to a generator function.
    """

    @classmethod
    def new_type(cls, dtype: str, generator: Callable[[], V]) -> Type[PlainLazyWrapped]:
        """
        Creates a new wrapped type deriving from ``PlainLazyWrapped``.

        Args:
            dtype: The name of the data type, such as 'datetime' if generator=datetime.now
            generator: This is called to (lazily) initialize an instance of the new type

        Returns:
            A new class subclassing PlainLazyWrapped
        """
        # noinspection PyTypeChecker
        return cls._new_type(dtype, generator, PlainLazyWrapped)

    @classmethod
    def new_clearable_type(
        cls, dtype: str, generator: Callable[[], V]
    ) -> Type[ClearableLazyWrapped]:
        """
        Creates a new wrapped type deriving from ``ClearableLazyWrapped``.

        Args:
            dtype: The name of the data type, such as 'datetime' if generator=datetime.now
            generator: This is called to (lazily) initialize an instance of the new type

        Returns:
            A new class subclassing ClearableLazyWrapped
        """
        # noinspection PyTypeChecker
        return cls._new_type(dtype, generator, ClearableLazyWrapped)

    @classmethod
    def _new_type(
        cls, dtype: str, generator: Callable[[], V], superclass: Type[LazyWrapped]
    ) -> Type[LazyWrapped]:
        """
        Creates a new wrapped type.

        Example:
            >>> LazyRemoteTime = LazyWrap.new_type('RemoteTime', lambda: ...)
            >>> dt = LazyRemoteTime()  # nothing happens
            >>> dt.get()  # has a value

        Args:
            dtype: The name of the data type, such as 'datetime' if generator=datetime.now
            generator: This is called to (lazily) initialize an instance of the LazyWrapped
            superclass: The class the new type derives from

        Returns:
            A new class subclassing ``superclass``, named ``superclass.__name__ + dtype``
        """

        class X(superclass):
            @property
            def _name(self):
                return dtype

            def _generate(self):
                return generator()

        # Set __qualname__ as well as __name__; otherwise the class repr still
        # shows the local placeholder ("LazyWrap._new_type.<locals>.X").
        X.__name__ = X.__qualname__ = superclass.__name__ + dtype
        return X
|
126
|
|
|
|
|
127
|
|
|
|
|
128
|
|
|
class DictNamespace(UserDict):
    """
    Behaves like a dict and a ``SimpleNamespace``.
    This means it has a length, can be iterated over, etc., and can be accessed via ``.``.

    Attribute access is a fallback to item lookup, so items added after
    construction are reachable via ``.`` as well. A key that collides with a
    real attribute (``data``, ``keys``, ``items``, ...) stays reachable by
    subscript only, so the mapping machinery is never shadowed.
    """

    def __init__(self, /, **kwargs):
        super().__init__(**kwargs)

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails. Go through
        # ``__dict__`` so a half-built instance (copy/unpickle, before ``data``
        # exists) raises AttributeError instead of recursing.
        try:
            return self.__dict__["data"][name]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            ) from None

    def __eq__(self, other):
        if not isinstance(other, DictNamespace):
            return NotImplemented
        return vars(self) == vars(other)
|
142
|
|
|
|
|
143
|
|
|
|
|
144
|
|
|
class SmartEnum(enum.Enum):
    """
    An enum with a classmethod ``of`` that parses a string of the member's name.
    """

    @classmethod
    def of(cls, v):
        """
        Returns the member of this enum class from a string with the member's name,
        case-insensitive and stripping whitespace.
        Will return ``v`` if ``v`` is already an instance of this class.

        Args:
            v: A member of this class, or a string naming one

        Returns:
            The matching member

        Raises:
            LookupError: If ``v`` is a string that matches no member name
            TypeError: If ``v`` is neither a member of this class nor a string
        """
        if isinstance(v, cls):
            return v
        if not isinstance(v, str):
            raise TypeError(str(type(v)))
        wanted = v.strip()
        # Look names up in ``__members__``: ``v in cls`` tests *values* on
        # Python 3.12+ and raises TypeError for a string on 3.8-3.11.
        members = cls.__members__
        if wanted in members:
            # an exact match wins over a case-insensitive one
            return members[wanted]
        folded = wanted.lower()
        for name, member in members.items():
            if name.lower() == folded:
                return member
        raise LookupError(f"{v} not found in {str(cls)}")
|
170
|
|
|
|
|
171
|
|
|
|
|
172
|
|
|
# noinspection PyPep8Naming
class frozenlist(Sequence):
    """
    An immutable sequence backed by a list.
    The sole advantage over a tuple is the list-like __str__ with square brackets, which may be less confusing to a user.
    """

    def __init__(self, items: Iterable[T]):
        # private copy, so later changes to ``items`` cannot affect this instance
        self.__items = list(items)

    def __getitem__(self, item) -> T:
        """
        Returns the element at an index, or a new ``frozenlist`` for a slice.
        """
        picked = self.__items[item]
        # Wrap only slices. Testing for ``int`` instead would send other valid
        # indices (anything implementing ``__index__``) down the slice path
        # and wrap a single element.
        return frozenlist(picked) if isinstance(item, slice) else picked

    def __setitem__(self, key, value):
        raise ImmutableError()

    def __len__(self) -> int:
        return len(self.__items)

    def __repr__(self):
        return repr(self.__items)

    def __eq__(self, other):
        return type(self) == type(other) and self.__items == other.__items

    def __hash__(self):
        # Content never changes after construction, so hash like a tuple;
        # raises TypeError if an element is unhashable, as a tuple would.
        return hash(tuple(self.__items))

    def __str__(self):
        return repr(self.__items)
|
202
|
|
|
|
|
203
|
|
|
|
|
204
|
|
|
class OptRow:
    """
    Short for 'optional row'.
    A wrapper around a NamedTuple that returns None if the key doesn't exist.
    This is intended for Pandas itertuples().
    """

    def __init__(self, row):
        self._row = row

    def __getattr__(self, item: str) -> Any:
        """
        Returns the row's attribute ``item``, or None if the row has none.

        Raises:
            AttributeError: For dunder names, and when the wrapper has no row yet
        """
        # Dunder names are protocol probes (copy, pickle, ...); answering None
        # would make ``hasattr`` report a hook that is not callable.
        if item.startswith("__") and item.endswith("__"):
            raise AttributeError(item)
        # Read ``_row`` through ``__dict__``: on an instance made without
        # ``__init__`` (copy/unpickle) ``self._row`` would re-enter this method
        # and recurse without bound.
        try:
            row = self.__dict__["_row"]
        except KeyError:
            raise AttributeError(item) from None
        return getattr(row, item, None)

    def opt(self, item: str, look=None) -> Any:
        """
        Returns None if ``item`` is missing or None; otherwise the result of
        passing the value and ``look`` to the imported ``_look`` helper.
        """
        x = getattr(self, item)
        if x is None:
            return None
        return _look(x, look)

    def req(self, item: str, look=None) -> Any:
        """
        Like ``opt``, but reads the row directly, so a missing ``item`` raises
        ``AttributeError``. A value of None is still returned as None.
        """
        x = getattr(self._row, item)
        if x is None:
            return None
        return _look(x, look)

    def __contains__(self, item):
        return hasattr(self._row, item)

    def items(self):
        """
        Returns the (name, value) pairs of the row, consistent with ``keys`` and ``values``.
        """
        # noinspection PyProtectedMember
        return self._row._asdict().items()

    def keys(self):
        # noinspection PyProtectedMember
        return self._row._asdict().keys()

    def values(self):
        # noinspection PyProtectedMember
        return self._row._asdict().values()

    def __repr__(self):
        return self.__class__.__name__ + "@" + hex(id(self))

    def __str__(self):
        return self.__class__.__name__

    def __eq__(self, other):
        # Comparing with something that is not an OptRow used to raise
        # AttributeError on ``other._row``; let Python fall back instead.
        if not isinstance(other, OptRow):
            return NotImplemented
        # noinspection PyProtectedMember
        return self._row == other._row
|
260
|
|
|
|
|
261
|
|
|
|
|
262
|
|
|
# Names exported by a star-import of this module. "PathLike" and "PathLikeUtils"
# are re-exports of names imported at the top of the file.
# NOTE(review): LazyWrapped, PlainLazyWrapped and ClearableLazyWrapped are not
# listed here although LazyWrap returns subclasses of them — confirm this is intentional.
__all__ = [
    "Sentinel",
    "SmartEnum",
    "frozenlist",
    "PathLike",
    "PathLikeUtils",
    "OptRow",
    "LazyWrap",
    "DictNamespace",
]
|
272
|
|
|
|