Completed
Pull Request — master (#19)
by Ionel Cristian
01:00
created

src.hunter._flatten()   B

Complexity

Conditions 6

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 6
dl 0
loc 20
rs 8
1
from __future__ import absolute_import
2
3
import atexit
4
import inspect
5
import os
6
from functools import partial
7
8
from .actions import Action
9
from .actions import CodePrinter
10
from .actions import Debugger
11
from .actions import VarsPrinter
12
13
try:
14
    if os.environ.get("PUREPYTHONHUNTER"):
15
        raise ImportError("Skipped")
16
17
    from ._predicates import And as _And
18
    from ._predicates import Or as _Or
19
    from ._predicates import When
20
    from ._predicates import Query
21
    from ._tracer import Tracer
22
except ImportError:
23
    from .predicates import And as _And
24
    from .predicates import Or as _Or
25
    from .predicates import When
26
    from .predicates import Query
27
    from .tracer import Tracer
28
29
__version__ = "0.6.0"
30
__all__ = 'Q', 'When', 'And', 'Or', 'CodePrinter', 'Debugger', 'VarsPrinter', 'trace', 'stop'
31
32
33
def Q(*predicates, **query):
34
    """
35
    Handles situations where :class:`hunter.Query` objects (or other callables) are passed in as positional arguments.
36
    Conveniently converts that to an :class:`hunter.Or` predicate.
37
    """
38
    optional_actions = query.pop("actions", [])
39
    if "action" in query:
40
        optional_actions.append(query.pop("action"))
41
42
    if predicates:
43
        predicates = tuple(
44
            p() if inspect.isclass(p) and issubclass(p, Action) else p
45
            for p in predicates
46
        )
47
        if any(isinstance(p, CodePrinter) for p in predicates):
48
            if CodePrinter in optional_actions:
49
                optional_actions.remove(CodePrinter)
50
        if query:
51
            predicates += Query(**query),
52
53
        result = Or(*predicates)
54
    else:
55
        result = Query(**query)
56
57
    if optional_actions:
58
        result = When(result, *optional_actions)
59
60
    return result
61
62
63
def _flatten(predicate, *predicates, **kwargs):
64
    cls = kwargs.pop('cls')
65
    if kwargs:
66
        raise TypeError("Did not expecte keyword arguments")
67
68
    if not predicates:
69
        return predicate
70
    else:
71
        all_predicates = []
72
        if isinstance(predicate, cls):
73
            all_predicates.extend(predicate.predicates)
74
        else:
75
            all_predicates.append(predicate)
76
77
        for p in predicates:
78
            if isinstance(p, cls):
79
                all_predicates.extend(p.predicates)
80
            else:
81
                all_predicates.append(p)
82
        return cls(*all_predicates)
83
84
85
And = partial(_flatten, cls=_And)
86
Or = partial(_flatten, cls=_Or)
87
88
_tracer = Tracer()
89
stop = atexit.register(_tracer.stop)
90
91
92
def trace(*predicates, **options):
93
    if "action" not in options and "actions" not in options:
94
        options["action"] = CodePrinter
95
96
    clear_env_var = options.pop("clear_env_var", False)
97
    predicate = Q(*predicates, **options)
98
99
    if clear_env_var:
100
        os.environ.pop("PYTHONHUNTER", None)
101
102
    return _tracer.trace(predicate)
103