Test Failed
Pull Request — develop (#2961)
by
unknown
02:12
created

glances.globals.get_conds_for_day_diff_msg()   A

Complexity

Conditions 1

Size

Total Lines 13
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 12
nop 1
dl 0
loc 13
rs 9.8
c 0
b 0
f 0
1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import os
21
import platform
22
import queue
23
import re
24
import subprocess
25
import sys
26
import weakref
27
from collections import OrderedDict
28
from configparser import ConfigParser, NoOptionError, NoSectionError
29
from datetime import datetime
30
from operator import itemgetter, methodcaller
31
from statistics import mean
32
from typing import Any, Dict, List, Union
33
from urllib.error import HTTPError, URLError
34
from urllib.parse import urlparse
35
from urllib.request import Request, urlopen
36
from xmlrpc.client import Fault, ProtocolError, Server, ServerProxy, Transport
37
from xmlrpc.server import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
38
39
from defusedxml.xmlrpc import monkey_patch
40
41
# Correct issue #1025 by monkey path the xmlrpc lib
42
monkey_patch()
43
44
# Prefer faster libs for JSON (de)serialization
45
# Preference Order: orjson > ujson > json (builtin)
46
try:
47
    import orjson as json
48
49
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
50
except ImportError:
51
    # Need to log info but importing logger will cause cyclic imports
52
    pass
53
54
if 'json' not in globals():
55
    try:
56
        # Note: ujson is not officially supported
57
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
58
        import ujson as json
59
    except ImportError:
60
        import json
61
62
    # To allow ujson & json dumps to serialize datetime
63
    def _json_default(v: Any) -> Any:
64
        if isinstance(v, datetime):
65
            return v.isoformat()
66
        return v
67
68
    json.dumps = functools.partial(json.dumps, default=_json_default)
69
70
##############
71
# GLOBALS VARS
72
##############
73
74
# OS constants (some libraries/features are OS-dependent)
75
BSD = sys.platform.find('bsd') != -1
76
LINUX = sys.platform.startswith('linux')
77
MACOS = sys.platform.startswith('darwin')
78
SUNOS = sys.platform.startswith('sunos')
79
WINDOWS = sys.platform.startswith('win')
80
WSL = "linux" in platform.system().lower() and "microsoft" in platform.uname()[3].lower()
81
82
# Set the AMPs, plugins and export modules path
83
work_path = os.path.realpath(os.path.dirname(__file__))
84
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
85
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
86
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))
87
sys_path = sys.path[:]
88
sys.path.insert(1, exports_path)
89
sys.path.insert(1, plugins_path)
90
sys.path.insert(1, amps_path)
91
92
# Types
93
text_type = str
94
binary_type = bytes
95
bool_type = bool
96
long = int
97
98
# Alias errors
99
PermissionError = OSError
100
101
# Alias methods
102
viewkeys = methodcaller('keys')
103
viewvalues = methodcaller('values')
104
viewitems = methodcaller('items')
105
106
107
###################
108
# GLOBALS FUNCTIONS
109
###################
110
111
112
def printandflush(string):
113
    """Print and flush (used by stdout* outputs modules)"""
114
    print(string, flush=True)
115
116
117
def to_ascii(s):
118
    """Convert the bytes string to a ASCII string
119
    Useful to remove accent (diacritics)"""
120
    if isinstance(s, binary_type):
121
        return s.decode()
122
    return s.encode('ascii', 'ignore').decode()
123
124
125
def listitems(d):
126
    return list(d.items())
127
128
129
def listkeys(d):
130
    return list(d.keys())
131
132
133
def listvalues(d):
134
    return list(d.values())
135
136
137
def iteritems(d):
138
    return iter(d.items())
139
140
141
def iterkeys(d):
142
    return iter(d.keys())
143
144
145
def itervalues(d):
146
    return iter(d.values())
147
148
149
def u(s, errors='replace'):
150
    if isinstance(s, text_type):
151
        return s
152
    return s.decode('utf-8', errors=errors)
153
154
155
def b(s, errors='replace'):
156
    if isinstance(s, binary_type):
157
        return s
158
    return s.encode('utf-8', errors=errors)
159
160
161
def nativestr(s, errors='replace'):
162
    if isinstance(s, text_type):
163
        return s
164
    if isinstance(s, (int, float)):
165
        return s.__str__()
166
    return s.decode('utf-8', errors=errors)
167
168
169
def system_exec(command):
170
    """Execute a system command and return the result as a str"""
171
    try:
172
        res = subprocess.run(command.split(' '), stdout=subprocess.PIPE).stdout.decode('utf-8')
173
    except Exception as e:
174
        res = f'ERROR: {e}'
175
    return res.rstrip()
176
177
178
def subsample(data, sampling):
179
    """Compute a simple mean subsampling.
180
181
    Data should be a list of numerical itervalues
182
183
    Return a subsampled list of sampling length
184
    """
185
    if len(data) <= sampling:
186
        return data
187
    sampling_length = int(round(len(data) / float(sampling)))
188
    return [mean(data[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
189
190
191
def time_series_subsample(data, sampling):
192
    """Compute a simple mean subsampling.
193
194
    Data should be a list of set (time, value)
195
196
    Return a subsampled list of sampling length
197
    """
198
    if len(data) <= sampling:
199
        return data
200
    t = [t[0] for t in data]
201
    v = [t[1] for t in data]
202
    sampling_length = int(round(len(data) / float(sampling)))
203
    t_subsampled = [t[s * sampling_length : (s + 1) * sampling_length][0] for s in range(0, sampling)]
204
    v_subsampled = [mean(v[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
205
    return list(zip(t_subsampled, v_subsampled))
206
207
208
def to_fahrenheit(celsius):
209
    """Convert Celsius to Fahrenheit."""
210
    return celsius * 1.8 + 32
211
212
213
def is_admin():
214
    """
215
    https://stackoverflow.com/a/19719292
216
    @return: True if the current user is an 'Admin' whatever that
217
    means (root on Unix), otherwise False.
218
    Warning: The inner function fails unless you have Windows XP SP2 or
219
    higher. The failure causes a traceback to be printed and this
220
    function to return False.
221
    """
222
223
    if os.name == 'nt':
224
        import ctypes
225
        import traceback
226
227
        # WARNING: requires Windows XP SP2 or higher!
228
        try:
229
            return ctypes.windll.shell32.IsUserAnAdmin()
230
        except Exception as e:
231
            print(f"Admin check failed with error: {e}")
232
            traceback.print_exc()
233
            return False
234
    else:
235
        # Check for root on Posix
236
        return os.getuid() == 0
237
238
239
def key_exist_value_not_none(k, d):
240
    # Return True if:
241
    # - key k exists
242
    # - d[k] is not None
243
    return k in d and d[k] is not None
244
245
246
def key_exist_value_not_none_not_v(k, d, value='', length=None):
247
    # Return True if:
248
    # - key k exists
249
    # - d[k] is not None
250
    # - d[k] != value
251
    # - if length is not None and len(d[k]) >= length
252
    return k in d and d[k] is not None and d[k] != value and (length is None or len(d[k]) >= length)
253
254
255
def disable(class_name, var):
256
    """Set disable_<var> to True in the class class_name."""
257
    setattr(class_name, 'enable_' + var, False)
258
    setattr(class_name, 'disable_' + var, True)
259
260
261
def enable(class_name, var):
262
    """Set disable_<var> to False in the class class_name."""
263
    setattr(class_name, 'enable_' + var, True)
264
    setattr(class_name, 'disable_' + var, False)
265
266
267
def safe_makedirs(path):
268
    """A safe function for creating a directory tree."""
269
    try:
270
        os.makedirs(path)
271
    except OSError as err:
272
        if err.errno == errno.EEXIST:
273
            if not os.path.isdir(path):
274
                raise
275
        else:
276
            raise
277
278
279
def get_diff_time(time):
280
    now = datetime.now()
281
    cond = {
282
        isinstance(time, int): now - datetime.fromtimestamp(time),
283
        isinstance(time, datetime): now - time,
284
        not time: 0,
285
    }
286
287
    return cond.get(True)
288
289
290
def get_msg_for_true_cond(conds):
291
    msg, *other = (val for key, val in conds if key)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable val does not seem to be defined.
Loading history...
292
293
    return msg
294
295
296
def get_conds_for_sec_diff_msg(diff):
297
    second_diff = diff.seconds
298
299
    return OrderedDict(
300
        {
301
            second_diff < 10: "just now",
302
            second_diff < 60: str(second_diff) + " secs",
303
            second_diff < 120: "a min",
304
            second_diff < 3600: str(second_diff // 60) + " mins",
305
            second_diff < 7200: "an hour",
306
            second_diff < 86400: str(second_diff // 3600) + " hours",
307
        }
308
    )
309
310
311
def get_conds_for_day_diff_msg(diff):
312
    day_diff = diff.days
313
    second_diff_msgs = get_conds_for_sec_diff_msg(diff)
314
315
    return OrderedDict(
316
        {
317
            day_diff < 0: '',
318
            day_diff == 0: get_msg_for_true_cond(second_diff_msgs),
319
            day_diff == 1: "yesterday",
320
            day_diff < 7: str(day_diff) + " days",
321
            day_diff < 31: str(day_diff // 7) + " weeks",
322
            day_diff < 365: str(day_diff // 30) + " months",
323
            day_diff >= 365: str(day_diff // 365) + " years",
324
        }
325
    )
326
327
328
def pretty_date(time=False):
329
    """
330
    Get a datetime object or a int() Epoch timestamp and return a
331
    pretty string like 'an hour ago', 'Yesterday', '3 months ago',
332
    'just now', etc
333
    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
334
    """
335
    diff = get_diff_time(time)
336
    day_diff_msgs = get_conds_for_day_diff_msg(diff)
337
338
    return get_msg_for_true_cond(day_diff_msgs)
339
340
341
def urlopen_auth(url, username, password):
342
    """Open a url with basic auth"""
343
    return urlopen(
344
        Request(
345
            url,
346
            headers={'Authorization': 'Basic ' + base64.b64encode(f'{username}:{password}'.encode()).decode()},
347
        )
348
    )
349
350
351
def json_dumps(data) -> bytes:
352
    """Return the object data in a JSON format.
353
354
    Manage the issue #815 for Windows OS with UnicodeDecodeError catching.
355
    """
356
    try:
357
        res = json.dumps(data)
358
    except UnicodeDecodeError:
359
        res = json.dumps(data, ensure_ascii=False)
360
    # ujson & json libs return strings, but our contract expects bytes
361
    return b(res)
362
363
364
def json_loads(data: Union[str, bytes, bytearray]) -> Union[Dict, List]:
365
    """Load a JSON buffer into memory as a Python object"""
366
    return json.loads(data)
367
368
369
def dictlist(data, item):
370
    if isinstance(data, dict):
371
        try:
372
            return {item: data[item]}
373
        except (TypeError, IndexError, KeyError):
374
            return None
375
    elif isinstance(data, list):
376
        try:
377
            # Source:
378
            # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
379
            # But https://github.com/nicolargo/glances/issues/1401
380
            return {item: list(map(itemgetter(item), data))}
381
        except (TypeError, IndexError, KeyError):
382
            return None
383
    else:
384
        return None
385
386
387
def json_dumps_dictlist(data, item):
388
    dl = dictlist(data, item)
389
    if dl is None:
390
        return None
391
    return json_dumps(dl)
392
393
394
def string_value_to_float(s):
395
    """Convert a string with a value and an unit to a float.
396
    Example:
397
    '12.5 MB' -> 12500000.0
398
    '32.5 GB' -> 32500000000.0
399
    Args:
400
        s (string): Input string with value and unit
401
    Output:
402
        float: The value in float
403
    """
404
    convert_dict = {
405
        None: 1,
406
        'B': 1,
407
        'KB': 1000,
408
        'MB': 1000000,
409
        'GB': 1000000000,
410
        'TB': 1000000000000,
411
        'PB': 1000000000000000,
412
    }
413
    unpack_string = [
414
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
415
    ]
416
    if len(unpack_string) == 2:
417
        value, unit = unpack_string
418
    elif len(unpack_string) == 1:
419
        value = unpack_string[0]
420
        unit = None
421
    else:
422
        return None
423
    try:
424
        value = float(unpack_string[0])
425
    except ValueError:
426
        return None
427
    return value * convert_dict[unit]
428
429
430
def file_exists(filename):
431
    """Return True if the file exists and is readable."""
432
    return os.path.isfile(filename) and os.access(filename, os.R_OK)
433
434
435
def folder_size(path, errno=0):
436
    """Return a tuple with the size of the directory given by path and the errno.
437
    If an error occurs (for example one file or subfolder is not accessible),
438
    errno is set to the error number.
439
440
    path: <string>
441
    errno: <int> Should always be 0 when calling the function"""
442
    ret_size = 0
443
    ret_err = errno
444
    try:
445
        for f in os.scandir(path):
446
            if f.is_dir(follow_symlinks=False) and (f.name != '.' or f.name != '..'):
447
                ret = folder_size(os.path.join(path, f.name), ret_err)
448
                ret_size += ret[0]
449
                ret_err = ret[1]
450
            else:
451
                try:
452
                    ret_size += f.stat().st_size
453
                except OSError as e:
454
                    ret_err = e.errno
455
    except (OSError, PermissionError) as e:
456
        return 0, e.errno
457
    else:
458
        return ret_size, ret_err
459
460
461
def weak_lru_cache(maxsize=128, typed=False):
462
    """LRU Cache decorator that keeps a weak reference to self
463
    Source: https://stackoverflow.com/a/55990799"""
464
465
    def wrapper(func):
466
        @functools.lru_cache(maxsize, typed)
467
        def _func(_self, *args, **kwargs):
468
            return func(_self(), *args, **kwargs)
469
470
        @functools.wraps(func)
471
        def inner(self, *args, **kwargs):
472
            return _func(weakref.ref(self), *args, **kwargs)
473
474
        return inner
475
476
    return wrapper
477
478
479
def namedtuple_to_dict(data):
480
    """Convert a namedtuple to a dict, using the _asdict() method embedded in PsUtil stats."""
481
    return {k: (v._asdict() if hasattr(v, '_asdict') else v) for k, v in data.items()}
482
483
484
def list_of_namedtuple_to_list_of_dict(data):
485
    """Convert a list of namedtuples to a dict, using the _asdict() method embedded in PsUtil stats."""
486
    return [namedtuple_to_dict(d) for d in data]
487
488
489
def replace_special_chars(input_string, by=' '):
490
    """Replace some special char by another in the input_string
491
    Return: the string with the chars replaced"""
492
    return input_string.replace('\r\n', by).replace('\n', by).replace('\t', by)
493