Completed
Push — master ( 36843e...4a5ad7 )
by Ionel Cristian
01:10
created

tests.FakePDB   A

Complexity

Total Complexity 2

Size/Duplication

Total Lines 6
Duplicated Lines 0 %
Metric Value
wmc 2
dl 0
loc 6
rs 10
1
from __future__ import print_function
2
3
import inspect
4
import os
5
import platform
6
import subprocess
7
import sys
8
import tokenize
9
10
try:
11
    from cStringIO import StringIO
12
except ImportError:
13
    from io import StringIO
14
try:
15
    from itertools import izip_longest
16
except ImportError:
17
    from itertools import zip_longest as izip_longest
18
19
import pytest
20
21
import hunter
22
from hunter import And
23
from hunter import Not
24
from hunter import Or
25
from hunter import Q
26
from hunter import Query
27
from hunter import trace
28
from hunter import When
29
30
from hunter import CodePrinter
31
from hunter import Debugger
32
from hunter import VarsPrinter
33
34
pytest_plugins = 'pytester',
35
36
37
# from hunter import stop
38
# @pytest.yield_fixture(autouse=True, scope="function")
39
# def auto_stop():
40
#     try:
41
#         yield
42
#     finally:
43
#         stop()
44
45
46
def _get_func_spec(func):
47
    spec = inspect.getargspec(func)
48
    return inspect.formatargspec(spec.args, spec.varargs)
49
50
51
def test_pth_activation():
52
    module_name = os.path.__name__
53
    expected_module = "{0}.py".format(module_name)
54
    hunter_env = "module={!r},function=\"join\"".format(module_name)
55
    func_spec = _get_func_spec(os.path.join)
56
    expected_call = "call      def join{0}:".format(func_spec)
57
58
    output = subprocess.check_output(
59
        [sys.executable, os.path.join(os.path.dirname(__file__), 'sample.py')],
60
        env=dict(os.environ, PYTHONHUNTER=hunter_env),
61
        stderr=subprocess.STDOUT,
62
    )
63
    assert expected_module.encode() in output
64
    assert expected_call.encode() in output
65
66
67
def test_pth_sample4():
68
    env = dict(os.environ, PYTHONHUNTER="CodePrinter")
69
    env.pop('COVERAGE_PROCESS_START', None)
70
    env.pop('COV_CORE_SOURCE', None)
71
    output = subprocess.check_output(
72
        [sys.executable, os.path.join(os.path.dirname(__file__), 'sample4.py')],
73
        env=env,
74
        stderr=subprocess.STDOUT,
75
    )
76
    assert output
77
78
79
def test_pth_sample2(LineMatcher):
80
    env = dict(os.environ, PYTHONHUNTER="module='__main__'")
81
    env.pop('COVERAGE_PROCESS_START', None)
82
    env.pop('COV_CORE_SOURCE', None)
83
    output = subprocess.check_output(
84
        [sys.executable, os.path.join(os.path.dirname(__file__), 'sample2.py')],
85
        env=env,
86
        stderr=subprocess.STDOUT,
87
    )
88
    lm = LineMatcher(output.decode('utf-8').splitlines())
89
    lm.fnmatch_lines([
90
        '*tests*sample2.py:* call      if __name__ == "__main__":  #*',
91
        '*tests*sample2.py:* line      if __name__ == "__main__":  #*',
92
        '*tests*sample2.py:* line          import functools',
93
        '*tests*sample2.py:* line          def deco(opt):',
94
        '*tests*sample2.py:* line          @deco(1)',
95
        '*tests*sample2.py:* call          def deco(opt):',
96
        '*tests*sample2.py:* line              def decorator(func):',
97
        '*tests*sample2.py:* line              return decorator',
98
        '*tests*sample2.py:* return            return decorator',
99
        '*                 * ...       return value: <function deco*',
100
        '*tests*sample2.py:* line          @deco(2)',
101
        '*tests*sample2.py:* call          def deco(opt):',
102
        '*tests*sample2.py:* line              def decorator(func):',
103
        '*tests*sample2.py:* line              return decorator',
104
        '*tests*sample2.py:* return            return decorator',
105
        '*                 * ...       return value: <function deco*',
106
        '*tests*sample2.py:* line          @deco(3)',
107
        '*tests*sample2.py:* call          def deco(opt):',
108
        '*tests*sample2.py:* line              def decorator(func):',
109
        '*tests*sample2.py:* line              return decorator',
110
        '*tests*sample2.py:* return            return decorator',
111
        '*                 * ...       return value: <function deco*',
112
        '*tests*sample2.py:* call              def decorator(func):',
113
        '*tests*sample2.py:* line                  @functools.wraps(func)',
114
        '*tests*sample2.py:* line                  return wrapper',
115
        '*tests*sample2.py:* return                return wrapper',
116
        '*                 * ...       return value: <function foo *',
117
        '*tests*sample2.py:* call              def decorator(func):',
118
        '*tests*sample2.py:* line                  @functools.wraps(func)',
119
        '*tests*sample2.py:* line                  return wrapper',
120
        '*tests*sample2.py:* return                return wrapper',
121
        '*                 * ...       return value: <function foo *',
122
        '*tests*sample2.py:* call              def decorator(func):',
123
        '*tests*sample2.py:* line                  @functools.wraps(func)',
124
        '*tests*sample2.py:* line                  return wrapper',
125
        '*tests*sample2.py:* return                return wrapper',
126
        '*                 * ...       return value: <function foo *',
127
        '*tests*sample2.py:* line          foo(',
128
        "*tests*sample2.py:* line              'a*',",
129
        "*tests*sample2.py:* line              'b'",
130
        '*tests*sample2.py:* call                  @functools.wraps(func)',
131
        '*                 *    |                  def wrapper(*args):',
132
        '*tests*sample2.py:* line                      return func(*args)',
133
        '*tests*sample2.py:* call                  @functools.wraps(func)',
134
        '*                 *    |                  def wrapper(*args):',
135
        '*tests*sample2.py:* line                      return func(*args)',
136
        '*tests*sample2.py:* call                  @functools.wraps(func)',
137
        '*                 *    |                  def wrapper(*args):',
138
        '*tests*sample2.py:* line                      return func(*args)',
139
        '*tests*sample2.py:* call          @deco(1)',
140
        '*                 *    |          @deco(2)',
141
        '*                 *    |          @deco(3)',
142
        '*                 *    |          def foo(*args):',
143
        '*tests*sample2.py:* line              return args',
144
        '*tests*sample2.py:* return            return args',
145
        "*                 * ...       return value: ('a*', 'b')",
146
        "*tests*sample2.py:* return                    return func(*args)",
147
        "*                 * ...       return value: ('a*', 'b')",
148
        "*tests*sample2.py:* return                    return func(*args)",
149
        "*                 * ...       return value: ('a*', 'b')",
150
        "*tests*sample2.py:* return                    return func(*args)",
151
        "*                 * ...       return value: ('a*', 'b')",
152
        "*tests*sample2.py:* line          try:",
153
        "*tests*sample2.py:* line              None(",
154
        "*tests*sample2.py:* line                  'a',",
155
        "*tests*sample2.py:* line                  'b'",
156
        "*tests*sample2.py:* exception             'b'",
157
        "*                 * ...       exception value: *",
158
        "*tests*sample2.py:* line          except:",
159
        "*tests*sample2.py:* line              pass",
160
        "*tests*sample2.py:* return            pass",
161
        "*                   ...       return value: None",
162
    ])
163
164
165
def test_predicate_str_repr():
166
    assert repr(Q(module='a')).endswith("predicates.Query: query_eq={'module': 'a'}>")
167
    assert str(Q(module='a')) == "Query(module='a')"
168
169
    assert "predicates.When: condition=<hunter." in repr(Q(module='a', action='foo'))
170
    assert "predicates.Query: query_eq={'module': 'a'}>, actions=['foo']>" in repr(Q(module='a', action='foo'))
171
    assert str(Q(module='a', action='foo')) == "When(Query(module='a'), 'foo')"
172
173
    assert "predicates.Not: predicate=<hunter." in repr(~Q(module='a'))
174
    assert "predicates.Query: query_eq={'module': 'a'}>>" in repr(~Q(module='a'))
175
    assert str(~Q(module='a')) == "Not(Query(module='a'))"
176
177
    assert "predicates.Or: predicates=(<hunter." in repr(Q(module='a') | Q(module='b'))
178
    assert "predicates.Query: query_eq={'module': 'a'}>, " in repr(Q(module='a') | Q(module='b'))
179
    assert repr(Q(module='a') | Q(module='b')).endswith("predicates.Query: query_eq={'module': 'b'}>)>")
180
    assert str(Q(module='a') | Q(module='b')) == "Or(Query(module='a'), Query(module='b'))"
181
182
    assert "predicates.And: predicates=(<hunter." in repr(Q(module='a') & Q(module='b'))
183
    assert "predicates.Query: query_eq={'module': 'a'}>," in repr(Q(module='a') & Q(module='b'))
184
    assert repr(Q(module='a') & Q(module='b')).endswith("predicates.Query: query_eq={'module': 'b'}>)>")
185
    assert str(Q(module='a') & Q(module='b')) == "And(Query(module='a'), Query(module='b'))"
186
187
188
def test_predicate_q_nest_1():
189
    assert repr(Q(Q(module='a'))).endswith("predicates.Query: query_eq={'module': 'a'}>")
190
191
192
def test_predicate_q_expansion():
193
    assert Q(1, 2, module=3) == And(1, 2, Q(module=3))
194
    assert Q(1, 2, module=3, action=4) == When(And(1, 2, Q(module=3)), 4)
195
    assert Q(1, 2, module=3, actions=[4, 5]) == When(And(1, 2, Q(module=3)), 4, 5)
196
197
198
def test_predicate_and():
199
    assert And(1, 2) == And(1, 2)
200
    assert Q(module=1) & Q(module=2) == And(Q(module=1), Q(module=2))
201
    assert Q(module=1) & Q(module=2) & Q(module=3) == And(Q(module=1), Q(module=2), Q(module=3))
202
203
    assert (Q(module=1) & Q(module=2))({'module': 3}) == False
204
    assert (Q(module=1) & Q(function=2))({'module': 1, 'function': 2}) == True
205
206
    assert And(1, 2) | 3 == Or(And(1, 2), 3)
207
208
209
def test_predicate_or():
210
    assert Q(module=1) | Q(module=2) == Or(Q(module=1), Q(module=2))
211
    assert Q(module=1) | Q(module=2) | Q(module=3) == Or(Q(module=1), Q(module=2), Q(module=3))
212
213
    assert (Q(module=1) | Q(module=2))({'module': 3}) == False
214
    assert (Q(module=1) | Q(module=2))({'module': 2}) == True
215
216
    assert Or(1, 2) & 3 == And(Or(1, 2), 3)
217
218
219
def test_tracing_bare(LineMatcher):
220
    lines = StringIO()
221
    with trace(CodePrinter(stream=lines)):
222
        def a():
223
            return 1
224
225
        b = a()
226
        b = 2
227
        try:
228
            raise Exception("BOOM!")
229
        except Exception:
230
            pass
231
    print(lines.getvalue())
232
    lm = LineMatcher(lines.getvalue().splitlines())
233
    lm.fnmatch_lines([
234
        "* ...       return value: <hunter.*tracer.Tracer *",
235
        "*test_hunter.py* call              def a():",
236
        "*test_hunter.py* line                  return 1",
237
        "*test_hunter.py* return                return 1",
238
        "* ...       return value: 1",
239
    ])
240
241
242
def test_tracing_printing_failures(LineMatcher):
243
    lines = StringIO()
244
    with trace(actions=[CodePrinter(stream=lines), VarsPrinter("x", stream=lines)]):
245
        class Bad(Exception):
246
            def __repr__(self):
247
                raise RuntimeError("I'm a bad class!")
248
249
        def a():
250
            x = Bad()
251
            return x
252
253
        def b():
254
            x = Bad()
255
            raise x
256
257
        a()
258
        try:
259
            b()
260
        except Exception as exc:
261
            pass
262
    lm = LineMatcher(lines.getvalue().splitlines())
263
    lm.fnmatch_lines([
264
        """* ...       return value: <hunter.*tracer.Tracer *""",
265
        """*tests*test_hunter.py:* call              class Bad(Exception):""",
266
        """*tests*test_hunter.py:* line              class Bad(Exception):""",
267
        """*tests*test_hunter.py:* line                  def __repr__(self):""",
268
        """*tests*test_hunter.py:* return                def __repr__(self):""",
269
        """* ...       return value: *""",
270
        """*tests*test_hunter.py:* call              def a():""",
271
        """*tests*test_hunter.py:* line                  x = Bad()""",
272
        """*tests*test_hunter.py:* line                  return x""",
273
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
274
        """*tests*test_hunter.py:* return                return x""",
275
        """* ...       return value: !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
276
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
277
        """*tests*test_hunter.py:* call              def b():""",
278
        """*tests*test_hunter.py:* line                  x = Bad()""",
279
        """*tests*test_hunter.py:* line                  raise x""",
280
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
281
        """*tests*test_hunter.py:* exception             raise x""",
282
        """* ...       exception value: !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
283
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
284
        """*tests*test_hunter.py:* return                raise x""",
285
        """* ...       return value: None""",
286
        """* vars      x => !!! FAILED REPR: RuntimeError("I'm a bad class!",)""",
287
    ])
288
289
290
def test_tracing_vars(LineMatcher):
291
    lines = StringIO()
292
    with trace(actions=[VarsPrinter('b', stream=lines), CodePrinter(stream=lines)]):
293
        def a():
294
            b = 1
295
            b = 2
296
            return 1
297
298
        b = a()
299
        b = 2
300
        try:
301
            raise Exception("BOOM!")
302
        except Exception:
303
            pass
304
    print(lines.getvalue())
305
    lm = LineMatcher(lines.getvalue().splitlines())
306
    lm.fnmatch_lines([
307
        "* ...       return value: <hunter.*tracer.Tracer *",
308
        "*test_hunter.py* call              def a():",
309
        "*test_hunter.py* line                  b = 1",
310
        "* vars      b => 1",
311
        "*test_hunter.py* line                  b = 2",
312
        "* vars      b => 2",
313
        "*test_hunter.py* line                  return 1",
314
        "* vars      b => 2",
315
        "*test_hunter.py* return                return 1",
316
        "* ...       return value: 1",
317
    ])
318
319
320
def test_trace_merge():
321
    with trace(function="a"):
322
        with trace(function="b"):
323
            with trace(function="c"):
324
                assert sys.gettrace()._handler == When(Q(function="c"), CodePrinter)
325
            assert sys.gettrace()._handler == When(Q(function="b"), CodePrinter)
326
        assert sys.gettrace()._handler == When(Q(function="a"), CodePrinter)
327
328
329
def test_trace_api_expansion():
330
    # simple use
331
    with trace(function="foobar") as t:
332
        assert t._handler == When(Q(function="foobar"), CodePrinter)
333
334
    # "or" by expression
335
    with trace(module="foo", function="foobar") as t:
336
        assert t._handler == When(Q(module="foo", function="foobar"), CodePrinter)
337
338
    # pdb.set_trace
339
    with trace(function="foobar", action=Debugger) as t:
340
        assert t._handler == When(Q(function="foobar"), Debugger)
341
342
    # pdb.set_trace on any hits
343
    with trace(module="foo", function="foobar", action=Debugger) as t:
344
        assert t._handler == When(Q(module="foo", function="foobar"), Debugger)
345
346
    # pdb.set_trace when function is foobar, otherwise just print when module is foo
347
    with trace(Q(function="foobar", action=Debugger), module="foo") as t:
348
        assert t._handler == When(And(
349
            When(Q(function="foobar"), Debugger),
350
            Q(module="foo")
351
        ), CodePrinter)
352
353
    # dumping variables from stack
354
    with trace(Q(function="foobar", action=VarsPrinter("foobar")), module="foo") as t:
355
        assert t._handler == When(And(
356
            When(Q(function="foobar"), VarsPrinter("foobar")),
357
            Q(module="foo"),
358
        ), CodePrinter)
359
360
    with trace(Q(function="foobar", action=VarsPrinter("foobar", "mumbojumbo")), module="foo") as t:
361
        assert t._handler == When(And(
362
            When(Q(function="foobar"), VarsPrinter("foobar", "mumbojumbo")),
363
            Q(module="foo"),
364
        ), CodePrinter)
365
366
    # multiple actions
367
    with trace(Q(function="foobar", actions=[VarsPrinter("foobar"), Debugger]), module="foo") as t:
368
        assert t._handler == When(And(
369
            When(Q(function="foobar"), VarsPrinter("foobar"), Debugger),
370
            Q(module="foo"),
371
        ), CodePrinter)
372
373
374
def test_locals():
375
    out = StringIO()
376
    with trace(
377
        lambda event: event.locals.get("node") == "Foobar",
378
        module="test_hunter",
379
        function="foo",
380
        action=CodePrinter(stream=out)
381
    ):
382
        def foo():
383
            a = 1
384
            node = "Foobar"
385
            node += "x"
386
            a += 2
387
            return a
388
389
        foo()
390
    assert out.getvalue().endswith('node += "x"\n')
391
392
393
def test_debugger(LineMatcher):
394
    out = StringIO()
395
    calls = []
396
397
    class FakePDB:
398
        def __init__(self, foobar=1):
399
            calls.append(foobar)
400
401
        def set_trace(self, frame):
402
            calls.append(frame.f_code.co_name)
403
404
    with trace(
405
        lambda event: event.locals.get("node") == "Foobar",
406
        module="test_hunter",
407
        function="foo",
408
        actions=[VarsPrinter("a", "node", "foo", "test_debugger", globals=True, stream=out),
409
                 Debugger(klass=FakePDB, foobar=2)]
410
    ):
411
        def foo():
412
            a = 1
413
            node = "Foobar"
414
            node += "x"
415
            a += 2
416
            return a
417
418
        foo()
419
    print(out.getvalue())
420
    assert calls == [2, 'foo']
421
    lm = LineMatcher(out.getvalue().splitlines())
422
    lm.fnmatch_lines_random([
423
        "*      test_debugger => <function test_debugger at *",
424
        "*      node => 'Foobar'",
425
        "*      a => 1",
426
    ])
427
428
429
def test_custom_action():
430
    calls = []
431
432
    with trace(action=lambda event: calls.append(123), kind="return"):
433
        def foo():
434
            return 1
435
436
        foo()
437
    assert calls == [123, 123, 123]
438
439
440
def test_trace_with_class_actions():
441
    with trace(CodePrinter):
442
        def a():
443
            pass
444
445
        a()
446
447
448
def test_predicate_no_inf_recursion():
449
    assert Or(And(1)) == 1
450
    assert Or(Or(1)) == 1
451
    assert And(Or(1)) == 1
452
    assert And(And(1)) == 1
453
    predicate = Q(Q(lambda ev: 1, module='wat'))
454
    print('predicate:', predicate)
455
    predicate({'module': 'foo'})
456
457
458
def test_predicate_compression():
459
    assert Or(Or(1, 2), And(3)) == Or(1, 2, 3)
460
    assert Or(Or(1, 2), 3) == Or(1, 2, 3)
461
    assert Or(1, Or(2, 3), 4) == Or(1, 2, 3, 4)
462
    assert And(1, 2, Or(3, 4)).predicates == (1, 2, Or(3, 4))
463
464
    assert repr(Or(Or(1, 2), And(3))) == repr(Or(1, 2, 3))
465
    assert repr(Or(Or(1, 2), 3)) == repr(Or(1, 2, 3))
466
    assert repr(Or(1, Or(2, 3), 4)) == repr(Or(1, 2, 3, 4))
467
468
469
def test_predicate_not():
470
    assert Not(1).predicate == 1
471
    assert ~Or(1, 2) == Not(Or(1, 2))
472
    assert ~And(1, 2) == Not(And(1, 2))
473
474
    assert ~Not(1) == 1
475
476
    assert ~Query(module=1) | ~Query(module=2) == Not(And(Query(module=1), Query(module=2)))
477
    assert ~Query(module=1) & ~Query(module=2) == Not(Or(Query(module=1), Query(module=2)))
478
479
    assert ~Query(module=1) | Query(module=2) == Or(Not(Query(module=1)), Query(module=2))
480
    assert ~Query(module=1) & Query(module=2) == And(Not(Query(module=1)), Query(module=2))
481
482
    assert ~(Query(module=1) & Query(module=2)) == Not(And(Query(module=1), Query(module=2)))
483
    assert ~(Query(module=1) | Query(module=2)) == Not(Or(Query(module=1), Query(module=2)))
484
485
    assert repr(~Or(1, 2)) == repr(Not(Or(1, 2)))
486
    assert repr(~And(1, 2)) == repr(Not(And(1, 2)))
487
488
    assert repr(~Query(module=1) | ~Query(module=2)) == repr(Not(And(Query(module=1), Query(module=2))))
489
    assert repr(~Query(module=1) & ~Query(module=2)) == repr(Not(Or(Query(module=1), Query(module=2))))
490
491
    assert repr(~(Query(module=1) & Query(module=2))) == repr(Not(And(Query(module=1), Query(module=2))))
492
    assert repr(~(Query(module=1) | Query(module=2))) == repr(Not(Or(Query(module=1), Query(module=2))))
493
494
    assert Not(Q(module=1))({'module': 1}) == False
495
496
497
def test_predicate_query_allowed():
498
    pytest.raises(TypeError, Query, 1)
499
    pytest.raises(TypeError, Query, a=1)
500
501
502
def test_predicate_when_allowed():
503
    pytest.raises(TypeError, When, 1)
504
505
506
@pytest.mark.parametrize('expr,inp,expected', [
507
    ({'module': "abc"}, {'module': "abc"}, True),
508
    ({'module': "abcd"}, {'module': "abc"}, False),
509
    ({'module': "abcd"}, {'module': "abce"}, False),
510
    ({'module_startswith': "abc"}, {'module': "abcd"}, True),
511
    ({'module__startswith': "abc"}, {'module': "abcd"}, True),
512
    ({'module_contains': "bc"}, {'module': "abcd"}, True),
513
    ({'module_contains': "bcde"}, {'module': "abcd"}, False),
514
515
    ({'module_endswith': "abc"}, {'module': "abcd"}, False),
516
    ({'module__endswith': "bcd"}, {'module': "abcd"}, True),
517
518
    ({'module_in': "abcd"}, {'module': "bc"}, True),
519
    ({'module': "abcd"}, {'module': "bc"}, False),
520
    ({'module': ["abcd"]}, {'module': "bc"}, False),
521
    ({'module_in': ["abcd"]}, {'module': "bc"}, False),
522
    ({'module_in': ["a", "bc", "d"]}, {'module': "bc"}, True),
523
524
    ({'module': "abcd"}, {'module': "abc"}, False),
525
526
    ({'module_startswith': ("abc", "xyz")}, {'module': "abc"}, True),
527
    ({'module_startswith': {"abc", "xyz"}}, {'module': "abc"}, True),
528
    ({'module_startswith': ["abc", "xyz"]}, {'module': "abc"}, True),
529
    ({'module_startswith': ("abc", "xyz")}, {'module': "abcd"}, True),
530
    ({'module_startswith': ("abc", "xyz")}, {'module': "xyzw"}, True),
531
    ({'module_startswith': ("abc", "xyz")}, {'module': "fooabc"}, False),
532
533
    ({'module_endswith': ("abc", "xyz")}, {'module': "abc"}, True),
534
    ({'module_endswith': {"abc", "xyz"}}, {'module': "abc"}, True),
535
    ({'module_endswith': ["abc", "xyz"]}, {'module': "abc"}, True),
536
    ({'module_endswith': ("abc", "xyz")}, {'module': "1abc"}, True),
537
    ({'module_endswith': ("abc", "xyz")}, {'module': "1xyz"}, True),
538
    ({'module_endswith': ("abc", "xyz")}, {'module': "abcfoo"}, False),
539
540
    ({'module': "abc"}, {'module': 1}, False),
541
542
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "regex"}, False),
543
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "re.gex"}, True),
544
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "sregex"}, True),
545
    ({'module_regex': r"(re|sre.*)\b"}, {'module': "re"}, True),
546
])
547
def test_predicate_matching(expr, inp, expected):
548
    assert Query(**expr)(inp) == expected
549
550
551
@pytest.mark.parametrize('exc_type,expr', [
552
    (TypeError, {'module_1': 1}),
553
    (TypeError, {'module1': 1}),
554
    (ValueError, {'module_startswith': 1}),
555
    (ValueError, {'module_startswith': {1: 2}}),
556
    (ValueError, {'module_endswith': 1}),
557
    (ValueError, {'module_endswith': {1: 2}}),
558
    (TypeError, {'module_foo': 1}),
559
    (TypeError, {'module_a_b': 1}),
560
])
561
def test_predicate_bad_query(expr, exc_type):
562
    pytest.raises(exc_type, Query, **expr)
563
564
565
def test_predicate_when():
566
    called = []
567
    assert When(Q(module=1), lambda ev: called.append(ev))({'module': 2}) == False
568
    assert called == []
569
570
    assert When(Q(module=1), lambda ev: called.append(ev))({'module': 1}) == True
571
    assert called == [{'module': 1}]
572
573
    called = []
574
    assert Q(module=1, action=lambda ev: called.append(ev))({'module': 1}) == True
575
    assert called == [{'module': 1}]
576
577
    called = [[], []]
578
    predicate = (
579
        Q(module=1, action=lambda ev: called[0].append(ev)) |
580
        Q(module=2, action=lambda ev: called[1].append(ev))
581
    )
582
    assert predicate({'module': 1}) == True
583
    assert called == [[{'module': 1}], []]
584
585
    assert predicate({'module': 2}) == True
586
    assert called == [[{'module': 1}], [{'module': 2}]]
587
588
    called = [[], []]
589
    predicate = (
590
        Q(module=1, action=lambda ev: called[0].append(ev)) &
591
        Q(function=2, action=lambda ev: called[1].append(ev))
592
    )
593
    assert predicate({'module': 2}) == False
594
    assert called == [[], []]
595
596
    assert predicate({'module': 1, 'function': 2}) == True
597
    assert called == [[{'module': 1, 'function': 2}], [{'module': 1, 'function': 2}]]
598
599
600
def test_proper_backend():
601
    if os.environ.get('PUREPYTHONHUNTER') or platform.python_implementation() == 'PyPy':
602
        assert 'hunter.tracer.Tracer' in repr(hunter.Tracer)
603
    else:
604
        assert 'hunter._tracer.Tracer' in repr(hunter.Tracer)
605
606
607
@pytest.fixture(scope="session", params=['pure', 'cython'])
608
def tracer_impl(request):
609
    if request.param == 'pure':
610
        return pytest.importorskip('hunter.tracer').Tracer
611
    elif request.param == 'cython':
612
        return pytest.importorskip('hunter._tracer').Tracer
613
614
615
def _tokenize():
616
    with open(tokenize.__file__, 'rb') as fh:
617
        toks = []
618
        try:
619
            for tok in tokenize.tokenize(fh.readline):
620
                toks.append(tok)
621
        except tokenize.TokenError as exc:
622
            toks.append(exc)
623
624
625
def test_perf_filter(tracer_impl, benchmark):
626
    t = tracer_impl()
627
628
    @benchmark
629
    def run():
630
        with t.trace(Q(module="does-not-exist") | Q(module="does not exist".split())):
631
            _tokenize()
632
633
634
def test_perf_actions(tracer_impl, benchmark):
635
    t = tracer_impl()
636
637
    @benchmark
638
    def run():
639
        output = StringIO()
640
        with t.trace(Q(
641
            ~Q(module_in=['re', 'sre', 'sre_parse']) & ~Q(module_startswith='namedtuple') & Q(kind="call"),
642
            actions=[
643
                CodePrinter(
644
                    stream=output
645
                ),
646
                VarsPrinter(
647
                    'line',
648
                    globals=True,
649
                    stream=output
650
                )
651
            ]
652
        )):
653
            _tokenize()
654