glances.globals.weak_lru_cache()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 20
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 11
nop 3
dl 0
loc 20
rs 9.85
c 0
b 0
f 0
1
# ruff: noqa: F401
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Common objects shared by all Glances modules."""
11
12
################
13
# GLOBAL IMPORTS
14
################
15
16
import base64
17
import errno
18
import functools
19
import importlib
20
import multiprocessing
21
import os
22
import platform
23
import queue
24
import re
25
import socket
26
import subprocess
27
import sys
28
import weakref
29
from collections import OrderedDict
30
from configparser import ConfigParser, NoOptionError, NoSectionError
31
from datetime import datetime
32
from operator import itemgetter, methodcaller
33
from statistics import mean
34
from typing import Any, Optional, Union
35
from urllib.error import HTTPError, URLError
36
from urllib.parse import urlparse
37
from urllib.request import Request, urlopen
38
39
import psutil
40
41
# Prefer faster libs for JSON (de)serialization
42
# Preference Order: orjson > ujson > json (builtin)
43
try:
44
    import orjson as json
45
46
    json.dumps = functools.partial(json.dumps, option=json.OPT_NON_STR_KEYS)
47
except ImportError:
48
    # Need to log info but importing logger will cause cyclic imports
49
    pass
50
51
if 'json' not in globals():
52
    try:
53
        # Note: ujson is not officially supported
54
        # Available as a fallback to allow orjson's unsupported platforms to use a faster serialization lib
55
        import ujson as json
56
    except ImportError:
57
        import json
58
59
    # To allow ujson & json dumps to serialize datetime
60
    def _json_default(v: Any) -> Any:
61
        if isinstance(v, datetime):
62
            return v.isoformat()
63
        return v
64
65
    json.dumps = functools.partial(json.dumps, default=_json_default)
66
67
##############
68
# GLOBALS VARS
69
##############
70
71
# OS constants (some libraries/features are OS-dependent)
72
# Each flag is True when the interpreter runs on the matching platform
BSD = 'bsd' in sys.platform
LINUX = sys.platform.startswith('linux')
MACOS = sys.platform.startswith('darwin')
SUNOS = sys.platform.startswith('sunos')
WINDOWS = sys.platform.startswith('win')
# WSL: a Linux system whose uname version string mentions "microsoft"
WSL = "linux" in platform.system().lower() and "microsoft" in platform.uname().version.lower()
78
79
# Set the AMPs, plugins and export modules path
80
# Absolute folder of this module and of its amps/plugins/exports sub-folders
work_path = os.path.realpath(os.path.dirname(__file__))
amps_path = os.path.realpath(os.path.join(work_path, 'amps'))
plugins_path = os.path.realpath(os.path.join(work_path, 'plugins'))
exports_path = os.path.realpath(os.path.join(work_path, 'exports'))
# Snapshot of the import path taken before it is extended
sys_path = list(sys.path)
# Insert the three folders right after the first entry, in this final order:
# amps, plugins, exports
sys.path[1:1] = [amps_path, plugins_path, exports_path]
88
89
# Types
90
# Type aliases kept as module-level names
text_type = str
binary_type = bytes
bool_type = bool
long = int

# Alias errors
# NOTE: inside this module, PermissionError therefore means any OSError
PermissionError = OSError

# Alias methods: viewkeys(d) == d.keys(), and so on
viewkeys, viewvalues, viewitems = (methodcaller(name) for name in ('keys', 'values', 'items'))
102
103
# Multiprocessing start method (on POSIX system)
104
# Use the 'fork' start method on POSIX systems, the default context elsewhere
ctx_mp_fork = multiprocessing.get_context('fork' if (LINUX or BSD or SUNOS or MACOS) else None)
108
109
###################
110
# GLOBALS FUNCTIONS
111
###################
112
113
114
def printandflush(string):
    """Print string on stdout and flush the stream right away.

    Used by the stdout* outputs modules.
    """
    print(string, end='\n', flush=True)
117
118
119
def to_ascii(s):
    """Return s as a str, keeping only ASCII characters for text input.

    - bytes are decoded with the default codec and returned unchanged otherwise
    - for str, every non-ASCII character (accented letters included) is dropped
    """
    if not isinstance(s, bytes):
        ascii_only = s.encode('ascii', 'ignore')
        return ascii_only.decode()
    return s.decode()
125
126
127
def listitems(d):
    """Return the (key, value) pairs of d as a list."""
    return [*d.items()]
129
130
131
def listkeys(d):
    """Return the keys of d as a list."""
    return [*d.keys()]
133
134
135
def listvalues(d):
    """Return the values of d as a list."""
    return [*d.values()]
137
138
139
def u(s, errors='replace'):
    """Return s as text: str is returned as is, anything else is decoded from UTF-8.

    errors is the error handler given to decode() (default: 'replace').
    """
    return s if isinstance(s, str) else s.decode('utf-8', errors=errors)
143
144
145
def b(s, errors='replace'):
    """Return s as bytes: bytes are returned as is, anything else is encoded to UTF-8.

    errors is the error handler given to encode() (default: 'replace').
    """
    return s if isinstance(s, bytes) else s.encode('utf-8', errors=errors)
149
150
151
def nativestr(s, errors='replace'):
    """Return s as a str.

    - str is returned as is
    - int and float are converted to their string representation
    - anything else is decoded from UTF-8 with the given errors handler
    """
    if isinstance(s, (str, int, float)):
        return s if isinstance(s, str) else str(s)
    return s.decode('utf-8', errors=errors)
157
158
159
def system_exec(command):
    """Run command and return its standard output as a str.

    The command line is split on single spaces (no shell, no quoting support).
    Any failure is reported as a string starting with 'ERROR: ' instead of
    being raised. Trailing whitespace is removed from the result.
    """
    try:
        completed = subprocess.run(command.split(' '), stdout=subprocess.PIPE)
        output = completed.stdout.decode('utf-8')
    except Exception as e:
        output = f'ERROR: {e}'
    return output.rstrip()
166
167
168
def subsample(data, sampling):
    """Compute a simple mean subsampling.

    Data should be a list of numerical values.

    Return data itself when it already holds at most sampling items, else
    a new list of exactly sampling means. An empty list is returned when
    sampling is not positive.

    The items are spread over sampling contiguous buckets whose boundaries
    are i * len(data) // sampling. As len(data) > sampling at this point,
    every bucket holds at least one item and no trailing item is dropped
    (a rounded fixed bucket length could produce empty buckets, making
    mean() fail).
    """
    if len(data) <= sampling:
        return data
    if sampling <= 0:
        return []
    size = len(data)
    bounds = [i * size // sampling for i in range(sampling + 1)]
    return [mean(data[lo:hi]) for lo, hi in zip(bounds, bounds[1:])]
179
180
181
def time_series_subsample(data, sampling):
    """Compute a simple mean subsampling of a time series.

    Data should be a list of (time, value) pairs.

    Return data itself when it already holds at most sampling items, else
    a new list of exactly sampling (time, value) tuples where time is the
    first timestamp of the bucket and value the mean of the bucket values.
    An empty list is returned when sampling is not positive.

    Bucket boundaries are i * len(data) // sampling, so every bucket holds
    at least one point and no trailing point is dropped (a rounded fixed
    bucket length could produce empty buckets).
    """
    if len(data) <= sampling:
        return data
    if sampling <= 0:
        return []
    size = len(data)
    bounds = [i * size // sampling for i in range(sampling + 1)]
    subsampled = []
    for lo, hi in zip(bounds, bounds[1:]):
        bucket = data[lo:hi]
        subsampled.append((bucket[0][0], mean([point[1] for point in bucket])))
    return subsampled
196
197
198
def to_fahrenheit(celsius):
    """Return the Fahrenheit value of a temperature given in Celsius."""
    scaled = celsius * 1.8
    return scaled + 32
201
202
203
def is_admin():
    """Tell if the current user has administrator rights.

    Source: https://stackoverflow.com/a/19719292

    - POSIX: True when the user id is 0 (root)
    - Windows: result of shell32.IsUserAnAdmin(). If that call fails (it
      requires Windows XP SP2 or higher), the error and its traceback are
      printed and False is returned.
    """
    if os.name != 'nt':
        # Check for root on Posix
        return os.getuid() == 0

    import ctypes
    import traceback

    # WARNING: requires Windows XP SP2 or higher!
    try:
        return ctypes.windll.shell32.IsUserAnAdmin()
    except Exception as e:
        print(f"Admin check failed with error: {e}")
        traceback.print_exc()
        return False
227
228
229
def key_exist_value_not_none(k, d):
    """Return True if key k exists in d and d[k] is not None."""
    if k not in d:
        return False
    return d[k] is not None
234
235
236
def key_exist_value_not_none_not_v(k, d, value='', length=None):
    """Return True if all the following conditions are met:

    - key k exists in d
    - d[k] is not None
    - d[k] != value
    - length is None or len(d[k]) >= length
    """
    if k not in d or d[k] is None:
        return False
    stored = d[k]
    return stored != value and (length is None or len(stored) >= length)
243
244
245
def disable(class_name, var):
    """Set enable_<var> to False and disable_<var> to True in the class class_name."""
    for prefix, flag in (('enable_', False), ('disable_', True)):
        setattr(class_name, prefix + var, flag)
249
250
251
def enable(class_name, var):
    """Set enable_<var> to True and disable_<var> to False in the class class_name."""
    for prefix, flag in (('enable_', True), ('disable_', False)):
        setattr(class_name, prefix + var, flag)
255
256
257
def safe_makedirs(path):
    """Create the directory tree path, doing nothing if it already exists.

    An OSError is still raised when path exists but is not a directory, or
    when the creation fails for any other reason.
    """
    os.makedirs(path, exist_ok=True)
267
268
269
def get_time_diffs(ref, now):
    """Return the time elapsed between ref and now.

    ref can be:
    - an int or a float: Epoch timestamp (converted with datetime.fromtimestamp)
    - a datetime object
    - any false value (None, '', ...): no reference, 0 is returned

    now should be a datetime object.

    Return a timedelta (or 0 when ref is a false value).
    Raise a TypeError for any other kind of ref, so that the returned value
    is defined for all execution paths.
    """
    if isinstance(ref, int):
        return now - datetime.fromtimestamp(ref)
    if isinstance(ref, datetime):
        return now - ref
    if not ref:
        return 0
    # Checked after the false-value case so that 0.0 still means "no reference"
    if isinstance(ref, float):
        return now - datetime.fromtimestamp(ref)
    raise TypeError(f'ref must be an int, a float or a datetime, not {ref.__class__.__name__}')
278
279
280
def get_first_true_val(conds):
    """Return the first key of conds whose value is true.

    Raise StopIteration if no value is true.
    """
    for key, val in conds.items():
        if val:
            return key
    raise StopIteration
282
283
284
def maybe_add_plural(count):
    """Return the plural suffix 's' if count is greater than 1, else an empty string."""
    if count > 1:
        return "s"
    return ""
286
287
288
def build_str_when_more_than_seven_days(day_diff, unit):
    """Express day_diff (a number of days) in unit: 'week', 'month' or 'year'.

    Return a string like '2 weeks' or '1 month'.
    Raise a KeyError if unit is not one of the three supported units.
    """
    days_per_unit = {'week': 7, 'month': 30, 'year': 365}
    count = day_diff // days_per_unit[unit]
    return f'{count} {unit}{maybe_add_plural(count)}'
294
295
296
def pretty_date(ref, now=None):
    """Return a short human-readable string for the time elapsed since ref.

    ref can be:
    - an int or a float: Epoch timestamp (converted with datetime.fromtimestamp)
    - a datetime object
    - any false value (None, '', ...): handled as "no delay" ('just now')

    now is the datetime to compare with (default: datetime.now()).

    Return strings like 'just now', '30 secs', 'a min', 'an hour',
    'yesterday', '3 days', '2 weeks', '3 months' or 'a year'.
    Return an empty string if ref is in the future.
    Raise a TypeError for any other kind of ref.

    Source: https://stackoverflow.com/questions/1551382/user-friendly-time-format-in-python

    Refactoring done in commit https://github.com/nicolargo/glances/commit/f6279baacd4cf0b27ca10df6dc01f091ea86a40a
    break the function. Get back to the old fashion way.
    """
    if not now:
        now = datetime.now()
    if isinstance(ref, int):
        diff = now - datetime.fromtimestamp(ref)
    elif isinstance(ref, datetime):
        diff = now - ref
    elif not ref:
        # No reference: use a null timedelta (the former int 0 has no
        # .seconds/.days attributes)
        diff = now - now
    elif isinstance(ref, float):
        diff = now - datetime.fromtimestamp(ref)
    else:
        raise TypeError(f'ref must be an int, a float or a datetime, not {ref.__class__.__name__}')
    second_diff = diff.seconds
    day_diff = diff.days

    if day_diff < 0:
        return ''

    if day_diff == 0:
        if second_diff < 10:
            return "just now"
        if second_diff < 60:
            return str(second_diff) + " secs"
        if second_diff < 120:
            return "a min"
        if second_diff < 3600:
            return str(second_diff // 60) + " mins"
        if second_diff < 7200:
            return "an hour"
        # timedelta.seconds is always lower than 86400
        return str(second_diff // 3600) + " hours"
    if day_diff == 1:
        return "yesterday"
    if day_diff < 7:
        return str(day_diff) + " days"
    if day_diff < 31:
        week = day_diff // 7
        return str(week) + " weeks" if week > 1 else "a week"
    if day_diff < 365:
        month = day_diff // 30
        return str(month) + " months" if month > 1 else "a month"
    year = day_diff // 365
    return str(year) + " years" if year > 1 else "a year"
345
346
347
def urlopen_auth(url, username, password):
    """Open url with an HTTP Basic Authorization header built from username and password.

    Return the object returned by urlopen.
    """
    credentials = f'{username}:{password}'.encode()
    token = base64.b64encode(credentials).decode()
    request = Request(url, headers={'Authorization': 'Basic ' + token})
    return urlopen(request)
355
356
357
def json_dumps(data) -> bytes:
    """Return the object data serialized in JSON, as bytes.

    Manage the issue #815 for Windows OS: if the first attempt raises a
    UnicodeDecodeError, retry with ensure_ascii=False.
    """
    try:
        serialized = json.dumps(data)
    except UnicodeDecodeError:
        serialized = json.dumps(data, ensure_ascii=False)
    # ujson & json libs return strings, but our contract expects bytes
    return b(serialized)
368
369
370
def json_loads(data: str | bytes | bytearray) -> dict | list:
371
    """Load a JSON buffer into memory as a Python object"""
372
    return json.loads(data)
373
374
375
def list_to_dict(data):
    """Index a list of dict by the value of the field named in each item 'key' entry.

    For each item with a 'key' entry, the result maps item[item['key']] to
    the item itself. Items without a 'key' entry are skipped.
    Return None if data is not a list.
    """
    if not isinstance(data, list):
        return None
    indexed = {}
    for entry in data:
        if 'key' in entry:
            indexed[entry[entry['key']]] = entry
    return indexed
380
381
382
def dictlist(data, item):
    """Extract the field item from data and return it as a one-entry dict.

    - data is a dict: return {item: data[item]}
    - data is a list (of dict): return {item: [value of item for each element]}
      (see https://github.com/nicolargo/glances/issues/1401)

    Return None for any other type of data or if the field can not be read
    (TypeError, IndexError or KeyError).
    """
    if not isinstance(data, (dict, list)):
        return None
    try:
        if isinstance(data, dict):
            extracted = data[item]
        else:
            extracted = [element[item] for element in data]
        return {item: extracted}
    except (TypeError, IndexError, KeyError):
        return None
398
399
400
def dictlist_json_dumps(data, item):
    """Return the JSON serialization of dictlist(data, item), or None if dictlist returns None."""
    extracted = dictlist(data, item)
    return None if extracted is None else json_dumps(extracted)
405
406
407
def dictlist_first_key_value(data: list[dict], key, value) -> dict | None:
408
    """In a list of dict, return first item where key=value or none if not found."""
409
    try:
410
        ret = next(item for item in data if key in item and item[key] == value)
411
    except StopIteration:
412
        ret = None
413
    return ret
414
415
416
def auto_unit(number, low_precision=False, min_symbol='K', none_symbol='-'):
    """Make a nice human-readable string out of number.

    The number is scaled with binary prefixes (K=1024, M=1024**2, ...).
    Number of decimal places increases as quantity approaches 1.
    CASE: 613421788        RESULT:       585M low_precision:       585M
    CASE: 5307033647       RESULT:      4.94G low_precision:       4.9G
    CASE: 44968414685      RESULT:      41.9G low_precision:      41.9G
    CASE: 838471403472     RESULT:       781G low_precision:       781G
    CASE: 9683209690677    RESULT:      8.81T low_precision:       8.8T
    CASE: 1073741824       RESULT:      1024M low_precision:      1024M
    CASE: 1181116006       RESULT:      1.10G low_precision:       1.1G

    :low_precision: returns less decimal places potentially (default is False)
                    sacrificing precision for more readability.
    :min_symbol: smallest symbol that can be used (default is K); symbols
                 below it are ignored.
    :none_symbol: string returned when number is None (default is -)
    """
    if number is None:
        return none_symbol
    if number == 0:
        # Avoid 0.0
        return '0'

    units = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # K -> 2**10, M -> 2**20, ... Y -> 2**80
    factor = {unit: 1 << (10 * rank) for rank, unit in enumerate(units, start=1)}
    if min_symbol in units:
        units = units[units.index(min_symbol) :]

    # Try the biggest unit first and keep the first one giving a value > 1
    for unit in reversed(units):
        scaled = float(number) / factor[unit]
        if not scaled > 1:
            continue
        if scaled < 10:
            digits = 2
        elif scaled < 100:
            digits = 1
        else:
            digits = 0
        if low_precision:
            digits = 0 if unit in ('M', 'K') else min(1, digits)
        elif unit == 'K':
            digits = 0
        return f'{scaled:.{digits}f}{unit}'

    # No unit applies: a float is displayed with 2 decimals, else with none
    digits = 2 if isinstance(number, float) else 0
    return f'{number:.{digits}f}'
473
474
475
def string_value_to_float(s):
    """Convert a string with a value and an unit to a float.
    Example:
    '12.5 MB' -> 12500000.0
    '32.5 GB' -> 32500000000.0
    Args:
        s (string): Input string with value and unit (decimal units: B, KB,
                    MB, GB, TB, PB; case insensitive; the unit is optional)
    Output:
        float: The value in float, or None if the string can not be
               converted (bad layout, invalid number or unknown unit)
    """
    convert_dict = {
        None: 1,
        'B': 1,
        'KB': 1000,
        'MB': 1000000,
        'GB': 1000000000,
        'TB': 1000000000000,
        'PB': 1000000000000000,
    }
    # Split the string (spaces removed) in numeric and non numeric chunks
    unpack_string = [
        i[0] if i[1] == '' else i[1].upper() for i in re.findall(r'([\d.]+)|([^\d.]+)', s.replace(' ', ''))
    ]
    if len(unpack_string) == 2:
        unit = unpack_string[1]
    elif len(unpack_string) == 1:
        unit = None
    else:
        return None
    try:
        value = float(unpack_string[0])
    except ValueError:
        return None
    # An unknown unit is a conversion failure like the others: return None
    # instead of raising a KeyError
    factor = convert_dict.get(unit)
    if factor is None:
        return None
    return value * factor
509
510
511
def file_exists(filename):
    """Return True if filename is an existing regular file that can be read."""
    if not os.path.isfile(filename):
        return False
    return os.access(filename, os.R_OK)
514
515
516
def folder_size(path, errno=0):
    """Return a tuple with the size of the directory given by path and the errno.

    Sub-directories (symbolic links excluded) are scanned recursively; for
    any other entry the size reported by stat() is added.

    If an error occurs on one entry (for example one file or subfolder is
    not accessible), the scan continues and errno is set to the error number.
    If path itself can not be scanned, (0, error number) is returned.

    path: <string>
    errno: <int> Should always be 0 when calling the function
           (note: this parameter shadows the errno module inside the function)
    """
    ret_size = 0
    ret_err = errno
    try:
        # The with statement closes the scandir iterator as soon as the scan ends.
        # os.scandir never yields the '.' and '..' entries, no need to filter them.
        with os.scandir(path) as entries:
            for f in entries:
                if f.is_dir(follow_symlinks=False):
                    sub_size, ret_err = folder_size(os.path.join(path, f.name), ret_err)
                    ret_size += sub_size
                else:
                    try:
                        ret_size += f.stat().st_size
                    except OSError as e:
                        ret_err = e.errno
    except OSError as e:
        return 0, e.errno
    return ret_size, ret_err
540
541
542
def _get_ttl_hash(ttl):
543
    """A simple (dummy) function to return a hash based on the current second.
544
    TODO: Implement a real TTL mechanism.
545
    """
546
    if ttl is None:
547
        return 0
548
    now = datetime.now()
549
    return now.second
550
551
552
def weak_lru_cache(maxsize=1, typed=False, ttl=None):
    """LRU Cache decorator (for methods) that keeps a weak reference to self

    maxsize and typed are given to functools.lru_cache. The value returned by
    _get_ttl_hash(ttl) is added to the cache key on each call.

    Warning: When used in a class, the class should implement __eq__(self, other) and __hash__(self) methods

    Source: https://stackoverflow.com/a/55990799"""

    def decorator(method):
        @functools.lru_cache(maxsize, typed)
        def cached_call(weak_self, *args, ttl_hash=None, **kwargs):
            # ttl_hash is only there to be part of the cache key
            del ttl_hash
            # Dereference the weak reference to get the instance back
            return method(weak_self(), *args, **kwargs)

        @functools.wraps(method)
        def bound_call(self, *args, **kwargs):
            return cached_call(weakref.ref(self), *args, ttl_hash=_get_ttl_hash(ttl), **kwargs)

        return bound_call

    return decorator
572
573
574
def namedtuple_to_dict(data):
    """Return a copy of the dict data where each value providing an _asdict() method
    (like the namedtuples embedded in PsUtil stats) is replaced by its dict form."""
    converted = {}
    for key, value in data.items():
        converted[key] = value._asdict() if hasattr(value, '_asdict') else value
    return converted
577
578
579
def list_of_namedtuple_to_list_of_dict(data):
    """Apply namedtuple_to_dict to each item of data and return the results as a list."""
    return list(map(namedtuple_to_dict, data))
582
583
584
def replace_special_chars(input_string, by=' '):
    """Replace the line breaks (CR+LF, LF) and the tabulations of input_string by another string

    Return: the string with the chars replaced"""
    cleaned = input_string
    for special in ('\r\n', '\n', '\t'):
        cleaned = cleaned.replace(special, by)
    return cleaned
588
589
590
def atoi(text):
    """Return text converted to an int if it only holds decimal digits, else text unchanged.

    isdecimal() is used rather than isdigit(): the latter also accepts
    characters like the superscript two, that int() refuses with a ValueError.
    """
    return int(text) if text.isdecimal() else text
592
593
594
def natural_keys(text):
    """Split text in digit and non digit chunks and return them as a list,
    where the digit chunks are converted by atoi (natural/human sort key)."""
    chunks = re.split(r'(\d+)', text)
    return list(map(atoi, chunks))
597
598
599
def exit_after(seconds, default=None):
    """Exit the function if it takes more than 'seconds' seconds to complete.
    In this case, return the value of 'default' (default: None).

    Only effective on Linux: on other systems the decorated function is
    returned unchanged. The function is run in a child process and its
    result is sent back through a queue. 'default' is also returned if the
    child process ends without providing any result (for example because
    the function raised an exception)."""

    # Maximum time (in seconds) to wait for the result of an ended child process
    result_timeout = 1

    def handler(q, func, args, kwargs):
        # Run in the child process: send the result back to the parent
        q.put(func(*args, **kwargs))

    def decorator(func):
        if not LINUX:
            return func

        @functools.wraps(func)
        def wraps(*args, **kwargs):
            try:
                q = ctx_mp_fork.Queue()
            except PermissionError:
                # Manage an exception in Snap packages on Linux
                # The strict mode prevent the use of multiprocessing.Queue()
                # There is a "dirty" hack:
                # https://forum.snapcraft.io/t/python-multiprocessing-permission-denied-in-strictly-confined-snap/15518/2
                # But i prefer to just disable the timeout feature in this case:
                # the function is called directly and its result is returned
                return func(*args, **kwargs)

            p = ctx_mp_fork.Process(target=handler, args=(q, func, args, kwargs))
            p.start()
            p.join(timeout=seconds)
            if not p.is_alive():
                if p.exitcode != 0:
                    # The child process failed: there is no result to wait for
                    return default
                try:
                    # Do not block forever if the child did not send any result
                    return q.get(timeout=result_timeout)
                except queue.Empty:
                    return default
            p.terminate()
            p.join(timeout=0.1)
            if p.is_alive():
                # Kill in case processes doesn't terminate
                # Happens with cases like broken NFS connections
                p.kill()
            return default

        return wraps

    return decorator
637
638
639
def split_esc(input_string, sep=None, maxsplit=-1, esc='\\'):
    """
    Return a list of the substrings in the input_string, using sep as the separator char
    and esc as the escape character.

    sep
        The separator used to split the input_string.

        When set to None (the default value), will split on any whitespace
        character (newline, carriage return, tabulation, form feed and spaces
        included) unless the character is escaped and will discard empty
        strings from the result. As with str.split, leading whitespace is
        ignored and a run of whitespace counts as a single separator.
    maxsplit
        Maximum number of splits.
        -1 (the default value) means no limit.
    esc
        The character used to escape the separator.

        When set to None, this behaves equivalently to `str.split`.
        Defaults to the backslash character.

    Splitting starts at the front of the input_string and works to the end.

    Raise a TypeError if input_string is not a str or if esc is neither a str
    nor None, and a ValueError if esc is not a single character. sep and
    maxsplit are validated by str.split.

    Note: escape characters in the substrings returned are removed. However, if
    maxsplit is reached, escape characters in the remaining, unprocessed substring
    are not removed, which allows split_esc to be called on it again.
    """
    # Input validation
    if not isinstance(input_string, str):
        raise TypeError(f'must be str, not {input_string.__class__.__name__}')
    str.split('', sep=sep, maxsplit=maxsplit)  # Use str.split to validate sep and maxsplit
    if esc is None:
        # Short circuit to default implementation if the escape character is None
        return input_string.split(sep=sep, maxsplit=maxsplit)
    if not isinstance(esc, str):
        raise TypeError(f'must be str or None, not {esc.__class__.__name__}')
    if len(esc) == 0:
        raise ValueError('empty escape character')
    if len(esc) > 1:
        raise ValueError('escape must be a single character')

    # Set up a simple state machine keeping track of whether we have seen an escape character
    ret, esc_seen, i = [''], False, 0
    length = len(input_string)

    if sep is None:
        # Leading (unescaped) whitespace is not a separator: skip it so that
        # it does not count as a split
        while i < length and input_string[i].isspace():
            i += 1

    while i < length and len(ret) - 1 != maxsplit:
        if not esc_seen:
            if input_string[i] == esc:
                # Consume the escape character and transition state
                esc_seen = True
                i += 1
            elif sep is None and input_string[i].isspace():
                # Consume as much whitespace as possible (up to the end of the string)
                n = 1
                while i + n < length and input_string[i + n].isspace():
                    n += 1
                ret.append('')
                i += n
            elif sep is not None and input_string[i : i + len(sep)] == sep:
                # Consume the separator
                ret.append('')
                i += len(sep)
            else:
                # Otherwise just add the current char
                ret[-1] += input_string[i]
                i += 1
        else:
            # Add the current char and transition state back
            ret[-1] += input_string[i]
            esc_seen = False
            i += 1

    # Append any remaining string if we broke early because of maxsplit
    if i < length:
        ret[-1] += input_string[i:]

    # If splitting on whitespace, discard empty strings from result
    if sep is None:
        ret = [sub for sub in ret if len(sub) > 0]

    return ret
718
719
720
def get_ip_address(ipv6=False):
    """Get current IP address and netmask as a tuple.

    Only the interfaces that are up, not named 'lo' and with known addresses
    are taken into account. For each of them, the first address of the
    requested family (IPv6 if ipv6 is True, else IPv4) is used.
    Return (None, None) if no address matches.
    """
    family = socket.AF_INET6 if ipv6 else socket.AF_INET

    # Get IP address
    stats = psutil.net_if_stats()
    addrs = psutil.net_if_addrs()

    ip_address = None
    ip_netmask = None
    # NOTE(review): the scan goes on after a match, so the last matching
    # interface wins - confirm this is the expected behavior
    for interface, stat in stats.items():
        if not stat.isup or interface == 'lo' or interface not in addrs:
            continue
        match = next((addr for addr in addrs[interface] if addr.family == family), None)
        if match is not None:
            ip_address = match.address
            ip_netmask = match.netmask

    return ip_address, ip_netmask
740
741
742
def get_default_gateway(ipv6=False):
    """Get the default gateway IP address.

    The address is read from /proc/net/route (or /proc/net/ipv6_route if ipv6
    is True): the gateway of the first line whose destination is all zeros
    is returned, converted to the dotted (IPv4) or colon (IPv6) notation.
    Return None if there is no such line, if the file does not exist or if
    a line can not be parsed.
    """

    def convert_ipv4(gateway_hex):
        """Convert IPv4 hex (little-endian) to dotted notation."""
        octets = [gateway_hex[i : i + 2] for i in (6, 4, 2, 0)]
        return '.'.join(str(int(octet, 16)) for octet in octets)

    def convert_ipv6(gateway_hex):
        """Convert IPv6 hex to colon notation."""
        groups = [gateway_hex[i : i + 4] for i in range(0, 32, 4)]
        return ':'.join(groups)

    # Route table layout: file, destination of the default route, index of
    # the destination and gateway columns, converter
    if ipv6:
        route_file, default_dest = '/proc/net/ipv6_route', '00000000000000000000000000000000'
        dest_field, gateway_field = 0, 4
        converter = convert_ipv6
    else:
        route_file, default_dest = '/proc/net/route', '00000000'
        dest_field, gateway_field = 1, 2
        converter = convert_ipv4

    try:
        with open(route_file) as f:
            for line in f:
                fields = line.strip().split()
                if fields[dest_field] != default_dest:
                    continue
                return converter(fields[gateway_field])
    except (FileNotFoundError, IndexError, ValueError):
        return None
    return None
775