Completed
Push — master ( 3bc815...5816a7 )
by Ionel Cristian
01:13
created

Event.filename()   C

Complexity

Conditions 7

Size

Total Lines 21

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
c 0
b 0
f 0
dl 0
loc 21
rs 6.4705
1
from __future__ import absolute_import
2
3
import linecache
4
import os
5
import re
6
import tokenize
7
import weakref
8
from functools import partial
9
from threading import current_thread
10
11
from fields import Fields
12
13
from .const import SITE_PACKAGES_PATHS
14
from .const import SYS_PREFIX_PATHS
15
from .util import cached_property
16
17
try:
18
    from threading import main_thread
19
except ImportError:
20
    from threading import _shutdown
21
    get_main_thread = weakref.ref(
22
        _shutdown.__self__ if hasattr(_shutdown, '__self__') else _shutdown.im_self)
23
    del _shutdown
24
else:
25
    get_main_thread = weakref.ref(main_thread())
26
27
__all__ = 'Event',
28
29
STARTSWITH_TYPES = list, tuple, set
30
31
32
class Event(Fields.kind.function.module.filename):
33
    """
34
    Event wrapper for ``frame, kind, arg`` (the arguments the settrace function gets). This objects is passed to your
35
    custom functions or predicates.
36
37
    Provides few convenience properties.
38
39
    .. warning::
40
41
        Users do not instantiate this directly.
42
    """
43
    frame = None
44
    kind = None
45
    arg = None
46
    tracer = None
47
    threadid = None
48
    threadname = None
49
50
    def __init__(self, frame, kind, arg, tracer):
51
        self.frame = frame
52
        self.kind = kind
53
        self.arg = arg
54
        self.tracer = tracer
55
56
    @cached_property
57
    def threadid(self):
58
        """
59
        Current thread ident. If current thread is main thread then it returns ``None``.
60
        """
61
        current = self.thread.ident
62
        main = get_main_thread()
63
        if main is None:
64
            return current
65
        else:
66
            return current if current != main.ident else None
67
68
    @cached_property
69
    def threadname(self):
70
        """
71
        Current thread name.
72
        """
73
        return self.thread.name
74
75
    @cached_property
76
    def thread(self):
77
        """
78
        Current thread object.
79
        """
80
        return current_thread()
81
82
    @cached_property
83
    def locals(self):
84
        """
85
        A dict with local variables.
86
        """
87
        return self.frame.f_locals
88
89
    @cached_property
90
    def globals(self):
91
        """
92
        A dict with global variables.
93
        """
94
        return self.frame.f_globals
95
96
    @cached_property
97
    def function(self):
98
        """
99
        A string with function name.
100
        """
101
        return self.code.co_name
102
103
    @cached_property
104
    def module(self):
105
        """
106
        A string with module name (eg: ``"foo.bar"``).
107
        """
108
        module = self.frame.f_globals.get('__name__', '')
109
        if module is None:
110
            module = ''
111
112
        return module
113
114
    @cached_property
115
    def filename(self, exists=os.path.exists, cython_suffix_re=re.compile(r'[.]cpython-[0-9]+.+$', re.IGNORECASE)):
116
        """
117
        A string with absolute path to file.
118
        """
119
        filename = self.frame.f_globals.get('__file__', '')
120
        if filename is None:
121
            filename = ''
122
123
        if filename.endswith(('.pyc', '.pyo')):
124
            filename = filename[:-1]
125
        elif filename.endswith('$py.class'):  # Jython
126
            filename = filename[:-9] + ".py"
127
        elif filename.endswith(('.so', '.pyd')):
128
            basename = cython_suffix_re.sub('', filename)
129
            for ext in ('.pyx', '.py'):
130
                cyfilename = basename + ext
131
                if exists(cyfilename):
132
                    filename = cyfilename
133
                    break
134
        return filename
135
136
    @cached_property
137
    def lineno(self):
138
        """
139
        An integer with line number in file.
140
        """
141
        return self.frame.f_lineno
142
143
    @cached_property
144
    def code(self):
145
        """
146
        A code object (not a string).
147
        """
148
        return self.frame.f_code
149
150
    @cached_property
151
    def stdlib(self):
152
        """
153
        A boolean flag. ``True`` if frame is in stdlib.
154
        """
155
        if self.filename.startswith(SITE_PACKAGES_PATHS):
156
            # if it's in site-packages then its definitely not stdlib
157
            return False
158
        elif self.filename.startswith(SYS_PREFIX_PATHS):
159
            return True
160
        else:
161
            return False
162
163
    @cached_property
164
    def fullsource(self):
165
        """
166
        A string with the sourcecode for the current statement (from ``linecache`` - failures are ignored).
167
168
        May include multiple lines if it's a class/function definition (will include decorators).
169
        """
170
        try:
171
            return self._raw_fullsource
172
        except Exception as exc:
173
            return "??? NO SOURCE: {!r}".format(exc)
174
175
    @cached_property
176
    def source(self, getline=linecache.getline):
177
        """
178
        A string with the sourcecode for the current line (from ``linecache`` - failures are ignored).
179
180
        Fast but sometimes incomplete.
181
        """
182
        try:
183
            return getline(self.filename, self.lineno)
184
        except Exception as exc:
185
            return "??? NO SOURCE: {!r}".format(exc)
186
187
    @cached_property
188
    def _raw_fullsource(self,
189
                        getlines=linecache.getlines,
190
                        getline=linecache.getline,
191
                        generate_tokens=tokenize.generate_tokens):
192
        if self.kind == 'call' and self.code.co_name != "<module>":
193
            lines = []
194
            try:
195
                for _, token, _, _, line in generate_tokens(partial(
196
                    next,
197
                    yield_lines(self.filename, self.lineno - 1, lines.append)
198
                )):
199
                    if token in ("def", "class", "lambda"):
200
                        return ''.join(lines)
201
            except tokenize.TokenError:
202
                pass
203
204
        return getline(self.filename, self.lineno)
205
206
    __getitem__ = object.__getattribute__
207
208
209
def yield_lines(filename, start, collector,
210
                limit=10,
211
                getlines=linecache.getlines,
212
                leading_whitespace_re=re.compile('(^[ \t]*)(?:[^ \t\n])', re.MULTILINE)):
213
    dedent = None
214
    amount = 0
215
    for line in getlines(filename)[start:start + limit]:
216
        if dedent is None:
217
            dedent = leading_whitespace_re.findall(line)
218
            dedent = dedent[0] if dedent else ""
219
            amount = len(dedent)
220
        elif not line.startswith(dedent):
221
            break
222
        collector(line)
223
        yield line[amount:]
224