Test Failed
Push — develop ( 734632...e1e4e8 )
by Nicolas
02:17
created

glances.globals.listkeys()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import os
21
import platform
22
import queue
23
import re
24
import subprocess
25
import sys
26
import weakref
27
from collections import OrderedDict
28
from configparser import ConfigParser, NoOptionError, NoSectionError
29
from datetime import datetime
30
from operator import itemgetter, methodcaller
31
from statistics import mean
32
from typing import Any, Union
33
from urllib.error import HTTPError, URLError
34
from urllib.parse import urlparse
35
from urllib.request import Request, urlopen
36
37
# Prefer faster libs for JSON (de)serialization
38
# Preference Order: orjson > ujson > json (builtin)
39
try:
40
    import orjson as json
41
42
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
43
except ImportError:
44
    # Need to log info but importing logger will cause cyclic imports
45
    pass
46
47
if 'json' not in globals():
48
    try:
49
        # Note: ujson is not officially supported
50
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
51
        import ujson as json
52
    except ImportError:
53
        import json
54
55
    # To allow ujson & json dumps to serialize datetime
56
    def _json_default(v: Any) -> Any:
57
        if isinstance(v, datetime):
58
            return v.isoformat()
59
        return v
60
61
    json.dumps = functools.partial(json.dumps, default=_json_default)
62
63
##############
64
# GLOBALS VARS
65
##############
66
67
# OS constants (some libraries/features are OS-dependent)
68
BSD = 'bsd' in sys.platform
LINUX = sys.platform.startswith('linux')
MACOS = sys.platform.startswith('darwin')
SUNOS = sys.platform.startswith('sunos')
WINDOWS = sys.platform.startswith('win')
# WSL: the system is reported as Linux and the uname version string mentions Microsoft
WSL = 'linux' in platform.system().lower() and 'microsoft' in platform.uname().version.lower()
74
75
# Set the AMPs, plugins and export modules path
76
work_path = os.path.realpath(os.path.dirname(__file__))
amps_path, plugins_path, exports_path = (
    os.path.realpath(os.path.join(work_path, folder)) for folder in ('amps', 'plugins', 'exports')
)
# Copy of sys.path taken before the Glances folders are added to it
sys_path = list(sys.path)
# Every insert is done at index 1: the resulting order is amps, plugins, exports
sys.path.insert(1, exports_path)
sys.path.insert(1, plugins_path)
sys.path.insert(1, amps_path)
84
85
# Types
86
# Plain aliases of the builtin types (presumably kept for backward compatibility)
text_type, binary_type, bool_type, long = str, bytes, bool, int

# Alias errors
PermissionError = OSError

# Alias methods
viewkeys, viewvalues, viewitems = (methodcaller(name) for name in ('keys', 'values', 'items'))
98
99
100
###################
101
# GLOBALS FUNCTIONS
102
###################
103
104
105
def printandflush(string):
    """Print string on the standard output and flush the stream right away.

    Used by the stdout* outputs modules.
    """
    print(string, end='\n', flush=True)
108
109
110
def to_ascii(s):
    """Return s as a str, without the non ASCII characters of a str input.

    A bytes input is only decoded (default codec) and returned as is.
    A str input is encoded to ASCII, ignoring the characters that can not be
    encoded: accented characters (diacritics) are removed, not transliterated.
    """
    if not isinstance(s, binary_type):
        ascii_bytes = s.encode('ascii', 'ignore')
        return ascii_bytes.decode()
    return s.decode()
116
117
118
def listitems(d):
    """Return the (key, value) pairs of d as a list."""
    return [*d.items()]
120
121
122
def listkeys(d):
    """Return the keys of d as a list."""
    return [*d.keys()]
124
125
126
def listvalues(d):
    """Return the values of d as a list."""
    return [*d.values()]
128
129
130
def iteritems(d):
    """Return an iterator over the (key, value) pairs of d."""
    pairs = d.items()
    return iter(pairs)
132
133
134
def iterkeys(d):
    """Return an iterator over the keys of d."""
    keys = d.keys()
    return iter(keys)
136
137
138
def itervalues(d):
    """Return an iterator over the values of d."""
    values = d.values()
    return iter(values)
140
141
142
def u(s, errors='replace'):
    """Return s as text.

    A str is returned untouched, anything else is decoded as UTF-8 using the
    given error handler ('replace' by default).
    """
    return s if isinstance(s, text_type) else s.decode('utf-8', errors=errors)
146
147
148
def b(s, errors='replace'):
    """Return s as bytes.

    A bytes object is returned untouched, anything else is encoded as UTF-8
    using the given error handler ('replace' by default).
    """
    return s if isinstance(s, binary_type) else s.encode('utf-8', errors=errors)
152
153
154
def nativestr(s, errors='replace'):
    """Return s as a str.

    A str is returned untouched, an int or a float is converted with its
    __str__ method and anything else is decoded as UTF-8 using the given
    error handler ('replace' by default).
    """
    if not isinstance(s, text_type):
        is_number = isinstance(s, (int, float))
        s = s.__str__() if is_number else s.decode('utf-8', errors=errors)
    return s
160
161
162
def system_exec(command):
    """Execute a system command and return its standard output as a str.

    The command line is split on single spaces (no shell, no quoting).
    Trailing whitespace is stripped from the result. If anything goes wrong
    the exception is not raised: the string 'ERROR: <exception>' is returned.
    """
    try:
        argv = command.split(' ')
        completed = subprocess.run(argv, stdout=subprocess.PIPE)
        output = completed.stdout.decode('utf-8')
    except Exception as e:
        output = f'ERROR: {e}'
    return output.rstrip()
169
170
171
def subsample(data, sampling):
    """Compute a simple mean subsampling.

    data: list of numerical values
    sampling: number of values wanted in the result

    Return data itself when it does not hold more than sampling items,
    otherwise a list of sampling values, each one being the mean of a
    contiguous chunk of data.
    """
    size = len(data)
    if size <= sampling:
        return data
    # Integer boundaries spread all the items over the chunks: as size is
    # greater than sampling, every chunk holds at least one item (nothing empty
    # to average) and the end of data is never left out.
    bounds = [i * size // sampling for i in range(sampling + 1)]
    return [mean(data[start:stop]) for start, stop in zip(bounds, bounds[1:])]
182
183
184
def time_series_subsample(data, sampling):
    """Compute a simple mean subsampling of a time series.

    data: list of (time, value) pairs
    sampling: number of pairs wanted in the result

    Return data itself when it does not hold more than sampling items,
    otherwise a list of sampling (time, value) pairs: for each contiguous chunk
    of data, the time of its first pair and the mean of its values.
    """
    size = len(data)
    if size <= sampling:
        return data
    # Integer boundaries spread all the items over the chunks: as size is
    # greater than sampling, every chunk holds at least one pair (no empty
    # chunk to index or to average) and the end of data is never left out.
    bounds = [i * size // sampling for i in range(sampling + 1)]
    return [
        (data[start][0], mean(point[1] for point in data[start:stop]))
        for start, stop in zip(bounds, bounds[1:])
    ]
199
200
201
def to_fahrenheit(celsius):
    """Return the Fahrenheit value of a temperature given in Celsius."""
    ratio, offset = 1.8, 32
    return celsius * ratio + offset
204
205
206
def is_admin():
    """Tell if the current user is an administrator.

    Source: https://stackoverflow.com/a/19719292

    On Posix, return True if the current user id is 0 (root).
    On Windows ('nt'), return the result of shell32.IsUserAnAdmin().
    Warning: the Windows call fails unless you have Windows XP SP2 or higher.
    In this case the error and its traceback are printed and False is returned.
    """
    if os.name != 'nt':
        # Check for root on Posix
        return os.getuid() == 0

    import ctypes
    import traceback

    # WARNING: requires Windows XP SP2 or higher!
    try:
        return ctypes.windll.shell32.IsUserAnAdmin()
    except Exception as e:
        print(f"Admin check failed with error: {e}")
        traceback.print_exc()
        return False
230
231
232
def key_exist_value_not_none(k, d):
    """Return True if the key k exists in d and d[k] is not None."""
    if k not in d:
        return False
    return d[k] is not None
237
238
239
def key_exist_value_not_none_not_v(k, d, value='', length=None):
    """Return True if all the following conditions are met:

    - the key k exists in d
    - d[k] is not None
    - d[k] is different from value
    - length is None or len(d[k]) is at least length
    """
    if k not in d:
        return False
    current = d[k]
    if current is None:
        return False
    if not current != value:
        return False
    return length is None or len(current) >= length
246
247
248
def disable(class_name, var):
    """Mark var as disabled on class_name.

    Set the attribute enable_<var> to False and disable_<var> to True.
    """
    for prefix, state in (('enable_', False), ('disable_', True)):
        setattr(class_name, prefix + var, state)
252
253
254
def enable(class_name, var):
    """Mark var as enabled on class_name.

    Set the attribute enable_<var> to True and disable_<var> to False.
    """
    for prefix, state in (('enable_', True), ('disable_', False)):
        setattr(class_name, prefix + var, state)
258
259
260
def safe_makedirs(path):
    """A safe function for creating a directory tree.

    Create path and its missing parent directories. Nothing happens if path
    is already a directory. OSError is raised if the tree can not be created,
    for example when path exists but is not a directory.
    """
    # exist_ok relies on the final state (path is a directory) rather than on
    # the EEXIST error number, which the OS is not guaranteed to report first.
    os.makedirs(path, exist_ok=True)
270
271
272
def get_time_diffs(ref, now):
    """Return the time elapsed between ref and now.

    ref: int or float Epoch timestamp, datetime object or a false value
    now: datetime object the difference is computed against

    Return now - ref as a timedelta, or 0 if ref is a false value which is
    neither an int nor a datetime (None, '', 0.0...).
    Raise a TypeError for any other kind of ref.
    """
    if isinstance(ref, int):
        return now - datetime.fromtimestamp(ref)
    if isinstance(ref, datetime):
        return now - ref
    if not ref:
        return 0
    if isinstance(ref, float):
        # Float timestamps used to leave the result undefined
        return now - datetime.fromtimestamp(ref)
    raise TypeError(f'ref should be a timestamp or a datetime, not {type(ref).__name__}')
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
281
282
283
def get_first_true_val(conds):
    """Return the first key of conds (in iteration order) whose value is true.

    Raise StopIteration if none of the values is true.
    """
    for key, val in conds.items():
        if val:
            return key
    raise StopIteration
285
286
287
def maybe_add_plural(count):
    """Return 's' if count is greater than 1, an empty string otherwise."""
    if count > 1:
        return "s"
    return ""
289
290
291
def build_str_when_more_than_seven_days(day_diff, unit):
    """Express day_diff (a number of days) as a whole number of unit.

    unit is 'week' (7 days), 'month' (30 days) or 'year' (365 days); any
    other unit raises a KeyError. The result looks like '<count> <unit>', with
    the suffix returned by maybe_add_plural appended to the unit.
    """
    days_per_unit = {'week': 7, 'month': 30, 'year': 365}
    count = day_diff // days_per_unit[unit]
    return f"{count} {unit}{maybe_add_plural(count)}"
297
298
299
def pretty_date(time=False):
    """
    Get a datetime object or a int()/float() Epoch timestamp and return a
    pretty string like 'an hour', 'yesterday', '3 months',
    'just now', etc
    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python

    A false value which is not a timestamp (the default False, None...) means
    that no time elapsed: 'just now' is returned.
    An empty string is returned for a date in the future.
    A TypeError is raised for any other kind of input.

    Refactoring done in commit https://github.com/nicolargo/glances/commit/f6279baacd4cf0b27ca10df6dc01f091ea86a40a
    break the function. Get back to the old fashion way.
    """
    now = datetime.now()
    # bool is a subclass of int: keep the default False out of the timestamps
    is_timestamp = isinstance(time, (int, float)) and not isinstance(time, bool)
    if is_timestamp:
        diff = now - datetime.fromtimestamp(time)
    elif isinstance(time, datetime):
        diff = now - time
    elif not time:
        # A null timedelta (and not the int 0, which has no seconds/days)
        diff = now - now
    else:
        raise TypeError(f'time should be a timestamp or a datetime, not {type(time).__name__}')
    second_diff = diff.seconds
    day_diff = diff.days

    if day_diff < 0:
        return ''

    if day_diff == 0:
        if second_diff < 10:
            return "just now"
        if second_diff < 60:
            return str(second_diff) + " secs"
        if second_diff < 120:
            return "a min"
        if second_diff < 3600:
            return str(second_diff // 60) + " mins"
        if second_diff < 7200:
            return "an hour"
        if second_diff < 86400:
            return str(second_diff // 3600) + " hours"
    if day_diff == 1:
        return "yesterday"
    if day_diff < 7:
        return str(day_diff) + " days"
    if day_diff < 31:
        return str(day_diff // 7) + " weeks"
    if day_diff < 365:
        return str(day_diff // 30) + " months"
    return str(day_diff // 365) + " years"
344
345
346
def urlopen_auth(url, username, password):
    """Open a url with basic auth.

    The Authorization header is 'Basic ' followed by the base64 encoding of
    '<username>:<password>'. Return the object returned by urlopen.
    """
    credentials = f'{username}:{password}'.encode()
    token = base64.b64encode(credentials).decode()
    request = Request(url, headers={'Authorization': 'Basic ' + token})
    return urlopen(request)
354
355
356
def json_dumps(data) -> bytes:
    """Serialize data to JSON and return the result as bytes.

    Manage the issue #815 for Windows OS: if the serialization raises a
    UnicodeDecodeError, it is done again with ensure_ascii=False.
    """
    try:
        serialized = json.dumps(data)
    except UnicodeDecodeError:
        serialized = json.dumps(data, ensure_ascii=False)
    # ujson & json libs return strings, but our contract expects bytes
    return b(serialized)
367
368
369
def json_loads(data: Union[str, bytes, bytearray]) -> Union[dict, list]:
    """Parse the JSON buffer data and return the matching Python object."""
    parsed = json.loads(data)
    return parsed
372
373
374
def dictlist(data, item):
    """Extract item from a dict or from every element of a list.

    Return:
    - {item: data[item]} if data is a dict
    - {item: [element[item] for each element]} if data is a list
    - None if data has another type or if the lookup fails
      (TypeError, IndexError or KeyError)
    """
    lookup_errors = (TypeError, IndexError, KeyError)
    if isinstance(data, dict):
        try:
            return {item: data[item]}
        except lookup_errors:
            return None
    if isinstance(data, list):
        try:
            # Source:
            # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
            # But https://github.com/nicolargo/glances/issues/1401
            return {item: [element[item] for element in data]}
        except lookup_errors:
            return None
    return None
390
391
392
def json_dumps_dictlist(data, item):
    """Return the JSON serialization (json_dumps) of dictlist(data, item).

    Return None if dictlist returns None.
    """
    extracted = dictlist(data, item)
    return None if extracted is None else json_dumps(extracted)
397
398
399
def string_value_to_float(s):
    """Convert a string with a value and an unit to a float.
    Example:
    '12.5 MB' -> 12500000.0
    '32.5 GB' -> 32500000000.0
    Args:
        s (string): Input string with value and unit
    Output:
        float: The value in float, or None if the string can not be
        converted (empty or malformed string, invalid number, unknown unit)
    """
    convert_dict = {
        None: 1,
        'B': 1,
        'KB': 1000,
        'MB': 1000000,
        'GB': 1000000000,
        'TB': 1000000000000,
        'PB': 1000000000000000,
    }
    # Split the string (spaces removed) in numeric and non numeric tokens,
    # the non numeric ones (units) being upper-cased
    unpack_string = [
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
    ]
    if len(unpack_string) == 2:
        value, unit = unpack_string
    elif len(unpack_string) == 1:
        value, unit = unpack_string[0], None
    else:
        return None
    # An unknown unit is reported like the other conversion failures
    if unit not in convert_dict:
        return None
    try:
        return float(value) * convert_dict[unit]
    except ValueError:
        return None
433
434
435
def file_exists(filename):
    """Return True if filename is an existing regular file which can be read."""
    if not os.path.isfile(filename):
        return False
    return os.access(filename, os.R_OK)
438
439
440
def folder_size(path, errno=0):
    """Return a tuple with the size of the directory given by path and the errno.
    If an error occurs (for example one file or subfolder is not accessible),
    errno is set to the error number.

    The size is the sum of the sizes of the entries which are not directories,
    sub-directories (symbolic links excluded) being browsed recursively.
    If path itself can not be browsed, (0, <error number>) is returned.

    path: <string>
    errno: <int> Should always be 0 when calling the function"""
    ret_size = 0
    ret_err = errno
    try:
        # The context manager closes the iterator even if an error occurs.
        # No need to filter '.' and '..': scandir never returns them.
        with os.scandir(path) as entries:
            for f in entries:
                if f.is_dir(follow_symlinks=False):
                    sub_size, ret_err = folder_size(os.path.join(path, f.name), ret_err)
                    ret_size += sub_size
                else:
                    try:
                        ret_size += f.stat().st_size
                    except OSError as e:
                        ret_err = e.errno
    except OSError as e:
        return 0, e.errno
    return ret_size, ret_err
464
465
466
def weak_lru_cache(maxsize=128, typed=False):
    """LRU cache decorator for methods which only keeps a weak reference to self.

    The cached function receives weakref.ref(self) instead of self, so the
    cache entries do not hold a strong reference to the instance.
    maxsize and typed are passed to functools.lru_cache.
    Source: https://stackoverflow.com/a/55990799"""

    def decorator(method):
        @functools.lru_cache(maxsize=maxsize, typed=typed)
        def cached_call(_self, *args, **kwargs):
            # _self is the weak reference: call it to get the instance back
            instance = _self()
            return method(instance, *args, **kwargs)

        @functools.wraps(method)
        def bound_call(self, *args, **kwargs):
            self_ref = weakref.ref(self)
            return cached_call(self_ref, *args, **kwargs)

        return bound_call

    return decorator
482
483
484
def namedtuple_to_dict(data):
    """Return a copy of the dict data where the namedtuple values are converted to dict.

    Every value with an _asdict() method (the one embedded in PsUtil stats) is
    replaced by the result of this method, the other values are kept as is.
    """
    converted = {}
    for key, value in data.items():
        converted[key] = value._asdict() if hasattr(value, '_asdict') else value
    return converted
487
488
489
def list_of_namedtuple_to_list_of_dict(data):
    """Return the list of the items of data converted by namedtuple_to_dict."""
    return list(map(namedtuple_to_dict, data))
492
493
494
def replace_special_chars(input_string, by=' '):
    """Replace the line breaks and the tabs of input_string.

    Each '\\r\\n', '\\n' and '\\t' (in this order) is replaced by the string
    by (a space by default).
    Return: the string with the chars replaced"""
    cleaned = input_string
    for special in ('\r\n', '\n', '\t'):
        cleaned = cleaned.replace(special, by)
    return cleaned
498