Test Failed
Pull Request — develop (#2997)
by
unknown
02:57
created

glances.plugins.ip.PluginModel.get_public_ipv4()   B

Complexity

Conditions 7

Size

Total Lines 18
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 7
eloc 14
nop 2
dl 0
loc 18
rs 8
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""IP plugin."""
10
11
import threading
12
from itertools import accumulate, dropwhile, islice
13
14
from glances.globals import json_loads, queue, urlopen_auth
15
from glances.logger import logger
16
from glances.plugins.plugin.model import GlancesPluginModel
17
from glances.timer import Timer, getTimeSinceLastUpdate
18
19
# Optional dependency of this plugin: netifaces.
# import_error_tag records whether the import failed so that the rest of the
# module can skip every netifaces-based code path.
import_error_tag = False
try:
    import netifaces
except ImportError as err:
    import_error_tag = True
    logger.warning(f"Missing Python Lib ({err}), IP plugin is disabled")
27
28
# Fields description
# Each stat key maps to a dict of attributes; the attributes that may appear are:
# description: human readable description
# short_name: short name to use in the UI
# unit: unit type
# rate: is it a rate? If yes, divide by time_since_update when displayed
# min_symbol: auto unit should be used if the value is greater than 1 'X' (K, M, G)...
fields_description = {
    field: {'description': text}
    for field, text in (
        ('address', 'Private IP address'),
        ('mask', 'Private IP mask'),
        ('mask_cidr', 'Private IP mask in CIDR format'),
        ('gateway', 'Private IP gateway'),
        ('public_address', 'Public IP address'),
        ('public_info_human', 'Public IP information'),
    )
}
# The CIDR mask is the only field that also declares a unit.
fields_description['mask_cidr']['unit'] = 'number'
55
56
57
class PluginModel(GlancesPluginModel):
    """Glances IP plugin.

    ``stats`` is a dict. Depending on what could be collected it holds the keys
    ``gateway``, ``address``, ``mask``, ``mask_cidr``, ``public_address`` and
    ``public_info_human`` (see ``fields_description``).
    """

    # Default delay, in seconds, between two refreshes of the public IP information
    _default_public_refresh_interval = 300

    def __init__(self, args=None, config=None):
        """Init the plugin.

        :param args: command line arguments, forwarded to the parent class
        :param config: configuration object, forwarded to the parent class
        """
        super().__init__(args=args, config=config, fields_description=fields_description)

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Public information (see issue #2732)
        self.public_address = ""
        self.public_info = ""
        self.public_api = self.get_conf_value("public_api", default=[None])[0]
        self.public_username = self.get_conf_value("public_username", default=[None])[0]
        self.public_password = self.get_conf_value("public_password", default=[None])[0]
        # NOTE(review): with a default of [None] this attribute is never None, so the
        # "public_field is None" test below cannot trigger on a missing option.
        # Left unchanged because public_field is not used in this file - confirm intent.
        self.public_field = self.get_conf_value("public_field", default=[None])
        self.public_template = self.get_conf_value("public_template", default=[None])[0]
        # The default is a one-item list like every other option read here: with a
        # plain 'False' string, [0] returned 'F' and the feature was always disabled
        # when the option was missing from the configuration.
        public_disabled = self.get_conf_value('public_disabled', default=['False'])[0]
        self.public_disabled = (
            public_disabled.lower() != 'false' or self.public_api is None or self.public_field is None
        )
        # Always store a number: the value is compared with an elapsed time in seconds
        self.public_address_refresh_interval = self._as_seconds(
            self.get_conf_value("public_refresh_interval", default=self._default_public_refresh_interval),
            self._default_public_refresh_interval,
        )

    @staticmethod
    def _as_seconds(value, default):
        """Return value as a float number of seconds, or default if it is not a number.

        A list or a tuple is reduced to its first item before the conversion.
        """
        if isinstance(value, (list, tuple)):
            value = value[0] if value else None
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    def get_private_ipv4(self, init):
        """Add the default gateway address to the stats.

        :param init: a (stop, stats) tuple
        :return: a (stop, stats) tuple, stop is True if the gateway cannot be grabbed
        """
        stop, stats = init
        try:
            # Get the default gateway thanks to the netifaces lib
            default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
        except (KeyError, AttributeError) as e:
            logger.debug(f"Cannot grab default gateway IP address ({e})")
            stop = True
        else:
            stats['gateway'] = default_gw[0]

        return (stop, stats)

    def get_first_ipv4(self, init):
        """Add the address and the mask of the default gateway interface to the stats.

        :param init: a (stop, stats) tuple
        :return: a (stop, stats) tuple, stop is True if the address cannot be grabbed
        """
        stop, stats = init
        try:
            default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
            # Query the interface only once, both fields come from the same entry
            first_ipv4 = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]
            address = first_ipv4['addr']
            mask = first_ipv4['netmask']
        except (KeyError, IndexError, AttributeError) as e:
            # IndexError: no entry in the AF_INET list of the interface
            logger.debug(f"Cannot grab private IP address ({e})")
            stop = True
        else:
            stats['address'] = address
            stats['mask'] = mask
            stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])

        return (stop, stats)

    def get_public_ipv4(self, init):
        """Add the public IP address and its human readable information to the stats.

        The online service is only requested if the public information is enabled
        and if no address is known yet or the refresh interval is exceeded.

        :param init: a (stop, stats) tuple
        :return: a (True, stats) tuple: this is always the last step
        """
        _, stats = init
        # NOTE(review): the refresh relies on getTimeSinceLastUpdate('public-ip')
        # returning the time since the last refresh - confirm it is not reset
        # by each call.
        time_since_update = getTimeSinceLastUpdate('public-ip')
        try:
            if not self.public_disabled and (
                self.public_address == "" or time_since_update > self.public_address_refresh_interval
            ):
                public_info = PublicIpInfo(self.public_api, self.public_username, self.public_password).get()
                public_address = public_info['ip']
                # Only commit once both values are known, so that a failed refresh
                # keeps the previous address and information consistent
                self.public_info = public_info
                self.public_address = public_address
        except (KeyError, AttributeError, TypeError) as e:
            logger.debug(f"Cannot grab public IP information ({e})")
            return (True, stats)

        # Do not mask an empty address: it would be displayed as '.*.*'
        if self.public_address and self.args.hide_public_info:
            stats['public_address'] = self.__hide_ip(self.public_address)
        else:
            stats['public_address'] = self.public_address

        try:
            stats['public_info_human'] = self.public_info_for_human(self.public_info)
        except (KeyError, IndexError, AttributeError, ValueError) as e:
            # A template which does not match the service answer should not break the update
            logger.debug(f"Cannot format public IP information ({e})")
            stats['public_info_human'] = ''

        return (True, stats)

    @GlancesPluginModel._check_decorator
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Update IP stats using the input method.

        :return: the stats dict
        """
        # Init new stats
        init = self.get_init_value()

        if self.is_local_input():
            # Only call the netifaces based code when the lib is available
            stats = self.get_stats_for_local_input(init)
        else:
            # SNMP is not implemented yet, any other case keeps the init value
            stats = init

        # Update the stats
        self.stats = stats

        return self.stats

    def is_local_input(self):
        """Return True if the input method is local and netifaces is imported."""
        return self.input_method == 'local' and not import_error_tag

    def is_snmp_input(self):
        """Return True if the input method is SNMP."""
        return self.input_method == 'snmp'

    def get_stats_for_local_input(self, stats):
        """Run the collection steps in order and return the resulting stats.

        Each step receives and returns a (stop, stats) tuple. The chain stops
        after the first step which returns stop as True.

        :param stats: the initial stats dict
        :return: the stats dict filled by the executed steps
        """
        state = (False, stats)
        for step in (self.get_private_ipv4, self.get_first_ipv4, self.get_public_ipv4):
            state = step(state)
            if state[0]:
                break

        return state[1]

    def __hide_ip(self, ip):
        """Hide the last two groups of digits of the given IP address."""
        return '.'.join(ip.split('.')[0:2]) + '.*.*'

    def msg_curse(self, args=None, max_width=None):
        """Return the list of lines to display in the curse interface."""
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disabled() or import_error_tag:
            return ret

        # Build the string message
        ret.append(self.curse_add_line(' - ', optional=True))

        # Start with the private IP information
        ret.append(self.curse_add_line('IP ', 'TITLE', optional=True))
        if 'address' in self.stats:
            ret.append(self.curse_add_line(f"{self.stats['address']}", optional=True))
        if 'mask_cidr' in self.stats:
            # VPN with no internet access (issue #842)
            ret.append(self.curse_add_line(f"/{self.stats['mask_cidr']}", optional=True))

        # Then with the public IP information
        try:
            msg_pub = f"{self.stats['public_address']}"
        except (UnicodeEncodeError, KeyError):
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
            pass
        else:
            if self.stats['public_address']:
                ret.append(self.curse_add_line(' Pub ', 'TITLE', optional=True))
                ret.append(self.curse_add_line(msg_pub, optional=True))

            if self.stats['public_info_human']:
                ret.append(self.curse_add_line(f" {self.stats['public_info_human']}", optional=True))

        return ret

    def public_info_for_human(self, public_info):
        """Return the public information formatted with the configured template.

        :param public_info: mapping whose keys are the fields available to the template
        :return: the formatted string, '' if there is no information or no template
        """
        if not public_info or not self.public_template:
            return ''

        return self.public_template.format(**public_info)

    @staticmethod
    def ip_to_cidr(ip):
        """Convert IP address to CIDR.

        Example: '255.255.255.0' will return 24
        """
        # Thanks to @Atticfire
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
        if ip is None:
            # Correct issue #1528
            return 0
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
236
237
238
class PublicIpInfo:
    """Get public IP information from an online service."""

    def __init__(self, url, username, password, timeout=2):
        """Init the class.

        :param url: URL of the online service
        :param username: login given to the service request
        :param password: password given to the service request
        :param timeout: maximum time, in seconds, to wait for the answer
        """
        self.url = url
        self.username = username
        self.password = password
        self.timeout = timeout

    def get(self):
        """Return the public IP information returned by the online service.

        The request runs in a daemon thread and the caller waits at most
        ``timeout`` seconds for its result.

        :return: the decoded answer, or None on failure or timeout
        """
        q = queue.Queue()

        t = threading.Thread(target=self._get_ip_public_info, args=(q, self.url, self.username, self.password))
        t.daemon = True
        t.start()

        try:
            # Blocking wait instead of polling qsize(): no CPU is burnt while
            # waiting and a failure reported by the worker returns immediately.
            # A negative timeout is rejected by Queue.get, so it is clamped to 0.
            return q.get(timeout=max(self.timeout, 0))
        except queue.Empty:
            # The worker did not answer in time
            return None

    def _get_ip_public_info(self, queue_target, url, username, password):
        """Request the url service and put the result in the queue_target.

        Exactly one item is put in the queue: the decoded answer, or None if
        the request or the decoding failed.
        """
        info = None
        try:
            response = urlopen_auth(url, username, password).read()
        except Exception as e:
            logger.debug(f"IP plugin - Cannot get public IP information from {url} ({e})")
        else:
            try:
                info = json_loads(response)
            except (ValueError, KeyError) as e:
                logger.debug(f"IP plugin - Cannot load public IP information from {url} ({e})")
        queue_target.put(info)
277