Completed
Push — master ( fb4fb6...da9680 )
by Ionel Cristian
01:05
created

tests.test_and_or_kwargs()   B

Complexity

Conditions 5

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 6
rs 8.5455
cc 5
1
from __future__ import print_function
2
3
import inspect
4
import os
5
import platform
6
import subprocess
7
import sys
8
import tokenize
9
10
try:
11
    from cStringIO import StringIO
12
except ImportError:
13
    from io import StringIO
14
try:
15
    from itertools import izip_longest
16
except ImportError:
17
    from itertools import zip_longest as izip_longest
18
19
import pytest
20
21
import hunter
22
from hunter import And
23
from hunter import Not
24
from hunter import Or
25
from hunter import Q
26
from hunter import Query
27
from hunter import When
28
29
from hunter import CodePrinter
30
from hunter import Debugger
31
from hunter import VarsPrinter
32
33
pytest_plugins = 'pytester',
34
35
36
class EvilTracer(object):
37
    def __init__(self, *args, **kwargs):
38
        self._calls = []
39
        self._handler = hunter._prepare_predicate(*args, **kwargs)
40
        self._tracer = hunter.trace(self._append)
41
42
    def _append(self, event):
43
        # Make sure the lineno is cached. Frames are reused
44
        # and later on the events would be very broken ..
45
        event.lineno
46
        self._calls.append(event)
47
48
    def __enter__(self):
49
        return self
50
51
    def __exit__(self, exc_type, exc_val, exc_tb):
52
        self._tracer.stop()
53
        predicate = self._handler
54
        for call in self._calls:
55
            predicate(call)
56
57
58
trace = EvilTracer
59
60
61
def _get_func_spec(func):
62
    spec = inspect.getargspec(func)
63
    return inspect.formatargspec(spec.args, spec.varargs)
64
65
66
def test_pth_activation():
67
    module_name = os.path.__name__
68
    expected_module = "{0}.py".format(module_name)
69
    hunter_env = "module={!r},function=\"join\"".format(module_name)
70
    func_spec = _get_func_spec(os.path.join)
71
    expected_call = "call      def join{0}:".format(func_spec)
72
73
    output = subprocess.check_output(
74
        [sys.executable, os.path.join(os.path.dirname(__file__), 'sample.py')],
75
        env=dict(os.environ, PYTHONHUNTER=hunter_env),
76
        stderr=subprocess.STDOUT,
77
    )
78
    assert expected_module.encode() in output
79
    assert expected_call.encode() in output
80
81
82
def test_pth_sample4():
83
    env = dict(os.environ, PYTHONHUNTER="CodePrinter")
84
    env.pop('COVERAGE_PROCESS_START', None)
85
    env.pop('COV_CORE_SOURCE', None)
86
    output = subprocess.check_output(
87
        [sys.executable, os.path.join(os.path.dirname(__file__), 'sample4.py')],
88
        env=env,
89
        stderr=subprocess.STDOUT,
90
    )
91
    assert output
92
93
94
def test_pth_sample2(LineMatcher):
95
    env = dict(os.environ, PYTHONHUNTER="module='__main__'")
96
    env.pop('COVERAGE_PROCESS_START', None)
97
    env.pop('COV_CORE_SOURCE', None)
98
    output = subprocess.check_output(
99
        [sys.executable, os.path.join(os.path.dirname(__file__), 'sample2.py')],
100
        env=env,
101
        stderr=subprocess.STDOUT,
102
    )
103
    lm = LineMatcher(output.decode('utf-8').splitlines())
104
    lm.fnmatch_lines([
105
        '*tests*sample2.py:* call      if __name__ == "__main__":  #*',
106
        '*tests*sample2.py:* line      if __name__ == "__main__":  #*',
107
        '*tests*sample2.py:* line          import functools',
108
        '*tests*sample2.py:* line          def deco(opt):',
109
        '*tests*sample2.py:* line          @deco(1)',
110
        '*tests*sample2.py:* call          def deco(opt):',
111
        '*tests*sample2.py:* line              def decorator(func):',
112
        '*tests*sample2.py:* line              return decorator',
113
        '*tests*sample2.py:* return            return decorator',
114
        '*                 * ...       return value: <function deco*',
115
        '*tests*sample2.py:* line          @deco(2)',
116
        '*tests*sample2.py:* call          def deco(opt):',
117
        '*tests*sample2.py:* line              def decorator(func):',
118
        '*tests*sample2.py:* line              return decorator',
119
        '*tests*sample2.py:* return            return decorator',
120
        '*                 * ...       return value: <function deco*',
121
        '*tests*sample2.py:* line          @deco(3)',
122
        '*tests*sample2.py:* call          def deco(opt):',
123
        '*tests*sample2.py:* line              def decorator(func):',
124
        '*tests*sample2.py:* line              return decorator',
125
        '*tests*sample2.py:* return            return decorator',
126
        '*                 * ...       return value: <function deco*',
127
        '*tests*sample2.py:* call              def decorator(func):',
128
        '*tests*sample2.py:* line                  @functools.wraps(func)',
129
        '*tests*sample2.py:* line                  return wrapper',
130
        '*tests*sample2.py:* return                return wrapper',
131
        '*                 * ...       return value: <function foo *',
132
        '*tests*sample2.py:* call              def decorator(func):',
133
        '*tests*sample2.py:* line                  @functools.wraps(func)',
134
        '*tests*sample2.py:* line                  return wrapper',
135
        '*tests*sample2.py:* return                return wrapper',
136
        '*                 * ...       return value: <function foo *',
137
        '*tests*sample2.py:* call              def decorator(func):',
138
        '*tests*sample2.py:* line                  @functools.wraps(func)',
139
        '*tests*sample2.py:* line                  return wrapper',
140
        '*tests*sample2.py:* return                return wrapper',
141
        '*                 * ...       return value: <function foo *',
142
        '*tests*sample2.py:* line          foo(',
143
        "*tests*sample2.py:* line              'a*',",
144
        "*tests*sample2.py:* line              'b'",
145
        '*tests*sample2.py:* call                  @functools.wraps(func)',
146
        '*                 *    |                  def wrapper(*args):',
147
        '*tests*sample2.py:* line                      return func(*args)',
148
        '*tests*sample2.py:* call                  @functools.wraps(func)',
149
        '*                 *    |                  def wrapper(*args):',
150
        '*tests*sample2.py:* line                      return func(*args)',
151
        '*tests*sample2.py:* call                  @functools.wraps(func)',
152
        '*                 *    |                  def wrapper(*args):',
153
        '*tests*sample2.py:* line                      return func(*args)',
154
        '*tests*sample2.py:* call          @deco(1)',
155
        '*                 *    |          @deco(2)',
156
        '*                 *    |          @deco(3)',
157
        '*                 *    |          def foo(*args):',
158
        '*tests*sample2.py:* line              return args',
159
        '*tests*sample2.py:* return            return args',
160
        "*                 * ...       return value: ('a*', 'b')",
161
        "*tests*sample2.py:* return                    return func(*args)",
162
        "*                 * ...       return value: ('a*', 'b')",
163
        "*tests*sample2.py:* return                    return func(*args)",
164
        "*                 * ...       return value: ('a*', 'b')",
165
        "*tests*sample2.py:* return                    return func(*args)",
166
        "*                 * ...       return value: ('a*', 'b')",
167
        "*tests*sample2.py:* line          try:",
168
        "*tests*sample2.py:* line              None(",
169
        "*tests*sample2.py:* line                  'a',",
170
        "*tests*sample2.py:* line                  'b'",
171
        "*tests*sample2.py:* exception             'b'",
172
        "*                 * ...       exception value: *",
173
        "*tests*sample2.py:* line          except:",
174
        "*tests*sample2.py:* line              pass",
175
        "*tests*sample2.py:* return            pass",
176
        "*                   ...       return value: None",
177
    ])
178
179
180
def test_predicate_str_repr():
181
    assert repr(Q(module='a', function='b')).endswith("predicates.Query: query_eq=(('function', 'b'), ('module', 'a'))>")
182
    assert str(Q(module='a', function='b')) == "Query(function='b', module='a')"
183
184
    assert repr(Q(module='a')).endswith("predicates.Query: query_eq=(('module', 'a'),)>")
185
    assert str(Q(module='a')) == "Query(module='a')"
186
187
    assert "predicates.When: condition=<hunter." in repr(Q(module='a', action='foo'))
188
    assert "predicates.Query: query_eq=(('module', 'a'),)>, actions=('foo',)>" in repr(Q(module='a', action='foo'))
189
    assert str(Q(module='a', action='foo')) == "When(Query(module='a'), 'foo')"
190
191
    assert "predicates.Not: predicate=<hunter." in repr(~Q(module='a'))
192
    assert "predicates.Query: query_eq=(('module', 'a'),)>>" in repr(~Q(module='a'))
193
    assert str(~Q(module='a')) == "Not(Query(module='a'))"
194
195
    assert "predicates.Or: predicates=(<hunter." in repr(Q(module='a') | Q(module='b'))
196
    assert "predicates.Query: query_eq=(('module', 'a'),)>, " in repr(Q(module='a') | Q(module='b'))
197
    assert repr(Q(module='a') | Q(module='b')).endswith("predicates.Query: query_eq=(('module', 'b'),)>)>")
198
    assert str(Q(module='a') | Q(module='b')) == "Or(Query(module='a'), Query(module='b'))"
199
200
    assert "predicates.And: predicates=(<hunter." in repr(Q(module='a') & Q(module='b'))
201
    assert "predicates.Query: query_eq=(('module', 'a'),)>," in repr(Q(module='a') & Q(module='b'))
202
    assert repr(Q(module='a') & Q(module='b')).endswith("predicates.Query: query_eq=(('module', 'b'),)>)>")
203
    assert str(Q(module='a') & Q(module='b')) == "And(Query(module='a'), Query(module='b'))"
204
205
206
def test_predicate_q_nest_1():
207
    assert repr(Q(Q(module='a'))).endswith("predicates.Query: query_eq=(('module', 'a'),)>")
208
209
210
def test_predicate_q_expansion():
211
    assert Q(1, 2, module=3) == And(1, 2, Q(module=3))
212
    assert Q(1, 2, module=3, action=4) == When(And(1, 2, Q(module=3)), 4)
213
    assert Q(1, 2, module=3, actions=[4, 5]) == When(And(1, 2, Q(module=3)), 4, 5)
214
215
216
def test_predicate_and():
217
    assert And(1, 2) == And(1, 2)
218
    assert Q(module=1) & Q(module=2) == And(Q(module=1), Q(module=2))
219
    assert Q(module=1) & Q(module=2) & Q(module=3) == And(Q(module=1), Q(module=2), Q(module=3))
220
221
    assert (Q(module=1) & Q(module=2))({'module': 3}) == False
222
    assert (Q(module=1) & Q(function=2))({'module': 1, 'function': 2}) == True
223
224
    assert And(1, 2) | 3 == Or(And(1, 2), 3)
225
226
227
def test_predicate_or():
228
    assert Q(module=1) | Q(module=2) == Or(Q(module=1), Q(module=2))
229
    assert Q(module=1) | Q(module=2) | Q(module=3) == Or(Q(module=1), Q(module=2), Q(module=3))
230
231
    assert (Q(module=1) | Q(module=2))({'module': 3}) == False
232
    assert (Q(module=1) | Q(module=2))({'module': 2}) == True
233
234
    assert Or(1, 2) & 3 == And(Or(1, 2), 3)
235
236
237
def test_tracing_bare(LineMatcher):
238
    lines = StringIO()
239
    with hunter.trace(CodePrinter(stream=lines)):
240
        def a():
241
            return 1
242
243
        b = a()
244
        b = 2
245
        try:
246
            raise Exception("BOOM!")
247
        except Exception:
248
            pass
249
    print(lines.getvalue())
250
    lm = LineMatcher(lines.getvalue().splitlines())
251
    lm.fnmatch_lines([
252
        "*test_hunter.py* call              def a():",
253
        "*test_hunter.py* line                  return 1",
254
        "*test_hunter.py* return                return 1",
255
        "* ...       return value: 1",
256
    ])
257
258
259
def test_tracing_printing_failures(LineMatcher):
260
    lines = StringIO()
261
    with trace(actions=[CodePrinter(stream=lines), VarsPrinter("x", stream=lines)]):
262
        class Bad(Exception):
263
            def __repr__(self):
264
                raise RuntimeError("I'm a bad class!")
265
266
        def a():
267
            x = Bad()
268
            return x
269
270
        def b():
271
            x = Bad()
272
            raise x
273
274
        a()
275
        try:
276
            b()
277
        except Exception as exc:
278
            pass
279
    lm = LineMatcher(lines.getvalue().splitlines())
280
    lm.fnmatch_lines([
281
        """*tests*test_hunter.py:* call              class Bad(Exception):""",
282
        """*tests*test_hunter.py:* line              class Bad(Exception):""",
283
        """*tests*test_hunter.py:* line                  def __repr__(self):""",
284
        """*tests*test_hunter.py:* return                def __repr__(self):""",
285
        """* ...       return value: *""",
286
        """*tests*test_hunter.py:* call              def a():""",
287
        """*tests*test_hunter.py:* line                  x = Bad()""",
288
        """*tests*test_hunter.py:* line                  return x""",
289
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
290
        """*tests*test_hunter.py:* return                return x""",
291
        """* ...       return value: !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
292
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
293
        """*tests*test_hunter.py:* call              def b():""",
294
        """*tests*test_hunter.py:* line                  x = Bad()""",
295
        """*tests*test_hunter.py:* line                  raise x""",
296
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
297
        """*tests*test_hunter.py:* exception             raise x""",
298
        """* ...       exception value: !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
299
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
300
        """*tests*test_hunter.py:* return                raise x""",
301
        """* ...       return value: None""",
302
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
303
    ])
304
305
306
def test_tracing_vars(LineMatcher):
307
    lines = StringIO()
308
    with hunter.trace(actions=[VarsPrinter('b', stream=lines), CodePrinter(stream=lines)]):
309
        def a():
310
            b = 1
311
            b = 2
312
            return 1
313
314
        b = a()
315
        b = 2
316
        try:
317
            raise Exception("BOOM!")
318
        except Exception:
319
            pass
320
    print(lines.getvalue())
321
    lm = LineMatcher(lines.getvalue().splitlines())
322
    lm.fnmatch_lines([
323
        "*test_hunter.py* call              def a():",
324
        "*test_hunter.py* line                  b = 1",
325
        "* vars      b => 1",
326
        "*test_hunter.py* line                  b = 2",
327
        "* vars      b => 2",
328
        "*test_hunter.py* line                  return 1",
329
        "* vars      b => 2",
330
        "*test_hunter.py* return                return 1",
331
        "* ...       return value: 1",
332
    ])
333
334
335
def test_trace_merge():
336
    with hunter.trace(function="a"):
337
        with hunter.trace(function="b"):
338
            with hunter.trace(function="c"):
339
                assert sys.gettrace()._handler == When(Q(function="c"), CodePrinter)
340
            assert sys.gettrace()._handler == When(Q(function="b"), CodePrinter)
341
        assert sys.gettrace()._handler == When(Q(function="a"), CodePrinter)
342
343
344
def test_trace_api_expansion():
345
    # simple use
346
    with trace(function="foobar") as t:
347
        assert t._handler == When(Q(function="foobar"), CodePrinter)
348
349
    # "or" by expression
350
    with trace(module="foo", function="foobar") as t:
351
        assert t._handler == When(Q(module="foo", function="foobar"), CodePrinter)
352
353
    # pdb.set_trace
354
    with trace(function="foobar", action=Debugger) as t:
355
        assert str(t._handler) == str(When(Q(function="foobar"), Debugger))
356
357
    # pdb.set_trace on any hits
358
    with trace(module="foo", function="foobar", action=Debugger) as t:
359
        assert str(t._handler) == str(When(Q(module="foo", function="foobar"), Debugger))
360
361
    # pdb.set_trace when function is foobar, otherwise just print when module is foo
362
    with trace(Q(function="foobar", action=Debugger), module="foo") as t:
363
        assert str(t._handler) == str(When(And(
364
            When(Q(function="foobar"), Debugger),
365
            Q(module="foo")
366
        ), CodePrinter))
367
368
    # dumping variables from stack
369
    with trace(Q(function="foobar", action=VarsPrinter("foobar")), module="foo") as t:
370
        assert str(t._handler) == str(When(And(
371
            When(Q(function="foobar"), VarsPrinter("foobar")),
372
            Q(module="foo"),
373
        ), CodePrinter))
374
375
    with trace(Q(function="foobar", action=VarsPrinter("foobar", "mumbojumbo")), module="foo") as t:
376
        assert str(t._handler) == str(When(And(
377
            When(Q(function="foobar"), VarsPrinter("foobar", "mumbojumbo")),
378
            Q(module="foo"),
379
        ), CodePrinter))
380
381
    # multiple actions
382
    with trace(Q(function="foobar", actions=[VarsPrinter("foobar"), Debugger]), module="foo") as t:
383
        assert str(t._handler) == str(When(And(
384
            When(Q(function="foobar"), VarsPrinter("foobar"), Debugger),
385
            Q(module="foo"),
386
        ), CodePrinter))
387
388
389
def test_locals():
390
    out = StringIO()
391
    with hunter.trace(
392
        lambda event: event.locals.get("node") == "Foobar",
393
        module="test_hunter",
394
        function="foo",
395
        action=CodePrinter(stream=out)
396
    ):
397
        def foo():
398
            a = 1
399
            node = "Foobar"
400
            node += "x"
401
            a += 2
402
            return a
403
404
        foo()
405
    assert out.getvalue().endswith('node += "x"\n')
406
407
408
def test_fullsource_decorator_issue(LineMatcher):
409
    out = StringIO()
410
    with trace(kind='call', action=CodePrinter(stream=out)):
411
        foo = bar = lambda x: x
412
413
        @foo
414
        @bar
415
        def foo():
416
            return 1
417
418
        foo()
419
420
    lm = LineMatcher(out.getvalue().splitlines())
421
    lm.fnmatch_lines([
422
        '* call              @foo',
423
        '*    |              @bar',
424
        '*    |              def foo():',
425
    ])
426
427
428
def test_source(LineMatcher):
429
    calls = []
430
    with trace(action=lambda event: calls.append(event.source)):
431
        foo = bar = lambda x: x
432
433
        @foo
434
        @bar
435
        def foo():
436
            return 1
437
438
        foo()
439
440
    lm = LineMatcher(calls)
441
    lm.fnmatch_lines([
442
        '        foo = bar = lambda x: x\n',
443
        '        @foo\n',
444
        '            return 1\n',
445
    ])
446
447
def test_fullsource(LineMatcher):
448
    calls = []
449
    with trace(action=lambda event: calls.append(event.fullsource)):
450
        foo = bar = lambda x: x
451
452
        @foo
453
        @bar
454
        def foo():
455
            return 1
456
457
        foo()
458
459
    lm = LineMatcher(calls)
460
    lm.fnmatch_lines([
461
        '        foo = bar = lambda x: x\n',
462
        '        @foo\n        @bar\n        def foo():\n',
463
        '            return 1\n',
464
    ])
465
466
467
def test_debugger(LineMatcher):
468
    out = StringIO()
469
    calls = []
470
471
    class FakePDB:
472
        def __init__(self, foobar=1):
473
            calls.append(foobar)
474
475
        def set_trace(self, frame):
476
            calls.append(frame.f_code.co_name)
477
478
    with hunter.trace(
479
        lambda event: event.locals.get("node") == "Foobar",
480
        module="test_hunter",
481
        function="foo",
482
        actions=[VarsPrinter("a", "node", "foo", "test_debugger", globals=True, stream=out),
483
                 Debugger(klass=FakePDB, foobar=2)]
484
    ):
485
        def foo():
486
            a = 1
487
            node = "Foobar"
488
            node += "x"
489
            a += 2
490
            return a
491
492
        foo()
493
    print(out.getvalue())
494
    assert calls == [2, 'foo']
495
    lm = LineMatcher(out.getvalue().splitlines())
496
    lm.fnmatch_lines_random([
497
        "*      test_debugger => <function test_debugger at *",
498
        "*      node => 'Foobar'",
499
        "*      a => 1",
500
    ])
501
502
503
def test_custom_action():
504
    calls = []
505
506
    with trace(action=lambda event: calls.append(event.function), kind="return"):
507
        def foo():
508
            return 1
509
510
        foo()
511
    assert 'foo' in calls
512
513
514
def test_trace_with_class_actions():
515
    with trace(CodePrinter):
516
        def a():
517
            pass
518
519
        a()
520
521
522
def test_predicate_no_inf_recursion():
523
    assert Or(And(1)) == 1
524
    assert Or(Or(1)) == 1
525
    assert And(Or(1)) == 1
526
    assert And(And(1)) == 1
527
    predicate = Q(Q(lambda ev: 1, module='wat'))
528
    print('predicate:', predicate)
529
    predicate({'module': 'foo'})
530
531
532
def test_predicate_compression():
533
    assert Or(Or(1, 2), And(3)) == Or(1, 2, 3)
534
    assert Or(Or(1, 2), 3) == Or(1, 2, 3)
535
    assert Or(1, Or(2, 3), 4) == Or(1, 2, 3, 4)
536
    assert And(1, 2, Or(3, 4)).predicates == (1, 2, Or(3, 4))
537
538
    assert repr(Or(Or(1, 2), And(3))) == repr(Or(1, 2, 3))
539
    assert repr(Or(Or(1, 2), 3)) == repr(Or(1, 2, 3))
540
    assert repr(Or(1, Or(2, 3), 4)) == repr(Or(1, 2, 3, 4))
541
542
543
def test_predicate_not():
544
    assert Not(1).predicate == 1
545
    assert ~Or(1, 2) == Not(Or(1, 2))
546
    assert ~And(1, 2) == Not(And(1, 2))
547
548
    assert ~Not(1) == 1
549
550
    assert ~Query(module=1) | ~Query(module=2) == Not(And(Query(module=1), Query(module=2)))
551
    assert ~Query(module=1) & ~Query(module=2) == Not(Or(Query(module=1), Query(module=2)))
552
553
    assert ~Query(module=1) | Query(module=2) == Or(Not(Query(module=1)), Query(module=2))
554
    assert ~Query(module=1) & Query(module=2) == And(Not(Query(module=1)), Query(module=2))
555
556
    assert ~(Query(module=1) & Query(module=2)) == Not(And(Query(module=1), Query(module=2)))
557
    assert ~(Query(module=1) | Query(module=2)) == Not(Or(Query(module=1), Query(module=2)))
558
559
    assert repr(~Or(1, 2)) == repr(Not(Or(1, 2)))
560
    assert repr(~And(1, 2)) == repr(Not(And(1, 2)))
561
562
    assert repr(~Query(module=1) | ~Query(module=2)) == repr(Not(And(Query(module=1), Query(module=2))))
563
    assert repr(~Query(module=1) & ~Query(module=2)) == repr(Not(Or(Query(module=1), Query(module=2))))
564
565
    assert repr(~(Query(module=1) & Query(module=2))) == repr(Not(And(Query(module=1), Query(module=2))))
566
    assert repr(~(Query(module=1) | Query(module=2))) == repr(Not(Or(Query(module=1), Query(module=2))))
567
568
    assert Not(Q(module=1))({'module': 1}) == False
569
570
571
def test_predicate_query_allowed():
572
    pytest.raises(TypeError, Query, 1)
573
    pytest.raises(TypeError, Query, a=1)
574
575
576
def test_predicate_when_allowed():
577
    pytest.raises(TypeError, When, 1)
578
579
580
@pytest.mark.parametrize('expr,inp,expected', [
581
    ({'module': "abc"}, {'module': "abc"}, True),
582
    ({'module': "abcd"}, {'module': "abc"}, False),
583
    ({'module': "abcd"}, {'module': "abce"}, False),
584
    ({'module_startswith': "abc"}, {'module': "abcd"}, True),
585
    ({'module__startswith': "abc"}, {'module': "abcd"}, True),
586
    ({'module_contains': "bc"}, {'module': "abcd"}, True),
587
    ({'module_contains': "bcde"}, {'module': "abcd"}, False),
588
589
    ({'module_endswith': "abc"}, {'module': "abcd"}, False),
590
    ({'module__endswith': "bcd"}, {'module': "abcd"}, True),
591
592
    ({'module_in': "abcd"}, {'module': "bc"}, True),
593
    ({'module': "abcd"}, {'module': "bc"}, False),
594
    ({'module': ["abcd"]}, {'module': "bc"}, False),
595
    ({'module_in': ["abcd"]}, {'module': "bc"}, False),
596
    ({'module_in': ["a", "bc", "d"]}, {'module': "bc"}, True),
597
598
    ({'module': "abcd"}, {'module': "abc"}, False),
599
600
    ({'module_startswith': ("abc", "xyz")}, {'module': "abc"}, True),
601
    ({'module_startswith': {"abc", "xyz"}}, {'module': "abc"}, True),
602
    ({'module_startswith': ["abc", "xyz"]}, {'module': "abc"}, True),
603
    ({'module_startswith': ("abc", "xyz")}, {'module': "abcd"}, True),
604
    ({'module_startswith': ("abc", "xyz")}, {'module': "xyzw"}, True),
605
    ({'module_startswith': ("abc", "xyz")}, {'module': "fooabc"}, False),
606
607
    ({'module_endswith': ("abc", "xyz")}, {'module': "abc"}, True),
608
    ({'module_endswith': {"abc", "xyz"}}, {'module': "abc"}, True),
609
    ({'module_endswith': ["abc", "xyz"]}, {'module': "abc"}, True),
610
    ({'module_endswith': ("abc", "xyz")}, {'module': "1abc"}, True),
611
    ({'module_endswith': ("abc", "xyz")}, {'module': "1xyz"}, True),
612
    ({'module_endswith': ("abc", "xyz")}, {'module': "abcfoo"}, False),
613
614
    ({'module': "abc"}, {'module': 1}, False),
615
616
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "regex"}, False),
617
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "re.gex"}, True),
618
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "sregex"}, True),
619
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "re"}, True),
620
])
621
def test_predicate_matching(expr, inp, expected):
622
    assert Query(**expr)(inp) == expected
623
624
625
@pytest.mark.parametrize('exc_type,expr', [
626
    (TypeError, {'module_1': 1}),
627
    (TypeError, {'module1': 1}),
628
    (ValueError, {'module_startswith': 1}),
629
    (ValueError, {'module_startswith': {1: 2}}),
630
    (ValueError, {'module_endswith': 1}),
631
    (ValueError, {'module_endswith': {1: 2}}),
632
    (TypeError, {'module_foo': 1}),
633
    (TypeError, {'module_a_b': 1}),
634
])
635
def test_predicate_bad_query(expr, exc_type):
636
    pytest.raises(exc_type, Query, **expr)
637
638
639
def test_predicate_when():
640
    called = []
641
    assert When(Q(module=1), lambda ev: called.append(ev))({'module': 2}) == False
642
    assert called == []
643
644
    assert When(Q(module=1), lambda ev: called.append(ev))({'module': 1}) == True
645
    assert called == [{'module': 1}]
646
647
    called = []
648
    assert Q(module=1, action=lambda ev: called.append(ev))({'module': 1}) == True
649
    assert called == [{'module': 1}]
650
651
    called = [[], []]
652
    predicate = (
653
        Q(module=1, action=lambda ev: called[0].append(ev)) |
654
        Q(module=2, action=lambda ev: called[1].append(ev))
655
    )
656
    assert predicate({'module': 1}) == True
657
    assert called == [[{'module': 1}], []]
658
659
    assert predicate({'module': 2}) == True
660
    assert called == [[{'module': 1}], [{'module': 2}]]
661
662
    called = [[], []]
663
    predicate = (
664
        Q(module=1, action=lambda ev: called[0].append(ev)) &
665
        Q(function=2, action=lambda ev: called[1].append(ev))
666
    )
667
    assert predicate({'module': 2}) == False
668
    assert called == [[], []]
669
670
    assert predicate({'module': 1, 'function': 2}) == True
671
    assert called == [[{'module': 1, 'function': 2}], [{'module': 1, 'function': 2}]]
672
673
674
def test_and_or_kwargs():
675
    assert And(module=1, function=2) == Query(module=1, function=2)
676
    assert Or(module=1, function=2) == Or(Query(module=1), Query(function=2))
677
678
    assert And(3, module=1, function=2) == And(3, Query(module=1, function=2))
679
    assert Or(3, module=1, function=2) == Or(3, Query(module=1), Query(function=2))
680
681
682
def test_proper_backend():
683
    if os.environ.get('PUREPYTHONHUNTER') or platform.python_implementation() == 'PyPy':
684
        assert 'hunter.tracer.Tracer' in repr(hunter.Tracer)
685
    else:
686
        assert 'hunter._tracer.Tracer' in repr(hunter.Tracer)
687
688
689
@pytest.fixture(scope="session", params=['pure', 'cython'])
690
def tracer_impl(request):
691
    if request.param == 'pure':
692
        return pytest.importorskip('hunter.tracer').Tracer
693
    elif request.param == 'cython':
694
        return pytest.importorskip('hunter._tracer').Tracer
695
696
697
def _tokenize():
698
    with open(tokenize.__file__, 'rb') as fh:
699
        toks = []
700
        try:
701
            for tok in tokenize.tokenize(fh.readline):
702
                toks.append(tok)
703
        except tokenize.TokenError as exc:
704
            toks.append(exc)
705
706
707
def test_perf_filter(tracer_impl, benchmark):
708
    t = tracer_impl()
709
710
    @benchmark
711
    def run():
712
        output = StringIO()
713
        with t.trace(Q(
714
            Q(module="does-not-exist") | Q(module="does not exist".split()),
715
            action=CodePrinter(stream=output)
716
        )):
717
            _tokenize()
718
        return output
719
720
    assert run.getvalue() == ''
721
722
723
def test_perf_stdlib(tracer_impl, benchmark):
724
    t = tracer_impl()
725
726
    @benchmark
727
    def run():
728
        output = StringIO()
729
        with t.trace(Q(
730
            ~Q(module_contains='pytest'),
731
            ~Q(module_contains='hunter'),
732
            ~Q(filename=''),
733
            stdlib=False,
734
            action=CodePrinter(stream=output)
735
        )):
736
            _tokenize()
737
        return output
738
739
    assert run.getvalue() == ''
740
741
742
def test_perf_actions(tracer_impl, benchmark):
743
    t = tracer_impl()
744
745
    @benchmark
746
    def run():
747
        output = StringIO()
748
        with t.trace(Q(
749
            ~Q(module_in=['re', 'sre', 'sre_parse']) & ~Q(module_startswith='namedtuple') & Q(kind="call"),
750
            actions=[
751
                CodePrinter(
752
                    stream=output
753
                ),
754
                VarsPrinter(
755
                    'line',
756
                    globals=True,
757
                    stream=output
758
                )
759
            ]
760
        )):
761
            _tokenize()
762