Completed
Push — master ( 4d55e8...9e8559 )
by Ionel Cristian
01:05
created

src.hunter.atexit_cleanup()   A

Complexity

Conditions 2

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 5
rs 9.4285
cc 2
1
from __future__ import absolute_import
2
3
import atexit
4
import inspect
5
import os
6
import weakref
7
8
from .actions import Action
9
from .actions import CallPrinter
10
from .actions import CodePrinter
11
from .actions import Debugger
12
from .actions import Manhole
13
from .actions import VarsPrinter
14
15
try:
16
    if os.environ.get("PUREPYTHONHUNTER"):
17
        raise ImportError("Skipped")
18
19
    from ._predicates import And as _And
20
    from ._predicates import Not
21
    from ._predicates import Or as _Or
22
    from ._predicates import When
23
    from ._predicates import Query
24
    from ._tracer import Tracer
25
except ImportError:
26
    from .predicates import And as _And
27
    from .predicates import Not
28
    from .predicates import Or as _Or
29
    from .predicates import When
30
    from .predicates import Query
31
    from .tracer import Tracer
32
33
__version__ = "1.2.2"
34
__all__ = (
35
    'And',
36
    'CallPrinter',
37
    'CodePrinter',
38
    'Debugger',
39
    'Manhole',
40
    'Not',
41
    'Or',
42
    'Q',
43
    'Query',
44
    'VarsPrinter',
45
    'When',
46
47
    'stop',
48
    'trace',
49
)
50
_last_tracer = None
51
52
53
def Q(*predicates, **query):
54
    """
55
    Handles situations where :class:`hunter.Query` objects (or other callables) are passed in as positional arguments.
56
    Conveniently converts that to an :class:`hunter.And` predicate.
57
    """
58
    optional_actions = query.pop("actions", [])
59
    if "action" in query:
60
        optional_actions.append(query.pop("action"))
61
62
    for p in predicates:
63
        if not callable(p):
64
            raise TypeError("Predicate {0!r} is not callable.".format(p))
65
66
    for a in optional_actions:
67
        if not callable(a):
68
            raise TypeError("Action {0!r} is not callable.".format(a))
69
70
    if predicates:
71
        predicates = tuple(
72
            p() if inspect.isclass(p) and issubclass(p, Action) else p
73
            for p in predicates
74
        )
75
        if any(isinstance(p, CodePrinter) for p in predicates):
76
            if CodePrinter in optional_actions:
77
                optional_actions.remove(CodePrinter)
78
        if query:
79
            predicates += Query(**query),
80
81
        result = And(*predicates)
82
    else:
83
        result = Query(**query)
84
85
    if optional_actions:
86
        result = When(result, *optional_actions)
87
88
    return result
89
90
91
def _flatten(cls, predicate, *predicates):
92
    if not predicates:
93
        return predicate
94
    else:
95
        all_predicates = []
96
        if isinstance(predicate, cls):
97
            all_predicates.extend(predicate.predicates)
98
        else:
99
            all_predicates.append(predicate)
100
101
        for p in predicates:
102
            if isinstance(p, cls):
103
                all_predicates.extend(p.predicates)
104
            else:
105
                all_predicates.append(p)
106
        return cls(*all_predicates)
107
108
109
def And(*predicates, **kwargs):
110
    """
111
    `And` predicate. Returns ``False`` at the first sub-predicate that returns ``False``.
112
    """
113
    if kwargs:
114
        predicates += Query(**kwargs),
115
    return _flatten(_And, *predicates)
116
117
118
def Or(*predicates, **kwargs):
119
    """
120
    `Or` predicate. Returns ``True`` at the first sub-predicate that returns ``True``.
121
    """
122
    if kwargs:
123
        predicates += tuple(Query(**{k: v}) for k, v in kwargs.items())
124
    return _flatten(_Or, *predicates)
125
126
127
def stop():
128
    """
129
    Stop tracing. Restores previous tracer (if there was any).
130
    """
131
    global _last_tracer
132
133
    if _last_tracer is not None:
134
        _last_tracer.stop()
135
        _last_tracer = None
136
137
138
def _prepare_predicate(*predicates, **options):
139
    if "action" not in options and "actions" not in options:
140
        options["action"] = CodePrinter
141
142
    return Q(*predicates, **options)
143
144
145
def trace(*predicates, **options):
146
    """
147
    Starts tracing. Can be used as a context manager (with slightly incorrect semantics - it starts tracing
148
    before ``__enter__`` is called).
149
150
    Parameters:
151
        *predicates (callables): Runs actions if **all** of the given predicates match.
152
    Keyword Args:
153
        clear_env_var: Disables tracing in subprocess. Default: ``False``.
154
        threading_support: Enable tracing *new* threads. Default: ``False``.
155
        action: Action to run if all the predicates return ``True``. Default: ``CodePrinter``.
156
        actions: Actions to run (in case you want more than 1).
157
    """
158
    global _last_tracer
159
160
    clear_env_var = options.pop("clear_env_var", False)
161
    threading_support = (
162
        options.pop("threading_support", False) or
163
        options.pop("threads_support", False) or
164
        options.pop("thread_support", False) or
165
        options.pop("threadingsupport", False) or
166
        options.pop("threadssupport", False) or
167
        options.pop("threadsupport", False) or
168
        options.pop("threading", False) or
169
        options.pop("threads", False) or
170
        options.pop("thread", False)
171
    )
172
    predicate = _prepare_predicate(*predicates, **options)
173
174
    if clear_env_var:
175
        os.environ.pop("PYTHONHUNTER", None)
176
177
    _last_tracer = Tracer(threading_support)
178
179
    @atexit.register
180
    def atexit_cleanup(ref=weakref.ref(_last_tracer)):
181
        maybe_tracer = ref()
182
        if maybe_tracer is not None:
183
            maybe_tracer.stop()
184
185
    return _last_tracer.trace(predicate)
186
187