Test Failed
Push — master ( ce0fc3...e09530 )
by Nicolas
03:36
created

glances.globals.pretty_date()   F

Complexity

Conditions 21

Size

Total Lines 49
Code Lines 38

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 21
eloc 38
nop 2
dl 0
loc 49
rs 0
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex functions like glances.globals.pretty_date() often do several different things at once. To break such a function down, identify a cohesive group of statements within it. A common approach is to look for local variables and branches that serve the same sub-task (for example, names that share a prefix or suffix).

Once you have determined the statements that belong together, you can apply the Extract Method (Extract Function) refactoring and move them into a small, well-named helper.

1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import os
21
import platform
22
import queue
23
import re
24
import subprocess
25
import sys
26
import weakref
27
from collections import OrderedDict
28
from configparser import ConfigParser, NoOptionError, NoSectionError
29
from datetime import datetime
30
from operator import itemgetter, methodcaller
31
from statistics import mean
32
from typing import Any, Union
33
from urllib.error import HTTPError, URLError
34
from urllib.parse import urlparse
35
from urllib.request import Request, urlopen
36
37
# Prefer faster libs for JSON (de)serialization
38
# Preference Order: orjson > ujson > json (builtin)
39
try:
40
    import orjson as json
41
42
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
43
except ImportError:
44
    # Need to log info but importing logger will cause cyclic imports
45
    pass
46
47
if 'json' not in globals():
48
    try:
49
        # Note: ujson is not officially supported
50
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
51
        import ujson as json
52
    except ImportError:
53
        import json
54
55
    # To allow ujson & json dumps to serialize datetime
56
    def _json_default(v: Any) -> Any:
57
        if isinstance(v, datetime):
58
            return v.isoformat()
59
        return v
60
61
    json.dumps = functools.partial(json.dumps, default=_json_default)
62
63
##############
64
# GLOBALS VARS
65
##############
66
67
# Operating system flags (some libraries/features are OS-dependent)
BSD = 'bsd' in sys.platform
LINUX = sys.platform.startswith('linux')
MACOS = sys.platform.startswith('darwin')
SUNOS = sys.platform.startswith('sunos')
WINDOWS = sys.platform.startswith('win')
WSL = 'linux' in platform.system().lower() and 'microsoft' in platform.uname()[3].lower()

# Locations of the AMPs, plugins and export modules (siblings of this file)
work_path = os.path.realpath(os.path.dirname(__file__))
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))

# Keep a copy of the untouched search path, then register the three folders.
# Each insert goes to index 1, so the final order is amps, plugins, exports.
sys_path = list(sys.path)
sys.path.insert(1, exports_path)
sys.path.insert(1, plugins_path)
sys.path.insert(1, amps_path)

# Type aliases
text_type = str
binary_type = bytes
bool_type = bool
long = int

# Error alias (NOTE: rebinds the builtin name PermissionError to OSError in this module)
PermissionError = OSError

# Method aliases: callables returning obj.keys() / obj.values() / obj.items()
viewkeys = methodcaller('keys')
viewvalues = methodcaller('values')
viewitems = methodcaller('items')
98
99
100
###################
101
# GLOBALS FUNCTIONS
102
###################
103
104
105
def printandflush(string):
    """Print string and flush the output stream immediately (used by stdout* outputs modules)."""
    print(string, end='\n', flush=True)
108
109
110
def to_ascii(s):
    """Return s as a str restricted to what the ASCII codec can encode.

    Bytes input is simply decoded with the default codec. Any other input is
    encoded to ASCII with unencodable characters dropped, then decoded back.
    """
    if not isinstance(s, binary_type):
        ascii_bytes = s.encode('ascii', 'ignore')
        return ascii_bytes.decode()
    return s.decode()
116
117
118
def listitems(d):
    """Return the (key, value) pairs of d as a list."""
    return [*d.items()]
120
121
122
def listkeys(d):
    """Return the keys of d as a list."""
    return [*d.keys()]
124
125
126
def listvalues(d):
    """Return the values of d as a list."""
    return [*d.values()]
128
129
130
def iteritems(d):
    """Return an iterator over the (key, value) pairs of d."""
    pairs = d.items()
    return iter(pairs)
132
133
134
def iterkeys(d):
    """Return an iterator over the keys of d."""
    keys = d.keys()
    return iter(keys)
136
137
138
def itervalues(d):
    """Return an iterator over the values of d."""
    values = d.values()
    return iter(values)
140
141
142
def u(s, errors='replace'):
    """Return s as text: text is returned untouched, anything else is UTF-8 decoded.

    errors is forwarded to decode() as the error handling scheme.
    """
    if not isinstance(s, text_type):
        return s.decode('utf-8', errors=errors)
    return s
146
147
148
def b(s, errors='replace'):
    """Return s as bytes: bytes are returned untouched, anything else is UTF-8 encoded.

    errors is forwarded to encode() as the error handling scheme.
    """
    if not isinstance(s, binary_type):
        return s.encode('utf-8', errors=errors)
    return s
152
153
154
def nativestr(s, errors='replace'):
    """Return s as a native str.

    Text is returned untouched, int/float values are converted with their
    __str__ method, and any other input is UTF-8 decoded using the given
    error handling scheme.
    """
    if isinstance(s, text_type):
        return s
    if not isinstance(s, (int, float)):
        return s.decode('utf-8', errors=errors)
    return s.__str__()
160
161
162
def system_exec(command):
    """Run a system command and return its standard output as a str.

    The command line is split on single spaces (no shell involved). If
    anything goes wrong, the returned string is 'ERROR: ' followed by the
    exception message. Trailing whitespace is stripped in both cases.
    """
    try:
        completed = subprocess.run(command.split(' '), stdout=subprocess.PIPE)
        output = completed.stdout.decode('utf-8')
    except Exception as e:
        output = f'ERROR: {e}'
    return output.rstrip()
169
170
171
def subsample(data, sampling):
    """Compute a simple mean subsampling.

    Data should be a list of numerical values.

    Return data itself when it has at most `sampling` items. Otherwise the
    data is cut into consecutive chunks of round(len(data) / sampling) items
    and the mean of each chunk is returned.

    The result has at most `sampling` items: because the chunk size is
    rounded, the last chunks may fall past the end of the data. Such empty
    chunks are skipped (they used to make mean() raise StatisticsError).
    """
    if len(data) <= sampling:
        return data
    sampling_length = int(round(len(data) / float(sampling)))
    chunks = (data[s * sampling_length : (s + 1) * sampling_length] for s in range(sampling))
    # An empty chunk means the previous chunks already reached the end of data
    return [mean(chunk) for chunk in chunks if len(chunk) > 0]
182
183
184
def time_series_subsample(data, sampling):
    """Compute a simple mean subsampling of a time series.

    Data should be a list of (time, value) pairs.

    Return data itself when it has at most `sampling` items. Otherwise the
    data is cut into consecutive chunks of round(len(data) / sampling) items;
    each chunk yields one (time, value) tuple made of the first time of the
    chunk and the mean of its values.

    The result has at most `sampling` items: because the chunk size is
    rounded, the last chunks may fall past the end of the data. Such empty
    chunks are skipped (they used to raise IndexError / StatisticsError).
    """
    if len(data) <= sampling:
        return data
    times = [point[0] for point in data]
    values = [point[1] for point in data]
    sampling_length = int(round(len(data) / float(sampling)))
    subsampled = []
    for s in range(sampling):
        window = slice(s * sampling_length, (s + 1) * sampling_length)
        chunk_times = times[window]
        if not chunk_times:
            # Previous chunks already consumed all the data
            break
        subsampled.append((chunk_times[0], mean(values[window])))
    return subsampled
199
200
201
def to_fahrenheit(celsius):
    """Return the Fahrenheit equivalent of a Celsius temperature."""
    scaled = celsius * 1.8
    return scaled + 32
204
205
206
def is_admin():
    """Tell whether the current user has administrator privileges.

    Source: https://stackoverflow.com/a/19719292

    On POSIX systems this means the user id is 0 (root). On Windows
    (os.name == 'nt') the shell32 IsUserAnAdmin() result is returned.

    Warning: the Windows check requires Windows XP SP2 or higher. If it
    fails, the error and its traceback are printed and False is returned.
    """
    if os.name != 'nt':
        # Check for root on Posix
        return os.getuid() == 0

    import ctypes
    import traceback

    # WARNING: requires Windows XP SP2 or higher!
    try:
        return ctypes.windll.shell32.IsUserAnAdmin()
    except Exception as e:
        print(f"Admin check failed with error: {e}")
        traceback.print_exc()
        return False
230
231
232
def key_exist_value_not_none(k, d):
    """Return True if key k exists in d and d[k] is not None."""
    if k not in d:
        return False
    return d[k] is not None
237
238
239
def key_exist_value_not_none_not_v(k, d, value='', length=None):
    """Check that d[k] holds a usable value.

    The result is truthy only if all of the following hold:
    - key k exists in d
    - d[k] is not None
    - d[k] != value
    - length is None, or len(d[k]) >= length
    """
    if k not in d or d[k] is None:
        return False
    current = d[k]
    return current != value and (length is None or len(current) >= length)
246
247
248
def disable(class_name, var):
    """Set enable_<var> to False and disable_<var> to True on class_name."""
    for prefix, flag in (('enable_', False), ('disable_', True)):
        setattr(class_name, prefix + var, flag)
252
253
254
def enable(class_name, var):
    """Set enable_<var> to True and disable_<var> to False on class_name."""
    for prefix, flag in (('enable_', True), ('disable_', False)):
        setattr(class_name, prefix + var, flag)
258
259
260
def safe_makedirs(path):
    """Create the directory tree path, tolerating an already existing directory.

    Any OSError is re-raised, except the "already exists" error when path is
    indeed a directory.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        already_a_directory = err.errno == errno.EEXIST and os.path.isdir(path)
        if not already_a_directory:
            raise
270
271
272
def get_time_diffs(ref, now):
    """Return the time elapsed between ref and now as a timedelta.

    ref can be:
    - an int (or a non-zero float) Epoch timestamp, converted with
      datetime.fromtimestamp()
    - a datetime object
    - a falsy value (None, '', ...), in which case a zero timedelta is
      returned (it used to be the int 0, which has no .seconds/.days)

    now is the datetime object to compare ref with.

    Raise TypeError for any other kind of ref (it used to fail with an
    UnboundLocalError because no value was assigned on that path).
    """
    if isinstance(ref, int):
        return now - datetime.fromtimestamp(ref)
    if isinstance(ref, datetime):
        return now - ref
    if not ref:
        # now - now gives a zero timedelta without needing another import
        return now - now
    if isinstance(ref, float):
        return now - datetime.fromtimestamp(ref)
    raise TypeError(f'Unsupported reference date type: {type(ref).__name__}')
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
281
282
283
def get_first_true_val(conds):
    """Return the first key of conds whose value is truthy.

    Raise StopIteration when no value is truthy.
    """
    for key, val in conds.items():
        if val:
            return key
    raise StopIteration
285
286
287
def maybe_add_plural(count):
    """Return the plural suffix "s" when count is greater than 1, else an empty string."""
    if count > 1:
        return "s"
    return ""
289
290
291
def build_str_when_more_than_seven_days(day_diff, unit):
    """Express day_diff days in the given unit, e.g. '3 weeks'.

    unit must be 'week', 'month' or 'year' (7, 30 and 365 days respectively).
    The count is the integer division of day_diff by the unit length and the
    unit gets an 's' suffix when the count is greater than 1.
    """
    days_per_unit = {'week': 7, 'month': 30, 'year': 365}
    count = day_diff // days_per_unit[unit]
    suffix = maybe_add_plural(count)
    return str(count) + " " + unit + suffix
297
298
299
def _pretty_seconds(second_diff):
    """Return the pretty string for a delay of second_diff seconds (less than one day)."""
    if second_diff < 10:
        return "just now"
    if second_diff < 60:
        return str(second_diff) + " secs"
    if second_diff < 120:
        return "a min"
    if second_diff < 3600:
        return str(second_diff // 60) + " mins"
    if second_diff < 7200:
        return "an hour"
    return str(second_diff // 3600) + " hours"


def pretty_date(ref, now=None):
    """
    Get a datetime object or a int() Epoch timestamp and return a
    pretty string like 'an hour', 'yesterday', '3 months',
    'just now', etc
    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python

    ref: the date to describe. It can be an int (or non-zero float) Epoch
    timestamp, a datetime object, or a falsy value (None, '', ...) which is
    handled as "no delay" and gives 'just now' (as in the source recipe; it
    used to crash because the int 0 has no .seconds attribute).
    now: the datetime to compare ref with (default: datetime.now()).

    Return an empty string if ref is in the future.
    Raise TypeError if ref is of any other type (it used to fail with an
    UnboundLocalError).

    Refactoring done in commit https://github.com/nicolargo/glances/commit/f6279baacd4cf0b27ca10df6dc01f091ea86a40a
    break the function. Get back to the old fashion way.
    """
    if not now:
        now = datetime.now()

    if isinstance(ref, int):
        diff = now - datetime.fromtimestamp(ref)
    elif isinstance(ref, datetime):
        diff = now - ref
    elif not ref:
        # Zero timedelta, built without needing another import
        diff = now - now
    elif isinstance(ref, float):
        diff = now - datetime.fromtimestamp(ref)
    else:
        raise TypeError(f'Unsupported reference date type: {type(ref).__name__}')

    second_diff = diff.seconds
    day_diff = diff.days

    if day_diff < 0:
        return ''
    if day_diff == 0:
        return _pretty_seconds(second_diff)
    if day_diff == 1:
        return "yesterday"
    if day_diff < 7:
        # day_diff is between 2 and 6 here, so always plural
        return str(day_diff) + " days"
    if day_diff < 31:
        week = day_diff // 7
        return str(week) + " weeks" if week > 1 else "a week"
    if day_diff < 365:
        month = day_diff // 30
        return str(month) + " months" if month > 1 else "a month"
    year = day_diff // 365
    # NOTE: "an year" is the historical output string, kept unchanged for compatibility
    return str(year) + " years" if year > 1 else "an year"
348
349
350
def urlopen_auth(url, username, password):
    """Open url with an HTTP Basic 'Authorization' header built from username and password."""
    credentials = f'{username}:{password}'.encode()
    token = base64.b64encode(credentials).decode()
    request = Request(url, headers={'Authorization': 'Basic ' + token})
    return urlopen(request)
358
359
360
def json_dumps(data) -> bytes:
    """Serialize data to JSON and return the result as bytes.

    Manage the issue #815 for Windows OS: if a UnicodeDecodeError is raised,
    the dump is retried with ensure_ascii=False.
    """
    try:
        serialized = json.dumps(data)
    except UnicodeDecodeError:
        serialized = json.dumps(data, ensure_ascii=False)
    # ujson & json libs return strings, but our contract expects bytes
    return b(serialized)
371
372
373
def json_loads(data: Union[str, bytes, bytearray]) -> Union[dict, list]:
    """Parse the JSON buffer data and return the resulting Python object."""
    parsed = json.loads(data)
    return parsed
376
377
378
def dictlist(data, item):
    """Extract item from data and return it as a one-key dict.

    - data is a dict: return {item: data[item]}
    - data is a list: return {item: [element[item] for each element]}
      (see https://github.com/nicolargo/glances/issues/1401)

    Return None for any other kind of data, or if the lookup fails with a
    TypeError, IndexError or KeyError.
    """
    try:
        if isinstance(data, dict):
            return {item: data[item]}
        if isinstance(data, list):
            return {item: [element[item] for element in data]}
    except (TypeError, IndexError, KeyError):
        return None
    return None
394
395
396
def json_dumps_dictlist(data, item):
    """Return the JSON dump of dictlist(data, item), or None if dictlist returns None."""
    extracted = dictlist(data, item)
    return None if extracted is None else json_dumps(extracted)
401
402
403
def string_value_to_float(s):
    """Convert a string with a value and an unit to a float.
    Example:
    '12.5 MB' -> 12500000.0
    '32.5 GB' -> 32500000000.0
    Args:
        s (string): Input string with value and unit
    Output:
        float: The value in float, or None if the string cannot be converted
        (no value, invalid number, more than one unit or unknown unit)
    """
    convert_dict = {
        None: 1,
        'B': 1,
        'KB': 1000,
        'MB': 1000000,
        'GB': 1000000000,
        'TB': 1000000000000,
        'PB': 1000000000000000,
    }
    # Split the string (spaces removed) into numeric and non-numeric tokens.
    # Exactly one group matches per token; non-numeric tokens are upper-cased
    # so that the unit lookup is case-insensitive.
    tokens = [
        number or other.upper() for number, other in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
    ]
    if len(tokens) == 2:
        value, unit = tokens
    elif len(tokens) == 1:
        value, unit = tokens[0], None
    else:
        return None
    try:
        number = float(value)
    except ValueError:
        return None
    # An unknown unit used to raise KeyError; report it like other invalid inputs
    factor = convert_dict.get(unit)
    if factor is None:
        return None
    return number * factor
437
438
439
def file_exists(filename):
    """Return True if filename is an existing regular file that can be read."""
    if not os.path.isfile(filename):
        return False
    return os.access(filename, os.R_OK)
442
443
444
def folder_size(path, errno=0):
    """Return a tuple with the size of the directory given by path and the errno.
    If an error occurs (for example one file or subfolder is not accessible),
    errno is set to the error number.

    The size is the sum of st_size of all the entries found recursively.
    Sub-directories are walked without following symbolic links. If path
    itself cannot be scanned, (0, <error number>) is returned.

    path: <string>
    errno: <int> Should always be 0 when calling the function"""
    ret_size = 0
    ret_err = errno
    try:
        # The context manager closes the directory iterator as soon as the scan is done
        with os.scandir(path) as entries:
            for entry in entries:
                # os.scandir never yields '.' or '..', so no special-casing is needed
                if entry.is_dir(follow_symlinks=False):
                    sub_size, ret_err = folder_size(os.path.join(path, entry.name), ret_err)
                    ret_size += sub_size
                else:
                    try:
                        ret_size += entry.stat().st_size
                    except OSError as e:
                        # Keep going, but remember the last error met
                        ret_err = e.errno
    except OSError as e:
        return 0, e.errno
    return ret_size, ret_err
468
469
470
def weak_lru_cache(maxsize=128, typed=False):
    """LRU cache decorator for methods that keeps only a weak reference to self.

    The decorated method is called through a functools.lru_cache'd function
    whose first argument is weakref.ref(self) instead of self itself.
    maxsize and typed are forwarded to functools.lru_cache.

    Source: https://stackoverflow.com/a/55990799"""

    def decorator(method):
        @functools.lru_cache(maxsize, typed)
        def cached_call(self_ref, *args, **kwargs):
            # Dereference the weak reference to get the instance back
            return method(self_ref(), *args, **kwargs)

        @functools.wraps(method)
        def bound_call(self, *args, **kwargs):
            return cached_call(weakref.ref(self), *args, **kwargs)

        return bound_call

    return decorator
486
487
488
def namedtuple_to_dict(data):
    """Return a copy of the dict data where each value having an _asdict() method
    (like the namedtuples embedded in PsUtil stats) is replaced by its _asdict() result."""
    converted = {}
    for key, value in data.items():
        if hasattr(value, '_asdict'):
            converted[key] = value._asdict()
        else:
            converted[key] = value
    return converted
491
492
493
def list_of_namedtuple_to_list_of_dict(data):
    """Return the list obtained by applying namedtuple_to_dict() to each element of data."""
    return list(map(namedtuple_to_dict, data))
496
497
498
def replace_special_chars(input_string, by=' '):
    """Replace the line breaks ('\\r\\n' then '\\n') and tabs of input_string with by.

    Return: the string with the chars replaced"""
    cleaned = input_string
    for special in ('\r\n', '\n', '\t'):
        cleaned = cleaned.replace(special, by)
    return cleaned
502