Q()   F
last analyzed

Complexity

Conditions 17

Size

Total Lines 39

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 17
c 1
b 0
f 0
dl 0
loc 39
rs 2.7204

How to fix   Complexity   

Complexity

Complex classes like Q() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import absolute_import
2
3
import atexit
4
import functools
5
import inspect
6
import os
7
import weakref
8
9
from .actions import Action
10
from .actions import CallPrinter
11
from .actions import CodePrinter
12
from .actions import Debugger
13
from .actions import Manhole
14
from .actions import VarsPrinter
15
16
try:
17
    if os.environ.get("PUREPYTHONHUNTER"):
18
        raise ImportError("Skipped")
19
20
    from ._predicates import And as _And
21
    from ._predicates import Not
22
    from ._predicates import Or as _Or
23
    from ._predicates import When
24
    from ._predicates import Query
25
    from ._tracer import Tracer
26
except ImportError:
27
    from .predicates import And as _And
28
    from .predicates import Not
29
    from .predicates import Or as _Or
30
    from .predicates import When
31
    from .predicates import Query
32
    from .tracer import Tracer
33
34
__version__ = "2.0.2"
# Names exported by ``from <package> import *``.
__all__ = (
    'And',
    'CallPrinter',
    'CodePrinter',
    'Debugger',
    'Manhole',
    'Not',
    'Or',
    'Q',
    'Query',
    'VarsPrinter',
    'When',

    'stop',
    'trace',
    # ``wrap`` is a documented public decorator defined in this module, so it
    # is exported alongside ``trace`` and ``stop``.
    'wrap',
)
# Most recent tracer created by ``trace()``; reset to ``None`` by ``stop()``.
_last_tracer = None
52
53
54
def Q(*predicates, **query):
    """
    Build a predicate from positional callables and/or keyword filters.

    Handles situations where :class:`hunter.Query` objects (or other callables) are passed in as positional arguments.
    Conveniently converts that to an :class:`hunter.And` predicate.

    Parameters:
        *predicates (callables): Predicates to combine. ``Action`` subclasses passed here are instantiated
            with no arguments.
    Keyword Args:
        actions: Iterable of callables to run when the predicate matches. It is copied, never mutated.
        action: A single callable, appended after ``actions``.
        **query: Any remaining keyword arguments are passed to :class:`hunter.Query`.

    Returns:
        ``And(*predicates)`` (with a trailing ``Query(**query)`` if filters were given) when positional
        predicates are present, otherwise ``Query(**query)``. If any actions remain, the result is wrapped
        as ``When(result, *actions)``.

    Raises:
        TypeError: If a positional predicate or an action is not callable.
    """
    # Work on a private copy: the original appended to / removed from the
    # caller's own list, and failed with AttributeError when given a tuple.
    optional_actions = list(query.pop("actions", []))
    if "action" in query:
        optional_actions.append(query.pop("action"))

    for p in predicates:
        if not callable(p):
            raise TypeError("Predicate {0!r} is not callable.".format(p))

    for a in optional_actions:
        if not callable(a):
            raise TypeError("Action {0!r} is not callable.".format(a))

    if predicates:
        # Bare Action classes are accepted positionally and instantiated here.
        predicates = tuple(
            p() if inspect.isclass(p) and issubclass(p, Action) else p
            for p in predicates
        )
        # When a printer instance was already given positionally, drop the
        # matching printer *class* (first occurrence) from the actions.
        for printer in (CodePrinter, CallPrinter):
            if printer in optional_actions and any(isinstance(p, printer) for p in predicates):
                optional_actions.remove(printer)
        if query:
            predicates += Query(**query),

        result = And(*predicates)
    else:
        result = Query(**query)

    if optional_actions:
        result = When(result, *optional_actions)

    return result
93
94
95
def _flatten(cls, predicate, *predicates):
    """
    Combine predicates into a single ``cls`` instance, merging nested ``cls`` instances.

    When only one predicate is given it is returned unchanged (no wrapping).
    Otherwise every argument that is already an instance of ``cls`` contributes
    its ``.predicates`` items, any other argument is kept as-is, and the
    collected items are passed positionally to ``cls``.
    """
    if not predicates:
        return predicate

    flattened = []
    for item in (predicate,) + predicates:
        if isinstance(item, cls):
            # Splice the children of a nested group in place of the group.
            flattened.extend(item.predicates)
        else:
            flattened.append(item)
    return cls(*flattened)
111
112
113
def And(*predicates, **kwargs):
    """
    `And` predicate. Returns ``False`` at the first sub-predicate that returns ``False``.

    Keyword arguments, if any, are turned into one ``Query(**kwargs)`` that is
    appended after the positional predicates. Nested ``And`` predicates are
    merged by ``_flatten``.
    """
    keyword_query = (Query(**kwargs),) if kwargs else ()
    return _flatten(_And, *(predicates + keyword_query))
120
121
122
def Or(*predicates, **kwargs):
    """
    `Or` predicate. Returns ``True`` at the first sub-predicate that returns ``True``.

    Each keyword argument becomes its own single-filter ``Query`` appended
    after the positional predicates, so the filters are alternatives. Nested
    ``Or`` predicates are merged by ``_flatten``.
    """
    keyword_queries = tuple(
        Query(**{name: value}) for name, value in kwargs.items()
    )
    return _flatten(_Or, *(predicates + keyword_queries))
129
130
131
def stop():
    """
    Stop tracing. Restores previous tracer (if there was any).

    Does nothing when no tracer is currently recorded in ``_last_tracer``.
    """
    global _last_tracer

    if _last_tracer is None:
        return

    _last_tracer.stop()
    # Forget the tracer only after it has been stopped.
    _last_tracer = None
140
141
142
class Stop(Action):
    """Action that stops tracing by calling the module-level ``stop()``."""

    def __call__(self, event):
        # The event itself is not inspected; any call stops the current tracer.
        stop()
145
146
147
def _prepare_predicate(*predicates, **options):
    """
    Build the predicate used by ``trace``.

    If the caller supplied neither ``action`` nor ``actions``, ``CallPrinter``
    is installed as the action. Everything is then handed to ``Q``.
    """
    has_explicit_action = any(key in options for key in ("action", "actions"))
    if not has_explicit_action:
        options["action"] = CallPrinter

    return Q(*predicates, **options)
152
153
154
def trace(*predicates, **options):
    """
    Starts tracing. Can be used as a context manager (with slightly incorrect semantics - it starts tracing
    before ``__enter__`` is called).

    Parameters:
        *predicates (callables): Runs actions if **all** of the given predicates match.
    Keyword Args:
        clear_env_var: Disables tracing in subprocess (removes ``PYTHONHUNTER`` from ``os.environ``).
            Default: ``False``.
        threading_support: Enable tracing *new* threads. Default: ``False``. You can also use
            ``threads_support``, ``thread_support``, ``threadingsupport``, ``threadssupport``, ``threadsupport``,
            ``threading``, ``threads`` or ``thread``.
        action: Action to run if all the predicates return ``True``. Default: ``CallPrinter``.
        actions: Actions to run (in case you want more than 1).
        **kwargs: for convenience you can also pass anything that you'd pass to :ref:`hunter.Q`

    Returns:
        Whatever ``Tracer.trace(predicate)`` returns for the newly created tracer.
    """
    global _last_tracer

    clear_env_var = options.pop("clear_env_var", False)

    # Pop *every* alias, not just those up to the first truthy one. A
    # short-circuiting ``or`` chain would leave the remaining aliases in
    # ``options``, where they would be forwarded to ``Q`` as query filters.
    alias_values = [
        options.pop(alias, False)
        for alias in (
            "threading_support",
            "threads_support",
            "thread_support",
            "threadingsupport",
            "threadssupport",
            "threadsupport",
            "threading",
            "threads",
            "thread",
        )
    ]
    # Same value an ``a or b or ...`` chain yields: the first truthy value,
    # otherwise the last one.
    threading_support = next(
        (value for value in alias_values if value),
        alias_values[-1],
    )

    predicate = _prepare_predicate(*predicates, **options)

    if clear_env_var:
        os.environ.pop("PYTHONHUNTER", None)

    _last_tracer = Tracer(threading_support)

    # Only a weak reference is captured, so the exit hook does not keep the
    # tracer alive; it stops the tracer at interpreter exit if it still exists.
    @atexit.register
    def atexit_cleanup(ref=weakref.ref(_last_tracer)):
        maybe_tracer = ref()
        if maybe_tracer is not None:
            maybe_tracer.stop()

    return _last_tracer.trace(predicate)
198
199
200
def wrap(function_to_trace=None, **trace_options):
    """
    Functions decorated with this will be traced.

    Works both as a bare decorator (``@hunter.wrap``) and as a decorator
    factory taking keyword arguments, which are forwarded to ``trace`` on
    every call of the decorated function.

    Use ``local=True`` to only trace local code, eg::

        @hunter.wrap(local=True)
        def my_function():
            ...

    Keyword arguments are allowed, eg::

        @hunter.wrap(action=hunter.CallPrinter)
        def my_function():
            ...

    Or, filters::

        @hunter.wrap(module='foobar')
        def my_function():
            ...
    """

    def tracing_decorator(func):
        @functools.wraps(func)
        def tracing_wrapper(*args, **kwargs):
            # Built fresh on each call: a negated ``When`` that runs ``Stop``
            # for events matching ``calls_gt=0, depth=0``.
            stop_predicate = ~When(Q(calls_gt=0, depth=0), Stop)
            tracer = trace(stop_predicate, **trace_options)
            try:
                return func(*args, **kwargs)
            finally:
                # Always stop this tracer, even if ``func`` raised.
                tracer.stop()
        return tracing_wrapper

    # Called with a function directly: decorate it right away.
    if function_to_trace is not None:
        return tracing_decorator(function_to_trace)
    # Called with options only: hand back the decorator.
    return tracing_decorator
236