Completed
Push — master ( 133cc4...520d15 )
by Max
02:23
created

_is_namespaceable()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 1
c 3
b 0
f 0
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
1
"""Class Namespaces (internal module).
2
3
All of the guts of the class namespace implementation.
4
5
"""
6
7
8
# See https://blog.ionelmc.ro/2015/02/09/understanding-python-metaclasses/
9
10 6
import collections.abc
11 6
import functools
12 6
import itertools
13 6
import weakref
14
15 6
from .descriptor_inspector import _DescriptorInspector
16 6
from .flags import ENABLE_SET_NAME
17
18
19 6
# Side table holding the state of every live _NamespaceProxy: each proxy maps
# to the ``(dct, instance, owner)`` triple stored by its ``__init__``.  Keys are
# held weakly, so an entry does not keep its proxy alive.
_PROXY_INFOS = weakref.WeakKeyDictionary()
20
21
22 6
def _mro_to_chained(mro, dct):
    """Return a chained map of lookups for the given namespace and mro.

    Walk `mro` in order, looking only at classes whose metaclass is
    Namespaceable.  The walk stops at the first such class for which
    ``Namespace.no_blocker`` reports a blocker.  Every class visited before
    that point contributes its namespace at ``dct.path``, provided one is
    already registered for it.
    """
    layers = []
    for klass in mro:
        if not isinstance(klass, Namespaceable):
            # Plain classes are skipped without ending the walk.
            continue
        if not Namespace.no_blocker(dct, klass):
            # A blocker hides everything further down the mro.
            break
        if Namespace.namespace_exists(dct.path, klass):
            layers.append(Namespace.get_namespace(klass, dct.path))
    return collections.ChainMap(*layers)
30
31
32 6
def _instance_map(ns_proxy):
    """Return a map, possibly chained, of lookups for the given instance.

    An unbound proxy (no instance) yields an empty dict.  When the instance
    is itself a Namespaceable class, the result is chained over that class's
    mro.  Any other instance yields its own namespace at the proxy's path.
    """
    dct, instance, _ = _PROXY_INFOS[ns_proxy]
    if instance is None:
        return {}
    if isinstance(instance, Namespaceable):
        return _mro_to_chained(instance.__mro__, dct)
    return Namespace.get_namespace(instance, dct.path)
42
43
44 6
def _mro_map(ns_proxy):
    """Return a chained map of lookups for the given owner class.

    Only the tail of the owner's mro is consulted, starting at the class the
    namespace is attached to (``dct.parent_object``).
    """
    dct, _, owner = _PROXY_INFOS[ns_proxy]
    full_mro = owner.__mro__
    start = full_mro.index(dct.parent_object)
    return _mro_to_chained(full_mro[start:], dct)
50
51
52 6
def _retarget(ns_proxy):
    """Convert a class lookup to an instance lookup, if needed.

    A proxy with no instance whose owner has a Namespaceable metaclass is
    replaced by a new proxy that treats the owner as the instance and the
    metaclass as the owner.  Any other proxy is returned unchanged.
    """
    dct, instance, owner = _PROXY_INFOS[ns_proxy]
    if instance is not None or not isinstance(type(owner), Namespaceable):
        return ns_proxy
    metaclass = type(owner)
    meta_namespace = Namespace.get_namespace(metaclass, dct.path)
    return _NamespaceProxy(meta_namespace, owner, metaclass)
60
61
62 6
def _get(a_map, name):
    """Return a _DescriptorInspector around the attribute, or None.

    None means `name` is not a key of `a_map`.
    """
    try:
        found = a_map[name]
    except KeyError:
        return None
    return _DescriptorInspector(found)
70
71
72 6
def _delete(dct, name):
    """Attempt to delete `name` from `dct`. Raise AttributeError if missing.

    The mapping's KeyError is translated to AttributeError because this
    helper backs attribute deletion.  The KeyError is kept as the explicit
    cause, so the traceback shows a translated error rather than an error
    raised while handling another one.
    """
    try:
        del dct[name]
    except KeyError as err:
        raise AttributeError(name) from err
78
79
80 6
def _has_get(value):
    """Return whether the value is a wrapped getter descriptor.

    `value` is either None (always False) or an object exposing a
    ``has_get`` attribute, which is returned as-is.
    """
    if value is None:
        return False
    return value.has_get
83
84
85 6
def _is_data(value):
    """Return whether the value is a wrapped data descriptor.

    `value` is either None (always False) or an object exposing an
    ``is_data`` attribute, which is returned as-is.
    """
    if value is None:
        return False
    return value.is_data
88
89
90 6
def _get_data(dct, name):
    """Return the data descriptor associated with `name` in `dct`, if any.

    The result is the wrapped value when it is a data descriptor, and None
    when `name` is missing or maps to anything else.
    """
    candidate = _get(dct, name)
    return candidate if _is_data(candidate) else None
95
96
97 6
class _NamespaceProxy:

    """Proxy object for manipulating and querying namespaces.

    Attribute reads, writes and deletes on a proxy are forwarded to the
    underlying namespace mappings.  Because ``__getattribute__`` intercepts
    every attribute read, a proxy keeps no state on itself: its
    ``(dct, instance, owner)`` triple lives in the module-level
    ``_PROXY_INFOS`` weak-key map.

    Three kinds of proxy exist, told apart by that triple:

    * ``owner is None``: created while a class body is executing; all
      operations go straight to ``dct``.
    * ``instance is None``: accessed on a class.
    * both set: accessed on an instance of ``owner``.
    """

    # The only slot is ``__weakref__``: proxies must be weakly referenceable
    # to serve as keys of ``_PROXY_INFOS``.
    __slots__ = '__weakref__',

    def __init__(self, dct, instance, owner):
        _PROXY_INFOS[self] = dct, instance, owner

    def __dir__(self):
        """Return the names reachable through this proxy."""
        dct, _, owner = _PROXY_INFOS[self]
        if owner is None:
            # Class-body proxy: there is no owner whose mro could be
            # searched (``_mro_map`` would fail on ``None.__mro__``), so
            # the namespace's own keys are the whole answer.
            return list(dct)
        return collections.ChainMap(_instance_map(self), _mro_map(self))

    def __getattribute__(self, name):
        """Look `name` up in the namespace.

        Precedence, first match wins:

        1. a data descriptor found through the owner's mro;
        2. a value in the instance map that has a getter, when the owner
           is a metaclass;
        3. a plain value in the instance map;
        4. a descriptor with a getter found through the mro;
        5. a plain value found through the mro.

        Raises AttributeError when nothing matches.
        """
        self = _retarget(self)
        dct, instance, owner = _PROXY_INFOS[self]
        if owner is None:
            # NOTE(review): a missing name surfaces as KeyError here, not
            # AttributeError, so hasattr()/getattr(..., default) on a
            # class-body proxy will not behave as usual -- confirm whether
            # that is intended.
            return dct[name]
        instance_map = _instance_map(self)
        mro_map = _mro_map(self)
        instance_value = _get(instance_map, name)
        mro_value = _get(mro_map, name)
        if _is_data(mro_value):
            return mro_value.get(instance, owner)
        elif issubclass(owner, type) and _has_get(instance_value):
            # The "instance" is itself a class here, so the value found on
            # it is invoked with no instance and that class as owner.
            return instance_value.get(None, instance)
        elif instance_value is not None:
            return instance_value.object
        elif _has_get(mro_value):
            return mro_value.get(instance, owner)
        elif mro_value is not None:
            return mro_value.object
        else:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        """Store `value` under `name`.

        A data descriptor found through the owner's mro receives the value.
        Otherwise it is written to the class-body dict, the owner's
        namespace, or the instance's namespace, depending on how the proxy
        is bound.
        """
        self = _retarget(self)
        dct, instance, owner = _PROXY_INFOS[self]
        if owner is None:
            dct[name] = value
            return
        if instance is None:
            real_map = Namespace.get_namespace(owner, dct.path)
            real_map[name] = value
            return
        mro_map = _mro_map(self)
        target_value = _get_data(mro_map, name)
        if target_value is not None:
            target_value.set(instance, value)
            return
        instance_map = Namespace.get_namespace(instance, dct.path)
        instance_map[name] = value

    def __delattr__(self, name):
        """Delete `name`, raising AttributeError if it is not present."""
        self = _retarget(self)
        dct, instance, owner = _PROXY_INFOS[self]
        if owner is None:
            _delete(dct, name)
            return
        real_map = Namespace.get_namespace(owner, dct.path)
        if instance is None:
            _delete(real_map, name)
            return
        # NOTE(review): only the owner's own namespace is searched for a
        # data descriptor here, whereas __setattr__ searches the whole mro
        # -- confirm the asymmetry is intended.
        value = _get_data(real_map, name)
        if value is not None:
            value.delete(instance)
            return
        instance_map = Namespace.get_namespace(instance, dct.path)
        _delete(instance_map, name)

    def __enter__(self):
        # Delegates to the wrapped namespace; the ``with`` target is
        # whatever that returns, not this proxy.
        dct, _, _ = _PROXY_INFOS[self]
        return dct.__enter__()

    def __exit__(self, exc_type, exc_value, traceback):
        dct, _, _ = _PROXY_INFOS[self]
        return dct.__exit__(exc_type, exc_value, traceback)
173
174
175 6
class Namespace(dict):

    """Namespace.

    A dict of attributes that also records where it sits: its own name, the
    enclosing scope and parent mapping, and the object it is attached to.
    Used as a context manager inside a class body, it temporarily becomes
    the mapping at the front of the scope.  Accessed as a class attribute,
    it acts as a descriptor and yields a ``_NamespaceProxy``.
    """

    # name:          key this namespace is bound under (None until bound).
    # scope:         the scope object this namespace was pushed onto.
    # parent:        the mapping that was at the front of the scope when
    #                this namespace was bound.
    # active:        True while this namespace sits at the front of the
    #                scope's ``dicts``.
    # parent_object: the target this namespace was registered under by
    #                ``add``.
    __slots__ = 'name', 'scope', 'parent', 'active', 'parent_object'

    # Registry for Namespaceable targets, keyed by ``(target, path)``.  Other
    # targets carry their own registry (see ``__get_helper``).
    # NOTE(review): this holds strong references to the registered targets.
    __namespaces = {}

    def __init__(self, *args, **kwargs):
        """Build the namespace like a dict.

        Raises ValueError if any initial value is a Namespace or a
        _NamespaceProxy.
        """
        super().__init__(*args, **kwargs)
        bad_values = tuple(
            value for value in self.values() if
            isinstance(value, (Namespace, _NamespaceProxy)))
        if bad_values:
            raise ValueError('Bad values: {}'.format(bad_values))
        self.name = None
        self.scope = None
        self.parent = None
        self.active = False
        self.parent_object = None

    @classmethod
    def premake(cls, name, parent):
        """Return an empty namespace with the given name and parent."""
        self = cls()
        self.name = name
        self.parent = parent
        return self

    def __setitem__(self, key, value):
        # A not-yet-bound Namespace stored into an attached namespace is
        # bound to `key` and registered under the same target.
        if (
                self.parent_object is not None and
                isinstance(value, Namespace) and value.name != key):
            value.push(key, self.scope)
            value.add(self.parent_object)
        super().__setitem__(key, value)

    def __enter__(self):
        self.activate()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self.name is None:
            raise RuntimeError('Namespace must be named.')
        self.deactivate()

    @property
    def path(self):
        """Return the full path of the namespace.

        The path is a tuple of names from the outermost enclosing Namespace
        down to this one.  Raises ValueError when the namespace has no name
        or no parent yet.
        """
        if self.name is None or self.parent is None:
            raise ValueError(
                'Cannot compute the path of a namespace without a name '
                'and a parent')
        if isinstance(self.parent, Namespace):
            parent_path = self.parent.path
        else:
            parent_path = ()
        return parent_path + (self.name,)

    @classmethod
    def __get_helper(cls, target, path):
        """Return the registry key and registry dict for `target` at `path`.

        Namespaceable targets share the class-level registry and are keyed
        by ``(target, path)``.  Any other target gets its own registry,
        stored on the target itself and created on first use, keyed by
        `path` alone.
        """
        if isinstance(target, Namespaceable):
            path_ = target, path
            namespaces = cls.__namespaces
        else:
            path_ = path
            try:
                namespaces = target.__namespaces
            except AttributeError:
                namespaces = {}
                target.__namespaces = namespaces
        return path_, namespaces

    @classmethod
    def namespace_exists(cls, path, target):
        """Return whether the given namespace exists."""
        path_, namespaces = cls.__get_helper(target, path)
        return path_ in namespaces

    @classmethod
    def get_namespace(cls, target, path):
        """Return a namespace with given target and path, create if needed.

        Missing enclosing namespaces are created recursively, so a new
        namespace always has a parent.
        """
        path_, namespaces = cls.__get_helper(target, path)
        try:
            return namespaces[path_]
        except KeyError:
            if len(path) == 1:
                parent = {}
            else:
                parent = cls.get_namespace(target, path[:-1])
            return namespaces.setdefault(path_, cls.premake(path[-1], parent))

    @classmethod
    def no_blocker(cls, dct, cls_):
        """Return False if there's a non-Namespace object in the path."""
        try:
            namespace = vars(cls_)
            for name in dct.path:
                namespace = namespace[name]
                if not isinstance(namespace, cls):
                    return False
        except KeyError:
            # A name along the path is not defined on `cls_` at all, so
            # there is nothing there that could block the lookup.
            pass
        return True

    def add(self, target):
        """Add self as a namespace under target.

        If another namespace is already registered at the same path, it is
        kept and this one is left unattached.
        """
        path, namespaces = self.__get_helper(target, self.path)
        res = namespaces.setdefault(path, self)
        if res is self:
            self.parent_object = target

    def set_if_none(self, name, value):
        """Set the attribute `name` to `value`, if it's initially None."""
        if getattr(self, name) is None:
            setattr(self, name, value)

    def push(self, name, scope):
        """Bind self to the given name and scope, and activate.

        Raises ValueError if the namespace is already bound to a different
        name, scope or parent.
        """
        self.set_if_none('name', name)
        self.set_if_none('scope', scope)
        self.set_if_none('parent', scope.dicts[0])
        if name != self.name:
            raise ValueError('Cannot rename namespace')
        if scope is not self.scope:
            raise ValueError('Cannot reuse namespace')
        if scope.dicts[0] is not self.parent:
            raise ValueError('Cannot reparent namespace')
        self.scope.namespaces.append(self)
        self.activate()

    def activate(self):
        """Take over as the scope for the target."""
        if self.scope is not None and not self.active:
            if self.scope.dicts[0] is not self.parent:
                raise ValueError('Cannot reparent namespace')
            self.active = True
            self.scope.dicts.insert(0, self)

    def deactivate(self):
        """Stop being the scope for the target."""
        if self.scope is not None and self.active:
            self.active = False
            self.scope.dicts.pop(0)

    def __get__(self, instance, owner):
        return _NamespaceProxy(self, instance, owner)
321
322
323 6
class _NamespaceScope(collections.abc.MutableMapping):

    """The class creation namespace for Namespaceables.

    ``dicts`` is a stack of mappings whose front entry receives writes and
    deletes; reads see all entries chained together.  ``namespaces``
    collects the Namespace objects bound while the class body runs.
    """

    __slots__ = 'dicts', 'namespaces'

    def __init__(self, dct):
        self.namespaces = []
        self.dicts = [dct]

    def _chained(self):
        """Return a fresh ChainMap over the current stack of mappings."""
        return collections.ChainMap(*self.dicts)

    def __getitem__(self, key):
        found = self._chained()[key]
        # Namespaces are handed to class-body code wrapped in an unbound
        # proxy.
        if isinstance(found, Namespace):
            return _NamespaceProxy(found, None, None)
        return found

    def __setitem__(self, key, value):
        # Capture the target first: pushing a namespace below changes
        # which mapping is at the front of the stack.
        target = self.dicts[0]
        if isinstance(value, _NamespaceProxy):
            # Store the namespace itself, not the proxy around it.
            value = _PROXY_INFOS[value][0]
        if isinstance(value, Namespace) and value.name != key:
            value.push(key, self)
        target[key] = value

    def __delitem__(self, key):
        front = self.dicts[0]
        del front[key]

    def __iter__(self):
        return iter(self._chained())

    def __len__(self):
        return len(self._chained())
355
356
357 6
# Maps each class created by Namespaceable to the _NamespaceScope that served
# as its class-body namespace (filled in by Namespaceable.__new__).  Keys are
# held weakly, so an entry does not keep its class alive.
_NAMESPACE_SCOPES = weakref.WeakKeyDictionary()
358
359
360 6
class Namespaceable(type):

    """Metaclass for classes that can contain namespaces."""

    @classmethod
    def __prepare__(mcs, name, bases, **kwargs):
        """Return a _NamespaceScope wrapping the default class namespace."""
        default_namespace = super().__prepare__(name, bases, **kwargs)
        return _NamespaceScope(default_namespace)

    def __new__(mcs, name, bases, dct, **kwargs):
        """Create the class and attach every namespace from its body."""
        # The real class dict is the bottom mapping of the scope.
        cls = super().__new__(mcs, name, bases, dct.dicts[0], **kwargs)
        _NAMESPACE_SCOPES[cls] = dct
        for namespace in dct.namespaces:
            namespace.add(cls)
            if not ENABLE_SET_NAME:
                continue
            # Give values stored inside the namespace their set-name
            # notification.
            for attr_name, attr_value in namespace.items():
                inspector = _DescriptorInspector(attr_value)
                if inspector.has_set_name:
                    inspector.set_name(cls, attr_name)
        return cls

    def __setattr__(cls, name, value):
        """Set a class attribute, binding a fresh Namespace if given one."""
        is_unbound_namespace = (
            isinstance(value, Namespace) and value.name != name)
        if is_unbound_namespace:
            value.push(name, _NAMESPACE_SCOPES[cls])
            value.add(cls)
        # NOTE(review): the two call shapes presumably exist because
        # super() yields an unbound method when `cls` is itself a subclass
        # of Namespaceable -- confirm.
        if not issubclass(cls, Namespaceable):
            super().__setattr__(name, value)
        else:
            super().__setattr__(cls, name, value)
388