Completed
Push — master ( 2547f8...133cc4 )
by Max
02:04
created

_has_namespace_at()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 1
c 4
b 0
f 0
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
"""Class Namespaces (internal module).

All of the guts of the class namespace implementation.

"""
6
7
8
# See https://blog.ionelmc.ro/2015/02/09/understanding-python-metaclasses/
9
10 6
import collections.abc
11 6
import functools
12 6
import itertools
13 6
import weakref
14
15 6
from .descriptor_inspector import _DescriptorInspector
16 6
from .flags import ENABLE_SET_NAME
17
18
19 6
# Maps each _NamespaceProxy to its (dct, instance, owner) triple. The proxy
# itself stores no state; weak keys let an entry go away with its proxy.
_PROXY_INFOS = weakref.WeakKeyDictionary()
20
21
22 6
def _is_namespaceable(cls):
    """Return whether `cls` is an instance of the Namespaceable metaclass.

    Used as a predicate for filter() when walking an MRO.
    """
    return isinstance(cls, Namespaceable)
25
26
27 6
def _mro_to_chained(mro, dct):
    """Return a chained map of lookups for the given namespace and mro.

    `mro` is an iterable of classes and `dct` is the Namespace whose path is
    being resolved. The result is a ChainMap of the namespaces found at that
    path, in MRO order.
    """
    path = dct.path
    # Only Namespaceable classes can hold namespaces.
    namespaceables = filter(_is_namespaceable, mro)
    # Stop at the first class where something other than a Namespace sits on
    # the path: it shadows everything further along the MRO.
    unblocked = itertools.takewhile(
        functools.partial(Namespace.no_blocker, dct), namespaceables)
    # Skip classes that never defined this namespace, so that get_namespace
    # below does not create empty ones as a side effect.
    holders = filter(
        functools.partial(Namespace.namespace_exists, path), unblocked)
    return collections.ChainMap(*[
        Namespace.get_namespace(cls, path) for cls in holders])
36
37
38 6
def _instance_map(ns_proxy):
    """Return a map, possibly chained, of lookups for the given instance.

    Returns an empty dict when the proxy has no instance. When the instance
    is itself a Namespaceable class, the lookup is chained over its MRO;
    otherwise the instance's own namespace at the proxy's path is returned.
    """
    dct, instance, _ = _PROXY_INFOS[ns_proxy]
    if instance is None:
        return {}
    if isinstance(instance, Namespaceable):
        return _mro_to_chained(instance.__mro__, dct)
    return Namespace.get_namespace(instance, dct.path)
48
49
50 6
def _mro_map(ns_proxy):
    """Return a chained map of lookups for the given owner class.

    The owner's MRO is trimmed so that it starts at the class the namespace
    is attached to (`dct.parent_object`).
    """
    dct, _, owner = _PROXY_INFOS[ns_proxy]
    mro = owner.__mro__
    # tuple.index raises ValueError if parent_object is not in the MRO.
    start = mro.index(dct.parent_object)
    return _mro_to_chained(mro[start:], dct)
56
57
58 6
def _retarget(ns_proxy):
    """Convert a class lookup to an instance lookup, if needed.

    When the proxy has no instance and the owner's own type is a
    Namespaceable class, the owner is treated as the instance and its type
    as the owner, and a new proxy for that pairing is returned. Otherwise
    the proxy is returned unchanged.
    """
    dct, instance, owner = _PROXY_INFOS[ns_proxy]
    if instance is None and isinstance(type(owner), Namespaceable):
        instance, owner = owner, type(owner)
        dct = Namespace.get_namespace(owner, dct.path)
        ns_proxy = _NamespaceProxy(dct, instance, owner)
    return ns_proxy
66
67
68 6
def _get(a_map, name):
    """Return a _DescriptorInspector around the attribute, or None.

    None is returned when `name` is not a key of `a_map`.
    """
    try:
        value = a_map[name]
    except KeyError:
        return None
    return _DescriptorInspector(value)
76
77
78 6
def _delete(dct, name):
    """Attempt to delete `name` from `dct`. Raise AttributeError if missing."""
    try:
        del dct[name]
    except KeyError:
        # The KeyError is an implementation detail of attribute deletion;
        # suppress it so callers see a plain AttributeError.
        raise AttributeError(name) from None
84
85
86 6
def _has_get(value):
    """Return whether the value is a wrapped getter descriptor.

    `value` is either None or an object exposing a `has_get` attribute.
    """
    return value is not None and value.has_get
89
90
91 6
def _is_data(value):
    """Return whether the value is a wrapped data descriptor.

    `value` is either None or an object exposing an `is_data` attribute.
    """
    return value is not None and value.is_data
94
95
96 6
def _get_data(dct, name):
    """Return the data descriptor associated with `name` in `dct`, if any.

    Returns None when `name` is missing or is not a data descriptor.
    """
    value = _get(dct, name)
    if _is_data(value):
        return value
    return None
101
102
103 6
class _NamespaceProxy:

    """Proxy object for manipulating and querying namespaces.

    The proxy holds no attributes of its own: its (dct, instance, owner)
    triple lives in _PROXY_INFOS so that attribute access on the proxy can
    be redirected entirely to the namespace.
    """

    # Only a weakref slot, so the proxy can be a WeakKeyDictionary key.
    __slots__ = '__weakref__',

    def __init__(self, dct, instance, owner):
        _PROXY_INFOS[self] = dct, instance, owner

    def __dir__(self):
        return collections.ChainMap(_instance_map(self), _mro_map(self))

    def __getattribute__(self, name):
        """Look up `name` in the namespace.

        With no owner (class body still being executed) the namespace dict
        is read directly. Otherwise the order is: data descriptor found via
        the MRO, value stored for the instance, non-data descriptor found
        via the MRO, plain value found via the MRO. Raises AttributeError
        if none match.
        """
        self = _retarget(self)
        dct, instance, owner = _PROXY_INFOS[self]
        if owner is None:
            return dct[name]
        instance_map = _instance_map(self)
        mro_map = _mro_map(self)
        instance_value = _get(instance_map, name)
        mro_value = _get(mro_map, name)
        if _is_data(mro_value):
            return mro_value.get(instance, owner)
        elif issubclass(owner, type) and _has_get(instance_value):
            # The owner is a metaclass, so the "instance" is a class:
            # bind the descriptor as a class-level access on it.
            return instance_value.get(None, instance)
        elif instance_value is not None:
            return instance_value.object
        elif _has_get(mro_value):
            return mro_value.get(instance, owner)
        elif mro_value is not None:
            return mro_value.object
        else:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        """Store `value` under `name` in the namespace.

        With no owner the namespace dict is written directly. With an owner
        but no instance, the owner's namespace is written. With an instance,
        a data descriptor found via the MRO gets to handle the assignment;
        otherwise the instance's namespace is written.
        """
        self = _retarget(self)
        dct, instance, owner = _PROXY_INFOS[self]
        if owner is None:
            dct[name] = value
            return
        if instance is None:
            real_map = Namespace.get_namespace(owner, dct.path)
            real_map[name] = value
            return
        mro_map = _mro_map(self)
        target_value = _get_data(mro_map, name)
        if target_value is not None:
            target_value.set(instance, value)
            return
        instance_map = Namespace.get_namespace(instance, dct.path)
        instance_map[name] = value

    def __delattr__(self, name):
        """Delete `name` from the namespace.

        Mirrors __setattr__. Raises AttributeError if `name` is missing from
        the map it is deleted from.
        """
        self = _retarget(self)
        dct, instance, owner = _PROXY_INFOS[self]
        if owner is None:
            _delete(dct, name)
            return
        real_map = Namespace.get_namespace(owner, dct.path)
        if instance is None:
            _delete(real_map, name)
            return
        # NOTE(review): only the owner's own namespace is searched for a data
        # descriptor here, whereas __setattr__ searches the whole MRO chain.
        # Confirm whether that difference is intended.
        value = _get_data(real_map, name)
        if value is not None:
            value.delete(instance)
            return
        instance_map = Namespace.get_namespace(instance, dct.path)
        _delete(instance_map, name)

    def __enter__(self):
        dct, _, _ = _PROXY_INFOS[self]
        return dct.__enter__()

    def __exit__(self, exc_type, exc_value, traceback):
        dct, _, _ = _PROXY_INFOS[self]
        return dct.__exit__(exc_type, exc_value, traceback)
179
180
181 6
class Namespace(dict):

    """Namespace.

    A dict of attributes that can be given a name, bound to a class-creation
    scope and a parent mapping, and attached to a target object. Used as a
    context manager inside a class body, it becomes the active mapping of
    its scope for the duration of the block.
    """

    __slots__ = 'name', 'scope', 'parent', 'active', 'parent_object'

    # Registry of namespaces attached to Namespaceable targets, keyed by
    # (target, path). Other targets carry their own registry attribute.
    __namespaces = {}

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Namespaces must be nested by assignment, not passed in as initial
        # values.
        bad_values = tuple(
            value for value in self.values() if
            isinstance(value, (Namespace, _NamespaceProxy)))
        if bad_values:
            raise ValueError('Bad values: {}'.format(bad_values))
        self.name = None
        self.scope = None
        self.parent = None
        self.active = False
        self.parent_object = None

    @classmethod
    def premake(cls, name, parent):
        """Return an empty namespace with the given name and parent."""
        self = cls()
        self.name = name
        self.parent = parent
        return self

    def __setitem__(self, key, value):
        # Once this namespace is attached to a target, a Namespace assigned
        # under a new name is bound to that name and attached as well.
        if (
                self.parent_object is not None and
                isinstance(value, Namespace) and value.name != key):
            value.push(key, self.scope)
            value.add(self.parent_object)
        super().__setitem__(key, value)

    def __enter__(self):
        self.activate()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self.name is None:
            raise RuntimeError('Namespace must be named.')
        self.deactivate()

    @property
    def path(self):
        """Return the full path of the namespace.

        The path is a tuple of names, outermost first. Raises ValueError if
        the namespace has no name or no parent yet.
        """
        if self.name is None or self.parent is None:
            raise ValueError('Namespace has no name or no parent.')
        if isinstance(self.parent, Namespace):
            parent_path = self.parent.path
        else:
            parent_path = ()
        return parent_path + (self.name,)

    @classmethod
    def __get_helper(cls, target, path):
        """Return the (key, registry) pair used to find `path` on `target`.

        For a Namespaceable target the key is (target, path) and the
        registry is the class-level one. For any other target the key is
        `path` and the registry is a dict stored on the target, created if
        missing.
        """
        if isinstance(target, Namespaceable):
            path_ = target, path
            namespaces = cls.__namespaces
        else:
            path_ = path
            try:
                namespaces = target.__namespaces
            except AttributeError:
                namespaces = {}
                target.__namespaces = namespaces
        return path_, namespaces

    @classmethod
    def namespace_exists(cls, path, target):
        """Return whether the given namespace exists."""
        path_, namespaces = cls.__get_helper(target, path)
        return path_ in namespaces

    @classmethod
    def get_namespace(cls, target, path):
        """Return a namespace with given target and path, create if needed.

        Missing parent namespaces along `path` are created recursively.
        """
        path_, namespaces = cls.__get_helper(target, path)
        try:
            return namespaces[path_]
        except KeyError:
            if len(path) == 1:
                # Top level: the parent is a plain mapping, not a Namespace.
                parent = {}
            else:
                parent = cls.get_namespace(target, path[:-1])
            return namespaces.setdefault(path_, cls.premake(path[-1], parent))

    @classmethod
    def no_blocker(cls, dct, cls_):
        """Return False if there's a non-Namespace object in the path.

        Walks `vars(cls_)` along `dct.path`. A name missing along the way
        does not count as a blocker.
        """
        try:
            namespace = vars(cls_)
            for name in dct.path:
                namespace = namespace[name]
                if not isinstance(namespace, cls):
                    return False
        except KeyError:
            # Nothing defined at this point of the path: not blocked.
            pass
        return True

    def add(self, target):
        """Add self as a namespace under target.

        If another namespace is already registered at the same path, it is
        kept and `parent_object` is left unchanged.
        """
        path, namespaces = self.__get_helper(target, self.path)
        res = namespaces.setdefault(path, self)
        if res is self:
            self.parent_object = target

    def set_if_none(self, name, value):
        """Set the attribute `name` to `value`, if it's initially None."""
        if getattr(self, name) is None:
            setattr(self, name, value)

    def push(self, name, scope):
        """Bind self to the given name and scope, and activate.

        Raises ValueError if the namespace is already bound to a different
        name, scope or parent.
        """
        self.set_if_none('name', name)
        self.set_if_none('scope', scope)
        self.set_if_none('parent', scope.dicts[0])
        if name != self.name:
            raise ValueError('Cannot rename namespace')
        if scope is not self.scope:
            raise ValueError('Cannot reuse namespace')
        if scope.dicts[0] is not self.parent:
            raise ValueError('Cannot reparent namespace')
        self.scope.namespaces.append(self)
        self.activate()

    def activate(self):
        """Take over as the scope for the target."""
        if self.scope is not None and not self.active:
            if self.scope.dicts[0] is not self.parent:
                raise ValueError('Cannot reparent namespace')
            self.active = True
            self.scope.dicts.insert(0, self)

    def deactivate(self):
        """Stop being the scope for the target."""
        if self.scope is not None and self.active:
            self.active = False
            self.scope.dicts.pop(0)

    def __get__(self, instance, owner):
        return _NamespaceProxy(self, instance, owner)
327
328
329 6
class _NamespaceScope(collections.abc.MutableMapping):

    """The class creation namespace for Namespaceables.

    `dicts` is a stack of mappings with the active one at index 0: reads go
    through all of them, writes and deletes go to the active one.
    `namespaces` lists the namespaces that have been bound to this scope.
    """

    __slots__ = 'dicts', 'namespaces'

    def __init__(self, dct):
        self.dicts = [dct]
        self.namespaces = []

    def __getitem__(self, key):
        value = collections.ChainMap(*self.dicts)[key]
        if isinstance(value, Namespace):
            # Hand out a proxy with no instance and no owner, so attribute
            # syntax works on the namespace inside the class body.
            value = _NamespaceProxy(value, None, None)
        return value

    def __setitem__(self, key, value):
        dct = self.dicts[0]
        if isinstance(value, _NamespaceProxy):
            # Unwrap the proxy back to the Namespace it stands for.
            value, _, _ = _PROXY_INFOS[value]
        if isinstance(value, Namespace) and value.name != key:
            value.push(key, self)
        dct[key] = value

    def __delitem__(self, key):
        del self.dicts[0][key]

    def __iter__(self):
        return iter(collections.ChainMap(*self.dicts))

    def __len__(self):
        return len(collections.ChainMap(*self.dicts))
361
362
363 6
# Maps each class created by Namespaceable to the _NamespaceScope its body was
# executed in. Weak keys, so an entry does not keep its class alive.
_NAMESPACE_SCOPES = weakref.WeakKeyDictionary()
364
365
366 6
class Namespaceable(type):

    """Metaclass for classes that can contain namespaces."""

    @classmethod
    def __prepare__(mcs, name, bases, **kwargs):
        # Execute the class body in a scope that tracks namespaces.
        return _NamespaceScope(super().__prepare__(name, bases, **kwargs))

    def __new__(mcs, name, bases, dct, **kwargs):
        # Build the class from the scope's first mapping.
        cls = super().__new__(mcs, name, bases, dct.dicts[0], **kwargs)
        _NAMESPACE_SCOPES[cls] = dct
        for namespace in dct.namespaces:
            namespace.add(cls)
            if ENABLE_SET_NAME:
                # Call set_name on namespace members that support it.
                for attr_name, value in namespace.items():
                    wrapped = _DescriptorInspector(value)
                    if wrapped.has_set_name:
                        wrapped.set_name(cls, attr_name)
        return cls

    def __setattr__(cls, name, value):
        # A Namespace assigned after class creation is bound and attached
        # just as if it had been defined in the class body.
        if isinstance(value, Namespace) and value.name != name:
            value.push(name, _NAMESPACE_SCOPES[cls])
            value.add(cls)
        if issubclass(cls, Namespaceable):
            # NOTE(review): presumably, when cls is itself a subclass of
            # Namespaceable, zero-argument super() yields an unbound
            # __setattr__, hence cls is passed explicitly - confirm.
            super().__setattr__(cls, name, value)
        else:
            super().__setattr__(name, value)
394