Completed
Push — master ( e2f766...6a3298 )
by Ionel Cristian
01:41
created

Sentinel.__init__()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 3
rs 10
1
from __future__ import print_function
2
3
import logging
4
import os
5
import platform
6
import re
7
import sys
8
from collections import deque
9
from functools import wraps
10
from inspect import isclass
11
12
# Type of a compiled regular expression, obtained by compiling an empty pattern.
RegexType = type(re.compile(""))

# Interpreter flags derived from ``sys.version_info`` and ``platform``.
PY3 = sys.version_info[0] == 3
PY2 = sys.version_info[0] == 2
PY26 = sys.version_info[:2] == (2, 6)
PYPY = platform.python_implementation() == 'PyPy'

# ``basestring`` is usable as the second argument of ``isinstance`` on both
# major versions: plain ``str`` on Python 3, ``(str, unicode)`` otherwise.
if PY3:
    basestring = str
else:
    basestring = str, unicode  # flake8: noqa

# Patterns used to split CamelCase names at capital-letter boundaries.
FIRST_CAP_RE = re.compile('(.)([A-Z][a-z]+)')
ALL_CAP_RE = re.compile('([a-z0-9])([A-Z])')

# Raw value of the ASPECTLIB_DEBUG environment variable (None when unset).
DEBUG = os.environ.get('ASPECTLIB_DEBUG')
28
29
30
def logf(logger_func):
    """Wrap a logging callable so that it only runs when ``DEBUG`` is truthy.

    While ``logger_func`` runs, ``logging.logThreads``,
    ``logging.logProcesses`` and ``logging.logMultiprocessing`` are set to
    ``False``; their previous values are restored afterwards, even if the
    call raises.

    :param logger_func: callable invoked with the wrapper's positional
        arguments.
    :returns: a wrapper that returns ``logger_func(*args)`` when ``DEBUG`` is
        truthy and ``None`` (without calling ``logger_func``) otherwise.
    """
    @wraps(logger_func)
    def log_wrapper(*args):
        if not DEBUG:
            return None

        saved_flags = (
            logging.logThreads,
            logging.logProcesses,
            logging.logMultiprocessing,
        )
        # Disable logging pids and tids - we don't want extra calls around,
        # especially when we monkeypatch stuff.
        # All three targets must be attributes of ``logging``: assigning to a
        # bare local name here would leave ``logging.logMultiprocessing``
        # enabled during the call and clobber it on restore.
        logging.logThreads = logging.logProcesses = logging.logMultiprocessing = False
        try:
            return logger_func(*args)
        finally:
            (
                logging.logThreads,
                logging.logProcesses,
                logging.logMultiprocessing,
            ) = saved_flags
    return log_wrapper
46
47
48
def camelcase_to_underscores(name):
    """Return *name* converted from CamelCase to lower-case underscore form.

    Applies ``FIRST_CAP_RE`` and then ``ALL_CAP_RE``, inserting ``_`` between
    the two captured groups of each match, and lower-cases the result.
    """
    split_words = FIRST_CAP_RE.sub(r'\1_\2', name)
    split_all = ALL_CAP_RE.sub(r'\1_\2', split_words)
    return split_all.lower()
51
52
53
def qualname(obj):
    """Return ``"<module>.<name>"`` for *obj*, or just its ``__name__``.

    The bare name is returned when *obj* has no ``__module__`` attribute or
    when that module is ``'builtins'`` or ``'exceptions'``.
    """
    if not hasattr(obj, '__module__') or obj.__module__ in ('builtins', 'exceptions'):
        return obj.__name__
    return '%s.%s' % (obj.__module__, obj.__name__)
58
59
60
def force_bind(func):
    """Return a function that drops its first argument and calls *func*.

    The returned function accepts ``self`` followed by any positional and
    keyword arguments and forwards everything except ``self`` to *func*. It
    carries the ``__name__`` and ``__doc__`` of *func*.
    """
    def bound(self, *args, **kwargs):  # pylint: disable=W0613
        return func(*args, **kwargs)

    for attr in ('__name__', '__doc__'):
        setattr(bound, attr, getattr(func, attr))
    return bound
66
67
68
def make_method_matcher(regex_or_regexstr_or_namelist):
    """Build a one-argument predicate from a methods specification.

    :param regex_or_regexstr_or_namelist: a regex string (compiled here), a
        list or tuple of names, or an already compiled regex.
    :returns: the ``match`` method of the (compiled) regex, or the
        ``__contains__`` method of the list/tuple.
    :raises TypeError: if the specification is none of the accepted types.
    """
    spec = regex_or_regexstr_or_namelist
    if isinstance(spec, basestring):
        return re.compile(spec).match
    if isinstance(spec, (list, tuple)):
        return spec.__contains__
    if isinstance(spec, RegexType):
        return spec.match
    raise TypeError("Unacceptable methods spec %r." % regex_or_regexstr_or_namelist)
77
78
79
# Named marker object whose repr/str is its name, followed by ": <doc>" when a
# non-empty doc was given. (Described in a comment rather than a class
# docstring because ``__doc__`` is read at runtime by ``__repr__``.)
class Sentinel(object):
    def __init__(self, name, doc=''):
        # The per-instance doc is stored in ``__doc__``.
        self.__doc__ = doc
        self.name = name

    def __repr__(self):
        if self.__doc__:
            return "%s: %s" % (self.name, self.__doc__)
        return "%s" % self.name

    # str() and repr() produce the same text.
    __str__ = __repr__
90
91
92
def container(name):
    """Create a new class called *name* that wraps a single ``value``.

    The generated class stores its constructor argument in the ``value``
    slot. ``str()`` gives ``"<name>(<value>)"``, ``repr()`` gives
    ``"<name>(<value!r>)"``. Two instances are equal only when they are of
    the exact same generated class and their values are equal.

    :param name: name of the generated class, also used in its str/repr.
    :returns: the newly created class.
    """
    def __init__(self, value):
        self.value = value

    def __eq__(self, other):
        return type(self) is type(other) and self.value == other.value

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``; define it
        # explicitly so both operators always agree.
        return not __eq__(self, other)

    return type(name, (object,), {
        '__slots__': 'value',
        '__init__': __init__,
        '__str__': lambda self: "%s(%s)" % (name, self.value),
        '__repr__': lambda self: "%s(%r)" % (name, self.value),
        '__eq__': __eq__,
        '__ne__': __ne__,
    })
103
104
105
def mimic(wrapper, func, module=None):
    """Copy ``__name__``, ``__module__`` and ``__doc__`` from *func* to *wrapper*.

    Each attribute is copied independently; a ``TypeError`` or
    ``AttributeError`` raised while reading or assigning one of them is
    ignored and the remaining attributes are still attempted.

    :param wrapper: object receiving the attributes.
    :param func: object the attributes are read from.
    :param module: if truthy, used for ``__module__`` instead of
        ``func.__module__``.
    :returns: *wrapper*.
    """
    # Values are produced lazily so that a failing read on ``func`` is
    # handled by the same ``except`` as a failing write on ``wrapper``.
    copies = (
        ('__name__', lambda: func.__name__),
        ('__module__', lambda: module or func.__module__),
        ('__doc__', lambda: func.__doc__),
    )
    for attr, read in copies:
        try:
            setattr(wrapper, attr, read())
        except (TypeError, AttributeError):
            pass
    return wrapper
119
120
121
def _repr_items(obj, aliases):
    """Return the ``repr_ex`` of every item of *obj*, joined with ``', '``."""
    return ', '.join(repr_ex(item, aliases) for item in obj)


# Maps an exact type to a ``(obj, aliases) -> str`` formatter used by
# ``repr_ex``. ``aliases`` is forwarded to nested items so that aliased
# objects are rendered by alias inside containers as well.
representers = {
    tuple: lambda obj, aliases: "(%s%s)" % (_repr_items(obj, aliases), ',' if len(obj) == 1 else ''),
    list: lambda obj, aliases: "[%s]" % _repr_items(obj, aliases),
    set: lambda obj, aliases: "set([%s])" % _repr_items(obj, aliases),
    # NOTE(review): a frozenset is rendered as ``set([...])``, which looks
    # like a copy/paste slip; output kept unchanged - confirm before changing.
    frozenset: lambda obj, aliases: "set([%s])" % _repr_items(obj, aliases),
    deque: lambda obj, aliases: "collections.deque([%s])" % _repr_items(obj, aliases),
    dict: lambda obj, aliases: "{%s}" % ', '.join(
        "%s: %s" % (repr_ex(k, aliases), repr_ex(v, aliases)) for k, v in obj.items()
    ),
}


def _make_fixups():
    """Yield ``(type, formatter)`` pairs for a few struct-like result types.

    Types whose module cannot be imported are skipped. Each formatter renders
    ``"<dotted name>(<first reduce argument!r>)"``.
    """
    for dotted in ('os.stat_result', 'grp.struct_group', 'pwd.struct_passwd'):
        mod, attr = dotted.split('.')
        try:
            kind = getattr(__import__(mod), attr)
        except ImportError:
            continue
        # ``prefix`` is bound as a default so every lambda keeps its own name.
        yield kind, lambda obj, aliases, prefix=dotted: "%s(%r)" % (
            prefix,
            obj.__reduce__()[1][0]
        )
representers.update(_make_fixups())


def repr_ex(obj, aliases=()):
    """Return a textual representation of *obj*.

    Checked in this order:

    * exception instances: ``"<qualname of type>(<args...>)"``;
    * classes: their ``qualname``;
    * objects whose exact type is in ``representers``: that formatter;
    * objects whose ``id`` is a key of *aliases*: ``aliases[id(obj)][0]``;
    * anything else: ``repr(obj)``.

    :param aliases: container supporting ``in`` and indexing by ``id(obj)``;
        each entry's first element is used as the alias text.
    """
    kind, ident = type(obj), id(obj)
    # Test the instance, not its type: a type object is never an instance of
    # BaseException, so checking ``kind`` made this branch unreachable.
    if isinstance(obj, BaseException):
        return "%s(%s)" % (qualname(kind), ', '.join(repr_ex(i, aliases) for i in obj.args))
    elif isclass(obj):
        return qualname(obj)
    elif kind in representers:
        return representers[kind](obj, aliases)
    elif ident in aliases:
        return aliases[ident][0]
    else:
        return repr(obj)
158