BusyThread   A
last analyzed

Complexity

Total Complexity 2

Size/Duplication

Total Lines 4
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 4
rs 10
wmc 2
1
"""
2
from: https://bitbucket.org/haypo/misc/src/tip/python/pep418.py
3
4
Implementation of the PEP 418 in pure Python using ctypes.
5
6
Functions:
7
8
 - clock()
9
 - get_clock_info(name)
10
 - monotonic(): not always available
11
 - perf_frequency()
12
 - process_time()
13
 - sleep()
14
 - time()
15
16
Constants:
17
18
 - has_monotonic (bool): True if time.monotonic() is available
19
"""
20
# flake8: noqa
21
# TODO: gethrtime() for Solaris/OpenIndiana
22
# TODO: call GetSystemTimeAdjustment() to get the resolution
23
# TODO: other FIXME
24
25
import os
26
import sys
27
import time as python_time
28
29
has_mach_absolute_time = False
30
has_clock_gettime = False
31
has_gettimeofday = False
32
has_ftime = False
33
has_delay = False
34
has_libc_time = False
35
has_libc_clock = False
36
has_libc_sleep = False
37
has_GetTickCount64 = False
38
CLOCK_REALTIME = None
39
CLOCK_MONOTONIC = None
40
CLOCK_PROCESS_CPUTIME_ID = None
41
CLOCK_HIGHRES = None
42
CLOCK_PROF = None
43
try:
44
    import ctypes
45
    import ctypes.util
46
    from ctypes import byref, POINTER
47
except ImportError as err:
48
    pass
49
else:
50
    def ctypes_oserror():
51
        errno = ctypes.get_errno()
52
        message = os.strerror(errno)
53
        return OSError(errno, message)
54
55
    time_t = ctypes.c_long
56
57
    if os.name == "nt":
58
        from ctypes.wintypes import BOOL, DWORD, HANDLE, FILETIME
59
        from ctypes import windll
60
        LARGEINTEGER = ctypes.c_int64
61
        LARGEINTEGER_p = POINTER(LARGEINTEGER)
62
        FILETIME_p = POINTER(FILETIME)
63
        ULONGLONG = ctypes.c_uint64
64
65
        def ctypes_winerror():
66
            errno = ctypes.get_errno()
67
            message = os.strerror(errno)
68
            return WindowsError(errno, message)
69
70
        _QueryPerformanceFrequency = windll.kernel32.QueryPerformanceFrequency
71
        _QueryPerformanceFrequency.restype = BOOL
72
        _QueryPerformanceFrequency.argtypes = (LARGEINTEGER_p,)
73
        def QueryPerformanceFrequency():
74
            frequency = LARGEINTEGER()
75
            ok = _QueryPerformanceFrequency(byref(frequency))
76
            if not ok:
77
                raise ctypes_winerror()
78
            return int(frequency.value)
79
80
        _QueryPerformanceCounter = windll.kernel32.QueryPerformanceCounter
81
        _QueryPerformanceCounter.restype = BOOL
82
        _QueryPerformanceCounter.argtypes = (LARGEINTEGER_p,)
83
        def QueryPerformanceCounter():
84
            frequency = LARGEINTEGER()
85
            ok = _QueryPerformanceCounter(byref(frequency))
86
            if not ok:
87
                raise ctypes_winerror()
88
            return int(frequency.value)
89
90
        GetTickCount = windll.kernel32.GetTickCount
91
        GetTickCount.restype = DWORD
92
        GetTickCount.argtypes = ()
93
94
        if hasattr(windll.kernel32, 'GetTickCount64'):
95
            GetTickCount64 = windll.kernel32.GetTickCount64
96
            GetTickCount64.restype = ULONGLONG
97
            GetTickCount64.argtypes = ()
98
            has_GetTickCount64 = True
99
100
        GetCurrentProcess = windll.kernel32.GetCurrentProcess
101
        GetCurrentProcess.argtypes = ()
102
        GetCurrentProcess.restype = HANDLE
103
104
        _GetProcessTimes = windll.kernel32.GetProcessTimes
105
        _GetProcessTimes.argtypes = (HANDLE, FILETIME_p, FILETIME_p, FILETIME_p, FILETIME_p)
106
        _GetProcessTimes.restype = BOOL
107
108
        def filetime2py(obj):
109
            return (obj.dwHighDateTime << 32) + obj.dwLowDateTime
110
111
        def GetProcessTimes(handle):
112
            creation_time = FILETIME()
113
            exit_time = FILETIME()
114
            kernel_time = FILETIME()
115
            user_time = FILETIME()
116
            ok = _GetProcessTimes(handle,
117
                        byref(creation_time), byref(exit_time),
118
                        byref(kernel_time), byref(user_time))
119
            if not ok:
120
                raise ctypes_winerror()
121
            return (filetime2py(creation_time), filetime2py(exit_time),
122
                    filetime2py(kernel_time), filetime2py(user_time))
123
124
        _GetSystemTimeAsFileTime = windll.kernel32.GetSystemTimeAsFileTime
125
        _GetSystemTimeAsFileTime.argtypes = (FILETIME_p,)
126
        _GetSystemTimeAsFileTime.restype = None
127
128
        def GetSystemTimeAsFileTime():
129
            system_time = FILETIME()
130
            _GetSystemTimeAsFileTime(byref(system_time))
131
            return filetime2py(system_time)
132
133
    libc_name = ctypes.util.find_library('c')
134
    if libc_name:
135
        libc = ctypes.CDLL(libc_name, use_errno=True)
136
        clock_t = ctypes.c_ulong
137
138
        if sys.platform == 'darwin':
139
            mach_absolute_time = libc.mach_absolute_time
140
            mach_absolute_time.argtypes = ()
141
            mach_absolute_time.restype = ctypes.c_uint64
142
            has_mach_absolute_time = True
143
144
            class mach_timebase_info_data_t(ctypes.Structure):
145
                _fields_ = (
146
                    ('numer', ctypes.c_uint32),
147
                    ('denom', ctypes.c_uint32),
148
                )
149
            mach_timebase_info_data_p = POINTER(mach_timebase_info_data_t)
150
151
            _mach_timebase_info = libc.mach_timebase_info
152
            _mach_timebase_info.argtypes = (mach_timebase_info_data_p,)
153
            _mach_timebase_info.restype = ctypes.c_int
154
            def mach_timebase_info():
155
                timebase = mach_timebase_info_data_t()
156
                _mach_timebase_info(byref(timebase))
157
                return (timebase.numer, timebase.denom)
158
159
        _libc_clock = libc.clock
160
        _libc_clock.argtypes = ()
161
        _libc_clock.restype = clock_t
162
        has_libc_clock = True
163
164
        if hasattr(libc, 'sleep'):
165
            _libc_sleep = libc.sleep
166
            _libc_sleep.argtypes = (ctypes.c_uint,)
167
            _libc_sleep.restype = ctypes.c_uint
168
            has_libc_sleep = True
169
170
        if hasattr(libc, 'gettimeofday'):
171
            class timeval(ctypes.Structure):
172
                _fields_ = (
173
                    ('tv_sec', time_t),
174
                    ('tv_usec', ctypes.c_long),
175
                )
176
            timeval_p = POINTER(timeval)
177
            timezone_p = ctypes.c_void_p
178
179
            _gettimeofday = libc.gettimeofday
180
            # FIXME: some platforms only expect one argument
181
            _gettimeofday.argtypes = (timeval_p, timezone_p)
182
            _gettimeofday.restype = ctypes.c_int
183
            def gettimeofday():
184
                tv = timeval()
185
                err = _gettimeofday(byref(tv), None)
186
                if err:
187
                    raise ctypes_oserror()
188
                return tv
189
            has_gettimeofday = True
190
191
        time_t_p = POINTER(time_t)
192
        if hasattr(libc, 'time'):
193
            _libc__time = libc.time
194
            _libc__time.argtypes = (time_t_p,)
195
            _libc__time.restype = time_t
196
            def _libc_time():
197
                return _libc__time(None)
198
            has_libc_time = True
199
200
    if sys.platform.startswith(("freebsd", "openbsd")):
201
        librt_name = libc_name
202
    else:
203
        librt_name = ctypes.util.find_library('rt')
204
    if librt_name:
205
        librt = ctypes.CDLL(librt_name, use_errno=True)
206
        if hasattr(librt, 'clock_gettime'):
207
            clockid_t = ctypes.c_int
208
            class timespec(ctypes.Structure):
209
                _fields_ = (
210
                    ('tv_sec', time_t),
211
                    ('tv_nsec', ctypes.c_long),
212
                )
213
            timespec_p = POINTER(timespec)
214
215
            _clock_gettime = librt.clock_gettime
216
            _clock_gettime.argtypes = (clockid_t, timespec_p)
217
            _clock_gettime.restype = ctypes.c_int
218
            def clock_gettime(clk_id):
219
                ts = timespec()
220
                err = _clock_gettime(clk_id, byref(ts))
221
                if err:
222
                    raise ctypes_oserror()
223
                return ts.tv_sec + ts.tv_nsec * 1e-9
224
            has_clock_gettime = True
225
226
            _clock_settime = librt.clock_settime
227
            _clock_settime.argtypes = (clockid_t, timespec_p)
228
            _clock_settime.restype = ctypes.c_int
229
            def clock_settime(clk_id, value):
230
                ts = timespec()
231
                ts.tv_sec = int(value)
232
                ts.tv_nsec = int(float(abs(value)) % 1.0 * 1e9)
233
                err = _clock_settime(clk_id, byref(ts))
234
                if err:
235
                    raise ctypes_oserror()
236
                return ts.tv_sec + ts.tv_nsec * 1e-9
237
238
            _clock_getres = librt.clock_getres
239
            _clock_getres.argtypes = (clockid_t, timespec_p)
240
            _clock_getres.restype = ctypes.c_int
241
            def clock_getres(clk_id):
242
                ts = timespec()
243
                err = _clock_getres(clk_id, byref(ts))
244
                if err:
245
                    raise ctypes_oserror()
246
                return ts.tv_sec + ts.tv_nsec * 1e-9
247
248
            if sys.platform.startswith("linux"):
249
                CLOCK_REALTIME = 0
250
                CLOCK_MONOTONIC = 1
251
                CLOCK_PROCESS_CPUTIME_ID = 2
252
            elif sys.platform.startswith("freebsd"):
253
                CLOCK_REALTIME = 0
254
                CLOCK_PROF = 2
255
                CLOCK_MONOTONIC = 4
256
            elif sys.platform.startswith("openbsd"):
257
                CLOCK_REALTIME = 0
258
                CLOCK_MONOTONIC = 3
259
            elif sys.platform.startswith("sunos"):
260
                CLOCK_REALTIME = 3
261
                CLOCK_HIGHRES = 4
262
                # clock_gettime(CLOCK_PROCESS_CPUTIME_ID) fails with errno 22
263
                # on OpenSolaris
264
                # CLOCK_PROCESS_CPUTIME_ID = 5
265
266
def _clock_gettime_info(use_info, clk_id):
267
    value = clock_gettime(clk_id)
268
    if use_info:
269
        name = {
270
            CLOCK_MONOTONIC: 'CLOCK_MONOTONIC',
271
            CLOCK_PROF: 'CLOCK_PROF',
272
            CLOCK_HIGHRES: 'CLOCK_HIGHRES',
273
            CLOCK_PROCESS_CPUTIME_ID: 'CLOCK_PROCESS_CPUTIME_ID',
274
            CLOCK_REALTIME: 'CLOCK_REALTIME',
275
        }[clk_id]
276
        try:
277
            resolution = clock_getres(clk_id)
278
        except OSError:
279
            resolution = 1e-9
280
        info = {
281
            'implementation': 'clock_gettime(%s)' % name,
282
            'resolution': resolution,
283
        }
284
        if clk_id in (CLOCK_MONOTONIC, CLOCK_PROF, CLOCK_HIGHRES, CLOCK_PROCESS_CPUTIME_ID):
285
            info['monotonic'] = True
286
            info['adjustable'] = False
287
        elif clk_id in (CLOCK_REALTIME,):
288
            info['monotonic'] = False
289
            info['adjustable'] = True
290
    else:
291
        info = None
292
    return (value, info)
293
294
has_monotonic = False
295
if os.name == 'nt':
296
    # GetTickCount64() requires Windows Vista, Server 2008 or later
297
    if has_GetTickCount64:
298
        def _monotonic(use_info):
299
            value = GetTickCount64() * 1e-3
300
            if use_info:
301
                info = {
302
                    'implementation': "GetTickCount64()",
303
                    "monotonic": True,
304
                    "resolution": 1e-3,
305
                    "adjustable": False,
306
                }
307
                # FIXME: call GetSystemTimeAdjustment() to get the resolution
308
            else:
309
                info = None
310
            return (value, info)
311
        has_monotonic = True
312
    else:
313
        def _monotonic(use_info):
314
            ticks = GetTickCount()
315
            if ticks < _monotonic.last:
316
                # Integer overflow detected
317
                _monotonic.delta += 2**32
318
            _monotonic.last = ticks
319
            value = (ticks + _monotonic.delta) * 1e-3
320
321
            if use_info:
322
                info = {
323
                    'implementation': "GetTickCount()",
324
                    "monotonic": True,
325
                    "resolution": 1e-3,
326
                    "adjustable": False,
327
                }
328
                # FIXME: call GetSystemTimeAdjustment() to get the resolution
329
            else:
330
                info = None
331
            return (value, info)
332
        _monotonic.last = 0
333
        _monotonic.delta = 0
334
    has_monotonic = True
335
336
elif has_mach_absolute_time:
337
    def _monotonic(use_info):
338
        if _monotonic.factor is None:
339
            timebase = mach_timebase_info()
340
            _monotonic.factor = timebase[0] / timebase[1] * 1e-9
341
        value = mach_absolute_time() * _monotonic.factor
342
        if use_info:
343
            info = {
344
                'implementation': "mach_absolute_time()",
345
                'resolution': _monotonic.factor,
346
                'monotonic': True,
347
                'adjustable': False,
348
            }
349
        else:
350
            info = None
351
        return (value, info)
352
    _monotonic.factor = None
353
    has_monotonic = True
354
355
elif has_clock_gettime and CLOCK_HIGHRES is not None:
356
    def _monotonic(use_info):
357
        return _clock_gettime_info(use_info, CLOCK_HIGHRES)
358
    has_monotonic = True
359
360
elif has_clock_gettime and CLOCK_MONOTONIC is not None:
361
    def _monotonic(use_info):
362
        return _clock_gettime_info(use_info, CLOCK_MONOTONIC)
363
    has_monotonic = True
364
365
if has_monotonic:
366
    def monotonic():
367
        return _monotonic(False)[0]
368
369
370
def _perf_counter(use_info):
371
    info = None
372
    if _perf_counter.use_performance_counter:
373
        if _perf_counter.performance_frequency is None:
374
            value, info = _win_perf_counter(use_info)
375
            if value is not None:
376
                return (value, info)
377
    if _perf_counter.use_monotonic:
378
        # The monotonic clock is preferred over the system time
379
        try:
380
            return _monotonic(use_info)
381
        except (OSError, WindowsError):
382
            _perf_counter.use_monotonic = False
383
    return _time(use_info)
384
_perf_counter.use_performance_counter = (os.name == 'nt')
385
if _perf_counter.use_performance_counter:
386
    _perf_counter.performance_frequency = None
387
_perf_counter.use_monotonic = has_monotonic
388
389
def perf_counter():
390
    return _perf_counter(False)[0]
391
392
393
if os.name == 'nt':
394
    def _process_time(use_info):
395
        handle = GetCurrentProcess()
396
        process_times = GetProcessTimes(handle)
397
        value = (process_times[2] + process_times[3]) * 1e-7
398
        if use_info:
399
            info = {
400
                "implementation": "GetProcessTimes()",
401
                "resolution": 1e-7,
402
                "monotonic": True,
403
                "adjustable": False,
404
                # FIXME: call GetSystemTimeAdjustment() to get the resolution
405
            }
406
        else:
407
            info = None
408
        return (value, info)
409
else:
410
    import os
411
    try:
412
        import resource
413
    except ImportError:
414
        has_resource = False
415
    else:
416
        has_resource = True
417
418
    def _process_time(use_info):
419
        info = None
420
        if _process_time.clock_id is not None:
421
            try:
422
                return _clock_gettime_info(use_info, _process_time.clock_id)
423
            except OSError:
424
                _process_time.clock_id = None
425
        if _process_time.use_getrusage:
426
            try:
427
                usage = resource.getrusage(resource.RUSAGE_SELF)
428
                value = usage[0] + usage[1]
429
            except OSError:
430
                _process_time.use_getrusage = False
431
            else:
432
                if use_info:
433
                    info = {
434
                        "implementation": "getrusage(RUSAGE_SELF)",
435
                        "resolution": 1e-6,
436
                        "monotonic": True,
437
                        "adjustable": False,
438
                    }
439
                return (value, info)
440
        if _process_time.use_times:
441
            try:
442
                times = os.times()
443
                value = times[0] + times[1]
444
            except OSError:
445
                _process_time.use_getrusage = False
446
            else:
447
                if use_info:
448
                    try:
449
                        ticks_per_second = os.sysconf("SC_CLK_TCK")
450
                    except ValueError:
451
                        ticks_per_second = 60 # FIXME: get HZ constant
452
                    info = {
453
                        "implementation": "times()",
454
                        "resolution": 1.0 / ticks_per_second,
455
                        "monotonic": True,
456
                        "adjustable": False,
457
                    }
458
                return (value, info)
459
        return _libc_clock_info(use_info)
460
    if has_clock_gettime and CLOCK_PROCESS_CPUTIME_ID is not None:
461
        _process_time.clock_id = CLOCK_PROCESS_CPUTIME_ID
462
    elif has_clock_gettime and CLOCK_PROF is not None:
463
        _process_time.clock_id = CLOCK_PROF
464
    else:
465
        _process_time.clock_id = None
466
    _process_time.use_getrusage = has_resource
467
    # On OS/2, only the 5th field of os.times() is set, others are zeros
468
    _process_time.use_times = (hasattr(os, 'times') and os.name != 'os2')
469
470
def process_time():
471
    return _process_time(False)[0]
472
473
474
if os.name == "nt":
475
    def _time(use_info):
476
        value = GetSystemTimeAsFileTime() * 1e-7
477
        if use_info:
478
            info = {
479
                'implementation': 'GetSystemTimeAsFileTime',
480
                'resolution': 1e-7,
481
                'monotonic': False,
482
                # FIXME: call GetSystemTimeAdjustment() to get the resolution
483
                # and adjustable
484
            }
485
        else:
486
            info = None
487
        return (value, info)
488
else:
489
    def _time(use_info):
490
        info = None
491
        if has_clock_gettime and CLOCK_REALTIME is not None:
492
            try:
493
                return _clock_gettime_info(use_info, CLOCK_REALTIME)
494
            except OSError:
495
                # CLOCK_REALTIME is not supported (unlikely)
496
                pass
497
        if has_gettimeofday:
498
            try:
499
                tv = gettimeofday()
500
            except OSError:
501
                # gettimeofday() should not fail
502
                pass
503
            else:
504
                if use_info:
505
                    info = {
506
                        'monotonic': False,
507
                        "implementation": "gettimeofday()",
508
                        "resolution": 1e-6,
509
                        'monotonic': False,
510
                        'adjustable': True,
511
                    }
512
                value = tv.tv_sec + tv.tv_usec * 1e-6
513
                return (value, info)
514
        # FIXME: implement ftime()
515
        if has_ftime:
516
            if use_info:
517
                info = {
518
                    "implementation": "ftime()",
519
                    "resolution": 1e-3,
520
                    'monotonic': False,
521
                    'adjustable': True,
522
                }
523
            value = ftime()
524
        elif has_libc_time:
525
            if use_info:
526
                info = {
527
                    "implementation": "time()",
528
                    "resolution": 1.0,
529
                    'monotonic': False,
530
                    'adjustable': True,
531
                }
532
            value = float(_libc_time())
533
        else:
534
            if use_info:
535
                info = {
536
                    "implementation": "time.time()",
537
                    'monotonic': False,
538
                    'adjustable': True,
539
                }
540
                if os.name == "nt":
541
                    # On Windows, time.time() uses ftime()
542
                    info["resolution"] = 1e-3
543
                else:
544
                    # guess that time.time() uses gettimeofday()
545
                    info["resolution"] = 1e-6
546
            value = python_time.time()
547
        return (value, info)
548
549
def time():
550
    return _time(False)[0]
551
552
553
try:
554
    import select
555
except ImportError:
556
    has_select = False
557
else:
558
    # FIXME: On Windows, select.select([], [], [], seconds) fails with
559
    #        select.error(10093)
560
    has_select = (hasattr(select, "select") and os.name != "nt")
561
562
if has_select:
563
    def _sleep(seconds):
564
        return select.select([], [], [], seconds)
565
566
elif has_delay:
567
    def _sleep(seconds):
568
        milliseconds = int(seconds * 1000)
569
        # FIXME
570
        delay(milliseconds)
571
572
#elif os.name == "nt":
573
#    def _sleep(seconds):
574
#        milliseconds = int(seconds * 1000)
575
#        # FIXME: use ctypes
576
#        win32api.ResetEvent(hInterruptEvent);
577
#        win32api.WaitForSingleObject(sleep.sigint_event, milliseconds)
578
#
579
#    sleep.sigint_event = win32api.CreateEvent(NULL, TRUE, FALSE, FALSE)
580
#    # SetEvent(sleep.sigint_event) will be called by the signal handler of SIGINT
581
582
elif os.name == "os2":
583
    def _sleep(seconds):
584
        milliseconds = int(seconds * 1000)
585
        # FIXME
586
        DosSleep(milliseconds)
587
588
elif has_libc_sleep:
589
    def _sleep(seconds):
590
        seconds = int(seconds)
591
        _libc_sleep(seconds)
592
593
else:
594
    def _sleep(seconds):
595
        python_time.sleep(seconds)
596
597
def sleep(seconds):
598
    if seconds < 0:
599
        raise ValueError("sleep length must be non-negative")
600
    _sleep(seconds)
601
602
def _libc_clock_info(use_info):
603
    if use_info:
604
        info = {
605
            'implementation': 'clock()',
606
            'resolution': 1.0,
607
            # FIXME: 'resolution': 1.0 / CLOCKS_PER_SEC,
608
            'monotonic': True,
609
            'adjustable': False,
610
        }
611
        if os.name != "nt":
612
            info['monotonic'] = True
613
    else:
614
        info = None
615
    if has_libc_clock:
616
        value = _libc_clock()
617
        if use_info:
618
            info['implementation'] = 'clock()'
619
    else:
620
        value = python_time.clock()
621
        if use_info:
622
            info['implementation'] = 'time.clock()'
623
    return (value, info)
624
625
def _win_perf_counter(use_info):
626
    if _win_perf_counter.perf_frequency is None:
627
        try:
628
            _win_perf_counter.perf_frequency = float(QueryPerformanceFrequency())
629
        except WindowsError:
630
            # QueryPerformanceFrequency() fails if the installed
631
            # hardware does not support a high-resolution performance
632
            # counter
633
            return (None, None)
634
635
    value = QueryPerformanceCounter() / _win_perf_counter.perf_frequency
636
    if use_info:
637
        info = {
638
            'implementation': 'QueryPerformanceCounter',
639
            'resolution': 1.0 / _win_perf_counter.perf_frequency,
640
            'monotonic': True,
641
            'adjustable': False,
642
        }
643
    else:
644
        info = None
645
    return (value, info)
646
_win_perf_counter.perf_frequency = None
647
648
if os.name == 'nt':
649
    def _clock(use_info):
650
        info = None
651
        if _clock.use_performance_counter:
652
            value, info = _win_perf_counter(use_info)
653
            if value is not None:
654
                return (value, info)
655
        return _libc_clock_info(use_info)
656
    _clock.use_performance_counter = True
657
else:
658
    def _clock(use_info):
659
        return _libc_clock_info(use_info)
660
661
def clock():
662
    return _clock(False)[0]
663
664
665
class clock_info(object):
666
    def __init__(self, implementation, monotonic, adjustable, resolution):
667
        self.implementation = implementation
668
        self.monotonic = monotonic
669
        self.adjustable = adjustable
670
        self.resolution = resolution
671
672
    def __repr__(self):
673
        return (
674
            'clockinfo(adjustable=%s, implementation=%r, monotonic=%s, resolution=%s'
675
            % (self.adjustable, self.implementation, self.monotonic, self.resolution))
676
677
def get_clock_info(name):
678
    if name == 'clock':
679
        info = _clock(True)[1]
680
    elif name == 'perf_counter':
681
        info = _perf_counter(True)[1]
682
    elif name == 'process_time':
683
        info = _process_time(True)[1]
684
    elif name == 'time':
685
        info = _time(True)[1]
686
    elif has_monotonic and name == 'monotonic':
687
        info = _monotonic(True)[1]
688
    else:
689
        raise ValueError("unknown clock: %s" % name)
690
    return clock_info(**info)
691
692
if __name__ == "__main__":
693
    from errno import EPERM
694
    import threading
695
    import unittest
696
697
    class TestPEP418(unittest.TestCase):
698
        if not hasattr(unittest.TestCase, 'assertIsInstance'):
699
            # Python < 2.7 or Python < 3.2
700
            def assertIsInstance(self, obj, klass):
701
                self.assertTrue(isinstance(obj, klass))
702
            def assertGreater(self, a, b):
703
                self.assertTrue(a > b)
704
            def assertLess(self, a, b):
705
                self.assertTrue(a < b)
706
            def assertLessEqual(self, a, b):
707
                self.assertTrue(a <= b)
708
            def assertAlmostEqual(self, first, second, delta):
709
                self.assertTrue(abs(first - second) <= delta)
710
711
        def test_clock(self):
712
            clock()
713
714
            info = get_clock_info('clock')
715
            self.assertEqual(info.monotonic, True)
716
            self.assertEqual(info.adjustable, False)
717
718
        def test_get_clock_info(self):
719
            clocks = ['clock', 'perf_counter', 'process_time', 'time']
720
            if has_monotonic:
721
                clocks.append('monotonic')
722
723
            for name in clocks:
724
                info = get_clock_info(name)
725
                self.assertIsInstance(info.implementation, str)
726
                self.assertNotEqual(info.implementation, '')
727
                self.assertIsInstance(info.monotonic, bool)
728
                self.assertIsInstance(info.resolution, float)
729
                # 0 < resolution <= 1.0
730
                self.assertGreater(info.resolution, 0)
731
                self.assertLessEqual(info.resolution, 1)
732
                self.assertIsInstance(info.adjustable, bool)
733
734
            self.assertRaises(ValueError, get_clock_info, 'xxx')
735
736
        if not has_monotonic:
737
            print("Skip test_monotonic: need time.monotonic")
738
        else:
739
            def test_monotonic(self):
740
                t1 = monotonic()
741
                python_time.sleep(0.1)
742
                t2 = monotonic()
743
                dt = t2 - t1
744
                self.assertGreater(t2, t1)
745
                self.assertAlmostEqual(dt, 0.1, delta=0.2)
746
747
                info = get_clock_info('monotonic')
748
                self.assertEqual(info.monotonic, True)
749
                self.assertEqual(info.adjustable, False)
750
751
        if not has_monotonic or not has_clock_gettime:
752
            if not has_monotonic:
753
                print('Skip test_monotonic_settime: need time.monotonic')
754
            elif not has_clock_gettime:
755
                print('Skip test_monotonic_settime: need time.clock_settime')
756
        else:
757
            def test_monotonic_settime(self):
758
                t1 = monotonic()
759
                realtime = clock_gettime(CLOCK_REALTIME)
760
                # jump backward with an offset of 1 hour
761
                try:
762
                    clock_settime(CLOCK_REALTIME, realtime - 3600)
763
                except OSError as err:
764
                    if err.errno == EPERM:
765
                        if hasattr(unittest, 'SkipTest'):
766
                            raise unittest.SkipTest(str(err))
767
                        else:
768
                            print("Skip test_monotonic_settime: %s" % err)
769
                            return
770
                    else:
771
                        raise
772
                t2 = monotonic()
773
                clock_settime(CLOCK_REALTIME, realtime)
774
                # monotonic must not be affected by system clock updates
775
                self.assertGreaterEqual(t2, t1)
776
777
        def test_perf_counter(self):
778
            perf_counter()
779
780
        def test_process_time(self):
781
            start = process_time()
782
            python_time.sleep(0.1)
783
            stop = process_time()
784
            self.assertLess(stop - start, 0.01)
785
786
            info = get_clock_info('process_time')
787
            self.assertEqual(info.monotonic, True)
788
            self.assertEqual(info.adjustable, False)
789
790
791
        def test_process_time_threads(self):
792
            class BusyThread(threading.Thread):
793
                def run(self):
794
                    while not self.stop:
795
                        pass
796
797
            thread = BusyThread()
798
            thread.stop = False
799
            t1 = process_time()
800
            thread.start()
801
            sleep(0.2)
802
            t2 = process_time()
803
            thread.stop = True
804
            thread.join()
805
            self.assertGreater(t2 - t1, 0.1)
806
807
        def test_sleep(self):
808
            self.assertRaises(ValueError, sleep, -2)
809
            self.assertRaises(ValueError, sleep, -1)
810
            sleep(1.2)
811
812
        def test_time(self):
813
            value = time()
814
            self.assertIsInstance(value, float)
815
816
            info = get_clock_info('time')
817
            self.assertEqual(info.monotonic, False)
818
            self.assertEqual(info.adjustable, True)
819
820
821
    if True:
822
        from pprint import pprint
823
824
        print("clock: %s" % clock())
825
        if has_monotonic:
826
            print("monotonic: %s" % monotonic())
827
        else:
828
            print("monotonic: <not available>")
829
        print("perf_counter: %s" % perf_counter())
830
        print("process_time: %s" % process_time())
831
        print("time: %s" % time())
832
833
        clocks = ['clock', 'perf_counter', 'process_time', 'time']
834
        if has_monotonic:
835
            clocks.append('monotonic')
836
        pprint(dict((name, get_clock_info(name)) for name in clocks))
837
838
    unittest.main()
839