Test Failed
Push — develop ( 7598fb...a3895e )
by
unknown
59s queued 17s
created

glances.globals.disable()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import os
21
import platform
22
import queue
23
import re
24
import subprocess
25
import sys
26
import weakref
27
from configparser import ConfigParser, NoOptionError, NoSectionError
28
from datetime import datetime
29
from operator import itemgetter, methodcaller
30
from statistics import mean
31
from typing import Any, Dict, List, Union
32
from urllib.error import HTTPError, URLError
33
from urllib.parse import urlparse
34
from urllib.request import Request, urlopen
35
from xmlrpc.client import Fault, ProtocolError, Server, ServerProxy, Transport
36
from xmlrpc.server import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
37
38
from defusedxml.xmlrpc import monkey_patch
39
40
# Correct issue #1025 by monkey path the xmlrpc lib
41
monkey_patch()
42
43
# Prefer faster libs for JSON (de)serialization
44
# Preference Order: orjson > ujson > json (builtin)
45
try:
46
    import orjson as json
47
48
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
49
except ImportError:
50
    # Need to log info but importing logger will cause cyclic imports
51
    pass
52
53
if 'json' not in globals():
54
    try:
55
        # Note: ujson is not officially supported
56
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
57
        import ujson as json
58
    except ImportError:
59
        import json
60
61
    # To allow ujson & json dumps to serialize datetime
62
    def _json_default(v: Any) -> Any:
63
        if isinstance(v, datetime):
64
            return v.isoformat()
65
        return v
66
67
    json.dumps = functools.partial(json.dumps, default=_json_default)
68
69
##############
70
# GLOBALS VARS
71
##############
72
73
# OS constants (some libraries/features are OS-dependent)
74
BSD = sys.platform.find('bsd') != -1
75
LINUX = sys.platform.startswith('linux')
76
MACOS = sys.platform.startswith('darwin')
77
SUNOS = sys.platform.startswith('sunos')
78
WINDOWS = sys.platform.startswith('win')
79
WSL = "linux" in platform.system().lower() and "microsoft" in platform.uname()[3].lower()
80
81
# Set the AMPs, plugins and export modules path
82
work_path = os.path.realpath(os.path.dirname(__file__))
83
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
84
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
85
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))
86
sys_path = sys.path[:]
87
sys.path.insert(1, exports_path)
88
sys.path.insert(1, plugins_path)
89
sys.path.insert(1, amps_path)
90
91
# Types
92
text_type = str
93
binary_type = bytes
94
bool_type = bool
95
long = int
96
97
# Alias errors
98
PermissionError = OSError
99
100
# Alias methods
101
viewkeys = methodcaller('keys')
102
viewvalues = methodcaller('values')
103
viewitems = methodcaller('items')
104
105
106
###################
107
# GLOBALS FUNCTIONS
108
###################
109
110
111
def printandflush(string):
112
    """Print and flush (used by stdout* outputs modules)"""
113
    print(string, flush=True)
114
115
116
def to_ascii(s):
117
    """Convert the bytes string to a ASCII string
118
    Useful to remove accent (diacritics)"""
119
    if isinstance(s, binary_type):
120
        return s.decode()
121
    return s.encode('ascii', 'ignore').decode()
122
123
124
def listitems(d):
125
    return list(d.items())
126
127
128
def listkeys(d):
129
    return list(d.keys())
130
131
132
def listvalues(d):
133
    return list(d.values())
134
135
136
def iteritems(d):
137
    return iter(d.items())
138
139
140
def iterkeys(d):
141
    return iter(d.keys())
142
143
144
def itervalues(d):
145
    return iter(d.values())
146
147
148
def u(s, errors='replace'):
149
    if isinstance(s, text_type):
150
        return s
151
    return s.decode('utf-8', errors=errors)
152
153
154
def b(s, errors='replace'):
155
    if isinstance(s, binary_type):
156
        return s
157
    return s.encode('utf-8', errors=errors)
158
159
160
def nativestr(s, errors='replace'):
161
    if isinstance(s, text_type):
162
        return s
163
    if isinstance(s, (int, float)):
164
        return s.__str__()
165
    return s.decode('utf-8', errors=errors)
166
167
168
def system_exec(command):
169
    """Execute a system command and return the result as a str"""
170
    try:
171
        res = subprocess.run(command.split(' '), stdout=subprocess.PIPE).stdout.decode('utf-8')
172
    except Exception as e:
173
        res = f'ERROR: {e}'
174
    return res.rstrip()
175
176
177
def subsample(data, sampling):
178
    """Compute a simple mean subsampling.
179
180
    Data should be a list of numerical itervalues
181
182
    Return a subsampled list of sampling length
183
    """
184
    if len(data) <= sampling:
185
        return data
186
    sampling_length = int(round(len(data) / float(sampling)))
187
    return [mean(data[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
188
189
190
def time_series_subsample(data, sampling):
191
    """Compute a simple mean subsampling.
192
193
    Data should be a list of set (time, value)
194
195
    Return a subsampled list of sampling length
196
    """
197
    if len(data) <= sampling:
198
        return data
199
    t = [t[0] for t in data]
200
    v = [t[1] for t in data]
201
    sampling_length = int(round(len(data) / float(sampling)))
202
    t_subsampled = [t[s * sampling_length : (s + 1) * sampling_length][0] for s in range(0, sampling)]
203
    v_subsampled = [mean(v[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
204
    return list(zip(t_subsampled, v_subsampled))
205
206
207
def to_fahrenheit(celsius):
208
    """Convert Celsius to Fahrenheit."""
209
    return celsius * 1.8 + 32
210
211
212
def is_admin():
213
    """
214
    https://stackoverflow.com/a/19719292
215
    @return: True if the current user is an 'Admin' whatever that
216
    means (root on Unix), otherwise False.
217
    Warning: The inner function fails unless you have Windows XP SP2 or
218
    higher. The failure causes a traceback to be printed and this
219
    function to return False.
220
    """
221
222
    if os.name == 'nt':
223
        import ctypes
224
        import traceback
225
226
        # WARNING: requires Windows XP SP2 or higher!
227
        try:
228
            return ctypes.windll.shell32.IsUserAnAdmin()
229
        except Exception as e:
230
            print(f"Admin check failed with error: {e}")
231
            traceback.print_exc()
232
            return False
233
    else:
234
        # Check for root on Posix
235
        return os.getuid() == 0
236
237
238
def key_exist_value_not_none(k, d):
239
    # Return True if:
240
    # - key k exists
241
    # - d[k] is not None
242
    return k in d and d[k] is not None
243
244
245
def key_exist_value_not_none_not_v(k, d, value='', length=None):
246
    # Return True if:
247
    # - key k exists
248
    # - d[k] is not None
249
    # - d[k] != value
250
    # - if length is not None and len(d[k]) >= length
251
    return k in d and d[k] is not None and d[k] != value and (length is None or len(d[k]) >= length)
252
253
254
def disable(class_name, var):
255
    """Set disable_<var> to True in the class class_name."""
256
    setattr(class_name, 'enable_' + var, False)
257
    setattr(class_name, 'disable_' + var, True)
258
259
260
def enable(class_name, var):
261
    """Set disable_<var> to False in the class class_name."""
262
    setattr(class_name, 'enable_' + var, True)
263
    setattr(class_name, 'disable_' + var, False)
264
265
266
def safe_makedirs(path):
267
    """A safe function for creating a directory tree."""
268
    try:
269
        os.makedirs(path)
270
    except OSError as err:
271
        if err.errno == errno.EEXIST:
272
            if not os.path.isdir(path):
273
                raise
274
        else:
275
            raise
276
277
278
def pretty_date(time=False):
279
    """
280
    Get a datetime object or a int() Epoch timestamp and return a
281
    pretty string like 'an hour ago', 'Yesterday', '3 months ago',
282
    'just now', etc
283
    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
284
    """
285
    now = datetime.now()
286
    if isinstance(time, int):
287
        diff = now - datetime.fromtimestamp(time)
288
    elif isinstance(time, datetime):
289
        diff = now - time
290
    elif not time:
291
        diff = 0
292
    second_diff = diff.seconds
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
293
    day_diff = diff.days
294
295
    if day_diff < 0:
296
        return ''
297
298
    if day_diff == 0:
299
        if second_diff < 10:
300
            return "just now"
301
        if second_diff < 60:
302
            return str(second_diff) + " secs"
303
        if second_diff < 120:
304
            return "a min"
305
        if second_diff < 3600:
306
            return str(second_diff // 60) + " mins"
307
        if second_diff < 7200:
308
            return "an hour"
309
        if second_diff < 86400:
310
            return str(second_diff // 3600) + " hours"
311
    if day_diff == 1:
312
        return "yesterday"
313
    if day_diff < 7:
314
        return str(day_diff) + " days"
315
    if day_diff < 31:
316
        return str(day_diff // 7) + " weeks"
317
    if day_diff < 365:
318
        return str(day_diff // 30) + " months"
319
    return str(day_diff // 365) + " years"
320
321
322
def urlopen_auth(url, username, password):
323
    """Open a url with basic auth"""
324
    return urlopen(
325
        Request(
326
            url,
327
            headers={'Authorization': 'Basic ' + base64.b64encode(f'{username}:{password}'.encode()).decode()},
328
        )
329
    )
330
331
332
def json_dumps(data) -> bytes:
333
    """Return the object data in a JSON format.
334
335
    Manage the issue #815 for Windows OS with UnicodeDecodeError catching.
336
    """
337
    try:
338
        res = json.dumps(data)
339
    except UnicodeDecodeError:
340
        res = json.dumps(data, ensure_ascii=False)
341
    # ujson & json libs return strings, but our contract expects bytes
342
    return b(res)
343
344
345
def json_loads(data: Union[str, bytes, bytearray]) -> Union[Dict, List]:
346
    """Load a JSON buffer into memory as a Python object"""
347
    return json.loads(data)
348
349
350
def dictlist(data, item):
351
    if isinstance(data, dict):
352
        try:
353
            return {item: data[item]}
354
        except (TypeError, IndexError, KeyError):
355
            return None
356
    elif isinstance(data, list):
357
        try:
358
            # Source:
359
            # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
360
            # But https://github.com/nicolargo/glances/issues/1401
361
            return {item: list(map(itemgetter(item), data))}
362
        except (TypeError, IndexError, KeyError):
363
            return None
364
    else:
365
        return None
366
367
368
def json_dumps_dictlist(data, item):
369
    dl = dictlist(data, item)
370
    if dl is None:
371
        return None
372
    return json_dumps(dl)
373
374
375
def string_value_to_float(s):
376
    """Convert a string with a value and an unit to a float.
377
    Example:
378
    '12.5 MB' -> 12500000.0
379
    '32.5 GB' -> 32500000000.0
380
    Args:
381
        s (string): Input string with value and unit
382
    Output:
383
        float: The value in float
384
    """
385
    convert_dict = {
386
        None: 1,
387
        'B': 1,
388
        'KB': 1000,
389
        'MB': 1000000,
390
        'GB': 1000000000,
391
        'TB': 1000000000000,
392
        'PB': 1000000000000000,
393
    }
394
    unpack_string = [
395
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
396
    ]
397
    if len(unpack_string) == 2:
398
        value, unit = unpack_string
399
    elif len(unpack_string) == 1:
400
        value = unpack_string[0]
401
        unit = None
402
    else:
403
        return None
404
    try:
405
        value = float(unpack_string[0])
406
    except ValueError:
407
        return None
408
    return value * convert_dict[unit]
409
410
411
def file_exists(filename):
412
    """Return True if the file exists and is readable."""
413
    return os.path.isfile(filename) and os.access(filename, os.R_OK)
414
415
416
def folder_size(path, errno=0):
417
    """Return a tuple with the size of the directory given by path and the errno.
418
    If an error occurs (for example one file or subfolder is not accessible),
419
    errno is set to the error number.
420
421
    path: <string>
422
    errno: <int> Should always be 0 when calling the function"""
423
    ret_size = 0
424
    ret_err = errno
425
    try:
426
        for f in os.scandir(path):
427
            if f.is_dir(follow_symlinks=False) and (f.name != '.' or f.name != '..'):
428
                ret = folder_size(os.path.join(path, f.name), ret_err)
429
                ret_size += ret[0]
430
                ret_err = ret[1]
431
            else:
432
                try:
433
                    ret_size += f.stat().st_size
434
                except OSError as e:
435
                    ret_err = e.errno
436
    except (OSError, PermissionError) as e:
437
        return 0, e.errno
438
    else:
439
        return ret_size, ret_err
440
441
442
def weak_lru_cache(maxsize=128, typed=False):
443
    """LRU Cache decorator that keeps a weak reference to self
444
    Source: https://stackoverflow.com/a/55990799"""
445
446
    def wrapper(func):
447
        @functools.lru_cache(maxsize, typed)
448
        def _func(_self, *args, **kwargs):
449
            return func(_self(), *args, **kwargs)
450
451
        @functools.wraps(func)
452
        def inner(self, *args, **kwargs):
453
            return _func(weakref.ref(self), *args, **kwargs)
454
455
        return inner
456
457
    return wrapper
458
459
460
def namedtuple_to_dict(data):
461
    """Convert a namedtuple to a dict, using the _asdict() method embedded in PsUtil stats."""
462
    return {k: (v._asdict() if hasattr(v, '_asdict') else v) for k, v in data.items()}
463
464
465
def list_of_namedtuple_to_list_of_dict(data):
466
    """Convert a list of namedtuples to a dict, using the _asdict() method embedded in PsUtil stats."""
467
    return [namedtuple_to_dict(d) for d in data]
468
469
470
def replace_special_chars(input_string, by=' '):
471
    """Replace some special char by another in the input_string
472
    Return: the string with the chars replaced"""
473
    return input_string.replace('\r\n', by).replace('\n', by).replace('\t', by)
474