pocketutils.core.OptRow.__contains__()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 2
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
# SPDX-FileCopyrightText: Copyright 2020-2023, Contributors to pocketutils
2
# SPDX-PackageHomePage: https://github.com/dmyersturnbull/pocketutils
3
# SPDX-License-Identifier: Apache-2.0
4
"""
5
6
"""
7
8
from __future__ import annotations
9
10
import abc
11
import logging
12
from collections import UserDict
13
from typing import TYPE_CHECKING, Any, Generic, Self, TypeVar, Unpack
14
15
if TYPE_CHECKING:
16
    from collections.abc import Callable, Iterable, Mapping
17
18
19
# Covariant type variable; none of the definitions shown in this file refer to it.
T_co = TypeVar("T_co", covariant=True)
20
21
# Logger obtained under the fixed name "pocketutils" (not this module's `__name__`).
logger = logging.getLogger("pocketutils")
22
23
24
def null_context():
    """
    Generator that yields a single `None` and then finishes.

    NOTE(review): this function is not wrapped in `contextlib.contextmanager`,
    so calling it returns a plain generator rather than an object usable in a
    `with` statement -- confirm whether that is intended.
    """
    yield None
26
27
28
class Sentinel:
29
    """
30
    A sentinel value tied to nothing more than a memory address.
31
    """
32
33
    @classmethod
34
    def new(cls: type[Self]) -> Sentinel:
35
        return Sentinel()
36
37
    def __init__(self: Self) -> None:
38
        pass
39
40
41
V = TypeVar("V")
42
43
44
class LazyWrapped(Generic[V], metaclass=abc.ABCMeta):
45
    def __init__(self: Self) -> None:
46
        self._v, self._exists = None, False
47
48
    def get(self: Self) -> V:
49
        if not self._exists:
50
            self._v = self._generate()
51
            self._exists = True
52
        return self._v
53
54
    @property
55
    def raw_value(self: Self) -> V:
56
        return self._v
57
58
    @property
59
    def is_defined(self: Self) -> bool:
60
        return self._exists
61
62
    @property
63
    def _name(self: Self) -> str:
64
        raise NotImplementedError()
65
66
    def _generate(self: Self) -> V:
67
        raise NotImplementedError()
68
69
    def __repr__(self: Self) -> str:
70
        return self._name + "[" + (repr(self._v) if self.is_defined else "⌀") + "]"
71
72
    def __str__(self: Self) -> str:
73
        return self._name + "[" + (str(self._v) if self.is_defined else "⌀") + "]"
74
75
    def __eq__(self: Self, other: Self) -> bool:
76
        return type(self) == type(other) and self.is_defined == other.is_defined and self.raw_value == other.raw_value
77
78
79
class PlainLazyWrapped(LazyWrapped, metaclass=abc.ABCMeta):
    """
    A `LazyWrapped` that adds no members of its own.
    """
81
82
83
class ClearableLazyWrapped(LazyWrapped, metaclass=abc.ABCMeta):
    """
    A `LazyWrapped` whose cached value can be discarded with `clear`.
    """

    def clear(self: Self) -> None:
        """
        Discards the cached value, so the next `get` generates a new one.

        The stored value is reset to `None` as well (the same state `__init__` sets),
        so `raw_value` does not keep returning, or keep alive, the discarded object.
        """
        self._v, self._exists = None, False
86
87
88
class LazyWrap:
89
    @classmethod
90
    def new_type(cls: type[Self], dtype: str, generator: Callable[[], V]) -> type[PlainLazyWrapped]:
91
        # noinspection PyTypeChecker
92
        return cls._new_type(dtype, generator, PlainLazyWrapped)
93
94
    @classmethod
95
    def new_clearable_type(
96
        cls: type[Self],
97
        dtype: str,
98
        generator: Callable[[], V],
99
    ) -> type[ClearableLazyWrapped]:
100
        # noinspection PyTypeChecker
101
        return cls._new_type(dtype, generator, ClearableLazyWrapped)
102
103
    @classmethod
104
    def _new_type(
105
        cls: type[Self],
106
        dtype: str,
107
        generator: Callable[[], V],
108
        superclass: type[LazyWrapped],
109
    ) -> type[LazyWrapped]:
110
        """
111
        Creates a new mutable wrapped type.
112
113
        Example:
114
            >>> LazyRemoteTime = LazyWrap.new_type('RemoteTime', lambda: ...)
115
            >>> dt = LazyRemoteTime()  # nothing happens
116
            >>> dt.get()  # has a value
117
118
        Args:
119
            dtype: The name of the data type, such as 'datetime' if generator=datetime.now
120
            generator: This is called to (lazily) initialize an instance of the LazyWrapped
121
122
        Returns:
123
            A new class subclassing LazyWrapped
124
        """
125
126
        class X(superclass):
127
            @property
128
            def _name(self: Self):
129
                return dtype
130
131
            def _generate(self: Self):
132
                return generator()
133
134
        X.__name__ = superclass.__name__ + dtype
135
        return X
136
137
138
class DictNamespace(UserDict):
139
    """
140
    Behaves like a dict and a `SimpleNamespace`.
141
    This means it has a length, can be iterated over, etc., and can be accessed via `.`.
142
    """
143
144
    def __init__(self: Self, **kwargs: Unpack[Mapping[str, Any]]) -> None:
145
        super().__init__(**kwargs)
146
        self.__dict__.update(kwargs)
147
148
    def __eq__(self: Self, other: Self | DictNamespace) -> bool:
149
        if isinstance(self, DictNamespace) and isinstance(other, DictNamespace):
150
            return self.__dict__ == other.__dict__
151
        return NotImplemented
152
153
154
class OptRow:
155
    """
156
    Short for 'optional row'.
157
    A wrapper around a NamedTuple that returns None if the key doesn't exist.
158
    This is intended for Pandas itertuples().
159
    """
160
161
    def __init__(self: Self, row) -> None:
162
        self._row = row
163
164
    def __getattr__(self: Self, item: str) -> Any:
165
        try:
166
            return getattr(self._row, item)
167
        except AttributeError:
168
            return None
169
170
    def opt(self: Self, item: str) -> Any:
171
        return getattr(self, item)
172
173
    def req(self: Self, item: str) -> Any:
174
        return getattr(self._row, item)
175
176
    def __contains__(self: Self, item) -> bool:
177
        try:
178
            getattr(self._row, item)
179
            return True
180
        except AttributeError:
181
            return False
182
183
    def items(self: Self) -> Iterable[Any]:
184
        # noinspection PyProtectedMember
185
        return self._row._asdict()
186
187
    def keys(self: Self) -> Iterable[Any]:
188
        # noinspection PyProtectedMember
189
        return self._row._asdict().keys()
190
191
    def values(self: Self) -> Iterable[Any]:
192
        # noinspection PyProtectedMember
193
        return self._row._asdict().values()
194
195
    def __repr__(self: Self) -> str:
196
        return self.__class__.__name__ + "@" + hex(id(self))
197
198
    def __str__(self: Self) -> str:
199
        return self.__class__.__name__
200
201
    def __eq__(self: Self, other: Self) -> bool:
202
        # noinspection PyProtectedMember
203
        return self._row == other._row
204
205
206
__all__ = [
207
    "Sentinel",
208
    "OptRow",
209
    "LazyWrap",
210
    "DictNamespace",
211
]
212