Completed
Push — master ( 0878c0...da4845 )
by Ionel Cristian
01:09
created

tests.main()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 2
rs 10
cc 1
1
from __future__ import print_function
2
3
import inspect
4
import os
5
import platform
6
import subprocess
7
import sys
8
import threading
9
import tokenize
10
11
import hunter
12
import pytest
13
from fields import Fields
14
from hunter import Q
15
from hunter import And
16
from hunter import CallPrinter
17
from hunter import CodePrinter
18
from hunter import Debugger
19
from hunter import Not
20
from hunter import Or
21
from hunter import Query
22
from hunter import VarsPrinter
23
from hunter import When
24
25
try:
26
    from cStringIO import StringIO
27
except ImportError:
28
    from io import StringIO
29
try:
30
    from itertools import izip_longest
31
except ImportError:
32
    from itertools import zip_longest as izip_longest
33
34
pytest_plugins = 'pytester',
35
36
37
class FakeCallable(Fields.value):
38
    def __call__(self):
39
        raise NotImplementedError("Nope")
40
41
    def __repr__(self):
42
        return repr(self.value)
43
44
    def __str__(self):
45
        return str(self.value)
46
47
C = FakeCallable
48
49
50
class EvilTracer(object):
51
    def __init__(self, *args, **kwargs):
52
        self._calls = []
53
        threading_support = kwargs.pop('threading_support', False)
54
        self._handler = hunter._prepare_predicate(*args, **kwargs)
55
        self._tracer = hunter.trace(self._append, threading_support=threading_support)
56
57
    def _append(self, event):
58
        # Make sure the lineno is cached. Frames are reused
59
        # and later on the events would be very broken ..
60
        event.lineno
61
        self._calls.append(event)
62
63
    def __enter__(self):
64
        return self
65
66
    def __exit__(self, exc_type, exc_val, exc_tb):
67
        self._tracer.stop()
68
        predicate = self._handler
69
        for call in self._calls:
70
            predicate(call)
71
72
73
trace = EvilTracer
74
75
76
def _get_func_spec(func):
77
    spec = inspect.getargspec(func)
78
    return inspect.formatargspec(spec.args, spec.varargs)
79
80
81
def test_pth_activation():
82
    module_name = os.path.__name__
83
    expected_module = "{0}.py".format(module_name)
84
    hunter_env = "module={!r},function=\"join\"".format(module_name)
85
    func_spec = _get_func_spec(os.path.join)
86
    expected_call = "call      def join{0}:".format(func_spec)
87
88
    output = subprocess.check_output(
89
        [sys.executable, os.path.join(os.path.dirname(__file__), 'sample.py')],
90
        env=dict(os.environ, PYTHONHUNTER=hunter_env),
91
        stderr=subprocess.STDOUT,
92
    )
93
    assert expected_module.encode() in output
94
    assert expected_call.encode() in output
95
96
97
def test_pth_sample4():
98
    env = dict(os.environ, PYTHONHUNTER="CodePrinter")
99
    env.pop('COVERAGE_PROCESS_START', None)
100
    env.pop('COV_CORE_SOURCE', None)
101
    output = subprocess.check_output(
102
        [sys.executable, os.path.join(os.path.dirname(__file__), 'sample4.py')],
103
        env=env,
104
        stderr=subprocess.STDOUT,
105
    )
106
    assert output
107
108
109
def test_pth_sample2(LineMatcher):
110
    env = dict(os.environ, PYTHONHUNTER="module='__main__'")
111
    env.pop('COVERAGE_PROCESS_START', None)
112
    env.pop('COV_CORE_SOURCE', None)
113
    output = subprocess.check_output(
114
        [sys.executable, os.path.join(os.path.dirname(__file__), 'sample2.py')],
115
        env=env,
116
        stderr=subprocess.STDOUT,
117
    )
118
    lm = LineMatcher(output.decode('utf-8').splitlines())
119
    lm.fnmatch_lines([
120
        '*tests*sample2.py:* call      if __name__ == "__main__":  #*',
121
        '*tests*sample2.py:* line      if __name__ == "__main__":  #*',
122
        '*tests*sample2.py:* line          import functools',
123
        '*tests*sample2.py:* line          def deco(opt):',
124
        '*tests*sample2.py:* line          @deco(1)',
125
        '*tests*sample2.py:* call          def deco(opt):',
126
        '*tests*sample2.py:* line              def decorator(func):',
127
        '*tests*sample2.py:* line              return decorator',
128
        '*tests*sample2.py:* return            return decorator',
129
        '*                 * ...       return value: <function deco*',
130
        '*tests*sample2.py:* line          @deco(2)',
131
        '*tests*sample2.py:* call          def deco(opt):',
132
        '*tests*sample2.py:* line              def decorator(func):',
133
        '*tests*sample2.py:* line              return decorator',
134
        '*tests*sample2.py:* return            return decorator',
135
        '*                 * ...       return value: <function deco*',
136
        '*tests*sample2.py:* line          @deco(3)',
137
        '*tests*sample2.py:* call          def deco(opt):',
138
        '*tests*sample2.py:* line              def decorator(func):',
139
        '*tests*sample2.py:* line              return decorator',
140
        '*tests*sample2.py:* return            return decorator',
141
        '*                 * ...       return value: <function deco*',
142
        '*tests*sample2.py:* call              def decorator(func):',
143
        '*tests*sample2.py:* line                  @functools.wraps(func)',
144
        '*tests*sample2.py:* line                  return wrapper',
145
        '*tests*sample2.py:* return                return wrapper',
146
        '*                 * ...       return value: <function foo *',
147
        '*tests*sample2.py:* call              def decorator(func):',
148
        '*tests*sample2.py:* line                  @functools.wraps(func)',
149
        '*tests*sample2.py:* line                  return wrapper',
150
        '*tests*sample2.py:* return                return wrapper',
151
        '*                 * ...       return value: <function foo *',
152
        '*tests*sample2.py:* call              def decorator(func):',
153
        '*tests*sample2.py:* line                  @functools.wraps(func)',
154
        '*tests*sample2.py:* line                  return wrapper',
155
        '*tests*sample2.py:* return                return wrapper',
156
        '*                 * ...       return value: <function foo *',
157
        '*tests*sample2.py:* line          foo(',
158
        "*tests*sample2.py:* line              'a*',",
159
        "*tests*sample2.py:* line              'b'",
160
        '*tests*sample2.py:* call                  @functools.wraps(func)',
161
        '*                 *    |                  def wrapper(*args):',
162
        '*tests*sample2.py:* line                      return func(*args)',
163
        '*tests*sample2.py:* call                  @functools.wraps(func)',
164
        '*                 *    |                  def wrapper(*args):',
165
        '*tests*sample2.py:* line                      return func(*args)',
166
        '*tests*sample2.py:* call                  @functools.wraps(func)',
167
        '*                 *    |                  def wrapper(*args):',
168
        '*tests*sample2.py:* line                      return func(*args)',
169
        '*tests*sample2.py:* call          @deco(1)',
170
        '*                 *    |          @deco(2)',
171
        '*                 *    |          @deco(3)',
172
        '*                 *    |          def foo(*args):',
173
        '*tests*sample2.py:* line              return args',
174
        '*tests*sample2.py:* return            return args',
175
        "*                 * ...       return value: ('a*', 'b')",
176
        "*tests*sample2.py:* return                    return func(*args)",
177
        "*                 * ...       return value: ('a*', 'b')",
178
        "*tests*sample2.py:* return                    return func(*args)",
179
        "*                 * ...       return value: ('a*', 'b')",
180
        "*tests*sample2.py:* return                    return func(*args)",
181
        "*                 * ...       return value: ('a*', 'b')",
182
        "*tests*sample2.py:* line          try:",
183
        "*tests*sample2.py:* line              None(",
184
        "*tests*sample2.py:* line                  'a',",
185
        "*tests*sample2.py:* line                  'b'",
186
        "*tests*sample2.py:* exception             'b'",
187
        "*                 * ...       exception value: *",
188
        "*tests*sample2.py:* line          except:",
189
        "*tests*sample2.py:* line              pass",
190
        "*tests*sample2.py:* return            pass",
191
        "*                   ...       return value: None",
192
    ])
193
194
195
def test_predicate_str_repr():
196
    assert repr(Q(module='a', function='b')).endswith("predicates.Query: query_eq=(('function', 'b'), ('module', 'a'))>")
197
    assert str(Q(module='a', function='b')) == "Query(function='b', module='a')"
198
199
    assert repr(Q(module='a')).endswith("predicates.Query: query_eq=(('module', 'a'),)>")
200
    assert str(Q(module='a')) == "Query(module='a')"
201
202
    assert "predicates.When: condition=<hunter." in repr(Q(module='a', action=C('foo')))
203
    assert "predicates.Query: query_eq=(('module', 'a'),)>, actions=('foo',)>" in repr(Q(module='a', action=C('foo')))
204
    assert str(Q(module='a', action=C('foo'))) == "When(Query(module='a'), 'foo')"
205
206
    assert "predicates.Not: predicate=<hunter." in repr(~Q(module='a'))
207
    assert "predicates.Query: query_eq=(('module', 'a'),)>>" in repr(~Q(module='a'))
208
    assert str(~Q(module='a')) == "Not(Query(module='a'))"
209
210
    assert "predicates.Or: predicates=(<hunter." in repr(Q(module='a') | Q(module='b'))
211
    assert "predicates.Query: query_eq=(('module', 'a'),)>, " in repr(Q(module='a') | Q(module='b'))
212
    assert repr(Q(module='a') | Q(module='b')).endswith("predicates.Query: query_eq=(('module', 'b'),)>)>")
213
    assert str(Q(module='a') | Q(module='b')) == "Or(Query(module='a'), Query(module='b'))"
214
215
    assert "predicates.And: predicates=(<hunter." in repr(Q(module='a') & Q(module='b'))
216
    assert "predicates.Query: query_eq=(('module', 'a'),)>," in repr(Q(module='a') & Q(module='b'))
217
    assert repr(Q(module='a') & Q(module='b')).endswith("predicates.Query: query_eq=(('module', 'b'),)>)>")
218
    assert str(Q(module='a') & Q(module='b')) == "And(Query(module='a'), Query(module='b'))"
219
220
221
def test_predicate_q_nest_1():
222
    assert repr(Q(Q(module='a'))).endswith("predicates.Query: query_eq=(('module', 'a'),)>")
223
224
225
def test_predicate_q_not_callable():
226
    exc = pytest.raises(TypeError, Q, 'foobar')
227
    assert exc.value.args == ("Predicate 'foobar' is not callable.",)
228
229
230
def test_predicate_q_expansion():
231
    assert Q(C(1), C(2), module=3) == And(C(1), C(2), Q(module=3))
232
    assert Q(C(1), C(2), module=3, action=C(4)) == When(And(C(1), C(2), Q(module=3)), C(4))
233
    assert Q(C(1), C(2), module=3, actions=[C(4), C(5)]) == When(And(C(1), C(2), Q(module=3)), C(4), C(5))
234
235
236
def test_predicate_and():
237
    assert And(C(1), C(2)) == And(C(1), C(2))
238
    assert Q(module=1) & Q(module=2) == And(Q(module=1), Q(module=2))
239
    assert Q(module=1) & Q(module=2) & Q(module=3) == And(Q(module=1), Q(module=2), Q(module=3))
240
241
    assert (Q(module=1) & Q(module=2))({'module': 3}) == False
242
    assert (Q(module=1) & Q(function=2))({'module': 1, 'function': 2}) == True
243
244
    assert And(1, 2) | 3 == Or(And(1, 2), 3)
245
246
247
def test_predicate_or():
248
    assert Q(module=1) | Q(module=2) == Or(Q(module=1), Q(module=2))
249
    assert Q(module=1) | Q(module=2) | Q(module=3) == Or(Q(module=1), Q(module=2), Q(module=3))
250
251
    assert (Q(module=1) | Q(module=2))({'module': 3}) == False
252
    assert (Q(module=1) | Q(module=2))({'module': 2}) == True
253
254
    assert Or(1, 2) & 3 == And(Or(1, 2), 3)
255
256
257
def test_tracing_bare(LineMatcher):
258
    lines = StringIO()
259
    with hunter.trace(CodePrinter(stream=lines)):
260
        def a():
261
            return 1
262
263
        b = a()
264
        b = 2
265
        try:
266
            raise Exception("BOOM!")
267
        except Exception:
268
            pass
269
    print(lines.getvalue())
270
    lm = LineMatcher(lines.getvalue().splitlines())
271
    lm.fnmatch_lines([
272
        "*test_hunter.py* call              def a():",
273
        "*test_hunter.py* line                  return 1",
274
        "*test_hunter.py* return                return 1",
275
        "* ...       return value: 1",
276
    ])
277
278
279
def test_threading_support(LineMatcher):
280
    lines = StringIO()
281
    with hunter.trace(actions=[CodePrinter(stream=lines), VarsPrinter('a', stream=lines), CallPrinter(stream=lines)],
282
                      threading_support=True):
283
        def foo(a=1):
284
            print(a)
285
286
        def main():
287
            foo()
288
289
        t = threading.Thread(target=foo)
290
        t.start()
291
        main()
292
293
    lm = LineMatcher(lines.getvalue().splitlines())
294
    lm.fnmatch_lines_random([
295
        'Thread-1    [...]/python-hunter/tests/test_hunter.py:283   call              def foo(a=1):',
296
        'Thread-1                                                   vars      a => 1',
297
        'Thread-1    [...]/python-hunter/tests/test_hunter.py:283   call         => foo(a=1)',
298
        'Thread-1                                                   vars      a => 1',
299
        'MainThread  [...]/python-hunter/tests/test_hunter.py:283   call              def foo(a=1):',
300
        'MainThread                                                 vars      a => 1',
301
        'MainThread  [...]/python-hunter/tests/test_hunter.py:283   call         => foo(a=1)',
302
        'MainThread                                                 vars      a => 1',
303
    ])
304
305
306
def test_tracing_printing_failures(LineMatcher):
307
    lines = StringIO()
308
    with trace(actions=[CodePrinter(stream=lines), VarsPrinter("x", stream=lines)]):
309
        class Bad(Exception):
310
            def __repr__(self):
311
                raise RuntimeError("I'm a bad class!")
312
313
        def a():
314
            x = Bad()
315
            return x
316
317
        def b():
318
            x = Bad()
319
            raise x
320
321
        a()
322
        try:
323
            b()
324
        except Exception as exc:
325
            pass
326
    lm = LineMatcher(lines.getvalue().splitlines())
327
    lm.fnmatch_lines([
328
        """*tests*test_hunter.py:* call              class Bad(Exception):""",
329
        """*tests*test_hunter.py:* line              class Bad(Exception):""",
330
        """*tests*test_hunter.py:* line                  def __repr__(self):""",
331
        """*tests*test_hunter.py:* return                def __repr__(self):""",
332
        """* ...       return value: *""",
333
        """*tests*test_hunter.py:* call              def a():""",
334
        """*tests*test_hunter.py:* line                  x = Bad()""",
335
        """*tests*test_hunter.py:* line                  return x""",
336
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
337
        """*tests*test_hunter.py:* return                return x""",
338
        """* ...       return value: !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
339
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
340
        """*tests*test_hunter.py:* call              def b():""",
341
        """*tests*test_hunter.py:* line                  x = Bad()""",
342
        """*tests*test_hunter.py:* line                  raise x""",
343
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
344
        """*tests*test_hunter.py:* exception             raise x""",
345
        """* ...       exception value: !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
346
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
347
        """*tests*test_hunter.py:* return                raise x""",
348
        """* ...       return value: None""",
349
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
350
    ])
351
352
353
def test_tracing_vars(LineMatcher):
354
    lines = StringIO()
355
    with hunter.trace(actions=[VarsPrinter('b', stream=lines), CodePrinter(stream=lines)]):
356
        def a():
357
            b = 1
358
            b = 2
359
            return 1
360
361
        b = a()
362
        b = 2
363
        try:
364
            raise Exception("BOOM!")
365
        except Exception:
366
            pass
367
    print(lines.getvalue())
368
    lm = LineMatcher(lines.getvalue().splitlines())
369
    lm.fnmatch_lines([
370
        "*test_hunter.py* call              def a():",
371
        "*test_hunter.py* line                  b = 1",
372
        "* vars      b => 1",
373
        "*test_hunter.py* line                  b = 2",
374
        "* vars      b => 2",
375
        "*test_hunter.py* line                  return 1",
376
        "* vars      b => 2",
377
        "*test_hunter.py* return                return 1",
378
        "* ...       return value: 1",
379
    ])
380
381
382
def test_trace_merge():
383
    with hunter.trace(function="a"):
384
        with hunter.trace(function="b"):
385
            with hunter.trace(function="c"):
386
                assert sys.gettrace()._handler == When(Q(function="c"), CodePrinter)
387
            assert sys.gettrace()._handler == When(Q(function="b"), CodePrinter)
388
        assert sys.gettrace()._handler == When(Q(function="a"), CodePrinter)
389
390
391
def test_trace_api_expansion():
392
    # simple use
393
    with trace(function="foobar") as t:
394
        assert t._handler == When(Q(function="foobar"), CodePrinter)
395
396
    # "or" by expression
397
    with trace(module="foo", function="foobar") as t:
398
        assert t._handler == When(Q(module="foo", function="foobar"), CodePrinter)
399
400
    # pdb.set_trace
401
    with trace(function="foobar", action=Debugger) as t:
402
        assert str(t._handler) == str(When(Q(function="foobar"), Debugger))
403
404
    # pdb.set_trace on any hits
405
    with trace(module="foo", function="foobar", action=Debugger) as t:
406
        assert str(t._handler) == str(When(Q(module="foo", function="foobar"), Debugger))
407
408
    # pdb.set_trace when function is foobar, otherwise just print when module is foo
409
    with trace(Q(function="foobar", action=Debugger), module="foo") as t:
410
        assert str(t._handler) == str(When(And(
411
            When(Q(function="foobar"), Debugger),
412
            Q(module="foo")
413
        ), CodePrinter))
414
415
    # dumping variables from stack
416
    with trace(Q(function="foobar", action=VarsPrinter("foobar")), module="foo") as t:
417
        assert str(t._handler) == str(When(And(
418
            When(Q(function="foobar"), VarsPrinter("foobar")),
419
            Q(module="foo"),
420
        ), CodePrinter))
421
422
    with trace(Q(function="foobar", action=VarsPrinter("foobar", "mumbojumbo")), module="foo") as t:
423
        assert str(t._handler) == str(When(And(
424
            When(Q(function="foobar"), VarsPrinter("foobar", "mumbojumbo")),
425
            Q(module="foo"),
426
        ), CodePrinter))
427
428
    # multiple actions
429
    with trace(Q(function="foobar", actions=[VarsPrinter("foobar"), Debugger]), module="foo") as t:
430
        assert str(t._handler) == str(When(And(
431
            When(Q(function="foobar"), VarsPrinter("foobar"), Debugger),
432
            Q(module="foo"),
433
        ), CodePrinter))
434
435
436
def test_locals():
437
    out = StringIO()
438
    with hunter.trace(
439
        lambda event: event.locals.get("node") == "Foobar",
440
        module="test_hunter",
441
        function="foo",
442
        action=CodePrinter(stream=out)
443
    ):
444
        def foo():
445
            a = 1
446
            node = "Foobar"
447
            node += "x"
448
            a += 2
449
            return a
450
451
        foo()
452
    assert out.getvalue().endswith('node += "x"\n')
453
454
455
def test_fullsource_decorator_issue(LineMatcher):
456
    out = StringIO()
457
    with trace(kind='call', action=CodePrinter(stream=out)):
458
        foo = bar = lambda x: x
459
460
        @foo
461
        @bar
462
        def foo():
463
            return 1
464
465
        foo()
466
467
    lm = LineMatcher(out.getvalue().splitlines())
468
    lm.fnmatch_lines([
469
        '* call              @foo',
470
        '*    |              @bar',
471
        '*    |              def foo():',
472
    ])
473
474
475
def test_callprinter(LineMatcher):
476
    out = StringIO()
477
    with trace(action=CallPrinter(stream=out)):
478
        foo = bar = lambda x: x
479
480
        @foo
481
        @bar
482
        def foo():
483
            return 1
484
485
        foo()
486
487
    lm = LineMatcher(out.getvalue().splitlines())
488
    lm.fnmatch_lines([
489
        '* call      => <lambda>(x=<function *foo at *>)',
490
        '* line         foo = bar = lambda x: x',
491
        '* return    <= <lambda>: <function *foo at *>',
492
        '* call      => <lambda>(x=<function *foo at *>)',
493
        '* line         foo = bar = lambda x: x',
494
        '* return    <= <lambda>: <function *foo at *>',
495
        '* call      => foo()',
496
        '* line         return 1',
497
        '* return    <= foo: 1',
498
    ])
499
500
501
def test_source(LineMatcher):
502
    calls = []
503
    with trace(action=lambda event: calls.append(event.source)):
504
        foo = bar = lambda x: x
505
506
        @foo
507
        @bar
508
        def foo():
509
            return 1
510
511
        foo()
512
513
    lm = LineMatcher(calls)
514
    lm.fnmatch_lines([
515
        '        foo = bar = lambda x: x\n',
516
        '        @foo\n',
517
        '            return 1\n',
518
    ])
519
520
521
def test_fullsource(LineMatcher):
522
    calls = []
523
    with trace(action=lambda event: calls.append(event.fullsource)):
524
        foo = bar = lambda x: x
525
526
        @foo
527
        @bar
528
        def foo():
529
            return 1
530
531
        foo()
532
533
    lm = LineMatcher(calls)
534
    lm.fnmatch_lines([
535
        '        foo = bar = lambda x: x\n',
536
        '        @foo\n        @bar\n        def foo():\n',
537
        '            return 1\n',
538
    ])
539
540
541
def test_debugger(LineMatcher):
542
    out = StringIO()
543
    calls = []
544
545
    class FakePDB:
546
        def __init__(self, foobar=1):
547
            calls.append(foobar)
548
549
        def set_trace(self, frame):
550
            calls.append(frame.f_code.co_name)
551
552
    with hunter.trace(
553
        lambda event: event.locals.get("node") == "Foobar",
554
        module="test_hunter",
555
        function="foo",
556
        actions=[VarsPrinter("a", "node", "foo", "test_debugger", globals=True, stream=out),
557
                 Debugger(klass=FakePDB, foobar=2)]
558
    ):
559
        def foo():
560
            a = 1
561
            node = "Foobar"
562
            node += "x"
563
            a += 2
564
            return a
565
566
        foo()
567
    print(out.getvalue())
568
    assert calls == [2, 'foo']
569
    lm = LineMatcher(out.getvalue().splitlines())
570
    lm.fnmatch_lines_random([
571
        "*      test_debugger => <function test_debugger at *",
572
        "*      node => 'Foobar'",
573
        "*      a => 1",
574
    ])
575
576
577
def test_custom_action():
578
    calls = []
579
580
    with trace(action=lambda event: calls.append(event.function), kind="return"):
581
        def foo():
582
            return 1
583
584
        foo()
585
    assert 'foo' in calls
586
587
588
def test_trace_with_class_actions():
589
    with trace(CodePrinter):
590
        def a():
591
            pass
592
593
        a()
594
595
596
def test_predicate_no_inf_recursion():
597
    assert Or(And(1)) == 1
598
    assert Or(Or(1)) == 1
599
    assert And(Or(1)) == 1
600
    assert And(And(1)) == 1
601
    predicate = Q(Q(lambda ev: 1, module='wat'))
602
    print('predicate:', predicate)
603
    predicate({'module': 'foo'})
604
605
606
def test_predicate_compression():
607
    assert Or(Or(1, 2), And(3)) == Or(1, 2, 3)
608
    assert Or(Or(1, 2), 3) == Or(1, 2, 3)
609
    assert Or(1, Or(2, 3), 4) == Or(1, 2, 3, 4)
610
    assert And(1, 2, Or(3, 4)).predicates == (1, 2, Or(3, 4))
611
612
    assert repr(Or(Or(1, 2), And(3))) == repr(Or(1, 2, 3))
613
    assert repr(Or(Or(1, 2), 3)) == repr(Or(1, 2, 3))
614
    assert repr(Or(1, Or(2, 3), 4)) == repr(Or(1, 2, 3, 4))
615
616
617
def test_predicate_not():
618
    assert Not(1).predicate == 1
619
    assert ~Or(1, 2) == Not(Or(1, 2))
620
    assert ~And(1, 2) == Not(And(1, 2))
621
622
    assert ~Not(1) == 1
623
624
    assert ~Query(module=1) | ~Query(module=2) == Not(And(Query(module=1), Query(module=2)))
625
    assert ~Query(module=1) & ~Query(module=2) == Not(Or(Query(module=1), Query(module=2)))
626
627
    assert ~Query(module=1) | Query(module=2) == Or(Not(Query(module=1)), Query(module=2))
628
    assert ~Query(module=1) & Query(module=2) == And(Not(Query(module=1)), Query(module=2))
629
630
    assert ~(Query(module=1) & Query(module=2)) == Not(And(Query(module=1), Query(module=2)))
631
    assert ~(Query(module=1) | Query(module=2)) == Not(Or(Query(module=1), Query(module=2)))
632
633
    assert repr(~Or(1, 2)) == repr(Not(Or(1, 2)))
634
    assert repr(~And(1, 2)) == repr(Not(And(1, 2)))
635
636
    assert repr(~Query(module=1) | ~Query(module=2)) == repr(Not(And(Query(module=1), Query(module=2))))
637
    assert repr(~Query(module=1) & ~Query(module=2)) == repr(Not(Or(Query(module=1), Query(module=2))))
638
639
    assert repr(~(Query(module=1) & Query(module=2))) == repr(Not(And(Query(module=1), Query(module=2))))
640
    assert repr(~(Query(module=1) | Query(module=2))) == repr(Not(Or(Query(module=1), Query(module=2))))
641
642
    assert Not(Q(module=1))({'module': 1}) == False
643
644
645
def test_predicate_query_allowed():
646
    pytest.raises(TypeError, Query, 1)
647
    pytest.raises(TypeError, Query, a=1)
648
649
650
def test_predicate_when_allowed():
651
    pytest.raises(TypeError, When, 1)
652
653
654
@pytest.mark.parametrize('expr,inp,expected', [
655
    ({'module': "abc"}, {'module': "abc"}, True),
656
    ({'module': "abcd"}, {'module': "abc"}, False),
657
    ({'module': "abcd"}, {'module': "abce"}, False),
658
    ({'module_startswith': "abc"}, {'module': "abcd"}, True),
659
    ({'module__startswith': "abc"}, {'module': "abcd"}, True),
660
    ({'module_contains': "bc"}, {'module': "abcd"}, True),
661
    ({'module_contains': "bcde"}, {'module': "abcd"}, False),
662
663
    ({'module_endswith': "abc"}, {'module': "abcd"}, False),
664
    ({'module__endswith': "bcd"}, {'module': "abcd"}, True),
665
666
    ({'module_in': "abcd"}, {'module': "bc"}, True),
667
    ({'module': "abcd"}, {'module': "bc"}, False),
668
    ({'module': ["abcd"]}, {'module': "bc"}, False),
669
    ({'module_in': ["abcd"]}, {'module': "bc"}, False),
670
    ({'module_in': ["a", "bc", "d"]}, {'module': "bc"}, True),
671
672
    ({'module': "abcd"}, {'module': "abc"}, False),
673
674
    ({'module_startswith': ("abc", "xyz")}, {'module': "abc"}, True),
675
    ({'module_startswith': {"abc", "xyz"}}, {'module': "abc"}, True),
676
    ({'module_startswith': ["abc", "xyz"]}, {'module': "abc"}, True),
677
    ({'module_startswith': ("abc", "xyz")}, {'module': "abcd"}, True),
678
    ({'module_startswith': ("abc", "xyz")}, {'module': "xyzw"}, True),
679
    ({'module_startswith': ("abc", "xyz")}, {'module': "fooabc"}, False),
680
681
    ({'module_endswith': ("abc", "xyz")}, {'module': "abc"}, True),
682
    ({'module_endswith': {"abc", "xyz"}}, {'module': "abc"}, True),
683
    ({'module_endswith': ["abc", "xyz"]}, {'module': "abc"}, True),
684
    ({'module_endswith': ("abc", "xyz")}, {'module': "1abc"}, True),
685
    ({'module_endswith': ("abc", "xyz")}, {'module': "1xyz"}, True),
686
    ({'module_endswith': ("abc", "xyz")}, {'module': "abcfoo"}, False),
687
688
    ({'module': "abc"}, {'module': 1}, False),
689
690
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "regex"}, False),
691
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "re.gex"}, True),
692
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "sregex"}, True),
693
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "re"}, True),
694
])
695
def test_predicate_matching(expr, inp, expected):
696
    assert Query(**expr)(inp) == expected
697
698
699
@pytest.mark.parametrize('exc_type,expr', [
700
    (TypeError, {'module_1': 1}),
701
    (TypeError, {'module1': 1}),
702
    (ValueError, {'module_startswith': 1}),
703
    (ValueError, {'module_startswith': {1: 2}}),
704
    (ValueError, {'module_endswith': 1}),
705
    (ValueError, {'module_endswith': {1: 2}}),
706
    (TypeError, {'module_foo': 1}),
707
    (TypeError, {'module_a_b': 1}),
708
])
709
def test_predicate_bad_query(expr, exc_type):
710
    pytest.raises(exc_type, Query, **expr)
711
712
713
def test_predicate_when():
714
    called = []
715
    assert When(Q(module=1), lambda ev: called.append(ev))({'module': 2}) == False
716
    assert called == []
717
718
    assert When(Q(module=1), lambda ev: called.append(ev))({'module': 1}) == True
719
    assert called == [{'module': 1}]
720
721
    called = []
722
    assert Q(module=1, action=lambda ev: called.append(ev))({'module': 1}) == True
723
    assert called == [{'module': 1}]
724
725
    called = [[], []]
726
    predicate = (
727
        Q(module=1, action=lambda ev: called[0].append(ev)) |
728
        Q(module=2, action=lambda ev: called[1].append(ev))
729
    )
730
    assert predicate({'module': 1}) == True
731
    assert called == [[{'module': 1}], []]
732
733
    assert predicate({'module': 2}) == True
734
    assert called == [[{'module': 1}], [{'module': 2}]]
735
736
    called = [[], []]
737
    predicate = (
738
        Q(module=1, action=lambda ev: called[0].append(ev)) &
739
        Q(function=2, action=lambda ev: called[1].append(ev))
740
    )
741
    assert predicate({'module': 2}) == False
742
    assert called == [[], []]
743
744
    assert predicate({'module': 1, 'function': 2}) == True
745
    assert called == [[{'module': 1, 'function': 2}], [{'module': 1, 'function': 2}]]
746
747
748
def test_and_or_kwargs():
749
    assert And(module=1, function=2) == Query(module=1, function=2)
750
    assert Or(module=1, function=2) == Or(Query(module=1), Query(function=2))
751
752
    assert And(3, module=1, function=2) == And(3, Query(module=1, function=2))
753
    assert Or(3, module=1, function=2) == Or(3, Query(module=1), Query(function=2))
754
755
756
def test_proper_backend():
757
    if os.environ.get('PUREPYTHONHUNTER') or platform.python_implementation() == 'PyPy':
758
        assert 'hunter.tracer.Tracer' in repr(hunter.Tracer)
759
    else:
760
        assert 'hunter._tracer.Tracer' in repr(hunter.Tracer)
761
762
763
@pytest.fixture(scope="session", params=['pure', 'cython'])
764
def tracer_impl(request):
765
    if request.param == 'pure':
766
        return pytest.importorskip('hunter.tracer').Tracer
767
    elif request.param == 'cython':
768
        return pytest.importorskip('hunter._tracer').Tracer
769
770
771
def _tokenize():
772
    with open(tokenize.__file__, 'rb') as fh:
773
        toks = []
774
        try:
775
            for tok in tokenize.tokenize(fh.readline):
776
                toks.append(tok)
777
        except tokenize.TokenError as exc:
778
            toks.append(exc)
779
780
781
def test_perf_filter(tracer_impl, benchmark):
782
    t = tracer_impl()
783
784
    @benchmark
785
    def run():
786
        output = StringIO()
787
        with t.trace(Q(
788
            Q(module="does-not-exist") | Q(module="does not exist".split()),
789
            action=CodePrinter(stream=output)
790
        )):
791
            _tokenize()
792
        return output
793
794
    assert run.getvalue() == ''
795
796
797
def test_perf_stdlib(tracer_impl, benchmark):
798
    t = tracer_impl()
799
800
    @benchmark
801
    def run():
802
        output = StringIO()
803
        with t.trace(Q(
804
            ~Q(module_contains='pytest'),
805
            ~Q(module_contains='hunter'),
806
            ~Q(filename=''),
807
            stdlib=False,
808
            action=CodePrinter(stream=output)
809
        )):
810
            _tokenize()
811
        return output
812
813
    assert run.getvalue() == ''
814
815
816
def test_perf_actions(tracer_impl, benchmark):
817
    t = tracer_impl()
818
819
    @benchmark
820
    def run():
821
        output = StringIO()
822
        with t.trace(Q(
823
            ~Q(module_in=['re', 'sre', 'sre_parse']) & ~Q(module_startswith='namedtuple') & Q(kind="call"),
824
            actions=[
825
                CodePrinter(
826
                    stream=output
827
                ),
828
                VarsPrinter(
829
                    'line',
830
                    globals=True,
831
                    stream=output
832
                )
833
            ]
834
        )):
835
            _tokenize()
836