Completed
Push — master ( 930cb9...2547f8 )
by Max
03:44
created

Namespace.no_blocker()   A

Complexity

Conditions 4

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 4

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 4
c 1
b 0
f 0
dl 0
loc 12
ccs 10
cts 10
cp 1
crap 4
rs 9.2
1
"""Class Namespaces (internal module).
2
3
All of the guts of the class namespace implementation.
4
5
"""
6
7
8
# See https://blog.ionelmc.ro/2015/02/09/understanding-python-metaclasses/
9
10 6
import collections.abc
11 6
import functools
12 6
import itertools
13 6
import weakref
14
15 6
from .descriptor_inspector import _DescriptorInspector
16 6
from .flags import ENABLE_SET_NAME
17
18
19 6
_PROXY_INFOS = weakref.WeakKeyDictionary()
20
21
22 6
def _is_namespaceable(cls):
23
    """Partial function for filter."""
24 6
    return isinstance(cls, Namespaceable)
25
26
27 6
def _has_namespace_at(path, cls):
28
    """Return whether there is a namespace for cls at the given path."""
29 6
    return Namespace.namespace_exists(cls, path)
30
31
32 6
def _mro_to_chained(mro, dct):
33
    """Return a chained map of lookups for the given namespace and mro."""
34 6
    return collections.ChainMap(*[
35
        Namespace.get_namespace(cls, dct.path) for cls in
36
        filter(
37
            functools.partial(_has_namespace_at, dct.path),
38
            itertools.takewhile(
39
                functools.partial(Namespace.no_blocker, dct),
40
                filter(_is_namespaceable, mro)))])
41
42
43 6
def _instance_map(ns_proxy):
44
    """Return a map, possibly chained, of lookups for the given instance."""
45 6
    dct, instance, _ = _PROXY_INFOS[ns_proxy]
46 6
    if instance is not None:
47 6
        if isinstance(instance, Namespaceable):
48 6
            return _mro_to_chained(instance.__mro__, dct)
49
        else:
50 6
            return Namespace.get_namespace(instance, dct.path)
51
    else:
52 6
        return {}
53
54
55 6
def _mro_map(ns_proxy):
56
    """Return a chained map of lookups for the given owner class."""
57 6
    dct, _, owner = _PROXY_INFOS[ns_proxy]
58 6
    mro = owner.__mro__
59 6
    mro = mro[mro.index(dct.parent_object):]
60 6
    return _mro_to_chained(mro, dct)
61
62
63 6
def _retarget(ns_proxy):
64
    """Convert a class lookup to an instance lookup, if needed."""
65 6
    dct, instance, owner = _PROXY_INFOS[ns_proxy]
66 6
    if instance is None and isinstance(type(owner), Namespaceable):
67 6
        instance, owner = owner, type(owner)
68 6
        dct = Namespace.get_namespace(owner, dct.path)
69 6
        ns_proxy = _NamespaceProxy(dct, instance, owner)
70 6
    return ns_proxy
71
72
73 6
def _get(a_map, name):
74
    """Return a _DescriptorInspector around the attribute, or None."""
75 6
    try:
76 6
        value = a_map[name]
77 6
    except KeyError:
78 6
        return None
79
    else:
80 6
        return _DescriptorInspector(value)
81
82
83 6
def _delete(dct, name):
84
    """Attempt to delete `name` from `dct`. Raise AttributeError if missing."""
85 6
    try:
86 6
        del dct[name]
87 6
    except KeyError:
88 6
        raise AttributeError(name)
89
90
91 6
def _has_get(value):
92
    """Return whether the value is a wrapped getter descriptor."""
93 6
    return value is not None and value.has_get
94
95
96 6
def _is_data(value):
97
    """Return whether the value is a wrapped data descriptor."""
98 6
    return value is not None and value.is_data
99
100
101 6
def _get_data(dct, name):
102
    """Return the data descriptor associated with `name` in `dct`, if any."""
103 6
    value = _get(dct, name)
104 6
    if _is_data(value):
105
        return value
106
107
108 6
class _NamespaceProxy:
109
110
    """Proxy object for manipulating and querying namespaces."""
111
112 6
    __slots__ = '__weakref__',
113
114 6
    def __init__(self, dct, instance, owner):
115 6
        _PROXY_INFOS[self] = dct, instance, owner
116
117 6
    def __dir__(self):
118 6
        return collections.ChainMap(_instance_map(self), _mro_map(self))
119
120 6
    def __getattribute__(self, name):
121 6
        self = _retarget(self)
122 6
        dct, instance, owner = _PROXY_INFOS[self]
123 6
        if owner is None:
124 6
            return dct[name]
125 6
        instance_map = _instance_map(self)
126 6
        mro_map = _mro_map(self)
127 6
        instance_value = _get(instance_map, name)
128 6
        mro_value = _get(mro_map, name)
129 6
        if _is_data(mro_value):
130 6
            return mro_value.get(instance, owner)
131 6
        elif issubclass(owner, type) and _has_get(instance_value):
132 6
            return instance_value.get(None, instance)
133 6
        elif instance_value is not None:
134 6
            return instance_value.object
135 6
        elif _has_get(mro_value):
136 6
            return mro_value.get(instance, owner)
137 6
        elif mro_value is not None:
138 6
            return mro_value.object
139
        else:
140 6
            raise AttributeError(name)
141
142 6
    def __setattr__(self, name, value):
143 6
        self = _retarget(self)
144 6
        dct, instance, owner = _PROXY_INFOS[self]
145 6
        if owner is None:
146 6
            dct[name] = value
147 6
            return
148 6
        if instance is None:
149 6
            real_map = Namespace.get_namespace(owner, dct.path)
150 6
            real_map[name] = value
151 6
            return
152 6
        mro_map = _mro_map(self)
153 6
        target_value = _get_data(mro_map, name)
154 6
        if target_value is not None:
155
            target_value.set(instance, value)
156
            return
157 6
        instance_map = Namespace.get_namespace(instance, dct.path)
158 6
        instance_map[name] = value
159
160 6
    def __delattr__(self, name):
161 6
        self = _retarget(self)
162 6
        dct, instance, owner = _PROXY_INFOS[self]
163 6
        if owner is None:
164 6
            _delete(dct, name)
165 6
            return
166 6
        real_map = Namespace.get_namespace(owner, dct.path)
167 6
        if instance is None:
168 6
            _delete(real_map, name)
169 6
            return
170 6
        value = _get_data(real_map, name)
171 6
        if value is not None:
172
            value.delete(instance)
173
            return
174 6
        instance_map = Namespace.get_namespace(instance, dct.path)
175 6
        _delete(instance_map, name)
176
177 6
    def __enter__(self):
178 6
        dct, _, _ = _PROXY_INFOS[self]
179 6
        return dct.__enter__()
180
181 6
    def __exit__(self, exc_type, exc_value, traceback):
182 6
        dct, _, _ = _PROXY_INFOS[self]
183 6
        return dct.__exit__(exc_type, exc_value, traceback)
184
185
186 6
class Namespace(dict):
187
188
    """Namespace."""
189
190 6
    __slots__ = 'name', 'scope', 'parent', 'active', 'parent_object'
191
192 6
    __namespaces = {}
193
194 6
    def __init__(self, *args, **kwargs):
195 6
        super().__init__(*args, **kwargs)
196 6
        bad_values = tuple(
197
            value for value in self.values() if
198
            isinstance(value, (Namespace, _NamespaceProxy)))
199 6
        if bad_values:
200 6
            raise ValueError('Bad values: {}'.format(bad_values))
201 6
        self.name = None
202 6
        self.scope = None
203 6
        self.parent = None
204 6
        self.active = False
205 6
        self.parent_object = None
206
207 6
    @classmethod
208
    def premake(cls, name, parent):
209
        """Return an empty namespace with the given name and parent."""
210 6
        self = cls()
211 6
        self.name = name
212 6
        self.parent = parent
213 6
        return self
214
215 6
    def __setitem__(self, key, value):
216 6
        if (
217
                self.parent_object is not None and
218
                isinstance(value, Namespace) and value.name != key):
219 6
            value.push(key, self.scope)
220 6
            value.add(self.parent_object)
221 6
        super().__setitem__(key, value)
222
223 6
    def __enter__(self):
224 6
        self.activate()
225 6
        return self
226
227 6
    def __exit__(self, exc_type, exc_value, traceback):
228 6
        if self.name is None:
229 6
            raise RuntimeError('Namespace must be named.')
230 6
        self.deactivate()
231
232 6
    @property
233
    def path(self):
234
        """Return the full path of the namespace."""
235 6
        if self.name is None or self.parent is None:
236
            raise ValueError
237 6
        if isinstance(self.parent, Namespace):
238 6
            parent_path = self.parent.path
239
        else:
240 6
            parent_path = ()
241 6
        return parent_path + (self.name,)
242
243 6
    @classmethod
244
    def __get_helper(cls, target, path):
245
        """Return the namespace for `target` at `path`, create if needed."""
246 6
        if isinstance(target, Namespaceable):
247 6
            path_ = target, path
248 6
            namespaces = cls.__namespaces
249
        else:
250 6
            path_ = path
251 6
            try:
252 6
                namespaces = target.__namespaces
253 6
            except AttributeError:
254 6
                namespaces = {}
255 6
                target.__namespaces = namespaces
256 6
        return path_, namespaces
257
258 6
    @classmethod
259
    def namespace_exists(cls, target, path):
260
        """Return whether the given namespace exists."""
261 6
        path_, namespaces = cls.__get_helper(target, path)
262 6
        return path_ in namespaces
263
264 6
    @classmethod
265
    def get_namespace(cls, target, path):
266
        """Return a namespace with given target and path, create if needed."""
267 6
        path_, namespaces = cls.__get_helper(target, path)
268 6
        try:
269 6
            return namespaces[path_]
270 6
        except KeyError:
271 6
            if len(path) == 1:
272 6
                parent = {}
273
            else:
274 6
                parent = cls.get_namespace(target, path[:-1])
275 6
            return namespaces.setdefault(path_, cls.premake(path[-1], parent))
276
277 6
    @classmethod
278
    def no_blocker(cls, dct, cls_):
279
        """Return False if there's a non-Namespace object in the path."""
280 6
        try:
281 6
            namespace = vars(cls_)
282 6
            for name in dct.path:
283 6
                namespace = namespace[name]
284 6
                if not isinstance(namespace, cls):
285 6
                    return False
286 6
        except KeyError:
1 ignored issue
show
Unused Code introduced by
This except handler seems to be unused and could be removed.

Except handlers which only contain pass and do not have an else clause can usually simply be removed:

try:
    raises_exception()
except:  # Could be removed
    pass
Loading history...
287 6
            pass
288 6
        return True
289
290 6
    def add(self, target):
291
        """Add self as a namespace under target."""
292 6
        path, namespaces = self.__get_helper(target, self.path)
293 6
        res = namespaces.setdefault(path, self)
294 6
        if res is self:
295 6
            self.parent_object = target
296
297 6
    def set_if_none(self, name, value):
298
        """Set the attribute `name` to `value`, if it's initially None."""
299 6
        if getattr(self, name) is None:
300 6
            setattr(self, name, value)
301
302 6
    def push(self, name, scope):
303
        """Bind self to the given name and scope, and activate."""
304 6
        self.set_if_none('name', name)
305 6
        self.set_if_none('scope', scope)
306 6
        self.set_if_none('parent', scope.dicts[0])
307 6
        if name != self.name:
308 6
            raise ValueError('Cannot rename namespace')
309 6
        if scope is not self.scope:
310
            raise ValueError('Cannot reuse namespace')
311 6
        if scope.dicts[0] is not self.parent:
312
            raise ValueError('Cannot reparent namespace')
313 6
        self.scope.namespaces.append(self)
314 6
        self.activate()
315
316 6
    def activate(self):
317
        """Take over as the scope for the target."""
318 6
        if self.scope is not None and not self.active:
319 6
            if self.scope.dicts[0] is not self.parent:
320
                raise ValueError('Cannot reparent namespace')
321 6
            self.active = True
322 6
            self.scope.dicts.insert(0, self)
323
324 6
    def deactivate(self):
325
        """Stop being the scope for the target."""
326 6
        if self.scope is not None and self.active:
327 6
            self.active = False
328 6
            self.scope.dicts.pop(0)
329
330 6
    def __get__(self, instance, owner):
331 6
        return _NamespaceProxy(self, instance, owner)
332
333
334 6
class _NamespaceScope(collections.abc.MutableMapping):
335
336
    """The class creation namespace for Namespaceables."""
337
338 6
    __slots__ = 'dicts', 'namespaces'
339
340 6
    def __init__(self, dct):
341 6
        self.dicts = [dct]
342 6
        self.namespaces = []
343
344 6
    def __getitem__(self, key):
345 6
        value = collections.ChainMap(*self.dicts)[key]
346 6
        if isinstance(value, Namespace):
347 6
            value = _NamespaceProxy(value, None, None)
348 6
        return value
349
350 6
    def __setitem__(self, key, value):
351 6
        dct = self.dicts[0]
352 6
        if isinstance(value, _NamespaceProxy):
353 6
            value, _, _ = _PROXY_INFOS[value]
354 6
        if isinstance(value, Namespace) and value.name != key:
355 6
            value.push(key, self)
356 6
        dct[key] = value
357
358 6
    def __delitem__(self, key):
359 6
        del self.dicts[0][key]
360
361 6
    def __iter__(self):
362
        return iter(collections.ChainMap(*self.dicts))
363
364 6
    def __len__(self):
365
        return len(collections.ChainMap(*self.dicts))
366
367
368 6
_NAMESPACE_SCOPES = weakref.WeakKeyDictionary()
369
370
371 6
class Namespaceable(type):
372
373
    """Metaclass for classes that can contain namespaces."""
374
375 6
    @classmethod
376
    def __prepare__(mcs, name, bases, **kwargs):
377 6
        return _NamespaceScope(super().__prepare__(name, bases, **kwargs))
378
379 6
    def __new__(mcs, name, bases, dct, **kwargs):
380 6
        cls = super().__new__(mcs, name, bases, dct.dicts[0], **kwargs)
381 6
        _NAMESPACE_SCOPES[cls] = dct
382 6
        for namespace in dct.namespaces:
383 6
            namespace.add(cls)
384 6
            if ENABLE_SET_NAME:
385 2
                for name, value in namespace.items():
386 2
                    wrapped = _DescriptorInspector(value)
387 2
                    if wrapped.has_set_name:
388 2
                        wrapped.set_name(cls, name)
389 6
        return cls
390
391 6
    def __setattr__(cls, name, value):
392 6
        if isinstance(value, Namespace) and value.name != name:
393 6
            value.push(name, _NAMESPACE_SCOPES[cls])
394 6
            value.add(cls)
395 6
        if issubclass(cls, Namespaceable):
396 6
            super().__setattr__(cls, name, value)
397
        else:
398
            super().__setattr__(name, value)
399