Test Failed
Push — develop ( 0abc39...ef45c6 )
by Nicolas
02:52
created

glances.globals.auto_unit()   C

Complexity

Conditions 11

Size

Total Lines 56
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 11
eloc 34
nop 4
dl 0
loc 56
rs 5.4
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like glances.globals.auto_unit() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import os
21
import platform
22
import queue
23
import re
24
import subprocess
25
import sys
26
import weakref
27
from collections import OrderedDict
28
from configparser import ConfigParser, NoOptionError, NoSectionError
29
from datetime import datetime
30
from operator import itemgetter, methodcaller
31
from statistics import mean
32
from typing import Any, Optional, Union
33
from urllib.error import HTTPError, URLError
34
from urllib.parse import urlparse
35
from urllib.request import Request, urlopen
36
37
# Prefer faster libs for JSON (de)serialization
38
# Preference Order: orjson > ujson > json (builtin)
39
try:
40
    import orjson as json
41
42
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
43
except ImportError:
44
    # Need to log info but importing logger will cause cyclic imports
45
    pass
46
47
if 'json' not in globals():
48
    try:
49
        # Note: ujson is not officially supported
50
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
51
        import ujson as json
52
    except ImportError:
53
        import json
54
55
    # To allow ujson & json dumps to serialize datetime
56
    def _json_default(v: Any) -> Any:
57
        if isinstance(v, datetime):
58
            return v.isoformat()
59
        return v
60
61
    json.dumps = functools.partial(json.dumps, default=_json_default)
62
63
##############
64
# GLOBALS VARS
65
##############
66
67
# OS constants (some libraries/features are OS-dependent)
BSD = 'bsd' in sys.platform
LINUX = sys.platform.startswith('linux')
MACOS = sys.platform.startswith('darwin')
SUNOS = sys.platform.startswith('sunos')
WINDOWS = sys.platform.startswith('win')
# WSL reports a Linux system whose kernel version string mentions Microsoft
WSL = "linux" in platform.system().lower() and "microsoft" in platform.uname().version.lower()

# Set the AMPs, plugins and export modules path
work_path = os.path.realpath(os.path.dirname(__file__))
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))
# Keep a copy of the untouched search path before extending it
sys_path = sys.path[:]
# Insert the three folders right after the first entry: amps, plugins, exports
sys.path[1:1] = [amps_path, plugins_path, exports_path]

# Types
text_type = str
binary_type = bytes
bool_type = bool
long = int

# Alias errors
# NOTE: this rebinds the name at module level, so `except PermissionError`
# in this module catches every OSError.
PermissionError = OSError

# Alias methods
viewkeys = methodcaller('keys')
viewvalues = methodcaller('values')
viewitems = methodcaller('items')
98
99
100
###################
101
# GLOBALS FUNCTIONS
102
###################
103
104
105
def printandflush(string):
    """Print *string* and flush stdout right away (used by stdout* outputs modules)."""
    print(string, end='\n', flush=True)
108
109
110
def to_ascii(s):
    """Return *s* as a str restricted to ASCII characters.

    A text input is encoded to ASCII with non-ASCII characters dropped
    (useful to remove accents / diacritics).
    A bytes input is simply decoded with the default codec.
    """
    if not isinstance(s, binary_type):
        ascii_only = s.encode('ascii', 'ignore')
        return ascii_only.decode()
    return s.decode()
116
117
118
def listitems(d):
    """Return the (key, value) pairs of *d* as a list."""
    return [*d.items()]
120
121
122
def listkeys(d):
    """Return the keys of *d* as a list."""
    return [*d.keys()]
124
125
126
def listvalues(d):
    """Return the values of *d* as a list."""
    return [*d.values()]
128
129
130
def u(s, errors='replace'):
    """Return *s* as text, decoding it from UTF-8 when it is not already text.

    errors: error handler passed to decode() (default: 'replace')
    """
    already_text = isinstance(s, text_type)
    return s if already_text else s.decode('utf-8', errors=errors)
134
135
136
def b(s, errors='replace'):
    """Return *s* as bytes, encoding it to UTF-8 when it is not already bytes.

    errors: error handler passed to encode() (default: 'replace')
    """
    already_bytes = isinstance(s, binary_type)
    return s if already_bytes else s.encode('utf-8', errors=errors)
140
141
142
def nativestr(s, errors='replace'):
    """Return *s* as a native str.

    - text is returned unchanged
    - int and float are converted through their __str__ method
    - anything else is decoded from UTF-8 with the given error handler
    """
    if isinstance(s, text_type):
        return s
    if not isinstance(s, (int, float)):
        return s.decode('utf-8', errors=errors)
    return s.__str__()
148
149
150
def system_exec(command):
    """Execute a system command and return its standard output as a str.

    The command line is split on whitespace and run without a shell, so
    quoting is not interpreted and arguments cannot contain spaces.

    Return the UTF-8 decoded stdout with trailing whitespace removed, or
    'ERROR: <exception>' if the command cannot be run or its output decoded.
    """
    try:
        # split() without argument collapses runs of whitespace; split(' ')
        # would produce empty arguments for "cmd  arg" and break the call.
        completed = subprocess.run(command.split(), stdout=subprocess.PIPE)
        res = completed.stdout.decode('utf-8')
    except Exception as e:
        res = f'ERROR: {e}'
    return res.rstrip()
157
158
159
def subsample(data, sampling):
    """Compute a simple mean subsampling.

    Data should be a list of numerical values.

    Return data unchanged if it holds at most `sampling` items. Otherwise
    return a list of exactly `sampling` means, computed over consecutive
    chunks that together cover the whole of data.
    """
    size = len(data)
    if size <= sampling:
        return data
    # Proportional boundaries: since size > sampling every chunk holds at
    # least one item, so mean() never receives an empty slice and no
    # trailing item is dropped (a rounded fixed chunk length did both).
    bounds = [index * size // sampling for index in range(sampling + 1)]
    return [mean(data[start:stop]) for start, stop in zip(bounds, bounds[1:])]
170
171
172
def time_series_subsample(data, sampling):
    """Compute a simple mean subsampling of a time series.

    Data should be a list of (time, value) items.

    Return data unchanged if it holds at most `sampling` items. Otherwise
    return a list of exactly `sampling` (time, value) tuples: data is cut
    into consecutive chunks, each one is represented by the time of its
    first item and the mean of its values.
    """
    size = len(data)
    if size <= sampling:
        return data
    timestamps = [point[0] for point in data]
    values = [point[1] for point in data]
    # Proportional boundaries: since size > sampling every chunk holds at
    # least one item, so neither the [0] lookup nor mean() can hit an empty
    # slice, and no trailing item is dropped.
    bounds = [index * size // sampling for index in range(sampling + 1)]
    return [(timestamps[start], mean(values[start:stop])) for start, stop in zip(bounds, bounds[1:])]
187
188
189
def to_fahrenheit(celsius):
    """Convert a temperature from Celsius to Fahrenheit."""
    scaled = celsius * 1.8
    return scaled + 32
192
193
194
def is_admin():
    """
    Tell whether the current user has administrator rights.

    Source: https://stackoverflow.com/a/19719292

    @return: on Posix, True if the current user id is 0 (root). On Windows,
    the result of shell32.IsUserAnAdmin().
    Warning: the Windows check fails unless you have Windows XP SP2 or
    higher. The failure causes a traceback to be printed and this
    function to return False.
    """
    if os.name != 'nt':
        # Check for root on Posix
        return os.getuid() == 0

    import ctypes
    import traceback

    # WARNING: requires Windows XP SP2 or higher!
    try:
        return ctypes.windll.shell32.IsUserAnAdmin()
    except Exception as e:
        print(f"Admin check failed with error: {e}")
        traceback.print_exc()
        return False
218
219
220
def key_exist_value_not_none(k, d):
    """Return True if key k exists in d and d[k] is not None."""
    if k not in d:
        return False
    return d[k] is not None
225
226
227
def key_exist_value_not_none_not_v(k, d, value='', length=None):
    """Return True if all the following conditions hold:

    - key k exists in d
    - d[k] is not None
    - d[k] != value
    - length is None or len(d[k]) >= length
    """
    if k not in d:
        return False
    if d[k] is None:
        return False
    return d[k] != value and (length is None or len(d[k]) >= length)
234
235
236
def disable(class_name, var):
    """Set enable_<var> to False and disable_<var> to True in the class class_name."""
    for attribute, flag in (('enable_' + var, False), ('disable_' + var, True)):
        setattr(class_name, attribute, flag)
240
241
242
def enable(class_name, var):
    """Set enable_<var> to True and disable_<var> to False in the class class_name."""
    for attribute, flag in (('enable_' + var, True), ('disable_' + var, False)):
        setattr(class_name, attribute, flag)
246
247
248
def safe_makedirs(path):
    """Create the directory tree *path*, tolerating an already existing directory.

    Any OSError other than "path already exists as a directory" is re-raised.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        already_a_directory = err.errno == errno.EEXIST and os.path.isdir(path)
        if not already_a_directory:
            raise
258
259
260
def get_time_diffs(ref, now):
    """Return the time elapsed between ref and now.

    ref: an Epoch timestamp (int or float) or a datetime object
    now: a datetime object

    Return now - ref as a timedelta, or 0 if ref is empty (None, '', ...).
    Raise TypeError if ref is of any other type.
    """
    if isinstance(ref, int):
        diff = now - datetime.fromtimestamp(ref)
    elif isinstance(ref, datetime):
        diff = now - ref
    elif not ref:
        # Historical contract: an empty reference yields the integer 0
        diff = 0
    elif isinstance(ref, float):
        diff = now - datetime.fromtimestamp(ref)
    else:
        # Fail explicitly instead of hitting an unbound local variable
        raise TypeError(f'Unsupported reference type: {type(ref).__name__}')

    return diff
269
270
271
def get_first_true_val(conds):
    """Return the first key of conds whose value is true.

    Raise StopIteration if no value is true.
    """
    truthy_keys = (key for key, val in conds.items() if val)
    return next(truthy_keys)
273
274
275
def maybe_add_plural(count):
    """Return the plural suffix "s" when count is greater than 1, else an empty string."""
    if count > 1:
        return "s"
    return ""
277
278
279
def build_str_when_more_than_seven_days(day_diff, unit):
    """Express day_diff days in the given unit, like '2 weeks' or '1 month'.

    unit: one of 'week' (7 days), 'month' (30 days) or 'year' (365 days)
    """
    days_per_unit = {'week': 7, 'month': 30, 'year': 365}
    count = day_diff // days_per_unit[unit]
    suffix = maybe_add_plural(count)
    return str(count) + " " + unit + suffix
285
286
287
def pretty_date(ref, now=None):
    """
    Get a datetime object or an Epoch timestamp (int or float) and return a
    pretty string like 'an hour', 'yesterday', '3 months', 'just now', etc
    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python

    Refactoring done in commit https://github.com/nicolargo/glances/commit/f6279baacd4cf0b27ca10df6dc01f091ea86a40a
    break the function. Get back to the old fashion way.

    ref: reference date; an empty value (None, '', ...) is treated as now
    now: datetime to compare with (default is the current date and time)

    Return '' if ref is more than a day in the future.
    Raise TypeError if ref is of an unsupported type.
    """
    if not now:
        now = datetime.now()
    if isinstance(ref, int):
        diff = now - datetime.fromtimestamp(ref)
    elif isinstance(ref, datetime):
        diff = now - ref
    elif not ref:
        # Empty reference: use a zero timedelta (an integer 0 has no
        # .seconds/.days attributes and would crash below)
        diff = now - now
    elif isinstance(ref, float):
        diff = now - datetime.fromtimestamp(ref)
    else:
        # Fail explicitly instead of hitting an unbound local variable
        raise TypeError(f'Unsupported reference type: {type(ref).__name__}')
    second_diff = diff.seconds
    day_diff = diff.days

    if day_diff < 0:
        return ''

    if day_diff == 0:
        if second_diff < 10:
            return "just now"
        if second_diff < 60:
            return str(second_diff) + " secs"
        if second_diff < 120:
            return "a min"
        if second_diff < 3600:
            return str(second_diff // 60) + " mins"
        if second_diff < 7200:
            return "an hour"
        # timedelta.seconds is always below 86400: this is the last same-day case
        return str(second_diff // 3600) + " hours"
    if day_diff == 1:
        return "yesterday"
    if day_diff < 7:
        return str(day_diff) + " days"
    if day_diff < 31:
        week = day_diff // 7
        return str(week) + " weeks" if week > 1 else "a week"
    if day_diff < 365:
        month = day_diff // 30
        return str(month) + " months" if month > 1 else "a month"
    year = day_diff // 365
    # NOTE(review): "an year" is grammatically wrong but kept as is, callers may match it
    return str(year) + " years" if year > 1 else "an year"
336
337
338
def urlopen_auth(url, username, password):
    """Open a url with basic auth.

    The 'username:password' pair is base64 encoded and sent in the
    Authorization header. Return the object returned by urlopen.
    """
    credentials = f'{username}:{password}'.encode()
    token = base64.b64encode(credentials).decode()
    request = Request(url, headers={'Authorization': 'Basic ' + token})
    return urlopen(request)
346
347
348
def json_dumps(data) -> bytes:
    """Serialize data to JSON and return the result as bytes.

    Manage the issue #815 for Windows OS: if the first attempt raises a
    UnicodeDecodeError, retry with ensure_ascii=False.
    """
    try:
        serialized = json.dumps(data)
    except UnicodeDecodeError:
        serialized = json.dumps(data, ensure_ascii=False)
    # ujson & json libs return strings, but our contract expects bytes
    as_bytes = b(serialized)
    return as_bytes
359
360
361
def json_loads(data: Union[str, bytes, bytearray]) -> Union[dict, list]:
    """Parse a JSON buffer and return the resulting Python object."""
    parsed = json.loads(data)
    return parsed
364
365
366
def list_to_dict(data):
    """Convert a list of dict to a dict indexed by the key value of each item.

    Each item names its own key field in item['key']: the item is stored
    under item[item['key']]. Items without a 'key' entry are skipped.
    Return None if data is not a list.
    """
    if not isinstance(data, list):
        return None
    indexed = {}
    for entry in data:
        if 'key' in entry:
            indexed[entry[entry['key']]] = entry
    return indexed
371
372
373
def dictlist(data, item):
    """Extract the field item from data and return it as {item: value}.

    - data is a dict: value is data[item]
    - data is a list of dict: value is the list of each element's [item]
    Return None for any other type or if the field cannot be read.
    """
    if not isinstance(data, (dict, list)):
        return None
    try:
        if isinstance(data, dict):
            extracted = data[item]
        else:
            # Source:
            # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
            # But https://github.com/nicolargo/glances/issues/1401
            extracted = [entry[item] for entry in data]
        return {item: extracted}
    except (TypeError, IndexError, KeyError):
        return None
389
390
391
def dictlist_json_dumps(data, item):
    """Return the JSON dump of dictlist(data, item), or None if dictlist returns None."""
    extracted = dictlist(data, item)
    return None if extracted is None else json_dumps(extracted)
396
397
398
def dictlist_first_key_value(data: list[dict], key, value) -> Optional[dict]:
    """In a list of dict, return first item where key=value or none if not found."""
    for entry in data:
        if key in entry and entry[key] == value:
            return entry
    return None
405
406
407
def _auto_unit_precision(value, symbol, low_precision):
    """Return the number of decimal places used to display value with the unit symbol."""
    # Number of decimal places increases as quantity approaches 1
    if value < 10:
        precision = 2
    elif value < 100:
        precision = 1
    else:
        precision = 0
    if low_precision:
        # No decimal for M and K, at most one for the bigger units
        return 0 if symbol in ('M', 'K') else min(1, precision)
    if symbol == 'K':
        return 0
    return precision


def auto_unit(number, low_precision=False, min_symbol='K', none_symbol='-'):
    """Make a nice human-readable string out of number.

    Number of decimal places increases as quantity approaches 1.
    CASE: 613421788        RESULT:       585M low_precision:       585M
    CASE: 5307033647       RESULT:      4.94G low_precision:       4.9G
    CASE: 44968414685      RESULT:      41.9G low_precision:      41.9G
    CASE: 838471403472     RESULT:       781G low_precision:       781G
    CASE: 9683209690677    RESULT:      8.81T low_precision:       8.8T
    CASE: 1073741824       RESULT:      1024M low_precision:      1024M
    CASE: 1181116006       RESULT:      1.10G low_precision:       1.1G

    :low_precision: returns less decimal places potentially (default is False)
                    sacrificing precision for more readability.
    :min_symbol: Do not approach if number < min_symbol (default is K)
    :none_symbol: string returned when number is None (default is -)

    A number that does not exceed the smallest allowed unit is returned
    with two decimal places and no symbol. 0 is returned as '0'.
    """
    if number is None:
        return none_symbol
    if number == 0:
        # Avoid 0.0
        return '0'

    symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # Binary multipliers: K = 1024**1, M = 1024**2, ... Y = 1024**8
    prefix = {symbol: 1024**power for power, symbol in enumerate(symbols, start=1)}
    if min_symbol in symbols:
        symbols = symbols[symbols.index(min_symbol) :]

    # Try the biggest unit first and keep the first one the number exceeds
    for symbol in reversed(symbols):
        value = float(number) / prefix[symbol]
        if value > 1:
            decimal_precision = _auto_unit_precision(value, symbol, low_precision)
            return f'{value:.{decimal_precision}f}{symbol}'

    return f'{number:.2f}'
463
464
465
def string_value_to_float(s):
    """Convert a string with a value and an unit to a float.
    Example:
    '12.5 MB' -> 12500000.0
    '32.5 GB' -> 32500000000.0
    Args:
        s (string): Input string with value and unit (unit is optional,
                    case insensitive, one of B, KB, MB, GB, TB, PB)
    Output:
        float: The value in float, or None if the string cannot be parsed
               or if the unit is unknown
    """
    convert_dict = {
        None: 1,
        'B': 1,
        'KB': 1000,
        'MB': 1000000,
        'GB': 1000000000,
        'TB': 1000000000000,
        'PB': 1000000000000000,
    }
    # Split the string (spaces removed) into numeric and non-numeric tokens;
    # non-numeric tokens are upper-cased to match the convert_dict keys
    unpack_string = [
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
    ]
    if len(unpack_string) == 2:
        value, unit = unpack_string
    elif len(unpack_string) == 1:
        value = unpack_string[0]
        unit = None
    else:
        return None
    if unit not in convert_dict:
        # Unknown unit: report a parsing failure instead of raising KeyError
        return None
    try:
        value = float(value)
    except ValueError:
        return None
    return value * convert_dict[unit]
499
500
501
def file_exists(filename):
    """Return True if filename is an existing regular file that can be read."""
    if not os.path.isfile(filename):
        return False
    return os.access(filename, os.R_OK)
504
505
506
def folder_size(path, errno=0):
    """Return a tuple with the size of the directory given by path and the errno.
    If an error occurs (for example one file or subfolder is not accessible),
    errno is set to the error number.

    path: <string>
    errno: <int> Should always be 0 when calling the function

    Sub-directories are walked recursively (symbolic links to directories
    are not followed). If path itself cannot be listed, (0, <errno>) is
    returned."""
    ret_size = 0
    ret_err = errno
    try:
        # The context manager closes the scandir iterator even on error
        with os.scandir(path) as entries:
            for f in entries:
                # scandir never yields '.' or '..', no need to filter them
                if f.is_dir(follow_symlinks=False):
                    sub_size, ret_err = folder_size(os.path.join(path, f.name), ret_err)
                    ret_size += sub_size
                else:
                    try:
                        ret_size += f.stat().st_size
                    except OSError as e:
                        # Keep going, only remember the error number
                        ret_err = e.errno
    except OSError as e:
        return 0, e.errno
    return ret_size, ret_err
530
531
532
def _get_ttl_hash(ttl):
    """A simple (dummy) function to return a hash based on the current second.

    Return 0 if ttl is None, else the seconds field (0-59) of the current
    time. The ttl value itself is not used.
    TODO: Implement a real TTL mechanism.
    """
    return 0 if ttl is None else datetime.now().second
540
541
542
def weak_lru_cache(maxsize=1, typed=False, ttl=None):
    """LRU Cache decorator that keeps a weak reference to self

    maxsize, typed: forwarded to functools.lru_cache
    ttl: forwarded to _get_ttl_hash, whose result is part of the cache key

    Warning: When used in a class, the class should implement __eq__(self, other) and __hash__(self) methods

    Source: https://stackoverflow.com/a/55990799"""

    def wrapper(func):
        @functools.lru_cache(maxsize, typed)
        def _cached(_self, *args, ttl_hash=None, **kwargs):
            # ttl_hash only takes part in the cache key, it is not given to func
            del ttl_hash
            # _self is a weak reference: call it to get the instance back
            return func(_self(), *args, **kwargs)

        @functools.wraps(func)
        def inner(self, *args, **kwargs):
            current_hash = _get_ttl_hash(ttl)
            return _cached(weakref.ref(self), *args, ttl_hash=current_hash, **kwargs)

        return inner

    return wrapper
562
563
564
def namedtuple_to_dict(data):
    """Convert the namedtuple values of the dict data to dict.

    Values with an _asdict() method (like the PsUtil stats) are replaced by
    the result of that method, the other values are kept as is.
    """
    converted = {}
    for k, v in data.items():
        converted[k] = v._asdict() if hasattr(v, '_asdict') else v
    return converted
567
568
569
def list_of_namedtuple_to_list_of_dict(data):
    """Apply namedtuple_to_dict to each item of data and return the results as a list."""
    return list(map(namedtuple_to_dict, data))
572
573
574
def replace_special_chars(input_string, by=' '):
    """Replace line breaks (\\r\\n, \\n) and tabs of the input_string by another string.

    Return: the string with the chars replaced"""
    cleaned = input_string
    # \r\n first, so that a Windows line break gives a single replacement
    for special in ('\r\n', '\n', '\t'):
        cleaned = cleaned.replace(special, by)
    return cleaned
578
579
580
def atoi(text):
    """Return text converted to int if it only contains digits, else text unchanged."""
    if text.isdigit():
        return int(text)
    return text
582
583
584
def natural_keys(text):
    """Split text into a list of str and int chunks, usable as a natural (human) sort key.

    Digit runs are converted to int, e.g. 'eth10' gives ['eth', 10, ''].
    """
    chunks = re.split(r'(\d+)', text)
    return list(map(atoi, chunks))
587