BenchmarkFixture._raw_pedantic()   Rating: F

Complexity

Conditions 21

Size

Total Lines 59

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric  Value
cc      21
dl      0
loc     59
rs      3.2155
c       0
b       0
f       0

1 Method

Rating  Name                                Duplication  Size  Complexity
A       BenchmarkFixture.make_arguments()   0            8     4

How to fix

Long Method

Small methods make your code easier to understand, especially when combined with a good name. And if a method is small, finding a good name for it is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a sign that the commented part should be extracted into a new method, with the comment serving as a starting point for the new method's name.

Commonly applied refactorings include Extract Method, sketched below.
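To make the advice concrete, here is a minimal, hypothetical sketch of Extract Method; the names report_before, compute_mean, and report_after are illustrative and not taken from this project. The commented step is lifted into its own method, and the comment supplies the name.

# Hypothetical sketch of the Extract Method refactoring (illustrative names, not from pytest-benchmark).

# Before: a comment marks one cohesive step buried in a longer method.
def report_before(samples):
    # compute the mean of the samples
    total = sum(samples)
    mean = total / len(samples)
    print("mean=%s" % mean)

# After: the commented step becomes a small function whose name comes from the comment.
def compute_mean(samples):
    return sum(samples) / len(samples)

def report_after(samples):
    print("mean=%s" % compute_mean(samples))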

Complexity

Complex methods like BenchmarkFixture._raw_pedantic() often do a lot of different things. To break such code down, we need to identify a cohesive component within it. A common way to find one is to look for fields or methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
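As a minimal, hypothetical sketch of Extract Class (the names RunnerBefore, WarmupSettings, and RunnerAfter are illustrative and not from this project): fields sharing the warmup_ prefix are grouped into a class of their own.

# Hypothetical sketch of the Extract Class refactoring (illustrative names, not from pytest-benchmark).

# Before: the warmup_* fields share a prefix, hinting at a cohesive component.
class RunnerBefore(object):
    def __init__(self):
        self.warmup_rounds = 0
        self.warmup_iterations = 0
        self.rounds = 1

# After: the warmup settings live in their own small class.
class WarmupSettings(object):
    def __init__(self, rounds=0, iterations=0):
        self.rounds = rounds
        self.iterations = iterations

class RunnerAfter(object):
    def __init__(self):
        self.warmup = WarmupSettings()
        self.rounds = 1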

from __future__ import division
from __future__ import print_function

import cProfile
import gc
import pstats
import sys
import time
import traceback
from math import ceil

from .compat import INT
from .compat import XRANGE
from .timers import compute_timer_precision
from .utils import format_time

try:
    import statistics
except (ImportError, SyntaxError):
    statistics_error = traceback.format_exc()
    statistics = None
else:
    statistics_error = None
    from .stats import Metadata


class FixtureAlreadyUsed(Exception):
    pass


class BenchmarkFixture(object):
    _precisions = {}

    @classmethod
    def _get_precision(cls, timer):
        if timer in cls._precisions:
            return cls._precisions[timer]
        else:
            return cls._precisions.setdefault(timer, compute_timer_precision(timer))

    def __init__(self, node, disable_gc, timer, min_rounds, min_time, max_time, warmup, warmup_iterations,
                 calibration_precision, add_stats, logger, warner, disabled, use_cprofile, group=None):
        self.name = node.name
        self.fullname = node._nodeid
        self.disabled = disabled
        if hasattr(node, 'callspec'):
            self.param = node.callspec.id
            self.params = node.callspec.params
        else:
            self.param = None
            self.params = None
        self.group = group
        self.has_error = False
        self.extra_info = {}

        self._disable_gc = disable_gc
        self._timer = timer.target
        self._min_rounds = min_rounds
        self._max_time = float(max_time)
        self._min_time = float(min_time)
        self._add_stats = add_stats
        self._calibration_precision = calibration_precision
        self._warmup = warmup and warmup_iterations
        self._logger = logger
        self._warner = warner
        self._cleanup_callbacks = []
        self._mode = None
        self.use_cprofile = use_cprofile
        self.cprofile_stats = None

    @property
    def enabled(self):
        return not self.disabled

    def _make_runner(self, function_to_benchmark, args, kwargs):
        def runner(loops_range, timer=self._timer):
            gc_enabled = gc.isenabled()
            if self._disable_gc:
                gc.disable()
            tracer = sys.gettrace()
            sys.settrace(None)
            try:
                if loops_range:
                    start = timer()
                    for _ in loops_range:
                        function_to_benchmark(*args, **kwargs)
                    end = timer()
                    return end - start
                else:
                    start = timer()
                    result = function_to_benchmark(*args, **kwargs)
                    end = timer()
                    return end - start, result
            finally:
                sys.settrace(tracer)
                if gc_enabled:
                    gc.enable()

        return runner

    def _make_stats(self, iterations):
        bench_stats = Metadata(self, iterations=iterations, options={
            "disable_gc": self._disable_gc,
            "timer": self._timer,
            "min_rounds": self._min_rounds,
            "max_time": self._max_time,
            "min_time": self._min_time,
            "warmup": self._warmup,
        })
        self._add_stats(bench_stats)
        self.stats = bench_stats
        return bench_stats

    def __call__(self, function_to_benchmark, *args, **kwargs):
        if self._mode:
            self.has_error = True
            raise FixtureAlreadyUsed(
                "Fixture can only be used once. Previously it was used in %s mode." % self._mode)
        try:
            self._mode = 'benchmark(...)'
            return self._raw(function_to_benchmark, *args, **kwargs)
        except Exception:
            self.has_error = True
            raise

    def pedantic(self, target, args=(), kwargs=None, setup=None, rounds=1, warmup_rounds=0, iterations=1):
        if self._mode:
            self.has_error = True
            raise FixtureAlreadyUsed(
                "Fixture can only be used once. Previously it was used in %s mode." % self._mode)
        try:
            self._mode = 'benchmark.pedantic(...)'
            return self._raw_pedantic(target, args=args, kwargs=kwargs, setup=setup, rounds=rounds,
                                      warmup_rounds=warmup_rounds, iterations=iterations)
        except Exception:
            self.has_error = True
            raise

    def _raw(self, function_to_benchmark, *args, **kwargs):
        if not self.disabled:
            runner = self._make_runner(function_to_benchmark, args, kwargs)

            duration, iterations, loops_range = self._calibrate_timer(runner)

            # Choose how many times we must repeat the test
            rounds = int(ceil(self._max_time / duration))
            rounds = max(rounds, self._min_rounds)
            rounds = min(rounds, sys.maxsize)

            stats = self._make_stats(iterations)

            self._logger.debug("  Running %s rounds x %s iterations ..." % (rounds, iterations), yellow=True, bold=True)
            run_start = time.time()
            if self._warmup:
                warmup_rounds = min(rounds, max(1, int(self._warmup / iterations)))
                self._logger.debug("  Warmup %s rounds x %s iterations ..." % (warmup_rounds, iterations))
                for _ in XRANGE(warmup_rounds):
                    runner(loops_range)
            for _ in XRANGE(rounds):
                stats.update(runner(loops_range))
            self._logger.debug("  Ran for %ss." % format_time(time.time() - run_start), yellow=True, bold=True)
        if self.use_cprofile:
            profile = cProfile.Profile()
            function_result = profile.runcall(function_to_benchmark, *args, **kwargs)
            self.stats.cprofile_stats = pstats.Stats(profile)
        else:
            function_result = function_to_benchmark(*args, **kwargs)
        return function_result

    def _raw_pedantic(self, target, args=(), kwargs=None, setup=None, rounds=1, warmup_rounds=0, iterations=1):
        if kwargs is None:
            kwargs = {}

        has_args = bool(args or kwargs)

        if not isinstance(iterations, INT) or iterations < 1:
            raise ValueError("Must have positive int for `iterations`.")

        if not isinstance(rounds, INT) or rounds < 1:
            raise ValueError("Must have positive int for `rounds`.")

        if not isinstance(warmup_rounds, INT) or warmup_rounds < 0:
            raise ValueError("Must have positive int for `warmup_rounds`.")

        if iterations > 1 and setup:
            raise ValueError("Can't use more than 1 `iterations` with a `setup` function.")

        def make_arguments(args=args, kwargs=kwargs):
            if setup:
                maybe_args = setup()
                if maybe_args:
                    if has_args:
                        raise TypeError("Can't use `args` or `kwargs` if `setup` returns the arguments.")
                    args, kwargs = maybe_args
            return args, kwargs

        if self.disabled:
            args, kwargs = make_arguments()
            return target(*args, **kwargs)

        stats = self._make_stats(iterations)
        loops_range = XRANGE(iterations) if iterations > 1 else None
        for _ in XRANGE(warmup_rounds):
            args, kwargs = make_arguments()

            runner = self._make_runner(target, args, kwargs)
            runner(loops_range)

        for _ in XRANGE(rounds):
            args, kwargs = make_arguments()

            runner = self._make_runner(target, args, kwargs)
            if loops_range:
                duration = runner(loops_range)
            else:
                duration, result = runner(loops_range)
            stats.update(duration)

        if loops_range:
            args, kwargs = make_arguments()
            result = target(*args, **kwargs)

        if self.use_cprofile:
            profile = cProfile.Profile()
            profile.runcall(target, *args, **kwargs)
            self.stats.cprofile_stats = pstats.Stats(profile)

        return result

    def weave(self, target, **kwargs):
        try:
            import aspectlib
        except ImportError as exc:
            raise ImportError(exc.args, "Please install aspectlib or pytest-benchmark[aspect]")

        def aspect(function):
            def wrapper(*args, **kwargs):
                return self(function, *args, **kwargs)

            return wrapper

        self._cleanup_callbacks.append(aspectlib.weave(target, aspect, **kwargs).rollback)

    patch = weave

    def _cleanup(self):
        while self._cleanup_callbacks:
            callback = self._cleanup_callbacks.pop()
            callback()
        if not self._mode:
            self._logger.warn("BENCHMARK-U1", "Benchmark fixture was not used at all in this test!",
                              warner=self._warner, suspend=True)

    def _calibrate_timer(self, runner):
        timer_precision = self._get_precision(self._timer)
        min_time = max(self._min_time, timer_precision * self._calibration_precision)
        min_time_estimate = min_time * 5 / self._calibration_precision
        self._logger.debug("")
        self._logger.debug("  Timer precision: %ss" % format_time(timer_precision), yellow=True, bold=True)
        self._logger.debug("  Calibrating to target round %ss; will estimate when reaching %ss." % (
            format_time(min_time), format_time(min_time_estimate)), yellow=True, bold=True)

        loops = 1
        while True:
            loops_range = XRANGE(loops)
            duration = runner(loops_range)
            if self._warmup:
                warmup_start = time.time()
                warmup_iterations = 0
                warmup_rounds = 0
                while time.time() - warmup_start < self._max_time and warmup_iterations < self._warmup:
                    duration = min(duration, runner(loops_range))
                    warmup_rounds += 1
                    warmup_iterations += loops
                self._logger.debug("    Warmup: %ss (%s x %s iterations)." % (
                    format_time(time.time() - warmup_start),
                    warmup_rounds, loops
                ))

            self._logger.debug("    Measured %s iterations: %ss." % (loops, format_time(duration)), yellow=True)
            if duration >= min_time:
                break

            if duration >= min_time_estimate:
                # coarse estimation of the number of loops
                loops = int(ceil(min_time * loops / duration))
                self._logger.debug("    Estimating %s iterations." % loops, green=True)
                if loops == 1:
                    # If we got a single loop then bail early - nothing to calibrate if the
                    # test function is 100 times slower than the timer resolution.
                    loops_range = XRANGE(loops)
                    break
            else:
                loops *= 10
        return duration, loops, loops_range
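For context, here is a minimal usage sketch, assuming the standard pytest-benchmark `benchmark` fixture; the test name and payload below are illustrative. `benchmark.pedantic(...)` is the public entry point that forwards to the `_raw_pedantic()` method analyzed above.

# Illustrative test, not part of the analyzed module.
def make_payload():
    # A setup() callable may return (args, kwargs) that are passed to the target each round.
    return ([3, 1, 2],), {}

def test_sorting(benchmark):
    # `benchmark` is the BenchmarkFixture instance injected by pytest-benchmark.
    result = benchmark.pedantic(sorted, setup=make_payload, rounds=10, warmup_rounds=2)
    assert result == [1, 2, 3]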