Test Failed
Pull Request — develop (#2977)
by
unknown
02:58
created

glances.globals.get_conds_day_diff()   A

Complexity

Conditions 1

Size

Total Lines 10
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 10
nop 1
dl 0
loc 10
rs 9.9
c 0
b 0
f 0
1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import os
21
import platform
22
import queue
23
import re
24
import subprocess
25
import sys
26
import weakref
27
from configparser import ConfigParser, NoOptionError, NoSectionError
28
from datetime import datetime
29
from operator import itemgetter, methodcaller
30
from statistics import mean
31
from typing import Any, Dict, List, Union
32
from urllib.error import HTTPError, URLError
33
from urllib.parse import urlparse
34
from urllib.request import Request, urlopen
35
from xmlrpc.client import Fault, ProtocolError, Server, ServerProxy, Transport
36
from xmlrpc.server import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
37
38
from defusedxml.xmlrpc import monkey_patch
39
40
# Correct issue #1025 by monkey patching the xmlrpc lib
41
monkey_patch()
42
43
# Prefer faster libs for JSON (de)serialization
44
# Preference Order: orjson > ujson > json (builtin)
45
try:
46
    import orjson as json
47
48
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
49
except ImportError:
50
    # Need to log info but importing logger will cause cyclic imports
51
    pass
52
53
if 'json' not in globals():
54
    try:
55
        # Note: ujson is not officially supported
56
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
57
        import ujson as json
58
    except ImportError:
59
        import json
60
61
    # To allow ujson & json dumps to serialize datetime
62
    def _json_default(v: Any) -> Any:
63
        if isinstance(v, datetime):
64
            return v.isoformat()
65
        return v
66
67
    json.dumps = functools.partial(json.dumps, default=_json_default)
68
69
##############
70
# GLOBALS VARS
71
##############
72
73
# OS constants (some libraries/features are OS-dependent)
BSD = 'bsd' in sys.platform
LINUX = sys.platform.startswith('linux')
MACOS = sys.platform.startswith('darwin')
SUNOS = sys.platform.startswith('sunos')
WINDOWS = sys.platform.startswith('win')
# WSL: a Linux system whose uname "version" field (index 3) mentions Microsoft
WSL = "linux" in platform.system().lower() and "microsoft" in platform.uname()[3].lower()

# Set the AMPs, plugins and export modules path
work_path = os.path.realpath(os.path.dirname(__file__))
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))
# Keep a copy of the untouched search path before adding the Glances folders
sys_path = sys.path[:]
# Each insert goes to index 1, so the final order is: amps, plugins, exports
sys.path.insert(1, exports_path)
sys.path.insert(1, plugins_path)
sys.path.insert(1, amps_path)

# Types
text_type = str
binary_type = bytes
bool_type = bool
long = int

# Alias errors
# NOTE: this shadows the builtin PermissionError with its OSError base class in this module
PermissionError = OSError

# Alias methods
viewkeys = methodcaller('keys')
viewvalues = methodcaller('values')
viewitems = methodcaller('items')
104
105
106
###################
107
# GLOBALS FUNCTIONS
108
###################
109
110
111
def printandflush(string):
    """Print the string and flush the output stream immediately (used by stdout* outputs modules)."""
    print(string, flush=True)
114
115
116
def to_ascii(s):
    """Convert s to a str, dropping non-ASCII characters from text input.

    - bytes input is decoded with the default codec and returned as-is
    - str input has every character that cannot be encoded in ASCII removed
      (accented letters are dropped, not replaced by their base letter)
    """
    if isinstance(s, bytes):
        return s.decode()
    return s.encode('ascii', 'ignore').decode()
122
123
124
def listitems(d):
    """Return the (key, value) pairs of the mapping d as a list."""
    return list(d.items())
126
127
128
def listkeys(d):
    """Return the keys of the mapping d as a list."""
    return list(d.keys())
130
131
132
def listvalues(d):
    """Return the values of the mapping d as a list."""
    return list(d.values())
134
135
136
def iteritems(d):
    """Return an iterator over the (key, value) pairs of the mapping d."""
    return iter(d.items())
138
139
140
def iterkeys(d):
    """Return an iterator over the keys of the mapping d."""
    return iter(d.keys())
142
143
144
def itervalues(d):
    """Return an iterator over the values of the mapping d."""
    return iter(d.values())
146
147
148
def u(s, errors='replace'):
    """Return s as a text string.

    A str is returned unchanged; anything else is decoded as UTF-8 using
    the given error handler (default: 'replace').
    """
    if isinstance(s, str):
        return s
    return s.decode('utf-8', errors=errors)
152
153
154
def b(s, errors='replace'):
    """Return s as a bytes string.

    Bytes are returned unchanged; anything else is encoded as UTF-8 using
    the given error handler (default: 'replace').
    """
    if isinstance(s, bytes):
        return s
    return s.encode('utf-8', errors=errors)
158
159
160
def nativestr(s, errors='replace'):
    """Return s as a native str.

    - str: returned unchanged
    - int or float: converted with str()
    - anything else: decoded as UTF-8 using the given error handler
    """
    if isinstance(s, str):
        return s
    if isinstance(s, (int, float)):
        return str(s)
    return s.decode('utf-8', errors=errors)
166
167
168
def system_exec(command):
    """Execute a system command and return the result as a str.

    The command is split on single spaces (no shell, no quoting support).
    If anything goes wrong, the returned string is 'ERROR: <exception>'.
    Trailing whitespace is stripped from the result.
    """
    try:
        completed = subprocess.run(command.split(' '), stdout=subprocess.PIPE)
        res = completed.stdout.decode('utf-8')
    except Exception as e:
        # Best effort: report the failure in the returned string instead of raising
        res = f'ERROR: {e}'
    return res.rstrip()
175
176
177
def subsample(data, sampling):
178
    """Compute a simple mean subsampling.
179
180
    Data should be a list of numerical itervalues
181
182
    Return a subsampled list of sampling length
183
    """
184
    if len(data) <= sampling:
185
        return data
186
    sampling_length = int(round(len(data) / float(sampling)))
187
    return [mean(data[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
188
189
190
def time_series_subsample(data, sampling):
    """Compute a simple mean subsampling.

    Data should be a list of set (time, value)

    Return a subsampled list of sampling length: each item is a tuple made
    of the first time of the bucket and the mean of the bucket values.
    If data has no more than sampling items, it is returned unchanged.
    """
    size = len(data)
    if size <= sampling:
        return data
    subsampled = []
    for s in range(sampling):
        # Proportional boundaries: every point is used and no bucket is empty
        # (a rounded fixed chunk length could give empty slices)
        bucket = data[s * size // sampling : (s + 1) * size // sampling]
        subsampled.append((bucket[0][0], mean([point[1] for point in bucket])))
    return subsampled
205
206
207
def to_fahrenheit(celsius):
    """Convert Celsius to Fahrenheit (celsius * 1.8 + 32)."""
    return celsius * 1.8 + 32
210
211
212
def is_admin():
    """
    https://stackoverflow.com/a/19719292
    @return: True if the current user is an 'Admin' whatever that
    means (root on Unix), otherwise False.
    Warning: The Windows check fails unless you have Windows XP SP2 or
    higher. The failure causes a traceback to be printed and this
    function to return False.
    """
    if os.name == 'nt':
        import ctypes
        import traceback

        # WARNING: requires Windows XP SP2 or higher!
        try:
            # Coerce to bool so both platforms return the same type
            return bool(ctypes.windll.shell32.IsUserAnAdmin())
        except Exception as e:
            print(f"Admin check failed with error: {e}")
            traceback.print_exc()
            return False

    # Check for root on Posix
    return os.getuid() == 0
236
237
238
def key_exist_value_not_none(k, d):
    """Return True if the key k exists in d and d[k] is not None."""
    return k in d and d[k] is not None
243
244
245
def key_exist_value_not_none_not_v(k, d, value='', length=None):
    """Return True if all the following conditions are met:

    - key k exists in d
    - d[k] is not None
    - d[k] != value
    - length is None, or len(d[k]) >= length
    """
    return k in d and d[k] is not None and d[k] != value and (length is None or len(d[k]) >= length)
252
253
254
def disable(class_name, var):
    """Set enable_<var> to False and disable_<var> to True in the class class_name."""
    setattr(class_name, 'enable_' + var, False)
    setattr(class_name, 'disable_' + var, True)
258
259
260
def enable(class_name, var):
    """Set enable_<var> to True and disable_<var> to False in the class class_name."""
    setattr(class_name, 'enable_' + var, True)
    setattr(class_name, 'disable_' + var, False)
264
265
266
def safe_makedirs(path):
    """A safe function for creating a directory tree.

    Nothing happens if path already exists as a directory. An OSError is
    raised if path exists but is not a directory, or if the creation fails
    for any other reason.
    """
    # exist_ok=True only tolerates an already existing *directory*
    os.makedirs(path, exist_ok=True)
276
277
278
def get_time_diffs(ref, now):
    """Return the time elapsed between ref and now as a timedelta.

    ref: a datetime object, an Epoch timestamp (int or float) or a falsy
         value (None, False...) meaning "no reference"
    now: a datetime object

    A falsy ref gives a zero timedelta.
    Raise a TypeError if ref is of any other type.
    """
    if isinstance(ref, datetime):
        return now - ref
    # bool is a subclass of int: keep False/True out of the timestamp branch,
    # otherwise False would be read as the Epoch (timestamp 0)
    if isinstance(ref, (int, float)) and not isinstance(ref, bool):
        return now - datetime.fromtimestamp(ref)
    if not ref:
        # Zero timedelta, so callers can always read .days and .seconds
        return now - now
    raise TypeError(f'Unsupported time reference type: {type(ref).__name__}')
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
287
288
289
def get_first_true_val(conds):
    """Return the value of the first (condition, value) pair whose condition is true.

    conds: an iterable of (condition, value) pairs
    Raise StopIteration if no condition is true.
    """
    return next(val for key, val in conds if key)
291
292
293
def maybe_add_plural(count):
    """Return the plural suffix "s" if count is greater than 1, else an empty string."""
    return "s" if count > 1 else ""
295
296
297
def build_str_when_more_than_seven_days(day_diff, unit):
    """Return day_diff expressed as a whole number of units, e.g. '2 weeks'.

    day_diff: number of days
    unit: 'week' (7 days), 'month' (30 days) or 'year' (365 days);
          any other value raises a KeyError
    """
    days_per_unit = {'week': 7, 'month': 30, 'year': 365}[unit]

    # Integer division: partial units are dropped
    count = day_diff // days_per_unit

    return str(count) + " " + unit + maybe_add_plural(count)
303
304
305
def get_conds_day_diff(diff):
    """Return the ordered list of (condition, label) pairs for the days of diff.

    diff: a timedelta
    The label of the first true condition is the one to display (see
    get_first_true_val). All labels are built eagerly, whatever the conditions.
    """
    day_diff = diff.days
    return [
        # Reference is in the future: nothing to display
        (day_diff < 0, ''),
        # Less than one day: use the seconds based label
        (day_diff == 0, get_first_true_val(get_conds_sec_diff(diff))),
        (day_diff == 1, "yesterday"),
        (day_diff < 7, str(day_diff) + " days"),
        (day_diff < 31, build_str_when_more_than_seven_days(day_diff, "week")),
        (day_diff < 365, build_str_when_more_than_seven_days(day_diff, "month")),
        (day_diff >= 365, build_str_when_more_than_seven_days(day_diff, "year")),
    ]
316
317
318
def get_conds_sec_diff(diff):
    """Return the ordered list of (condition, label) pairs for the seconds of diff.

    diff: a timedelta (only its seconds attribute is read)
    The label of the first true condition is the one to display.
    """
    second_diff = diff.seconds
    return [
        (second_diff < 10, "just now"),
        (second_diff < 60, str(second_diff) + " secs"),
        (second_diff < 120, "a min"),
        (second_diff < 3600, str(second_diff // 60) + " mins"),
        (second_diff < 7200, "an hour"),
        (second_diff < 86400, str(second_diff // 3600) + " hours"),
    ]
328
329
330
def pretty_date(ref=False, now=None):
    """
    Get a datetime object or a int() Epoch timestamp and return a
    pretty string like 'an hour', 'yesterday', '3 months',
    'just now', etc
    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python

    ref: the date to describe
    now: the datetime to compare ref with (default: the current time)
    """
    if now is None:
        # Read the clock at call time: a datetime.now() default argument would
        # be evaluated only once, when the function is defined
        now = datetime.now()

    diff = get_time_diffs(ref, now)

    return get_first_true_val(get_conds_day_diff(diff))
340
341
342
def urlopen_auth(url, username, password):
    """Open a url with basic auth.

    The 'username:password' pair is Base64 encoded and sent in the
    Authorization header. Return the object returned by urlopen.
    """
    credentials = base64.b64encode(f'{username}:{password}'.encode()).decode()
    request = Request(url, headers={'Authorization': 'Basic ' + credentials})
    return urlopen(request)
350
351
352
def json_dumps(data) -> bytes:
    """Return the object data in a JSON format (as bytes).

    Manage the issue #815 for Windows OS with UnicodeDecodeError catching.
    """
    try:
        res = json.dumps(data)
    except UnicodeDecodeError:
        # Second attempt without ASCII escaping
        res = json.dumps(data, ensure_ascii=False)
    # ujson & json libs return strings, but our contract expects bytes
    return b(res)
363
364
365
def json_loads(data: Union[str, bytes, bytearray]) -> Union[Dict, List]:
    """Load a JSON buffer into memory as a Python object."""
    return json.loads(data)
368
369
370
def dictlist(data, item):
    """Return a dict with the single key item, extracted from data.

    - data is a dict: return {item: data[item]}
    - data is a list (of dicts): return {item: [d[item] for each d in data]}
    - any other type, or item can not be read: return None
    """
    try:
        if isinstance(data, dict):
            return {item: data[item]}
        if isinstance(data, list):
            # Source:
            # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
            # But https://github.com/nicolargo/glances/issues/1401
            return {item: list(map(itemgetter(item), data))}
    except (TypeError, IndexError, KeyError):
        return None
    return None
386
387
388
def json_dumps_dictlist(data, item):
    """Return the JSON version of dictlist(data, item), or None if dictlist returns None."""
    dl = dictlist(data, item)
    if dl is None:
        return None
    return json_dumps(dl)
393
394
395
def string_value_to_float(s):
    """Convert a string with a value and an unit to a float.
    Example:
    '12.5 MB' -> 12500000.0
    '32.5 GB' -> 32500000000.0
    Args:
        s (string): Input string with value and unit
    Output:
        float: The value in float, or None if the string can not be
        converted (value is not a number, unknown unit, unexpected format)
    """
    convert_dict = {
        None: 1,
        'B': 1,
        'KB': 1000,
        'MB': 1000000,
        'GB': 1000000000,
        'TB': 1000000000000,
        'PB': 1000000000000000,
    }
    # Split the string (spaces removed) into numeric and non numeric tokens;
    # non numeric tokens are upper-cased to match the convert_dict keys
    tokens = [
        number if text == '' else text.upper()
        for number, text in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
    ]
    if len(tokens) == 2:
        raw_value, unit = tokens
    elif len(tokens) == 1:
        # No unit: the value is used as-is
        raw_value, unit = tokens[0], None
    else:
        return None
    # An unknown unit can not be converted (no KeyError for the caller)
    multiplier = convert_dict.get(unit)
    if multiplier is None:
        return None
    try:
        value = float(raw_value)
    except ValueError:
        return None
    return value * multiplier
429
430
431
def file_exists(filename):
    """Return True if the file exists (as a regular file) and is readable."""
    return os.path.isfile(filename) and os.access(filename, os.R_OK)
434
435
436
def folder_size(path, errno=0):
    """Return a tuple with the size of the directory given by path and the errno.
    If an error occurs (for example one file or subfolder is not accessible),
    errno is set to the error number.

    path: <string>
    errno: <int> Should always be 0 when calling the function"""
    ret_size = 0
    ret_err = errno
    try:
        # The context manager closes the scandir iterator, even on error
        with os.scandir(path) as entries:
            for f in entries:
                # No need to filter '.' and '..': scandir never yields them
                if f.is_dir(follow_symlinks=False):
                    # Recurse into real subfolders, carrying the last error seen
                    sub_size, ret_err = folder_size(os.path.join(path, f.name), ret_err)
                    ret_size += sub_size
                else:
                    try:
                        ret_size += f.stat().st_size
                    except OSError as e:
                        # Keep going: remember the error, skip the entry
                        ret_err = e.errno
    except OSError as e:
        # The folder itself can not be read: its size is reported as 0
        return 0, e.errno
    return ret_size, ret_err
460
461
462
def weak_lru_cache(maxsize=128, typed=False):
    """LRU Cache decorator that keeps a weak reference to self
    Source: https://stackoverflow.com/a/55990799

    To be used on methods: the cache key is built with a weak reference to
    the instance instead of the instance itself.

    maxsize, typed: forwarded to functools.lru_cache"""

    def wrapper(func):
        @functools.lru_cache(maxsize, typed)
        def _func(_self, *args, **kwargs):
            # _self is a weak reference: call it to get the instance back
            return func(_self(), *args, **kwargs)

        @functools.wraps(func)
        def inner(self, *args, **kwargs):
            # Give lru_cache a weak reference in place of the instance
            return _func(weakref.ref(self), *args, **kwargs)

        return inner

    return wrapper
478
479
480
def namedtuple_to_dict(data):
    """Convert the namedtuple values of the dict data to dicts.

    Values with an _asdict() method (like the one embedded in PsUtil stats)
    are replaced by the result of this method, other values are kept as they are.
    """
    return {k: (v._asdict() if hasattr(v, '_asdict') else v) for k, v in data.items()}
483
484
485
def list_of_namedtuple_to_list_of_dict(data):
    """Apply namedtuple_to_dict to each item of the list data and return the resulting list."""
    return [namedtuple_to_dict(d) for d in data]
488
489
490
def replace_special_chars(input_string, by=' '):
    """Replace line breaks (\\r\\n and \\n) and tabs (\\t) in the input_string by another string
    Return: the string with the chars replaced"""
    # \r\n is replaced first, so a Windows line break gives a single replacement
    return input_string.replace('\r\n', by).replace('\n', by).replace('\t', by)
494