Completed
Push — master ( 42273a...3a01aa )
by Max
02:01
created

  A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 42
Duplicated Lines 0 %

Test Coverage

Coverage 93.1%

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 42
ccs 27
cts 29
cp 0.931
rs 10
wmc 10

1 Method

Rating   Name   Duplication   Size   Complexity  
A copeProxy.__init__() 0 2 1
1
"""Class Namespaces (internal module).

All of the guts of the class namespace implementation.

"""
6
7
8
# See https://blog.ionelmc.ro/2015/02/09/understanding-python-metaclasses/
9
10 6
import collections.abc
11 6
import functools
12 6
import itertools
13 6
import weakref
14
15 6
from .descriptor_inspector import _DescriptorInspector
16 6
from .flags import ENABLE_SET_NAME
17 6
from .proxy import _Proxy
18
19
20 6
# Side table of per-proxy state, keyed weakly by the proxy object itself.
# _NamespaceProxy stores a ``(dct, instance, owner)`` tuple here, while
# _ScopeProxy stores the container mapping that holds its namespace.
_PROXY_INFOS = weakref.WeakKeyDictionary()
21
22
23 6
def _mro_to_chained(mro, dct):
    """Return a chained map of lookups for the given namespace and mro.

    Only members of `mro` that are instances of `Namespaceable` are
    considered.  The walk stops at the first such class for which
    `Namespace.no_blocker` returns False, and classes that have no
    namespace registered at ``dct.path`` are skipped.
    """
    namespaceables = (cls for cls in mro if isinstance(cls, Namespaceable))
    unblocked = itertools.takewhile(
        functools.partial(Namespace.no_blocker, dct), namespaceables)
    # Check for existence first: get_namespace would otherwise create the
    # namespace as a side effect.
    return collections.ChainMap(*[
        Namespace.get_namespace(cls, dct.path)
        for cls in unblocked
        if Namespace.namespace_exists(dct.path, cls)])
31
32
33 6
def _instance_map(ns_proxy):
    """Return a map, possibly chained, of lookups for the given instance.

    The instance is the one recorded for `ns_proxy` in `_PROXY_INFOS`.
    With no instance the result is an empty dict.  A `Namespaceable`
    instance (i.e. a class) gets a chained map over its own MRO, and any
    other instance gets its single namespace at the proxy's path.
    """
    dct, instance, _ = _PROXY_INFOS[ns_proxy]
    if instance is None:
        return {}
    if isinstance(instance, Namespaceable):
        return _mro_to_chained(instance.__mro__, dct)
    return Namespace.get_namespace(instance, dct.path)
43
44
45 6
def _mro_map(ns_proxy):
    """Return a chained map of lookups for the given owner class.

    The owner's MRO is trimmed so that it starts at the namespace's
    ``parent_object``.  ``tuple.index`` raises ValueError if that object
    is not in the owner's MRO.
    """
    dct, _, owner = _PROXY_INFOS[ns_proxy]
    mro = owner.__mro__
    start = mro.index(dct.parent_object)
    return _mro_to_chained(mro[start:], dct)
51
52
53 6
def _retarget(ns_proxy):
    """Convert a class lookup to an instance lookup, if needed.

    When the proxy has no instance and the owner's type is itself a
    `Namespaceable`, return a new proxy that treats the owner as the
    instance and ``type(owner)`` as the owner.  Otherwise return
    `ns_proxy` unchanged.
    """
    dct, instance, owner = _PROXY_INFOS[ns_proxy]
    if instance is not None or not isinstance(type(owner), Namespaceable):
        return ns_proxy
    meta = type(owner)
    meta_dct = Namespace.get_namespace(meta, dct.path)
    return _NamespaceProxy(meta_dct, owner, meta)
61
62
63 6
def _get(a_map, name):
    """Return a _DescriptorInspector around the attribute, or None.

    None is returned when `name` is not a key of `a_map`.
    """
    try:
        value = a_map[name]
    except KeyError:
        return None
    return _DescriptorInspector(value)
71
72
73 6
def _delete(dct, name):
    """Attempt to delete `name` from `dct`. Raise AttributeError if missing.

    The KeyError from the mapping is translated so that callers
    implementing ``__delattr__`` raise the exception type that protocol
    expects.
    """
    try:
        del dct[name]
    except KeyError:
        raise AttributeError(name)
79
80
81 6
def _has_get(value):
    """Return whether the value is a wrapped getter descriptor.

    `value` is either None, which yields False, or an object exposing a
    ``has_get`` attribute, which is returned as-is.
    """
    if value is None:
        return False
    return value.has_get
84
85
86 6
def _is_data(value):
    """Return whether the value is a wrapped data descriptor.

    `value` is either None, which yields False, or an object exposing an
    ``is_data`` attribute, which is returned as-is.
    """
    if value is None:
        return False
    return value.is_data
89
90
91 6
def _get_data(dct, name):
    """Return the data descriptor associated with `name` in `dct`, if any.

    The result is the wrapped value produced by `_get` when `_is_data`
    accepts it, and None otherwise (missing name or not a data
    descriptor).
    """
    value = _get(dct, name)
    return value if _is_data(value) else None
96
97
98 6
class _NamespaceProxy(_Proxy):

    """Proxy object for manipulating and querying namespaces.

    Attribute access on the proxy is redirected to namespace mappings, so
    the proxy keeps no attributes of its own.  Its ``(dct, instance,
    owner)`` state is stored in the module-level `_PROXY_INFOS` table.
    """

    # Only a weakref slot is needed: _PROXY_INFOS holds the proxy weakly.
    __slots__ = '__weakref__',

    def __init__(self, dct, instance, owner):
        # __setattr__ is overridden below, so state is recorded externally.
        _PROXY_INFOS[self] = dct, instance, owner

    def __dir__(self):
        """Return a mapping whose keys are the reachable attribute names."""
        return collections.ChainMap(_instance_map(self), _mro_map(self))

    def __getattribute__(self, name):
        """Look `name` up in the namespace, honouring descriptors.

        Precedence, first match wins:

        1. a data descriptor found through the owner's MRO;
        2. an instance-level value with a getter, when the owner is a
           subclass of ``type`` (called as ``get(None, instance)``);
        3. the plain instance-level value;
        4. an MRO-level value with a getter;
        5. the plain MRO-level value.

        Raises AttributeError if `name` is found nowhere.
        """
        self = _retarget(self)
        _, instance, owner = _PROXY_INFOS[self]
        instance_map = _instance_map(self)
        mro_map = _mro_map(self)
        instance_value = _get(instance_map, name)
        mro_value = _get(mro_map, name)
        if _is_data(mro_value):
            return mro_value.get(instance, owner)
        if issubclass(owner, type) and _has_get(instance_value):
            return instance_value.get(None, instance)
        if instance_value is not None:
            return instance_value.object
        if _has_get(mro_value):
            return mro_value.get(instance, owner)
        if mro_value is not None:
            return mro_value.object
        raise AttributeError(name)

    def __setattr__(self, name, value):
        """Store `value` under `name` in the appropriate namespace.

        Without an instance, the value goes into the owner's namespace.
        With an instance, a data descriptor found through the MRO gets
        its ``set`` called; otherwise the value goes into the instance's
        namespace.
        """
        self = _retarget(self)
        dct, instance, owner = _PROXY_INFOS[self]
        if instance is None:
            real_map = Namespace.get_namespace(owner, dct.path)
            real_map[name] = value
            return
        mro_map = _mro_map(self)
        target_value = _get_data(mro_map, name)
        if target_value is not None:
            target_value.set(instance, value)
            return
        instance_map = Namespace.get_namespace(instance, dct.path)
        instance_map[name] = value

    def __delattr__(self, name):
        """Delete `name` from the appropriate namespace.

        Without an instance, the name is removed from the owner's
        namespace.  With an instance, a data descriptor found in the
        owner's namespace gets its ``delete`` called; otherwise the name
        is removed from the instance's namespace.

        Raises AttributeError if the name to remove is missing.
        """
        self = _retarget(self)
        dct, instance, owner = _PROXY_INFOS[self]
        # NOTE(review): only the owner's own namespace is searched for a
        # data descriptor here, whereas __setattr__ searches the whole MRO
        # chain -- confirm that the difference is intentional.
        real_map = Namespace.get_namespace(owner, dct.path)
        if instance is None:
            _delete(real_map, name)
            return
        value = _get_data(real_map, name)
        if value is not None:
            value.delete(instance)
            return
        instance_map = Namespace.get_namespace(instance, dct.path)
        _delete(instance_map, name)
158
159
160 6
class _ScopeProxy(_Proxy):

    """Proxy object for manipulating namespaces during class creation.

    The proxy registers its container in `_PROXY_INFOS` and its namespace
    in that container, so ``_PROXY_INFOS[self][self]`` is the wrapped
    namespace.  Attribute access is forwarded to that namespace's items.
    """

    __slots__ = '__weakref__',

    def __init__(self, dct, container):
        _PROXY_INFOS[self] = container
        container[self] = dct

    def __dir__(self):
        """Return the wrapped namespace, whose keys are the attributes."""
        return _PROXY_INFOS[self][self]

    def __getattribute__(self, name):
        """Return the item `name` of the namespace, or raise AttributeError."""
        dct = _PROXY_INFOS[self][self]
        try:
            return dct[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        """Store `value` as item `name` of the namespace."""
        _PROXY_INFOS[self][self][name] = value

    def __delattr__(self, name):
        """Remove item `name` from the namespace, or raise AttributeError."""
        _delete(_PROXY_INFOS[self][self], name)

    def __enter__(self):
        """Enter the wrapped namespace and return what it returns."""
        return _PROXY_INFOS[self][self].__enter__()

    def __exit__(self, exc_type, exc_value, traceback):
        """Exit the wrapped namespace, forwarding the exception details."""
        return _PROXY_INFOS[self][self].__exit__(
            exc_type, exc_value, traceback)
192
193
194 6
class Namespace(dict):

    """Namespace.

    A dict of attributes that also tracks its ``name``, the ``scope`` it
    was created in, its ``parent`` mapping, whether it is ``active`` as
    the current scope, and the ``parent_object`` it has been added to.
    Accessing one as a class attribute yields a `_NamespaceProxy`.
    """

    __slots__ = 'name', 'scope', 'parent', 'active', 'parent_object'

    # Registry used for Namespaceable targets, keyed by (target, path).
    # Other targets get their own registry stored on the target itself.
    __namespaces = {}

    def __init__(self, *args, **kwargs):
        """Build the dict, rejecting Namespace and _Proxy initial values.

        Raises ValueError if any initial value is a Namespace or _Proxy.
        """
        super().__init__(*args, **kwargs)
        bad_values = tuple(
            value for value in self.values() if
            isinstance(value, (Namespace, _Proxy)))
        if bad_values:
            raise ValueError('Bad values: {}'.format(bad_values))
        self.name = None
        self.scope = None
        self.parent = None
        self.active = False
        self.parent_object = None

    @classmethod
    def premake(cls, name, parent):
        """Return an empty namespace with the given name and parent."""
        self = cls()
        self.name = name
        self.parent = parent
        return self

    def __setitem__(self, key, value):
        """Store `value`, binding it first if it is a new child namespace.

        Binding only happens once this namespace has a parent object and
        the value is a Namespace not already named `key`.
        """
        if (
                self.parent_object is not None and
                isinstance(value, Namespace) and value.name != key):
            value.push(key, self.scope)
            value.add(self.parent_object)
        super().__setitem__(key, value)

    def __enter__(self):
        """Activate the namespace and return it."""
        self.activate()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Deactivate the namespace.

        Raises RuntimeError if the namespace was never given a name.
        """
        if self.name is None:
            raise RuntimeError('Namespace must be named.')
        self.deactivate()

    @property
    def path(self):
        """Return the full path of the namespace.

        The path is a tuple of names from the outermost namespace down to
        this one.  Raises ValueError if the name or the parent is unset.
        """
        if self.name is None or self.parent is None:
            raise ValueError
        if isinstance(self.parent, Namespace):
            parent_path = self.parent.path
        else:
            parent_path = ()
        return parent_path + (self.name,)

    @classmethod
    def __get_helper(cls, target, path):
        """Return the registry key and registry for `target` at `path`.

        Namespaceable targets share the class-level registry and are keyed
        by ``(target, path)``.  Any other target carries its own registry,
        created on first use, keyed by `path` alone.
        """
        if isinstance(target, Namespaceable):
            path_ = target, path
            namespaces = cls.__namespaces
        else:
            path_ = path
            try:
                namespaces = target.__namespaces
            except AttributeError:
                namespaces = {}
                target.__namespaces = namespaces
        return path_, namespaces

    @classmethod
    def namespace_exists(cls, path, target):
        """Return whether the given namespace exists."""
        path_, namespaces = cls.__get_helper(target, path)
        return path_ in namespaces

    @classmethod
    def get_namespace(cls, target, path):
        """Return a namespace with given target and path, create if needed.

        Missing parent namespaces along `path` are created recursively.
        A top-level namespace gets a fresh empty dict as its parent.
        """
        path_, namespaces = cls.__get_helper(target, path)
        try:
            return namespaces[path_]
        except KeyError:
            if len(path) == 1:
                parent = {}
            else:
                parent = cls.get_namespace(target, path[:-1])
            return namespaces.setdefault(path_, cls.premake(path[-1], parent))

    @classmethod
    def no_blocker(cls, dct, cls_):
        """Return False if there's a non-Namespace object in the path.

        Walks ``vars(cls_)`` along ``dct.path``.  A name that is missing
        at any level means nothing shadows the namespace there, so the
        result is True.
        """
        namespace = vars(cls_)
        for name in dct.path:
            try:
                namespace = namespace[name]
            except KeyError:
                # Nothing defined under this name, so nothing can block.
                return True
            if not isinstance(namespace, cls):
                return False
        return True

    def add(self, target):
        """Add self as a namespace under target.

        If another namespace is already registered at this path, it is
        left in place and ``parent_object`` is not updated.
        """
        path, namespaces = self.__get_helper(target, self.path)
        res = namespaces.setdefault(path, self)
        if res is self:
            self.parent_object = target

    def set_if_none(self, name, value):
        """Set the attribute `name` to `value`, if it's initially None."""
        if getattr(self, name) is None:
            setattr(self, name, value)

    def push(self, name, scope):
        """Bind self to the given name and scope, and activate.

        Raises ValueError if the namespace is already bound to a
        different name, scope, or parent.
        """
        self.set_if_none('name', name)
        self.set_if_none('scope', scope)
        self.set_if_none('parent', scope.dicts[0])
        if name != self.name:
            raise ValueError('Cannot rename namespace')
        if scope is not self.scope:
            raise ValueError('Cannot reuse namespace')
        if scope.dicts[0] is not self.parent:
            raise ValueError('Cannot reparent namespace')
        self.scope.namespaces.append(self)
        self.activate()

    def activate(self):
        """Take over as the scope for the target.

        Does nothing without a scope or when already active.  Raises
        ValueError if the scope's current top dict is not the parent.
        """
        if self.scope is not None and not self.active:
            if self.scope.dicts[0] is not self.parent:
                raise ValueError('Cannot reparent namespace')
            self.active = True
            self.scope.dicts.insert(0, self)

    def deactivate(self):
        """Stop being the scope for the target."""
        if self.scope is not None and self.active:
            self.active = False
            self.scope.dicts.pop(0)

    def __get__(self, instance, owner):
        """Return a proxy bound to the accessing instance and owner."""
        return _NamespaceProxy(self, instance, owner)
340
341
342 6
class _NamespaceScope(collections.abc.MutableMapping):

    """The class creation namespace for Namespaceables.

    ``dicts`` is a stack of mappings with the active one first, and
    ``namespaces`` lists every namespace pushed into this scope.
    ``proxies`` maps each scope proxy handed out to the namespace it
    wraps, and ``ScopeProxy`` is the proxy class tied to this scope.
    """

    __slots__ = 'dicts', 'namespaces', 'proxies', 'ScopeProxy'

    def __init__(self, dct):
        self.dicts = [dct]
        self.namespaces = []
        self.proxies = weakref.WeakKeyDictionary()

        class ScopeProxy(_ScopeProxy):

            """Scope proxy that registers itself in this scope's proxies."""

            __slots__ = ()

            # The first argument is named `self_` on purpose: `self` must
            # keep referring to the enclosing _NamespaceScope.
            def __init__(self_, dct):
                _ScopeProxy.__init__(self_, dct, self.proxies)

        self.ScopeProxy = ScopeProxy

    def __getitem__(self, key):
        """Look `key` up through the dict stack, wrapping namespaces."""
        value = collections.ChainMap(*self.dicts)[key]
        if isinstance(value, Namespace):
            value = self.ScopeProxy(value)
        return value

    def __setitem__(self, key, value):
        """Store `value` in the active dict.

        A scope proxy is first unwrapped to its namespace, and a
        namespace not already named `key` is bound to this scope.
        """
        dct = self.dicts[0]
        if isinstance(value, self.ScopeProxy):
            value = self.proxies[value]
        if isinstance(value, Namespace) and value.name != key:
            value.push(key, self)
        dct[key] = value

    def __delitem__(self, key):
        """Delete `key` from the active dict only."""
        del self.dicts[0][key]

    def __iter__(self):
        return iter(collections.ChainMap(*self.dicts))

    def __len__(self):
        return len(collections.ChainMap(*self.dicts))
384
385
386 6
# Maps each class created by Namespaceable to the _NamespaceScope used while
# its body was executed; keyed weakly by the class.
_NAMESPACE_SCOPES = weakref.WeakKeyDictionary()
387
388
389 6
class Namespaceable(type):

    """Metaclass for classes that can contain namespaces."""

    @classmethod
    def __prepare__(mcs, name, bases, **kwargs):
        """Return a _NamespaceScope wrapping the default class namespace."""
        return _NamespaceScope(super().__prepare__(name, bases, **kwargs))

    def __new__(mcs, name, bases, dct, **kwargs):
        """Create the class and attach the namespaces defined in its body.

        The class is built from the scope's bottom dict, the scope is
        remembered in `_NAMESPACE_SCOPES`, and every namespace pushed
        during the class body is added to the new class.  When
        ENABLE_SET_NAME is set, values that support it are told their
        name.
        """
        cls = super().__new__(mcs, name, bases, dct.dicts[0], **kwargs)
        _NAMESPACE_SCOPES[cls] = dct
        for namespace in dct.namespaces:
            namespace.add(cls)
            if ENABLE_SET_NAME:
                for attr_name, value in namespace.items():
                    wrapped = _DescriptorInspector(value)
                    if wrapped.has_set_name:
                        wrapped.set_name(cls, attr_name)
        return cls

    def __setattr__(cls, name, value):
        """Set a class attribute, binding new namespaces to the class."""
        if isinstance(value, Namespace) and value.name != name:
            value.push(name, _NAMESPACE_SCOPES[cls])
            value.add(cls)
        # NOTE(review): when cls is itself a subclass of Namespaceable,
        # super() presumably yields an unbound method, which is why cls is
        # passed explicitly in that branch -- confirm before simplifying.
        if issubclass(cls, Namespaceable):
            super().__setattr__(cls, name, value)
        else:
            super().__setattr__(name, value)
417