Test Failed
Push — master ( 372380...7cfc0c )
by Nicolas
03:32
created

glances.globals.dictlist_json_dumps()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 5
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import os
21
import platform
22
import queue
23
import re
24
import subprocess
25
import sys
26
import weakref
27
from collections import OrderedDict
28
from configparser import ConfigParser, NoOptionError, NoSectionError
29
from datetime import datetime
30
from operator import itemgetter, methodcaller
31
from statistics import mean
32
from typing import Any, Optional, Union
33
from urllib.error import HTTPError, URLError
34
from urllib.parse import urlparse
35
from urllib.request import Request, urlopen
36
37
# Prefer faster libs for JSON (de)serialization
38
# Preference Order: orjson > ujson > json (builtin)
39
try:
40
    import orjson as json
41
42
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
43
except ImportError:
44
    # Need to log info but importing logger will cause cyclic imports
45
    pass
46
47
if 'json' not in globals():
48
    try:
49
        # Note: ujson is not officially supported
50
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
51
        import ujson as json
52
    except ImportError:
53
        import json
54
55
    # To allow ujson & json dumps to serialize datetime
56
    def _json_default(v: Any) -> Any:
57
        if isinstance(v, datetime):
58
            return v.isoformat()
59
        return v
60
61
    json.dumps = functools.partial(json.dumps, default=_json_default)
62
63
##############
64
# GLOBALS VARS
65
##############
66
67
# OS constants (some libraries/features are OS-dependent)
68
BSD = sys.platform.find('bsd') != -1
69
LINUX = sys.platform.startswith('linux')
70
MACOS = sys.platform.startswith('darwin')
71
SUNOS = sys.platform.startswith('sunos')
72
WINDOWS = sys.platform.startswith('win')
73
WSL = "linux" in platform.system().lower() and "microsoft" in platform.uname()[3].lower()
74
75
# Set the AMPs, plugins and export modules path
76
work_path = os.path.realpath(os.path.dirname(__file__))
77
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
78
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
79
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))
80
sys_path = sys.path[:]
81
sys.path.insert(1, exports_path)
82
sys.path.insert(1, plugins_path)
83
sys.path.insert(1, amps_path)
84
85
# Types
86
text_type = str
87
binary_type = bytes
88
bool_type = bool
89
long = int
90
91
# Alias errors
92
PermissionError = OSError
93
94
# Alias methods
95
viewkeys = methodcaller('keys')
96
viewvalues = methodcaller('values')
97
viewitems = methodcaller('items')
98
99
100
###################
101
# GLOBALS FUNCTIONS
102
###################
103
104
105
def printandflush(string):
106
    """Print and flush (used by stdout* outputs modules)"""
107
    print(string, flush=True)
108
109
110
def to_ascii(s):
111
    """Convert the bytes string to a ASCII string
112
    Useful to remove accent (diacritics)"""
113
    if isinstance(s, binary_type):
114
        return s.decode()
115
    return s.encode('ascii', 'ignore').decode()
116
117
118
def listitems(d):
119
    return list(d.items())
120
121
122
def listkeys(d):
123
    return list(d.keys())
124
125
126
def listvalues(d):
127
    return list(d.values())
128
129
130
def iteritems(d):
131
    return iter(d.items())
132
133
134
def iterkeys(d):
135
    return iter(d.keys())
136
137
138
def itervalues(d):
139
    return iter(d.values())
140
141
142
def u(s, errors='replace'):
143
    if isinstance(s, text_type):
144
        return s
145
    return s.decode('utf-8', errors=errors)
146
147
148
def b(s, errors='replace'):
149
    if isinstance(s, binary_type):
150
        return s
151
    return s.encode('utf-8', errors=errors)
152
153
154
def nativestr(s, errors='replace'):
155
    if isinstance(s, text_type):
156
        return s
157
    if isinstance(s, (int, float)):
158
        return s.__str__()
159
    return s.decode('utf-8', errors=errors)
160
161
162
def system_exec(command):
163
    """Execute a system command and return the result as a str"""
164
    try:
165
        res = subprocess.run(command.split(' '), stdout=subprocess.PIPE).stdout.decode('utf-8')
166
    except Exception as e:
167
        res = f'ERROR: {e}'
168
    return res.rstrip()
169
170
171
def subsample(data, sampling):
172
    """Compute a simple mean subsampling.
173
174
    Data should be a list of numerical itervalues
175
176
    Return a subsampled list of sampling length
177
    """
178
    if len(data) <= sampling:
179
        return data
180
    sampling_length = int(round(len(data) / float(sampling)))
181
    return [mean(data[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
182
183
184
def time_series_subsample(data, sampling):
185
    """Compute a simple mean subsampling.
186
187
    Data should be a list of set (time, value)
188
189
    Return a subsampled list of sampling length
190
    """
191
    if len(data) <= sampling:
192
        return data
193
    t = [t[0] for t in data]
194
    v = [t[1] for t in data]
195
    sampling_length = int(round(len(data) / float(sampling)))
196
    t_subsampled = [t[s * sampling_length : (s + 1) * sampling_length][0] for s in range(0, sampling)]
197
    v_subsampled = [mean(v[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
198
    return list(zip(t_subsampled, v_subsampled))
199
200
201
def to_fahrenheit(celsius):
202
    """Convert Celsius to Fahrenheit."""
203
    return celsius * 1.8 + 32
204
205
206
def is_admin():
207
    """
208
    https://stackoverflow.com/a/19719292
209
    @return: True if the current user is an 'Admin' whatever that
210
    means (root on Unix), otherwise False.
211
    Warning: The inner function fails unless you have Windows XP SP2 or
212
    higher. The failure causes a traceback to be printed and this
213
    function to return False.
214
    """
215
216
    if os.name == 'nt':
217
        import ctypes
218
        import traceback
219
220
        # WARNING: requires Windows XP SP2 or higher!
221
        try:
222
            return ctypes.windll.shell32.IsUserAnAdmin()
223
        except Exception as e:
224
            print(f"Admin check failed with error: {e}")
225
            traceback.print_exc()
226
            return False
227
    else:
228
        # Check for root on Posix
229
        return os.getuid() == 0
230
231
232
def key_exist_value_not_none(k, d):
233
    # Return True if:
234
    # - key k exists
235
    # - d[k] is not None
236
    return k in d and d[k] is not None
237
238
239
def key_exist_value_not_none_not_v(k, d, value='', length=None):
240
    # Return True if:
241
    # - key k exists
242
    # - d[k] is not None
243
    # - d[k] != value
244
    # - if length is not None and len(d[k]) >= length
245
    return k in d and d[k] is not None and d[k] != value and (length is None or len(d[k]) >= length)
246
247
248
def disable(class_name, var):
249
    """Set disable_<var> to True in the class class_name."""
250
    setattr(class_name, 'enable_' + var, False)
251
    setattr(class_name, 'disable_' + var, True)
252
253
254
def enable(class_name, var):
255
    """Set disable_<var> to False in the class class_name."""
256
    setattr(class_name, 'enable_' + var, True)
257
    setattr(class_name, 'disable_' + var, False)
258
259
260
def safe_makedirs(path):
261
    """A safe function for creating a directory tree."""
262
    try:
263
        os.makedirs(path)
264
    except OSError as err:
265
        if err.errno == errno.EEXIST:
266
            if not os.path.isdir(path):
267
                raise
268
        else:
269
            raise
270
271
272
def get_time_diffs(ref, now):
273
    if isinstance(ref, int):
274
        diff = now - datetime.fromtimestamp(ref)
275
    elif isinstance(ref, datetime):
276
        diff = now - ref
277
    elif not ref:
278
        diff = 0
279
280
    return diff
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
281
282
283
def get_first_true_val(conds):
284
    return next(key for key, val in conds.items() if val)
285
286
287
def maybe_add_plural(count):
288
    return "s" if count > 1 else ""
289
290
291
def build_str_when_more_than_seven_days(day_diff, unit):
292
    scale = {'week': 7, 'month': 30, 'year': 365}[unit]
293
294
    count = day_diff // scale
295
296
    return str(count) + " " + unit + maybe_add_plural(count)
297
298
299
def pretty_date(ref, now=None):
300
    """
301
    Get a datetime object or a int() Epoch timestamp and return a
302
    pretty string like 'an hour ago', 'Yesterday', '3 months ago',
303
    'just now', etc
304
    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
305
306
    Refactoring done in commit https://github.com/nicolargo/glances/commit/f6279baacd4cf0b27ca10df6dc01f091ea86a40a
307
    break the function. Get back to the old fashion way.
308
    """
309
    if not now:
310
        now = datetime.now()
311
    if isinstance(ref, int):
312
        diff = now - datetime.fromtimestamp(ref)
313
    elif isinstance(ref, datetime):
314
        diff = now - ref
315
    elif not ref:
316
        diff = 0
317
    second_diff = diff.seconds
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
318
    day_diff = diff.days
319
320
    if day_diff < 0:
321
        return ''
322
323
    if day_diff == 0:
324
        if second_diff < 10:
325
            return "just now"
326
        if second_diff < 60:
327
            return str(second_diff) + " secs"
328
        if second_diff < 120:
329
            return "a min"
330
        if second_diff < 3600:
331
            return str(second_diff // 60) + " mins"
332
        if second_diff < 7200:
333
            return "an hour"
334
        if second_diff < 86400:
335
            return str(second_diff // 3600) + " hours"
336
    if day_diff == 1:
337
        return "yesterday"
338
    if day_diff < 7:
339
        return str(day_diff) + " days" if day_diff > 1 else "a day"
340
    if day_diff < 31:
341
        week = day_diff // 7
342
        return str(week) + " weeks" if week > 1 else "a week"
343
    if day_diff < 365:
344
        month = day_diff // 30
345
        return str(month) + " months" if month > 1 else "a month"
346
    year = day_diff // 365
347
    return str(year) + " years" if year > 1 else "an year"
348
349
350
def urlopen_auth(url, username, password):
351
    """Open a url with basic auth"""
352
    return urlopen(
353
        Request(
354
            url,
355
            headers={'Authorization': 'Basic ' + base64.b64encode(f'{username}:{password}'.encode()).decode()},
356
        )
357
    )
358
359
360
def json_dumps(data) -> bytes:
361
    """Return the object data in a JSON format.
362
363
    Manage the issue #815 for Windows OS with UnicodeDecodeError catching.
364
    """
365
    try:
366
        res = json.dumps(data)
367
    except UnicodeDecodeError:
368
        res = json.dumps(data, ensure_ascii=False)
369
    # ujson & json libs return strings, but our contract expects bytes
370
    return b(res)
371
372
373
def json_loads(data: Union[str, bytes, bytearray]) -> Union[dict, list]:
374
    """Load a JSON buffer into memory as a Python object"""
375
    return json.loads(data)
376
377
378
def dictlist(data, item):
379
    if isinstance(data, dict):
380
        try:
381
            return {item: data[item]}
382
        except (TypeError, IndexError, KeyError):
383
            return None
384
    elif isinstance(data, list):
385
        try:
386
            # Source:
387
            # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
388
            # But https://github.com/nicolargo/glances/issues/1401
389
            return {item: list(map(itemgetter(item), data))}
390
        except (TypeError, IndexError, KeyError):
391
            return None
392
    else:
393
        return None
394
395
396
def dictlist_json_dumps(data, item):
397
    dl = dictlist(data, item)
398
    if dl is None:
399
        return None
400
    return json_dumps(dl)
401
402
403
def dictlist_first_key_value(data: list[dict], key, value) -> Optional[dict]:
404
    """In a list of dict, return first item where key=value or none if not found."""
405
    try:
406
        ret = next(item for item in data if key in item and item[key] == value)
407
    except StopIteration:
408
        ret = None
409
    return ret
410
411
412
def string_value_to_float(s):
413
    """Convert a string with a value and an unit to a float.
414
    Example:
415
    '12.5 MB' -> 12500000.0
416
    '32.5 GB' -> 32500000000.0
417
    Args:
418
        s (string): Input string with value and unit
419
    Output:
420
        float: The value in float
421
    """
422
    convert_dict = {
423
        None: 1,
424
        'B': 1,
425
        'KB': 1000,
426
        'MB': 1000000,
427
        'GB': 1000000000,
428
        'TB': 1000000000000,
429
        'PB': 1000000000000000,
430
    }
431
    unpack_string = [
432
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
433
    ]
434
    if len(unpack_string) == 2:
435
        value, unit = unpack_string
436
    elif len(unpack_string) == 1:
437
        value = unpack_string[0]
438
        unit = None
439
    else:
440
        return None
441
    try:
442
        value = float(unpack_string[0])
443
    except ValueError:
444
        return None
445
    return value * convert_dict[unit]
446
447
448
def file_exists(filename):
449
    """Return True if the file exists and is readable."""
450
    return os.path.isfile(filename) and os.access(filename, os.R_OK)
451
452
453
def folder_size(path, errno=0):
454
    """Return a tuple with the size of the directory given by path and the errno.
455
    If an error occurs (for example one file or subfolder is not accessible),
456
    errno is set to the error number.
457
458
    path: <string>
459
    errno: <int> Should always be 0 when calling the function"""
460
    ret_size = 0
461
    ret_err = errno
462
    try:
463
        for f in os.scandir(path):
464
            if f.is_dir(follow_symlinks=False) and (f.name != '.' or f.name != '..'):
465
                ret = folder_size(os.path.join(path, f.name), ret_err)
466
                ret_size += ret[0]
467
                ret_err = ret[1]
468
            else:
469
                try:
470
                    ret_size += f.stat().st_size
471
                except OSError as e:
472
                    ret_err = e.errno
473
    except (OSError, PermissionError) as e:
474
        return 0, e.errno
475
    else:
476
        return ret_size, ret_err
477
478
479
def weak_lru_cache(maxsize=128, typed=False):
480
    """LRU Cache decorator that keeps a weak reference to self
481
    Source: https://stackoverflow.com/a/55990799"""
482
483
    def wrapper(func):
484
        @functools.lru_cache(maxsize, typed)
485
        def _func(_self, *args, **kwargs):
486
            return func(_self(), *args, **kwargs)
487
488
        @functools.wraps(func)
489
        def inner(self, *args, **kwargs):
490
            return _func(weakref.ref(self), *args, **kwargs)
491
492
        return inner
493
494
    return wrapper
495
496
497
def namedtuple_to_dict(data):
498
    """Convert a namedtuple to a dict, using the _asdict() method embedded in PsUtil stats."""
499
    return {k: (v._asdict() if hasattr(v, '_asdict') else v) for k, v in data.items()}
500
501
502
def list_of_namedtuple_to_list_of_dict(data):
503
    """Convert a list of namedtuples to a dict, using the _asdict() method embedded in PsUtil stats."""
504
    return [namedtuple_to_dict(d) for d in data]
505
506
507
def replace_special_chars(input_string, by=' '):
508
    """Replace some special char by another in the input_string
509
    Return: the string with the chars replaced"""
510
    return input_string.replace('\r\n', by).replace('\n', by).replace('\t', by)
511