Test Failed
Push — master ( cc9054...a8608f )
by Nicolas
03:40
created

glances.globals.time_series_subsample()   A

Complexity

Conditions 2

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 9
nop 2
dl 0
loc 15
rs 9.95
c 0
b 0
f 0
1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import os
21
import platform
22
import queue
23
import re
24
import subprocess
25
import sys
26
import weakref
27
from configparser import ConfigParser, NoOptionError, NoSectionError
28
from datetime import datetime
29
from operator import itemgetter, methodcaller
30
from statistics import mean
31
from typing import Any, Dict, List, Union
32
from urllib.error import HTTPError, URLError
33
from urllib.parse import urlparse
34
from urllib.request import Request, urlopen
35
36
# Prefer faster libs for JSON (de)serialization
37
# Preference Order: orjson > ujson > json (builtin)
38
try:
39
    import orjson as json
40
41
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
42
except ImportError:
43
    # Need to log info but importing logger will cause cyclic imports
44
    pass
45
46
if 'json' not in globals():
47
    try:
48
        # Note: ujson is not officially supported
49
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
50
        import ujson as json
51
    except ImportError:
52
        import json
53
54
    # To allow ujson & json dumps to serialize datetime
55
    def _json_default(v: Any) -> Any:
56
        if isinstance(v, datetime):
57
            return v.isoformat()
58
        return v
59
60
    json.dumps = functools.partial(json.dumps, default=_json_default)
61
62
##############
63
# GLOBALS VARS
64
##############
65
66
# Platform flags (some libraries/features are OS-dependent)
BSD = 'bsd' in sys.platform
LINUX = sys.platform.startswith('linux')
MACOS = sys.platform.startswith('darwin')
SUNOS = sys.platform.startswith('sunos')
WINDOWS = sys.platform.startswith('win')
WSL = 'linux' in platform.system().lower() and 'microsoft' in platform.uname().version.lower()

# Locations of the AMPs, plugins and export modules, relative to this file
work_path = os.path.realpath(os.path.dirname(__file__))
amps_path, plugins_path, exports_path = (
    os.path.realpath(os.path.join(work_path, sub)) for sub in ('amps', 'plugins', 'exports')
)
# Snapshot of sys.path taken before the Glances folders are added
sys_path = list(sys.path)
# Each insert goes to index 1, so the resulting order is: amps, plugins, exports
sys.path.insert(1, exports_path)
sys.path.insert(1, plugins_path)
sys.path.insert(1, amps_path)

# Type aliases
text_type = str
binary_type = bytes
bool_type = bool
long = int

# Error alias
PermissionError = OSError

# Method aliases: each one calls the matching dict view method on its argument
viewkeys = methodcaller('keys')
viewvalues = methodcaller('values')
viewitems = methodcaller('items')
97
98
99
###################
100
# GLOBALS FUNCTIONS
101
###################
102
103
104
def printandflush(string):
    """Write string to stdout and flush immediately (used by the stdout* output modules)."""
    print(string, flush=True)
107
108
109
def to_ascii(s):
    """Return s as a str restricted to ASCII characters.

    A bytes input is simply decoded. A str input is encoded to ASCII with
    non-ASCII characters (for example accented letters) dropped, then decoded.
    """
    if not isinstance(s, binary_type):
        return s.encode('ascii', 'ignore').decode()
    return s.decode()
115
116
117
def listitems(d):
    """Return the (key, value) pairs of d as a list."""
    return [*d.items()]
119
120
121
def listkeys(d):
    """Return the keys of d as a list."""
    return [*d.keys()]
123
124
125
def listvalues(d):
    """Return the values of d as a list."""
    return [*d.values()]
127
128
129
def iteritems(d):
    """Return an iterator over the (key, value) pairs of d."""
    pairs = d.items()
    return iter(pairs)
131
132
133
def iterkeys(d):
    """Return an iterator over the keys of d."""
    keys = d.keys()
    return iter(keys)
135
136
137
def itervalues(d):
    """Return an iterator over the values of d."""
    values = d.values()
    return iter(values)
139
140
141
def u(s, errors='replace'):
    """Return s as text: a str is returned unchanged, anything else is decoded as UTF-8.

    errors is the error handler given to decode().
    """
    return s if isinstance(s, text_type) else s.decode('utf-8', errors=errors)
145
146
147
def b(s, errors='replace'):
    """Return s as bytes: bytes are returned unchanged, anything else is encoded as UTF-8.

    errors is the error handler given to encode().
    """
    return s if isinstance(s, binary_type) else s.encode('utf-8', errors=errors)
151
152
153
def nativestr(s, errors='replace'):
    """Return s as a str.

    - str: returned unchanged
    - int or float: converted to its string representation
    - anything else: decoded as UTF-8 using the errors handler
    """
    if isinstance(s, text_type):
        return s
    is_number = isinstance(s, (int, float))
    return str(s) if is_number else s.decode('utf-8', errors=errors)
159
160
161
def system_exec(command):
    """Run a system command and return its standard output as a str.

    The command is split on single spaces (no shell involved). Any failure is
    reported in the returned string as 'ERROR: <exception>'. Trailing whitespace
    is stripped from the result.
    """
    try:
        completed = subprocess.run(command.split(' '), stdout=subprocess.PIPE)
        output = completed.stdout.decode('utf-8')
    except Exception as e:
        output = f'ERROR: {e}'
    return output.rstrip()
168
169
170
def subsample(data, sampling):
    """Compute a simple mean subsampling.

    Args:
        data: list of numerical values
        sampling: number of points wanted in the result (expected to be a positive integer)

    Returns:
        data itself if it has no more than sampling items, otherwise a list of
        sampling values, each one being the mean of a contiguous chunk of data.
    """
    size = len(data)
    if size <= sampling:
        return data
    # Proportional chunk boundaries: since size > sampling, every chunk holds at
    # least one item and the chunks cover the whole list. A fixed, rounded chunk
    # length could run past the end of data (mean() of an empty slice) or leave
    # the last items out.
    bounds = [i * size // sampling for i in range(sampling + 1)]
    return [mean(data[start:stop]) for start, stop in zip(bounds, bounds[1:])]
181
182
183
def time_series_subsample(data, sampling):
    """Compute a simple mean subsampling of a time series.

    Args:
        data: list of (time, value) pairs
        sampling: number of points wanted in the result (expected to be a positive integer)

    Returns:
        data itself if it has no more than sampling items, otherwise a list of
        sampling (time, value) tuples: for each contiguous chunk of data, the
        time of its first point and the mean of its values.
    """
    size = len(data)
    if size <= sampling:
        return data
    # Proportional chunk boundaries: since size > sampling, every chunk holds at
    # least one point and the chunks cover the whole series. A fixed, rounded
    # chunk length could produce an empty chunk or leave the last points out.
    bounds = [i * size // sampling for i in range(sampling + 1)]
    chunks = [data[start:stop] for start, stop in zip(bounds, bounds[1:])]
    return [(chunk[0][0], mean([point[1] for point in chunk])) for chunk in chunks]
198
199
200
def to_fahrenheit(celsius):
    """Return the Fahrenheit equivalent of a temperature given in Celsius."""
    scaled = celsius * 1.8
    return scaled + 32
203
204
205
def is_admin():
    """Tell whether the current user has administrator rights.

    Source: https://stackoverflow.com/a/19719292

    On POSIX systems this means running as root (uid 0). On Windows the answer
    comes from shell32.IsUserAnAdmin(), which requires Windows XP SP2 or higher:
    if that call fails, the error and its traceback are printed and False is
    returned.
    """
    if os.name != 'nt':
        # Check for root on Posix
        return os.getuid() == 0

    import ctypes
    import traceback

    # WARNING: requires Windows XP SP2 or higher!
    try:
        return ctypes.windll.shell32.IsUserAnAdmin()
    except Exception as e:
        print(f"Admin check failed with error: {e}")
        traceback.print_exc()
        return False
229
230
231
def key_exist_value_not_none(k, d):
    """Return True if key k exists in d and d[k] is not None."""
    if k not in d:
        return False
    return d[k] is not None
236
237
238
def key_exist_value_not_none_not_v(k, d, value='', length=None):
    """Return True if all of the following hold:

    - key k exists in d
    - d[k] is not None
    - d[k] != value
    - length is None, or len(d[k]) >= length
    """
    if k not in d or d[k] is None:
        return False
    return d[k] != value and (length is None or len(d[k]) >= length)
245
246
247
def disable(class_name, var):
    """Mark <var> as disabled on class_name.

    Sets enable_<var> to False and disable_<var> to True.
    """
    flags = {'enable_' + var: False, 'disable_' + var: True}
    for attribute, state in flags.items():
        setattr(class_name, attribute, state)
251
252
253
def enable(class_name, var):
    """Mark <var> as enabled on class_name.

    Sets enable_<var> to True and disable_<var> to False.
    """
    flags = {'enable_' + var: True, 'disable_' + var: False}
    for attribute, state in flags.items():
        setattr(class_name, attribute, state)
257
258
259
def safe_makedirs(path):
    """Create the directory tree path, tolerating a directory that already exists.

    Any OSError other than "path already exists as a directory" is re-raised.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        already_a_directory = err.errno == errno.EEXIST and os.path.isdir(path)
        if not already_a_directory:
            raise
269
270
271
def pretty_date(ref=False, now=None):
    """Return a pretty string describing the time elapsed between ref and now.

    Examples of results: 'just now', '30 secs', 'an hour', 'yesterday',
    '3 weeks', '2 years'.
    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python

    Args:
        ref: the reference date, either a datetime object or an Epoch timestamp
            (int or float). False or None means "no reference" and gives 'just now'.
        now: the datetime to compare with. Defaults to datetime.now(), evaluated
            at call time.

    Returns:
        str: the pretty string, or '' if ref is later than now.

    Raises:
        TypeError: if ref is neither a datetime, a number, False nor None.
    """
    if now is None:
        # Resolved on every call: a datetime.now() default in the signature is
        # evaluated only once, when the module is imported.
        now = datetime.now()

    if isinstance(ref, datetime):
        diff = now - ref
    elif isinstance(ref, (int, float)) and not isinstance(ref, bool):
        # bool is excluded explicitly: it is an int subclass, so False would
        # otherwise be read as the Epoch timestamp 0.
        diff = now - datetime.fromtimestamp(ref)
    elif not ref:
        # No reference given: zero elapsed time
        diff = now - now
    else:
        raise TypeError(f"ref should be a datetime or an Epoch timestamp, not {type(ref).__name__}")

    second_diff = diff.seconds
    day_diff = diff.days

    if day_diff < 0:
        return ''

    if day_diff == 0:
        if second_diff < 10:
            return "just now"
        if second_diff < 60:
            return str(second_diff) + " secs"
        if second_diff < 120:
            return "a min"
        if second_diff < 3600:
            return str(second_diff // 60) + " mins"
        if second_diff < 7200:
            return "an hour"
        if second_diff < 86400:
            return str(second_diff // 3600) + " hours"
    if day_diff == 1:
        return "yesterday"
    if day_diff < 7:
        return str(day_diff) + " days"
    if day_diff < 31:
        return str(day_diff // 7) + " weeks" if (day_diff // 7) > 1 else "1 week"
    if day_diff < 365:
        return str(day_diff // 30) + " months" if (day_diff // 30) > 1 else "1 month"
    return str(day_diff // 365) + " years" if (day_diff // 365) > 1 else "1 year"
312
313
314
def urlopen_auth(url, username, password):
    """Open url with an HTTP Basic Authorization header built from username and password.

    Returns whatever urlopen() returns for the request.
    """
    credentials = f'{username}:{password}'.encode()
    token = base64.b64encode(credentials).decode()
    request = Request(url, headers={'Authorization': 'Basic ' + token})
    return urlopen(request)
322
323
324
def json_dumps(data) -> bytes:
    """Serialize data to JSON and return it as bytes.

    A UnicodeDecodeError raised by the first attempt (issue #815, Windows OS)
    triggers a second attempt with ensure_ascii=False.
    """
    try:
        serialized = json.dumps(data)
    except UnicodeDecodeError:
        serialized = json.dumps(data, ensure_ascii=False)
    # ujson & json return str while the contract of this function is bytes
    return b(serialized)
335
336
337
def json_loads(data: Union[str, bytes, bytearray]) -> Union[Dict, List]:
    """Parse the JSON buffer data and return the resulting Python object."""
    parsed = json.loads(data)
    return parsed
340
341
342
def dictlist(data, item):
    """Extract item from data and return it as a one-key dict.

    - data is a dict: return {item: data[item]}
    - data is a list: return {item: [element[item] for each element of data]}

    Return None if data is neither a dict nor a list, or if the lookup fails
    (TypeError, IndexError or KeyError).
    """
    if not isinstance(data, (dict, list)):
        return None
    try:
        if isinstance(data, dict):
            return {item: data[item]}
        # Source:
        # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
        # But https://github.com/nicolargo/glances/issues/1401
        return {item: [element[item] for element in data]}
    except (TypeError, IndexError, KeyError):
        return None
358
359
360
def json_dumps_dictlist(data, item):
    """Return the JSON dump of dictlist(data, item), or None if dictlist() returns None."""
    selection = dictlist(data, item)
    return None if selection is None else json_dumps(selection)
365
366
367
def string_value_to_float(s):
    """Convert a string with a value and an optional unit to a float.

    Example:
    '12.5 MB' -> 12500000.0
    '32.5 GB' -> 32500000000.0
    Args:
        s (string): Input string with value and unit
    Output:
        float: The value in float, or None if the string cannot be converted
        (no value, more than one value/unit pair, invalid number or unknown unit)
    """
    convert_dict = {
        None: 1,
        'B': 1,
        'KB': 1000,
        'MB': 1000000,
        'GB': 1000000000,
        'TB': 1000000000000,
        'PB': 1000000000000000,
    }
    # Split the string, spaces removed, into numeric tokens (kept as is) and
    # non-numeric tokens (upper-cased so the unit lookup is case-insensitive)
    unpack_string = [
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
    ]
    if len(unpack_string) == 2:
        value, unit = unpack_string
    elif len(unpack_string) == 1:
        value, unit = unpack_string[0], None
    else:
        return None
    if unit not in convert_dict:
        # Unknown unit: not convertible, same answer as for the other invalid inputs
        return None
    try:
        return float(value) * convert_dict[unit]
    except ValueError:
        return None
401
402
403
def file_exists(filename):
    """Return True if filename is an existing regular file that can be read."""
    if not os.path.isfile(filename):
        return False
    return os.access(filename, os.R_OK)
406
407
408
def folder_size(path, errno=0):
    """Return a tuple with the size of the directory given by path and the errno.

    Sub-directories are walked recursively, without following symbolic links
    to directories. If an error occurs on a file or sub-folder (for example it
    is not accessible), errno is set to the error number and the walk continues.
    If path itself cannot be scanned, (0, <error number>) is returned.

    path: <string>
    errno: <int> Should always be 0 when calling the function"""
    ret_size = 0
    ret_err = errno
    try:
        # The with statement closes the scandir iterator even if the walk is interrupted
        with os.scandir(path) as entries:
            for f in entries:
                # os.scandir() never yields '.' nor '..': no need to filter them out
                if f.is_dir(follow_symlinks=False):
                    sub_size, ret_err = folder_size(os.path.join(path, f.name), ret_err)
                    ret_size += sub_size
                else:
                    try:
                        ret_size += f.stat().st_size
                    except OSError as e:
                        ret_err = e.errno
    except OSError as e:
        # PermissionError is a subclass of OSError: a single clause is enough
        return 0, e.errno
    return ret_size, ret_err
432
433
434
def weak_lru_cache(maxsize=128, typed=False):
    """LRU cache decorator for methods that only keeps a weak reference to self.

    The cache is keyed on weakref.ref(self) plus the call arguments; maxsize
    and typed are passed to functools.lru_cache.
    Source: https://stackoverflow.com/a/55990799
    """

    def wrapper(func):
        def _call_with_weakref(self_ref, *args, **kwargs):
            # Dereference the weak reference to get the instance back
            return func(self_ref(), *args, **kwargs)

        cached = functools.lru_cache(maxsize, typed)(_call_with_weakref)

        @functools.wraps(func)
        def inner(self, *args, **kwargs):
            return cached(weakref.ref(self), *args, **kwargs)

        return inner

    return wrapper
450
451
452
def namedtuple_to_dict(data):
    """Return a copy of the dict data where namedtuple values are converted to dicts.

    Values exposing an _asdict() method (as PsUtil stats do) are replaced by
    its result; other values are kept as is.
    """
    converted = {}
    for key, value in data.items():
        converted[key] = value._asdict() if hasattr(value, '_asdict') else value
    return converted
455
456
457
def list_of_namedtuple_to_list_of_dict(data):
    """Return the list obtained by applying namedtuple_to_dict() to each element of data."""
    return list(map(namedtuple_to_dict, data))
460
461
462
def replace_special_chars(input_string, by=' '):
    """Replace line breaks and tabs in input_string.

    Every '\\r\\n', '\\n' and '\\t' is replaced by the string by (a space by default).
    Return: the string with the chars replaced"""
    result = input_string
    # '\r\n' comes first so a Windows line break is replaced only once
    for special in ('\r\n', '\n', '\t'):
        result = result.replace(special, by)
    return result
466