Test Failed
Pull Request — develop (#2968)
by
unknown
02:55
created

glances.globals.pretty_date()   F

Complexity

Conditions 19

Size

Total Lines 41
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 19
eloc 33
nop 2
dl 0
loc 41
rs 0.5999
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like glances.globals.pretty_date() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import os
21
import platform
22
import queue
23
import re
24
import subprocess
25
import sys
26
import weakref
27
from configparser import ConfigParser, NoOptionError, NoSectionError
28
from datetime import datetime
29
from operator import itemgetter, methodcaller
30
from statistics import mean
31
from typing import Any, Dict, List, Union
32
from urllib.error import HTTPError, URLError
33
from urllib.parse import urlparse
34
from urllib.request import Request, urlopen
35
from xmlrpc.client import Fault, ProtocolError, Server, ServerProxy, Transport
36
from xmlrpc.server import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
37
38
from defusedxml.xmlrpc import monkey_patch
39
40
# Correct issue #1025 by monkey path the xmlrpc lib
41
monkey_patch()
42
43
# Prefer faster libs for JSON (de)serialization
44
# Preference Order: orjson > ujson > json (builtin)
45
try:
46
    import orjson as json
47
48
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
49
except ImportError:
50
    # Need to log info but importing logger will cause cyclic imports
51
    pass
52
53
if 'json' not in globals():
54
    try:
55
        # Note: ujson is not officially supported
56
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
57
        import ujson as json
58
    except ImportError:
59
        import json
60
61
    # To allow ujson & json dumps to serialize datetime
62
    def _json_default(v: Any) -> Any:
63
        if isinstance(v, datetime):
64
            return v.isoformat()
65
        return v
66
67
    json.dumps = functools.partial(json.dumps, default=_json_default)
68
69
##############
70
# GLOBALS VARS
71
##############
72
73
# OS constants (some libraries/features are OS-dependent)
74
BSD = sys.platform.find('bsd') != -1
75
LINUX = sys.platform.startswith('linux')
76
MACOS = sys.platform.startswith('darwin')
77
SUNOS = sys.platform.startswith('sunos')
78
WINDOWS = sys.platform.startswith('win')
79
WSL = "linux" in platform.system().lower() and "microsoft" in platform.uname()[3].lower()
80
81
# Set the AMPs, plugins and export modules path
82
work_path = os.path.realpath(os.path.dirname(__file__))
83
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
84
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
85
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))
86
sys_path = sys.path[:]
87
sys.path.insert(1, exports_path)
88
sys.path.insert(1, plugins_path)
89
sys.path.insert(1, amps_path)
90
91
# Types
92
text_type = str
93
binary_type = bytes
94
bool_type = bool
95
long = int
96
97
# Alias errors
98
PermissionError = OSError
99
100
# Alias methods
101
viewkeys = methodcaller('keys')
102
viewvalues = methodcaller('values')
103
viewitems = methodcaller('items')
104
105
106
###################
107
# GLOBALS FUNCTIONS
108
###################
109
110
111
def printandflush(string):
112
    """Print and flush (used by stdout* outputs modules)"""
113
    print(string, flush=True)
114
115
116
def to_ascii(s):
117
    """Convert the bytes string to a ASCII string
118
    Useful to remove accent (diacritics)"""
119
    if isinstance(s, binary_type):
120
        return s.decode()
121
    return s.encode('ascii', 'ignore').decode()
122
123
124
def listitems(d):
125
    return list(d.items())
126
127
128
def listkeys(d):
129
    return list(d.keys())
130
131
132
def listvalues(d):
133
    return list(d.values())
134
135
136
def iteritems(d):
137
    return iter(d.items())
138
139
140
def iterkeys(d):
141
    return iter(d.keys())
142
143
144
def itervalues(d):
145
    return iter(d.values())
146
147
148
def u(s, errors='replace'):
149
    if isinstance(s, text_type):
150
        return s
151
    return s.decode('utf-8', errors=errors)
152
153
154
def b(s, errors='replace'):
155
    if isinstance(s, binary_type):
156
        return s
157
    return s.encode('utf-8', errors=errors)
158
159
160
def nativestr(s, errors='replace'):
161
    if isinstance(s, text_type):
162
        return s
163
    if isinstance(s, (int, float)):
164
        return s.__str__()
165
    return s.decode('utf-8', errors=errors)
166
167
168
def system_exec(command):
169
    """Execute a system command and return the result as a str"""
170
    try:
171
        res = subprocess.run(command.split(' '), stdout=subprocess.PIPE).stdout.decode('utf-8')
172
    except Exception as e:
173
        res = f'ERROR: {e}'
174
    return res.rstrip()
175
176
177
def subsample(data, sampling):
178
    """Compute a simple mean subsampling.
179
180
    Data should be a list of numerical itervalues
181
182
    Return a subsampled list of sampling length
183
    """
184
    if len(data) <= sampling:
185
        return data
186
    sampling_length = int(round(len(data) / float(sampling)))
187
    return [mean(data[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
188
189
190
def time_series_subsample(data, sampling):
191
    """Compute a simple mean subsampling.
192
193
    Data should be a list of set (time, value)
194
195
    Return a subsampled list of sampling length
196
    """
197
    if len(data) <= sampling:
198
        return data
199
    t = [t[0] for t in data]
200
    v = [t[1] for t in data]
201
    sampling_length = int(round(len(data) / float(sampling)))
202
    t_subsampled = [t[s * sampling_length : (s + 1) * sampling_length][0] for s in range(0, sampling)]
203
    v_subsampled = [mean(v[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
204
    return list(zip(t_subsampled, v_subsampled))
205
206
207
def to_fahrenheit(celsius):
208
    """Convert Celsius to Fahrenheit."""
209
    return celsius * 1.8 + 32
210
211
212
def is_admin():
213
    """
214
    https://stackoverflow.com/a/19719292
215
    @return: True if the current user is an 'Admin' whatever that
216
    means (root on Unix), otherwise False.
217
    Warning: The inner function fails unless you have Windows XP SP2 or
218
    higher. The failure causes a traceback to be printed and this
219
    function to return False.
220
    """
221
222
    if os.name == 'nt':
223
        import ctypes
224
        import traceback
225
226
        # WARNING: requires Windows XP SP2 or higher!
227
        try:
228
            return ctypes.windll.shell32.IsUserAnAdmin()
229
        except Exception as e:
230
            print(f"Admin check failed with error: {e}")
231
            traceback.print_exc()
232
            return False
233
    else:
234
        # Check for root on Posix
235
        return os.getuid() == 0
236
237
238
def key_exist_value_not_none(k, d):
239
    # Return True if:
240
    # - key k exists
241
    # - d[k] is not None
242
    return k in d and d[k] is not None
243
244
245
def key_exist_value_not_none_not_v(k, d, value='', length=None):
246
    # Return True if:
247
    # - key k exists
248
    # - d[k] is not None
249
    # - d[k] != value
250
    # - if length is not None and len(d[k]) >= length
251
    return k in d and d[k] is not None and d[k] != value and (length is None or len(d[k]) >= length)
252
253
254
def disable(class_name, var):
255
    """Set disable_<var> to True in the class class_name."""
256
    setattr(class_name, 'enable_' + var, False)
257
    setattr(class_name, 'disable_' + var, True)
258
259
260
def enable(class_name, var):
261
    """Set disable_<var> to False in the class class_name."""
262
    setattr(class_name, 'enable_' + var, True)
263
    setattr(class_name, 'disable_' + var, False)
264
265
266
def safe_makedirs(path):
267
    """A safe function for creating a directory tree."""
268
    try:
269
        os.makedirs(path)
270
    except OSError as err:
271
        if err.errno == errno.EEXIST:
272
            if not os.path.isdir(path):
273
                raise
274
        else:
275
            raise
276
277
278
def pretty_date(ref=False, now=datetime.now()):
279
    """
280
    Get a datetime object or a int() Epoch timestamp and return a
281
    pretty string like 'an hour ago', 'Yesterday', '3 months ago',
282
    'just now', etc
283
    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
284
    """
285
    if isinstance(ref, int):
286
        diff = now - datetime.fromtimestamp(ref)
287
    elif isinstance(ref, datetime):
288
        diff = now - ref
289
    elif not ref:
290
        diff = 0
291
    second_diff = diff.seconds
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
292
    day_diff = diff.days
293
294
    if day_diff < 0:
295
        return ''
296
297
    if day_diff == 0:
298
        if second_diff < 10:
299
            return "just now"
300
        if second_diff < 60:
301
            return str(second_diff) + " secs"
302
        if second_diff < 120:
303
            return "a min"
304
        if second_diff < 3600:
305
            return str(second_diff // 60) + " mins"
306
        if second_diff < 7200:
307
            return "an hour"
308
        if second_diff < 86400:
309
            return str(second_diff // 3600) + " hours"
310
    if day_diff == 1:
311
        return "yesterday"
312
    if day_diff < 7:
313
        return str(day_diff) + " days"
314
    if day_diff < 31:
315
        return str(day_diff // 7) + " weeks" if (day_diff // 7) > 1 else "1 week"
316
    if day_diff < 365:
317
        return str(day_diff // 30) + " months" if (day_diff // 30) > 1 else "1 month"
318
    return str(day_diff // 365) + " years" if (day_diff // 365) > 1 else "1 year"
319
320
321
def urlopen_auth(url, username, password):
322
    """Open a url with basic auth"""
323
    return urlopen(
324
        Request(
325
            url,
326
            headers={'Authorization': 'Basic ' + base64.b64encode(f'{username}:{password}'.encode()).decode()},
327
        )
328
    )
329
330
331
def json_dumps(data) -> bytes:
332
    """Return the object data in a JSON format.
333
334
    Manage the issue #815 for Windows OS with UnicodeDecodeError catching.
335
    """
336
    try:
337
        res = json.dumps(data)
338
    except UnicodeDecodeError:
339
        res = json.dumps(data, ensure_ascii=False)
340
    # ujson & json libs return strings, but our contract expects bytes
341
    return b(res)
342
343
344
def json_loads(data: Union[str, bytes, bytearray]) -> Union[Dict, List]:
345
    """Load a JSON buffer into memory as a Python object"""
346
    return json.loads(data)
347
348
349
def dictlist(data, item):
350
    if isinstance(data, dict):
351
        try:
352
            return {item: data[item]}
353
        except (TypeError, IndexError, KeyError):
354
            return None
355
    elif isinstance(data, list):
356
        try:
357
            # Source:
358
            # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
359
            # But https://github.com/nicolargo/glances/issues/1401
360
            return {item: list(map(itemgetter(item), data))}
361
        except (TypeError, IndexError, KeyError):
362
            return None
363
    else:
364
        return None
365
366
367
def json_dumps_dictlist(data, item):
368
    dl = dictlist(data, item)
369
    if dl is None:
370
        return None
371
    return json_dumps(dl)
372
373
374
def string_value_to_float(s):
375
    """Convert a string with a value and an unit to a float.
376
    Example:
377
    '12.5 MB' -> 12500000.0
378
    '32.5 GB' -> 32500000000.0
379
    Args:
380
        s (string): Input string with value and unit
381
    Output:
382
        float: The value in float
383
    """
384
    convert_dict = {
385
        None: 1,
386
        'B': 1,
387
        'KB': 1000,
388
        'MB': 1000000,
389
        'GB': 1000000000,
390
        'TB': 1000000000000,
391
        'PB': 1000000000000000,
392
    }
393
    unpack_string = [
394
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
395
    ]
396
    if len(unpack_string) == 2:
397
        value, unit = unpack_string
398
    elif len(unpack_string) == 1:
399
        value = unpack_string[0]
400
        unit = None
401
    else:
402
        return None
403
    try:
404
        value = float(unpack_string[0])
405
    except ValueError:
406
        return None
407
    return value * convert_dict[unit]
408
409
410
def file_exists(filename):
411
    """Return True if the file exists and is readable."""
412
    return os.path.isfile(filename) and os.access(filename, os.R_OK)
413
414
415
def folder_size(path, errno=0):
416
    """Return a tuple with the size of the directory given by path and the errno.
417
    If an error occurs (for example one file or subfolder is not accessible),
418
    errno is set to the error number.
419
420
    path: <string>
421
    errno: <int> Should always be 0 when calling the function"""
422
    ret_size = 0
423
    ret_err = errno
424
    try:
425
        for f in os.scandir(path):
426
            if f.is_dir(follow_symlinks=False) and (f.name != '.' or f.name != '..'):
427
                ret = folder_size(os.path.join(path, f.name), ret_err)
428
                ret_size += ret[0]
429
                ret_err = ret[1]
430
            else:
431
                try:
432
                    ret_size += f.stat().st_size
433
                except OSError as e:
434
                    ret_err = e.errno
435
    except (OSError, PermissionError) as e:
436
        return 0, e.errno
437
    else:
438
        return ret_size, ret_err
439
440
441
def weak_lru_cache(maxsize=128, typed=False):
442
    """LRU Cache decorator that keeps a weak reference to self
443
    Source: https://stackoverflow.com/a/55990799"""
444
445
    def wrapper(func):
446
        @functools.lru_cache(maxsize, typed)
447
        def _func(_self, *args, **kwargs):
448
            return func(_self(), *args, **kwargs)
449
450
        @functools.wraps(func)
451
        def inner(self, *args, **kwargs):
452
            return _func(weakref.ref(self), *args, **kwargs)
453
454
        return inner
455
456
    return wrapper
457
458
459
def namedtuple_to_dict(data):
460
    """Convert a namedtuple to a dict, using the _asdict() method embedded in PsUtil stats."""
461
    return {k: (v._asdict() if hasattr(v, '_asdict') else v) for k, v in data.items()}
462
463
464
def list_of_namedtuple_to_list_of_dict(data):
465
    """Convert a list of namedtuples to a dict, using the _asdict() method embedded in PsUtil stats."""
466
    return [namedtuple_to_dict(d) for d in data]
467
468
469
def replace_special_chars(input_string, by=' '):
470
    """Replace some special char by another in the input_string
471
    Return: the string with the chars replaced"""
472
    return input_string.replace('\r\n', by).replace('\n', by).replace('\t', by)
473