Completed
Push — master ( ff9406...87df0f )
by Max
01:59
created

_ScopeProxy   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 30
Duplicated Lines 0 %

Test Coverage

Coverage 85%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 30
ccs 17
cts 20
cp 0.85
rs 10
wmc 8

7 Methods

Rating   Name   Duplication   Size   Complexity  
A __getattribute__() 0 6 2
A __init__() 0 2 1
A __enter__() 0 2 1
A __setattr__() 0 2 1
A __delattr__() 0 2 1
A __dir__() 0 2 1
A __exit__() 0 2 1
1
"""Class Namespaces (internal module).
2
3
All of the guts of the class namespace implementation.
4
5
"""
6
7
8
# See https://blog.ionelmc.ro/2015/02/09/understanding-python-metaclasses/
9
10 6
import collections.abc
11 6
import functools
12 6
import itertools
13 6
import weakref
14
15 6
from .descriptor_inspector import _DescriptorInspector
16 6
from .flags import ENABLE_SET_NAME
17 6
from .proxy import _Proxy
18
19
20 6
_PROXY_INFOS = weakref.WeakKeyDictionary()
21
22
23 6
def _mro_to_chained(mro, dct):
24
    """Return a chained map of lookups for the given namespace and mro."""
25 6
    return collections.ChainMap(*[
26
        Namespace.get_namespace(cls, dct.path) for cls in
27
        itertools.takewhile(
28
            functools.partial(Namespace.no_blocker, dct),
29
            (cls for cls in mro if isinstance(cls, Namespaceable))) if
30
        Namespace.namespace_exists(dct.path, cls)])
31
32
33 6
def _instance_map(ns_proxy):
34
    """Return a map, possibly chained, of lookups for the given instance."""
35 6
    dct, instance, _ = _PROXY_INFOS[ns_proxy]
36 6
    if instance is not None:
37 6
        if isinstance(instance, Namespaceable):
38 6
            return _mro_to_chained(instance.__mro__, dct)
39
        else:
40 6
            return Namespace.get_namespace(instance, dct.path)
41
    else:
42 6
        return {}
43
44
45 6
def _mro_map(ns_proxy):
46
    """Return a chained map of lookups for the given owner class."""
47 6
    dct, _, owner = _PROXY_INFOS[ns_proxy]
48 6
    mro = owner.__mro__
49 6
    mro = mro[mro.index(dct.parent_object):]
50 6
    return _mro_to_chained(mro, dct)
51
52
53 6
def _retarget(ns_proxy):
54
    """Convert a class lookup to an instance lookup, if needed."""
55 6
    dct, instance, owner = _PROXY_INFOS[ns_proxy]
56 6
    if instance is None and isinstance(type(owner), Namespaceable):
57 6
        instance, owner = owner, type(owner)
58 6
        dct = Namespace.get_namespace(owner, dct.path)
59 6
        ns_proxy = _NamespaceProxy(dct, instance, owner)
60 6
    return ns_proxy
61
62
63 6
def _get(a_map, name):
64
    """Return a _DescriptorInspector around the attribute, or None."""
65 6
    try:
66 6
        value = a_map[name]
67 6
    except KeyError:
68 6
        return None
69
    else:
70 6
        return _DescriptorInspector(value)
71
72
73 6
def _delete(dct, name):
74
    """Attempt to delete `name` from `dct`. Raise AttributeError if missing."""
75 6
    try:
76 6
        del dct[name]
77 6
    except KeyError:
78 6
        raise AttributeError(name)
79
80
81 6
def _has_get(value):
82
    """Return whether the value is a wrapped getter descriptor."""
83 6
    return value is not None and value.has_get
84
85
86 6
def _is_data(value):
87
    """Return whether the value is a wrapped data descriptor."""
88 6
    return value is not None and value.is_data
89
90
91 6
def _get_data(dct, name):
92
    """Return the data descriptor associated with `name` in `dct`, if any."""
93 6
    value = _get(dct, name)
94 6
    if _is_data(value):
95
        return value
96
97
98 6
class _NamespaceProxy(_Proxy):
99
100
    """Proxy object for manipulating and querying namespaces."""
101
102 6
    __slots__ = '__weakref__',
103
104 6
    def __init__(self, dct, instance, owner):
105 6
        _PROXY_INFOS[self] = dct, instance, owner
106
107 6
    def __dir__(self):
108 6
        return collections.ChainMap(_instance_map(self), _mro_map(self))
109
110 6
    def __getattribute__(self, name):
111 6
        self = _retarget(self)
112 6
        _, instance, owner = _PROXY_INFOS[self]
113 6
        instance_map = _instance_map(self)
114 6
        mro_map = _mro_map(self)
115 6
        instance_value = _get(instance_map, name)
116 6
        mro_value = _get(mro_map, name)
117 6
        if _is_data(mro_value):
118 6
            return mro_value.get(instance, owner)
119 6
        elif issubclass(owner, type) and _has_get(instance_value):
120 6
            return instance_value.get(None, instance)
121 6
        elif instance_value is not None:
122 6
            return instance_value.object
123 6
        elif _has_get(mro_value):
124 6
            return mro_value.get(instance, owner)
125 6
        elif mro_value is not None:
126 6
            return mro_value.object
127
        else:
128 6
            raise AttributeError(name)
129
130 6
    def __setattr__(self, name, value):
131 6
        self = _retarget(self)
132 6
        dct, instance, owner = _PROXY_INFOS[self]
133 6
        if instance is None:
134 6
            real_map = Namespace.get_namespace(owner, dct.path)
135 6
            real_map[name] = value
136 6
            return
137 6
        mro_map = _mro_map(self)
138 6
        target_value = _get_data(mro_map, name)
139 6
        if target_value is not None:
140
            target_value.set(instance, value)
141
            return
142 6
        instance_map = Namespace.get_namespace(instance, dct.path)
143 6
        instance_map[name] = value
144
145 6
    def __delattr__(self, name):
146 6
        self = _retarget(self)
147 6
        dct, instance, owner = _PROXY_INFOS[self]
148 6
        real_map = Namespace.get_namespace(owner, dct.path)
149 6
        if instance is None:
150 6
            _delete(real_map, name)
151 6
            return
152 6
        value = _get_data(real_map, name)
153 6
        if value is not None:
154
            value.delete(instance)
155
            return
156 6
        instance_map = Namespace.get_namespace(instance, dct.path)
157 6
        _delete(instance_map, name)
158
159
160 6
class _ScopeProxy(_Proxy):
161
162
    """Proxy object for manipulating namespaces during class creation."""
163
164 6
    __slots__ = '__weakref__',
165
166 6
    def __init__(self, dct):
167 6
        _PROXY_INFOS[self] = dct
168
169 6
    def __dir__(self):
170
        return _PROXY_INFOS[self]
171
172 6
    def __getattribute__(self, name):
173 6
        dct = _PROXY_INFOS[self]
174 6
        try:
175 6
            return dct[name]
176
        except KeyError:
177
            raise AttributeError(name)
178
179 6
    def __setattr__(self, name, value):
180 6
        _PROXY_INFOS[self][name] = value
181
182 6
    def __delattr__(self, name):
183 6
        _delete(_PROXY_INFOS[self], name)
184
185 6
    def __enter__(self):
186 6
        return _PROXY_INFOS[self].__enter__()
187
188 6
    def __exit__(self, exc_type, exc_value, traceback):
189 6
        return _PROXY_INFOS[self].__exit__(exc_type, exc_value, traceback)
190
191
192 6
class Namespace(dict):
193
194
    """Namespace."""
195
196 6
    __slots__ = 'name', 'scope', 'parent', 'active', 'parent_object'
197
198 6
    __namespaces = {}
199
200 6
    def __init__(self, *args, **kwargs):
201 6
        super().__init__(*args, **kwargs)
202 6
        bad_values = tuple(
203
            value for value in self.values() if
204
            isinstance(value, (Namespace, _Proxy)))
205 6
        if bad_values:
206 6
            raise ValueError('Bad values: {}'.format(bad_values))
207 6
        self.name = None
208 6
        self.scope = None
209 6
        self.parent = None
210 6
        self.active = False
211 6
        self.parent_object = None
212
213 6
    @classmethod
214
    def premake(cls, name, parent):
215
        """Return an empty namespace with the given name and parent."""
216 6
        self = cls()
217 6
        self.name = name
218 6
        self.parent = parent
219 6
        return self
220
221 6
    def __setitem__(self, key, value):
222 6
        if (
223
                self.parent_object is not None and
224
                isinstance(value, Namespace) and value.name != key):
225 6
            value.push(key, self.scope)
226 6
            value.add(self.parent_object)
227 6
        super().__setitem__(key, value)
228
229 6
    def __enter__(self):
230 6
        self.activate()
231 6
        return self
232
233 6
    def __exit__(self, exc_type, exc_value, traceback):
234 6
        if self.name is None:
235 6
            raise RuntimeError('Namespace must be named.')
236 6
        self.deactivate()
237
238 6
    @property
239
    def path(self):
240
        """Return the full path of the namespace."""
241 6
        if self.name is None or self.parent is None:
242
            raise ValueError
243 6
        if isinstance(self.parent, Namespace):
244 6
            parent_path = self.parent.path
245
        else:
246 6
            parent_path = ()
247 6
        return parent_path + (self.name,)
248
249 6
    @classmethod
250
    def __get_helper(cls, target, path):
251
        """Return the namespace for `target` at `path`, create if needed."""
252 6
        if isinstance(target, Namespaceable):
253 6
            path_ = target, path
254 6
            namespaces = cls.__namespaces
255
        else:
256 6
            path_ = path
257 6
            try:
258 6
                namespaces = target.__namespaces
259 6
            except AttributeError:
260 6
                namespaces = {}
261 6
                target.__namespaces = namespaces
262 6
        return path_, namespaces
263
264 6
    @classmethod
265
    def namespace_exists(cls, path, target):
266
        """Return whether the given namespace exists."""
267 6
        path_, namespaces = cls.__get_helper(target, path)
268 6
        return path_ in namespaces
269
270 6
    @classmethod
271
    def get_namespace(cls, target, path):
272
        """Return a namespace with given target and path, create if needed."""
273 6
        path_, namespaces = cls.__get_helper(target, path)
274 6
        try:
275 6
            return namespaces[path_]
276 6
        except KeyError:
277 6
            if len(path) == 1:
278 6
                parent = {}
279
            else:
280 6
                parent = cls.get_namespace(target, path[:-1])
281 6
            return namespaces.setdefault(path_, cls.premake(path[-1], parent))
282
283 6
    @classmethod
284
    def no_blocker(cls, dct, cls_):
285
        """Return False if there's a non-Namespace object in the path."""
286 6
        try:
287 6
            namespace = vars(cls_)
288 6
            for name in dct.path:
289 6
                namespace = namespace[name]
290 6
                if not isinstance(namespace, cls):
291 6
                    return False
292 6
        except KeyError:
1 ignored issue
show
Unused Code introduced by
This except handler seems to be unused and could be removed.

Except handlers which only contain pass and do not have an else clause can usually simply be removed:

try:
    raises_exception()
except:  # Could be removed
    pass
Loading history...
293 6
            pass
294 6
        return True
295
296 6
    def add(self, target):
297
        """Add self as a namespace under target."""
298 6
        path, namespaces = self.__get_helper(target, self.path)
299 6
        res = namespaces.setdefault(path, self)
300 6
        if res is self:
301 6
            self.parent_object = target
302
303 6
    def set_if_none(self, name, value):
304
        """Set the attribute `name` to `value`, if it's initially None."""
305 6
        if getattr(self, name) is None:
306 6
            setattr(self, name, value)
307
308 6
    def push(self, name, scope):
309
        """Bind self to the given name and scope, and activate."""
310 6
        self.set_if_none('name', name)
311 6
        self.set_if_none('scope', scope)
312 6
        self.set_if_none('parent', scope.dicts[0])
313 6
        if name != self.name:
314 6
            raise ValueError('Cannot rename namespace')
315 6
        if scope is not self.scope:
316
            raise ValueError('Cannot reuse namespace')
317 6
        if scope.dicts[0] is not self.parent:
318
            raise ValueError('Cannot reparent namespace')
319 6
        self.scope.namespaces.append(self)
320 6
        self.activate()
321
322 6
    def activate(self):
323
        """Take over as the scope for the target."""
324 6
        if self.scope is not None and not self.active:
325 6
            if self.scope.dicts[0] is not self.parent:
326
                raise ValueError('Cannot reparent namespace')
327 6
            self.active = True
328 6
            self.scope.dicts.insert(0, self)
329
330 6
    def deactivate(self):
331
        """Stop being the scope for the target."""
332 6
        if self.scope is not None and self.active:
333 6
            self.active = False
334 6
            self.scope.dicts.pop(0)
335
336 6
    def __get__(self, instance, owner):
337 6
        return _NamespaceProxy(self, instance, owner)
338
339
340 6
class _NamespaceScope(collections.abc.MutableMapping):
341
342
    """The class creation namespace for Namespaceables."""
343
344 6
    __slots__ = 'dicts', 'namespaces'
345
346 6
    def __init__(self, dct):
347 6
        self.dicts = [dct]
348 6
        self.namespaces = []
349
350 6
    def __getitem__(self, key):
351 6
        value = collections.ChainMap(*self.dicts)[key]
352 6
        if isinstance(value, Namespace):
353 6
            value = _ScopeProxy(value)
354 6
        return value
355
356 6
    def __setitem__(self, key, value):
357 6
        dct = self.dicts[0]
358 6
        if isinstance(value, _ScopeProxy):
359 6
            value = _PROXY_INFOS[value]
360 6
        if isinstance(value, Namespace) and value.name != key:
361 6
            value.push(key, self)
362 6
        dct[key] = value
363
364 6
    def __delitem__(self, key):
365 6
        del self.dicts[0][key]
366
367 6
    def __iter__(self):
368
        return iter(collections.ChainMap(*self.dicts))
369
370 6
    def __len__(self):
371
        return len(collections.ChainMap(*self.dicts))
372
373
374 6
_NAMESPACE_SCOPES = weakref.WeakKeyDictionary()
375
376
377 6
class Namespaceable(type):
378
379
    """Metaclass for classes that can contain namespaces."""
380
381 6
    @classmethod
382
    def __prepare__(mcs, name, bases, **kwargs):
383 6
        return _NamespaceScope(super().__prepare__(name, bases, **kwargs))
384
385 6
    def __new__(mcs, name, bases, dct, **kwargs):
386 6
        cls = super().__new__(mcs, name, bases, dct.dicts[0], **kwargs)
387 6
        _NAMESPACE_SCOPES[cls] = dct
388 6
        for namespace in dct.namespaces:
389 6
            namespace.add(cls)
390 6
            if ENABLE_SET_NAME:
391 2
                for name, value in namespace.items():
392 2
                    wrapped = _DescriptorInspector(value)
393 2
                    if wrapped.has_set_name:
394 2
                        wrapped.set_name(cls, name)
395 6
        return cls
396
397 6
    def __setattr__(cls, name, value):
398 6
        if isinstance(value, Namespace) and value.name != name:
399 6
            value.push(name, _NAMESPACE_SCOPES[cls])
400 6
            value.add(cls)
401 6
        if issubclass(cls, Namespaceable):
402 6
            super().__setattr__(cls, name, value)
403
        else:
404
            super().__setattr__(name, value)
405