Completed
Push — master ( c47a1a...d9aecd )
by Ionel Cristian
33s
created

Stop   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 3
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 3
rs 10
wmc 1

1 Method

Rating   Name   Duplication   Size   Complexity  
A __call__() 0 2 1
1
from __future__ import absolute_import
2
3
import atexit
4
import functools
5
import inspect
6
import os
7
import weakref
8
9
from .actions import Action
10
from .actions import CallPrinter
11
from .actions import CodePrinter
12
from .actions import Debugger
13
from .actions import Manhole
14
from .actions import VarsPrinter
15
16
try:
17
    if os.environ.get("PUREPYTHONHUNTER"):
18
        raise ImportError("Skipped")
19
20
    from ._predicates import And as _And
21
    from ._predicates import Not
22
    from ._predicates import Or as _Or
23
    from ._predicates import When
24
    from ._predicates import Query
25
    from ._tracer import Tracer
26
except ImportError:
27
    from .predicates import And as _And
28
    from .predicates import Not
29
    from .predicates import Or as _Or
30
    from .predicates import When
31
    from .predicates import Query
32
    from .tracer import Tracer
33
34
__version__ = "1.4.1"
# Public names exported by a star-import of this package.
__all__ = (
    'And',
    'CallPrinter',
    'CodePrinter',
    'Debugger',
    'Manhole',
    'Not',
    'Or',
    'Q',
    'Query',
    'VarsPrinter',
    'When',

    'stop',
    'trace',
    # ``wrap`` is a documented public decorator defined in this module, so it is exported too.
    'wrap',
)
# Tracer created by the most recent ``trace()`` call; ``stop()`` resets it to ``None``.
_last_tracer = None
52
53
54
def Q(*predicates, **query):
    """
    Handles situations where :class:`hunter.Query` objects (or other callables) are passed in as positional arguments.
    Conveniently converts that to an :class:`hunter.And` predicate.

    Parameters:
        *predicates (callables): Predicates to combine. ``Action`` subclasses are instantiated first.
    Keyword Args:
        action: A single action to run when the resulting predicate matches.
        actions: An iterable of actions to run. It is copied, never modified in place.
        **query: Any other keyword arguments are forwarded to ``Query``.

    Returns:
        A ``Query`` if no positional predicates were given, otherwise an ``And`` of the predicates (plus a
        ``Query`` built from the remaining keyword arguments, if any). The result is wrapped in a ``When`` if
        there are actions left to run.

    Raises:
        TypeError: If any predicate or action is not callable.
    """
    # Copy the sequence: the caller's ``actions`` object must not be mutated by the ``append``/``remove``
    # calls below, and non-list iterables (e.g. tuples) have no ``append`` at all.
    optional_actions = list(query.pop("actions", []))
    if "action" in query:
        optional_actions.append(query.pop("action"))

    for p in predicates:
        if not callable(p):
            raise TypeError("Predicate {0!r} is not callable.".format(p))

    for a in optional_actions:
        if not callable(a):
            raise TypeError("Action {0!r} is not callable.".format(a))

    if predicates:
        # Allow passing an Action class directly (eg: ``Q(CodePrinter)``) by instantiating it.
        predicates = tuple(
            p() if inspect.isclass(p) and issubclass(p, Action) else p
            for p in predicates
        )
        # A CodePrinter instance is already among the predicates, so drop the CodePrinter class from the
        # actions list if it is there.
        if any(isinstance(p, CodePrinter) for p in predicates):
            if CodePrinter in optional_actions:
                optional_actions.remove(CodePrinter)
        if query:
            predicates += Query(**query),

        result = And(*predicates)
    else:
        result = Query(**query)

    if optional_actions:
        result = When(result, *optional_actions)

    return result
90
91
92
def _flatten(cls, predicate, *predicates):
93
    if not predicates:
94
        return predicate
95
    else:
96
        all_predicates = []
97
        if isinstance(predicate, cls):
98
            all_predicates.extend(predicate.predicates)
99
        else:
100
            all_predicates.append(predicate)
101
102
        for p in predicates:
103
            if isinstance(p, cls):
104
                all_predicates.extend(p.predicates)
105
            else:
106
                all_predicates.append(p)
107
        return cls(*all_predicates)
108
109
110
def And(*predicates, **kwargs):
    """
    `And` predicate. Returns ``False`` at the first sub-predicate that returns ``False``.

    Keyword arguments, if any, are turned into one extra ``Query`` operand. Nested ``And`` operands are flattened.
    """
    extra = (Query(**kwargs),) if kwargs else ()
    return _flatten(_And, *(predicates + extra))
117
118
119
def Or(*predicates, **kwargs):
    """
    `Or` predicate. Returns ``True`` at the first sub-predicate that returns ``True``.

    Each keyword argument becomes its own single-key ``Query`` operand. Nested ``Or`` operands are flattened.
    """
    for key, value in kwargs.items():
        predicates += (Query(**{key: value}),)
    return _flatten(_Or, *predicates)
126
127
128
def stop():
    """
    Stop tracing. Restores previous tracer (if there was any).

    Does nothing when no tracer is currently recorded in ``_last_tracer``.
    """
    global _last_tracer

    if _last_tracer is None:
        return

    _last_tracer.stop()
    _last_tracer = None
137
138
139
class Stop(Action):
    """
    Action that stops tracing by calling :func:`stop` when it is run.
    """

    def __call__(self, event):
        """Ignore ``event`` and stop the tracer recorded in ``_last_tracer``, if any."""
        stop()
142
143
144
def _prepare_predicate(*predicates, **options):
    """
    Build the predicate via ``Q``, defaulting the action to ``CodePrinter``.

    The default is only applied when the caller supplied neither ``action`` nor ``actions``.
    """
    if "actions" not in options:
        # No-op when the caller already chose an ``action``.
        options.setdefault("action", CodePrinter)

    return Q(*predicates, **options)
149
150
151
def trace(*predicates, **options):
    """
    Starts tracing. Can be used as a context manager (with slightly incorrect semantics - it starts tracing
    before ``__enter__`` is called).

    Parameters:
        *predicates (callables): Runs actions if **all** of the given predicates match.
    Keyword Args:
        clear_env_var: Disables tracing in subprocess. Default: ``False``.
        threading_support: Enable tracing *new* threads. Default: ``False``.
        action: Action to run if all the predicates return ``True``. Default: ``CodePrinter``.
        actions: Actions to run (in case you want more than 1).

    The new tracer is remembered in ``_last_tracer`` so that :func:`stop` can stop it later.
    """
    global _last_tracer

    clear_env_var = options.pop("clear_env_var", False)

    # Several spellings of the threading option are accepted. They are consumed in this order and the
    # scan stops at the first truthy value, so later spellings are left in ``options``.
    threading_support = False
    for alias in (
        "threading_support",
        "threads_support",
        "thread_support",
        "threadingsupport",
        "threadssupport",
        "threadsupport",
        "threading",
        "threads",
        "thread",
    ):
        threading_support = options.pop(alias, False)
        if threading_support:
            break

    predicate = _prepare_predicate(*predicates, **options)

    if clear_env_var:
        os.environ.pop("PYTHONHUNTER", None)

    _last_tracer = Tracer(threading_support)

    # The exit hook only holds a weak reference, so it does not keep the tracer alive by itself.
    def atexit_cleanup(ref=weakref.ref(_last_tracer)):
        maybe_tracer = ref()
        if maybe_tracer is not None:
            maybe_tracer.stop()

    atexit.register(atexit_cleanup)

    return _last_tracer.trace(predicate)
192
193
194
def wrap(function_to_trace=None, **trace_options):
    """
    Functions decorated with this will be traced.

    Works both as a bare decorator (``@hunter.wrap``) and as a decorator factory taking options that are
    forwarded to :func:`trace`.

    Use ``local=True`` to only trace local code, eg::

        @hunter.wrap(local=True)
        def my_function():
            ...

    Keyword arguments are allowed, eg::

        @hunter.wrap(action=hunter.CallPrinter)
        def my_function():
            ...

    Or, filters::

        @hunter.wrap(module='foobar')
        def my_function():
            ...
    """

    def tracing_decorator(func):
        def tracing_wrapper(*args, **kwargs):
            # Tracing starts right before the call and is always stopped afterwards, even on error.
            stop_condition = ~When(Q(calls_gt=0, depth=0), Stop)
            tracer = trace(stop_condition, **trace_options)
            try:
                return func(*args, **kwargs)
            finally:
                tracer.stop()

        return functools.wraps(func)(tracing_wrapper)

    if function_to_trace is not None:
        # Used as ``@wrap`` directly on a function.
        return tracing_decorator(function_to_trace)
    return tracing_decorator
230
231
232