Issues (48)

glances/globals.py (1 issue)

1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import os
20
import platform
21
import queue
22
import re
23
import subprocess
24
import sys
25
import weakref
26
from configparser import ConfigParser, NoOptionError, NoSectionError
27
from datetime import datetime
28
from operator import itemgetter, methodcaller
29
from statistics import mean
30
from urllib.error import HTTPError, URLError
31
from urllib.parse import urlparse
32
from urllib.request import Request, urlopen
33
from xmlrpc.client import Fault, ProtocolError, Server, ServerProxy, Transport
34
from xmlrpc.server import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
35
36
import orjson
37
38
# Fix issue #1025 by monkey patching the xmlrpc lib
39
from defusedxml.xmlrpc import monkey_patch
40
41
monkey_patch()
42
43
##############
44
# GLOBALS VARS
45
##############
46
47
# Platform flags (some libraries/features are OS-dependent)
BSD = 'bsd' in sys.platform
LINUX = sys.platform.startswith('linux')
MACOS = sys.platform.startswith('darwin')
SUNOS = sys.platform.startswith('sunos')
WINDOWS = sys.platform.startswith('win')
# WSL: system name contains "linux" and the uname version string contains "microsoft"
WSL = "linux" in platform.system().lower() and "microsoft" in platform.uname().version.lower()

# Locations of the AMPs, plugins and exports sub-packages, relative to this file
work_path = os.path.realpath(os.path.dirname(__file__))
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))
# Snapshot of sys.path taken before it is extended below
sys_path = list(sys.path)
# Each path is inserted at index 1, so the last one inserted (amps) ends up first
for _extra_path in (exports_path, plugins_path, amps_path):
    sys.path.insert(1, _extra_path)
del _extra_path

# Type aliases
text_type = str
binary_type = bytes
bool_type = bool
long = int

# Error aliases
# NOTE(review): within this module the name PermissionError is rebound to OSError
PermissionError = OSError

# Method aliases: callables that invoke keys()/values()/items() on their argument
viewkeys = methodcaller('keys')
viewvalues = methodcaller('values')
viewitems = methodcaller('items')
78
79
80
###################
81
# GLOBALS FUNCTIONS
82
###################
83
84
85
def printandflush(string):
    """Print ``string`` and flush the output stream immediately.

    Used by the stdout* output modules.
    """
    print(string, flush=True)
88
89
90
def to_ascii(s):
    """Return ``s`` as a str.

    Bytes input is decoded with the default codec and returned as is.
    Text input is encoded to ASCII with unencodable characters ignored
    (so accented characters are dropped), then decoded back to str.
    """
    if not isinstance(s, binary_type):
        ascii_only = s.encode('ascii', 'ignore')
        return ascii_only.decode()
    return s.decode()
96
97
98
def listitems(d):
    """Return the (key, value) pairs of ``d`` as a list."""
    pairs = d.items()
    return list(pairs)
100
101
102
def listkeys(d):
    """Return the keys of ``d`` as a list."""
    keys = d.keys()
    return list(keys)
104
105
106
def listvalues(d):
    """Return the values of ``d`` as a list."""
    values = d.values()
    return list(values)
108
109
110
def iteritems(d):
    """Return an iterator over the (key, value) pairs of ``d``."""
    pairs = d.items()
    return iter(pairs)
112
113
114
def iterkeys(d):
    """Return an iterator over the keys of ``d``."""
    keys = d.keys()
    return iter(keys)
116
117
118
def itervalues(d):
    """Return an iterator over the values of ``d``."""
    values = d.values()
    return iter(values)
120
121
122
def u(s, errors='replace'):
    """Return ``s`` as text.

    Text input is returned unchanged; anything else is decoded from UTF-8
    using the given ``errors`` policy.
    """
    return s if isinstance(s, text_type) else s.decode('utf-8', errors=errors)
126
127
128
def b(s, errors='replace'):
    """Return ``s`` as bytes.

    Bytes input is returned unchanged; anything else is encoded to UTF-8
    using the given ``errors`` policy.
    """
    return s if isinstance(s, binary_type) else s.encode('utf-8', errors=errors)
132
133
134
def nativestr(s, errors='replace'):
    """Return ``s`` as a native str.

    - text is returned unchanged,
    - int and float values are converted through their ``__str__`` method,
    - anything else is decoded from UTF-8 using the ``errors`` policy.
    """
    if isinstance(s, text_type):
        result = s
    elif isinstance(s, (int, float)):
        result = s.__str__()
    else:
        result = s.decode('utf-8', errors=errors)
    return result
140
141
142
def system_exec(command):
    """Run ``command`` and return its standard output as a str.

    The command line is split on single spaces to build the argument list.
    If anything goes wrong, the returned string is 'ERROR: <exception>'.
    Trailing whitespace is stripped from the result in both cases.
    """
    try:
        completed = subprocess.run(command.split(' '), stdout=subprocess.PIPE)
        output = completed.stdout.decode('utf-8')
    except Exception as err:
        output = f'ERROR: {err}'
    return output.rstrip()
149
150
151
def subsample(data, sampling):
    """Compute a simple mean subsampling.

    Data should be a list of numerical values.
    ``sampling`` is expected to be a positive integer.

    Return ``data`` itself when it has at most ``sampling`` items, otherwise
    a list of exactly ``sampling`` values, each being the mean of one
    contiguous chunk of ``data``.
    """
    size = len(data)
    if size <= sampling:
        return data
    # Proportional chunk boundaries: since size > sampling every chunk holds at
    # least one item (so mean() never receives an empty slice) and the chunks
    # cover the whole input (no trailing values are dropped).
    bounds = [i * size // sampling for i in range(sampling + 1)]
    return [mean(data[start:stop]) for start, stop in zip(bounds, bounds[1:])]
162
163
164
def time_serie_subsample(data, sampling):
    """Compute a simple mean subsampling of a time serie.

    Data should be a list of (time, value) pairs.
    ``sampling`` is expected to be a positive integer.

    Return ``data`` itself when it has at most ``sampling`` items, otherwise
    a list of exactly ``sampling`` (time, value) tuples: the time is the one
    of the first point of each chunk, the value is the mean of the chunk.
    """
    size = len(data)
    if size <= sampling:
        return data
    # Proportional chunk boundaries: since size > sampling every chunk holds at
    # least one point, so neither the [0] lookup nor mean() can hit an empty
    # slice, and the chunks cover the whole input.
    bounds = [i * size // sampling for i in range(sampling + 1)]
    return [
        (data[start][0], mean([point[1] for point in data[start:stop]]))
        for start, stop in zip(bounds, bounds[1:])
    ]
179
180
181
def to_fahrenheit(celsius):
    """Return the Fahrenheit equivalent of a Celsius temperature."""
    scaled = celsius * 1.8
    return scaled + 32
184
185
186
def is_admin():
    """Tell whether the current user has administrator rights.

    Source: https://stackoverflow.com/a/19719292

    On POSIX systems this is True when the user id is 0 (root).
    On Windows the result of shell32.IsUserAnAdmin() is returned; if that
    call fails, the error and its traceback are printed and False is returned
    (the original note says it requires Windows XP SP2 or higher).
    """
    if os.name != 'nt':
        # POSIX: root has uid 0
        return os.getuid() == 0

    import ctypes
    import traceback

    # WARNING: requires Windows XP SP2 or higher!
    try:
        return ctypes.windll.shell32.IsUserAnAdmin()
    except Exception as e:
        print(f"Admin check failed with error: {e}")
        traceback.print_exc()
        return False
210
211
212
def key_exist_value_not_none(k, d):
    """Return True if key ``k`` exists in ``d`` and ``d[k]`` is not None."""
    if k not in d:
        return False
    return d[k] is not None
217
218
219
def key_exist_value_not_none_not_v(k, d, value='', lengh=None):
    """Check that ``d[k]`` holds a usable value.

    Return True if:
    - key ``k`` exists in ``d``
    - ``d[k]`` is not None
    - ``d[k]`` differs from ``value``
    - ``lengh`` is None, or ``len(d[k])`` is at least ``lengh``
    """
    if k not in d:
        return False
    current = d[k]
    if current is None:
        return False
    return current != value and (lengh is None or len(current) >= lengh)
226
227
228
def disable(class_name, var):
    """Mark ``var`` as disabled on ``class_name``.

    Sets ``enable_<var>`` to False and ``disable_<var>`` to True.
    """
    for prefix, flag in (('enable_', False), ('disable_', True)):
        setattr(class_name, prefix + var, flag)
232
233
234
def enable(class_name, var):
    """Mark ``var`` as enabled on ``class_name``.

    Sets ``enable_<var>`` to True and ``disable_<var>`` to False.
    """
    for prefix, flag in (('enable_', True), ('disable_', False)):
        setattr(class_name, prefix + var, flag)
238
239
240
def safe_makedirs(path):
    """Create the directory tree ``path``, tolerating an existing directory.

    Any OSError is re-raised, except EEXIST when ``path`` already is a
    directory.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        already_a_directory = err.errno == errno.EEXIST and os.path.isdir(path)
        if not already_a_directory:
            raise
250
251
252
def pretty_date(time=False):
    """Return a human friendly description of the time elapsed since ``time``.

    ``time`` can be:
    - a datetime object (naive or timezone-aware),
    - an int or float Epoch timestamp,
    - None/False (or any other falsy non-numeric value): no reference time,
      the elapsed time is considered to be zero.

    The result is a pretty string like 'an hour', 'yesterday', '3 months',
    'just now', etc. An empty string is returned when ``time`` is in the
    future.

    Raises TypeError for any other kind of value.

    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
    """
    from datetime import timedelta  # only needed here for the zero-elapsed-time case

    if isinstance(time, datetime):
        # Build "now" with the same tzinfo as the reference so that aware and
        # naive datetimes are never mixed in the subtraction.
        diff = datetime.now(tz=time.tzinfo) - time
    elif isinstance(time, (int, float)) and time is not False:
        # False is an int instance: exclude it so the default means "no reference"
        diff = datetime.now() - datetime.fromtimestamp(time)
    elif not time:
        diff = timedelta(0)
    else:
        raise TypeError(f'Unsupported time reference: {time!r}')

    second_diff = diff.seconds
    day_diff = diff.days

    if day_diff < 0:
        return ''

    if day_diff == 0:
        if second_diff < 10:
            return "just now"
        if second_diff < 60:
            return str(second_diff) + " secs"
        if second_diff < 120:
            return "a min"
        if second_diff < 3600:
            return str(second_diff // 60) + " mins"
        if second_diff < 7200:
            return "an hour"
        if second_diff < 86400:
            return str(second_diff // 3600) + " hours"
    if day_diff == 1:
        return "yesterday"
    if day_diff < 7:
        return str(day_diff) + " days"
    if day_diff < 31:
        return str(day_diff // 7) + " weeks"
    if day_diff < 365:
        return str(day_diff // 30) + " months"
    return str(day_diff // 365) + " years"
294
295
296
def urlopen_auth(url, username, password):
    """Open ``url`` with an HTTP Basic ``Authorization`` header.

    The header value is 'Basic ' followed by the base64 encoding of
    '<username>:<password>'. Returns whatever ``urlopen`` returns.
    """
    credentials = (f'{username}:{password}').encode()
    token = base64.b64encode(credentials).decode()
    request = Request(url, headers={'Authorization': 'Basic ' + token})
    return urlopen(request)
304
305
306
def json_dumps(data):
    """Return the object data in a JSON format (as bytes).

    Manage the issue #815 for Windows OS with UnicodeDecodeError catching:
    if orjson raises a UnicodeDecodeError, the standard json module is used
    instead, without ASCII escaping.
    """
    try:
        return orjson.dumps(data)
    except UnicodeDecodeError:
        # orjson.dumps() has no ensure_ascii parameter, so passing it would
        # raise a TypeError. Fall back to the stdlib encoder and encode its
        # result so that both paths return bytes.
        import json

        return json.dumps(data, ensure_ascii=False).encode('utf-8', errors='replace')
315
316
317
def dictlist(data, item):
    """Extract ``item`` from a dict or from every element of a list.

    - dict: return ``{item: data[item]}``
    - list: return ``{item: [element[item] for each element]}``
    - anything else: return None

    None is also returned when the lookup raises TypeError, IndexError or
    KeyError.
    """
    lookup_errors = (TypeError, IndexError, KeyError)

    if isinstance(data, dict):
        try:
            return {item: data[item]}
        except lookup_errors:
            return None

    if isinstance(data, list):
        # Source:
        # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
        # But https://github.com/nicolargo/glances/issues/1401
        pick = itemgetter(item)
        try:
            return {item: [pick(element) for element in data]}
        except lookup_errors:
            return None

    return None
333
334
335
def json_dumps_dictlist(data, item):
    """Return the JSON dump of ``dictlist(data, item)``, or None if it is None."""
    extracted = dictlist(data, item)
    return None if extracted is None else json_dumps(extracted)
340
341
342
def string_value_to_float(s):
    """Convert a string with a value and an optional unit to a float.

    Example:
    '12.5 MB' -> 12500000.0
    '32.5 GB' -> 32500000000.0
    '10' -> 10.0

    Args:
        s (string): Input string with value and unit (B, KB, MB, GB, TB or PB,
            case insensitive; spaces are ignored)

    Output:
        float: The value in float, or None if the string can not be parsed
        (no number, unknown unit, more than one number/unit pair...)
    """
    convert_dict = {
        None: 1,
        'B': 1,
        'KB': 1000,
        'MB': 1000000,
        'GB': 1000000000,
        'TB': 1000000000000,
        'PB': 1000000000000000,
    }
    # Split the string into alternating numeric / non-numeric tokens
    # (non-numeric tokens are upper-cased to match the convert_dict keys)
    unpack_string = [
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
    ]
    if len(unpack_string) == 2:
        value, unit = unpack_string
    elif len(unpack_string) == 1:
        value, unit = unpack_string[0], None
    else:
        return None
    # Unknown unit: report a parsing failure instead of raising a KeyError
    multiplier = convert_dict.get(unit)
    if multiplier is None:
        return None
    try:
        return float(value) * multiplier
    except ValueError:
        return None
376
377
378
def file_exists(filename):
    """Return True if ``filename`` is a regular file that can be read."""
    if not os.path.isfile(filename):
        return False
    return os.access(filename, os.R_OK)
381
382
383
def folder_size(path, errno=0):
    """Return a tuple with the size of the directory given by path and the errno.

    The size is the sum of the sizes of all the entries found while walking
    the tree (symbolic links to directories are not followed for recursion).
    If an error occurs (for example one file or subfolder is not accessible),
    errno is set to the error number of the last error met. If ``path``
    itself can not be scanned, (0, <error number>) is returned.

    path: <string>
    errno: <int> Should always be 0 when calling the function"""
    ret_size = 0
    ret_err = errno
    try:
        # The context manager closes the scandir iterator even if an error
        # interrupts the loop.
        with os.scandir(path) as entries:
            for f in entries:
                # scandir never yields '.' or '..', so no name filtering is needed
                if f.is_dir(follow_symlinks=False):
                    sub_size, ret_err = folder_size(os.path.join(path, f.name), ret_err)
                    ret_size += sub_size
                else:
                    try:
                        ret_size += f.stat().st_size
                    except OSError as e:
                        # Keep going: remember the error, skip this entry
                        ret_err = e.errno
    except OSError as e:
        return 0, e.errno
    return ret_size, ret_err
407
408
409
def weak_lru_cache(maxsize=128, typed=False):
    """LRU cache decorator for methods that only keeps a weak reference to self.

    The cached function receives ``weakref.ref(self)`` as its first argument
    and dereferences it before calling the decorated function.

    Source: https://stackoverflow.com/a/55990799"""

    def decorator(method):
        @functools.lru_cache(maxsize, typed)
        def cached_call(instance_ref, *args, **kwargs):
            # Dereference the weak reference to get the instance back
            return method(instance_ref(), *args, **kwargs)

        @functools.wraps(method)
        def bound_call(self, *args, **kwargs):
            instance_ref = weakref.ref(self)
            return cached_call(instance_ref, *args, **kwargs)

        return bound_call

    return decorator
425
426
427
def namedtuple_to_dict(data):
    """Return a copy of the dict ``data`` where every value exposing an
    ``_asdict()`` method (like the namedtuples embedded in PsUtil stats) is
    replaced by the result of that method. Other values are kept as is."""
    converted = {}
    for key, value in data.items():
        converted[key] = value._asdict() if hasattr(value, '_asdict') else value
    return converted
430
431
432
def list_of_namedtuple_to_list_of_dict(data):
    """Apply ``namedtuple_to_dict`` to every element of ``data`` and return
    the results as a list."""
    return list(map(namedtuple_to_dict, data))
435
436
437
def replace_special_chars(input_string, by=' '):
    """Replace line breaks and tabs in ``input_string`` with ``by``.

    The sequences '\\r\\n', '\\n' and '\\t' are replaced, in that order.
    Return: the string with the chars replaced"""
    cleaned = input_string
    # '\r\n' comes first so that it is replaced as a single unit
    for special in ('\r\n', '\n', '\t'):
        cleaned = cleaned.replace(special, by)
    return cleaned
441