Completed
Pull Request — master (#19)
by Ionel Cristian
01:00
created

src.hunter._with_metaclass()   A

Complexity

Conditions 1

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 11
rs 9.4286

1 Method

Rating   Name   Duplication   Size   Complexity  
A src.hunter.metaclass.__new__() 0 2 1
1
from __future__ import absolute_import
2
3
import inspect
4
from itertools import chain
5
6
from fields import Fields
7
from six import string_types
8
9
from .actions import Action
10
from .event import Event
11
12
13
class Query(Fields.query):
14
    """
15
    A query class.
16
17
    See :class:`hunter.Event` for fields that can be filtered on.
18
    """
19
    query = ()
20
    allowed = tuple(i for i in Event.__dict__.keys() if not i.startswith('_'))
21
22
    def __init__(self, **query):
23
        """
24
        Args:
25
            query: criteria to match on.
26
27
                Accepted arguments: ``arg``, ``code``, ``filename``, ``frame``, ``fullsource``, ``function``,
28
                ``globals``, ``kind``, ``lineno``, ``locals``, ``module``, ``source``, ``stdlib``, ``tracer``.
29
        """
30
        for key in query:
31
            if key not in self.allowed:
32
                raise TypeError("Unexpected argument {!r}. Must be one of {}.".format(key, self.allowed))
33
        self.query = query
34
35
    def __repr__(self):
36
        return "Query({})".format(
37
            ', '.join("{}={!r}".format(*item) for item in self.query.items()),
38
        )
39
40
    def __call__(self, event):
41
        """
42
        Handles event. Returns True if all criteria matched.
43
        """
44
        for key, value in self.query.items():
45
            evalue = event[key]
46
            if isinstance(evalue, string_types) and isinstance(value, (list, tuple, set)):
47
                if not evalue.startswith(tuple(value)):
48
                    return False
49
            elif evalue != value:
50
                return False
51
52
        return True
53
54
    def __or__(self, other):
55
        """
56
        Convenience API so you can do ``Q() | Q()``. It converts that to ``Or(Q(), Q())``.
57
        """
58
        return Or(self, other)
59
60
    def __and__(self, other):
61
        """
62
        Convenience API so you can do ``Q() & Q()``. It converts that to ``And(Q(), Q())``.
63
        """
64
        return And(self, other)
65
66
67
class When(Fields.condition.actions):
68
    """
69
    Runs ``actions`` when ``condition(event)`` is ``True``.
70
71
    Actions take a single ``event`` argument.
72
    """
73
74
    def __init__(self, condition, *actions):
75
        if not actions:
76
            raise TypeError("Must give at least one action.")
77
        super(When, self).__init__(condition, [
78
            action() if inspect.isclass(action) and issubclass(action, Action) else action
79
            for action in actions
80
            ])
81
82
    def __call__(self, event):
83
        """
84
        Handles the event.
85
        """
86
        if self.condition(event):
87
            for action in self.actions:
88
                action(event)
89
90
            return True
91
92
    def __or__(self, other):
93
        return Or(self, other)
94
95
    def __and__(self, other):
96
        return And(self, other)
97
98
99
def _with_metaclass(meta, *bases):
100
    """Create a base class with a metaclass."""
101
102
    # This requires a bit of explanation: the basic idea is to make a dummy
103
    # metaclass for one level of class instantiation that replaces itself with
104
    # the actual metaclass.
105
    class metaclass(meta):
106
        def __new__(cls, name, this_bases, d):
107
            return meta(name, bases, d)
108
109
    return type.__new__(metaclass, 'temporary_class', (), {})
110
111
112
class And(Fields.predicates):
113
    """
114
    `And` predicate. Exits at the first sub-predicate that returns ``False``.
115
    """
116
117
    def __init__(self, *predicates):
118
        self.predicates = predicates
119
120
    def __str__(self):
121
        return "And({})".format(', '.join(str(p) for p in self.predicates))
122
123
    def __call__(self, event):
124
        """
125
        Handles the event.
126
        """
127
        for predicate in self.predicates:
128
            if not predicate(event):
129
                return False
130
        return True
131
132
    def __or__(self, other):
133
        return Or(self, other)
134
135
    def __and__(self, other):
136
        return And(*chain(self.predicates, other.predicates if isinstance(other, And) else (other,)))
137
138
139
class Or(Fields.predicates):
140
    """
141
    `Or` predicate. Exits at first sub-predicate that returns ``True``.
142
    """
143
144
    def __init__(self, *predicates):
145
        self.predicates = predicates
146
147
    def __str__(self):
148
        return "Or({})".format(', '.join(str(p) for p in self.predicates))
149
150
    def __call__(self, event):
151
        """
152
        Handles the event.
153
        """
154
        for predicate in self.predicates:
155
            if predicate(event):
156
                return True
157
        return False
158
159
    def __or__(self, other):
160
        return Or(*chain(self.predicates, other.predicates if isinstance(other, Or) else (other,)))
161
162
    def __and__(self, other):
163
        return And(self, other)
164