Completed
Push — master ( 449e73...c6f33b )
by Ionel Cristian
01:09
created

Query.__init__()   F

Complexity

Conditions 16

Size

Total Lines 60

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 16
dl 0
loc 60
rs 3.0793

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex classes like Query.__init__() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import absolute_import
2
3
import inspect
4
import re
5
from itertools import chain
6
7
from fields import Fields
8
from six import string_types
9
10
from .actions import Action
11
from .event import Event
12
13
# Field names usable as Query keyword arguments: every name in Event.__dict__
# that does not start with an underscore, except 'tracer' and 'thread'.
ALLOWED_KEYS = tuple(i for i in Event.__dict__.keys() if not i.startswith('_') and i not in ('tracer', 'thread'))
14
# Operator suffixes Query understands after a field name (e.g. ``module_startswith``);
# this tuple is what Query's error messages list as the valid operators.
ALLOWED_OPERATORS = 'startswith', 'endswith', 'in', 'contains', 'regex'
15
16
17
def _sloppy_hash(obj):
18
    try:
19
        return hash(obj)
20
    except TypeError:
21
        return 'id(%x)' % id(obj)
22
23
24
# NOTE(review): ``query_regex`` is listed in the base so that it takes part in whatever
# the ``fields`` base derives from the field names (presumably equality and hashing --
# confirm against the ``fields`` documentation). Previously it was stored on the instance
# but absent from the base declaration.
class Query(Fields.query_eq.query_startswith.query_endswith.query_in.query_contains.query_regex):
    """
    A query class.

    See :class:`hunter.Event` for fields that can be filtered on.
    """

    def __init__(self, **query):
        """
        Args:
            query: criteria to match on.

                Accepted arguments are the names in ``ALLOWED_KEYS`` (public :class:`hunter.Event` attributes,
                excluding ``tracer`` and ``thread``). Each name may carry one operator suffix: ``_startswith``,
                ``_endswith``, ``_in``, ``_contains`` or ``_regex`` (for example ``module_startswith='foo'``).
                Without a suffix the value is compared for equality.

        Raises:
            TypeError: for an unknown field name, an unknown operator, or a key with more than two
                underscore-separated parts.
            ValueError: when a ``_startswith``/``_endswith`` value is not a string, list, tuple or set.
        """
        query_eq = {}
        # One mapping per operator; keyed by the operator name used in the keyword suffix.
        by_operator = {
            'startswith': {},
            'endswith': {},
            'in': {},
            'contains': {},
            'regex': {},
        }

        for key, value in query.items():
            prefix, operator = self._split_key(key)
            if operator is None:
                mapping = query_eq
            elif operator in by_operator:
                value = self._prepare_value(operator, key, value)
                mapping = by_operator[operator]
            else:
                raise TypeError('Unexpected operator %r. Must be one of %s.' % (operator, ALLOWED_OPERATORS))

            # The field name is validated last, after the operator and value checks.
            if prefix not in ALLOWED_KEYS:
                raise TypeError('Unexpected argument %r. Must be one of %s.' % (key, ALLOWED_KEYS))

            mapping[prefix] = value

        # Stored as sorted tuples of (field, value) pairs, so the attributes are immutable
        # and independent of keyword order.
        self.query_eq = tuple(sorted(query_eq.items()))
        self.query_startswith = tuple(sorted(by_operator['startswith'].items()))
        self.query_endswith = tuple(sorted(by_operator['endswith'].items()))
        self.query_in = tuple(sorted(by_operator['in'].items()))
        self.query_contains = tuple(sorted(by_operator['contains'].items()))
        self.query_regex = tuple(sorted(by_operator['regex'].items()))

    @staticmethod
    def _split_key(key):
        """
        Split a keyword like ``module_startswith`` into ``(field, operator)``.

        Returns ``(key, None)`` when the key has fewer than two non-empty underscore-separated parts.
        Raises ``TypeError`` when it has more than two.
        """
        parts = [p for p in key.split('_') if p]
        if len(parts) > 2:
            raise TypeError('Unexpected argument %r. Must be one of %s with optional operators like: %s' % (
                key, ALLOWED_KEYS, ALLOWED_OPERATORS
            ))
        if len(parts) == 2:
            return parts[0], parts[1]
        # The unsplit key is kept as the field name, so stray underscores fail field validation later.
        return key, None

    @staticmethod
    def _prepare_value(operator, key, value):
        """
        Validate/convert ``value`` for ``operator``; ``key`` is only used in error messages.
        """
        if operator in ('startswith', 'endswith'):
            if not isinstance(value, string_types):
                if not isinstance(value, (list, set, tuple)):
                    raise ValueError('Value %r for %r is invalid. Must be a string, list, tuple or set.' % (value, key))
                # str.startswith/endswith accept a tuple of alternatives, not a list or set.
                value = tuple(value)
        elif operator == 'regex':
            value = re.compile(value)
        return value

    def __str__(self):
        return 'Query(%s)' % (
            ', '.join(
                ', '.join('%s%s=%r' % (key, kind, value) for key, value in mapping)
                for kind, mapping in [
                    ('', self.query_eq),
                    ('_in', self.query_in),
                    ('_contains', self.query_contains),
                    ('_startswith', self.query_startswith),
                    ('_endswith', self.query_endswith),
                    ('_regex', self.query_regex),
                ] if mapping
            )
        )

    def __repr__(self):
        return '<hunter.predicates.Query: %s>' % ' '.join(
            fmt % (mapping,) for fmt, mapping in [
                ('query_eq=%r', self.query_eq),
                ('query_in=%r', self.query_in),
                ('query_contains=%r', self.query_contains),
                ('query_startswith=%r', self.query_startswith),
                ('query_endswith=%r', self.query_endswith),
                ('query_regex=%r', self.query_regex),
            ] if mapping
        )

    def __call__(self, event):
        """
        Handles event. Returns True if all criteria matched.

        Field values are read with ``event[key]``; the first failing criterion returns ``False``.
        """
        for key, value in self.query_eq:
            evalue = event[key]
            if evalue != value:
                return False
        for key, value in self.query_in:
            evalue = event[key]
            if evalue not in value:
                return False
        for key, value in self.query_contains:
            evalue = event[key]
            if value not in evalue:
                return False
        for key, value in self.query_startswith:
            evalue = event[key]
            if not evalue.startswith(value):
                return False
        for key, value in self.query_endswith:
            evalue = event[key]
            if not evalue.endswith(value):
                return False
        for key, value in self.query_regex:
            evalue = event[key]
            # ``match`` anchors at the start of the value only.
            if not value.match(evalue):
                return False

        return True

    def __or__(self, other):
        """
        Convenience API so you can do ``Q() | Q()``. It converts that to ``Or(Q(), Q())``.
        """
        return Or(self, other)

    def __and__(self, other):
        """
        Convenience API so you can do ``Q() & Q()``. It converts that to ``And(Q(), Q())``.
        """
        return And(self, other)

    def __invert__(self):
        """
        Convenience API so you can do ``~Q()``. It converts that to ``Not(Q())``.
        """
        return Not(self)

    __ror__ = __or__
    __rand__ = __and__
166
167
168
class When(Fields.condition.actions):
    """
    Conditional runner: calls every one of ``actions`` when ``condition(event)`` is truthy.

    Each action is called with the ``event`` as its only argument.
    """

    def __init__(self, condition, *actions):
        """
        Store ``condition`` and ``actions``; :class:`Action` subclasses given as classes are instantiated.

        Raises ``TypeError`` when no action is given.
        """
        if not actions:
            raise TypeError('Must give at least one action.')
        prepared = []
        for action in actions:
            if inspect.isclass(action) and issubclass(action, Action):
                action = action()
            prepared.append(action)
        super(When, self).__init__(condition, tuple(prepared))

    def __str__(self):
        rendered_actions = ', '.join(repr(p) for p in self.actions)
        return 'When(%s, %s)' % (self.condition, rendered_actions)

    def __repr__(self):
        return '<hunter.predicates.When: condition=%r, actions=%r>' % (
            self.condition,
            self.actions,
        )

    def __call__(self, event):
        """
        Handles the event: runs the actions and returns ``True`` if the condition matched, else ``False``.
        """
        if not self.condition(event):
            return False
        for action in self.actions:
            action(event)
        return True

    def __or__(self, other):
        return Or(self, other)

    def __and__(self, other):
        return And(self, other)

    __ror__ = __or__
    __rand__ = __and__
210
211
212 View Code Duplication
class And(Fields.predicates):
    """
    `And` predicate. Stops at the first sub-predicate that returns a falsy value.
    """

    def __init__(self, *predicates):
        self.predicates = predicates

    def __str__(self):
        joined = ', '.join(str(p) for p in self.predicates)
        return 'And(%s)' % joined

    def __repr__(self):
        return '<hunter.predicates.And: predicates=%r>' % (self.predicates,)

    def __call__(self, event):
        """
        Handles the event. Returns ``False`` at the first failing sub-predicate, otherwise ``True``.
        """
        for predicate in self.predicates:
            if predicate(event):
                continue
            return False
        return True

    def __eq__(self, other):
        # Equal when both hold the same number of predicates and the same set of them
        # (order is ignored).
        if not isinstance(other, And):
            return NotImplemented
        mine, theirs = self.predicates, other.predicates
        if len(mine) != len(theirs):
            return False
        return set(mine) == set(theirs)

    def __or__(self, other):
        return Or(self, other)

    def __and__(self, other):
        # Flatten: and-ing with another ``And`` merges its predicates instead of nesting.
        if isinstance(other, And):
            extra = other.predicates
        else:
            extra = (other,)
        return And(*chain(self.predicates, extra))

    def __invert__(self):
        return Not(self)

    def __hash__(self):
        return hash(frozenset(self.predicates))

    __ror__ = __or__
    __rand__ = __and__
257
258
259 View Code Duplication
class Or(Fields.predicates):
    """
    `Or` predicate. Stops at the first sub-predicate that returns a truthy value.
    """

    def __init__(self, *predicates):
        self.predicates = predicates

    def __str__(self):
        joined = ', '.join(str(p) for p in self.predicates)
        return 'Or(%s)' % joined

    def __repr__(self):
        return '<hunter.predicates.Or: predicates=%r>' % (self.predicates,)

    def __call__(self, event):
        """
        Handles the event. Returns ``True`` at the first matching sub-predicate, otherwise ``False``.
        """
        for predicate in self.predicates:
            if not predicate(event):
                continue
            return True
        return False

    def __eq__(self, other):
        # Equal when both hold the same number of predicates and the same set of them
        # (order is ignored).
        if not isinstance(other, Or):
            return NotImplemented
        mine, theirs = self.predicates, other.predicates
        if len(mine) != len(theirs):
            return False
        return set(mine) == set(theirs)

    def __or__(self, other):
        # Flatten: or-ing with another ``Or`` merges its predicates instead of nesting.
        if isinstance(other, Or):
            extra = other.predicates
        else:
            extra = (other,)
        return Or(*chain(self.predicates, extra))

    def __and__(self, other):
        return And(self, other)

    def __invert__(self):
        return Not(self)

    def __hash__(self):
        return hash(frozenset(self.predicates))

    __ror__ = __or__
    __rand__ = __and__
304
305
306
class Not(Fields.predicate):
    """
    `Not` predicate. Negates the result of the wrapped ``predicate``.
    """

    def __str__(self):
        # The operand is wrapped in a 1-tuple so that a tuple-valued predicate is formatted
        # as a single value instead of being unpacked as %-format arguments.
        return 'Not(%s)' % (self.predicate,)

    def __repr__(self):
        # Same 1-tuple wrapping as in ``__str__``.
        return '<hunter.predicates.Not: predicate=%r>' % (self.predicate,)

    def __call__(self, event):
        """
        Handles the event. Returns ``True`` when the wrapped predicate returns a falsy value.
        """
        return not self.predicate(event)

    def __or__(self, other):
        """
        ``Not(a) | Not(b)`` is rewritten as ``Not(And(a, b))``; anything else becomes ``Or(self, other)``.
        """
        if isinstance(other, Not):
            return Not(And(self.predicate, other.predicate))
        else:
            return Or(self, other)

    def __and__(self, other):
        """
        ``Not(a) & Not(b)`` is rewritten as ``Not(Or(a, b))``; anything else becomes ``And(self, other)``.
        """
        if isinstance(other, Not):
            return Not(Or(self.predicate, other.predicate))
        else:
            return And(self, other)

    def __invert__(self):
        # Double negation: return the wrapped predicate itself rather than ``Not(Not(...))``.
        return self.predicate

    __ror__ = __or__
    __rand__ = __and__
340