Test Failed
Push — master ( 372380...7cfc0c )
by Nicolas
03:32
created

PluginModel.get_default_gateway()   A

Complexity

Conditions 2

Size

Total Lines 9
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 1
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""IP plugin."""
10
11
import threading
12
13
from glances.globals import json_loads, queue, urlopen_auth
14
from glances.logger import logger
15
from glances.plugins.plugin.model import GlancesPluginModel
16
from glances.timer import Timer, getTimeSinceLastUpdate
17
18
# Import plugin specific dependency
19
try:
20
    import netifaces
21
except ImportError as e:
22
    import_error_tag = True
23
    logger.warning(f"Missing Python Lib ({e}), IP plugin is disabled")
24
else:
25
    import_error_tag = False
26
27
try:
28
    netifaces.default_gateway()
29
except Exception:
30
    import_error_tag = True
31
    logger.warning("Netifaces2 should be installed in your Python environment, IP plugin is disabled")
32
else:
33
    import_error_tag = False
34
35
36
# Fields description
37
# description: human readable description
38
# short_name: shortname to use un UI
39
# unit: unit type
40
# rate: is it a rate ? If yes, // by time_since_update when displayed,
41
# min_symbol: Auto unit should be used if value > than 1 'X' (K, M, G)...
42
fields_description = {
43
    'address': {
44
        'description': 'Private IP address',
45
    },
46
    'mask': {
47
        'description': 'Private IP mask',
48
    },
49
    'mask_cidr': {
50
        'description': 'Private IP mask in CIDR format',
51
        'unit': 'number',
52
    },
53
    'gateway': {
54
        'description': 'Private IP gateway',
55
    },
56
    'public_address': {
57
        'description': 'Public IP address',
58
    },
59
    'public_info_human': {
60
        'description': 'Public IP information',
61
    },
62
}
63
64
65
class PluginModel(GlancesPluginModel):
66
    """Glances IP Plugin.
67
68
    stats is a dict
69
    """
70
71
    _default_public_refresh_interval = 300
72
73
    def __init__(self, args=None, config=None):
74
        """Init the plugin."""
75
        super().__init__(args=args, config=config, fields_description=fields_description)
76
77
        # We want to display the stat in the curse interface
78
        self.display_curse = True
79
80
        # Public information (see issue #2732)
81
        self.public_address = ""
82
        self.public_info = ""
83
        self.public_api = self.get_conf_value("public_api", default=[None])[0]
84
        self.public_username = self.get_conf_value("public_username", default=[None])[0]
85
        self.public_password = self.get_conf_value("public_password", default=[None])[0]
86
        self.public_field = self.get_conf_value("public_field", default=[None])
87
        self.public_template = self.get_conf_value("public_template", default=[None])[0]
88
        self.public_disabled = (
89
            self.get_conf_value('public_disabled', default='False')[0].lower() != 'false'
90
            or self.public_api is None
91
            or self.public_field is None
92
        )
93
        self.public_address_refresh_interval = self.get_conf_value(
94
            "public_refresh_interval", default=self._default_public_refresh_interval
95
        )
96
97
    def get_default_gateway(self):
98
        # Get the default gateway thanks to the netifaces lib
99
        # Return a tupple with: ('192.168.1.1', 'wlp0s20f3')
100
        try:
101
            default_gw = netifaces.default_gateway()[netifaces.AF_INET]
102
        except (KeyError, AttributeError) as e:
103
            logger.debug(f"Cannot grab default gateway IP address ({e})")
104
            return None
105
        return default_gw
106
107
    def get_first_ip(self, stats):
108
        default_gateway = self.get_default_gateway()
109
        try:
110
            if not default_gateway:
111
                default_interface = netifaces.interfaces_by_index()[netifaces.AF_INET]
112
            else:
113
                default_interface = default_gateway[1]
114
            address = netifaces.ifaddresses(default_interface)[netifaces.AF_INET][0]['addr']
115
            mask = netifaces.ifaddresses(default_interface)[netifaces.AF_INET][0]['mask']
116
        except (KeyError, AttributeError) as e:
117
            logger.debug(f"Cannot grab private IP address ({e})")
118
        else:
119
            stats['address'] = address
120
            stats['mask'] = mask
121
            stats['mask_cidr'] = self.ip_to_cidr(mask)
122
123
        return stats
124
125
    def get_public_ip(self, stats):
126
        time_since_update = getTimeSinceLastUpdate('public-ip')
127
        try:
128
            if not self.public_disabled and (
129
                self.public_address == "" or time_since_update > self.public_address_refresh_interval
130
            ):
131
                self.public_info = PublicIpInfo(self.public_api, self.public_username, self.public_password).get()
132
                self.public_address = self.public_info['ip']
133
        except (KeyError, AttributeError, TypeError) as e:
134
            logger.debug(f"Cannot grab public IP information ({e})")
135
        else:
136
            stats['public_address'] = (
137
                self.public_address if not self.args.hide_public_info else self.__hide_ip(self.public_address)
138
            )
139
            stats['public_info_human'] = self.public_info_for_human(self.public_info)
140
141
        return stats
142
143
    @GlancesPluginModel._check_decorator
144
    @GlancesPluginModel._log_result_decorator
145
    def update(self):
146
        """Update IP stats using the input method.
147
148
        :return: the stats dict
149
        """
150
        # Init new stats
151
        stats = self.get_init_value()
152
153
        if self.input_method == 'local' and not import_error_tag:
154
            stats = self.get_stats_for_local_input(stats)
155
156
        elif self.input_method == 'snmp':
157
            # Not implemented yet
158
            pass
159
160
        # Update the stats
161
        self.stats = stats
162
163
        return self.stats
164
165
    def get_stats_for_local_input(self, stats):
166
        # Get Public and Private IP address
167
        if self.get_first_ip(stats):
168
            self.get_public_ip(stats)
169
        return stats
170
171
    def __hide_ip(self, ip):
172
        """Hide last to digit of the given IP address"""
173
        return '.'.join(ip.split('.')[0:2]) + '.*.*'
174
175
    def msg_curse(self, args=None, max_width=None):
176
        """Return the dict to display in the curse interface."""
177
        # Init the return message
178
        ret = []
179
180
        # Only process if stats exist and display plugin enable...
181
        if not self.stats or self.is_disabled() or import_error_tag:
182
            return ret
183
184
        # Build the string message
185
        msg = ' - '
186
        ret.append(self.curse_add_line(msg, optional=True))
187
188
        # Start with the private IP information
189
        if 'address' in self.stats:
190
            msg = 'IP '
191
            ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
192
            msg = '{}'.format(self.stats['address'])
193
            ret.append(self.curse_add_line(msg, optional=True))
194
        if 'mask_cidr' in self.stats:
195
            # VPN with no internet access (issue #842)
196
            msg = '/{}'.format(self.stats['mask_cidr'])
197
            ret.append(self.curse_add_line(msg, optional=True))
198
199
        # Then with the public IP information
200
        try:
201
            msg_pub = '{}'.format(self.stats['public_address'])
202
        except (UnicodeEncodeError, KeyError):
203
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
204
            pass
205
        else:
206
            if self.stats['public_address']:
207
                msg = ' Pub '
208
                ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
209
                ret.append(self.curse_add_line(msg_pub, optional=True))
210
211
            if self.stats['public_info_human']:
212
                ret.append(self.curse_add_line(' {}'.format(self.stats['public_info_human']), optional=True))
213
214
        return ret
215
216
    def public_info_for_human(self, public_info):
217
        """Return the data to pack to the client."""
218
        if not public_info:
219
            return ''
220
221
        return self.public_template.format(**public_info)
222
223
    @staticmethod
224
    def ip_to_cidr(ip):
225
        """Convert IP address to CIDR.
226
227
        Example: '255.255.255.0' will return 24
228
        """
229
        # Thanks to @Atticfire
230
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
231
        if ip is None:
232
            # Correct issue #1528
233
            return 0
234
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
235
236
237
class PublicIpInfo:
238
    """Get public IP information from online service."""
239
240
    def __init__(self, url, username, password, timeout=2):
241
        """Init the class."""
242
        self.url = url
243
        self.username = username
244
        self.password = password
245
        self.timeout = timeout
246
247
    def get(self):
248
        """Return the public IP information returned by one of the online service."""
249
        q = queue.Queue()
250
251
        t = threading.Thread(target=self._get_ip_public_info, args=(q, self.url, self.username, self.password))
252
        t.daemon = True
253
        t.start()
254
255
        timer = Timer(self.timeout)
256
        info = None
257
        while not timer.finished() and info is None:
258
            if q.qsize() > 0:
259
                info = q.get()
260
261
        return info
262
263
    def _get_ip_public_info(self, queue_target, url, username, password):
264
        """Request the url service and put the result in the queue_target."""
265
        try:
266
            response = urlopen_auth(url, username, password).read()
267
        except Exception as e:
268
            logger.debug(f"IP plugin - Cannot get public IP information from {url} ({e})")
269
            queue_target.put(None)
270
        else:
271
            try:
272
                queue_target.put(json_loads(response))
273
            except (ValueError, KeyError) as e:
274
                logger.debug(f"IP plugin - Cannot load public IP information from {url} ({e})")
275
                queue_target.put(None)
276