glances.globals.pretty_date()   F
last analyzed

Complexity

Conditions 21

Size

Total Lines 49
Code Lines 38

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 21
eloc 38
nop 2
dl 0
loc 49
rs 0
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like glances.globals.pretty_date() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import os
21
import platform
22
import queue
23
import re
24
import subprocess
25
import sys
26
import weakref
27
from collections import OrderedDict
28
from configparser import ConfigParser, NoOptionError, NoSectionError
29
from datetime import datetime
30
from operator import itemgetter, methodcaller
31
from statistics import mean
32
from typing import Any, Optional, Union
33
from urllib.error import HTTPError, URLError
34
from urllib.parse import urlparse
35
from urllib.request import Request, urlopen
36
37
# Prefer faster libs for JSON (de)serialization
38
# Preference Order: orjson > ujson > json (builtin)
39
try:
40
    import orjson as json
41
42
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
43
except ImportError:
44
    # Need to log info but importing logger will cause cyclic imports
45
    pass
46
47
if 'json' not in globals():
48
    try:
49
        # Note: ujson is not officially supported
50
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
51
        import ujson as json
52
    except ImportError:
53
        import json
54
55
    # To allow ujson & json dumps to serialize datetime
56
    def _json_default(v: Any) -> Any:
57
        if isinstance(v, datetime):
58
            return v.isoformat()
59
        return v
60
61
    json.dumps = functools.partial(json.dumps, default=_json_default)
62
63
##############
64
# GLOBALS VARS
65
##############
66
67
# OS constants (some libraries/features are OS-dependent)
68
BSD = sys.platform.find('bsd') != -1
69
LINUX = sys.platform.startswith('linux')
70
MACOS = sys.platform.startswith('darwin')
71
SUNOS = sys.platform.startswith('sunos')
72
WINDOWS = sys.platform.startswith('win')
73
WSL = "linux" in platform.system().lower() and "microsoft" in platform.uname()[3].lower()
74
75
# Set the AMPs, plugins and export modules path
76
work_path = os.path.realpath(os.path.dirname(__file__))
77
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
78
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
79
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))
80
sys_path = sys.path[:]
81
sys.path.insert(1, exports_path)
82
sys.path.insert(1, plugins_path)
83
sys.path.insert(1, amps_path)
84
85
# Types
86
text_type = str
87
binary_type = bytes
88
bool_type = bool
89
long = int
90
91
# Alias errors
92
PermissionError = OSError
93
94
# Alias methods
95
viewkeys = methodcaller('keys')
96
viewvalues = methodcaller('values')
97
viewitems = methodcaller('items')
98
99
100
###################
101
# GLOBALS FUNCTIONS
102
###################
103
104
105
def printandflush(string):
106
    """Print and flush (used by stdout* outputs modules)"""
107
    print(string, flush=True)
108
109
110
def to_ascii(s):
111
    """Convert the bytes string to a ASCII string
112
    Useful to remove accent (diacritics)"""
113
    if isinstance(s, binary_type):
114
        return s.decode()
115
    return s.encode('ascii', 'ignore').decode()
116
117
118
def listitems(d):
119
    return list(d.items())
120
121
122
def listkeys(d):
123
    return list(d.keys())
124
125
126
def listvalues(d):
127
    return list(d.values())
128
129
130
def u(s, errors='replace'):
131
    if isinstance(s, text_type):
132
        return s
133
    return s.decode('utf-8', errors=errors)
134
135
136
def b(s, errors='replace'):
137
    if isinstance(s, binary_type):
138
        return s
139
    return s.encode('utf-8', errors=errors)
140
141
142
def nativestr(s, errors='replace'):
143
    if isinstance(s, text_type):
144
        return s
145
    if isinstance(s, (int, float)):
146
        return s.__str__()
147
    return s.decode('utf-8', errors=errors)
148
149
150
def system_exec(command):
151
    """Execute a system command and return the result as a str"""
152
    try:
153
        res = subprocess.run(command.split(' '), stdout=subprocess.PIPE).stdout.decode('utf-8')
154
    except Exception as e:
155
        res = f'ERROR: {e}'
156
    return res.rstrip()
157
158
159
def subsample(data, sampling):
160
    """Compute a simple mean subsampling.
161
162
    Data should be a list of numerical itervalues
163
164
    Return a subsampled list of sampling length
165
    """
166
    if len(data) <= sampling:
167
        return data
168
    sampling_length = int(round(len(data) / float(sampling)))
169
    return [mean(data[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
170
171
172
def time_series_subsample(data, sampling):
173
    """Compute a simple mean subsampling.
174
175
    Data should be a list of set (time, value)
176
177
    Return a subsampled list of sampling length
178
    """
179
    if len(data) <= sampling:
180
        return data
181
    t = [t[0] for t in data]
182
    v = [t[1] for t in data]
183
    sampling_length = int(round(len(data) / float(sampling)))
184
    t_subsampled = [t[s * sampling_length : (s + 1) * sampling_length][0] for s in range(0, sampling)]
185
    v_subsampled = [mean(v[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
186
    return list(zip(t_subsampled, v_subsampled))
187
188
189
def to_fahrenheit(celsius):
190
    """Convert Celsius to Fahrenheit."""
191
    return celsius * 1.8 + 32
192
193
194
def is_admin():
195
    """
196
    https://stackoverflow.com/a/19719292
197
    @return: True if the current user is an 'Admin' whatever that
198
    means (root on Unix), otherwise False.
199
    Warning: The inner function fails unless you have Windows XP SP2 or
200
    higher. The failure causes a traceback to be printed and this
201
    function to return False.
202
    """
203
204
    if os.name == 'nt':
205
        import ctypes
206
        import traceback
207
208
        # WARNING: requires Windows XP SP2 or higher!
209
        try:
210
            return ctypes.windll.shell32.IsUserAnAdmin()
211
        except Exception as e:
212
            print(f"Admin check failed with error: {e}")
213
            traceback.print_exc()
214
            return False
215
    else:
216
        # Check for root on Posix
217
        return os.getuid() == 0
218
219
220
def key_exist_value_not_none(k, d):
221
    # Return True if:
222
    # - key k exists
223
    # - d[k] is not None
224
    return k in d and d[k] is not None
225
226
227
def key_exist_value_not_none_not_v(k, d, value='', length=None):
228
    # Return True if:
229
    # - key k exists
230
    # - d[k] is not None
231
    # - d[k] != value
232
    # - if length is not None and len(d[k]) >= length
233
    return k in d and d[k] is not None and d[k] != value and (length is None or len(d[k]) >= length)
234
235
236
def disable(class_name, var):
237
    """Set disable_<var> to True in the class class_name."""
238
    setattr(class_name, 'enable_' + var, False)
239
    setattr(class_name, 'disable_' + var, True)
240
241
242
def enable(class_name, var):
243
    """Set disable_<var> to False in the class class_name."""
244
    setattr(class_name, 'enable_' + var, True)
245
    setattr(class_name, 'disable_' + var, False)
246
247
248
def safe_makedirs(path):
249
    """A safe function for creating a directory tree."""
250
    try:
251
        os.makedirs(path)
252
    except OSError as err:
253
        if err.errno == errno.EEXIST:
254
            if not os.path.isdir(path):
255
                raise
256
        else:
257
            raise
258
259
260
def get_time_diffs(ref, now):
261
    if isinstance(ref, int):
262
        diff = now - datetime.fromtimestamp(ref)
263
    elif isinstance(ref, datetime):
264
        diff = now - ref
265
    elif not ref:
266
        diff = 0
267
268
    return diff
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
269
270
271
def get_first_true_val(conds):
272
    return next(key for key, val in conds.items() if val)
273
274
275
def maybe_add_plural(count):
276
    return "s" if count > 1 else ""
277
278
279
def build_str_when_more_than_seven_days(day_diff, unit):
280
    scale = {'week': 7, 'month': 30, 'year': 365}[unit]
281
282
    count = day_diff // scale
283
284
    return str(count) + " " + unit + maybe_add_plural(count)
285
286
287
def pretty_date(ref, now=None):
288
    """
289
    Get a datetime object or a int() Epoch timestamp and return a
290
    pretty string like 'an hour ago', 'Yesterday', '3 months ago',
291
    'just now', etc
292
    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
293
294
    Refactoring done in commit https://github.com/nicolargo/glances/commit/f6279baacd4cf0b27ca10df6dc01f091ea86a40a
295
    break the function. Get back to the old fashion way.
296
    """
297
    if not now:
298
        now = datetime.now()
299
    if isinstance(ref, int):
300
        diff = now - datetime.fromtimestamp(ref)
301
    elif isinstance(ref, datetime):
302
        diff = now - ref
303
    elif not ref:
304
        diff = 0
305
    second_diff = diff.seconds
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
306
    day_diff = diff.days
307
308
    if day_diff < 0:
309
        return ''
310
311
    if day_diff == 0:
312
        if second_diff < 10:
313
            return "just now"
314
        if second_diff < 60:
315
            return str(second_diff) + " secs"
316
        if second_diff < 120:
317
            return "a min"
318
        if second_diff < 3600:
319
            return str(second_diff // 60) + " mins"
320
        if second_diff < 7200:
321
            return "an hour"
322
        if second_diff < 86400:
323
            return str(second_diff // 3600) + " hours"
324
    if day_diff == 1:
325
        return "yesterday"
326
    if day_diff < 7:
327
        return str(day_diff) + " days" if day_diff > 1 else "a day"
328
    if day_diff < 31:
329
        week = day_diff // 7
330
        return str(week) + " weeks" if week > 1 else "a week"
331
    if day_diff < 365:
332
        month = day_diff // 30
333
        return str(month) + " months" if month > 1 else "a month"
334
    year = day_diff // 365
335
    return str(year) + " years" if year > 1 else "an year"
336
337
338
def urlopen_auth(url, username, password):
339
    """Open a url with basic auth"""
340
    return urlopen(
341
        Request(
342
            url,
343
            headers={'Authorization': 'Basic ' + base64.b64encode(f'{username}:{password}'.encode()).decode()},
344
        )
345
    )
346
347
348
def json_dumps(data) -> bytes:
349
    """Return the object data in a JSON format.
350
351
    Manage the issue #815 for Windows OS with UnicodeDecodeError catching.
352
    """
353
    try:
354
        res = json.dumps(data)
355
    except UnicodeDecodeError:
356
        res = json.dumps(data, ensure_ascii=False)
357
    # ujson & json libs return strings, but our contract expects bytes
358
    return b(res)
359
360
361
def json_loads(data: Union[str, bytes, bytearray]) -> Union[dict, list]:
362
    """Load a JSON buffer into memory as a Python object"""
363
    return json.loads(data)
364
365
366
def list_to_dict(data):
367
    """Convert a list of dict (with key in 'key') to a dict with key as key and value as value."""
368
    if not isinstance(data, list):
369
        return None
370
    return {item[item['key']]: item for item in data if 'key' in item}
371
372
373
def dictlist(data, item):
374
    if isinstance(data, dict):
375
        try:
376
            return {item: data[item]}
377
        except (TypeError, IndexError, KeyError):
378
            return None
379
    elif isinstance(data, list):
380
        try:
381
            # Source:
382
            # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
383
            # But https://github.com/nicolargo/glances/issues/1401
384
            return {item: list(map(itemgetter(item), data))}
385
        except (TypeError, IndexError, KeyError):
386
            return None
387
    else:
388
        return None
389
390
391
def dictlist_json_dumps(data, item):
392
    dl = dictlist(data, item)
393
    if dl is None:
394
        return None
395
    return json_dumps(dl)
396
397
398
def dictlist_first_key_value(data: list[dict], key, value) -> Optional[dict]:
399
    """In a list of dict, return first item where key=value or none if not found."""
400
    try:
401
        ret = next(item for item in data if key in item and item[key] == value)
402
    except StopIteration:
403
        ret = None
404
    return ret
405
406
407
def string_value_to_float(s):
408
    """Convert a string with a value and an unit to a float.
409
    Example:
410
    '12.5 MB' -> 12500000.0
411
    '32.5 GB' -> 32500000000.0
412
    Args:
413
        s (string): Input string with value and unit
414
    Output:
415
        float: The value in float
416
    """
417
    convert_dict = {
418
        None: 1,
419
        'B': 1,
420
        'KB': 1000,
421
        'MB': 1000000,
422
        'GB': 1000000000,
423
        'TB': 1000000000000,
424
        'PB': 1000000000000000,
425
    }
426
    unpack_string = [
427
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
428
    ]
429
    if len(unpack_string) == 2:
430
        value, unit = unpack_string
431
    elif len(unpack_string) == 1:
432
        value = unpack_string[0]
433
        unit = None
434
    else:
435
        return None
436
    try:
437
        value = float(unpack_string[0])
438
    except ValueError:
439
        return None
440
    return value * convert_dict[unit]
441
442
443
def file_exists(filename):
444
    """Return True if the file exists and is readable."""
445
    return os.path.isfile(filename) and os.access(filename, os.R_OK)
446
447
448
def folder_size(path, errno=0):
449
    """Return a tuple with the size of the directory given by path and the errno.
450
    If an error occurs (for example one file or subfolder is not accessible),
451
    errno is set to the error number.
452
453
    path: <string>
454
    errno: <int> Should always be 0 when calling the function"""
455
    ret_size = 0
456
    ret_err = errno
457
    try:
458
        for f in os.scandir(path):
459
            if f.is_dir(follow_symlinks=False) and (f.name != '.' or f.name != '..'):
460
                ret = folder_size(os.path.join(path, f.name), ret_err)
461
                ret_size += ret[0]
462
                ret_err = ret[1]
463
            else:
464
                try:
465
                    ret_size += f.stat().st_size
466
                except OSError as e:
467
                    ret_err = e.errno
468
    except (OSError, PermissionError) as e:
469
        return 0, e.errno
470
    else:
471
        return ret_size, ret_err
472
473
474
def _get_ttl_hash(ttl):
475
    """A simple (dummy) function to return a hash based on the current second.
476
    TODO: Implement a real TTL mechanism.
477
    """
478
    if ttl is None:
479
        return 0
480
    now = datetime.now()
481
    return now.second
482
483
484
def weak_lru_cache(maxsize=1, typed=False, ttl=None):
485
    """LRU Cache decorator that keeps a weak reference to self
486
487
    Warning: When used in a class, the class should implement __eq__(self, other) and __hash__(self) methods
488
489
    Source: https://stackoverflow.com/a/55990799"""
490
491
    def wrapper(func):
492
        @functools.lru_cache(maxsize, typed)
493
        def _func(_self, *args, ttl_hash=None, **kwargs):
494
            del ttl_hash  # Unused parameter, but kept for compatibility
495
            return func(_self(), *args, **kwargs)
496
497
        @functools.wraps(func)
498
        def inner(self, *args, **kwargs):
499
            return _func(weakref.ref(self), *args, ttl_hash=_get_ttl_hash(ttl), **kwargs)
500
501
        return inner
502
503
    return wrapper
504
505
506
def namedtuple_to_dict(data):
507
    """Convert a namedtuple to a dict, using the _asdict() method embedded in PsUtil stats."""
508
    return {k: (v._asdict() if hasattr(v, '_asdict') else v) for k, v in list(data.items())}
509
510
511
def list_of_namedtuple_to_list_of_dict(data):
512
    """Convert a list of namedtuples to a dict, using the _asdict() method embedded in PsUtil stats."""
513
    return [namedtuple_to_dict(d) for d in data]
514
515
516
def replace_special_chars(input_string, by=' '):
517
    """Replace some special char by another in the input_string
518
    Return: the string with the chars replaced"""
519
    return input_string.replace('\r\n', by).replace('\n', by).replace('\t', by)
520
521
522
def atoi(text):
523
    return int(text) if text.isdigit() else text
524
525
526
def natural_keys(text):
527
    """Return a text in a natural/human readable format."""
528
    return [atoi(c) for c in re.split(r'(\d+)', text)]
529