Completed
Push — master ( fb4fb6...da9680 )
by Ionel Cristian
01:05
created

src.hunter.Or()   A

Complexity

Conditions 3

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 7
rs 9.4286
cc 3
1
from __future__ import absolute_import
2
3
import atexit
4
import inspect
5
import os
6
from functools import partial
7
8
from .actions import Action
9
from .actions import CodePrinter
10
from .actions import Debugger
11
from .actions import VarsPrinter
12
13
try:
14
    if os.environ.get("PUREPYTHONHUNTER"):
15
        raise ImportError("Skipped")
16
17
    from ._predicates import And as _And
18
    from ._predicates import Not
19
    from ._predicates import Or as _Or
20
    from ._predicates import When
21
    from ._predicates import Query
22
    from ._tracer import Tracer
23
except ImportError:
24
    from .predicates import And as _And
25
    from .predicates import Not
26
    from .predicates import Or as _Or
27
    from .predicates import When
28
    from .predicates import Query
29
    from .tracer import Tracer
30
31
__version__ = "1.0.2"
32
__all__ = (
33
    'And',
34
    'CodePrinter',
35
    'Debugger',
36
    'Not',
37
    'Or',
38
    'Q',
39
    'Query',
40
    'stop',
41
    'trace',
42
    'VarsPrinter',
43
    'When',
44
)
45
_current_tracer = None
46
47
48
def Q(*predicates, **query):
49
    """
50
    Handles situations where :class:`hunter.Query` objects (or other callables) are passed in as positional arguments.
51
    Conveniently converts that to an :class:`hunter.And` predicate.
52
    """
53
    optional_actions = query.pop("actions", [])
54
    if "action" in query:
55
        optional_actions.append(query.pop("action"))
56
57
    if predicates:
58
        predicates = tuple(
59
            p() if inspect.isclass(p) and issubclass(p, Action) else p
60
            for p in predicates
61
        )
62
        if any(isinstance(p, CodePrinter) for p in predicates):
63
            if CodePrinter in optional_actions:
64
                optional_actions.remove(CodePrinter)
65
        if query:
66
            predicates += Query(**query),
67
68
        result = And(*predicates)
69
    else:
70
        result = Query(**query)
71
72
    if optional_actions:
73
        result = When(result, *optional_actions)
74
75
    return result
76
77
78
def _flatten(cls, predicate, *predicates):
79
    if not predicates:
80
        return predicate
81
    else:
82
        all_predicates = []
83
        if isinstance(predicate, cls):
84
            all_predicates.extend(predicate.predicates)
85
        else:
86
            all_predicates.append(predicate)
87
88
        for p in predicates:
89
            if isinstance(p, cls):
90
                all_predicates.extend(p.predicates)
91
            else:
92
                all_predicates.append(p)
93
        return cls(*all_predicates)
94
95
96
def And(*predicates, **kwargs):
97
    """
98
    `And` predicate. Exits at the first sub-predicate that returns ``False``.
99
    """
100
    if kwargs:
101
        predicates += Query(**kwargs),
102
    return _flatten(_And, *predicates)
103
104
105
def Or(*predicates, **kwargs):
106
    """
107
    `And` predicate. Exits at the first sub-predicate that returns ``False``.
108
    """
109
    if kwargs:
110
        predicates += tuple(Query(**{k: v}) for k, v in kwargs.items())
111
    return _flatten(_Or, *predicates)
112
113
114
def stop():
115
    """
116
    Stop tracing.
117
118
    Notes:
119
        Restores previous tracer (if there was any).
120
    """
121
    global _current_tracer
122
123
    if _current_tracer is not None:
124
        _current_tracer.stop()
125
        _current_tracer = None
126
127
128
def _prepare_predicate(*predicates, **options):
129
    if "action" not in options and "actions" not in options:
130
        options["action"] = CodePrinter
131
132
    return Q(*predicates, **options)
133
134
135
def trace(*predicates, **options):
136
    """
137
    Starts tracing. Can be used as a context manager (with slightly incorrect semantics - it starts tracing
138
    before ``__enter__`` is called).
139
140
    Parameters:
141
        *predicates (callables): Runs actions if any of the given predicates match.
142
    Keyword Args:
143
        clear_env_var: Disables tracing in subprocess. Default: ``False``.
144
        action: Action to run if all the predicates return ``True``. Default: ``CodePrinter``.
145
        actions: Actions to run (in case you want more than 1).
146
    """
147
    global _current_tracer
148
149
    predicate = _prepare_predicate(*predicates, **options)
150
    clear_env_var = options.pop("clear_env_var", False)
151
152
    if clear_env_var:
153
        os.environ.pop("PYTHONHUNTER", None)
154
    try:
155
        _current_tracer = Tracer()
156
        return _current_tracer.trace(predicate)
157
    finally:
158
        atexit.register(_current_tracer.stop)
159