Completed
Push — master ( b3feb5...459677 )
by Ionel Cristian
01:00
created

tests.EvilTracer.__init__()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 4
rs 10
cc 1
1
from __future__ import print_function
2
3
import inspect
4
import os
5
import platform
6
import subprocess
7
import sys
8
import tokenize
9
10
try:
11
    from cStringIO import StringIO
12
except ImportError:
13
    from io import StringIO
14
try:
15
    from itertools import izip_longest
16
except ImportError:
17
    from itertools import zip_longest as izip_longest
18
19
import pytest
20
21
import hunter
22
from hunter import And
23
from hunter import Not
24
from hunter import Or
25
from hunter import Q
26
from hunter import Query
27
from hunter import When
28
29
from hunter import CodePrinter
30
from hunter import Debugger
31
from hunter import VarsPrinter
32
33
pytest_plugins = 'pytester',
34
35
36
class EvilTracer(object):
37
    def __init__(self, *args, **kwargs):
38
        self._calls = []
39
        self._handler = hunter._prepare_predicate(*args, **kwargs)
40
        self._tracer = hunter.trace(self._append)
41
42
    def _append(self, event):
43
        event.module
44
        event.function
45
        event.filename
46
        event.lineno
47
        self._calls.append(event)
48
49
    def __enter__(self):
50
        return self
51
52
    def __exit__(self, exc_type, exc_val, exc_tb):
53
        self._tracer.stop()
54
        predicate = self._handler
55
        for call in self._calls:
56
            predicate(call)
57
58
59
trace = EvilTracer
60
61
62
def _get_func_spec(func):
63
    spec = inspect.getargspec(func)
64
    return inspect.formatargspec(spec.args, spec.varargs)
65
66
67
def test_pth_activation():
68
    module_name = os.path.__name__
69
    expected_module = "{0}.py".format(module_name)
70
    hunter_env = "module={!r},function=\"join\"".format(module_name)
71
    func_spec = _get_func_spec(os.path.join)
72
    expected_call = "call      def join{0}:".format(func_spec)
73
74
    output = subprocess.check_output(
75
        [sys.executable, os.path.join(os.path.dirname(__file__), 'sample.py')],
76
        env=dict(os.environ, PYTHONHUNTER=hunter_env),
77
        stderr=subprocess.STDOUT,
78
    )
79
    assert expected_module.encode() in output
80
    assert expected_call.encode() in output
81
82
83
def test_pth_sample4():
84
    env = dict(os.environ, PYTHONHUNTER="CodePrinter")
85
    env.pop('COVERAGE_PROCESS_START', None)
86
    env.pop('COV_CORE_SOURCE', None)
87
    output = subprocess.check_output(
88
        [sys.executable, os.path.join(os.path.dirname(__file__), 'sample4.py')],
89
        env=env,
90
        stderr=subprocess.STDOUT,
91
    )
92
    assert output
93
94
95
def test_pth_sample2(LineMatcher):
96
    env = dict(os.environ, PYTHONHUNTER="module='__main__'")
97
    env.pop('COVERAGE_PROCESS_START', None)
98
    env.pop('COV_CORE_SOURCE', None)
99
    output = subprocess.check_output(
100
        [sys.executable, os.path.join(os.path.dirname(__file__), 'sample2.py')],
101
        env=env,
102
        stderr=subprocess.STDOUT,
103
    )
104
    lm = LineMatcher(output.decode('utf-8').splitlines())
105
    lm.fnmatch_lines([
106
        '*tests*sample2.py:* call      if __name__ == "__main__":  #*',
107
        '*tests*sample2.py:* line      if __name__ == "__main__":  #*',
108
        '*tests*sample2.py:* line          import functools',
109
        '*tests*sample2.py:* line          def deco(opt):',
110
        '*tests*sample2.py:* line          @deco(1)',
111
        '*tests*sample2.py:* call          def deco(opt):',
112
        '*tests*sample2.py:* line              def decorator(func):',
113
        '*tests*sample2.py:* line              return decorator',
114
        '*tests*sample2.py:* return            return decorator',
115
        '*                 * ...       return value: <function deco*',
116
        '*tests*sample2.py:* line          @deco(2)',
117
        '*tests*sample2.py:* call          def deco(opt):',
118
        '*tests*sample2.py:* line              def decorator(func):',
119
        '*tests*sample2.py:* line              return decorator',
120
        '*tests*sample2.py:* return            return decorator',
121
        '*                 * ...       return value: <function deco*',
122
        '*tests*sample2.py:* line          @deco(3)',
123
        '*tests*sample2.py:* call          def deco(opt):',
124
        '*tests*sample2.py:* line              def decorator(func):',
125
        '*tests*sample2.py:* line              return decorator',
126
        '*tests*sample2.py:* return            return decorator',
127
        '*                 * ...       return value: <function deco*',
128
        '*tests*sample2.py:* call              def decorator(func):',
129
        '*tests*sample2.py:* line                  @functools.wraps(func)',
130
        '*tests*sample2.py:* line                  return wrapper',
131
        '*tests*sample2.py:* return                return wrapper',
132
        '*                 * ...       return value: <function foo *',
133
        '*tests*sample2.py:* call              def decorator(func):',
134
        '*tests*sample2.py:* line                  @functools.wraps(func)',
135
        '*tests*sample2.py:* line                  return wrapper',
136
        '*tests*sample2.py:* return                return wrapper',
137
        '*                 * ...       return value: <function foo *',
138
        '*tests*sample2.py:* call              def decorator(func):',
139
        '*tests*sample2.py:* line                  @functools.wraps(func)',
140
        '*tests*sample2.py:* line                  return wrapper',
141
        '*tests*sample2.py:* return                return wrapper',
142
        '*                 * ...       return value: <function foo *',
143
        '*tests*sample2.py:* line          foo(',
144
        "*tests*sample2.py:* line              'a*',",
145
        "*tests*sample2.py:* line              'b'",
146
        '*tests*sample2.py:* call                  @functools.wraps(func)',
147
        '*                 *    |                  def wrapper(*args):',
148
        '*tests*sample2.py:* line                      return func(*args)',
149
        '*tests*sample2.py:* call                  @functools.wraps(func)',
150
        '*                 *    |                  def wrapper(*args):',
151
        '*tests*sample2.py:* line                      return func(*args)',
152
        '*tests*sample2.py:* call                  @functools.wraps(func)',
153
        '*                 *    |                  def wrapper(*args):',
154
        '*tests*sample2.py:* line                      return func(*args)',
155
        '*tests*sample2.py:* call          @deco(1)',
156
        '*                 *    |          @deco(2)',
157
        '*                 *    |          @deco(3)',
158
        '*                 *    |          def foo(*args):',
159
        '*tests*sample2.py:* line              return args',
160
        '*tests*sample2.py:* return            return args',
161
        "*                 * ...       return value: ('a*', 'b')",
162
        "*tests*sample2.py:* return                    return func(*args)",
163
        "*                 * ...       return value: ('a*', 'b')",
164
        "*tests*sample2.py:* return                    return func(*args)",
165
        "*                 * ...       return value: ('a*', 'b')",
166
        "*tests*sample2.py:* return                    return func(*args)",
167
        "*                 * ...       return value: ('a*', 'b')",
168
        "*tests*sample2.py:* line          try:",
169
        "*tests*sample2.py:* line              None(",
170
        "*tests*sample2.py:* line                  'a',",
171
        "*tests*sample2.py:* line                  'b'",
172
        "*tests*sample2.py:* exception             'b'",
173
        "*                 * ...       exception value: *",
174
        "*tests*sample2.py:* line          except:",
175
        "*tests*sample2.py:* line              pass",
176
        "*tests*sample2.py:* return            pass",
177
        "*                   ...       return value: None",
178
    ])
179
180
181
def test_predicate_str_repr():
182
    assert repr(Q(module='a')).endswith("predicates.Query: query_eq={'module': 'a'}>")
183
    assert str(Q(module='a')) == "Query(module='a')"
184
185
    assert "predicates.When: condition=<hunter." in repr(Q(module='a', action='foo'))
186
    assert "predicates.Query: query_eq={'module': 'a'}>, actions=['foo']>" in repr(Q(module='a', action='foo'))
187
    assert str(Q(module='a', action='foo')) == "When(Query(module='a'), 'foo')"
188
189
    assert "predicates.Not: predicate=<hunter." in repr(~Q(module='a'))
190
    assert "predicates.Query: query_eq={'module': 'a'}>>" in repr(~Q(module='a'))
191
    assert str(~Q(module='a')) == "Not(Query(module='a'))"
192
193
    assert "predicates.Or: predicates=(<hunter." in repr(Q(module='a') | Q(module='b'))
194
    assert "predicates.Query: query_eq={'module': 'a'}>, " in repr(Q(module='a') | Q(module='b'))
195
    assert repr(Q(module='a') | Q(module='b')).endswith("predicates.Query: query_eq={'module': 'b'}>)>")
196
    assert str(Q(module='a') | Q(module='b')) == "Or(Query(module='a'), Query(module='b'))"
197
198
    assert "predicates.And: predicates=(<hunter." in repr(Q(module='a') & Q(module='b'))
199
    assert "predicates.Query: query_eq={'module': 'a'}>," in repr(Q(module='a') & Q(module='b'))
200
    assert repr(Q(module='a') & Q(module='b')).endswith("predicates.Query: query_eq={'module': 'b'}>)>")
201
    assert str(Q(module='a') & Q(module='b')) == "And(Query(module='a'), Query(module='b'))"
202
203
204
def test_predicate_q_nest_1():
205
    assert repr(Q(Q(module='a'))).endswith("predicates.Query: query_eq={'module': 'a'}>")
206
207
208
def test_predicate_q_expansion():
209
    assert Q(1, 2, module=3) == And(1, 2, Q(module=3))
210
    assert Q(1, 2, module=3, action=4) == When(And(1, 2, Q(module=3)), 4)
211
    assert Q(1, 2, module=3, actions=[4, 5]) == When(And(1, 2, Q(module=3)), 4, 5)
212
213
214
def test_predicate_and():
215
    assert And(1, 2) == And(1, 2)
216
    assert Q(module=1) & Q(module=2) == And(Q(module=1), Q(module=2))
217
    assert Q(module=1) & Q(module=2) & Q(module=3) == And(Q(module=1), Q(module=2), Q(module=3))
218
219
    assert (Q(module=1) & Q(module=2))({'module': 3}) == False
220
    assert (Q(module=1) & Q(function=2))({'module': 1, 'function': 2}) == True
221
222
    assert And(1, 2) | 3 == Or(And(1, 2), 3)
223
224
225
def test_predicate_or():
226
    assert Q(module=1) | Q(module=2) == Or(Q(module=1), Q(module=2))
227
    assert Q(module=1) | Q(module=2) | Q(module=3) == Or(Q(module=1), Q(module=2), Q(module=3))
228
229
    assert (Q(module=1) | Q(module=2))({'module': 3}) == False
230
    assert (Q(module=1) | Q(module=2))({'module': 2}) == True
231
232
    assert Or(1, 2) & 3 == And(Or(1, 2), 3)
233
234
235
def test_tracing_bare(LineMatcher):
236
    lines = StringIO()
237
    with hunter.trace(CodePrinter(stream=lines)):
238
        def a():
239
            return 1
240
241
        b = a()
242
        b = 2
243
        try:
244
            raise Exception("BOOM!")
245
        except Exception:
246
            pass
247
    print(lines.getvalue())
248
    lm = LineMatcher(lines.getvalue().splitlines())
249
    lm.fnmatch_lines([
250
        "*test_hunter.py* call              def a():",
251
        "*test_hunter.py* line                  return 1",
252
        "*test_hunter.py* return                return 1",
253
        "* ...       return value: 1",
254
    ])
255
256
257
def test_tracing_printing_failures(LineMatcher):
258
    lines = StringIO()
259
    with trace(actions=[CodePrinter(stream=lines), VarsPrinter("x", stream=lines)]):
260
        class Bad(Exception):
261
            def __repr__(self):
262
                raise RuntimeError("I'm a bad class!")
263
264
        def a():
265
            x = Bad()
266
            return x
267
268
        def b():
269
            x = Bad()
270
            raise x
271
272
        a()
273
        try:
274
            b()
275
        except Exception as exc:
276
            pass
277
    lm = LineMatcher(lines.getvalue().splitlines())
278
    lm.fnmatch_lines([
279
        """*tests*test_hunter.py:* call              class Bad(Exception):""",
280
        """*tests*test_hunter.py:* line              class Bad(Exception):""",
281
        """*tests*test_hunter.py:* line                  def __repr__(self):""",
282
        """*tests*test_hunter.py:* return                def __repr__(self):""",
283
        """* ...       return value: *""",
284
        """*tests*test_hunter.py:* call              def a():""",
285
        """*tests*test_hunter.py:* line                  x = Bad()""",
286
        """*tests*test_hunter.py:* line                  return x""",
287
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
288
        """*tests*test_hunter.py:* return                return x""",
289
        """* ...       return value: !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
290
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
291
        """*tests*test_hunter.py:* call              def b():""",
292
        """*tests*test_hunter.py:* line                  x = Bad()""",
293
        """*tests*test_hunter.py:* line                  raise x""",
294
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
295
        """*tests*test_hunter.py:* exception             raise x""",
296
        """* ...       exception value: !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
297
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
298
        """*tests*test_hunter.py:* return                raise x""",
299
        """* ...       return value: None""",
300
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
301
    ])
302
303
304
def test_tracing_vars(LineMatcher):
305
    lines = StringIO()
306
    with hunter.trace(actions=[VarsPrinter('b', stream=lines), CodePrinter(stream=lines)]):
307
        def a():
308
            b = 1
309
            b = 2
310
            return 1
311
312
        b = a()
313
        b = 2
314
        try:
315
            raise Exception("BOOM!")
316
        except Exception:
317
            pass
318
    print(lines.getvalue())
319
    lm = LineMatcher(lines.getvalue().splitlines())
320
    lm.fnmatch_lines([
321
        "*test_hunter.py* call              def a():",
322
        "*test_hunter.py* line                  b = 1",
323
        "* vars      b => 1",
324
        "*test_hunter.py* line                  b = 2",
325
        "* vars      b => 2",
326
        "*test_hunter.py* line                  return 1",
327
        "* vars      b => 2",
328
        "*test_hunter.py* return                return 1",
329
        "* ...       return value: 1",
330
    ])
331
332
333
def test_trace_merge():
334
    with hunter.trace(function="a"):
335
        with hunter.trace(function="b"):
336
            with hunter.trace(function="c"):
337
                assert sys.gettrace()._handler == When(Q(function="c"), CodePrinter)
338
            assert sys.gettrace()._handler == When(Q(function="b"), CodePrinter)
339
        assert sys.gettrace()._handler == When(Q(function="a"), CodePrinter)
340
341
342
def test_trace_api_expansion():
343
    # simple use
344
    with trace(function="foobar") as t:
345
        assert t._handler == When(Q(function="foobar"), CodePrinter)
346
347
    # "or" by expression
348
    with trace(module="foo", function="foobar") as t:
349
        assert t._handler == When(Q(module="foo", function="foobar"), CodePrinter)
350
351
    # pdb.set_trace
352
    with trace(function="foobar", action=Debugger) as t:
353
        assert t._handler == When(Q(function="foobar"), Debugger)
354
355
    # pdb.set_trace on any hits
356
    with trace(module="foo", function="foobar", action=Debugger) as t:
357
        assert t._handler == When(Q(module="foo", function="foobar"), Debugger)
358
359
    # pdb.set_trace when function is foobar, otherwise just print when module is foo
360
    with trace(Q(function="foobar", action=Debugger), module="foo") as t:
361
        assert t._handler == When(And(
362
            When(Q(function="foobar"), Debugger),
363
            Q(module="foo")
364
        ), CodePrinter)
365
366
    # dumping variables from stack
367
    with trace(Q(function="foobar", action=VarsPrinter("foobar")), module="foo") as t:
368
        assert t._handler == When(And(
369
            When(Q(function="foobar"), VarsPrinter("foobar")),
370
            Q(module="foo"),
371
        ), CodePrinter)
372
373
    with trace(Q(function="foobar", action=VarsPrinter("foobar", "mumbojumbo")), module="foo") as t:
374
        assert t._handler == When(And(
375
            When(Q(function="foobar"), VarsPrinter("foobar", "mumbojumbo")),
376
            Q(module="foo"),
377
        ), CodePrinter)
378
379
    # multiple actions
380
    with trace(Q(function="foobar", actions=[VarsPrinter("foobar"), Debugger]), module="foo") as t:
381
        assert t._handler == When(And(
382
            When(Q(function="foobar"), VarsPrinter("foobar"), Debugger),
383
            Q(module="foo"),
384
        ), CodePrinter)
385
386
387
def test_locals():
388
    out = StringIO()
389
    with hunter.trace(
390
        lambda event: event.locals.get("node") == "Foobar",
391
        module="test_hunter",
392
        function="foo",
393
        action=CodePrinter(stream=out)
394
    ):
395
        def foo():
396
            a = 1
397
            node = "Foobar"
398
            node += "x"
399
            a += 2
400
            return a
401
402
        foo()
403
    assert out.getvalue().endswith('node += "x"\n')
404
405
406
def test_fullsource_decorator_issue(LineMatcher):
407
    out = StringIO()
408
    with trace(kind='call', action=CodePrinter(stream=out)):
409
        foo = bar = lambda x: x
410
411
        @foo
412
        @bar
413
        def foo():
414
            return 1
415
416
        foo()
417
418
    lm = LineMatcher(out.getvalue().splitlines())
419
    lm.fnmatch_lines([
420
        '* call              @foo',
421
        '*    |              @bar',
422
        '*    |              def foo():',
423
    ])
424
425
426
def test_source(LineMatcher):
427
    calls = []
428
    with trace(action=lambda event: calls.append(event.source)):
429
        foo = bar = lambda x: x
430
431
        @foo
432
        @bar
433
        def foo():
434
            return 1
435
436
        foo()
437
438
    lm = LineMatcher(calls)
439
    lm.fnmatch_lines([
440
        '        foo = bar = lambda x: x\n',
441
        '        @foo\n',
442
        '            return 1\n',
443
    ])
444
445
def test_fullsource(LineMatcher):
446
    calls = []
447
    with trace(action=lambda event: calls.append(event.fullsource)):
448
        foo = bar = lambda x: x
449
450
        @foo
451
        @bar
452
        def foo():
453
            return 1
454
455
        foo()
456
457
    lm = LineMatcher(calls)
458
    lm.fnmatch_lines([
459
        '        foo = bar = lambda x: x\n',
460
        '        @foo\n        @bar\n        def foo():\n',
461
        '            return 1\n',
462
    ])
463
464
465
def test_debugger(LineMatcher):
466
    out = StringIO()
467
    calls = []
468
469
    class FakePDB:
470
        def __init__(self, foobar=1):
471
            calls.append(foobar)
472
473
        def set_trace(self, frame):
474
            calls.append(frame.f_code.co_name)
475
476
    with hunter.trace(
477
        lambda event: event.locals.get("node") == "Foobar",
478
        module="test_hunter",
479
        function="foo",
480
        actions=[VarsPrinter("a", "node", "foo", "test_debugger", globals=True, stream=out),
481
                 Debugger(klass=FakePDB, foobar=2)]
482
    ):
483
        def foo():
484
            a = 1
485
            node = "Foobar"
486
            node += "x"
487
            a += 2
488
            return a
489
490
        foo()
491
    print(out.getvalue())
492
    assert calls == [2, 'foo']
493
    lm = LineMatcher(out.getvalue().splitlines())
494
    lm.fnmatch_lines_random([
495
        "*      test_debugger => <function test_debugger at *",
496
        "*      node => 'Foobar'",
497
        "*      a => 1",
498
    ])
499
500
501
def test_custom_action():
502
    calls = []
503
504
    with trace(action=lambda event: calls.append(event.function), kind="return"):
505
        def foo():
506
            return 1
507
508
        foo()
509
    assert 'foo' in calls
510
511
512
def test_trace_with_class_actions():
513
    with trace(CodePrinter):
514
        def a():
515
            pass
516
517
        a()
518
519
520
def test_predicate_no_inf_recursion():
521
    assert Or(And(1)) == 1
522
    assert Or(Or(1)) == 1
523
    assert And(Or(1)) == 1
524
    assert And(And(1)) == 1
525
    predicate = Q(Q(lambda ev: 1, module='wat'))
526
    print('predicate:', predicate)
527
    predicate({'module': 'foo'})
528
529
530
def test_predicate_compression():
531
    assert Or(Or(1, 2), And(3)) == Or(1, 2, 3)
532
    assert Or(Or(1, 2), 3) == Or(1, 2, 3)
533
    assert Or(1, Or(2, 3), 4) == Or(1, 2, 3, 4)
534
    assert And(1, 2, Or(3, 4)).predicates == (1, 2, Or(3, 4))
535
536
    assert repr(Or(Or(1, 2), And(3))) == repr(Or(1, 2, 3))
537
    assert repr(Or(Or(1, 2), 3)) == repr(Or(1, 2, 3))
538
    assert repr(Or(1, Or(2, 3), 4)) == repr(Or(1, 2, 3, 4))
539
540
541
def test_predicate_not():
542
    assert Not(1).predicate == 1
543
    assert ~Or(1, 2) == Not(Or(1, 2))
544
    assert ~And(1, 2) == Not(And(1, 2))
545
546
    assert ~Not(1) == 1
547
548
    assert ~Query(module=1) | ~Query(module=2) == Not(And(Query(module=1), Query(module=2)))
549
    assert ~Query(module=1) & ~Query(module=2) == Not(Or(Query(module=1), Query(module=2)))
550
551
    assert ~Query(module=1) | Query(module=2) == Or(Not(Query(module=1)), Query(module=2))
552
    assert ~Query(module=1) & Query(module=2) == And(Not(Query(module=1)), Query(module=2))
553
554
    assert ~(Query(module=1) & Query(module=2)) == Not(And(Query(module=1), Query(module=2)))
555
    assert ~(Query(module=1) | Query(module=2)) == Not(Or(Query(module=1), Query(module=2)))
556
557
    assert repr(~Or(1, 2)) == repr(Not(Or(1, 2)))
558
    assert repr(~And(1, 2)) == repr(Not(And(1, 2)))
559
560
    assert repr(~Query(module=1) | ~Query(module=2)) == repr(Not(And(Query(module=1), Query(module=2))))
561
    assert repr(~Query(module=1) & ~Query(module=2)) == repr(Not(Or(Query(module=1), Query(module=2))))
562
563
    assert repr(~(Query(module=1) & Query(module=2))) == repr(Not(And(Query(module=1), Query(module=2))))
564
    assert repr(~(Query(module=1) | Query(module=2))) == repr(Not(Or(Query(module=1), Query(module=2))))
565
566
    assert Not(Q(module=1))({'module': 1}) == False
567
568
569
def test_predicate_query_allowed():
570
    pytest.raises(TypeError, Query, 1)
571
    pytest.raises(TypeError, Query, a=1)
572
573
574
def test_predicate_when_allowed():
575
    pytest.raises(TypeError, When, 1)
576
577
578
@pytest.mark.parametrize('expr,inp,expected', [
579
    ({'module': "abc"}, {'module': "abc"}, True),
580
    ({'module': "abcd"}, {'module': "abc"}, False),
581
    ({'module': "abcd"}, {'module': "abce"}, False),
582
    ({'module_startswith': "abc"}, {'module': "abcd"}, True),
583
    ({'module__startswith': "abc"}, {'module': "abcd"}, True),
584
    ({'module_contains': "bc"}, {'module': "abcd"}, True),
585
    ({'module_contains': "bcde"}, {'module': "abcd"}, False),
586
587
    ({'module_endswith': "abc"}, {'module': "abcd"}, False),
588
    ({'module__endswith': "bcd"}, {'module': "abcd"}, True),
589
590
    ({'module_in': "abcd"}, {'module': "bc"}, True),
591
    ({'module': "abcd"}, {'module': "bc"}, False),
592
    ({'module': ["abcd"]}, {'module': "bc"}, False),
593
    ({'module_in': ["abcd"]}, {'module': "bc"}, False),
594
    ({'module_in': ["a", "bc", "d"]}, {'module': "bc"}, True),
595
596
    ({'module': "abcd"}, {'module': "abc"}, False),
597
598
    ({'module_startswith': ("abc", "xyz")}, {'module': "abc"}, True),
599
    ({'module_startswith': {"abc", "xyz"}}, {'module': "abc"}, True),
600
    ({'module_startswith': ["abc", "xyz"]}, {'module': "abc"}, True),
601
    ({'module_startswith': ("abc", "xyz")}, {'module': "abcd"}, True),
602
    ({'module_startswith': ("abc", "xyz")}, {'module': "xyzw"}, True),
603
    ({'module_startswith': ("abc", "xyz")}, {'module': "fooabc"}, False),
604
605
    ({'module_endswith': ("abc", "xyz")}, {'module': "abc"}, True),
606
    ({'module_endswith': {"abc", "xyz"}}, {'module': "abc"}, True),
607
    ({'module_endswith': ["abc", "xyz"]}, {'module': "abc"}, True),
608
    ({'module_endswith': ("abc", "xyz")}, {'module': "1abc"}, True),
609
    ({'module_endswith': ("abc", "xyz")}, {'module': "1xyz"}, True),
610
    ({'module_endswith': ("abc", "xyz")}, {'module': "abcfoo"}, False),
611
612
    ({'module': "abc"}, {'module': 1}, False),
613
614
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "regex"}, False),
615
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "re.gex"}, True),
616
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "sregex"}, True),
617
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "re"}, True),
618
])
619
def test_predicate_matching(expr, inp, expected):
620
    assert Query(**expr)(inp) == expected
621
622
623
@pytest.mark.parametrize('exc_type,expr', [
624
    (TypeError, {'module_1': 1}),
625
    (TypeError, {'module1': 1}),
626
    (ValueError, {'module_startswith': 1}),
627
    (ValueError, {'module_startswith': {1: 2}}),
628
    (ValueError, {'module_endswith': 1}),
629
    (ValueError, {'module_endswith': {1: 2}}),
630
    (TypeError, {'module_foo': 1}),
631
    (TypeError, {'module_a_b': 1}),
632
])
633
def test_predicate_bad_query(expr, exc_type):
634
    pytest.raises(exc_type, Query, **expr)
635
636
637
def test_predicate_when():
638
    called = []
639
    assert When(Q(module=1), lambda ev: called.append(ev))({'module': 2}) == False
640
    assert called == []
641
642
    assert When(Q(module=1), lambda ev: called.append(ev))({'module': 1}) == True
643
    assert called == [{'module': 1}]
644
645
    called = []
646
    assert Q(module=1, action=lambda ev: called.append(ev))({'module': 1}) == True
647
    assert called == [{'module': 1}]
648
649
    called = [[], []]
650
    predicate = (
651
        Q(module=1, action=lambda ev: called[0].append(ev)) |
652
        Q(module=2, action=lambda ev: called[1].append(ev))
653
    )
654
    assert predicate({'module': 1}) == True
655
    assert called == [[{'module': 1}], []]
656
657
    assert predicate({'module': 2}) == True
658
    assert called == [[{'module': 1}], [{'module': 2}]]
659
660
    called = [[], []]
661
    predicate = (
662
        Q(module=1, action=lambda ev: called[0].append(ev)) &
663
        Q(function=2, action=lambda ev: called[1].append(ev))
664
    )
665
    assert predicate({'module': 2}) == False
666
    assert called == [[], []]
667
668
    assert predicate({'module': 1, 'function': 2}) == True
669
    assert called == [[{'module': 1, 'function': 2}], [{'module': 1, 'function': 2}]]
670
671
672
def test_proper_backend():
673
    if os.environ.get('PUREPYTHONHUNTER') or platform.python_implementation() == 'PyPy':
674
        assert 'hunter.tracer.Tracer' in repr(hunter.Tracer)
675
    else:
676
        assert 'hunter._tracer.Tracer' in repr(hunter.Tracer)
677
678
679
@pytest.fixture(scope="session", params=['pure', 'cython'])
680
def tracer_impl(request):
681
    if request.param == 'pure':
682
        return pytest.importorskip('hunter.tracer').Tracer
683
    elif request.param == 'cython':
684
        return pytest.importorskip('hunter._tracer').Tracer
685
686
687
def _tokenize():
688
    with open(tokenize.__file__, 'rb') as fh:
689
        toks = []
690
        try:
691
            for tok in tokenize.tokenize(fh.readline):
692
                toks.append(tok)
693
        except tokenize.TokenError as exc:
694
            toks.append(exc)
695
696
697
def test_perf_filter(tracer_impl, benchmark):
698
    t = tracer_impl()
699
700
    @benchmark
701
    def run():
702
        output = StringIO()
703
        with t.trace(Q(
704
            Q(module="does-not-exist") | Q(module="does not exist".split()),
705
            action=CodePrinter(stream=output)
706
        )):
707
            _tokenize()
708
        return output
709
710
    assert run.getvalue() == ''
711
712
713
def test_perf_stdlib(tracer_impl, benchmark):
714
    t = tracer_impl()
715
716
    @benchmark
717
    def run():
718
        output = StringIO()
719
        with t.trace(Q(
720
            ~Q(module_contains='pytest'),
721
            stdlib=False,
722
            action=CodePrinter(stream=output)
723
        )):
724
            _tokenize()
725
        return output
726
727
    assert run.getvalue() == ''
728
729
730
def test_perf_actions(tracer_impl, benchmark):
731
    t = tracer_impl()
732
733
    @benchmark
734
    def run():
735
        output = StringIO()
736
        with t.trace(Q(
737
            ~Q(module_in=['re', 'sre', 'sre_parse']) & ~Q(module_startswith='namedtuple') & Q(kind="call"),
738
            actions=[
739
                CodePrinter(
740
                    stream=output
741
                ),
742
                VarsPrinter(
743
                    'line',
744
                    globals=True,
745
                    stream=output
746
                )
747
            ]
748
        )):
749
            _tokenize()
750