Completed
Push — master ( 461620...b3feb5 )
by Ionel Cristian
57s
created

src.hunter._prepare_predicate()   A

Complexity

Conditions 3

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 5
rs 9.4286
cc 3
1
from __future__ import absolute_import
2
3
import atexit
4
import inspect
5
import os
6
from functools import partial
7
8
from .actions import Action
9
from .actions import CodePrinter
10
from .actions import Debugger
11
from .actions import VarsPrinter
12
13
try:
14
    if os.environ.get("PUREPYTHONHUNTER"):
15
        raise ImportError("Skipped")
16
17
    from ._predicates import And as _And
18
    from ._predicates import Not
19
    from ._predicates import Or as _Or
20
    from ._predicates import When
21
    from ._predicates import Query
22
    from ._tracer import Tracer
23
except ImportError:
24
    from .predicates import And as _And
25
    from .predicates import Not
26
    from .predicates import Or as _Or
27
    from .predicates import When
28
    from .predicates import Query
29
    from .tracer import Tracer
30
31
# Version string of this package.
__version__ = "0.6.0"
32
# Public names exported by a star-import of this module.
__all__ = (
    'And',
    'CodePrinter',
    'Debugger',
    'Not',
    'Or',
    'Q',
    'Query',
    'stop',
    'trace',
    'VarsPrinter',
    'When',
)
45
46
47
def Q(*predicates, **query):
    """
    Handles situations where :class:`hunter.Query` objects (or other callables) are passed in as positional arguments.
    Conveniently converts that to an :class:`hunter.And` predicate.

    :param predicates: Predicates to combine. ``Action`` subclasses passed as classes are instantiated (with no
        arguments) before being combined.
    :param query: Keyword filters used to build a ``Query``. Two keys are consumed here and not forwarded:
        ``actions`` (an iterable of actions) and ``action`` (a single action appended to ``actions``).
    :returns: ``And(*predicates)`` (with a ``Query`` built from the remaining keywords appended, if there are any)
        when positional predicates are given, otherwise ``Query(**query)``. If any actions remain, the result is
        wrapped as ``When(result, *actions)``.
    """
    # Work on a copy: the sequence passed as ``actions`` belongs to the caller and must not be mutated by the
    # append/remove below. Copying also lets callers pass a tuple or any other iterable.
    optional_actions = list(query.pop("actions", []))
    if "action" in query:
        optional_actions.append(query.pop("action"))

    if predicates:
        predicates = tuple(
            p() if inspect.isclass(p) and issubclass(p, Action) else p
            for p in predicates
        )
        # A CodePrinter instance given positionally makes the CodePrinter class in the actions redundant.
        if any(isinstance(p, CodePrinter) for p in predicates):
            if CodePrinter in optional_actions:
                optional_actions.remove(CodePrinter)
        if query:
            predicates += Query(**query),

        result = And(*predicates)
    else:
        result = Query(**query)

    if optional_actions:
        result = When(result, *optional_actions)

    return result
75
76
77
def _flatten(predicate, *predicates, **kwargs):
78
    cls = kwargs.pop('cls')
79
    if kwargs:
80
        raise TypeError("Did not expecte keyword arguments")
81
82
    if not predicates:
83
        return predicate
84
    else:
85
        all_predicates = []
86
        if isinstance(predicate, cls):
87
            all_predicates.extend(predicate.predicates)
88
        else:
89
            all_predicates.append(predicate)
90
91
        for p in predicates:
92
            if isinstance(p, cls):
93
                all_predicates.extend(p.predicates)
94
            else:
95
                all_predicates.append(p)
96
        return cls(*all_predicates)
97
98
99
# Public ``And``: calls ``_flatten`` with ``cls=_And``, so nested ``_And`` instances are merged into one.
And = partial(_flatten, cls=_And)
100
# Public ``Or``: calls ``_flatten`` with ``cls=_Or``, so nested ``_Or`` instances are merged into one.
Or = partial(_flatten, cls=_Or)
101
102
# The tracer most recently created by ``trace()``; reset to ``None`` by ``stop()``.
_current_tracer = None
103
104
105
def stop():
    """
    Stop the current tracer, if there is one, and forget it.

    Does nothing when no tracer is set.
    """
    global _current_tracer

    if _current_tracer is None:
        return

    _current_tracer.stop()
    _current_tracer = None
111
112
113
def _prepare_predicate(*predicates, **options):
    """
    Build a predicate through ``Q``, defaulting the action to ``CodePrinter``.

    The default is applied only when the caller supplied neither ``action`` nor ``actions``.

    :param predicates: Positional predicates, forwarded to ``Q``.
    :param options: Keyword options, forwarded to ``Q``.
    :returns: Whatever ``Q`` returns.
    """
    caller_chose_action = "action" in options or "actions" in options
    if not caller_chose_action:
        options["action"] = CodePrinter

    return Q(*predicates, **options)
118
119
120
def trace(*predicates, **options):
    """
    Start tracing with a predicate built from the given arguments.

    :param predicates: Positional predicates, forwarded to ``_prepare_predicate``.
    :param options: Keyword options, forwarded to ``_prepare_predicate``. The ``clear_env_var`` option is consumed
        here and not forwarded: when truthy, ``PYTHONHUNTER`` is removed from ``os.environ``.
    :returns: Whatever ``Tracer.trace(predicate)`` returns.
    """
    global _current_tracer

    # Take out our own option *before* building the predicate; otherwise it would be forwarded as a query filter.
    clear_env_var = options.pop("clear_env_var", False)
    predicate = _prepare_predicate(*predicates, **options)

    if clear_env_var:
        os.environ.pop("PYTHONHUNTER", None)

    # Build the tracer outside the ``try``: if the constructor fails, the ``finally`` clause must not run and
    # hide the real error by dereferencing a tracer that was never created.
    _current_tracer = tracer = Tracer()
    try:
        return tracer.trace(predicate)
    finally:
        # Register the tracer's ``stop`` to run at interpreter exit.
        atexit.register(tracer.stop)
133