__logged__.advising_function()   F
last analyzed

Complexity

Conditions 21

Size

Total Lines 50

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 21
dl 0
loc 50
rs 2.7751

How to fix   Complexity   

Complexity

Complex code units such as __logged__.advising_function() often do a lot of different things. To break such a unit down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import logging
2
import os
3
import string
4
import sys
5
from itertools import islice
6
7
from aspectlib import Aspect
8
from aspectlib import mimic
9
10
try:
    # ``types.InstanceType`` is importable only where old-style classes exist.
    from types import InstanceType
except ImportError:
    # Fallback placeholder: the type of ``None``, so ``isinstance`` checks
    # against it are false for any object that is not ``None``.
    InstanceType = type(None)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
16
17
18
def frame_iterator(frame):
    """
    Walk a chain of frames through their ``f_back`` links.

    Yields the given frame first, then each ``f_back`` in turn, and stops as
    soon as the current link is falsy (e.g. ``None``).
    """
    current = frame
    while True:
        if not current:
            return
        yield current
        current = current.f_back
25
26
27
def format_stack(skip=0, length=6, _sep=os.path.sep):
    """
    Build a one-line summary of the current callstack.

    Each frame is rendered as ``<last two path components>:<line>:<function>``
    and the frames are joined with ``' < '``, innermost first. The walk starts
    at the caller of this function, moved ``skip`` frames further out, and
    covers at most ``length`` frames.
    """
    # Must be called directly in this function's body: the frame depth is
    # counted relative to ``format_stack`` itself.
    start = sys._getframe(1 + skip)
    entries = []
    for frame in islice(frame_iterator(start), length):
        code = frame.f_code
        short_path = '/'.join(code.co_filename.split(_sep)[-2:])
        entries.append("%s:%s:%s" % (short_path, frame.f_lineno, code.co_name))
    return ' < '.join(entries)
36
37
# Characters that ``strip_non_ascii`` passes through unchanged: digits, ASCII
# letters, punctuation and the plain space (no other whitespace).
PRINTABLE = string.digits + string.ascii_letters + string.punctuation + ' '
# 256-entry table mapping each of the first 256 code points to itself when it
# is in PRINTABLE and to '.' otherwise. Kept as a module-level name for
# backward compatibility; ``strip_non_ascii`` no longer relies on it.
ASCII_ONLY = ''.join(i if i in PRINTABLE else '.' for i in (chr(c) for c in range(256)))
# Set form of PRINTABLE for constant-time membership tests.
_PRINTABLE_CHARS = frozenset(PRINTABLE)


def strip_non_ascii(val):
    """
    Convert to string (using `str`) and replace non-ascii characters with a dot (``.``).

    Every character that is not in ``PRINTABLE`` is replaced, including
    control characters, newlines/tabs and any code point above 255.

    Args:
        val: Any object; it is converted with ``str`` first.

    Returns:
        str: The converted text with each non-printable character replaced by ``.``.
    """
    # A per-character membership test is used instead of
    # ``str.translate(ASCII_ONLY)``: with a 256-character table, translate
    # leaves code points >= 256 untouched on Python 3 (the table lookup raises
    # a LookupError), so characters such as the euro sign leaked through.
    return ''.join(char if char in _PRINTABLE_CHARS else '.' for char in str(val))
46
47
48
def log(func=None,
        stacktrace=10,
        stacktrace_align=60,
        attributes=(),
        module=True,
        call=True,
        call_args=True,
        call_args_repr=repr,
        result=True,
        exception=True,
        exception_repr=repr,
        result_repr=strip_non_ascii,
        use_logging='CRITICAL',
        print_to=None):
    """
    Decorates `func` to have logging.

    Args:
        func (function):
            Function to decorate. If missing log returns a partial which you can use as a decorator.
        stacktrace (int):
            Number of frames to show.
        stacktrace_align (int):
            Column to align the framelist to.
        attributes (list):
            List of instance attributes to show, in case the function is an instance method. A name ending in
            ``()`` is looked up without the suffix and called, and the call's result is shown.
        module (bool):
            Show the module.
        call (bool):
            If ``True``, then show calls. If ``False`` only show the call details on exceptions (if ``exception`` is
            enabled) (default: ``True``)
        call_args (bool):
            If ``True``, then show call arguments. If an integer, show only that many leading positional arguments
            and no keyword arguments. (default: ``True``)
        call_args_repr (function):
            Function to convert one argument to a string. It is used for positional arguments, keyword argument
            values and instance attributes. (default: ``repr``)
        result (bool):
            If ``True``, then show result. (default: ``True``)
        exception (bool):
            If ``True``, then show exceptions. (default: ``True``)
        exception_repr (function):
            Function to convert an exception to a string. (default: ``repr``)
        result_repr (function):
            Function to convert the result object to a string. (default: ``strip_non_ascii`` - like ``str`` but nonascii
            characters are replaced with dots.)
        use_logging (string):
            Emit log messages with the given loglevel. (default: ``"CRITICAL"``)
        print_to (fileobject):
            File object to write to, in case you don't want to use logging module. (default: ``None`` - printing is
            disabled)

    Returns:
        A decorator or a wrapper.

    Example::

        >>> @log(print_to=sys.stdout)
        ... def a(weird=False):
        ...     if weird:
        ...         raise RuntimeError('BOOM!')
        >>> a()
        a()                                                           <<< ...
        a => None
        >>> try:
        ...     a(weird=True)
        ... except Exception:
        ...     pass # naughty code!
        a(weird=True)                                                 <<< ...
        a ~ raised RuntimeError('BOOM!'...)

    You can conveniently use this to log just errors, or just results, example::

        >>> import aspectlib
        >>> with aspectlib.weave(float, log(call=False, result=False, print_to=sys.stdout)):
        ...     try:
        ...         float('invalid')
        ...     except Exception as e:
        ...         pass # naughty code!
        float('invalid')                                              <<< ...
        float ~ raised ValueError(...float...invalid...)

    This makes debugging naughty code easier.

    PS: Without the weaving it looks like this::

        >>> try:
        ...     log(call=False, result=False, print_to=sys.stdout)(float)('invalid')
        ... except Exception:
        ...     pass # naughty code!
        float('invalid')                                              <<< ...
        float ~ raised ValueError(...float...invalid...)


    .. versionchanged:: 0.5.0

        Renamed `arguments` to `call_args`.
        Renamed `arguments_repr` to `call_args_repr`.
        Added `call` option.
    """

    # Resolve the level name to its number. The name->level mapping lives
    # under a different private attribute depending on the Python version.
    # Unknown names fall back to CRITICAL; a falsy ``use_logging`` is kept
    # as-is, which disables the logging output in ``dump``.
    loglevel = use_logging and (
        logging._levelNames if hasattr(logging, '_levelNames') else logging._nameToLevel
    ).get(use_logging, logging.CRITICAL)
    # Sentinel so that attributes whose value is ``None`` are still reported.
    _missing = object()

    def dump(buf):
        """Send one message to the logger and/or ``print_to``; never raises ``Exception``."""
        try:
            if use_logging:
                # NOTE(review): calls the private ``Logger._log`` directly, which
                # presumably skips the logger's own level check - confirm intent.
                logger._log(loglevel, buf, ())
            if print_to:
                buf += '\n'
                print_to.write(buf)
        except Exception as exc:
            # Logging must not break the decorated call; report and carry on.
            logger.critical('Failed to log a message: %s', exc, exc_info=True)

    def _describe_target(name, instance):
        """Return the signature prefix: ``name``, or ``{module.Class attr=val}.name`` when bound."""
        if instance is None:
            return name
        if isinstance(instance, InstanceType):
            instance_type = instance.__class__
        else:
            instance_type = type(instance)

        info = []
        for key in attributes:
            if key.endswith('()'):
                # ``attr()`` means: look up ``attr`` and show the result of calling it.
                callarg = key = key.rstrip('()')
            else:
                callarg = False
            val = getattr(instance, key, _missing)
            # Skip attributes that do not exist, and the logged method itself.
            if val is not _missing and key != name:
                info.append(' %s=%s' % (
                    key, call_args_repr(val() if callarg else val)
                ))
        return '{%s%s%s}.%s' % (
            instance_type.__module__ + '.' if module else '',
            instance_type.__name__,
            ''.join(info),
            name
        )

    def _format_call_args(args, kwargs):
        """Return the parenthesised argument list according to ``call_args``."""
        if call_args is True:
            shown = [call_args_repr(value) for value in args]
            shown.extend('%s=%s' % (key, call_args_repr(value)) for key, value in kwargs.items())
        else:
            # Integer form: only the first ``call_args`` positional arguments,
            # keyword arguments are omitted.
            shown = [call_args_repr(value) for value in args[:call_args]]
        return '(%s)' % ', '.join(shown)

    class __logged__(Aspect):
        """Aspect that logs calls, results and exceptions of the wrapped function."""
        __slots__ = 'cutpoint_function', 'final_function', 'binding', '__name__', '__weakref__'

        bind = False

        def __init__(self, cutpoint_function, binding=None):
            mimic(self, cutpoint_function)
            self.cutpoint_function = cutpoint_function
            self.final_function = super(__logged__, self).__call__(cutpoint_function)
            # Instance the function is bound to (set by ``__get__``), or None.
            self.binding = binding

        def __get__(self, instance, owner):
            # Descriptor access: re-wrap the bound function and remember the
            # instance so its type and attributes can be logged.
            return __logged__(self.cutpoint_function.__get__(instance, owner), instance)

        def __call__(self, *args, **kwargs):
            return self.final_function(*args, **kwargs)

        def advising_function(self, *args, **kwargs):
            """Generator advice: log the call, yield, then log the result or the exception."""
            sig = buf = _describe_target(self.cutpoint_function.__name__, self.binding)
            if call_args:
                buf += _format_call_args(args, kwargs)
            if stacktrace:
                # ``skip=1`` leaves this advice frame out of the printed stack,
                # so ``format_stack`` has to be called from this function body.
                buf = ("%%-%ds  <<< %%s" % stacktrace_align) % (buf, format_stack(skip=1, length=stacktrace))
            if call:
                dump(buf)
            try:
                # The value sent back at the yield is logged as the result.
                res = yield
            except Exception as exc:
                if exception:
                    if not call:
                        # Call details were suppressed; show them now for context.
                        dump(buf)
                    dump('%s ~ raised %s' % (sig, exception_repr(exc)))
                raise

            if result:
                dump('%s => %s' % (sig, result_repr(res)))

    if func:
        return __logged__(func)
    else:
        return __logged__
234