Completed
Push — master ( 461620...b3feb5 )
by Ionel Cristian
57s
created

tests.EvilTracer   A

Complexity

Total Complexity 4

Size/Duplication

Total Lines 14
Duplicated Lines 0 %
Metric Value
wmc 4
dl 0
loc 14
rs 10

3 Methods

Rating   Name   Duplication   Size   Complexity  
A __init__() 0 4 1
A __enter__() 0 2 1
A __exit__() 0 5 2
1
from __future__ import print_function
2
3
import inspect
4
import os
5
import platform
6
import subprocess
7
import sys
8
import tokenize
9
10
try:
11
    from cStringIO import StringIO
12
except ImportError:
13
    from io import StringIO
14
try:
15
    from itertools import izip_longest
16
except ImportError:
17
    from itertools import zip_longest as izip_longest
18
19
import pytest
20
21
import hunter
22
from hunter import And
23
from hunter import Not
24
from hunter import Or
25
from hunter import Q
26
from hunter import Query
27
from hunter import When
28
29
from hunter import CodePrinter
30
from hunter import Debugger
31
from hunter import VarsPrinter
32
33
# Plugins pytest should load for this module; must be a tuple, hence the trailing comma.
pytest_plugins = ('pytester',)
34
35
36
class EvilTracer(object):
    """Context manager that records events first and runs the predicate afterwards.

    On construction the predicate is built with ``hunter._prepare_predicate`` and a
    ``hunter.trace`` is created whose only handler appends every event to ``self.calls``.
    On exit the tracer is stopped and each recorded event is passed to the predicate.
    """

    def __init__(self, *args, **kwargs):
        recorded = []
        self.calls = recorded
        self._handler = hunter._prepare_predicate(*args, **kwargs)
        self._tracer = hunter.trace(recorded.append)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Stop recording before replaying, then evaluate the predicate on every event.
        self._tracer.stop()
        handler = self._handler
        for recorded_call in self.calls:
            handler(recorded_call)


# Tests below that call the bare ``trace(...)`` get the deferred-evaluation tracer.
trace = EvilTracer
53
54
55
def _get_func_spec(func):
56
    spec = inspect.getargspec(func)
57
    return inspect.formatargspec(spec.args, spec.varargs)
58
59
60
def test_pth_activation():
    """Run ``sample.py`` in a subprocess with ``PYTHONHUNTER`` set and check its output.

    The environment variable asks for the ``join`` function of the ``os.path`` module;
    the combined stdout/stderr must mention that module's file and the traced ``def join``.
    """
    traced_module = os.path.__name__
    sample_script = os.path.join(os.path.dirname(__file__), 'sample.py')
    child_env = dict(os.environ)
    child_env['PYTHONHUNTER'] = 'module={!r},function="join"'.format(traced_module)

    captured = subprocess.check_output(
        [sys.executable, sample_script],
        env=child_env,
        stderr=subprocess.STDOUT,
    )

    wanted_file = "{0}.py".format(traced_module).encode()
    wanted_call = "call      def join{0}:".format(_get_func_spec(os.path.join)).encode()
    assert wanted_file in captured
    assert wanted_call in captured
74
75
76
def test_pth_sample4():
    """Run ``sample4.py`` with ``PYTHONHUNTER="CodePrinter"`` and expect some output."""
    child_env = dict(os.environ, PYTHONHUNTER="CodePrinter")
    # Drop the coverage-related variables from the child's environment.
    for coverage_var in ('COVERAGE_PROCESS_START', 'COV_CORE_SOURCE'):
        child_env.pop(coverage_var, None)

    script = os.path.join(os.path.dirname(__file__), 'sample4.py')
    captured = subprocess.check_output(
        [sys.executable, script],
        env=child_env,
        stderr=subprocess.STDOUT,
    )
    assert captured
86
87
88
def test_pth_sample2(LineMatcher):
    """Run ``sample2.py`` with ``PYTHONHUNTER="module='__main__'"`` and match the trace.

    The subprocess output (stdout and stderr combined) is matched, in order, against the
    fnmatch patterns below.
    """
    child_env = dict(os.environ, PYTHONHUNTER="module='__main__'")
    # Drop the coverage-related variables from the child's environment.
    for coverage_var in ('COVERAGE_PROCESS_START', 'COV_CORE_SOURCE'):
        child_env.pop(coverage_var, None)

    script = os.path.join(os.path.dirname(__file__), 'sample2.py')
    captured = subprocess.check_output(
        [sys.executable, script],
        env=child_env,
        stderr=subprocess.STDOUT,
    )
    matcher = LineMatcher(captured.decode('utf-8').splitlines())
    matcher.fnmatch_lines([
        '*tests*sample2.py:* call      if __name__ == "__main__":  #*',
        '*tests*sample2.py:* line      if __name__ == "__main__":  #*',
        '*tests*sample2.py:* line          import functools',
        '*tests*sample2.py:* line          def deco(opt):',
        '*tests*sample2.py:* line          @deco(1)',
        '*tests*sample2.py:* call          def deco(opt):',
        '*tests*sample2.py:* line              def decorator(func):',
        '*tests*sample2.py:* line              return decorator',
        '*tests*sample2.py:* return            return decorator',
        '*                 * ...       return value: <function deco*',
        '*tests*sample2.py:* line          @deco(2)',
        '*tests*sample2.py:* call          def deco(opt):',
        '*tests*sample2.py:* line              def decorator(func):',
        '*tests*sample2.py:* line              return decorator',
        '*tests*sample2.py:* return            return decorator',
        '*                 * ...       return value: <function deco*',
        '*tests*sample2.py:* line          @deco(3)',
        '*tests*sample2.py:* call          def deco(opt):',
        '*tests*sample2.py:* line              def decorator(func):',
        '*tests*sample2.py:* line              return decorator',
        '*tests*sample2.py:* return            return decorator',
        '*                 * ...       return value: <function deco*',
        '*tests*sample2.py:* call              def decorator(func):',
        '*tests*sample2.py:* line                  @functools.wraps(func)',
        '*tests*sample2.py:* line                  return wrapper',
        '*tests*sample2.py:* return                return wrapper',
        '*                 * ...       return value: <function foo *',
        '*tests*sample2.py:* call              def decorator(func):',
        '*tests*sample2.py:* line                  @functools.wraps(func)',
        '*tests*sample2.py:* line                  return wrapper',
        '*tests*sample2.py:* return                return wrapper',
        '*                 * ...       return value: <function foo *',
        '*tests*sample2.py:* call              def decorator(func):',
        '*tests*sample2.py:* line                  @functools.wraps(func)',
        '*tests*sample2.py:* line                  return wrapper',
        '*tests*sample2.py:* return                return wrapper',
        '*                 * ...       return value: <function foo *',
        '*tests*sample2.py:* line          foo(',
        "*tests*sample2.py:* line              'a*',",
        "*tests*sample2.py:* line              'b'",
        '*tests*sample2.py:* call                  @functools.wraps(func)',
        '*                 *    |                  def wrapper(*args):',
        '*tests*sample2.py:* line                      return func(*args)',
        '*tests*sample2.py:* call                  @functools.wraps(func)',
        '*                 *    |                  def wrapper(*args):',
        '*tests*sample2.py:* line                      return func(*args)',
        '*tests*sample2.py:* call                  @functools.wraps(func)',
        '*                 *    |                  def wrapper(*args):',
        '*tests*sample2.py:* line                      return func(*args)',
        '*tests*sample2.py:* call          @deco(1)',
        '*                 *    |          @deco(2)',
        '*                 *    |          @deco(3)',
        '*                 *    |          def foo(*args):',
        '*tests*sample2.py:* line              return args',
        '*tests*sample2.py:* return            return args',
        "*                 * ...       return value: ('a*', 'b')",
        "*tests*sample2.py:* return                    return func(*args)",
        "*                 * ...       return value: ('a*', 'b')",
        "*tests*sample2.py:* return                    return func(*args)",
        "*                 * ...       return value: ('a*', 'b')",
        "*tests*sample2.py:* return                    return func(*args)",
        "*                 * ...       return value: ('a*', 'b')",
        "*tests*sample2.py:* line          try:",
        "*tests*sample2.py:* line              None(",
        "*tests*sample2.py:* line                  'a',",
        "*tests*sample2.py:* line                  'b'",
        "*tests*sample2.py:* exception             'b'",
        "*                 * ...       exception value: *",
        "*tests*sample2.py:* line          except:",
        "*tests*sample2.py:* line              pass",
        "*tests*sample2.py:* return            pass",
        "*                   ...       return value: None",
    ])
172
173
174
def test_predicate_str_repr():
    """Check ``repr()`` and ``str()`` of Query, When, Not, Or and And predicates."""
    # Plain query.
    query = Q(module='a')
    assert repr(query).endswith("predicates.Query: query_eq={'module': 'a'}>")
    assert str(query) == "Query(module='a')"

    # Query with an action becomes a When.
    with_action = Q(module='a', action='foo')
    with_action_repr = repr(with_action)
    assert "predicates.When: condition=<hunter." in with_action_repr
    assert "predicates.Query: query_eq={'module': 'a'}>, actions=['foo']>" in with_action_repr
    assert str(with_action) == "When(Query(module='a'), 'foo')"

    # Negation.
    negated = ~Q(module='a')
    negated_repr = repr(negated)
    assert "predicates.Not: predicate=<hunter." in negated_repr
    assert "predicates.Query: query_eq={'module': 'a'}>>" in negated_repr
    assert str(negated) == "Not(Query(module='a'))"

    # Disjunction.
    either = Q(module='a') | Q(module='b')
    either_repr = repr(either)
    assert "predicates.Or: predicates=(<hunter." in either_repr
    assert "predicates.Query: query_eq={'module': 'a'}>, " in either_repr
    assert either_repr.endswith("predicates.Query: query_eq={'module': 'b'}>)>")
    assert str(either) == "Or(Query(module='a'), Query(module='b'))"

    # Conjunction.
    both = Q(module='a') & Q(module='b')
    both_repr = repr(both)
    assert "predicates.And: predicates=(<hunter." in both_repr
    assert "predicates.Query: query_eq={'module': 'a'}>," in both_repr
    assert both_repr.endswith("predicates.Query: query_eq={'module': 'b'}>)>")
    assert str(both) == "And(Query(module='a'), Query(module='b'))"
195
196
197
def test_predicate_q_nest_1():
    """A ``Q`` wrapping a single ``Q`` must have the repr of a plain Query."""
    nested = Q(Q(module='a'))
    text = repr(nested)
    assert text.endswith("predicates.Query: query_eq={'module': 'a'}>")
199
200
201
def test_predicate_q_expansion():
    """``Q`` with positional predicates, keywords and actions expands to And/When."""
    plain = Q(1, 2, module=3)
    assert plain == And(1, 2, Q(module=3))

    single_action = Q(1, 2, module=3, action=4)
    assert single_action == When(And(1, 2, Q(module=3)), 4)

    many_actions = Q(1, 2, module=3, actions=[4, 5])
    assert many_actions == When(And(1, 2, Q(module=3)), 4, 5)
205
206
207
def test_predicate_and():
    """Check construction, flattening and evaluation of ``And`` predicates."""
    assert And(1, 2) == And(1, 2)

    pair = Q(module=1) & Q(module=2)
    assert pair == And(Q(module=1), Q(module=2))

    triple = Q(module=1) & Q(module=2) & Q(module=3)
    assert triple == And(Q(module=1), Q(module=2), Q(module=3))

    contradictory = Q(module=1) & Q(module=2)
    assert contradictory({'module': 3}) == False
    satisfiable = Q(module=1) & Q(function=2)
    assert satisfiable({'module': 1, 'function': 2}) == True

    mixed = And(1, 2) | 3
    assert mixed == Or(And(1, 2), 3)
216
217
218
def test_predicate_or():
    """Check construction, flattening and evaluation of ``Or`` predicates."""
    pair = Q(module=1) | Q(module=2)
    assert pair == Or(Q(module=1), Q(module=2))

    triple = Q(module=1) | Q(module=2) | Q(module=3)
    assert triple == Or(Q(module=1), Q(module=2), Q(module=3))

    one_or_two = Q(module=1) | Q(module=2)
    assert one_or_two({'module': 3}) == False
    assert (Q(module=1) | Q(module=2))({'module': 2}) == True

    mixed = Or(1, 2) & 3
    assert mixed == And(Or(1, 2), 3)
226
227
228
def test_tracing_bare(LineMatcher):
    """Trace a small function with ``CodePrinter`` and match the printed lines.

    The patterns match the source text of ``a`` as written here, so the code inside the
    ``with`` block (including its indentation) must not be reformatted.
    """
    sink = StringIO()
    with hunter.trace(CodePrinter(stream=sink)):
        def a():
            return 1

        b = a()
        b = 2
        try:
            raise Exception("BOOM!")
        except Exception:
            pass
    printed = sink.getvalue()
    print(printed)
    matcher = LineMatcher(sink.getvalue().splitlines())
    matcher.fnmatch_lines([
        "*test_hunter.py* call              def a():",
        "*test_hunter.py* line                  return 1",
        "*test_hunter.py* return                return 1",
        "* ...       return value: 1",
    ])
248
249
250
def test_tracing_printing_failures(LineMatcher):
    """Trace code whose ``__repr__`` raises and expect ``!!! FAILED REPR`` markers.

    ``Bad.__repr__`` raises ``RuntimeError``; the expected output shows that error in
    place of the value for variables, return values and exception values.

    The patterns match the source text of ``Bad``, ``a`` and ``b`` as written here, so the
    code inside the ``with`` block (including its indentation) must not be reformatted.
    """
    lines = StringIO()
    with hunter.trace(actions=[CodePrinter(stream=lines), VarsPrinter("x", stream=lines)]):
        class Bad(Exception):
            def __repr__(self):
                raise RuntimeError("I'm a bad class!")

        def a():
            x = Bad()
            return x

        def b():
            x = Bad()
            raise x

        a()
        try:
            b()
        except Exception:
            pass
    lm = LineMatcher(lines.getvalue().splitlines())
    # The repr of an exception is ``RuntimeError("...",)`` before Python 3.7 and
    # ``RuntimeError("...")`` from 3.7 on; the ``*)`` suffix accepts both spellings.
    lm.fnmatch_lines([
        """*tests*test_hunter.py:* call              class Bad(Exception):""",
        """*tests*test_hunter.py:* line              class Bad(Exception):""",
        """*tests*test_hunter.py:* line                  def __repr__(self):""",
        """*tests*test_hunter.py:* return                def __repr__(self):""",
        """* ...       return value: *""",
        """*tests*test_hunter.py:* call              def a():""",
        """*tests*test_hunter.py:* line                  x = Bad()""",
        """*tests*test_hunter.py:* line                  return x""",
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
        """*tests*test_hunter.py:* return                return x""",
        """* ...       return value: !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
        """*tests*test_hunter.py:* call              def b():""",
        """*tests*test_hunter.py:* line                  x = Bad()""",
        """*tests*test_hunter.py:* line                  raise x""",
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
        """*tests*test_hunter.py:* exception             raise x""",
        """* ...       exception value: !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
        """*tests*test_hunter.py:* return                raise x""",
        """* ...       return value: None""",
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
    ])
295
296
297
def test_tracing_vars(LineMatcher):
    """Trace a function with ``VarsPrinter('b')`` plus ``CodePrinter`` and match the output.

    The patterns match the source text of ``a`` and the values of its local ``b``, so the
    code inside the ``with`` block (including its indentation) must not be reformatted.
    """
    sink = StringIO()
    printers = [VarsPrinter('b', stream=sink), CodePrinter(stream=sink)]
    with hunter.trace(actions=printers):
        def a():
            b = 1
            b = 2
            return 1

        b = a()
        b = 2
        try:
            raise Exception("BOOM!")
        except Exception:
            pass
    printed = sink.getvalue()
    print(printed)
    matcher = LineMatcher(sink.getvalue().splitlines())
    matcher.fnmatch_lines([
        "*test_hunter.py* call              def a():",
        "*test_hunter.py* line                  b = 1",
        "* vars      b => 1",
        "*test_hunter.py* line                  b = 2",
        "* vars      b => 2",
        "*test_hunter.py* line                  return 1",
        "* vars      b => 2",
        "*test_hunter.py* return                return 1",
        "* ...       return value: 1",
    ])
324
325
326
def test_trace_merge():
    """Nested ``hunter.trace`` contexts: the active handler follows the innermost one.

    Inside each level the installed trace function's ``_handler`` must equal the predicate
    of that level, and after an inner context exits the enclosing one must be active again.
    """
    with hunter.trace(function="a"):
        with hunter.trace(function="b"):
            with hunter.trace(function="c"):
                innermost = sys.gettrace()._handler
                assert innermost == When(Q(function="c"), CodePrinter)
            middle = sys.gettrace()._handler
            assert middle == When(Q(function="b"), CodePrinter)
        outermost = sys.gettrace()._handler
        assert outermost == When(Q(function="a"), CodePrinter)
333
334
335
def test_trace_api_expansion():
    """Check how ``trace(...)`` arguments are expanded into the handler predicate."""
    # A single keyword filter; CodePrinter is the action when none is given.
    with trace(function="foobar") as tracer:
        expected = When(Q(function="foobar"), CodePrinter)
        assert tracer._handler == expected

    # Several keyword filters end up in one query.
    with trace(module="foo", function="foobar") as tracer:
        expected = When(Q(module="foo", function="foobar"), CodePrinter)
        assert tracer._handler == expected

    # An explicit action (the debugger) replaces the default one.
    with trace(function="foobar", action=Debugger) as tracer:
        expected = When(Q(function="foobar"), Debugger)
        assert tracer._handler == expected

    # The debugger as action with several keyword filters.
    with trace(module="foo", function="foobar", action=Debugger) as tracer:
        expected = When(Q(module="foo", function="foobar"), Debugger)
        assert tracer._handler == expected

    # A nested Q with its own action, combined with an outer keyword filter.
    with trace(Q(function="foobar", action=Debugger), module="foo") as tracer:
        expected = When(And(
            When(Q(function="foobar"), Debugger),
            Q(module="foo")
        ), CodePrinter)
        assert tracer._handler == expected

    # Dumping a variable from the stack.
    with trace(Q(function="foobar", action=VarsPrinter("foobar")), module="foo") as tracer:
        expected = When(And(
            When(Q(function="foobar"), VarsPrinter("foobar")),
            Q(module="foo"),
        ), CodePrinter)
        assert tracer._handler == expected

    # Dumping several variables from the stack.
    with trace(Q(function="foobar", action=VarsPrinter("foobar", "mumbojumbo")), module="foo") as tracer:
        expected = When(And(
            When(Q(function="foobar"), VarsPrinter("foobar", "mumbojumbo")),
            Q(module="foo"),
        ), CodePrinter)
        assert tracer._handler == expected

    # Multiple actions on the nested Q.
    with trace(Q(function="foobar", actions=[VarsPrinter("foobar"), Debugger]), module="foo") as tracer:
        expected = When(And(
            When(Q(function="foobar"), VarsPrinter("foobar"), Debugger),
            Q(module="foo"),
        ), CodePrinter)
        assert tracer._handler == expected
378
379
380
def test_locals():
    """Filter events on a local variable's value and check the last printed line.

    The predicate only accepts events where the local ``node`` equals ``"Foobar"``; the
    assertion matches the source text of ``foo``, so its body must stay as written.
    """
    captured = StringIO()
    with hunter.trace(
        lambda event: event.locals.get("node") == "Foobar",
        module="test_hunter",
        function="foo",
        action=CodePrinter(stream=captured)
    ):
        def foo():
            a = 1
            node = "Foobar"
            node += "x"
            a += 2
            return a

        foo()
    printed = captured.getvalue()
    assert printed.endswith('node += "x"\n')
397
398
399
def test_debugger(LineMatcher):
    """Check that the ``Debugger`` action instantiates and drives the given class.

    ``FakePDB`` records the keyword it was constructed with and the name of the frame's
    code passed to ``set_trace``. The traced function and the variable names given to
    ``VarsPrinter`` are matched by the assertions, so the body of ``foo`` must stay as written.
    """
    buffer = StringIO()
    pdb_calls = []

    class FakePDB:
        def __init__(self, foobar=1):
            pdb_calls.append(foobar)

        def set_trace(self, frame):
            pdb_calls.append(frame.f_code.co_name)

    actions = [
        VarsPrinter("a", "node", "foo", "test_debugger", globals=True, stream=buffer),
        Debugger(klass=FakePDB, foobar=2),
    ]
    with hunter.trace(
        lambda event: event.locals.get("node") == "Foobar",
        module="test_hunter",
        function="foo",
        actions=actions
    ):
        def foo():
            a = 1
            node = "Foobar"
            node += "x"
            a += 2
            return a

        foo()
    print(buffer.getvalue())
    assert pdb_calls == [2, 'foo']
    matcher = LineMatcher(buffer.getvalue().splitlines())
    matcher.fnmatch_lines_random([
        "*      test_debugger => <function test_debugger at *",
        "*      node => 'Foobar'",
        "*      a => 1",
    ])
433
434
435
def test_custom_action():
    """A plain callable can be used as action; it must see the ``return`` of ``foo``."""
    seen_functions = []

    with trace(action=lambda event: seen_functions.append(event.function), kind="return"):
        def foo():
            return 1

        foo()
    assert 'foo' in seen_functions
444
445
446
def test_trace_with_class_actions():
    """Passing the ``CodePrinter`` class itself to ``trace`` must not raise."""
    with trace(CodePrinter):
        def a():
            pass

        a()
452
453
454
def test_predicate_no_inf_recursion():
    """Single-element And/Or wrappers collapse to the element; nested ``Q`` is callable."""
    wrappers = ((Or, And), (Or, Or), (And, Or), (And, And))
    for outer, inner in wrappers:
        assert outer(inner(1)) == 1

    nested = Q(Q(lambda ev: 1, module='wat'))
    print('predicate:', nested)
    # Only checks that evaluating the nested predicate completes.
    nested({'module': 'foo'})
462
463
464
def test_predicate_compression():
    """Nested ``Or`` predicates are flattened; an ``Or`` inside an ``And`` is kept."""
    flattening_cases = [
        (Or(Or(1, 2), And(3)), Or(1, 2, 3)),
        (Or(Or(1, 2), 3), Or(1, 2, 3)),
        (Or(1, Or(2, 3), 4), Or(1, 2, 3, 4)),
    ]
    for nested, flat in flattening_cases:
        assert nested == flat
    assert And(1, 2, Or(3, 4)).predicates == (1, 2, Or(3, 4))

    for nested, flat in flattening_cases:
        assert repr(nested) == repr(flat)
473
474
475
def test_predicate_not():
    """Check ``Not`` and the ``~`` operator, including how negations combine."""
    assert Not(1).predicate == 1
    assert (~Or(1, 2)) == Not(Or(1, 2))
    assert (~And(1, 2)) == Not(And(1, 2))

    # Double negation yields the original predicate.
    assert (~Not(1)) == 1

    # Two negations combined are rewritten as one negation of the dual operator.
    negated_or = ~Query(module=1) | ~Query(module=2)
    assert negated_or == Not(And(Query(module=1), Query(module=2)))
    negated_and = ~Query(module=1) & ~Query(module=2)
    assert negated_and == Not(Or(Query(module=1), Query(module=2)))

    # A negation combined with a plain query is left as is.
    half_negated_or = ~Query(module=1) | Query(module=2)
    assert half_negated_or == Or(Not(Query(module=1)), Query(module=2))
    half_negated_and = ~Query(module=1) & Query(module=2)
    assert half_negated_and == And(Not(Query(module=1)), Query(module=2))

    # Negating a whole combination.
    not_both = ~(Query(module=1) & Query(module=2))
    assert not_both == Not(And(Query(module=1), Query(module=2)))
    not_either = ~(Query(module=1) | Query(module=2))
    assert not_either == Not(Or(Query(module=1), Query(module=2)))

    # The same relations hold for the reprs.
    assert repr(~Or(1, 2)) == repr(Not(Or(1, 2)))
    assert repr(~And(1, 2)) == repr(Not(And(1, 2)))

    negated_or_repr = repr(~Query(module=1) | ~Query(module=2))
    assert negated_or_repr == repr(Not(And(Query(module=1), Query(module=2))))
    negated_and_repr = repr(~Query(module=1) & ~Query(module=2))
    assert negated_and_repr == repr(Not(Or(Query(module=1), Query(module=2))))

    not_both_repr = repr(~(Query(module=1) & Query(module=2)))
    assert not_both_repr == repr(Not(And(Query(module=1), Query(module=2))))
    not_either_repr = repr(~(Query(module=1) | Query(module=2)))
    assert not_either_repr == repr(Not(Or(Query(module=1), Query(module=2))))

    assert Not(Q(module=1))({'module': 1}) == False
501
502
503
def test_predicate_query_allowed():
    """``Query`` must raise ``TypeError`` for a positional argument or an unknown keyword."""
    with pytest.raises(TypeError):
        Query(1)
    with pytest.raises(TypeError):
        Query(a=1)
506
507
508
def test_predicate_when_allowed():
    """``When`` given only a condition (no actions) must raise ``TypeError``."""
    with pytest.raises(TypeError):
        When(1)
510
511
512
@pytest.mark.parametrize('expr,inp,expected', [
    # Exact match and the startswith / contains operators.
    ({'module': "abc"}, {'module': "abc"}, True),
    ({'module': "abcd"}, {'module': "abc"}, False),
    ({'module': "abcd"}, {'module': "abce"}, False),
    ({'module_startswith': "abc"}, {'module': "abcd"}, True),
    ({'module__startswith': "abc"}, {'module': "abcd"}, True),
    ({'module_contains': "bc"}, {'module': "abcd"}, True),
    ({'module_contains': "bcde"}, {'module': "abcd"}, False),

    # endswith, with one or two underscores before the operator.
    ({'module_endswith': "abc"}, {'module': "abcd"}, False),
    ({'module__endswith': "bcd"}, {'module': "abcd"}, True),

    # Membership ("in") versus plain equality.
    ({'module_in': "abcd"}, {'module': "bc"}, True),
    ({'module': "abcd"}, {'module': "bc"}, False),
    ({'module': ["abcd"]}, {'module': "bc"}, False),
    ({'module_in': ["abcd"]}, {'module': "bc"}, False),
    ({'module_in': ["a", "bc", "d"]}, {'module': "bc"}, True),

    ({'module': "abcd"}, {'module': "abc"}, False),

    # startswith with several candidates given as tuple, set or list.
    ({'module_startswith': ("abc", "xyz")}, {'module': "abc"}, True),
    ({'module_startswith': {"abc", "xyz"}}, {'module': "abc"}, True),
    ({'module_startswith': ["abc", "xyz"]}, {'module': "abc"}, True),
    ({'module_startswith': ("abc", "xyz")}, {'module': "abcd"}, True),
    ({'module_startswith': ("abc", "xyz")}, {'module': "xyzw"}, True),
    ({'module_startswith': ("abc", "xyz")}, {'module': "fooabc"}, False),

    # endswith with several candidates given as tuple, set or list.
    ({'module_endswith': ("abc", "xyz")}, {'module': "abc"}, True),
    ({'module_endswith': {"abc", "xyz"}}, {'module': "abc"}, True),
    ({'module_endswith': ["abc", "xyz"]}, {'module': "abc"}, True),
    ({'module_endswith': ("abc", "xyz")}, {'module': "1abc"}, True),
    ({'module_endswith': ("abc", "xyz")}, {'module': "1xyz"}, True),
    ({'module_endswith': ("abc", "xyz")}, {'module': "abcfoo"}, False),

    # A non-string event value does not match a string query.
    ({'module': "abc"}, {'module': 1}, False),

    # Regular expressions.
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "regex"}, False),
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "re.gex"}, True),
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "sregex"}, True),
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "re"}, True),
])
def test_predicate_matching(expr, inp, expected):
    """Build a ``Query`` from ``expr`` and compare its result on ``inp`` with ``expected``."""
    query = Query(**expr)
    assert query(inp) == expected
555
556
557
@pytest.mark.parametrize('exc_type,expr', [
    (TypeError, {'module_1': 1}),
    (TypeError, {'module1': 1}),
    (ValueError, {'module_startswith': 1}),
    (ValueError, {'module_startswith': {1: 2}}),
    (ValueError, {'module_endswith': 1}),
    (ValueError, {'module_endswith': {1: 2}}),
    (TypeError, {'module_foo': 1}),
    (TypeError, {'module_a_b': 1}),
])
def test_predicate_bad_query(expr, exc_type):
    """Constructing a ``Query`` from an invalid ``expr`` must raise ``exc_type``."""
    with pytest.raises(exc_type):
        Query(**expr)
569
570
571
def test_predicate_when():
    """Actions of ``When`` run only when the condition matches, also inside Or/And."""
    events = []

    def record(event):
        events.append(event)

    # No match: result is False and the action is not called.
    assert When(Q(module=1), record)({'module': 2}) == False
    assert events == []

    # Match: result is True and the action receives the event.
    assert When(Q(module=1), record)({'module': 1}) == True
    assert events == [{'module': 1}]

    # Same through the ``Q(..., action=...)`` shorthand.
    events = []
    assert Q(module=1, action=record)({'module': 1}) == True
    assert events == [{'module': 1}]

    buckets = [[], []]

    def record_first(event):
        buckets[0].append(event)

    def record_second(event):
        buckets[1].append(event)

    # Or: only the action of the matching branch runs.
    either = (
        Q(module=1, action=record_first) |
        Q(module=2, action=record_second)
    )
    assert either({'module': 1}) == True
    assert buckets == [[{'module': 1}], []]

    assert either({'module': 2}) == True
    assert buckets == [[{'module': 1}], [{'module': 2}]]

    # And: no action runs when the first branch fails; both run when both match.
    buckets = [[], []]
    both = (
        Q(module=1, action=record_first) &
        Q(function=2, action=record_second)
    )
    assert both({'module': 2}) == False
    assert buckets == [[], []]

    assert both({'module': 1, 'function': 2}) == True
    assert buckets == [[{'module': 1, 'function': 2}], [{'module': 1, 'function': 2}]]
604
605
606
def test_proper_backend():
    """``hunter.Tracer`` must come from the module matching the environment.

    With ``PUREPYTHONHUNTER`` set, or on PyPy, ``hunter.tracer.Tracer`` is expected;
    otherwise ``hunter._tracer.Tracer``.
    """
    wants_pure = os.environ.get('PUREPYTHONHUNTER') or platform.python_implementation() == 'PyPy'
    expected_name = 'hunter.tracer.Tracer' if wants_pure else 'hunter._tracer.Tracer'
    assert expected_name in repr(hunter.Tracer)
611
612
613
@pytest.fixture(scope="session", params=['pure', 'cython'])
def tracer_impl(request):
    """Return the ``Tracer`` class of the requested implementation.

    The module is imported with ``pytest.importorskip``, so tests using this fixture are
    skipped when that implementation cannot be imported.
    """
    module_by_param = {'pure': 'hunter.tracer', 'cython': 'hunter._tracer'}
    module_name = module_by_param.get(request.param)
    if module_name is None:
        return None
    return pytest.importorskip(module_name).Tracer
619
620
621
def _tokenize():
    """Tokenize the ``tokenize`` module's own file; used as the workload of the perf tests.

    Tokens are collected into a local list that is discarded; a ``TokenError`` ends the
    run and is stored in the list instead of propagating. Nothing is returned.
    """
    with open(tokenize.__file__, 'rb') as source:
        collected = []
        try:
            for token in tokenize.tokenize(source.readline):
                collected.append(token)
        except tokenize.TokenError as error:
            collected.append(error)
629
630
631
def test_perf_filter(tracer_impl, benchmark):
    """Benchmark tracing ``_tokenize`` with a filter on module names that do not exist.

    Nothing may be printed to the stream.
    """
    tracer = tracer_impl()

    def workload():
        output = StringIO()
        with tracer.trace(Q(
            Q(module="does-not-exist") | Q(module="does not exist".split()),
            action=CodePrinter(stream=output)
        )):
            _tokenize()
        return output

    # ``benchmark(...)`` hands back the workload's return value, i.e. the output stream.
    result = benchmark(workload)
    assert result.getvalue() == ''
645
646
647
def test_perf_stdlib(tracer_impl, benchmark):
    """Benchmark tracing ``_tokenize`` with ``stdlib=False`` and pytest modules excluded.

    Nothing may be printed to the stream.
    """
    tracer = tracer_impl()

    def workload():
        output = StringIO()
        with tracer.trace(Q(
            ~Q(module_contains='pytest'),
            stdlib=False,
            action=CodePrinter(stream=output)
        )):
            _tokenize()
        return output

    # ``benchmark(...)`` hands back the workload's return value, i.e. the output stream.
    result = benchmark(workload)
    assert result.getvalue() == ''
662
663
664
def test_perf_actions(tracer_impl, benchmark):
    """Benchmark tracing ``_tokenize`` with two printing actions on ``call`` events.

    The output is only written to an in-memory stream; nothing is asserted about it.
    """
    tracer = tracer_impl()

    def workload():
        output = StringIO()
        printers = [
            CodePrinter(stream=output),
            VarsPrinter('line', globals=True, stream=output),
        ]
        with tracer.trace(Q(
            ~Q(module_in=['re', 'sre', 'sre_parse']) & ~Q(module_startswith='namedtuple') & Q(kind="call"),
            actions=printers
        )):
            _tokenize()

    benchmark(workload)
684