1
|
|
|
import logging |
|
|
|
|
2
|
|
|
import sys |
3
|
|
|
from contextlib import contextmanager |
4
|
|
|
from typing import ( |
5
|
|
|
Any, |
6
|
|
|
ByteString, |
7
|
|
|
Callable, |
8
|
|
|
Generator, |
9
|
|
|
Iterable, |
10
|
|
|
Iterator, |
11
|
|
|
List, |
12
|
|
|
Optional, |
13
|
|
|
Tuple, |
14
|
|
|
TypeVar, |
15
|
|
|
Union, |
16
|
|
|
) |
17
|
|
|
|
18
|
|
|
from pocketutils.core._internal import look as _look |
|
|
|
|
19
|
|
|
from pocketutils.core.exceptions import LengthMismatchError, MultipleMatchesError, XTypeError |
|
|
|
|
20
|
|
|
from pocketutils.core.input_output import Writeable |
|
|
|
|
21
|
|
|
|
22
|
|
|
logger = logging.getLogger("pocketutils") |
23
|
|
|
Y = TypeVar("Y") |
|
|
|
|
24
|
|
|
T = TypeVar("T") |
|
|
|
|
25
|
|
|
Z = TypeVar("Z") |
|
|
|
|
26
|
|
|
|
27
|
|
|
|
28
|
|
|
class BaseTools: |
|
|
|
|
29
|
|
|
@classmethod |
30
|
|
|
def is_lambda(cls, function: Any) -> bool: |
31
|
|
|
""" |
32
|
|
|
Returns whether this is a lambda function. Will return False for non-callables. |
33
|
|
|
""" |
34
|
|
|
# noinspection PyPep8Naming |
35
|
|
|
LAMBDA = lambda: 0 # noqa: E731 |
|
|
|
|
36
|
|
|
if not hasattr(function, "__name__"): |
37
|
|
|
return False # not a function |
38
|
|
|
return ( |
39
|
|
|
isinstance(function, type(LAMBDA)) |
40
|
|
|
and function.__name__ == LAMBDA.__name__ |
41
|
|
|
or str(function).startswith("<function <lambda> at ") |
42
|
|
|
and str(function).endswith(">") |
43
|
|
|
) |
44
|
|
|
|
45
|
|
|
@classmethod |
46
|
|
|
def only( |
47
|
|
|
cls, |
|
|
|
|
48
|
|
|
sequence: Iterable[Any], |
|
|
|
|
49
|
|
|
condition: Union[str, Callable[[Any], bool]] = None, |
|
|
|
|
50
|
|
|
name: str = "collection", |
|
|
|
|
51
|
|
|
) -> Any: |
52
|
|
|
""" |
53
|
|
|
Returns either the SINGLE (ONLY) UNIQUE ITEM in the sequence or raises an exception. |
54
|
|
|
Each item must have __hash__ defined on it. |
55
|
|
|
|
56
|
|
|
Args: |
57
|
|
|
sequence: A list of any items (untyped) |
58
|
|
|
condition: If nonnull, consider only those matching this condition |
59
|
|
|
name: Just a name for the collection to use in an error message |
60
|
|
|
|
61
|
|
|
Returns: |
62
|
|
|
The first item the sequence. |
63
|
|
|
|
64
|
|
|
Raises: |
65
|
|
|
LookupError If the sequence is empty |
66
|
|
|
MultipleMatchesError If there is more than one unique item. |
67
|
|
|
""" |
68
|
|
|
|
69
|
|
|
def _only(sq): |
|
|
|
|
70
|
|
|
st = set(sq) |
|
|
|
|
71
|
|
|
if len(st) > 1: |
72
|
|
|
raise MultipleMatchesError("More then 1 item in " + str(name)) |
73
|
|
|
if len(st) == 0: |
74
|
|
|
raise LookupError("Empty " + str(name)) |
75
|
|
|
return next(iter(st)) |
76
|
|
|
|
77
|
|
|
if condition and isinstance(condition, str): |
|
|
|
|
78
|
|
|
return _only( |
79
|
|
|
[ |
80
|
|
|
s |
81
|
|
|
for s in sequence |
82
|
|
|
if ( |
83
|
|
|
not getattr(s, condition[1:]) |
84
|
|
|
if condition.startswith("!") |
85
|
|
|
else getattr(s, condition) |
86
|
|
|
) |
87
|
|
|
] |
88
|
|
|
) |
89
|
|
|
elif condition: |
90
|
|
|
return _only([s for s in sequence if condition(s)]) |
91
|
|
|
else: |
92
|
|
|
return _only(sequence) |
93
|
|
|
|
94
|
|
|
@classmethod |
95
|
|
|
def zip_strict(cls, *args: Iterable[Any]) -> Generator[Tuple[Any], None, None]: |
96
|
|
|
""" |
97
|
|
|
Same as zip(), but raises an IndexError if the lengths don't match. |
98
|
|
|
|
99
|
|
|
Args: |
100
|
|
|
args: Same as with ``zip`` |
101
|
|
|
|
102
|
|
|
Yields: |
103
|
|
|
Tuples corresponding to the input items |
104
|
|
|
|
105
|
|
|
Raises: |
106
|
|
|
LengthMismatchError: If the lengths of the input iterators don't match |
107
|
|
|
""" |
108
|
|
|
# we need to catch these cases before or they'll fail |
109
|
|
|
# in particular, 1 element would fail with a LengthMismatchError |
110
|
|
|
# and 0 elements would loop forever |
111
|
|
|
if len(args) < 2: |
112
|
|
|
yield from zip(*args) |
113
|
|
|
return |
114
|
|
|
iters = [iter(axis) for axis in args] |
115
|
|
|
n_elements = 0 |
116
|
|
|
failures = [] |
117
|
|
|
while len(failures) == 0: |
118
|
|
|
values = [] |
119
|
|
|
failures = [] |
120
|
|
|
for axis, iterator in enumerate(iters): |
121
|
|
|
try: |
122
|
|
|
values.append(next(iterator)) |
123
|
|
|
except StopIteration: |
124
|
|
|
failures.append(axis) |
125
|
|
|
if len(failures) == 0: |
126
|
|
|
yield tuple(values) |
127
|
|
|
elif len(failures) == 1: |
128
|
|
|
raise LengthMismatchError( |
129
|
|
|
f"Too few elements ({n_elements}) along axis {failures[0]}" |
130
|
|
|
) |
131
|
|
|
elif len(failures) < len(iters): |
132
|
|
|
raise LengthMismatchError(f"Too few elements ({n_elements}) along axes {failures}") |
133
|
|
|
n_elements += 1 |
134
|
|
|
|
135
|
|
|
@classmethod |
136
|
|
|
def zip_list(cls, *args) -> List[Tuple[Any]]: |
137
|
|
|
""" |
138
|
|
|
Same as :meth:`zip_strict`, but more informative. |
139
|
|
|
Converts to a list and can provide a more detailed error message. |
140
|
|
|
Zips two sequences into a list of tuples and raises an |
141
|
|
|
IndexError if the lengths don't match. |
142
|
|
|
|
143
|
|
|
Args: |
144
|
|
|
args: Same as with ``zip`` |
145
|
|
|
|
146
|
|
|
Yields: |
147
|
|
|
Tuples corresponding to the input items |
148
|
|
|
|
149
|
|
|
Raises: |
150
|
|
|
LengthMismatchError: If the lengths of the input iterators don't match |
151
|
|
|
""" |
152
|
|
|
try: |
153
|
|
|
return list(cls.zip_strict(*args)) |
154
|
|
|
except LengthMismatchError: |
155
|
|
|
raise LengthMismatchError( |
156
|
|
|
"Length mismatch in zip_strict: Sizes are {}".format([len(x) for x in args]) |
157
|
|
|
) from None |
158
|
|
|
|
159
|
|
|
@classmethod |
160
|
|
|
def forever(cls) -> Iterator[int]: |
161
|
|
|
""" |
162
|
|
|
Yields i for i in range(0, infinity). |
163
|
|
|
Useful for simplifying a i = 0; while True: i += 1 block. |
164
|
|
|
""" |
165
|
|
|
i = 0 |
166
|
|
|
while True: |
167
|
|
|
yield i |
168
|
|
|
i += 1 |
169
|
|
|
|
170
|
|
|
@classmethod |
171
|
|
|
def to_true_iterable(cls, s: Any) -> Iterable[Any]: |
|
|
|
|
172
|
|
|
""" |
173
|
|
|
See :meth:`is_true_iterable`. |
174
|
|
|
|
175
|
|
|
Examples: |
176
|
|
|
- ``to_true_iterable('abc') # ['abc']`` |
177
|
|
|
- ``to_true_iterable(['ab', 'cd')] # ['ab', 'cd']`` |
178
|
|
|
""" |
179
|
|
|
if BaseTools.is_true_iterable(s): |
|
|
|
|
180
|
|
|
return s |
181
|
|
|
else: |
182
|
|
|
return [s] |
183
|
|
|
|
184
|
|
|
@classmethod |
185
|
|
|
def is_true_iterable(cls, s: Any) -> bool: |
|
|
|
|
186
|
|
|
""" |
187
|
|
|
Returns whether ``s`` is a probably "proper" iterable. |
188
|
|
|
In other words, iterable but not a string or bytes. |
189
|
|
|
|
190
|
|
|
.. caution:: |
191
|
|
|
This is not fully reliable. |
192
|
|
|
Types that do not define ``__iter__`` but are iterable |
193
|
|
|
via ``__getitem__`` will not be included. |
194
|
|
|
""" |
195
|
|
|
return ( |
196
|
|
|
s is not None |
197
|
|
|
and isinstance(s, Iterable) |
|
|
|
|
198
|
|
|
and not isinstance(s, str) |
199
|
|
|
and not isinstance(s, ByteString) |
|
|
|
|
200
|
|
|
) |
201
|
|
|
|
202
|
|
|
@classmethod |
203
|
|
|
@contextmanager |
204
|
|
|
def null_context(cls) -> Generator[None, None, None]: |
205
|
|
|
""" |
206
|
|
|
Returns an empty context (literally just yields). |
207
|
|
|
Useful to simplify when a generator needs to be used depending on a switch. |
208
|
|
|
Ex:: |
209
|
|
|
if verbose_flag: |
210
|
|
|
do_something() |
211
|
|
|
else: |
212
|
|
|
with Tools.silenced(): |
213
|
|
|
do_something() |
214
|
|
|
Can become:: |
215
|
|
|
with (Tools.null_context() if verbose else Tools.silenced()): |
216
|
|
|
do_something() |
217
|
|
|
""" |
218
|
|
|
yield |
219
|
|
|
|
220
|
|
|
@classmethod |
221
|
|
|
def look(cls, obj: Y, attrs: Union[str, Iterable[str], Callable[[Y], Z]]) -> Optional[Z]: |
222
|
|
|
""" |
223
|
|
|
Follows a dotted syntax for getting an item nested in class attributes. |
224
|
|
|
Returns the value of a chain of attributes on object ``obj``, |
225
|
|
|
or None any object in that chain is None or lacks the next attribute. |
226
|
|
|
|
227
|
|
|
Example: |
228
|
|
|
Get a kitten's breed:: |
229
|
|
|
|
230
|
|
|
BaseTools.look(kitten), 'breed.name') # either None or a string |
231
|
|
|
|
232
|
|
|
Args: |
233
|
|
|
obj: Any object |
234
|
|
|
attrs: One of: |
235
|
|
|
- A string in the form attr1.attr2, translating to ``obj.attr1`` |
236
|
|
|
- An iterable of strings of the attributes |
237
|
|
|
- A function that maps ``obj`` to its output; |
238
|
|
|
equivalent to calling `attrs(obj)` but returning None on ``AttributeError``. |
239
|
|
|
|
240
|
|
|
Returns: |
241
|
|
|
Either None or the type of the attribute |
242
|
|
|
|
243
|
|
|
Raises: |
244
|
|
|
TypeError: |
245
|
|
|
""" |
246
|
|
|
return _look(obj, attrs) |
247
|
|
|
|
248
|
|
|
@classmethod |
249
|
|
|
def make_writer(cls, writer: Union[Writeable, Callable[[str], Any]]): |
|
|
|
|
250
|
|
|
if Writeable.isinstance(writer): |
|
|
|
|
251
|
|
|
return writer |
252
|
|
|
elif callable(writer): |
253
|
|
|
|
254
|
|
|
class W_(Writeable): |
|
|
|
|
255
|
|
|
def write(self, msg): |
256
|
|
|
writer(msg) |
257
|
|
|
|
258
|
|
|
def flush(self): |
259
|
|
|
pass |
260
|
|
|
|
261
|
|
|
def close(self): |
262
|
|
|
pass |
263
|
|
|
|
264
|
|
|
return W_() |
265
|
|
|
raise XTypeError(f"{type(writer)} cannot be wrapped into a Writeable") |
266
|
|
|
|
267
|
|
|
@classmethod |
268
|
|
|
def get_log_function( |
|
|
|
|
269
|
|
|
cls, log: Union[None, str, Callable[[str], None], Any] |
|
|
|
|
270
|
|
|
) -> Callable[[str], None]: |
271
|
|
|
""" |
272
|
|
|
Gets a logging function from user input. |
273
|
|
|
The rules are: |
274
|
|
|
- If None, uses logger.info |
275
|
|
|
- If 'print' or 'stdout', use sys.stdout.write |
276
|
|
|
- If 'stderr', use sys.stderr.write |
277
|
|
|
- If another str or int, try using that logger level (raises an error if invalid) |
278
|
|
|
- If callable, returns it |
279
|
|
|
- If it has a callable method called 'write', uses that |
280
|
|
|
|
281
|
|
|
Returns: |
282
|
|
|
A function of the log message that returns None |
283
|
|
|
""" |
284
|
|
|
if log is None: |
|
|
|
|
285
|
|
|
return logger.info |
286
|
|
|
elif isinstance(log, str) and log.lower() in ["print", "stdout"]: |
287
|
|
|
# noinspection PyTypeChecker |
288
|
|
|
return sys.stdout.write |
289
|
|
|
elif log == "stderr": |
290
|
|
|
# noinspection PyTypeChecker |
291
|
|
|
return sys.stderr.write |
292
|
|
|
elif isinstance(log, int): |
293
|
|
|
return getattr(logger, logging.getLevelName(log).lower()) |
294
|
|
|
elif isinstance(log, str): |
295
|
|
|
return getattr(logger, log.lower()) |
296
|
|
|
elif callable(log): |
297
|
|
|
return log |
298
|
|
|
elif hasattr(log, "write"): |
299
|
|
|
return log.write |
300
|
|
|
else: |
301
|
|
|
raise XTypeError(f"Log type {type(log)} not known", actual=str(type(log))) |
302
|
|
|
|
303
|
|
|
def __repr__(self): |
304
|
|
|
return self.__class__.__name__ |
305
|
|
|
|
306
|
|
|
def __str__(self): |
307
|
|
|
return self.__class__.__name__ |
308
|
|
|
|
309
|
|
|
|
310
|
|
|
__all__ = ["BaseTools"] |
311
|
|
|
|