Test Failed
Push — develop ( 92de3b...7598fb )
by
unknown
03:04
created

glances.globals.printandflush()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import os
20
import platform
21
import queue
22
import re
23
import subprocess
24
import sys
25
import weakref
26
from configparser import ConfigParser, NoOptionError, NoSectionError
27
from datetime import datetime
28
from operator import itemgetter, methodcaller
29
from statistics import mean
30
from typing import Dict, List, Union
31
from urllib.error import HTTPError, URLError
32
from urllib.parse import urlparse
33
from urllib.request import Request, urlopen
34
from xmlrpc.client import Fault, ProtocolError, Server, ServerProxy, Transport
35
from xmlrpc.server import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
36
37
from defusedxml.xmlrpc import monkey_patch
38
39
# Optionally use orjson if available
40
try:
41
    import orjson as json
42
except ImportError:
43
    import json
44
45
# Correct issue #1025 by monkey path the xmlrpc lib
46
monkey_patch()
47
48
##############
49
# GLOBALS VARS
50
##############
51
52
# OS constants (some libraries/features are OS-dependent)
53
BSD = sys.platform.find('bsd') != -1
54
LINUX = sys.platform.startswith('linux')
55
MACOS = sys.platform.startswith('darwin')
56
SUNOS = sys.platform.startswith('sunos')
57
WINDOWS = sys.platform.startswith('win')
58
WSL = "linux" in platform.system().lower() and "microsoft" in platform.uname()[3].lower()
59
60
# Set the AMPs, plugins and export modules path
61
work_path = os.path.realpath(os.path.dirname(__file__))
62
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
63
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
64
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))
65
sys_path = sys.path[:]
66
sys.path.insert(1, exports_path)
67
sys.path.insert(1, plugins_path)
68
sys.path.insert(1, amps_path)
69
70
# Types
71
text_type = str
72
binary_type = bytes
73
bool_type = bool
74
long = int
75
76
# Alias errors
77
PermissionError = OSError
78
79
# Alias methods
80
viewkeys = methodcaller('keys')
81
viewvalues = methodcaller('values')
82
viewitems = methodcaller('items')
83
84
85
###################
86
# GLOBALS FUNCTIONS
87
###################
88
89
90
def printandflush(string):
91
    """Print and flush (used by stdout* outputs modules)"""
92
    print(string, flush=True)
93
94
95
def to_ascii(s):
96
    """Convert the bytes string to a ASCII string
97
    Useful to remove accent (diacritics)"""
98
    if isinstance(s, binary_type):
99
        return s.decode()
100
    return s.encode('ascii', 'ignore').decode()
101
102
103
def listitems(d):
104
    return list(d.items())
105
106
107
def listkeys(d):
108
    return list(d.keys())
109
110
111
def listvalues(d):
112
    return list(d.values())
113
114
115
def iteritems(d):
116
    return iter(d.items())
117
118
119
def iterkeys(d):
120
    return iter(d.keys())
121
122
123
def itervalues(d):
124
    return iter(d.values())
125
126
127
def u(s, errors='replace'):
128
    if isinstance(s, text_type):
129
        return s
130
    return s.decode('utf-8', errors=errors)
131
132
133
def b(s, errors='replace'):
134
    if isinstance(s, binary_type):
135
        return s
136
    return s.encode('utf-8', errors=errors)
137
138
139
def nativestr(s, errors='replace'):
140
    if isinstance(s, text_type):
141
        return s
142
    if isinstance(s, (int, float)):
143
        return s.__str__()
144
    return s.decode('utf-8', errors=errors)
145
146
147
def system_exec(command):
148
    """Execute a system command and return the result as a str"""
149
    try:
150
        res = subprocess.run(command.split(' '), stdout=subprocess.PIPE).stdout.decode('utf-8')
151
    except Exception as e:
152
        res = f'ERROR: {e}'
153
    return res.rstrip()
154
155
156
def subsample(data, sampling):
157
    """Compute a simple mean subsampling.
158
159
    Data should be a list of numerical itervalues
160
161
    Return a subsampled list of sampling length
162
    """
163
    if len(data) <= sampling:
164
        return data
165
    sampling_length = int(round(len(data) / float(sampling)))
166
    return [mean(data[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
167
168
169
def time_serie_subsample(data, sampling):
170
    """Compute a simple mean subsampling.
171
172
    Data should be a list of set (time, value)
173
174
    Return a subsampled list of sampling length
175
    """
176
    if len(data) <= sampling:
177
        return data
178
    t = [t[0] for t in data]
179
    v = [t[1] for t in data]
180
    sampling_length = int(round(len(data) / float(sampling)))
181
    t_subsampled = [t[s * sampling_length : (s + 1) * sampling_length][0] for s in range(0, sampling)]
182
    v_subsampled = [mean(v[s * sampling_length : (s + 1) * sampling_length]) for s in range(0, sampling)]
183
    return list(zip(t_subsampled, v_subsampled))
184
185
186
def to_fahrenheit(celsius):
187
    """Convert Celsius to Fahrenheit."""
188
    return celsius * 1.8 + 32
189
190
191
def is_admin():
192
    """
193
    https://stackoverflow.com/a/19719292
194
    @return: True if the current user is an 'Admin' whatever that
195
    means (root on Unix), otherwise False.
196
    Warning: The inner function fails unless you have Windows XP SP2 or
197
    higher. The failure causes a traceback to be printed and this
198
    function to return False.
199
    """
200
201
    if os.name == 'nt':
202
        import ctypes
203
        import traceback
204
205
        # WARNING: requires Windows XP SP2 or higher!
206
        try:
207
            return ctypes.windll.shell32.IsUserAnAdmin()
208
        except Exception as e:
209
            print(f"Admin check failed with error: {e}")
210
            traceback.print_exc()
211
            return False
212
    else:
213
        # Check for root on Posix
214
        return os.getuid() == 0
215
216
217
def key_exist_value_not_none(k, d):
218
    # Return True if:
219
    # - key k exists
220
    # - d[k] is not None
221
    return k in d and d[k] is not None
222
223
224
def key_exist_value_not_none_not_v(k, d, value='', length=None):
225
    # Return True if:
226
    # - key k exists
227
    # - d[k] is not None
228
    # - d[k] != value
229
    # - if length is not None and len(d[k]) >= length
230
    return k in d and d[k] is not None and d[k] != value and (length is None or len(d[k]) >= length)
231
232
233
def disable(class_name, var):
234
    """Set disable_<var> to True in the class class_name."""
235
    setattr(class_name, 'enable_' + var, False)
236
    setattr(class_name, 'disable_' + var, True)
237
238
239
def enable(class_name, var):
240
    """Set disable_<var> to False in the class class_name."""
241
    setattr(class_name, 'enable_' + var, True)
242
    setattr(class_name, 'disable_' + var, False)
243
244
245
def safe_makedirs(path):
246
    """A safe function for creating a directory tree."""
247
    try:
248
        os.makedirs(path)
249
    except OSError as err:
250
        if err.errno == errno.EEXIST:
251
            if not os.path.isdir(path):
252
                raise
253
        else:
254
            raise
255
256
257
def pretty_date(time=False):
258
    """
259
    Get a datetime object or a int() Epoch timestamp and return a
260
    pretty string like 'an hour ago', 'Yesterday', '3 months ago',
261
    'just now', etc
262
    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
263
    """
264
    now = datetime.now()
265
    if isinstance(time, int):
266
        diff = now - datetime.fromtimestamp(time)
267
    elif isinstance(time, datetime):
268
        diff = now - time
269
    elif not time:
270
        diff = 0
271
    second_diff = diff.seconds
0 ignored issues
show
introduced by
The variable diff does not seem to be defined for all execution paths.
Loading history...
272
    day_diff = diff.days
273
274
    if day_diff < 0:
275
        return ''
276
277
    if day_diff == 0:
278
        if second_diff < 10:
279
            return "just now"
280
        if second_diff < 60:
281
            return str(second_diff) + " secs"
282
        if second_diff < 120:
283
            return "a min"
284
        if second_diff < 3600:
285
            return str(second_diff // 60) + " mins"
286
        if second_diff < 7200:
287
            return "an hour"
288
        if second_diff < 86400:
289
            return str(second_diff // 3600) + " hours"
290
    if day_diff == 1:
291
        return "yesterday"
292
    if day_diff < 7:
293
        return str(day_diff) + " days"
294
    if day_diff < 31:
295
        return str(day_diff // 7) + " weeks"
296
    if day_diff < 365:
297
        return str(day_diff // 30) + " months"
298
    return str(day_diff // 365) + " years"
299
300
301
def urlopen_auth(url, username, password):
302
    """Open a url with basic auth"""
303
    return urlopen(
304
        Request(
305
            url,
306
            headers={'Authorization': 'Basic ' + base64.b64encode((f'{username}:{password}').encode()).decode()},
307
        )
308
    )
309
310
311
def json_dumps(data) -> str:
312
    """Return the object data in a JSON format.
313
314
    Manage the issue #815 for Windows OS with UnicodeDecodeError catching.
315
    """
316
    try:
317
        return json.dumps(data)
318
    except UnicodeDecodeError:
319
        return json.dumps(data, ensure_ascii=False)
320
321
322
def json_loads(data: Union[str, bytes, bytearray]) -> Union[Dict, List]:
323
    """Load a JSON buffer into memory as a Python object"""
324
    return json.loads(data)
325
326
327
def dictlist(data, item):
328
    if isinstance(data, dict):
329
        try:
330
            return {item: data[item]}
331
        except (TypeError, IndexError, KeyError):
332
            return None
333
    elif isinstance(data, list):
334
        try:
335
            # Source:
336
            # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
337
            # But https://github.com/nicolargo/glances/issues/1401
338
            return {item: list(map(itemgetter(item), data))}
339
        except (TypeError, IndexError, KeyError):
340
            return None
341
    else:
342
        return None
343
344
345
def json_dumps_dictlist(data, item):
346
    dl = dictlist(data, item)
347
    if dl is None:
348
        return None
349
    return json_dumps(dl)
350
351
352
def string_value_to_float(s):
353
    """Convert a string with a value and an unit to a float.
354
    Example:
355
    '12.5 MB' -> 12500000.0
356
    '32.5 GB' -> 32500000000.0
357
    Args:
358
        s (string): Input string with value and unit
359
    Output:
360
        float: The value in float
361
    """
362
    convert_dict = {
363
        None: 1,
364
        'B': 1,
365
        'KB': 1000,
366
        'MB': 1000000,
367
        'GB': 1000000000,
368
        'TB': 1000000000000,
369
        'PB': 1000000000000000,
370
    }
371
    unpack_string = [
372
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
373
    ]
374
    if len(unpack_string) == 2:
375
        value, unit = unpack_string
376
    elif len(unpack_string) == 1:
377
        value = unpack_string[0]
378
        unit = None
379
    else:
380
        return None
381
    try:
382
        value = float(unpack_string[0])
383
    except ValueError:
384
        return None
385
    return value * convert_dict[unit]
386
387
388
def file_exists(filename):
389
    """Return True if the file exists and is readable."""
390
    return os.path.isfile(filename) and os.access(filename, os.R_OK)
391
392
393
def folder_size(path, errno=0):
394
    """Return a tuple with the size of the directory given by path and the errno.
395
    If an error occurs (for example one file or subfolder is not accessible),
396
    errno is set to the error number.
397
398
    path: <string>
399
    errno: <int> Should always be 0 when calling the function"""
400
    ret_size = 0
401
    ret_err = errno
402
    try:
403
        for f in os.scandir(path):
404
            if f.is_dir(follow_symlinks=False) and (f.name != '.' or f.name != '..'):
405
                ret = folder_size(os.path.join(path, f.name), ret_err)
406
                ret_size += ret[0]
407
                ret_err = ret[1]
408
            else:
409
                try:
410
                    ret_size += f.stat().st_size
411
                except OSError as e:
412
                    ret_err = e.errno
413
    except (OSError, PermissionError) as e:
414
        return 0, e.errno
415
    else:
416
        return ret_size, ret_err
417
418
419
def weak_lru_cache(maxsize=128, typed=False):
420
    """LRU Cache decorator that keeps a weak reference to self
421
    Source: https://stackoverflow.com/a/55990799"""
422
423
    def wrapper(func):
424
        @functools.lru_cache(maxsize, typed)
425
        def _func(_self, *args, **kwargs):
426
            return func(_self(), *args, **kwargs)
427
428
        @functools.wraps(func)
429
        def inner(self, *args, **kwargs):
430
            return _func(weakref.ref(self), *args, **kwargs)
431
432
        return inner
433
434
    return wrapper
435
436
437
def namedtuple_to_dict(data):
438
    """Convert a namedtuple to a dict, using the _asdict() method embedded in PsUtil stats."""
439
    return {k: (v._asdict() if hasattr(v, '_asdict') else v) for k, v in data.items()}
440
441
442
def list_of_namedtuple_to_list_of_dict(data):
443
    """Convert a list of namedtuples to a dict, using the _asdict() method embedded in PsUtil stats."""
444
    return [namedtuple_to_dict(d) for d in data]
445
446
447
def replace_special_chars(input_string, by=' '):
448
    """Replace some special char by another in the input_string
449
    Return: the string with the chars replaced"""
450
    return input_string.replace('\r\n', by).replace('\n', by).replace('\t', by)
451