Test Failed
Push — develop ( 843246...ad9d52 )
by Nicolas
08:45
created

glances.plugins.ip.IpPlugin.get_default_gateway()   A

Complexity

Conditions 2

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 1
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""IP plugin."""
10
11
import threading
12
13
from glances.globals import get_ip_address, json_loads, queue, urlopen_auth
14
from glances.logger import logger
15
from glances.plugins.plugin.model import GlancesPluginModel
16
from glances.timer import Timer, getTimeSinceLastUpdate
17
18
# Fields description
19
# description: human readable description
20
# short_name: short name to use in the UI
21
# unit: unit type
22
# rate: is it a rate ? If yes, // by time_since_update when displayed,
23
# min_symbol: Auto unit should be used if value > than 1 'X' (K, M, G)...
24
# Per-field metadata for the IP plugin stats, keyed by stat name.
# Only 'mask_cidr' carries a unit; every other field has a description only.
fields_description = {
    'address': {'description': 'Private IP address'},
    'mask': {'description': 'Private IP mask'},
    'mask_cidr': {'description': 'Private IP mask in CIDR format', 'unit': 'number'},
    'gateway': {'description': 'Private IP gateway'},
    'public_address': {'description': 'Public IP address'},
    'public_info_human': {'description': 'Public IP information'},
}
45
46
47
class IpPlugin(GlancesPluginModel):
    """Glances IP Plugin.

    stats is a dict holding the private address, its mask (dotted and CIDR)
    and, when the public lookup is enabled, the public address and a
    human-readable summary built from the configured template.
    """

    # Fallback (in seconds) used when no valid refresh interval is configured
    _default_public_refresh_interval = 300

    def __init__(self, args=None, config=None):
        """Init the plugin and read the public IP options from the configuration."""
        super().__init__(args=args, config=config, fields_description=fields_description)

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Public information (see issue #2732)
        self.public_address = ""
        self.public_info = ""
        # Every option below is indexed with [0], so get_conf_value is
        # presumably returning a list of values - TODO confirm.
        self.public_api = self.get_conf_value("public_api", default=[None])[0]
        self.public_username = self.get_conf_value("public_username", default=[None])[0]
        self.public_password = self.get_conf_value("public_password", default=[None])[0]
        # NOTE(review): not indexed with [0], so with the [None] default the
        # "is None" test below can never be true - confirm the intent.
        self.public_field = self.get_conf_value("public_field", default=[None])
        self.public_template = self.get_conf_value("public_template", default=[None])[0]
        self.public_disabled = (
            # The default must be a list: indexing the former 'False' string
            # yielded 'F', which always compared as "disabled".
            self.get_conf_value('public_disabled', default=['False'])[0].lower() != 'false'
            or self.public_api is None
            or self.public_field is None
        )
        self.public_address_refresh_interval = self._to_refresh_interval(
            self.get_conf_value("public_refresh_interval", default=self._default_public_refresh_interval)
        )

    @classmethod
    def _to_refresh_interval(cls, value):
        """Coerce a configured refresh interval to a number of seconds.

        Accepts a number, a numeric string or a list/tuple whose first item is
        one of those; anything else falls back to the class default. This
        keeps the later "elapsed > interval" comparison from raising.
        """
        if isinstance(value, (list, tuple)):
            value = value[0] if value else None
        try:
            return float(value)
        except (TypeError, ValueError):
            return cls._default_public_refresh_interval

    def get_first_ip(self, stats):
        """Fill stats with the private address, mask and CIDR mask, then return it."""
        stats['address'], stats['mask'] = get_ip_address()
        stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])

        return stats

    def get_public_ip(self, stats):
        """Fill stats with the public IP information and return it.

        The online service is only queried when the public lookup is enabled
        and either no address is known yet or the refresh interval elapsed.
        If the answer cannot be used, the public keys are left out of stats.
        """
        time_since_update = getTimeSinceLastUpdate('public-ip')
        try:
            if not self.public_disabled and (
                self.public_address == "" or time_since_update > self.public_address_refresh_interval
            ):
                self.public_info = PublicIpInfo(self.public_api, self.public_username, self.public_password).get()
                self.public_address = self.public_info['ip']
        except (KeyError, AttributeError, TypeError) as e:
            logger.debug(f"Cannot grab public IP information ({e})")
        else:
            # args defaults to None in the constructor: do not assume it is set
            hide_public_info = getattr(self.args, 'hide_public_info', False)
            stats['public_address'] = self.__hide_ip(self.public_address) if hide_public_info else self.public_address
            stats['public_info_human'] = self.public_info_for_human(self.public_info)

        return stats

    @GlancesPluginModel._check_decorator
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Update IP stats using the input method.

        :return: the stats dict
        """
        # Init new stats
        stats = self.get_init_value()

        if self.input_method == 'local':
            stats = self.get_stats_for_local_input(stats)

        elif self.input_method == 'snmp':
            # Not implemented yet
            pass

        # Update the stats
        self.stats = stats

        return self.stats

    def get_stats_for_local_input(self, stats):
        """Fill stats with the private, then the public, IP information."""
        # Get Public and Private IP address
        if self.get_first_ip(stats):
            self.get_public_ip(stats)
        return stats

    def __hide_ip(self, ip):
        """Hide the last two numbers of the given dotted IP address."""
        if not ip:
            # Nothing to mask: returning '.*.*' would be displayed as an address
            return ip
        return '.'.join(ip.split('.')[0:2]) + '.*.*'

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface."""
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disabled():
            return ret

        # Start with the private IP information
        if 'address' in self.stats:
            msg = 'IP '
            ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
            msg = '{}'.format(self.stats['address'])
            ret.append(self.curse_add_line(msg, optional=True))
        if 'mask_cidr' in self.stats:
            # VPN with no internet access (issue #842)
            msg = '/{}'.format(self.stats['mask_cidr'])
            ret.append(self.curse_add_line(msg, optional=True))

        # Then with the public IP information
        try:
            msg_pub = '{}'.format(self.stats['public_address'])
        except (UnicodeEncodeError, KeyError):
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
            pass
        else:
            if self.stats['public_address']:
                msg = ' Pub '
                ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
                ret.append(self.curse_add_line(msg_pub, optional=True))

            # .get(): do not fail the display if only the address is present
            public_info_human = self.stats.get('public_info_human')
            if public_info_human:
                ret.append(self.curse_add_line(' {}'.format(public_info_human), optional=True))

        return ret

    def public_info_for_human(self, public_info):
        """Return the public information formatted with the configured template.

        An empty string is returned when there is no information, no template,
        or when the template cannot be filled from the information.
        """
        if not public_info or not self.public_template:
            return ''

        try:
            return self.public_template.format(**public_info)
        except (KeyError, IndexError, TypeError, ValueError) as e:
            # A template referring to a field missing from the answer (or a
            # non-mapping answer) must not break the whole stats update.
            logger.debug(f"Cannot format public IP information ({e})")
            return ''

    @staticmethod
    def ip_to_cidr(ip):
        """Convert a dotted IP mask to its CIDR prefix length.

        Example: '255.255.255.0' will return 24

        None or an empty mask returns 0.
        """
        # Thanks to @Atticfire
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
        if not ip:
            # Correct issue #1528
            return 0
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
191
192
193
class PublicIpInfo:
    """Get public IP information from online service."""

    def __init__(self, url, username, password, timeout=2):
        """Init the class.

        :param url: URL of the online service to request
        :param username: user name passed to urlopen_auth
        :param password: password passed to urlopen_auth
        :param timeout: maximum time (in seconds) get() waits for the answer
        """
        self.url = url
        self.username = username
        self.password = password
        self.timeout = timeout

    def get(self):
        """Return the public IP information returned by the online service.

        The request runs in a daemon thread; this method waits for its result
        for at most ``timeout`` seconds.

        :return: the decoded answer, or None on error or timeout
        """
        q = queue.Queue()

        t = threading.Thread(target=self._get_ip_public_info, args=(q, self.url, self.username, self.password))
        t.daemon = True
        t.start()

        # Block on the queue instead of polling qsize() in a tight loop: no CPU
        # is burnt while waiting, and a failure reported by the worker (None)
        # is returned at once rather than after the full timeout.
        try:
            return q.get(timeout=max(self.timeout, 0))
        except queue.Empty:
            return None

    def _get_ip_public_info(self, queue_target, url, username, password):
        """Request the url service and put the result in the queue_target.

        None is put in the queue when the request or the decoding fails.
        """
        try:
            response = urlopen_auth(url, username, password).read()
        except Exception as e:
            logger.debug(f"IP plugin - Cannot get public IP information from {url} ({e})")
            queue_target.put(None)
        else:
            try:
                queue_target.put(json_loads(response))
            except (ValueError, KeyError) as e:
                logger.debug(f"IP plugin - Cannot load public IP information from {url} ({e})")
                queue_target.put(None)
232