glances.globals   F
last analyzed

Complexity

Total Complexity 79

Size/Duplication

Total Lines 445
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 248
dl 0
loc 445
rs 2.08
c 0
b 0
f 0
wmc 79

33 Functions

Rating   Name   Duplication   Size   Complexity  
A itervalues() 0 2 1
A key_exist_value_not_none() 0 5 1
A subsample() 0 11 2
A safe_makedirs() 0 10 4
A printandflush() 0 3 1
A nativestr() 0 7 3
A weak_lru_cache() 0 16 1
A disable() 0 4 1
A listitems() 0 2 1
A b() 0 4 2
A system_exec() 0 7 2
A is_admin() 0 24 3
A json_dumps_dictlist() 0 6 2
A urlopen_auth() 0 6 1
A time_serie_subsample() 0 15 2
A dictlist() 0 16 5
F pretty_date() 0 42 16
A to_fahrenheit() 0 3 1
A key_exist_value_not_none_not_v() 0 7 1
A json_dumps() 0 9 2
A listvalues() 0 2 1
A enable() 0 4 1
A iterkeys() 0 2 1
A u() 0 4 2
B string_value_to_float() 0 34 5
A to_ascii() 0 6 2
A iteritems() 0 2 1
A listkeys() 0 2 1
A file_exists() 0 3 1
B folder_size() 0 24 8
A list_of_namedtuple_to_list_of_dict() 0 3 1
A namedtuple_to_dict() 0 3 2
A replace_special_chars() 0 4 1

How to fix   Complexity   

Complexity

Complex classes like glances.globals often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2023 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import errno
17
import os
18
import sys
19
import platform
20
import ujson
21
from operator import itemgetter, methodcaller
22
import unicodedata
23
import types
24
import subprocess
25
from datetime import datetime
26
import re
27
import base64
28
import functools
29
import weakref
30
31
import queue
32
from configparser import ConfigParser, NoOptionError, NoSectionError
33
from statistics import mean
34
from xmlrpc.client import Fault, ProtocolError, ServerProxy, Transport, Server
35
from xmlrpc.server import SimpleXMLRPCRequestHandler, SimpleXMLRPCServer
36
from urllib.request import urlopen, Request
37
from urllib.error import HTTPError, URLError
38
from urllib.parse import urlparse
39
40
# Fix issue #1025 by monkey-patching the xmlrpc lib
41
from defusedxml.xmlrpc import monkey_patch
42
43
monkey_patch()
44
45
##############
46
# GLOBALS VARS
47
##############
48
49
# Host platform flags (some libraries/features are OS-dependent)
BSD = 'bsd' in sys.platform
LINUX = sys.platform.startswith('linux')
MACOS = sys.platform.startswith('darwin')
SUNOS = sys.platform.startswith('sunos')
WINDOWS = sys.platform.startswith('win')
# True when the system name contains "linux" and the uname version string mentions "microsoft"
WSL = 'linux' in platform.system().lower() and 'microsoft' in platform.uname().version.lower()

# Locations of the AMPs, plugins and exports folders, next to this module
work_path = os.path.realpath(os.path.dirname(__file__))
amps_path, plugins_path, exports_path = (
    os.path.realpath(os.path.join(work_path, folder)) for folder in ('amps', 'plugins', 'exports')
)
# Copy of the import path taken before it is extended below
sys_path = list(sys.path)
# Extend the import path right after its first entry: amps, then plugins, then exports
sys.path[1:1] = [amps_path, plugins_path, exports_path]

# Type aliases
text_type, binary_type, bool_type, long = str, bytes, bool, int

# Error alias
PermissionError = OSError

# Method aliases: viewkeys(d) calls d.keys(), viewvalues(d) calls d.values(), ...
viewkeys = methodcaller('keys')
viewvalues = methodcaller('values')
viewitems = methodcaller('items')
80
81
82
###################
83
# GLOBALS FUNCTIONS
84
###################
85
86
87
def printandflush(string):
    """Print string on stdout and flush the stream right away.

    Used by the stdout* output modules.
    """
    print(string, end='\n', flush=True)
90
91
92
def to_ascii(s):
    """Return s as a str.

    A bytes input is decoded with the default codec and returned as is.
    A str input loses every non-ASCII character (accented letters are
    dropped, not transliterated).
    """
    if not isinstance(s, binary_type):
        # Encoding with 'ignore' silently discards what ASCII cannot represent
        s = s.encode('ascii', 'ignore')
    return s.decode()
98
99
100
def listitems(d):
    """Return the (key, value) pairs of d as a list."""
    pairs = d.items()
    return list(pairs)
102
103
104
def listkeys(d):
    """Return the keys of d as a list."""
    keys = d.keys()
    return list(keys)
106
107
108
def listvalues(d):
    """Return the values of d as a list."""
    values = d.values()
    return list(values)
110
111
112
def iteritems(d):
    """Return an iterator over the (key, value) pairs of d."""
    pairs = d.items()
    return iter(pairs)
114
115
116
def iterkeys(d):
    """Return an iterator over the keys of d."""
    keys = d.keys()
    return iter(keys)
118
119
120
def itervalues(d):
    """Return an iterator over the values of d."""
    values = d.values()
    return iter(values)
122
123
124
def u(s, errors='replace'):
    """Return s as text: a str is returned as is, anything else is decoded as UTF-8.

    errors: error handler given to decode() (default: 'replace')
    """
    return s if isinstance(s, text_type) else s.decode('utf-8', errors=errors)
128
129
130
def b(s, errors='replace'):
    """Return s as bytes: bytes are returned as is, anything else is encoded as UTF-8.

    errors: error handler given to encode() (default: 'replace')
    """
    return s if isinstance(s, binary_type) else s.encode('utf-8', errors=errors)
134
135
136
def nativestr(s, errors='replace'):
    """Return s as a str.

    A str is returned as is, an int or a float is converted to its string
    form and any other value is decoded as UTF-8.

    errors: error handler given to decode() (default: 'replace')
    """
    if isinstance(s, text_type):
        return s
    if isinstance(s, (int, float)):
        return str(s)
    return s.decode('utf-8', errors=errors)
143
144
145
def system_exec(command):
    """Run a system command and return its standard output as a str.

    The command line is split on single spaces (no shell is involved).
    If anything goes wrong, the returned string is 'ERROR: ' followed by
    the exception message. Trailing whitespace is stripped in both cases.
    """
    try:
        completed = subprocess.run(command.split(' '), stdout=subprocess.PIPE)
        output = completed.stdout.decode('utf-8')
    except Exception as e:
        output = 'ERROR: {}'.format(e)
    return output.rstrip()
152
153
154
def subsample(data, sampling):
    """Compute a simple mean subsampling.

    data: list of numerical values
    sampling: number of values wanted in the result

    Return data unchanged when it already holds sampling items or fewer.
    Otherwise return a list of exactly sampling means, computed on contiguous
    chunks which cover all of data and whose sizes differ by at most one item.
    """
    size = len(data)
    if size <= sampling:
        return data
    # Proportional boundaries: as size > sampling, every chunk holds at least
    # one item. A fixed, rounded chunk length could leave the last chunk empty
    # (mean() then raises StatisticsError) or ignore the end of the data.
    bounds = [s * size // sampling for s in range(sampling + 1)]
    return [mean(data[start:stop]) for start, stop in zip(bounds, bounds[1:])]
165
166
167
def time_serie_subsample(data, sampling):
    """Compute a simple mean subsampling of a time serie.

    data: list of (time, value) pairs
    sampling: number of points wanted in the result

    Return data unchanged when it already holds sampling items or fewer.
    Otherwise return a list of exactly sampling (time, value) tuples: data is
    cut in contiguous chunks, each one giving the time of its first point and
    the mean of its values.
    """
    size = len(data)
    if size <= sampling:
        return data
    # Proportional boundaries: as size > sampling, every chunk holds at least
    # one point. A fixed, rounded chunk length could leave the last chunk empty
    # (IndexError / StatisticsError) or ignore the end of the serie.
    bounds = [s * size // sampling for s in range(sampling + 1)]
    chunks = [data[start:stop] for start, stop in zip(bounds, bounds[1:])]
    return [(chunk[0][0], mean([point[1] for point in chunk])) for chunk in chunks]
182
183
184
def to_fahrenheit(celsius):
    """Return the Fahrenheit equivalent of a temperature given in Celsius."""
    scaled = celsius * 1.8
    return scaled + 32
187
188
189
def is_admin():
    """Tell whether the current user has administrator rights.

    Source: https://stackoverflow.com/a/19719292

    On Posix systems, return True when the user id is 0 (root).
    On Windows, return the result of shell32.IsUserAnAdmin(); if that call
    fails, the error and its traceback are printed and False is returned.
    Warning: the Windows check requires Windows XP SP2 or higher.
    """
    if os.name != 'nt':
        # Posix: being admin means being root
        return os.getuid() == 0

    # Windows only modules, imported on demand
    import ctypes
    import traceback

    try:
        return ctypes.windll.shell32.IsUserAnAdmin()
    except Exception as e:
        print("Admin check failed with error: %s" % e)
        traceback.print_exc()
        return False
213
214
215
def key_exist_value_not_none(k, d):
    """Return True if key k exists in d and d[k] is not None."""
    if k not in d:
        return False
    return d[k] is not None
220
221
222
def key_exist_value_not_none_not_v(k, d, value='', lengh=None):
    """Return True only if all the following conditions hold:

    - key k exists in d
    - d[k] is not None
    - d[k] != value
    - lengh is None, or len(d[k]) >= lengh
    """
    if k not in d:
        return False
    current = d[k]
    if current is None:
        return False
    if not (current != value):
        return False
    return lengh is None or len(current) >= lengh
229
230
231
def disable(class_name, var):
    """Set enable_<var> to False and disable_<var> to True on class_name."""
    for prefix, flag in (('enable_', False), ('disable_', True)):
        setattr(class_name, prefix + var, flag)
235
236
237
def enable(class_name, var):
    """Set enable_<var> to True and disable_<var> to False on class_name."""
    for prefix, flag in (('enable_', True), ('disable_', False)):
        setattr(class_name, prefix + var, flag)
241
242
243
def safe_makedirs(path):
    """Create the directory tree path, accepting that it may already exist.

    The OSError raised by os.makedirs() is propagated, except when it reports
    that path exists (EEXIST) and path is indeed a directory.
    """
    try:
        os.makedirs(path)
    except OSError as err:
        already_a_directory = err.errno == errno.EEXIST and os.path.isdir(path)
        if not already_a_directory:
            raise
253
254
255
def pretty_date(time=False):
    """Return a short, human readable age for a point in time.

    time: a (naive) datetime object, an Epoch timestamp (int or float), or an
          empty value such as False (the default) or None, meaning "now"

    Return a pretty string like 'just now', '12 secs', 'an hour', 'yesterday'
    or '3 months'. An empty string is returned when time is in the future.
    Raise TypeError if time is of an unsupported type.

    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python
    """
    now = datetime.now()
    if isinstance(time, datetime):
        diff = now - time
    elif isinstance(time, (int, float)) and time is not False:
        # False is an int too, but it is the "no value" default, not the Epoch
        diff = now - datetime.fromtimestamp(time)
    elif not time:
        # No reference given: zero timedelta (so .seconds and .days exist)
        diff = now - now
    else:
        raise TypeError('time should be a datetime or an Epoch timestamp, not {}'.format(type(time).__name__))
    second_diff = diff.seconds
    day_diff = diff.days

    # A negative timedelta always has a negative number of days
    if day_diff < 0:
        return ''

    if day_diff == 0:
        if second_diff < 10:
            return "just now"
        if second_diff < 60:
            return str(second_diff) + " secs"
        if second_diff < 120:
            return "a min"
        if second_diff < 3600:
            return str(second_diff // 60) + " mins"
        if second_diff < 7200:
            return "an hour"
        if second_diff < 86400:
            return str(second_diff // 3600) + " hours"
    if day_diff == 1:
        return "yesterday"
    if day_diff < 7:
        return str(day_diff) + " days"
    if day_diff < 31:
        return str(day_diff // 7) + " weeks"
    if day_diff < 365:
        return str(day_diff // 30) + " months"
    return str(day_diff // 365) + " years"
297
298
299
def urlopen_auth(url, username, password):
    """Open url with an HTTP Basic Authorization header and return the response.

    The header value is 'Basic ' followed by the base64 encoding of
    '<username>:<password>'.
    """
    credentials = ('%s:%s' % (username, password)).encode()
    token = base64.b64encode(credentials).decode()
    request = Request(url, headers={'Authorization': 'Basic ' + token})
    return urlopen(request)
307
308
309
def json_dumps(data):
    """Return the object data serialized in JSON format.

    If the first attempt raises a UnicodeDecodeError (issue #815, seen on
    Windows OS), the serialization is done again with ensure_ascii=False.
    """
    try:
        encoded = ujson.dumps(data)
    except UnicodeDecodeError:
        encoded = ujson.dumps(data, ensure_ascii=False)
    return encoded
318
319
320
def dictlist(data, item):
    """Extract item from data and return it as a one-key dict.

    - data is a dict: return {item: data[item]}
    - data is a list: return {item: [element[item] for each element of data]}
    - anything else, or a failing lookup (TypeError, IndexError, KeyError):
      return None
    """
    try:
        if isinstance(data, dict):
            picked = data[item]
        elif isinstance(data, list):
            # Source:
            # http://stackoverflow.com/questions/4573875/python-get-index-of-dictionary-item-in-list
            # But https://github.com/nicolargo/glances/issues/1401
            picked = [itemgetter(item)(element) for element in data]
        else:
            return None
        return {item: picked}
    except (TypeError, IndexError, KeyError):
        return None
336
337
338
def json_dumps_dictlist(data, item):
    """Return the JSON form of dictlist(data, item), or None if dictlist() returns None."""
    extracted = dictlist(data, item)
    return None if extracted is None else json_dumps(extracted)
344
345
346
def string_value_to_float(s):
    """Convert a string with a value and an optional unit to a float.

    Example:
    '12.5 MB' -> 12500000.0
    '32.5 GB' -> 32500000000.0
    '25.9' -> 25.9

    Args:
        s (string): Input string with value and unit (spaces are ignored and
                    the unit is case insensitive)
    Output:
        float: The value in float, or None if the string can not be parsed or
               if the unit is unknown
    """
    convert_dict = {
        None: 1,
        'B': 1,
        'KB': 1000,
        'MB': 1000000,
        'GB': 1000000000,
        'TB': 1000000000000,
        'PB': 1000000000000000,
    }
    # Split the string in runs of digits/dots (kept as is) and runs of other
    # characters (upper-cased, to be compared with the units)
    unpack_string = [
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
    ]
    if len(unpack_string) == 2:
        value, unit = unpack_string
    elif len(unpack_string) == 1:
        value, unit = unpack_string[0], None
    else:
        return None
    try:
        # ValueError: the first part is not a number
        # KeyError: the unit is unknown
        return float(value) * convert_dict[unit]
    except (ValueError, KeyError):
        return None
380
381
382
def file_exists(filename):
    """Return True if filename is an existing regular file which can be read."""
    if not os.path.isfile(filename):
        return False
    return os.access(filename, os.R_OK)
385
386
387
def folder_size(path, errno=0):
    """Return a tuple with the size of the directory given by path and the errno.

    The size is the sum of the st_size of the entries found in path and,
    recursively, in its sub-directories (symbolic links to directories are
    not followed).
    If an error occurs on an entry (for example a file is not accessible),
    errno is set to the error number and the scan goes on.
    If path itself can not be scanned, (0, <error number>) is returned.

    path: <string>
    errno: <int> Should always be 0 when calling the function"""
    ret_size = 0
    ret_err = errno
    try:
        # The context manager closes the scandir iterator, even on error
        with os.scandir(path) as entries:
            for f in entries:
                # Note: scandir() never yields the '.' and '..' entries
                if f.is_dir(follow_symlinks=False):
                    sub_size, ret_err = folder_size(os.path.join(path, f.name), ret_err)
                    ret_size += sub_size
                else:
                    try:
                        ret_size += f.stat().st_size
                    except OSError as e:
                        ret_err = e.errno
    except OSError as e:
        return 0, e.errno
    return ret_size, ret_err
411
412
413
def weak_lru_cache(maxsize=128, typed=False):
    """LRU cache decorator for methods, which keeps only a weak reference to self.

    maxsize and typed are given to functools.lru_cache().
    Source: https://stackoverflow.com/a/55990799"""

    def decorate(method):
        def call_through_ref(instance_ref, *args, **kwargs):
            # Dereference the weak reference to get the instance back
            return method(instance_ref(), *args, **kwargs)

        # The cached function receives weakref.ref(self), not self itself
        cached_call = functools.lru_cache(maxsize, typed)(call_through_ref)

        @functools.wraps(method)
        def proxy(self, *args, **kwargs):
            return cached_call(weakref.ref(self), *args, **kwargs)

        return proxy

    return decorate
429
430
431
def namedtuple_to_dict(data):
    """Return a copy of the dict data where namedtuple values are converted to dicts.

    Each value having an _asdict() method (like the namedtuples embedded in
    PsUtil stats) is replaced by the result of that method; other values are
    kept as is."""
    converted = {}
    for key, value in data.items():
        converted[key] = value._asdict() if hasattr(value, '_asdict') else value
    return converted
434
435
436
def list_of_namedtuple_to_list_of_dict(data):
    """Apply namedtuple_to_dict() to each element of data and return the results as a list."""
    return list(map(namedtuple_to_dict, data))
439
440
441
def replace_special_chars(input_string, by=' '):
    """Replace the line breaks (CRLF, then LF) and the tabs of input_string by another string.

    by: the replacement string (default: one space)
    Return: the string with the chars replaced"""
    cleaned = input_string
    # CRLF goes first, so that a Windows line break gives a single replacement
    for special in ('\r\n', '\n', '\t'):
        cleaned = cleaned.replace(special, by)
    return cleaned
445