Test Failed
Push — develop ( ed5254...2ee2ff )
by Nicolas
02:18
created

PublicIpAddress._get_ip_public()   A

Complexity

Conditions 5

Size

Total Lines 16
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 12
nop 5
dl 0
loc 16
rs 9.3333
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""IP plugin."""
11
12
import threading
13
from json import loads
14
15
from glances.compat import iterkeys, urlopen, queue, urlopen_auth
16
from glances.logger import logger
17
from glances.timer import Timer, getTimeSinceLastUpdate
18
from glances.plugins.glances_plugin import GlancesPlugin
19
20
# Import plugin specific dependency
21
# netifaces is an optional dependency: assume it is missing until the import
# succeeds, so the rest of the module can test import_error_tag.
import_error_tag = True
try:
    import netifaces
except ImportError as e:
    logger.warning("Missing Python Lib ({}), IP plugin is disabled".format(e))
else:
    import_error_tag = False
28
29
# List of online services to retrieve public IP address
30
# List of tuples (url, json, key)
31
# - url: URL of the Web site
32
# - json: True if the service returns JSON, False if it returns a plain string
33
# - key: key of the IP address in the JSON structure
34
urls = [
    # NOTE: https://ip.42.pl/raw is intentionally not listed: the domain has a
    # low rating / is flagged as malicious (see issue #1975).
    ("https://httpbin.org/ip", True, "origin"),
    ("https://api.ipify.org/?format=json", True, "ip"),
    ("https://ipv4.jsonip.com", True, "ip"),
]
41
42
43
class Plugin(GlancesPlugin):
    """Glances IP Plugin.

    stats is a dict. When the local input method succeeds it holds the keys
    'gateway', 'address', 'mask', 'mask_cidr' and, if the public lookup did
    not fail, 'public_address', 'public_info' and 'public_info_human'.
    """

    _default_public_refresh_interval = 300
    _default_public_ip_disabled = ["False"]

    def __init__(self, args=None, config=None):
        """Init the plugin."""
        super(Plugin, self).__init__(args=args, config=config)

        # We want to display the stat in the curse interface
        self.display_curse = True

        # For public IP address
        self.public_address = ""
        self.public_address_refresh_interval = self.get_conf_value(
            "public_refresh_interval", default=self._default_public_refresh_interval
        )

        public_ip_disabled = self.get_conf_value("public_ip_disabled", default=self._default_public_ip_disabled)
        self.public_ip_disabled = public_ip_disabled == ["True"]

        # For the Censys options (see issue #2105)
        self.public_info = ""
        self.censys_url = self.get_conf_value("censys_url", default=[None])[0]
        self.censys_username = self.get_conf_value("censys_username", default=[None])[0]
        self.censys_password = self.get_conf_value("censys_password", default=[None])[0]
        self.censys_fields = self.get_conf_value("censys_fields", default=[None])
        # The default for censys_fields is [None] (a non-None list), so the
        # fields must be inspected element by element: a plain "is None"
        # test would never detect a missing configuration.
        self.public_info_disabled = (
            self.censys_url is None
            or self.censys_username is None
            or self.censys_password is None
            or not self.censys_fields
            or all(f is None for f in self.censys_fields)
        )

    @GlancesPlugin._check_decorator
    @GlancesPlugin._log_result_decorator
    def update(self):
        """Update IP stats using the input method.

        If the default gateway or the private address cannot be read, an
        empty dict is returned and self.stats is left untouched.

        :return: the stats dict
        """
        # Init new stats
        stats = self.get_init_value()

        if self.input_method == 'local' and not import_error_tag:
            # Update stats using the netifaces lib
            # Start with the default IP gateway
            try:
                default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
            except (KeyError, AttributeError) as e:
                logger.debug("Cannot grab default gateway IP address ({})".format(e))
                return {}
            else:
                stats['gateway'] = default_gw[0]

            # Then the private IP address
            # If multiple IP addresses are available, only the one with the default gateway is returned
            try:
                # Query the interface once and read both fields from the same entry
                inet = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]
                address = inet['addr']
                mask = inet['netmask']
            except (KeyError, AttributeError, IndexError) as e:
                logger.debug("Cannot grab private IP address ({})".format(e))
                return {}
            else:
                stats['address'] = address
                stats['mask'] = mask
                stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])

            # Finally with the public IP address
            time_since_update = getTimeSinceLastUpdate('public-ip')
            try:
                # Refresh when the private address changed or the refresh interval elapsed
                if not self.public_ip_disabled and (
                    self.stats.get('address') != address or time_since_update > self.public_address_refresh_interval
                ):
                    self.public_address = PublicIpAddress().get()
                    if not self.public_info_disabled:
                        if self.public_address:
                            self.public_info = PublicIpInfo(
                                self.public_address, self.censys_url, self.censys_username, self.censys_password
                            ).get()
                        else:
                            # No public address: do not query the service with a bogus host
                            self.public_info = None
            except (KeyError, AttributeError) as e:
                logger.debug("Cannot grab public IP information ({})".format(e))
            else:
                stats['public_address'] = self.public_address
                stats['public_info'] = self.public_info
                stats['public_info_human'] = self.public_info_for_human(stats['public_info'])

        elif self.input_method == 'snmp':
            # Not implemented yet
            pass

        # Update the stats
        self.stats = stats

        return self.stats

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface."""
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disabled() or import_error_tag:
            return ret

        # Build the string message
        msg = ' - '
        ret.append(self.curse_add_line(msg, optional=True))

        # Start with the private IP information
        msg = 'IP '
        ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
        if 'address' in self.stats:
            msg = '{}'.format(self.stats['address'])
            ret.append(self.curse_add_line(msg, optional=True))
        if 'mask_cidr' in self.stats:
            # VPN with no internet access (issue #842)
            msg = '/{}'.format(self.stats['mask_cidr'])
            ret.append(self.curse_add_line(msg, optional=True))

        # Then with the public IP information
        try:
            msg_pub = '{}'.format(self.stats['public_address'])
        except (UnicodeEncodeError, KeyError):
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
            pass
        else:
            if self.stats['public_address']:
                msg = ' Pub '
                ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
                ret.append(self.curse_add_line(msg_pub, optional=True))

            # Use .get(): the key may be absent even when 'public_address' is present
            public_info_human = self.stats.get('public_info_human')
            if public_info_human:
                ret.append(self.curse_add_line(' {}'.format(public_info_human), optional=True))

        return ret

    def public_info_for_human(self, public_info):
        """Return a one-line summary of public_info built from the configured fields.

        Each entry of self.censys_fields is either 'key' or 'key:subkey'; the
        matching values found in public_info are joined with '/'. Fields that
        are unset or not present in public_info are skipped.

        :param public_info: mapping of public IP information (may be empty or None)
        :return: the summary string ('' if there is nothing to display)
        """
        if not public_info:
            return ''

        field_result = []
        for f in self.censys_fields or []:
            if not f:
                # Unset field (for example the [None] default): nothing to look up
                continue
            field = f.split(':')
            if len(field) == 1 and field[0] in public_info:
                field_result.append('{}'.format(public_info[field[0]]))
            elif len(field) == 2 and field[0] in public_info:
                sub_info = public_info[field[0]]
                # Only a mapping can be indexed by the sub-key
                if isinstance(sub_info, dict) and field[1] in sub_info:
                    field_result.append('{}'.format(sub_info[field[1]]))
        return '/'.join(field_result)

    @staticmethod
    def ip_to_cidr(ip):
        """Convert IP address to CIDR.

        Example: '255.255.255.0' will return 24
        """
        # Thanks to @Atticfire
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
        if ip is None:
            # Correct issue #1528
            return 0
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
214
215
216
class PublicIpAddress(object):
    """Get public IP address from online services."""

    def __init__(self, timeout=2):
        """Init the class.

        :param timeout: maximum time (in seconds) to wait for an answer
        """
        self.timeout = timeout

    def get(self):
        """Get the first public IP address returned by one of the online services.

        One daemon thread is started per entry of urls. The call blocks until
        a service answers, all services have failed, or the timeout expires.

        :return: the address(es) as a ', ' separated string, or None if no
                 service answered in time
        """
        import time

        q = queue.Queue()

        pending = 0
        for u, j, k in urls:
            t = threading.Thread(target=self._get_ip_public, args=(q, u, j, k))
            t.daemon = True
            t.start()
            pending += 1

        # Block on the queue instead of polling qsize() in a tight loop.
        # Every worker puts exactly one item (None on failure), so waiting
        # can stop as soon as all of them have reported.
        deadline = time.time() + self.timeout
        ip = None
        while ip is None and pending > 0:
            remaining = deadline - time.time()
            if remaining <= 0:
                break
            try:
                ip = q.get(timeout=remaining)
            except queue.Empty:
                break
            pending -= 1

        if ip is None:
            return None

        # Some services return a comma separated list: drop duplicates while
        # keeping the order in which the addresses were returned.
        unique = []
        for part in ip.split(','):
            part = part.strip()
            if part not in unique:
                unique.append(part)
        return ', '.join(unique)

    def _get_ip_public(self, queue_target, url, json=False, key=None):
        """Request the url service and put the result in the queue_target.

        Exactly one item is put in queue_target: the address on success,
        None on any failure.

        :param queue_target: queue receiving the result
        :param url: URL of the service
        :param json: True if the service returns JSON, False for a raw string
        :param key: key of the IP address in the JSON structure
        """
        result = None
        try:
            response = urlopen(url, timeout=self.timeout).read().decode('utf-8')
        except Exception as e:
            logger.debug("IP plugin - Cannot open URL {} ({})".format(url, e))
        else:
            # Request depend on service
            try:
                result = loads(response)[key] if json else response
            except (ValueError, KeyError, TypeError) as e:
                # Invalid JSON, missing key or unexpected payload type
                logger.debug("IP plugin - Cannot get IP address from {} ({})".format(url, e))
        queue_target.put(result)
259
260
261
class PublicIpInfo(object):
    """Get public IP information from Censys online service."""

    def __init__(self, ip, url, username, password, timeout=2):
        """Init the class.

        :param ip: public IP address to look up
        :param url: base URL of the service
        :param username: user name used for the authenticated request
        :param password: password used for the authenticated request
        :param timeout: maximum time (in seconds) to wait for the answer
        """
        self.ip = ip
        self.url = url
        self.username = username
        self.password = password
        self.timeout = timeout

    def get(self):
        """Return the public IP information returned by the online service.

        The request runs in a daemon thread; the call blocks until the
        thread reports a result or the timeout expires.

        :return: the 'result' field of the service answer, or None on
                 failure or timeout
        """
        q = queue.Queue()

        t = threading.Thread(target=self._get_ip_public_info, args=(q, self.ip, self.url, self.username, self.password))
        t.daemon = True
        t.start()

        # Block on the queue instead of polling qsize() in a tight loop. The
        # single worker puts exactly one item (None on failure), so the first
        # item received is the final answer.
        try:
            return q.get(timeout=max(self.timeout, 0))
        except queue.Empty:
            return None

    def _get_ip_public_info(self, queue_target, ip, url, username, password):
        """Request the url service and put the result in the queue_target.

        Exactly one item is put in queue_target: the 'result' field of the
        JSON answer on success, None on any failure.
        """
        request_url = "{}/v2/hosts/{}".format(url, ip)
        result = None
        try:
            response = urlopen_auth(request_url, username, password).read()
        except Exception as e:
            logger.debug("IP plugin - Cannot open URL {} ({})".format(request_url, e))
        else:
            try:
                result = loads(response)['result']
            except (ValueError, KeyError, TypeError) as e:
                # Invalid JSON, missing field or payload that is not a mapping
                logger.debug("IP plugin - Cannot get result field from {} ({})".format(request_url, e))
        queue_target.put(result)
305