Test Failed
Push — develop ( 420cf2...653997 )
by Nicolas
01:06 queued 15s
created

glances.globals.exit_after()   A

Complexity

Conditions 3

Size

Total Lines 27
Code Lines 18

Duplication

Lines 27
Ratio 100 %

Importance

Changes 0
Metric Value
cc 3
eloc 18
nop 2
dl 27
loc 27
rs 9.5
c 0
b 0
f 0
1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import os
21
import platform
22
import queue
23
import re
24
import subprocess
25
import sys
26
import weakref
27
from collections import OrderedDict
28
from configparser import ConfigParser, NoOptionError, NoSectionError
29
from datetime import datetime
30
from multiprocessing import Process, Queue
31
from operator import itemgetter, methodcaller
32
from statistics import mean
33
from typing import Any, Optional, Union
34
from urllib.error import HTTPError, URLError
35
from urllib.parse import urlparse
36
from urllib.request import Request, urlopen
37
38
# Prefer faster libs for JSON (de)serialization
39
# Preference Order: orjson > ujson > json (builtin)
40
try:
41
    import orjson as json
42
43
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
44
except ImportError:
45
    # Need to log info but importing logger will cause cyclic imports
46
    pass
47
48
if 'json' not in globals():
49
    try:
50
        # Note: ujson is not officially supported
51
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
52
        import ujson as json
53
    except ImportError:
54
        import json
55
56
    # To allow ujson & json dumps to serialize datetime
57
    def _json_default(v: Any) -> Any:
58
        if isinstance(v, datetime):
59
            return v.isoformat()
60
        return v
61
62
    json.dumps = functools.partial(json.dumps, default=_json_default)
63
64
##############
65
# GLOBALS VARS
66
##############
67
68
# OS constants (some libraries/features are OS-dependent)
69
BSD = sys.platform.find('bsd') != -1
70
LINUX = sys.platform.startswith('linux')
71
MACOS = sys.platform.startswith('darwin')
72
SUNOS = sys.platform.startswith('sunos')
73
WINDOWS = sys.platform.startswith('win')
74
WSL = "linux" in platform.system().lower() and "microsoft" in platform.uname()[3].lower()
75
76
# Set the AMPs, plugins and export modules path
77
work_path = os.path.realpath(os.path.dirname(__file__))
78
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
79
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
80
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))
81
sys_path = sys.path[:]
82
sys.path.insert(1, exports_path)
83
sys.path.insert(1, plugins_path)
84
sys.path.insert(1, amps_path)
85
86
# Types
87
text_type = str
88
binary_type = bytes
89
bool_type = bool
90
long = int
91
92
# Alias errors
93
PermissionError = OSError
94
95
# Alias methods
96
viewkeys = methodcaller('keys')
97
viewvalues = methodcaller('values')
98
viewitems = methodcaller('items')
99
100
101
###################
102
# GLOBALS FUNCTIONS
103
###################
104
105
106
def printandflush(string):
    """Print string on stdout and flush the stream right away.

    Used by the stdout* output modules.
    """
    print(string, flush=True)
109
110
111
def to_ascii(s):
    """Return s as a str, dropping non-ASCII characters from text input.

    Bytes input is simply decoded with the default codec. Text input is
    encoded to ASCII with every non-encodable character (accented letters
    for example) ignored, then decoded back to str.
    """
    if not isinstance(s, binary_type):
        ascii_bytes = s.encode('ascii', 'ignore')
        return ascii_bytes.decode()
    return s.decode()
117
118
119
def listitems(d):
    """Return the (key, value) pairs of the mapping d as a list."""
    return [*d.items()]
121
122
123
def listkeys(d):
    """Return the keys of the mapping d as a list."""
    return [*d.keys()]
125
126
127
def listvalues(d):
    """Return the values of the mapping d as a list."""
    return [*d.values()]
129
130
131
def u(s, errors='replace'):
    """Return s as text.

    A str is returned unchanged; anything else is decoded from UTF-8 using
    the given error handler.
    """
    already_text = isinstance(s, text_type)
    return s if already_text else s.decode('utf-8', errors=errors)
135
136
137
def b(s, errors='replace'):
    """Return s as bytes.

    Bytes are returned unchanged; anything else is encoded to UTF-8 using
    the given error handler.
    """
    already_binary = isinstance(s, binary_type)
    return s if already_binary else s.encode('utf-8', errors=errors)
141
142
143
def nativestr(s, errors='replace'):
    """Return s as a str.

    Text is returned unchanged, int and float values are converted with
    their __str__ method, and anything else is decoded from UTF-8 using the
    given error handler.
    """
    if isinstance(s, (int, float)):
        return s.__str__()
    if not isinstance(s, text_type):
        return s.decode('utf-8', errors=errors)
    return s
149
150
151
def system_exec(command):
    """Execute a system command and return its standard output as a str.

    The command line is split on whitespace and run without a shell, so
    shell syntax (quoting, pipes, redirections) is not interpreted. Only
    stdout is captured.

    Return the UTF-8 decoded output with trailing whitespace removed. If the
    command cannot be started or its output cannot be decoded, return the
    string 'ERROR: <exception message>' instead of raising.
    """
    try:
        # split() without argument collapses runs of blanks; split(' ') would
        # hand empty strings to the program as extra arguments.
        completed = subprocess.run(command.split(), stdout=subprocess.PIPE)
        res = completed.stdout.decode('utf-8')
    except Exception as e:
        # Best effort by design: the error is reported through the result.
        res = f'ERROR: {e}'
    return res.rstrip()
158
159
160
def subsample(data, sampling):
    """Compute a simple mean subsampling.

    data should be a list of numerical values and sampling a positive
    integer.

    When data holds at most sampling values it is returned unchanged.
    Otherwise it is cut into sampling consecutive chunks and the mean of
    each chunk is returned, so the result has exactly sampling items.
    """
    size = len(data)
    if size <= sampling:
        return data
    # Chunk s spans [s * size // sampling, (s + 1) * size // sampling).
    # Since size > sampling, every chunk holds at least one value and the
    # chunks cover the whole input. A rounded fixed chunk length could
    # produce empty slices (mean() then raises) or drop the tail of data.
    bounds = [s * size // sampling for s in range(sampling + 1)]
    return [mean(data[start:stop]) for start, stop in zip(bounds, bounds[1:])]
171
172
173
def time_series_subsample(data, sampling):
    """Compute a simple mean subsampling of a time series.

    data should be a list of (time, value) pairs and sampling a positive
    integer.

    When data holds at most sampling points it is returned unchanged.
    Otherwise it is cut into sampling consecutive chunks; each chunk yields
    one (time, value) pair made of the time of its first point and the mean
    of its values, so the result has exactly sampling items.
    """
    size = len(data)
    if size <= sampling:
        return data
    # Chunk s spans [s * size // sampling, (s + 1) * size // sampling).
    # Since size > sampling, no chunk is empty, so taking the first
    # timestamp and the mean of the values can never fail on an empty slice.
    bounds = [s * size // sampling for s in range(sampling + 1)]
    return [
        (data[start][0], mean([point[1] for point in data[start:stop]]))
        for start, stop in zip(bounds, bounds[1:])
    ]
188
189
190
def to_fahrenheit(celsius):
    """Return the Fahrenheit equivalent of a temperature given in Celsius."""
    scaled = celsius * 1.8
    return scaled + 32
193
194
195
def is_admin():
    """Tell whether the current user has administrator rights.

    On POSIX systems this is True when the user id is 0 (root). On Windows
    the answer comes from shell32.IsUserAnAdmin(), which requires Windows XP
    SP2 or higher; if that call fails, the error and its traceback are
    printed and False is returned.

    Source: https://stackoverflow.com/a/19719292
    """
    if os.name != 'nt':
        # Check for root on Posix
        return os.getuid() == 0

    import ctypes
    import traceback

    # WARNING: requires Windows XP SP2 or higher!
    try:
        return ctypes.windll.shell32.IsUserAnAdmin()
    except Exception as e:
        print(f"Admin check failed with error: {e}")
        traceback.print_exc()
        return False
219
220
221
def key_exist_value_not_none(k, d):
    """Return True when key k is present in d and d[k] is not None."""
    if k not in d:
        return False
    return d[k] is not None
226
227
228
def key_exist_value_not_none_not_v(k, d, value='', length=None):
    """Check that d[k] exists and carries a usable value.

    Return True when all of the following hold:
    - key k exists in d
    - d[k] is not None
    - d[k] != value
    - length is None, or len(d[k]) >= length
    """
    if k not in d:
        return False
    current = d[k]
    if current is None:
        return False
    if not (current != value):
        return False
    return length is None or len(current) >= length
235
236
237
def disable(class_name, var):
    """Mark <var> as disabled on class_name.

    Sets enable_<var> to False and disable_<var> to True.
    """
    for prefix, flag in (('enable_', False), ('disable_', True)):
        setattr(class_name, prefix + var, flag)
241
242
243
def enable(class_name, var):
    """Mark <var> as enabled on class_name.

    Sets enable_<var> to True and disable_<var> to False.
    """
    for prefix, flag in (('enable_', True), ('disable_', False)):
        setattr(class_name, prefix + var, flag)
247
248
249
def safe_makedirs(path):
    """Create the directory tree path, tolerating an already existing directory.

    Any OSError is re-raised, except EEXIST when path is already a directory.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        already_a_directory = err.errno == errno.EEXIST and os.path.isdir(path)
        if not already_a_directory:
            raise
259
260
261
def get_time_diffs(ref, now):
    """Return the time elapsed between ref and now.

    ref: an Epoch timestamp (int or float), a datetime, or a falsy value
    now: the datetime to compare with

    Return now - ref as a timedelta, or 0 when ref is falsy (None, '', ...).
    Raise TypeError for any other kind of ref.
    """
    if isinstance(ref, (int, float)):
        # Epoch timestamps may be floats (time.time() for example)
        return now - datetime.fromtimestamp(ref)
    if isinstance(ref, datetime):
        return now - ref
    if not ref:
        return 0
    # Fail with a clear message rather than on an unbound local variable
    raise TypeError(f'ref should be a timestamp or a datetime, not {type(ref).__name__}')
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
270
271
272
def get_first_true_val(conds):
    """Return the first key of conds whose value is truthy.

    Raise StopIteration when no value is truthy.
    """
    for key, val in conds.items():
        if val:
            return key
    raise StopIteration
274
275
276
def maybe_add_plural(count):
    """Return the plural suffix 's' when count is greater than 1, else ''."""
    if count > 1:
        return "s"
    return ""
278
279
280
def build_str_when_more_than_seven_days(day_diff, unit):
    """Express day_diff days in the given unit, e.g. '2 weeks'.

    unit is one of 'week' (7 days), 'month' (30 days) or 'year' (365 days);
    any other unit raises KeyError. The count is rounded down.
    """
    days_per_unit = {'week': 7, 'month': 30, 'year': 365}
    count = day_diff // days_per_unit[unit]
    suffix = maybe_add_plural(count)
    return str(count) + " " + unit + suffix
286
287
288
def pretty_date(ref, now=None):
    """Return a pretty string describing the time elapsed between ref and now.

    ref: an Epoch timestamp (int or float), a datetime, or a falsy value
         (None, '', ...) which is handled as "no elapsed time"
    now: the datetime to compare with (default: the current date and time)

    Return a string like 'just now', '30 secs', 'an hour', 'yesterday',
    '3 months', etc, or '' when ref is in the future.
    Raise TypeError when ref is of any other type.

    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python

    Refactoring done in commit https://github.com/nicolargo/glances/commit/f6279baacd4cf0b27ca10df6dc01f091ea86a40a
    break the function. Get back to the old fashion way.
    """
    if not now:
        now = datetime.now()
    if isinstance(ref, (int, float)):
        # Epoch timestamps may be floats (time.time() for example)
        diff = now - datetime.fromtimestamp(ref)
    elif isinstance(ref, datetime):
        diff = now - ref
    elif not ref:
        # No reference: use a zero-length interval. It must be a timedelta
        # (not the int 0) because .seconds and .days are read below.
        diff = now - now
    else:
        raise TypeError(f'ref should be a timestamp or a datetime, not {type(ref).__name__}')
    second_diff = diff.seconds
    day_diff = diff.days

    if day_diff < 0:
        # ref is in the future
        return ''

    if day_diff == 0:
        if second_diff < 10:
            return "just now"
        if second_diff < 60:
            return str(second_diff) + " secs"
        if second_diff < 120:
            return "a min"
        if second_diff < 3600:
            return str(second_diff // 60) + " mins"
        if second_diff < 7200:
            return "an hour"
        if second_diff < 86400:
            return str(second_diff // 3600) + " hours"
    if day_diff == 1:
        return "yesterday"
    if day_diff < 7:
        return str(day_diff) + " days" if day_diff > 1 else "a day"
    if day_diff < 31:
        week = day_diff // 7
        return str(week) + " weeks" if week > 1 else "a week"
    if day_diff < 365:
        month = day_diff // 30
        return str(month) + " months" if month > 1 else "a month"
    year = day_diff // 365
    return str(year) + " years" if year > 1 else "an year"
337
338
339
def urlopen_auth(url, username, password):
    """Open url with an HTTP Basic Authorization header.

    The header value is 'Basic ' followed by the Base64 encoding of
    '<username>:<password>'. Return the object returned by urlopen().
    """
    credentials = f'{username}:{password}'.encode()
    token = base64.b64encode(credentials).decode()
    request = Request(url, headers={'Authorization': 'Basic ' + token})
    return urlopen(request)
347
348
349
def json_dumps(data) -> bytes:
    """Serialize data to JSON and return it as bytes.

    Manage the issue #815 for Windows OS: when the serialization raises a
    UnicodeDecodeError, it is retried with ensure_ascii=False.
    """
    try:
        serialized = json.dumps(data)
    except UnicodeDecodeError:
        serialized = json.dumps(data, ensure_ascii=False)
    # ujson & json libs return strings, but our contract expects bytes
    return b(serialized)
360
361
362
def json_loads(data: Union[str, bytes, bytearray]) -> Union[dict, list]:
    """Parse the JSON buffer data and return the resulting Python object."""
    parsed = json.loads(data)
    return parsed
365
366
367
def list_to_dict(data):
    """Index a list of dicts by the value of the field each dict names in 'key'.

    Each item with a 'key' entry is stored under item[item['key']]; items
    without a 'key' entry are skipped. Return None if data is not a list.
    """
    if not isinstance(data, list):
        return None
    indexed = {}
    for entry in data:
        if 'key' in entry:
            indexed[entry[entry['key']]] = entry
    return indexed
372
373
374
def dictlist(data, item):
    """Extract the field item from a dict or from a list of dicts.

    - data is a dict: return {item: data[item]}
    - data is a list: return {item: [d[item] for each d in data]}
    Return None for any other type of data, or when the lookup fails with a
    TypeError, IndexError or KeyError.
    """
    lookup_errors = (TypeError, IndexError, KeyError)
    if isinstance(data, dict):
        try:
            return {item: data[item]}
        except lookup_errors:
            return None
    if isinstance(data, list):
        # Source:
        # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
        # But https://github.com/nicolargo/glances/issues/1401
        getter = itemgetter(item)
        try:
            return {item: [getter(entry) for entry in data]}
        except lookup_errors:
            return None
    return None
390
391
392
def dictlist_json_dumps(data, item):
    """Return dictlist(data, item) serialized with json_dumps().

    Return None when dictlist() returns None.
    """
    extracted = dictlist(data, item)
    return None if extracted is None else json_dumps(extracted)
397
398
399
def dictlist_first_key_value(data: list[dict], key, value) -> Optional[dict]:
    """Return the first dict of data where item[key] == value, or None if there is none."""
    matches = (item for item in data if key in item and item[key] == value)
    return next(matches, None)
406
407
408
def auto_unit(number, low_precision=False, min_symbol='K', none_symbol='-'):
    """Make a nice human-readable string out of number.

    The number is divided by the largest power of 1024 (K, M, G, T, P, E,
    Z, Y) that keeps the value above 1. The number of decimal places
    increases as the value approaches 1.
    CASE: 613421788        RESULT:       585M low_precision:       585M
    CASE: 5307033647       RESULT:      4.94G low_precision:       4.9G
    CASE: 44968414685      RESULT:      41.9G low_precision:      41.9G
    CASE: 838471403472     RESULT:       781G low_precision:       781G
    CASE: 9683209690677    RESULT:      8.81T low_precision:       8.8T
    CASE: 1073741824       RESULT:      1024M low_precision:      1024M
    CASE: 1181116006       RESULT:      1.10G low_precision:       1.1G

    :low_precision: returns less decimal places potentially (default is False)
                    sacrificing precision for more readability.
    :min_symbol: smallest symbol to use; a number too small for it is
                 returned with two decimals and no symbol (default is K)
    :none_symbol: string returned when number is None (default is -)
    """
    if number is None:
        return none_symbol
    if number == 0:
        # Avoid 0.0
        return '0'

    all_symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # K is 1024 ** 1, M is 1024 ** 2, and so on up to Y (1024 ** 8)
    factors = {sym: 1 << (10 * rank) for rank, sym in enumerate(all_symbols, start=1)}
    first = all_symbols.index(min_symbol) if min_symbol in all_symbols else 0

    # Try the largest unit first
    for symbol in reversed(all_symbols[first:]):
        value = float(number) / factors[symbol]
        if not value > 1:
            continue
        if value < 10:
            decimals = 2
        elif value < 100:
            decimals = 1
        else:
            decimals = 0
        if low_precision:
            decimals = 0 if symbol in {'M', 'K'} else min(1, decimals)
        elif symbol == 'K':
            decimals = 0
        return f'{value:.{decimals}f}{symbol}'

    # Too small for the smallest allowed symbol
    return f'{number:.2f}'
464
465
466
def string_value_to_float(s):
    """Convert a string with a value and an optional unit to a float.

    Example:
    '12.5 MB' -> 12500000.0
    '32.5 GB' -> 32500000000.0
    Args:
        s (string): Input string with value and unit. Spaces are ignored and
            the unit is case-insensitive. Supported units are the decimal
            (power of 1000) ones: B, KB, MB, GB, TB and PB.
    Output:
        float: The value in float, or None if the string cannot be parsed
            (no number, several numbers or an unknown unit)
    """
    convert_dict = {
        None: 1,
        'B': 1,
        'KB': 1000,
        'MB': 1000000,
        'GB': 1000000000,
        'TB': 1000000000000,
        'PB': 1000000000000000,
    }
    # Split the input into numeric tokens (kept as is) and non-numeric
    # tokens (upper-cased so they can be matched against the units)
    unpack_string = [
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
    ]
    if len(unpack_string) == 2:
        value, unit = unpack_string
    elif len(unpack_string) == 1:
        value, unit = unpack_string[0], None
    else:
        return None
    if unit not in convert_dict:
        # Unknown unit: same answer as any other unparsable input
        return None
    try:
        return float(value) * convert_dict[unit]
    except ValueError:
        return None
500
501
502
def file_exists(filename):
    """Return True if filename is an existing regular file that can be read."""
    if not os.path.isfile(filename):
        return False
    return os.access(filename, os.R_OK)
505
506
507
def folder_size(path, errno=0):
    """Return a tuple with the size of the directory given by path and the errno.

    The size is the sum of the st_size of every entry found under path that
    is not a real directory; real sub-directories are walked recursively
    and symbolic links are not followed while walking.

    If an error occurs on an entry (for example one file or subfolder is not
    accessible), errno is set to the error number of the last error seen.
    If path itself cannot be scanned, (0, <error number>) is returned.

    path: <string>
    errno: <int> Should always be 0 when calling the function
           (it carries the error number across the recursive calls)
    """
    ret_size = 0
    ret_err = errno
    try:
        # The context manager closes the scandir iterator (and its file
        # descriptor) even when the walk is interrupted by an error.
        with os.scandir(path) as entries:
            for f in entries:
                # scandir never yields '.' or '..', no need to filter them
                if f.is_dir(follow_symlinks=False):
                    sub_size, ret_err = folder_size(os.path.join(path, f.name), ret_err)
                    ret_size += sub_size
                else:
                    try:
                        ret_size += f.stat().st_size
                    except OSError as e:
                        ret_err = e.errno
    except OSError as e:
        # Covers PermissionError too, which is a kind of OSError
        return 0, e.errno
    return ret_size, ret_err
531
532
533
def _get_ttl_hash(ttl):
    """A simple (dummy) function to return a hash based on the current second.

    Return 0 when ttl is None, otherwise the seconds field (0-59) of the
    current time, whatever the value of ttl.
    TODO: Implement a real TTL mechanism.
    """
    return 0 if ttl is None else datetime.now().second


def weak_lru_cache(maxsize=1, typed=False, ttl=None):
    """LRU Cache decorator that keeps a weak reference to self

    maxsize and typed are given to functools.lru_cache. When ttl is not
    None, the value of _get_ttl_hash(ttl) is added to the cache key.

    Warning: When used in a class, the class should implement __eq__(self, other) and __hash__(self) methods

    Source: https://stackoverflow.com/a/55990799"""

    def wrapper(func):
        @functools.lru_cache(maxsize, typed)
        def cached_call(weak_self, *args, ttl_hash=None, **kwargs):
            # ttl_hash only takes part in the cache key, it is not used here
            del ttl_hash
            instance = weak_self()
            return func(instance, *args, **kwargs)

        @functools.wraps(func)
        def inner(self, *args, **kwargs):
            # The cache is given a weak reference in place of the instance
            self_ref = weakref.ref(self)
            return cached_call(self_ref, *args, ttl_hash=_get_ttl_hash(ttl), **kwargs)

        return inner

    return wrapper
563
564
565
def namedtuple_to_dict(data):
    """Return a copy of the dict data where namedtuple values are replaced by dicts.

    Every value with an _asdict() method (like the namedtuples embedded in
    PsUtil stats) is replaced by the result of that method; other values are
    kept as is.
    """
    converted = {}
    for key, value in data.items():
        converted[key] = value._asdict() if hasattr(value, '_asdict') else value
    return converted
568
569
570
def list_of_namedtuple_to_list_of_dict(data):
    """Apply namedtuple_to_dict() to each item of data and return the results as a list."""
    return list(map(namedtuple_to_dict, data))
573
574
575
def replace_special_chars(input_string, by=' '):
    """Replace the line breaks and tabulations of input_string by another string.

    Each '\\r\\n', '\\n' and '\\t' is replaced by the string by (default: a space).
    Return: the string with the chars replaced"""
    cleaned = input_string
    # '\r\n' goes first so that it is replaced as a single unit
    for special in ('\r\n', '\n', '\t'):
        cleaned = cleaned.replace(special, by)
    return cleaned
579
580
581
def atoi(text):
    """Return text converted to an int when it only contains digits, else text unchanged."""
    if text.isdigit():
        return int(text)
    return text
583
584
585
def natural_keys(text):
    """Split text into a list alternating non-digit strings and int numbers.

    Example: 'eth10' -> ['eth', 10, '']
    The runs of digits are converted to int, the same way atoi() does.
    """
    chunks = re.split(r'(\d+)', text)
    return [int(chunk) if chunk.isdigit() else chunk for chunk in chunks]
588
589
590 View Code Duplication
def exit_after(seconds, default=None):
    """Exit the function if it takes more than 'seconds' seconds to complete.
    In this case, return the value of 'default' (default: None).

    The decorated function is run in a separate process and its result is
    sent back through a queue. The value of 'default' is also returned when
    the function does not deliver any result, for example because it raised
    an exception in the child process."""

    def handler(q, func, args, kwargs):
        q.put(func(*args, **kwargs))

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            q = Queue()
            p = Process(target=handler, args=(q, func, args, kwargs))
            p.start()
            p.join(timeout=seconds)
            if not p.is_alive():
                if p.exitcode != 0:
                    # The child failed (func raised): nothing was queued,
                    # an unconditional q.get() would block forever.
                    return default
                try:
                    # Bounded wait, in case the child exited cleanly
                    # without queuing a result.
                    return q.get(timeout=1)
                except queue.Empty:
                    return default

            p.terminate()
            p.join(timeout=0.1)
            if p.is_alive():
                # Kill in case processes doesn't terminate
                # Happens with cases like broken NFS connections
                p.kill()
            return default

        return wrapper

    return decorator
617