pocketutils.core.input_output.OpenMode.update()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils
2
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils
3
# SPDX-License-Identifier: Apache-2.0
4
"""
5
6
"""
7
8
from __future__ import annotations
9
10
import abc
11
import contextlib
12
import functools
13
import logging
14
from dataclasses import dataclass
15
from types import TracebackType
16
from typing import TYPE_CHECKING, Any, Generic, Self, TypeVar, final
17
18
from pocketutils import ValueIllegalError
19
20
if TYPE_CHECKING:
21
    from io import StringIO
22
23
24
T_co = TypeVar("T_co", covariant=True)
25
logger = logging.getLogger("pocketutils")
26
27
28
class Writeable(Generic[T_co], metaclass=abc.ABCMeta):
29
    @classmethod
30
    def isinstance(cls: type[Self], value: T_co) -> bool:
31
        return hasattr(value, "write") and hasattr(value, "flush") and hasattr(value, "close")
32
33
    def write(self: Self, msg: T_co) -> int:
34
        raise NotImplementedError()
35
36
    def flush(self: Self) -> None:
37
        raise NotImplementedError()
38
39
    def close(self: Self) -> bool:
40
        raise NotImplementedError()
41
42
    def __enter__(self: Self) -> Self:
43
        return self
44
45
    def __exit__(self: Self, exc_type: BaseException, exc_value: BaseException, traceback: TracebackType) -> None:
46
        self.close()
47
48
49
@final
50
class DevNull(Writeable[T_co]):
51
    """Pretends to write but doesn't."""
52
53
    def write(self: Self, msg: T_co) -> int:
54
        return 0
55
56
    def flush(self: Self) -> None:
57
        pass
58
59
    def close(self: Self) -> None:
60
        pass
61
62
    def __enter__(self: Self) -> Self:
63
        return self
64
65
    def __exit__(self: Self, exc_type: BaseException, exc_value: BaseException, traceback: TracebackType) -> None:
66
        pass
67
68
69
class LogWriter(Writeable[str]):
70
    """
71
    A call to a logger at some level, pretending to be a writer.
72
    Has a write method, as well as flush and close methods that do nothing.
73
    """
74
75
    def __init__(self: Self, level: int | str) -> None:
76
        if isinstance(level, str):
77
            level = level.upper()
78
        self.level = logging.getLevelName(level)
79
80
    def __enter__(self: Self) -> Self:
81
        return self
82
83
    def __exit__(self: Self, exc_type: BaseException, exc_value: BaseException, traceback: TracebackType) -> None:
84
        self.close()
85
86
    def write(self: Self, msg: str) -> int:
87
        getattr(logger, self.level)(msg)
88
        return len(msg)
89
90
    def flush(self: Self) -> None:
91
        pass
92
93
    def close(self: Self) -> None:
94
        pass
95
96
97
class DelegatingWriter(Writeable):
98
    # we CANNOT override TextIOBase: It causes hangs
99
    def __init__(self: Self, *writers: Writeable) -> None:
100
        self._writers = writers
101
102
    def __enter__(self: Self) -> Self:
103
        return self
104
105
    def __exit__(self: Self, exc_type: BaseException, exc_value: BaseException, traceback: TracebackType) -> None:
106
        self.close()
107
108
    def write(self: Self, s: T_co) -> int:
109
        x = 0
110
        for writer in self._writers:
111
            x += writer.write(s)
112
        return x
113
114
    def flush(self: Self) -> None:
115
        for writer in self._writers:
116
            writer.flush()
117
118
    def close(self: Self) -> None:
119
        for writer in self._writers:
120
            writer.close()
121
122
123
@dataclass(frozen=True, slots=True)
124
class Capture:
125
    """
126
    A lazy string-like object that wraps around a StringIO result.
127
    It's too hard to fully subclass a string while keeping it lazy.
128
    """
129
130
    cio: StringIO
0 ignored issues
show
introduced by
The variable StringIO does not seem to be defined in case TYPE_CHECKING on line 20 is False. Are you sure this can never be the case?
Loading history...
131
132
    def __len__(self: Self) -> int:
133
        return len(repr(self))
134
135
    def __str__(self: Self) -> str:
136
        return self.cio.getvalue()
137
138
    def __repr__(self: Self) -> str:
139
        return self.cio.getvalue()
140
141
    @property
142
    def lines(self: Self) -> list[str]:
143
        return self.split("\n")
144
145
    @property
146
    def value(self: Self) -> str:
147
        return self.cio.getvalue()
148
149
    def split(self: Self, x: str) -> list[str]:
150
        return self.cio.getvalue().split(x)
151
152
153
@functools.total_ordering
154
class OpenMode(str):
155
    """
156
    Python file open modes (`open()`-compatible).
157
    Contains method :meth:`normalize` and properties :meth:`read`.
158
159
    Here are the flags:
160
        - 'r' means read
161
        - 'w' means overwrite
162
        - 'x' means exclusive write; complain if it exists
163
        - 'a' means append
164
        - 't' means text (default)
165
        - 'b' means binary
166
        - '+' means open for updating
167
    """
168
169
    def __new__(cls, s: str):
170
        for c in s:
171
            if c not in {"r", "w", "x", "a", "t", "b", "+", "U"}:
172
                msg = f"Invalid flag '{c}' in open mode '{s}'"
173
                raise ValueIllegalError(msg, value=s)
174
        if ("r" in s) + ("w" in s) + ("x" in s) + ("a" in s) > 1:
175
            raise ValueIllegalError(f"Too many 'r'/'w'/'x'/'a' flags in '{s}'", value=s)
176
        return str.__new__(cls, s)
177
178
    @property
179
    def read(self: Self) -> bool:
180
        return "w" not in self and "x" not in self and "a" not in self and "+" not in self
181
182
    @property
183
    def write(self: Self) -> bool:
184
        return ("w" in self or "x" in self or "a" in self) and "+" not in self
185
186
    @property
187
    def update(self: Self) -> bool:
188
        return "+" in self
189
190
    @property
191
    def overwrite(self: Self) -> bool:
192
        return "w" in self
193
194
    @property
195
    def safe(self: Self) -> bool:
196
        return "x" in self
197
198
    @property
199
    def append(self: Self) -> bool:
200
        return "a" in self
201
202
    @property
203
    def text(self: Self) -> bool:
204
        return "b" not in self
205
206
    @property
207
    def binary(self: Self) -> bool:
208
        return "b" in self
209
210
    def normalize(self: Self) -> Self:
211
        s = ""
212
        if self.append:
213
            s += "a"
214
        elif self.safe:
215
            s += "x"
216
        elif self.overwrite:
217
            s += "w"
218
        elif self.read:
219
            s += "r"
220
        if self.binary:
221
            s += "b"
222
        else:
223
            s += "t"
224
        if self.update:
225
            s += "+"
226
        return self.__class__(s)
227
228
229
def null_context():
230
    yield
231
232
233
def return_none_1_param(a: Any) -> None:
234
    return None
235
236
237
def return_none_2_params(a: Any, b: Any) -> None:
238
    return None
239
240
241
def return_none_3_params(a: Any, b: Any, c: Any) -> None:
242
    return None
243
244
245
@contextlib.contextmanager
246
def silenced(no_stdout: bool = True, no_stderr: bool = True):
247
    with contextlib.redirect_stdout(DevNull()) if no_stdout else null_context():
248
        with contextlib.redirect_stderr(DevNull()) if no_stderr else null_context():
249
            yield
250
251
252
__all__ = [
253
    "Writeable",
254
    "DevNull",
255
    "LogWriter",
256
    "DelegatingWriter",
257
    "Capture",
258
    "OpenMode",
259
    "return_none_1_param",
260
    "return_none_2_params",
261
    "return_none_3_params",
262
]
263