Test Failed
Push — master ( 73a01d...ef36eb )
by Nicolas
04:43
created

glances.globals.auto_unit()   D

Complexity

Conditions 12

Size

Total Lines 57
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
eloc 34
nop 4
dl 0
loc 57
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like glances.globals.auto_unit() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import multiprocessing
21
import os
22
import platform
23
import queue
24
import re
25
import subprocess
26
import sys
27
import weakref
28
from collections import OrderedDict
29
from configparser import ConfigParser, NoOptionError, NoSectionError
30
from datetime import datetime
31
from operator import itemgetter, methodcaller
32
from statistics import mean
33
from typing import Any, Optional, Union
34
from urllib.error import HTTPError, URLError
35
from urllib.parse import urlparse
36
from urllib.request import Request, urlopen
37
38
# Prefer faster libs for JSON (de)serialization
39
# Preference Order: orjson > ujson > json (builtin)
40
try:
41
    import orjson as json
42
43
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
44
except ImportError:
45
    # Need to log info but importing logger will cause cyclic imports
46
    pass
47
48
if 'json' not in globals():
49
    try:
50
        # Note: ujson is not officially supported
51
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
52
        import ujson as json
53
    except ImportError:
54
        import json
55
56
    # To allow ujson & json dumps to serialize datetime
57
    def _json_default(v: Any) -> Any:
58
        if isinstance(v, datetime):
59
            return v.isoformat()
60
        return v
61
62
    json.dumps = functools.partial(json.dumps, default=_json_default)
63
64
##############
65
# GLOBALS VARS
66
##############
67
68
# OS constants (some libraries/features are OS-dependent)
69
BSD = sys.platform.find('bsd') != -1
70
LINUX = sys.platform.startswith('linux')
71
MACOS = sys.platform.startswith('darwin')
72
SUNOS = sys.platform.startswith('sunos')
73
WINDOWS = sys.platform.startswith('win')
74
WSL = "linux" in platform.system().lower() and "microsoft" in platform.uname()[3].lower()
75
76
# Set the AMPs, plugins and export modules path
77
work_path = os.path.realpath(os.path.dirname(__file__))
78
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
79
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
80
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))
81
sys_path = sys.path[:]
82
sys.path.insert(1, exports_path)
83
sys.path.insert(1, plugins_path)
84
sys.path.insert(1, amps_path)
85
86
# Types
87
text_type = str
88
binary_type = bytes
89
bool_type = bool
90
long = int
91
92
# Alias errors
93
PermissionError = OSError
94
95
# Alias methods
96
viewkeys = methodcaller('keys')
97
viewvalues = methodcaller('values')
98
viewitems = methodcaller('items')
99
100
# Multiprocessing start method (on POSIX system)
101
if LINUX or BSD or SUNOS or MACOS:
102
    ctx_mp_fork = multiprocessing.get_context('fork')
103
else:
104
    ctx_mp_fork = multiprocessing.get_context()
105
106
###################
107
# GLOBALS FUNCTIONS
108
###################
109
110
111
def printandflush(string):
112
    """Print and flush (used by stdout* outputs modules)"""
113
    print(string, flush=True)
114
115
116
def to_ascii(s):
117
    """Convert the bytes string to a ASCII string
118
    Useful to remove accent (diacritics)"""
119
    if isinstance(s, binary_type):
120
        return s.decode()
121
    return s.encode('ascii', 'ignore').decode()
122
123
124
def listitems(d):
125
    return list(d.items())
126
127
128
def listkeys(d):
129
    return list(d.keys())
130
131
132
def listvalues(d):
133
    return list(d.values())
134
135
136
def u(s, errors='replace'):
137
    if isinstance(s, text_type):
138
        return s
139
    return s.decode('utf-8', errors=errors)
140
141
142
def b(s, errors='replace'):
143
    if isinstance(s, binary_type):
144
        return s
145
    return s.encode('utf-8', errors=errors)
146
147
148
def nativestr(s, errors='replace'):
149
    if isinstance(s, text_type):
150
        return s
151
    if isinstance(s, (int, float)):
152
        return s.__str__()
153
    return s.decode('utf-8', errors=errors)
154
155
156
def system_exec(command):
157
    """Execute a system command and return the result as a str"""
158
    try:
159
        res = subprocess.run(command.split(' '), stdout=subprocess.PIPE).stdout.decode('utf-8')
160
    except Exception as e:
161
        res = f'ERROR: {e}'
162
    return res.rstrip()
163
164
165
def subsample(data, sampling):
166
    """Compute a simple mean subsampling.
167
168
    Data should be a list of numerical itervalues
169
170
    Return a subsampled list of sampling length
171
    """
172
    if len(data) <= sampling:
173
        return data
174
    sampling_length = int(round(len(data) / float(sampling)))
175
    return [mean(data[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
176
177
178
def time_series_subsample(data, sampling):
179
    """Compute a simple mean subsampling.
180
181
    Data should be a list of set (time, value)
182
183
    Return a subsampled list of sampling length
184
    """
185
    if len(data) <= sampling:
186
        return data
187
    t = [t[0] for t in data]
188
    v = [t[1] for t in data]
189
    sampling_length = int(round(len(data) / float(sampling)))
190
    t_subsampled = [t[s * sampling_length : (s + 1) * sampling_length][0] for s in range(0, sampling)]
191
    v_subsampled = [mean(v[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
192
    return list(zip(t_subsampled, v_subsampled))
193
194
195
def to_fahrenheit(celsius):
196
    """Convert Celsius to Fahrenheit."""
197
    return celsius * 1.8 + 32
198
199
200
def is_admin():
201
    """
202
    https://stackoverflow.com/a/19719292
203
    @return: True if the current user is an 'Admin' whatever that
204
    means (root on Unix), otherwise False.
205
    Warning: The inner function fails unless you have Windows XP SP2 or
206
    higher. The failure causes a traceback to be printed and this
207
    function to return False.
208
    """
209
210
    if os.name == 'nt':
211
        import ctypes
212
        import traceback
213
214
        # WARNING: requires Windows XP SP2 or higher!
215
        try:
216
            return ctypes.windll.shell32.IsUserAnAdmin()
217
        except Exception as e:
218
            print(f"Admin check failed with error: {e}")
219
            traceback.print_exc()
220
            return False
221
    else:
222
        # Check for root on Posix
223
        return os.getuid() == 0
224
225
226
def key_exist_value_not_none(k, d):
227
    # Return True if:
228
    # - key k exists
229
    # - d[k] is not None
230
    return k in d and d[k] is not None
231
232
233
def key_exist_value_not_none_not_v(k, d, value='', length=None):
234
    # Return True if:
235
    # - key k exists
236
    # - d[k] is not None
237
    # - d[k] != value
238
    # - if length is not None and len(d[k]) >= length
239
    return k in d and d[k] is not None and d[k] != value and (length is None or len(d[k]) >= length)
240
241
242
def disable(class_name, var):
243
    """Set disable_<var> to True in the class class_name."""
244
    setattr(class_name, 'enable_' + var, False)
245
    setattr(class_name, 'disable_' + var, True)
246
247
248
def enable(class_name, var):
249
    """Set disable_<var> to False in the class class_name."""
250
    setattr(class_name, 'enable_' + var, True)
251
    setattr(class_name, 'disable_' + var, False)
252
253
254
def safe_makedirs(path):
255
    """A safe function for creating a directory tree."""
256
    try:
257
        os.makedirs(path)
258
    except OSError as err:
259
        if err.errno == errno.EEXIST:
260
            if not os.path.isdir(path):
261
                raise
262
        else:
263
            raise
264
265
266
def get_time_diffs(ref, now):
267
    if isinstance(ref, int):
268
        diff = now - datetime.fromtimestamp(ref)
269
    elif isinstance(ref, datetime):
270
        diff = now - ref
271
    elif not ref:
272
        diff = 0
273
274
    return diff
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
275
276
277
def get_first_true_val(conds):
278
    return next(key for key, val in conds.items() if val)
279
280
281
def maybe_add_plural(count):
282
    return "s" if count > 1 else ""
283
284
285
def build_str_when_more_than_seven_days(day_diff, unit):
286
    scale = {'week': 7, 'month': 30, 'year': 365}[unit]
287
288
    count = day_diff // scale
289
290
    return str(count) + " " + unit + maybe_add_plural(count)
291
292
293
def pretty_date(ref, now=None):
294
    """
295
    Get a datetime object or a int() Epoch timestamp and return a
296
    pretty string like 'an hour ago', 'Yesterday', '3 months ago',
297
    'just now', etc
298
    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
299
300
    Refactoring done in commit https://github.com/nicolargo/glances/commit/f6279baacd4cf0b27ca10df6dc01f091ea86a40a
301
    break the function. Get back to the old fashion way.
302
    """
303
    if not now:
304
        now = datetime.now()
305
    if isinstance(ref, int):
306
        diff = now - datetime.fromtimestamp(ref)
307
    elif isinstance(ref, datetime):
308
        diff = now - ref
309
    elif not ref:
310
        diff = 0
311
    second_diff = diff.seconds
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
312
    day_diff = diff.days
313
314
    if day_diff < 0:
315
        return ''
316
317
    if day_diff == 0:
318
        if second_diff < 10:
319
            return "just now"
320
        if second_diff < 60:
321
            return str(second_diff) + " secs"
322
        if second_diff < 120:
323
            return "a min"
324
        if second_diff < 3600:
325
            return str(second_diff // 60) + " mins"
326
        if second_diff < 7200:
327
            return "an hour"
328
        if second_diff < 86400:
329
            return str(second_diff // 3600) + " hours"
330
    if day_diff == 1:
331
        return "yesterday"
332
    if day_diff < 7:
333
        return str(day_diff) + " days" if day_diff > 1 else "a day"
334
    if day_diff < 31:
335
        week = day_diff // 7
336
        return str(week) + " weeks" if week > 1 else "a week"
337
    if day_diff < 365:
338
        month = day_diff // 30
339
        return str(month) + " months" if month > 1 else "a month"
340
    year = day_diff // 365
341
    return str(year) + " years" if year > 1 else "an year"
342
343
344
def urlopen_auth(url, username, password):
345
    """Open a url with basic auth"""
346
    return urlopen(
347
        Request(
348
            url,
349
            headers={'Authorization': 'Basic ' + base64.b64encode(f'{username}:{password}'.encode()).decode()},
350
        )
351
    )
352
353
354
def json_dumps(data) -> bytes:
355
    """Return the object data in a JSON format.
356
357
    Manage the issue #815 for Windows OS with UnicodeDecodeError catching.
358
    """
359
    try:
360
        res = json.dumps(data)
361
    except UnicodeDecodeError:
362
        res = json.dumps(data, ensure_ascii=False)
363
    # ujson & json libs return strings, but our contract expects bytes
364
    return b(res)
365
366
367
def json_loads(data: Union[str, bytes, bytearray]) -> Union[dict, list]:
368
    """Load a JSON buffer into memory as a Python object"""
369
    return json.loads(data)
370
371
372
def list_to_dict(data):
373
    """Convert a list of dict (with key in 'key') to a dict with key as key and value as value."""
374
    if not isinstance(data, list):
375
        return None
376
    return {item[item['key']]: item for item in data if 'key' in item}
377
378
379
def dictlist(data, item):
380
    if isinstance(data, dict):
381
        try:
382
            return {item: data[item]}
383
        except (TypeError, IndexError, KeyError):
384
            return None
385
    elif isinstance(data, list):
386
        try:
387
            # Source:
388
            # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
389
            # But https://github.com/nicolargo/glances/issues/1401
390
            return {item: list(map(itemgetter(item), data))}
391
        except (TypeError, IndexError, KeyError):
392
            return None
393
    else:
394
        return None
395
396
397
def dictlist_json_dumps(data, item):
398
    dl = dictlist(data, item)
399
    if dl is None:
400
        return None
401
    return json_dumps(dl)
402
403
404
def dictlist_first_key_value(data: list[dict], key, value) -> Optional[dict]:
405
    """In a list of dict, return first item where key=value or none if not found."""
406
    try:
407
        ret = next(item for item in data if key in item and item[key] == value)
408
    except StopIteration:
409
        ret = None
410
    return ret
411
412
413
def auto_unit(number, low_precision=False, min_symbol='K', none_symbol='-'):
414
    """Make a nice human-readable string out of number.
415
416
    Number of decimal places increases as quantity approaches 1.
417
    CASE: 613421788        RESULT:       585M low_precision:       585M
418
    CASE: 5307033647       RESULT:      4.94G low_precision:       4.9G
419
    CASE: 44968414685      RESULT:      41.9G low_precision:      41.9G
420
    CASE: 838471403472     RESULT:       781G low_precision:       781G
421
    CASE: 9683209690677    RESULT:      8.81T low_precision:       8.8T
422
    CASE: 1073741824       RESULT:      1024M low_precision:      1024M
423
    CASE: 1181116006       RESULT:      1.10G low_precision:       1.1G
424
425
    :low_precision: returns less decimal places potentially (default is False)
426
                    sacrificing precision for more readability.
427
    :min_symbol: Do not approach if number < min_symbol (default is K)
428
    :decimal_count: if set, force the number of decimal number (default is None)
429
    """
430
    if number is None:
431
        return none_symbol
432
    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
433
    if min_symbol in symbols:
434
        symbols = symbols[symbols.index(min_symbol) :]
435
    prefix = {
436
        'Y': 1208925819614629174706176,
437
        'Z': 1180591620717411303424,
438
        'E': 1152921504606846976,
439
        'P': 1125899906842624,
440
        'T': 1099511627776,
441
        'G': 1073741824,
442
        'M': 1048576,
443
        'K': 1024,
444
    }
445
446
    if number == 0:
447
        # Avoid 0.0
448
        return '0'
449
450
    # If a value is a float, decimal_precision is 2 else 0
451
    decimal_precision = 2 if isinstance(number, float) else 0
452
    for symbol in reversed(symbols):
453
        value = float(number) / prefix[symbol]
454
        if value > 1:
455
            decimal_precision = 0
456
            if value < 10:
457
                decimal_precision = 2
458
            elif value < 100:
459
                decimal_precision = 1
460
            if low_precision:
461
                if symbol in 'MK':
462
                    decimal_precision = 0
463
                else:
464
                    decimal_precision = min(1, decimal_precision)
465
            elif symbol in 'K':
466
                decimal_precision = 0
467
            return '{:.{decimal}f}{symbol}'.format(value, decimal=decimal_precision, symbol=symbol)
468
469
    return f'{number:.{decimal_precision}f}'
470
471
472
def string_value_to_float(s):
473
    """Convert a string with a value and an unit to a float.
474
    Example:
475
    '12.5 MB' -> 12500000.0
476
    '32.5 GB' -> 32500000000.0
477
    Args:
478
        s (string): Input string with value and unit
479
    Output:
480
        float: The value in float
481
    """
482
    convert_dict = {
483
        None: 1,
484
        'B': 1,
485
        'KB': 1000,
486
        'MB': 1000000,
487
        'GB': 1000000000,
488
        'TB': 1000000000000,
489
        'PB': 1000000000000000,
490
    }
491
    unpack_string = [
492
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
493
    ]
494
    if len(unpack_string) == 2:
495
        value, unit = unpack_string
496
    elif len(unpack_string) == 1:
497
        value = unpack_string[0]
498
        unit = None
499
    else:
500
        return None
501
    try:
502
        value = float(unpack_string[0])
503
    except ValueError:
504
        return None
505
    return value * convert_dict[unit]
506
507
508
def file_exists(filename):
509
    """Return True if the file exists and is readable."""
510
    return os.path.isfile(filename) and os.access(filename, os.R_OK)
511
512
513
def folder_size(path, errno=0):
514
    """Return a tuple with the size of the directory given by path and the errno.
515
    If an error occurs (for example one file or subfolder is not accessible),
516
    errno is set to the error number.
517
518
    path: <string>
519
    errno: <int> Should always be 0 when calling the function"""
520
    ret_size = 0
521
    ret_err = errno
522
    try:
523
        for f in os.scandir(path):
524
            if f.is_dir(follow_symlinks=False) and (f.name != '.' or f.name != '..'):
525
                ret = folder_size(os.path.join(path, f.name), ret_err)
526
                ret_size += ret[0]
527
                ret_err = ret[1]
528
            else:
529
                try:
530
                    ret_size += f.stat().st_size
531
                except OSError as e:
532
                    ret_err = e.errno
533
    except (OSError, PermissionError) as e:
534
        return 0, e.errno
535
    else:
536
        return ret_size, ret_err
537
538
539
def _get_ttl_hash(ttl):
540
    """A simple (dummy) function to return a hash based on the current second.
541
    TODO: Implement a real TTL mechanism.
542
    """
543
    if ttl is None:
544
        return 0
545
    now = datetime.now()
546
    return now.second
547
548
549
def weak_lru_cache(maxsize=1, typed=False, ttl=None):
550
    """LRU Cache decorator that keeps a weak reference to self
551
552
    Warning: When used in a class, the class should implement __eq__(self, other) and __hash__(self) methods
553
554
    Source: https://stackoverflow.com/a/55990799"""
555
556
    def wrapper(func):
557
        @functools.lru_cache(maxsize, typed)
558
        def _func(_self, *args, ttl_hash=None, **kwargs):
559
            del ttl_hash  # Unused parameter, but kept for compatibility
560
            return func(_self(), *args, **kwargs)
561
562
        @functools.wraps(func)
563
        def inner(self, *args, **kwargs):
564
            return _func(weakref.ref(self), *args, ttl_hash=_get_ttl_hash(ttl), **kwargs)
565
566
        return inner
567
568
    return wrapper
569
570
571
def namedtuple_to_dict(data):
572
    """Convert a namedtuple to a dict, using the _asdict() method embedded in PsUtil stats."""
573
    return {k: (v._asdict() if hasattr(v, '_asdict') else v) for k, v in list(data.items())}
574
575
576
def list_of_namedtuple_to_list_of_dict(data):
577
    """Convert a list of namedtuples to a dict, using the _asdict() method embedded in PsUtil stats."""
578
    return [namedtuple_to_dict(d) for d in data]
579
580
581
def replace_special_chars(input_string, by=' '):
582
    """Replace some special char by another in the input_string
583
    Return: the string with the chars replaced"""
584
    return input_string.replace('\r\n', by).replace('\n', by).replace('\t', by)
585
586
587
def atoi(text):
588
    return int(text) if text.isdigit() else text
589
590
591
def natural_keys(text):
592
    """Return a text in a natural/human readable format."""
593
    return [atoi(c) for c in re.split(r'(\d+)', text)]
594
595
596
def exit_after(seconds, default=None):
597
    """Exit the function if it takes more than 'seconds' seconds to complete.
598
    In this case, return the value of 'default' (default: None)."""
599
600
    def handler(q, func, args, kwargs):
601
        q.put(func(*args, **kwargs))
602
603
    def decorator(func):
604
        if not LINUX:
605
            return func
606
607
        def wraps(*args, **kwargs):
608
            try:
609
                q = ctx_mp_fork.Queue()
610
            except PermissionError:
611
                # Manage an exception in Snap packages on Linux
612
                # The strict mode prevent the use of multiprocessing.Queue()
613
                # There is a "dirty" hack:
614
                # https://forum.snapcraft.io/t/python-multiprocessing-permission-denied-in-strictly-confined-snap/15518/2
615
                # But i prefer to just disable the timeout feature in this case
616
                func(*args, **kwargs)
617
            else:
618
                p = ctx_mp_fork.Process(target=handler, args=(q, func, args, kwargs))
619
                p.start()
620
                p.join(timeout=seconds)
621
                if not p.is_alive():
622
                    return q.get()
623
                p.terminate()
624
                p.join(timeout=0.1)
625
                if p.is_alive():
626
                    # Kill in case processes doesn't terminate
627
                    # Happens with cases like broken NFS connections
628
                    p.kill()
629
            return default
630
631
        return wraps
632
633
    return decorator
634