Completed
Push — master ( 67ac21...36843e )
by Ionel Cristian
59s
created

src.hunter.Query   B

Complexity

Total Complexity 40

Size/Duplication

Total Lines 132
Duplicated Lines 0 %
Metric Value
wmc 40
dl 0
loc 132
rs 8.2609

7 Methods

Rating   Name   Duplication   Size   Complexity  
F __init__() 0 53 17
A __repr__() 0 10 3
F __call__() 0 30 13
A __str__() 0 12 4
A __and__() 0 5 1
A __invert__() 0 2 1
A __or__() 0 5 1

How to fix   Complexity   

Complex Class

Complex classes like src.hunter.Query often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import absolute_import
2
3
import inspect
4
import re
5
6
from fields import Fields
7
from itertools import chain
8
from six import string_types
9
10
from .actions import Action
11
from .event import Event
12
13
# Names found in the Event class namespace that do not start with an
# underscore; these are the only field names a Query accepts.
ALLOWED_KEYS = tuple(
    name for name in Event.__dict__ if not name.startswith('_')
)
# Suffixes that may follow a field name (``<field>_<operator>``) in Query
# keyword arguments.
ALLOWED_OPERATORS = ('startswith', 'endswith', 'in', 'contains', 'regex')
15
16
17
class Query(Fields.query_eq.query_startswith.query_endswith.query_in.query_contains):
    """
    A query class.

    See :class:`hunter.Event` for fields that can be filtered on.
    """
    # NOTE(review): ``query_regex`` is stored on every instance but is not
    # listed in the ``Fields`` base above - presumably anything ``Fields``
    # derives from the field list ignores regex criteria; confirm.

    def __init__(self, **query):
        """
        Args:
            query: criteria to match on.

                Accepted arguments: ``arg``, ``code``, ``filename``, ``frame``, ``fullsource``, ``function``,
                ``globals``, ``kind``, ``lineno``, ``locals``, ``module``, ``source``, ``stdlib``, ``tracer``.

                A key is either a bare field name (equality test) or
                ``<field>_<operator>`` where the operator is one of
                ``ALLOWED_OPERATORS``.

        Raises:
            TypeError: if a key has more than two ``_``-separated parts, uses
                an operator outside ``ALLOWED_OPERATORS`` or names a field
                outside ``ALLOWED_KEYS``.
            ValueError: if a ``startswith``/``endswith`` value is neither a
                string nor a list, tuple or set.
        """
        self.query_eq = {}
        self.query_startswith = {}
        self.query_endswith = {}
        self.query_in = {}
        self.query_contains = {}
        self.query_regex = {}

        # Operator suffix -> dict that stores criteria for that operator.
        # Must have an entry for every name in ALLOWED_OPERATORS.
        by_operator = {
            'startswith': self.query_startswith,
            'endswith': self.query_endswith,
            'in': self.query_in,
            'contains': self.query_contains,
            'regex': self.query_regex,
        }

        for key, value in query.items():
            # Empty parts are dropped, so stray/duplicated underscores are ignored.
            parts = [p for p in key.split('_') if p]
            count = len(parts)
            if count > 2:
                raise TypeError('Unexpected argument %r. Must be one of %s with optional operators like: %s' % (
                    key, ALLOWED_KEYS, ALLOWED_OPERATORS
                ))
            elif count == 2:
                prefix, operator = parts
                if operator not in ALLOWED_OPERATORS:
                    # %-interpolation: the template uses %r/%s placeholders,
                    # which str.format() would leave untouched.
                    raise TypeError('Unexpected operator %r. Must be one of %s.' % (operator, ALLOWED_OPERATORS))
                if operator in ('startswith', 'endswith'):
                    if not isinstance(value, string_types):
                        if not isinstance(value, (list, set, tuple)):
                            raise ValueError('Value %r for %r is invalid. Must be a string, list, tuple or set.' % (value, key))
                        # str.startswith/str.endswith only accept a string or a tuple.
                        value = tuple(value)
                elif operator == 'regex':
                    # Compile once here; __call__ reuses the compiled pattern.
                    value = re.compile(value)
                mapping = by_operator[operator]
            else:
                mapping = self.query_eq
                prefix = key

            if prefix not in ALLOWED_KEYS:
                raise TypeError('Unexpected argument %r. Must be one of %s.' % (key, ALLOWED_KEYS))

            mapping[prefix] = value

    def __str__(self):
        """
        Return ``Query(...)`` listing every stored criterion as
        ``<field><suffix>=<repr of value>``; empty criteria dicts are skipped.
        """
        return 'Query(%s)' % (
            ', '.join(
                ', '.join('%s%s=%r' % (key, kind, value) for key, value in mapping.items())
                for kind, mapping in [
                    ('', self.query_eq),
                    ('_in', self.query_in),
                    ('_contains', self.query_contains),
                    ('_startswith', self.query_startswith),
                    ('_endswith', self.query_endswith),
                    ('_regex', self.query_regex),
                ] if mapping
            )
        )

    def __repr__(self):
        """
        Return a debug representation showing each non-empty criteria dict.
        """
        return '<hunter._predicates.Query: %s>' % ' '.join(
            fmt % mapping for fmt, mapping in [
                ('query_eq=%r', self.query_eq),
                ('query_in=%r', self.query_in),
                ('query_contains=%r', self.query_contains),
                ('query_startswith=%r', self.query_startswith),
                ('query_endswith=%r', self.query_endswith),
                ('query_regex=%r', self.query_regex),
            ] if mapping
        )

    def __call__(self, event):
        """
        Handles event. Returns True if all criteria matched.

        Each criterion reads ``event[key]``; the first one that fails makes
        the call return False without checking the rest.
        """
        for key, value in self.query_eq.items():
            evalue = event[key]
            if evalue != value:
                return False
        for key, value in self.query_in.items():
            # ``<field>_in``: the event value must be a member of the given value.
            evalue = event[key]
            if evalue not in value:
                return False
        for key, value in self.query_contains.items():
            # ``<field>_contains``: the given value must be inside the event value.
            evalue = event[key]
            if value not in evalue:
                return False
        for key, value in self.query_startswith.items():
            evalue = event[key]
            if not evalue.startswith(value):
                return False
        for key, value in self.query_endswith.items():
            evalue = event[key]
            if not evalue.endswith(value):
                return False
        for key, value in self.query_regex.items():
            # ``value`` is the pattern compiled in __init__; match() anchors at the start.
            evalue = event[key]
            if not value.match(evalue):
                return False

        return True

    def __or__(self, other):
        """
        Convenience API so you can do ``Q() | Q()``. It converts that to ``Or(Q(), Q())``.
        """
        return Or(self, other)

    def __and__(self, other):
        """
        Convenience API so you can do ``Q() & Q()``. It converts that to ``And(Q(), Q())``.
        """
        return And(self, other)

    def __invert__(self):
        """
        Convenience API so you can do ``~Q()``. It converts that to ``Not(Q())``.
        """
        return Not(self)
149
150
151
class When(Fields.condition.actions):
    """
    Runs ``actions`` when ``condition(event)`` is ``True``.

    Actions take a single ``event`` argument.
    """

    def __init__(self, condition, *actions):
        """
        Args:
            condition: callable invoked with the event; its truthiness
                decides whether the actions run.
            actions: one or more actions. An ``Action`` subclass given as a
                class is instantiated with no arguments; anything else is
                stored as given.

        Raises:
            TypeError: if no action is given.
        """
        if not actions:
            raise TypeError('Must give at least one action.')
        super(When, self).__init__(condition, [
            action() if inspect.isclass(action) and issubclass(action, Action) else action
            for action in actions
        ])

    def __str__(self):
        """
        Return ``When(<condition>, <repr of each action>)``.
        """
        return 'When(%s, %s)' % (
            self.condition,
            ', '.join(repr(p) for p in self.actions)
        )

    def __repr__(self):
        """
        Return a debug representation with the condition and the action list.
        """
        return '<hunter._predicates.When: condition=%r, actions=%r>' % (self.condition, self.actions)

    def __call__(self, event):
        """
        Handles the event.

        Runs every action, in order, when the condition matches. Returns
        True if the condition matched, False otherwise.
        """
        if self.condition(event):
            for action in self.actions:
                action(event)
            return True
        else:
            return False

    def __or__(self, other):
        """
        Convenience API so you can do ``When() | other``. It converts that to ``Or(When(), other)``.
        """
        return Or(self, other)

    def __and__(self, other):
        """
        Convenience API so you can do ``When() & other``. It converts that to ``And(When(), other)``.
        """
        return And(self, other)

    def __invert__(self):
        """
        Convenience API so you can do ``~When()``. It converts that to ``Not(When())``.
        """
        return Not(self)
191
192
193
class And(Fields.predicates):
    """
    `And` predicate. Exits at the first sub-predicate that returns ``False``.
    """

    def __init__(self, *predicates):
        # Kept as the tuple of positional arguments, in call order.
        self.predicates = predicates

    def __str__(self):
        rendered = [str(item) for item in self.predicates]
        return 'And(%s)' % ', '.join(rendered)

    def __repr__(self):
        return '<hunter._predicates.And: predicates=%r>' % (self.predicates,)

    def __call__(self, event):
        """
        Handles the event.

        True when every sub-predicate accepts the event (also when there are
        none); evaluation stops at the first falsy result.
        """
        return all(check(event) for check in self.predicates)

    def __or__(self, other):
        return Or(self, other)

    def __and__(self, other):
        # Flatten ``And & And`` into a single And instead of nesting them.
        if isinstance(other, And):
            extra = other.predicates
        else:
            extra = (other,)
        return And(*chain(self.predicates, extra))

    def __invert__(self):
        return Not(self)
225
226
227
class Or(Fields.predicates):
    """
    `Or` predicate. Exits at first sub-predicate that returns ``True``.
    """

    def __init__(self, *predicates):
        # Kept as the tuple of positional arguments, in call order.
        self.predicates = predicates

    def __str__(self):
        rendered = [str(item) for item in self.predicates]
        return 'Or(%s)' % ', '.join(rendered)

    def __repr__(self):
        return '<hunter._predicates.Or: predicates=%r>' % (self.predicates,)

    def __call__(self, event):
        """
        Handles the event.

        True as soon as one sub-predicate accepts the event; False when none
        does (including when there are no sub-predicates).
        """
        return any(check(event) for check in self.predicates)

    def __or__(self, other):
        # Flatten ``Or | Or`` into a single Or instead of nesting them.
        if isinstance(other, Or):
            extra = other.predicates
        else:
            extra = (other,)
        return Or(*chain(self.predicates, extra))

    def __and__(self, other):
        return And(self, other)

    def __invert__(self):
        return Not(self)
259
260
261
class Not(Fields.predicate):
    """
    `Not` predicate.
    """

    def __str__(self):
        return 'Not(%s)' % self.predicate

    def __repr__(self):
        return '<hunter._predicates.Not: predicate=%r>' % self.predicate

    def __call__(self, event):
        """
        Handles the event.

        Returns the boolean negation of the wrapped predicate's result.
        """
        outcome = self.predicate(event)
        return not outcome

    def __or__(self, other):
        if not isinstance(other, Not):
            return Or(self, other)
        # De Morgan: (not a) or (not b) == not (a and b).
        return Not(And(self.predicate, other.predicate))

    def __and__(self, other):
        if not isinstance(other, Not):
            return And(self, other)
        # De Morgan: (not a) and (not b) == not (a or b).
        return Not(Or(self.predicate, other.predicate))

    def __invert__(self):
        # Double negation: hand back the wrapped predicate itself.
        return self.predicate
292