Completed
Push — master ( da4845...4a7506 )
by Ionel Cristian
01:05
created

tests.test_threading_support()   C

Complexity

Conditions 7

Size

Total Lines 36

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 36
rs 5.5
cc 7

2 Methods

Rating   Name   Duplication   Size   Complexity  
A tests.record() 0 4 1
A tests.main() 0 2 1
1
from __future__ import print_function
2
3
import inspect
4
import os
5
import platform
6
import subprocess
7
import sys
8
import threading
9
import tokenize
10
11
import hunter
12
import pytest
13
from fields import Fields
14
from hunter import Q
15
from hunter import And
16
from hunter import CallPrinter
17
from hunter import CodePrinter
18
from hunter import Debugger
19
from hunter import Not
20
from hunter import Or
21
from hunter import Query
22
from hunter import VarsPrinter
23
from hunter import When
24
25
try:
26
    from cStringIO import StringIO
27
except ImportError:
28
    from io import StringIO
29
try:
30
    from itertools import izip_longest
31
except ImportError:
32
    from itertools import zip_longest as izip_longest
33
34
# Ask pytest to load its "pytester" plugin for this module.
pytest_plugins = ('pytester',)
35
36
37
class FakeCallable(Fields.value):
    """Placeholder callable wrapping a single ``value``.

    ``repr()`` and ``str()`` delegate to the wrapped value; calling an
    instance always raises ``NotImplementedError``.
    """

    def __str__(self):
        wrapped = self.value
        return str(wrapped)

    def __repr__(self):
        wrapped = self.value
        return repr(wrapped)

    def __call__(self):
        raise NotImplementedError("Nope")


# Short alias used throughout the predicate tests.
C = FakeCallable
48
49
50
class EvilTracer(object):
    """Context manager that records every event and replays it on exit.

    The positional/keyword arguments (minus ``threading_support``) are turned
    into a predicate with ``hunter._prepare_predicate``.  While the block is
    active, events are only collected; once the tracer is stopped each
    collected event is passed to that predicate.
    """

    def __init__(self, *args, **kwargs):
        use_threads = kwargs.pop('threading_support', False)
        self._calls = []
        self._handler = hunter._prepare_predicate(*args, **kwargs)
        # Tracing starts here, so it has to come after the predicate is built.
        self._tracer = hunter.trace(self._append, threading_support=use_threads)

    def _append(self, event):
        """Store *event* for later replay."""
        # Touch ``lineno`` now so it is cached: frames are reused, and reading
        # it later would give very broken events.
        event.lineno
        self._calls.append(event)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._tracer.stop()
        replay = self._handler
        for recorded in self._calls:
            replay(recorded)


# The tests below use this recording tracer under the name ``trace``.
trace = EvilTracer
74
75
76
def _get_func_spec(func):
77
    spec = inspect.getargspec(func)
78
    return inspect.formatargspec(spec.args, spec.varargs)
79
80
81
def test_pth_activation():
    """Run ``sample.py`` with ``PYTHONHUNTER`` set and look for the traced ``join`` call."""
    path_module = os.path.__name__
    script = os.path.join(os.path.dirname(__file__), 'sample.py')

    env = dict(os.environ)
    env['PYTHONHUNTER'] = "module={!r},function=\"join\"".format(path_module)

    wanted_file = "{0}.py".format(path_module).encode()
    wanted_call = "call      def join{0}:".format(_get_func_spec(os.path.join)).encode()

    output = subprocess.check_output(
        [sys.executable, script],
        env=env,
        stderr=subprocess.STDOUT,
    )
    assert wanted_file in output
    assert wanted_call in output
95
96
97
def test_pth_sample4():
    """Run ``sample4.py`` with ``PYTHONHUNTER="CodePrinter"`` and expect some output."""
    # Drop the coverage hooks from the child's environment.
    dropped = ('COVERAGE_PROCESS_START', 'COV_CORE_SOURCE')
    env = dict((name, value) for name, value in os.environ.items() if name not in dropped)
    env['PYTHONHUNTER'] = "CodePrinter"

    script = os.path.join(os.path.dirname(__file__), 'sample4.py')
    output = subprocess.check_output([sys.executable, script], env=env, stderr=subprocess.STDOUT)
    assert output
107
108
109
def test_pth_sample2(LineMatcher):
    """Run ``sample2.py`` traced via ``PYTHONHUNTER`` and match the printed event stream.

    The expected pattern list contains several groups that repeat three times
    (once per stacked decorator); they are spelled once and repeated.
    """
    # Drop the coverage hooks from the child's environment.
    dropped = ('COVERAGE_PROCESS_START', 'COV_CORE_SOURCE')
    env = dict((name, value) for name, value in os.environ.items() if name not in dropped)
    env['PYTHONHUNTER'] = "module='__main__'"

    script = os.path.join(os.path.dirname(__file__), 'sample2.py')
    output = subprocess.check_output([sys.executable, script], env=env, stderr=subprocess.STDOUT)

    # One call into ``deco(opt)``.
    deco_call = [
        '*tests*sample2.py:* call          def deco(opt):',
        '*tests*sample2.py:* line              def decorator(func):',
        '*tests*sample2.py:* line              return decorator',
        '*tests*sample2.py:* return            return decorator',
        '*                 * ...       return value: <function deco*',
    ]
    # One call into ``decorator(func)``.
    decorator_call = [
        '*tests*sample2.py:* call              def decorator(func):',
        '*tests*sample2.py:* line                  @functools.wraps(func)',
        '*tests*sample2.py:* line                  return wrapper',
        '*tests*sample2.py:* return                return wrapper',
        '*                 * ...       return value: <function foo *',
    ]
    # Entering one ``wrapper(*args)`` layer.
    wrapper_enter = [
        '*tests*sample2.py:* call                  @functools.wraps(func)',
        '*                 *    |                  def wrapper(*args):',
        '*tests*sample2.py:* line                      return func(*args)',
    ]
    # Leaving one ``wrapper(*args)`` layer.
    wrapper_leave = [
        "*tests*sample2.py:* return                    return func(*args)",
        "*                 * ...       return value: ('a*', 'b')",
    ]

    expected = [
        '*tests*sample2.py:* call      if __name__ == "__main__":  #*',
        '*tests*sample2.py:* line      if __name__ == "__main__":  #*',
        '*tests*sample2.py:* line          import functools',
        '*tests*sample2.py:* line          def deco(opt):',
    ]
    decorator_lines = (
        '*tests*sample2.py:* line          @deco(1)',
        '*tests*sample2.py:* line          @deco(2)',
        '*tests*sample2.py:* line          @deco(3)',
    )
    for decorator_line in decorator_lines:
        expected.append(decorator_line)
        expected.extend(deco_call)
    expected.extend(decorator_call * 3)
    expected.extend([
        '*tests*sample2.py:* line          foo(',
        "*tests*sample2.py:* line              'a*',",
        "*tests*sample2.py:* line              'b'",
    ])
    expected.extend(wrapper_enter * 3)
    expected.extend([
        '*tests*sample2.py:* call          @deco(1)',
        '*                 *    |          @deco(2)',
        '*                 *    |          @deco(3)',
        '*                 *    |          def foo(*args):',
        '*tests*sample2.py:* line              return args',
        '*tests*sample2.py:* return            return args',
        "*                 * ...       return value: ('a*', 'b')",
    ])
    expected.extend(wrapper_leave * 3)
    expected.extend([
        "*tests*sample2.py:* line          try:",
        "*tests*sample2.py:* line              None(",
        "*tests*sample2.py:* line                  'a',",
        "*tests*sample2.py:* line                  'b'",
        "*tests*sample2.py:* exception             'b'",
        "*                 * ...       exception value: *",
        "*tests*sample2.py:* line          except:",
        "*tests*sample2.py:* line              pass",
        "*tests*sample2.py:* return            pass",
        "*                   ...       return value: None",
    ])

    matcher = LineMatcher(output.decode('utf-8').splitlines())
    matcher.fnmatch_lines(expected)
193
194
195
def test_predicate_str_repr():
    """Check ``repr()`` and ``str()`` of Query, When, Not, Or and And predicates."""
    two_fields = Q(module='a', function='b')
    assert repr(two_fields).endswith("predicates.Query: query_eq=(('function', 'b'), ('module', 'a'))>")
    assert str(two_fields) == "Query(function='b', module='a')"

    one_field = Q(module='a')
    assert repr(one_field).endswith("predicates.Query: query_eq=(('module', 'a'),)>")
    assert str(one_field) == "Query(module='a')"

    with_action = Q(module='a', action=C('foo'))
    with_action_repr = repr(with_action)
    assert "predicates.When: condition=<hunter." in with_action_repr
    assert "predicates.Query: query_eq=(('module', 'a'),)>, actions=('foo',)>" in with_action_repr
    assert str(with_action) == "When(Query(module='a'), 'foo')"

    negated = ~Q(module='a')
    negated_repr = repr(negated)
    assert "predicates.Not: predicate=<hunter." in negated_repr
    assert "predicates.Query: query_eq=(('module', 'a'),)>>" in negated_repr
    assert str(negated) == "Not(Query(module='a'))"

    either = Q(module='a') | Q(module='b')
    either_repr = repr(either)
    assert "predicates.Or: predicates=(<hunter." in either_repr
    assert "predicates.Query: query_eq=(('module', 'a'),)>, " in either_repr
    assert either_repr.endswith("predicates.Query: query_eq=(('module', 'b'),)>)>")
    assert str(either) == "Or(Query(module='a'), Query(module='b'))"

    both = Q(module='a') & Q(module='b')
    both_repr = repr(both)
    assert "predicates.And: predicates=(<hunter." in both_repr
    assert "predicates.Query: query_eq=(('module', 'a'),)>," in both_repr
    assert both_repr.endswith("predicates.Query: query_eq=(('module', 'b'),)>)>")
    assert str(both) == "And(Query(module='a'), Query(module='b'))"
219
220
221
def test_predicate_q_nest_1():
    """``Q`` wrapped around a single query has the repr of a plain Query."""
    nested = Q(Q(module='a'))
    text = repr(nested)
    assert text.endswith("predicates.Query: query_eq=(('module', 'a'),)>")
223
224
225
def test_predicate_q_not_callable():
    """Passing a non-callable positional argument to ``Q`` raises ``TypeError``."""
    with pytest.raises(TypeError) as excinfo:
        Q('foobar')
    assert excinfo.value.args == ("Predicate 'foobar' is not callable.",)
228
229
230
def test_predicate_q_expansion():
    """``Q`` with predicates plus keywords expands into And/When combinations."""
    plain = Q(C(1), C(2), module=3)
    assert plain == And(C(1), C(2), Q(module=3))

    single_action = Q(C(1), C(2), module=3, action=C(4))
    assert single_action == When(And(C(1), C(2), Q(module=3)), C(4))

    many_actions = Q(C(1), C(2), module=3, actions=[C(4), C(5)])
    assert many_actions == When(And(C(1), C(2), Q(module=3)), C(4), C(5))
234
235
236
def test_predicate_and():
    """Check construction, chaining and evaluation of ``And`` / ``&``."""
    assert And(C(1), C(2)) == And(C(1), C(2))

    pair = Q(module=1) & Q(module=2)
    assert pair == And(Q(module=1), Q(module=2))

    triple = Q(module=1) & Q(module=2) & Q(module=3)
    assert triple == And(Q(module=1), Q(module=2), Q(module=3))

    contradictory = Q(module=1) & Q(module=2)
    assert contradictory({'module': 3}) == False

    satisfiable = Q(module=1) & Q(function=2)
    assert satisfiable({'module': 1, 'function': 2}) == True

    mixed = And(1, 2) | 3
    assert mixed == Or(And(1, 2), 3)
245
246
247
def test_predicate_or():
    """Check construction, chaining and evaluation of ``Or`` / ``|``."""
    pair = Q(module=1) | Q(module=2)
    assert pair == Or(Q(module=1), Q(module=2))

    triple = Q(module=1) | Q(module=2) | Q(module=3)
    assert triple == Or(Q(module=1), Q(module=2), Q(module=3))

    one_or_two = Q(module=1) | Q(module=2)
    assert one_or_two({'module': 3}) == False
    one_or_two = Q(module=1) | Q(module=2)
    assert one_or_two({'module': 2}) == True

    mixed = Or(1, 2) & 3
    assert mixed == And(Or(1, 2), 3)
255
256
257
def test_tracing_bare(LineMatcher):
    """Trace a small local function with ``CodePrinter`` and match the printed events."""
    stream = StringIO()
    # NOTE: the body of this ``with`` is what gets traced; the expected
    # patterns below depend on its exact source text and indentation.
    with hunter.trace(CodePrinter(stream=stream)):
        def a():
            return 1

        b = a()
        b = 2
        try:
            raise Exception("BOOM!")
        except Exception:
            pass
    captured = stream.getvalue()
    print(captured)
    matcher = LineMatcher(captured.splitlines())
    matcher.fnmatch_lines([
        "*test_hunter.py* call              def a():",
        "*test_hunter.py* line                  return 1",
        "*test_hunter.py* return                return 1",
        "* ...       return value: 1",
    ])
277
278
279
def test_threading_support(LineMatcher):
    """Trace ``foo`` from both the main thread and a worker thread.

    ``record`` collects the ``threadid`` / ``threadname`` of every event and
    accepts all of them, so the three printers write every event to ``lines``.
    """
    lines = StringIO()
    idents = set()
    names = set()

    def record(event):
        idents.add(event.threadid)
        names.add(event.threadname)
        return True

    # NOTE: the body of this ``with`` is what gets traced; the expected
    # patterns below depend on its exact source text and indentation.
    with hunter.trace(record,
                      actions=[CodePrinter(stream=lines), VarsPrinter('a', stream=lines), CallPrinter(stream=lines)],
                      threading_support=True):
        def foo(a=1):
            print(a)

        def main():
            foo()

        # Name the thread explicitly: the default name is not reliably
        # "Thread-1" (Python 3.10+ appends the target's name, and earlier
        # threads in the process shift the counter).
        t = threading.Thread(target=foo, name='Thread-1')
        t.start()
        main()
        # Wait for the worker while tracing is still active, otherwise its
        # events may be missing when the assertions run.
        t.join()

    lm = LineMatcher(lines.getvalue().splitlines())
    assert idents - {t.ident} == {None}
    assert names == {'MainThread', 'Thread-1'}
    lm.fnmatch_lines_random([
        'Thread-1    *test_hunter.py:*   call              def foo(a=1):',
        'Thread-1    *                   vars      a => 1',
        'Thread-1    *test_hunter.py:*   call         => foo(a=1)',
        'Thread-1    *                   vars      a => 1',
        'MainThread  *test_hunter.py:*   call              def foo(a=1):',
        'MainThread  *                   vars      a => 1',
        'MainThread  *test_hunter.py:*   call         => foo(a=1)',
        'MainThread  *                   vars      a => 1',
    ])
316
317
318
def test_tracing_printing_failures(LineMatcher):
    """Printers must survive objects whose ``__repr__`` raises.

    The expected patterns end the failed-repr text with ``*)`` rather than a
    literal ``,)``: the repr of a single-argument exception only has the
    trailing comma on Python < 3.7, and the wildcard matches both forms.
    """
    lines = StringIO()
    # NOTE: the body of this ``with`` is what gets traced; the expected
    # patterns below depend on its exact source text and indentation.
    with trace(actions=[CodePrinter(stream=lines), VarsPrinter("x", stream=lines)]):
        class Bad(Exception):
            def __repr__(self):
                raise RuntimeError("I'm a bad class!")

        def a():
            x = Bad()
            return x

        def b():
            x = Bad()
            raise x

        a()
        try:
            b()
        except Exception as exc:
            pass
    lm = LineMatcher(lines.getvalue().splitlines())
    lm.fnmatch_lines([
        """*tests*test_hunter.py:* call              class Bad(Exception):""",
        """*tests*test_hunter.py:* line              class Bad(Exception):""",
        """*tests*test_hunter.py:* line                  def __repr__(self):""",
        """*tests*test_hunter.py:* return                def __repr__(self):""",
        """* ...       return value: *""",
        """*tests*test_hunter.py:* call              def a():""",
        """*tests*test_hunter.py:* line                  x = Bad()""",
        """*tests*test_hunter.py:* line                  return x""",
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
        """*tests*test_hunter.py:* return                return x""",
        """* ...       return value: !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
        """*tests*test_hunter.py:* call              def b():""",
        """*tests*test_hunter.py:* line                  x = Bad()""",
        """*tests*test_hunter.py:* line                  raise x""",
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
        """*tests*test_hunter.py:* exception             raise x""",
        """* ...       exception value: !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
        """*tests*test_hunter.py:* return                raise x""",
        """* ...       return value: None""",
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!"*)""",
    ])
363
364
365
def test_tracing_vars(LineMatcher):
    """Trace with ``VarsPrinter('b')`` + ``CodePrinter`` and match the interleaved output."""
    stream = StringIO()
    # NOTE: the body of this ``with`` is what gets traced; the expected
    # patterns below depend on its exact source text and indentation.
    with hunter.trace(actions=[VarsPrinter('b', stream=stream), CodePrinter(stream=stream)]):
        def a():
            b = 1
            b = 2
            return 1

        b = a()
        b = 2
        try:
            raise Exception("BOOM!")
        except Exception:
            pass
    captured = stream.getvalue()
    print(captured)
    matcher = LineMatcher(captured.splitlines())
    matcher.fnmatch_lines([
        "*test_hunter.py* call              def a():",
        "*test_hunter.py* line                  b = 1",
        "* vars      b => 1",
        "*test_hunter.py* line                  b = 2",
        "* vars      b => 2",
        "*test_hunter.py* line                  return 1",
        "* vars      b => 2",
        "*test_hunter.py* return                return 1",
        "* ...       return value: 1",
    ])
392
393
394
def test_trace_merge():
    """Nested ``hunter.trace`` blocks: the active handler is always the innermost one."""
    with hunter.trace(function="a"):
        with hunter.trace(function="b"):
            with hunter.trace(function="c"):
                innermost = sys.gettrace()
                assert innermost._handler == When(Q(function="c"), CodePrinter)
            # Leaving the "c" block puts the "b" handler back.
            middle = sys.gettrace()
            assert middle._handler == When(Q(function="b"), CodePrinter)
        # Leaving the "b" block puts the "a" handler back.
        outermost = sys.gettrace()
        assert outermost._handler == When(Q(function="a"), CodePrinter)
401
402
403
def test_trace_api_expansion():
    """Check how ``trace(...)`` arguments are expanded into a handler predicate."""
    # Single keyword filter: the default action is CodePrinter.
    expected = When(Q(function="foobar"), CodePrinter)
    with trace(function="foobar") as tracer:
        assert tracer._handler == expected

    # Two keyword filters end up in one Query.
    expected = When(Q(module="foo", function="foobar"), CodePrinter)
    with trace(module="foo", function="foobar") as tracer:
        assert tracer._handler == expected

    # Explicit Debugger action instead of the default printer.
    expected = str(When(Q(function="foobar"), Debugger))
    with trace(function="foobar", action=Debugger) as tracer:
        assert str(tracer._handler) == expected

    # Debugger action with two keyword filters.
    expected = str(When(Q(module="foo", function="foobar"), Debugger))
    with trace(module="foo", function="foobar", action=Debugger) as tracer:
        assert str(tracer._handler) == expected

    # A Q carrying its own Debugger action, combined with a keyword filter.
    expected = str(When(And(
        When(Q(function="foobar"), Debugger),
        Q(module="foo")
    ), CodePrinter))
    with trace(Q(function="foobar", action=Debugger), module="foo") as tracer:
        assert str(tracer._handler) == expected

    # A Q carrying a VarsPrinter action (one variable).
    expected = str(When(And(
        When(Q(function="foobar"), VarsPrinter("foobar")),
        Q(module="foo"),
    ), CodePrinter))
    with trace(Q(function="foobar", action=VarsPrinter("foobar")), module="foo") as tracer:
        assert str(tracer._handler) == expected

    # A Q carrying a VarsPrinter action (two variables).
    expected = str(When(And(
        When(Q(function="foobar"), VarsPrinter("foobar", "mumbojumbo")),
        Q(module="foo"),
    ), CodePrinter))
    with trace(Q(function="foobar", action=VarsPrinter("foobar", "mumbojumbo")), module="foo") as tracer:
        assert str(tracer._handler) == expected

    # A Q carrying several actions.
    expected = str(When(And(
        When(Q(function="foobar"), VarsPrinter("foobar"), Debugger),
        Q(module="foo"),
    ), CodePrinter))
    with trace(Q(function="foobar", actions=[VarsPrinter("foobar"), Debugger]), module="foo") as tracer:
        assert str(tracer._handler) == expected
446
447
448
def test_locals():
    """Filter events on a local variable's value and check the last printed line."""
    stream = StringIO()

    def node_is_foobar(event):
        return event.locals.get("node") == "Foobar"

    # NOTE: the body of this ``with`` is what gets traced; the assertion
    # below depends on its exact source text.
    with hunter.trace(
        node_is_foobar,
        module="test_hunter",
        function="foo",
        action=CodePrinter(stream=stream)
    ):
        def foo():
            a = 1
            node = "Foobar"
            node += "x"
            a += 2
            return a

        foo()
    printed = stream.getvalue()
    assert printed.endswith('node += "x"\n')
465
466
467
def test_fullsource_decorator_issue(LineMatcher):
    """A call event for a decorated function prints the decorators and the ``def`` line."""
    stream = StringIO()
    # NOTE: the body of this ``with`` is what gets traced; the expected
    # patterns below depend on its exact source text and indentation.
    with trace(kind='call', action=CodePrinter(stream=stream)):
        foo = bar = lambda x: x

        @foo
        @bar
        def foo():
            return 1

        foo()

    printed = stream.getvalue().splitlines()
    LineMatcher(printed).fnmatch_lines([
        '* call              @foo',
        '*    |              @bar',
        '*    |              def foo():',
    ])
485
486
487
def test_callprinter(LineMatcher):
    """Match the ``CallPrinter`` output for two lambda decorators and the decorated call."""
    stream = StringIO()
    # NOTE: the body of this ``with`` is what gets traced; the expected
    # patterns below depend on its exact source text.
    with trace(action=CallPrinter(stream=stream)):
        foo = bar = lambda x: x

        @foo
        @bar
        def foo():
            return 1

        foo()

    # One lambda invocation (call, line, return); it happens once per decorator.
    lambda_events = [
        '* call      => <lambda>(x=<function *foo at *>)',
        '* line         foo = bar = lambda x: x',
        '* return    <= <lambda>: <function *foo at *>',
    ]
    foo_events = [
        '* call      => foo()',
        '* line         return 1',
        '* return    <= foo: 1',
    ]
    printed = stream.getvalue().splitlines()
    LineMatcher(printed).fnmatch_lines(lambda_events + lambda_events + foo_events)
511
512
513
def test_source(LineMatcher):
    """Collect ``event.source`` for every event and match the collected lines."""
    sources = []

    def remember(event):
        sources.append(event.source)

    # NOTE: the body of this ``with`` is what gets traced; the expected
    # patterns below depend on its exact source text and indentation.
    with trace(action=remember):
        foo = bar = lambda x: x

        @foo
        @bar
        def foo():
            return 1

        foo()

    LineMatcher(sources).fnmatch_lines([
        '        foo = bar = lambda x: x\n',
        '        @foo\n',
        '            return 1\n',
    ])
531
532
533
def test_fullsource(LineMatcher):
    """Collect ``event.fullsource`` for every event and match the collected chunks."""
    sources = []

    def remember(event):
        sources.append(event.fullsource)

    # NOTE: the body of this ``with`` is what gets traced; the expected
    # patterns below depend on its exact source text and indentation.
    with trace(action=remember):
        foo = bar = lambda x: x

        @foo
        @bar
        def foo():
            return 1

        foo()

    LineMatcher(sources).fnmatch_lines([
        '        foo = bar = lambda x: x\n',
        '        @foo\n        @bar\n        def foo():\n',
        '            return 1\n',
    ])
551
552
553
def test_debugger(LineMatcher):
    """Run the ``Debugger`` action with a fake pdb class alongside a ``VarsPrinter``."""
    stream = StringIO()
    calls = []

    class FakePDB:
        """Records its constructor argument and the function name of the frame it is given."""

        def __init__(self, foobar=1):
            calls.append(foobar)

        def set_trace(self, frame):
            code = frame.f_code
            calls.append(code.co_name)

    def node_is_foobar(event):
        return event.locals.get("node") == "Foobar"

    # NOTE: the body of this ``with`` is what gets traced; the assertions
    # below depend on its exact source text.
    with hunter.trace(
        node_is_foobar,
        module="test_hunter",
        function="foo",
        actions=[VarsPrinter("a", "node", "foo", "test_debugger", globals=True, stream=stream),
                 Debugger(klass=FakePDB, foobar=2)]
    ):
        def foo():
            a = 1
            node = "Foobar"
            node += "x"
            a += 2
            return a

        foo()
    printed = stream.getvalue()
    print(printed)
    assert calls == [2, 'foo']
    matcher = LineMatcher(printed.splitlines())
    matcher.fnmatch_lines_random([
        "*      test_debugger => <function test_debugger at *",
        "*      node => 'Foobar'",
        "*      a => 1",
    ])
587
588
589
def test_custom_action():
    """A plain callable can be used as an action; it sees the ``return`` event of ``foo``."""
    returned_from = []

    def remember(event):
        returned_from.append(event.function)

    with trace(action=remember, kind="return"):
        def foo():
            return 1

        foo()
    assert 'foo' in returned_from
598
599
600
def test_trace_with_class_actions():
    """Passing the ``CodePrinter`` class itself (not an instance) to ``trace`` must work."""
    with trace(CodePrinter):
        def a():
            pass

        a()
606
607
608
def test_predicate_no_inf_recursion():
    """Single-argument And/Or wrappers collapse to their argument; nested Q is callable."""
    collapsed = (
        Or(And(1)),
        Or(Or(1)),
        And(Or(1)),
        And(And(1)),
    )
    for wrapped in collapsed:
        assert wrapped == 1

    predicate = Q(Q(lambda ev: 1, module='wat'))
    print('predicate:', predicate)
    # Only checks that evaluation finishes without raising.
    predicate({'module': 'foo'})
616
617
618
def test_predicate_compression():
    """Nested ``Or`` predicates are flattened, both for equality and for ``repr``."""
    flattened = [
        (Or(Or(1, 2), And(3)), Or(1, 2, 3)),
        (Or(Or(1, 2), 3), Or(1, 2, 3)),
        (Or(1, Or(2, 3), 4), Or(1, 2, 3, 4)),
    ]
    for nested, flat in flattened:
        assert nested == flat

    # An ``Or`` inside an ``And`` is kept as a member.
    assert And(1, 2, Or(3, 4)).predicates == (1, 2, Or(3, 4))

    for nested, flat in flattened:
        assert repr(nested) == repr(flat)
627
628
629
def test_predicate_not():
    """Check ``Not`` / ``~`` construction, De Morgan folding, repr and evaluation."""

    def q(number):
        # Fresh Query per use, so both sides of each comparison are distinct objects.
        return Query(module=number)

    def not_and():
        return Not(And(q(1), q(2)))

    def not_or():
        return Not(Or(q(1), q(2)))

    assert Not(1).predicate == 1
    assert (~Or(1, 2)) == Not(Or(1, 2))
    assert (~And(1, 2)) == Not(And(1, 2))

    # Double negation unwraps.
    assert (~Not(1)) == 1

    # Two negated operands fold into one Not around the opposite operator.
    assert (~q(1) | ~q(2)) == not_and()
    assert (~q(1) & ~q(2)) == not_or()

    # Only one negated operand: no folding.
    assert (~q(1) | q(2)) == Or(Not(q(1)), q(2))
    assert (~q(1) & q(2)) == And(Not(q(1)), q(2))

    assert (~(q(1) & q(2))) == not_and()
    assert (~(q(1) | q(2))) == not_or()

    assert repr(~Or(1, 2)) == repr(Not(Or(1, 2)))
    assert repr(~And(1, 2)) == repr(Not(And(1, 2)))

    assert repr(~q(1) | ~q(2)) == repr(not_and())
    assert repr(~q(1) & ~q(2)) == repr(not_or())

    assert repr(~(q(1) & q(2))) == repr(not_and())
    assert repr(~(q(1) | q(2))) == repr(not_or())

    negated = Not(Q(module=1))
    assert negated({'module': 1}) == False
655
656
657
def test_predicate_query_allowed():
    """``Query`` rejects a positional argument and an unknown keyword with ``TypeError``."""
    with pytest.raises(TypeError):
        Query(1)
    with pytest.raises(TypeError):
        Query(a=1)
660
661
662
def test_predicate_when_allowed():
    """``When`` called with just a condition raises ``TypeError``."""
    with pytest.raises(TypeError):
        When(1)
664
665
666
@pytest.mark.parametrize('expr,inp,expected', [
    # exact match, startswith, contains
    ({'module': "abc"}, {'module': "abc"}, True),
    ({'module': "abcd"}, {'module': "abc"}, False),
    ({'module': "abcd"}, {'module': "abce"}, False),
    ({'module_startswith': "abc"}, {'module': "abcd"}, True),
    ({'module__startswith': "abc"}, {'module': "abcd"}, True),
    ({'module_contains': "bc"}, {'module': "abcd"}, True),
    ({'module_contains': "bcde"}, {'module': "abcd"}, False),

    # endswith
    ({'module_endswith': "abc"}, {'module': "abcd"}, False),
    ({'module__endswith': "bcd"}, {'module': "abcd"}, True),

    # membership
    ({'module_in': "abcd"}, {'module': "bc"}, True),
    ({'module': "abcd"}, {'module': "bc"}, False),
    ({'module': ["abcd"]}, {'module': "bc"}, False),
    ({'module_in': ["abcd"]}, {'module': "bc"}, False),
    ({'module_in': ["a", "bc", "d"]}, {'module': "bc"}, True),

    ({'module': "abcd"}, {'module': "abc"}, False),

    # startswith with several candidates
    ({'module_startswith': ("abc", "xyz")}, {'module': "abc"}, True),
    ({'module_startswith': {"abc", "xyz"}}, {'module': "abc"}, True),
    ({'module_startswith': ["abc", "xyz"]}, {'module': "abc"}, True),
    ({'module_startswith': ("abc", "xyz")}, {'module': "abcd"}, True),
    ({'module_startswith': ("abc", "xyz")}, {'module': "xyzw"}, True),
    ({'module_startswith': ("abc", "xyz")}, {'module': "fooabc"}, False),

    # endswith with several candidates
    ({'module_endswith': ("abc", "xyz")}, {'module': "abc"}, True),
    ({'module_endswith': {"abc", "xyz"}}, {'module': "abc"}, True),
    ({'module_endswith': ["abc", "xyz"]}, {'module': "abc"}, True),
    ({'module_endswith': ("abc", "xyz")}, {'module': "1abc"}, True),
    ({'module_endswith': ("abc", "xyz")}, {'module': "1xyz"}, True),
    ({'module_endswith': ("abc", "xyz")}, {'module': "abcfoo"}, False),

    # non-string event value
    ({'module': "abc"}, {'module': 1}, False),

    # regex
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "regex"}, False),
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "re.gex"}, True),
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "sregex"}, True),
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "re"}, True),
])
def test_predicate_matching(expr, inp, expected):
    """Build a ``Query`` from *expr* and check its verdict on the event dict *inp*."""
    query = Query(**expr)
    verdict = query(inp)
    assert verdict == expected
709
710
711
@pytest.mark.parametrize('exc_type,expr', [
    (TypeError, {'module_1': 1}),
    (TypeError, {'module1': 1}),
    (ValueError, {'module_startswith': 1}),
    (ValueError, {'module_startswith': {1: 2}}),
    (ValueError, {'module_endswith': 1}),
    (ValueError, {'module_endswith': {1: 2}}),
    (TypeError, {'module_foo': 1}),
    (TypeError, {'module_a_b': 1}),
])
def test_predicate_bad_query(expr, exc_type):
    """Constructing a ``Query`` from *expr* must raise *exc_type*."""
    with pytest.raises(exc_type):
        Query(**expr)
723
724
725
def test_predicate_when():
    """Actions attached through ``When`` / ``Q(action=...)`` run only when the condition matches."""
    hits = []
    miss = When(Q(module=1), lambda ev: hits.append(ev))
    assert miss({'module': 2}) == False
    assert hits == []

    match = When(Q(module=1), lambda ev: hits.append(ev))
    assert match({'module': 1}) == True
    assert hits == [{'module': 1}]

    # Same thing spelled through Q(action=...).
    q_hits = []
    via_q = Q(module=1, action=lambda ev: q_hits.append(ev))
    assert via_q({'module': 1}) == True
    assert q_hits == [{'module': 1}]

    # Or: only the action of the matching branch runs.
    or_hits = [[], []]
    either = (
        Q(module=1, action=lambda ev: or_hits[0].append(ev)) |
        Q(module=2, action=lambda ev: or_hits[1].append(ev))
    )
    assert either({'module': 1}) == True
    assert or_hits == [[{'module': 1}], []]

    assert either({'module': 2}) == True
    assert or_hits == [[{'module': 1}], [{'module': 2}]]

    # And: no action runs on a miss; both run on a full match.
    and_hits = [[], []]
    both = (
        Q(module=1, action=lambda ev: and_hits[0].append(ev)) &
        Q(function=2, action=lambda ev: and_hits[1].append(ev))
    )
    assert both({'module': 2}) == False
    assert and_hits == [[], []]

    assert both({'module': 1, 'function': 2}) == True
    assert and_hits == [[{'module': 1, 'function': 2}], [{'module': 1, 'function': 2}]]
758
759
760
def test_and_or_kwargs():
    """Keyword arguments to ``And`` / ``Or`` are turned into Query predicates."""
    # And keeps all keywords in one Query; Or makes one Query per keyword.
    kw_and = And(module=1, function=2)
    assert kw_and == Query(module=1, function=2)
    kw_or = Or(module=1, function=2)
    assert kw_or == Or(Query(module=1), Query(function=2))

    # Same, with an extra positional predicate in front.
    mixed_and = And(3, module=1, function=2)
    assert mixed_and == And(3, Query(module=1, function=2))
    mixed_or = Or(3, module=1, function=2)
    assert mixed_or == Or(3, Query(module=1), Query(function=2))
766
767
768
def test_proper_backend():
    """``hunter.Tracer`` comes from ``hunter.tracer`` under PUREPYTHONHUNTER or PyPy, else from ``hunter._tracer``."""
    pure_python = bool(os.environ.get('PUREPYTHONHUNTER')) or platform.python_implementation() == 'PyPy'
    expected = 'hunter.tracer.Tracer' if pure_python else 'hunter._tracer.Tracer'
    assert expected in repr(hunter.Tracer)
773
774
775
@pytest.fixture(scope="session", params=['pure', 'cython'])
def tracer_impl(request):
    """Return the ``Tracer`` class of the requested backend, skipping if it cannot be imported."""
    backends = {
        'pure': 'hunter.tracer',
        'cython': 'hunter._tracer',
    }
    module_name = backends.get(request.param)
    if module_name is None:
        return None
    return pytest.importorskip(module_name).Tracer
781
782
783
def _tokenize():
784
    with open(tokenize.__file__, 'rb') as fh:
785
        toks = []
786
        try:
787
            for tok in tokenize.tokenize(fh.readline):
788
                toks.append(tok)
789
        except tokenize.TokenError as exc:
790
            toks.append(exc)
791
792
793
def test_perf_filter(tracer_impl, benchmark):
    """Benchmark tracing ``_tokenize`` with a filter that matches nothing; no output is expected."""
    tracer = tracer_impl()

    @benchmark
    def run():
        output = StringIO()
        never_matches = Q(
            Q(module="does-not-exist") | Q(module="does not exist".split()),
            action=CodePrinter(stream=output)
        )
        with tracer.trace(never_matches):
            _tokenize()
        return output

    # ``run`` is compared as the returned stream here, not as a function.
    assert run.getvalue() == ''
807
808
809
def test_perf_stdlib(tracer_impl, benchmark):
    """Benchmark tracing ``_tokenize`` with ``stdlib=False`` filtering; no output is expected."""
    tracer = tracer_impl()

    @benchmark
    def run():
        output = StringIO()
        non_stdlib_only = Q(
            ~Q(module_contains='pytest'),
            ~Q(module_contains='hunter'),
            ~Q(filename=''),
            stdlib=False,
            action=CodePrinter(stream=output)
        )
        with tracer.trace(non_stdlib_only):
            _tokenize()
        return output

    # ``run`` is compared as the returned stream here, not as a function.
    assert run.getvalue() == ''
826
827
828
def test_perf_actions(tracer_impl, benchmark):
    """Benchmark tracing ``_tokenize`` with two printing actions attached to ``call`` events."""
    tracer = tracer_impl()

    @benchmark
    def run():
        output = StringIO()
        printers = [
            CodePrinter(
                stream=output
            ),
            VarsPrinter(
                'line',
                globals=True,
                stream=output
            )
        ]
        condition = ~Q(module_in=['re', 'sre', 'sre_parse']) & ~Q(module_startswith='namedtuple') & Q(kind="call")
        with tracer.trace(Q(condition, actions=printers)):
            _tokenize()
848