Test Failed
Push — master ( ee826a...d9056e )
by Nicolas
03:09
created

PluginModel.public_info_for_human()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 2
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""IP plugin."""
11
12
import threading
13
from ujson import loads
14
15
from glances.globals import queue, urlopen_auth
16
from glances.logger import logger
17
from glances.timer import Timer
18
from glances.timer import getTimeSinceLastUpdate
19
from glances.plugins.plugin.model import GlancesPluginModel
20
21
# Import plugin specific dependency
22
try:
23
    import netifaces
24
except ImportError as e:
25
    import_error_tag = True
26
    logger.warning("Missing Python Lib ({}), IP plugin is disabled".format(e))
27
else:
28
    import_error_tag = False
29
30
# Fields description
31
# description: human readable description
32
# short_name: shortname to use un UI
33
# unit: unit type
34
# rate: is it a rate ? If yes, // by time_since_update when displayed,
35
# min_symbol: Auto unit should be used if value > than 1 'X' (K, M, G)...
36
fields_description = {
37
    'address': {
38
        'description': 'Private IP address',
39
    },
40
    'mask': {
41
        'description': 'Private IP mask',
42
    },
43
    'mask_cidr': {
44
        'description': 'Private IP mask in CIDR format',
45
        'unit': 'number',
46
    },
47
    'gateway': {
48
        'description': 'Private IP gateway',
49
    },
50
    'public_address': {
51
        'description': 'Public IP address',
52
    },
53
    'public_info_human': {
54
        'description': 'Public IP information',
55
    },
56
}
57
58
59
class PluginModel(GlancesPluginModel):
60
    """Glances IP Plugin.
61
62
    stats is a dict
63
    """
64
65
    _default_public_refresh_interval = 300
66
67
    def __init__(self, args=None, config=None):
68
        """Init the plugin."""
69
        super(PluginModel, self).__init__(args=args, config=config, fields_description=fields_description)
70
71
        # We want to display the stat in the curse interface
72
        self.display_curse = True
73
74
        # Public information (see issue #2732)
75
        self.public_address = ""
76
        self.public_info = ""
77
        self.public_api = self.get_conf_value("public_api", default=[None])[0]
78
        self.public_username = self.get_conf_value("public_username", default=[None])[0]
79
        self.public_password = self.get_conf_value("public_password", default=[None])[0]
80
        self.public_field = self.get_conf_value("public_field", default=[None])
81
        self.public_template = self.get_conf_value("public_template", default=[None])[0]
82
        self.public_disabled = (
83
            self.get_conf_value('public_disabled', default='False')[0].lower() != 'false'
84
            or self.public_api is None
85
            or self.public_field is None
86
        )
87
        self.public_address_refresh_interval = self.get_conf_value(
88
            "public_refresh_interval", default=self._default_public_refresh_interval
89
        )
90
91
    @GlancesPluginModel._check_decorator
92
    @GlancesPluginModel._log_result_decorator
93
    def update(self):
94
        """Update IP stats using the input method.
95
96
        :return: the stats dict
97
        """
98
        # Init new stats
99
        stats = self.get_init_value()
100
101
        if self.input_method == 'local' and not import_error_tag:
102
            # Private IP address
103
            # Get the default gateway thanks to the netifaces lib
104
            try:
105
                default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
106
            except (KeyError, AttributeError) as e:
107
                logger.debug("Cannot grab default gateway IP address ({})".format(e))
108
                return self.get_init_value()
109
            else:
110
                stats['gateway'] = default_gw[0]
111
            # If multiple IP addresses are available, only the one with the default gateway is returned
112
            try:
113
                address = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['addr']
114
                mask = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['netmask']
115
            except (KeyError, AttributeError) as e:
116
                logger.debug("Cannot grab private IP address ({})".format(e))
117
                return self.get_init_value()
118
            else:
119
                stats['address'] = address
120
                stats['mask'] = mask
121
                stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])
122
123
            # Public IP address
124
            time_since_update = getTimeSinceLastUpdate('public-ip')
125
            try:
126
                if not self.public_disabled and (
127
                    self.public_address == "" or time_since_update > self.public_address_refresh_interval
128
                ):
129
                    self.public_info = PublicIpInfo(self.public_api, self.public_username, self.public_password).get()
130
                    self.public_address = self.public_info['ip']
131
            except (KeyError, AttributeError, TypeError) as e:
132
                logger.debug("Cannot grab public IP information ({})".format(e))
133
            else:
134
                stats['public_address'] = (
135
                    self.public_address if not self.args.hide_public_info else self.__hide_ip(self.public_address)
136
                )
137
                stats['public_info_human'] = self.public_info_for_human(self.public_info)
138
139
        elif self.input_method == 'snmp':
140
            # Not implemented yet
141
            pass
142
143
        # Update the stats
144
        self.stats = stats
145
146
        return self.stats
147
148
    def __hide_ip(self, ip):
149
        """Hide last to digit of the given IP address"""
150
        return '.'.join(ip.split('.')[0:2]) + '.*.*'
151
152
    def msg_curse(self, args=None, max_width=None):
153
        """Return the dict to display in the curse interface."""
154
        # Init the return message
155
        ret = []
156
157
        # Only process if stats exist and display plugin enable...
158
        if not self.stats or self.is_disabled() or import_error_tag:
159
            return ret
160
161
        # Build the string message
162
        msg = ' - '
163
        ret.append(self.curse_add_line(msg, optional=True))
164
165
        # Start with the private IP information
166
        msg = 'IP '
167
        ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
168
        if 'address' in self.stats:
169
            msg = '{}'.format(self.stats['address'])
170
            ret.append(self.curse_add_line(msg, optional=True))
171
        if 'mask_cidr' in self.stats:
172
            # VPN with no internet access (issue #842)
173
            msg = '/{}'.format(self.stats['mask_cidr'])
174
            ret.append(self.curse_add_line(msg, optional=True))
175
176
        # Then with the public IP information
177
        try:
178
            msg_pub = '{}'.format(self.stats['public_address'])
179
        except (UnicodeEncodeError, KeyError):
180
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
181
            pass
182
        else:
183
            if self.stats['public_address']:
184
                msg = ' Pub '
185
                ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
186
                ret.append(self.curse_add_line(msg_pub, optional=True))
187
188
            if self.stats['public_info_human']:
189
                ret.append(self.curse_add_line(' {}'.format(self.stats['public_info_human']), optional=True))
190
191
        return ret
192
193
    def public_info_for_human(self, public_info):
194
        """Return the data to pack to the client."""
195
        if not public_info:
196
            return ''
197
198
        return self.public_template.format(**public_info)
199
200
    @staticmethod
201
    def ip_to_cidr(ip):
202
        """Convert IP address to CIDR.
203
204
        Example: '255.255.255.0' will return 24
205
        """
206
        # Thanks to @Atticfire
207
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
208
        if ip is None:
209
            # Correct issue #1528
210
            return 0
211
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
212
213
214
class PublicIpInfo(object):
215
    """Get public IP information from online service."""
216
217
    def __init__(self, url, username, password, timeout=2):
218
        """Init the class."""
219
        self.url = url
220
        self.username = username
221
        self.password = password
222
        self.timeout = timeout
223
224
    def get(self):
225
        """Return the public IP information returned by one of the online service."""
226
        q = queue.Queue()
227
228
        t = threading.Thread(target=self._get_ip_public_info, args=(q, self.url, self.username, self.password))
229
        t.daemon = True
230
        t.start()
231
232
        timer = Timer(self.timeout)
233
        info = None
234
        while not timer.finished() and info is None:
235
            if q.qsize() > 0:
236
                info = q.get()
237
238
        return info
239
240
    def _get_ip_public_info(self, queue_target, url, username, password):
241
        """Request the url service and put the result in the queue_target."""
242
        try:
243
            response = urlopen_auth(url, username, password).read()
244
        except Exception as e:
245
            logger.debug("IP plugin - Cannot get public IP information from {} ({})".format(url, e))
246
            queue_target.put(None)
247
        else:
248
            try:
249
                queue_target.put(loads(response))
250
            except (ValueError, KeyError) as e:
251
                logger.debug("IP plugin - Cannot load public IP information from {} ({})".format(url, e))
252
                queue_target.put(None)
253