Test Failed
Push — master ( a8608f...5a3ce6 )
by Nicolas
02:28
created

glances.globals.get_first_true_val()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import os
21
import platform
22
import queue
23
import re
24
import subprocess
25
import sys
26
import weakref
27
from collections import OrderedDict
28
from configparser import ConfigParser, NoOptionError, NoSectionError
29
from datetime import datetime
30
from operator import itemgetter, methodcaller
31
from statistics import mean
32
from typing import Any, Dict, List, Union
33
from urllib.error import HTTPError, URLError
34
from urllib.parse import urlparse
35
from urllib.request import Request, urlopen
36
37
# Prefer faster libs for JSON (de)serialization
38
# Preference Order: orjson > ujson > json (builtin)
39
try:
40
    import orjson as json
41
42
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
43
except ImportError:
44
    # Need to log info but importing logger will cause cyclic imports
45
    pass
46
47
if 'json' not in globals():
48
    try:
49
        # Note: ujson is not officially supported
50
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
51
        import ujson as json
52
    except ImportError:
53
        import json
54
55
    # To allow ujson & json dumps to serialize datetime
56
    def _json_default(v: Any) -> Any:
57
        if isinstance(v, datetime):
58
            return v.isoformat()
59
        return v
60
61
    json.dumps = functools.partial(json.dumps, default=_json_default)
62
63
##############
64
# GLOBALS VARS
65
##############
66
67
# OS constants (some libraries/features are OS-dependent)
68
BSD = sys.platform.find('bsd') != -1
69
LINUX = sys.platform.startswith('linux')
70
MACOS = sys.platform.startswith('darwin')
71
SUNOS = sys.platform.startswith('sunos')
72
WINDOWS = sys.platform.startswith('win')
73
WSL = "linux" in platform.system().lower() and "microsoft" in platform.uname()[3].lower()
74
75
# Set the AMPs, plugins and export modules path
76
work_path = os.path.realpath(os.path.dirname(__file__))
77
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
78
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
79
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))
80
sys_path = sys.path[:]
81
sys.path.insert(1, exports_path)
82
sys.path.insert(1, plugins_path)
83
sys.path.insert(1, amps_path)
84
85
# Types
86
text_type = str
87
binary_type = bytes
88
bool_type = bool
89
long = int
90
91
# Alias errors
92
PermissionError = OSError
93
94
# Alias methods
95
viewkeys = methodcaller('keys')
96
viewvalues = methodcaller('values')
97
viewitems = methodcaller('items')
98
99
100
###################
101
# GLOBALS FUNCTIONS
102
###################
103
104
105
def printandflush(string):
106
    """Print and flush (used by stdout* outputs modules)"""
107
    print(string, flush=True)
108
109
110
def to_ascii(s):
111
    """Convert the bytes string to a ASCII string
112
    Useful to remove accent (diacritics)"""
113
    if isinstance(s, binary_type):
114
        return s.decode()
115
    return s.encode('ascii', 'ignore').decode()
116
117
118
def listitems(d):
119
    return list(d.items())
120
121
122
def listkeys(d):
123
    return list(d.keys())
124
125
126
def listvalues(d):
127
    return list(d.values())
128
129
130
def iteritems(d):
131
    return iter(d.items())
132
133
134
def iterkeys(d):
135
    return iter(d.keys())
136
137
138
def itervalues(d):
139
    return iter(d.values())
140
141
142
def u(s, errors='replace'):
143
    if isinstance(s, text_type):
144
        return s
145
    return s.decode('utf-8', errors=errors)
146
147
148
def b(s, errors='replace'):
149
    if isinstance(s, binary_type):
150
        return s
151
    return s.encode('utf-8', errors=errors)
152
153
154
def nativestr(s, errors='replace'):
155
    if isinstance(s, text_type):
156
        return s
157
    if isinstance(s, (int, float)):
158
        return s.__str__()
159
    return s.decode('utf-8', errors=errors)
160
161
162
def system_exec(command):
163
    """Execute a system command and return the result as a str"""
164
    try:
165
        res = subprocess.run(command.split(' '), stdout=subprocess.PIPE).stdout.decode('utf-8')
166
    except Exception as e:
167
        res = f'ERROR: {e}'
168
    return res.rstrip()
169
170
171
def subsample(data, sampling):
172
    """Compute a simple mean subsampling.
173
174
    Data should be a list of numerical itervalues
175
176
    Return a subsampled list of sampling length
177
    """
178
    if len(data) <= sampling:
179
        return data
180
    sampling_length = int(round(len(data) / float(sampling)))
181
    return [mean(data[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
182
183
184
def time_series_subsample(data, sampling):
185
    """Compute a simple mean subsampling.
186
187
    Data should be a list of set (time, value)
188
189
    Return a subsampled list of sampling length
190
    """
191
    if len(data) <= sampling:
192
        return data
193
    t = [t[0] for t in data]
194
    v = [t[1] for t in data]
195
    sampling_length = int(round(len(data) / float(sampling)))
196
    t_subsampled = [t[s * sampling_length : (s + 1) * sampling_length][0] for s in range(0, sampling)]
197
    v_subsampled = [mean(v[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
198
    return list(zip(t_subsampled, v_subsampled))
199
200
201
def to_fahrenheit(celsius):
202
    """Convert Celsius to Fahrenheit."""
203
    return celsius * 1.8 + 32
204
205
206
def is_admin():
207
    """
208
    https://stackoverflow.com/a/19719292
209
    @return: True if the current user is an 'Admin' whatever that
210
    means (root on Unix), otherwise False.
211
    Warning: The inner function fails unless you have Windows XP SP2 or
212
    higher. The failure causes a traceback to be printed and this
213
    function to return False.
214
    """
215
216
    if os.name == 'nt':
217
        import ctypes
218
        import traceback
219
220
        # WARNING: requires Windows XP SP2 or higher!
221
        try:
222
            return ctypes.windll.shell32.IsUserAnAdmin()
223
        except Exception as e:
224
            print(f"Admin check failed with error: {e}")
225
            traceback.print_exc()
226
            return False
227
    else:
228
        # Check for root on Posix
229
        return os.getuid() == 0
230
231
232
def key_exist_value_not_none(k, d):
233
    # Return True if:
234
    # - key k exists
235
    # - d[k] is not None
236
    return k in d and d[k] is not None
237
238
239
def key_exist_value_not_none_not_v(k, d, value='', length=None):
240
    # Return True if:
241
    # - key k exists
242
    # - d[k] is not None
243
    # - d[k] != value
244
    # - if length is not None and len(d[k]) >= length
245
    return k in d and d[k] is not None and d[k] != value and (length is None or len(d[k]) >= length)
246
247
248
def disable(class_name, var):
249
    """Set disable_<var> to True in the class class_name."""
250
    setattr(class_name, 'enable_' + var, False)
251
    setattr(class_name, 'disable_' + var, True)
252
253
254
def enable(class_name, var):
255
    """Set disable_<var> to False in the class class_name."""
256
    setattr(class_name, 'enable_' + var, True)
257
    setattr(class_name, 'disable_' + var, False)
258
259
260
def safe_makedirs(path):
261
    """A safe function for creating a directory tree."""
262
    try:
263
        os.makedirs(path)
264
    except OSError as err:
265
        if err.errno == errno.EEXIST:
266
            if not os.path.isdir(path):
267
                raise
268
        else:
269
            raise
270
271
272
def get_time_diffs(ref, now):
273
    if isinstance(ref, int):
274
        diff = now - datetime.fromtimestamp(ref)
275
    elif isinstance(ref, datetime):
276
        diff = now - ref
277
    elif not ref:
278
        diff = 0
279
280
    return diff
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
281
282
283
def get_first_true_val(conds):
284
    return next(key for key, val in conds.items() if val)
285
286
287
def maybe_add_plural(count):
288
    return "s" if count > 1 else ""
289
290
291
def build_str_when_more_than_seven_days(day_diff, unit):
292
    scale = {'week': 7, 'month': 30, 'year': 365}[unit]
293
294
    count = day_diff // scale
295
296
    return str(count) + " " + unit + maybe_add_plural(count)
297
298
299
def get_conds_day_diff(diff):
300
    day_diff = diff.days
301
    return OrderedDict(
302
        {
303
            '': day_diff < 0,
304
            get_first_true_val(get_conds_sec_diff(diff)): day_diff == 0,
305
            "yesterday": day_diff == 1,
306
            str(day_diff) + " days": day_diff < 7,
307
            build_str_when_more_than_seven_days(day_diff, "week"): day_diff < 31,
308
            build_str_when_more_than_seven_days(day_diff, "month"): day_diff < 365,
309
            build_str_when_more_than_seven_days(day_diff, "year"): day_diff >= 365,
310
        }
311
    )
312
313
314
def get_conds_sec_diff(diff):
315
    second_diff = diff.seconds
316
    return OrderedDict(
317
        {
318
            "just now": second_diff < 10,
319
            str(second_diff) + " secs": second_diff < 60,
320
            "a min": second_diff < 120,
321
            str(second_diff // 60) + " mins": second_diff < 3600,
322
            "an hour": second_diff < 7200,
323
            str(second_diff // 3600) + " hours": second_diff < 86400,
324
        }
325
    )
326
327
328
def pretty_date(ref=False, now=datetime.now()):
329
    """
330
    Get a datetime object or a int() Epoch timestamp and return a
331
    pretty string like 'an hour ago', 'Yesterday', '3 months ago',
332
    'just now', etc
333
    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
334
    """
335
    diff = get_time_diffs(ref, now)
336
337
    return get_first_true_val(get_conds_day_diff(diff))
338
339
340
def urlopen_auth(url, username, password):
341
    """Open a url with basic auth"""
342
    return urlopen(
343
        Request(
344
            url,
345
            headers={'Authorization': 'Basic ' + base64.b64encode(f'{username}:{password}'.encode()).decode()},
346
        )
347
    )
348
349
350
def json_dumps(data) -> bytes:
351
    """Return the object data in a JSON format.
352
353
    Manage the issue #815 for Windows OS with UnicodeDecodeError catching.
354
    """
355
    try:
356
        res = json.dumps(data)
357
    except UnicodeDecodeError:
358
        res = json.dumps(data, ensure_ascii=False)
359
    # ujson & json libs return strings, but our contract expects bytes
360
    return b(res)
361
362
363
def json_loads(data: Union[str, bytes, bytearray]) -> Union[Dict, List]:
364
    """Load a JSON buffer into memory as a Python object"""
365
    return json.loads(data)
366
367
368
def dictlist(data, item):
369
    if isinstance(data, dict):
370
        try:
371
            return {item: data[item]}
372
        except (TypeError, IndexError, KeyError):
373
            return None
374
    elif isinstance(data, list):
375
        try:
376
            # Source:
377
            # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
378
            # But https://github.com/nicolargo/glances/issues/1401
379
            return {item: list(map(itemgetter(item), data))}
380
        except (TypeError, IndexError, KeyError):
381
            return None
382
    else:
383
        return None
384
385
386
def json_dumps_dictlist(data, item):
387
    dl = dictlist(data, item)
388
    if dl is None:
389
        return None
390
    return json_dumps(dl)
391
392
393
def string_value_to_float(s):
394
    """Convert a string with a value and an unit to a float.
395
    Example:
396
    '12.5 MB' -> 12500000.0
397
    '32.5 GB' -> 32500000000.0
398
    Args:
399
        s (string): Input string with value and unit
400
    Output:
401
        float: The value in float
402
    """
403
    convert_dict = {
404
        None: 1,
405
        'B': 1,
406
        'KB': 1000,
407
        'MB': 1000000,
408
        'GB': 1000000000,
409
        'TB': 1000000000000,
410
        'PB': 1000000000000000,
411
    }
412
    unpack_string = [
413
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
414
    ]
415
    if len(unpack_string) == 2:
416
        value, unit = unpack_string
417
    elif len(unpack_string) == 1:
418
        value = unpack_string[0]
419
        unit = None
420
    else:
421
        return None
422
    try:
423
        value = float(unpack_string[0])
424
    except ValueError:
425
        return None
426
    return value * convert_dict[unit]
427
428
429
def file_exists(filename):
430
    """Return True if the file exists and is readable."""
431
    return os.path.isfile(filename) and os.access(filename, os.R_OK)
432
433
434
def folder_size(path, errno=0):
435
    """Return a tuple with the size of the directory given by path and the errno.
436
    If an error occurs (for example one file or subfolder is not accessible),
437
    errno is set to the error number.
438
439
    path: <string>
440
    errno: <int> Should always be 0 when calling the function"""
441
    ret_size = 0
442
    ret_err = errno
443
    try:
444
        for f in os.scandir(path):
445
            if f.is_dir(follow_symlinks=False) and (f.name != '.' or f.name != '..'):
446
                ret = folder_size(os.path.join(path, f.name), ret_err)
447
                ret_size += ret[0]
448
                ret_err = ret[1]
449
            else:
450
                try:
451
                    ret_size += f.stat().st_size
452
                except OSError as e:
453
                    ret_err = e.errno
454
    except (OSError, PermissionError) as e:
455
        return 0, e.errno
456
    else:
457
        return ret_size, ret_err
458
459
460
def weak_lru_cache(maxsize=128, typed=False):
461
    """LRU Cache decorator that keeps a weak reference to self
462
    Source: https://stackoverflow.com/a/55990799"""
463
464
    def wrapper(func):
465
        @functools.lru_cache(maxsize, typed)
466
        def _func(_self, *args, **kwargs):
467
            return func(_self(), *args, **kwargs)
468
469
        @functools.wraps(func)
470
        def inner(self, *args, **kwargs):
471
            return _func(weakref.ref(self), *args, **kwargs)
472
473
        return inner
474
475
    return wrapper
476
477
478
def namedtuple_to_dict(data):
479
    """Convert a namedtuple to a dict, using the _asdict() method embedded in PsUtil stats."""
480
    return {k: (v._asdict() if hasattr(v, '_asdict') else v) for k, v in data.items()}
481
482
483
def list_of_namedtuple_to_list_of_dict(data):
484
    """Convert a list of namedtuples to a dict, using the _asdict() method embedded in PsUtil stats."""
485
    return [namedtuple_to_dict(d) for d in data]
486
487
488
def replace_special_chars(input_string, by=' '):
489
    """Replace some special char by another in the input_string
490
    Return: the string with the chars replaced"""
491
    return input_string.replace('\r\n', by).replace('\n', by).replace('\t', by)
492