Passed
Push — main ( 87238c...9f1476 )
by Douglas
02:33
created

pocketutils.tools.io_tools   C

Complexity

Total Complexity 55

Size/Duplication

Total Lines 333
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 55
eloc 224
dl 0
loc 333
rs 6
c 0
b 0
f 0

40 Methods

Rating   Name   Duplication   Size   Complexity  
A Base32HexBech.name() 0 3 1
A Base32HexBechTilde.name() 0 3 1
A Base64UrlTilde.name() 0 3 1
A Base16.encode() 0 2 1
A Base64Url.decode() 0 2 1
A IoUtils.hash_digest() 0 20 5
A Enc.__repr__() 0 2 1
A Base2048.encode() 0 2 1
A Base32HexTilde.decode() 0 2 1
A Enc.__lt__() 0 2 1
A Base32Bech.name() 0 3 1
A Enc.__eq__() 0 2 1
A Base32Hex.encode() 0 2 1
A Base64Url.encode() 0 2 1
A IoUtils.get_encoding_errors() 0 22 3
A Base32HexTilde.encode() 0 2 1
A IoUtils.decode() 0 5 2
A _Alphabet32Enc.decode() 0 4 1
A Base64UrlTilde.encode() 0 2 1
A Base16.decode() 0 2 1
A Enc.__reduce__() 0 2 1
A _Alphabet32Enc.encode() 0 4 1
A Base85.encode() 0 2 1
A Base32.encode() 0 2 1
A Enc.name() 0 3 1
A Base2048.decode() 0 2 1
A Enc.decode() 0 2 1
A Base32Hex.decode() 0 2 1
A Enc.__str__() 0 2 1
A Base64.decode() 0 2 1
A Base85.decode() 0 2 1
A Enc.__hash__() 0 2 1
A IoUtils.encode() 0 5 2
A Base32BechTilde.name() 0 3 1
A IoUtils.devnull() 0 10 1
A Base64UrlTilde.decode() 0 2 1
A Base32.decode() 0 2 1
B IoUtils.get_encoding() 0 21 8
A Base64.encode() 0 2 1
A Enc.encode() 0 2 1

How to fix   Complexity   

Complexity

Complex classes like pocketutils.tools.io_tools often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils
2
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils
3
# SPDX-License-Identifier: Apache-2.0
4
"""
5
6
"""
7
8
import abc
import base64
import binascii
import contextlib
import functools
import hashlib
import os
import sys
from collections.abc import Iterator
from dataclasses import dataclass
from typing import Literal, Self, SupportsBytes

from pocketutils.core.input_output import DevNull

try:
    import base2048
except ImportError:
    base2048 = None
24
25
# Public API of this module: the `IoUtils` class and the `IoTools` instance created at the bottom of the file.
__all__ = ["IoUtils", "IoTools"]
26
27
28
Encoding = (
29
    Literal["2048"]  # for fun
30
    | Literal["base85"]
31
    | Literal["base64"]
32
    | Literal["base64url"]
33
    | Literal["base64url-tilde"]
34
    | Literal["base32"]
35
    | Literal["base32-tilde"]
36
    | Literal["base32hex"]
37
    | Literal["base32hex-tilde"]
38
    | Literal["base32-bech32"]
39
    | Literal["base32-bech32-tilde"]
40
    | Literal["base32hex-bech32"]
41
    | Literal["base32hex-bech32-tilde"]
42
    | Literal["base16"]
43
    | Literal["hex"]
44
)
45
46
HashAlgorithm = (
47
    Literal["shake_256"]
48
    | Literal["shake_128"]
49
    | Literal["sha3_512"]
50
    | Literal["sha3_384"]
51
    | Literal["sha3_256"]
52
    | Literal["sha3_224"]
53
    | Literal["sha2_512"]
54
    | Literal["sha2_256"]
55
    | Literal["sha2_224"]
56
    | Literal["sha1"]
57
    | Literal["md5"]
58
    | Literal["crc32"]
59
)
60
61
62
@functools.total_ordering
63
class Enc(metaclass=abc.ABCMeta):
64
    def __reduce__(self: Self) -> str:
65
        return self.name
66
67
    def __hash__(self: Self) -> int:
68
        return hash(self.name)
69
70
    def __eq__(self: Self, other: Self) -> bool:
71
        return self.name == other.name
72
73
    def __lt__(self: Self, other: Self) -> bool:
74
        return self.name < other.name
75
76
    def __str__(self: Self) -> str:
77
        return self.name
78
79
    def __repr__(self: Self) -> str:
80
        return self.name
81
82
    @property
83
    def name(self: Self) -> str:
84
        return self.__class__.__name__.lower()
85
86
    def encode(self: Self, d: bytes) -> str:
87
        raise NotImplementedError()
88
89
    def decode(self: Self, d: str) -> bytes:
90
        raise NotImplementedError()
91
92
93
class Base16(Enc):
    """Base16 codec built on `base64.b16encode` / `base64.b16decode`."""

    def encode(self: Self, d: bytes) -> str:
        raw = base64.b16encode(d)
        return raw.decode(encoding="ascii")

    def decode(self: Self, d: str) -> bytes:
        decoded = base64.b16decode(d)
        return decoded
99
100
101
class Base32(Enc):
    """Base32 codec built on `base64.b32encode` / `base64.b32decode`."""

    def encode(self: Self, d: bytes) -> str:
        raw = base64.b32encode(d)
        return raw.decode(encoding="ascii")

    def decode(self: Self, d: str) -> bytes:
        decoded = base64.b32decode(d)
        return decoded
107
108
109
class Base64(Enc):
    """Base64 codec built on `base64.standard_b64encode` / `base64.standard_b64decode`."""

    def encode(self: Self, d: bytes) -> str:
        raw = base64.standard_b64encode(d)
        return raw.decode(encoding="ascii")

    def decode(self: Self, d: str) -> bytes:
        decoded = base64.standard_b64decode(d)
        return decoded
115
116
117
class Base64Url(Enc):
    """Base64 codec built on `base64.urlsafe_b64encode` / `base64.urlsafe_b64decode`."""

    def encode(self: Self, d: bytes) -> str:
        raw = base64.urlsafe_b64encode(d)
        return raw.decode(encoding="ascii")

    def decode(self: Self, d: str) -> bytes:
        decoded = base64.urlsafe_b64decode(d)
        return decoded
123
124
125
class Base32Hex(Enc):
    """Base32 codec built on `base64.b32hexencode` / `base64.b32hexdecode`."""

    def encode(self: Self, d: bytes) -> str:
        raw = base64.b32hexencode(d)
        return raw.decode(encoding="ascii")

    def decode(self: Self, d: str) -> bytes:
        decoded = base64.b32hexdecode(d)
        return decoded
131
132
133
class Base32HexTilde(Enc):
    """
    Like `base64.b32hexencode`, but with `~` in place of the `=` padding character.

    NOTE(review): `name` resolves to `"base32hextilde"` (the inherited default),
    whereas the `Encoding` alias spells this codec `"base32hex-tilde"` -- confirm which is intended.
    """

    def encode(self: Self, d: bytes) -> str:
        return base64.b32hexencode(d).decode(encoding="ascii").replace("=", "~")

    def decode(self: Self, d: str) -> bytes:
        # Undo `encode`: turn `~` back into `=` before handing the text to the base32hex decoder.
        return base64.b32hexdecode(d.replace("~", "="))
139
140
141
class Base2048(Enc):
    """
    Codec that delegates to the optional `base2048` package.

    The module-level name `base2048` is bound to `None` when the import fails,
    in which case both methods fail on attribute access.
    """

    def encode(self: Self, d: bytes) -> str:
        text = base2048.encode(d)
        return text

    def decode(self: Self, d: str) -> bytes:
        data = base2048.decode(d)
        return data
147
148
149
class _Alphabet32Enc(Enc):
    """
    Base32 codec whose output symbols are substituted one-for-one with a custom alphabet.

    Subclasses set two equal-length strings:

      - `_base32_alphabet`: the symbols emitted by the underlying base32 codec (padding included)
      - `_to_alphabet`: the replacement for the symbol at the same position

    The underlying codec is chosen from `_base32_alphabet`, so that `encode` and `decode`
    always use the same one and every emitted symbol has a replacement.
    """

    _base32_alphabet = ""
    _to_alphabet = ""

    def _uses_hex_alphabet(self: Self) -> bool:
        # "0" occurs in the base32hex alphabet (0-9, A-V) but not in plain base32 (A-Z, 2-7).
        return "0" in self._base32_alphabet

    def encode(self: Self, d: bytes) -> str:
        """
        Encodes bytes as base32 text written in `_to_alphabet`.

        Raises:
            KeyError: If the underlying codec emits a symbol missing from `_base32_alphabet`
        """
        raw = base64.b32hexencode(d) if self._uses_hex_alphabet() else base64.b32encode(d)
        substitution = dict(zip(self._base32_alphabet, self._to_alphabet))
        return "".join(substitution[symbol] for symbol in raw.decode(encoding="ascii"))

    def decode(self: Self, d: str) -> bytes:
        """
        Decodes text written in `_to_alphabet` back into bytes.

        Raises:
            KeyError: If `d` contains a symbol that is not in `_to_alphabet`
        """
        substitution = dict(zip(self._to_alphabet, self._base32_alphabet))
        restored = "".join(substitution[symbol] for symbol in d)
        if self._uses_hex_alphabet():
            return base64.b32hexdecode(restored)
        return base64.b32decode(restored)
162
163
164
class Base32Bech(_Alphabet32Enc):
    """Alphabet-substituting base32 codec named `"base32-bech"`; padding stays `=`."""

    # Source symbols: letters, then digits, then the padding character.
    _base32_alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "234567" "="
    # Replacement symbols, position for position.
    _to_alphabet = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" "="

    @property
    def name(self: Self) -> str:
        label = "base32-bech"
        return label
171
172
173
class Base32BechTilde(_Alphabet32Enc):
    """Alphabet-substituting base32 codec named `"base32-bech-tilde"`; padding `=` becomes `~`."""

    # Source symbols: letters, then digits, then the padding character.
    _base32_alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "234567" "="
    # Replacement symbols, position for position.
    _to_alphabet = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" "~"

    @property
    def name(self: Self) -> str:
        label = "base32-bech-tilde"
        return label
180
181
182
class Base32HexBech(_Alphabet32Enc):
    """Alphabet-substituting base32 codec named `"base32hex-bech"`; padding stays `=`."""

    # Source symbols: digits, then letters, then the padding character.
    _base32_alphabet = "0123456789" "ABCDEFGHIJKLMNOPQRSTUV" "="
    # Replacement symbols, position for position.
    _to_alphabet = "023456789acdefghjklmnpqrstuvwxyz" "="

    @property
    def name(self: Self) -> str:
        label = "base32hex-bech"
        return label
189
190
191
class Base32HexBechTilde(_Alphabet32Enc):
    """Alphabet-substituting base32 codec named `"base32hex-bech-tilde"`; padding `=` becomes `~`."""

    # Source symbols: digits, then letters, then the padding character.
    _base32_alphabet = "0123456789" "ABCDEFGHIJKLMNOPQRSTUV" "="
    # Replacement symbols, position for position.
    _to_alphabet = "023456789acdefghjklmnpqrstuvwxyz" "~"

    @property
    def name(self: Self) -> str:
        label = "base32hex-bech-tilde"
        return label
198
199
200
class Base64UrlTilde(Enc):
    """Like `Base64Url`'s underlying codec, but with `~` in place of the `=` padding character."""

    @property
    def name(self: Self) -> str:
        label = "base64url-tilde"
        return label

    def encode(self: Self, d: bytes) -> str:
        padded = base64.urlsafe_b64encode(d).decode(encoding="ascii")
        return padded.replace("=", "~")

    def decode(self: Self, d: str) -> bytes:
        # Restore the standard padding character before decoding.
        padded = d.replace("~", "=")
        return base64.urlsafe_b64decode(padded)
210
211
212
class Base85(Enc):
    """Base85 codec built on `base64.b85encode` / `base64.b85decode`."""

    def encode(self: Self, d: bytes) -> str:
        raw = base64.b85encode(d)
        return raw.decode(encoding="ascii")

    def decode(self: Self, d: str) -> bytes:
        decoded = base64.b85decode(d)
        return decoded
218
219
220
# Registry of codec *instances*, keyed by every name under which each can be requested.
#
# `name` is a property, so it must be read from an instance: read from the class it yields
# the `property` object itself, which is not a string and is shared by every class that
# does not override it. Storing instances also lets callers invoke `.encode(d)` /
# `.decode(d)` directly on the looked-up value.
#
# Besides each codec's own `name`, the spellings listed in the `Encoding` alias are
# registered as aliases where a matching codec exists.
# NOTE(review): `Encoding` also lists "base32-tilde", for which no codec class is defined here.
ENCODINGS: dict[str, Enc] = {
    label: codec
    for codec, aliases in (
        (Base16(), ("hex",)),
        (Base32(), ()),
        (Base32Hex(), ()),
        (Base32HexTilde(), ("base32hex-tilde",)),
        (Base32Bech(), ("base32-bech32",)),
        (Base32HexBech(), ("base32hex-bech32",)),
        (Base32BechTilde(), ("base32-bech32-tilde",)),
        (Base32HexBechTilde(), ("base32hex-bech32-tilde",)),
        (Base64(), ()),
        (Base64Url(), ()),
        (Base64UrlTilde(), ()),
        (Base85(), ()),
        (Base2048(), ("2048",)),
    )
    for label in (codec.name, *aliases)
}
238
239
240
@dataclass(slots=True, frozen=True)
class IoUtils:
    """Stateless helpers for text-encoding names, hashing, and binary-to-text codecs."""

    def get_encoding(self: Self, encoding: str = "utf-8") -> str:
        """
        Returns a text encoding from a more flexible string.
        Ignores hyphens and lowercases the string.
        Permits these nonstandard shorthands:

          - `"platform"`: use `sys.getdefaultencoding()` on the fly
          - `"utf-8(bom)"`: use `"utf-8-sig"` on Windows; `"utf-8"` otherwise
          - `"utf-16(bom)"`: use `"utf-16"`
          - `"utf-32(bom)"`: use `"utf-32"`

        Python has no `"utf-16-sig"` or `"utf-32-sig"` codec; the plain
        `"utf-16"` and `"utf-32"` codecs already handle the byte-order mark.
        """
        normalized = encoding.lower().replace("-", "")
        if normalized == "platform":
            return sys.getdefaultencoding()
        # Hyphens were removed above, so the shorthands are matched in their hyphen-free form.
        if normalized == "utf8(bom)":
            return "utf-8-sig" if os.name == "nt" else "utf-8"
        if normalized == "utf16(bom)":
            return "utf-16"
        if normalized == "utf32(bom)":
            return "utf-32"
        return normalized

    def get_encoding_errors(self: Self, errors: str | None) -> str | None:
        """
        Returns the value passed as `errors=` in `open`.

        Args:
            errors: An error-handler name, or `None` for `"strict"`

        Raises:
            ValueError: If invalid
        """
        if errors is None:
            return "strict"
        permitted = (
            "strict",
            "ignore",
            "replace",
            "xmlcharrefreplace",
            "backslashreplace",
            "namereplace",
            "surrogateescape",
            "surrogatepass",
        )
        if errors not in permitted:
            msg = f"Invalid value {errors} for errors"
            raise ValueError(msg)
        return errors

    def hash_digest(
        self: Self,
        data: SupportsBytes | str,
        algorithm: HashAlgorithm,
        *,
        digest_length: int | None = None,
        **kwargs,
    ) -> bytes:
        """
        Computes a digest (or CRC-32 checksum) of `data`.

        Args:
            data: A string (encoded as UTF-8 first) or anything accepted by `bytes()`
            algorithm: The algorithm name; `"sha2_*"` names are mapped to hashlib's `"sha*"`
            digest_length: Output length in bytes for the `shake_*` algorithms (default 128);
                           ignored otherwise
            kwargs: Passed to the hashlib constructor; `usedforsecurity` defaults to `False`

        Returns:
            The raw digest; for `"crc32"`, the checksum as 4 big-endian bytes
        """
        options = {"usedforsecurity": False} | kwargs
        payload = data.encode("utf-8") if isinstance(data, str) else bytes(data)
        if algorithm == "crc32":
            # `bytes(n)` would allocate n zero bytes; serialise the 32-bit value instead.
            return binascii.crc32(payload).to_bytes(4, "big")
        if algorithm.startswith("shake_"):
            hasher = getattr(hashlib, algorithm)(**options)
            hasher.update(payload)
            return hasher.digest(128 if digest_length is None else digest_length)
        # hashlib spells the SHA-2 family without the "2_" (e.g. "sha512", not "sha2_512").
        hashlib_name = "sha" + algorithm[len("sha2_") :] if algorithm.startswith("sha2_") else algorithm
        hasher = hashlib.new(hashlib_name, **options)
        hasher.update(payload)
        return hasher.digest()

    def encode(self: Self, d: bytes, enc: Encoding = "base64") -> str:
        """
        Encodes bytes as text using the codec registered under `enc` in `ENCODINGS`.

        Raises:
            ValueError: If `enc` is not a key of `ENCODINGS`
        """
        if enc not in ENCODINGS:
            msg = f"Unknown encoding {enc}"
            raise ValueError(msg)
        return ENCODINGS[enc].encode(d)

    def decode(self: Self, d: str, enc: Encoding = "base64") -> bytes:
        """
        Decodes text into bytes using the codec registered under `enc` in `ENCODINGS`.

        Raises:
            ValueError: If `enc` is not a key of `ENCODINGS`
        """
        if enc not in ENCODINGS:
            msg = f"Unknown encoding {enc}"
            raise ValueError(msg)
        return ENCODINGS[enc].decode(d)

    @contextlib.contextmanager
    def devnull(self: Self) -> Iterator[DevNull]:
        """
        Yields a 'writer' that does nothing.

        Example:

            with IoTools.devnull() as devnull:
                devnull.write('hello')
        """
        yield DevNull()
330
331
332
# Ready-made module-level instance of `IoUtils` (which declares no fields), exported via `__all__`.
IoTools = IoUtils()
333