glances.plugins.glances_ip   D
last analyzed

Complexity

Total Complexity 58

Size/Duplication

Total Lines 300
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 58
eloc 178
dl 0
loc 300
rs 4.5599
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A PublicIpInfo.get() 0 18 5
B Plugin.public_info_for_human() 0 13 8
A PublicIpAddress.__init__() 0 3 1
C Plugin.msg_curse() 0 40 10
B PublicIpAddress.get() 0 19 6
A PublicIpInfo.__init__() 0 7 1
A Plugin.ip_to_cidr() 0 12 2
A PublicIpAddress._get_ip_public() 0 16 5
A PublicIpInfo._get_ip_public_info() 0 14 4
A Plugin.__init__() 0 27 2
F Plugin.update() 0 62 14

How to fix   Complexity   

Complexity

Complex classes like glances.plugins.glances_ip often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""IP plugin."""
11
12
import threading
13
from ujson import loads
14
15
from glances.compat import queue, urlopen, urlopen_auth
16
from glances.logger import logger
17
from glances.timer import Timer, getTimeSinceLastUpdate
18
from glances.plugins.glances_plugin import GlancesPlugin
19
20
# Import plugin specific dependency
21
try:
22
    import netifaces
23
except ImportError as e:
24
    import_error_tag = True
25
    logger.warning("Missing Python Lib ({}), IP plugin is disabled".format(e))
26
else:
27
    import_error_tag = False
28
29
# List of online services to retrieve public IP address
30
# List of tuple (url, json, key)
31
# - url: URL of the Web site
32
# - json: service return a JSON (True) or string (False)
33
# - key: key of the IP address in the JSON structure
34
urls = [
35
    ('https://httpbin.org/ip', True, 'origin'),
36
    ('https://api.ipify.org/?format=json', True, 'ip'),
37
    ('https://ipv4.jsonip.com', True, 'ip'),
38
]
39
40
41
class Plugin(GlancesPlugin):
42
    """Glances IP Plugin.
43
44
    stats is a dict
45
    """
46
47
    _default_public_refresh_interval = 300
48
    _default_public_ip_disabled = ["False"]
49
50
    def __init__(self, args=None, config=None):
51
        """Init the plugin."""
52
        super(Plugin, self).__init__(args=args, config=config)
53
54
        # We want to display the stat in the curse interface
55
        self.display_curse = True
56
57
        # For public IP address
58
        self.public_address = ""
59
        self.public_address_refresh_interval = self.get_conf_value(
60
            "public_refresh_interval", default=self._default_public_refresh_interval
61
        )
62
63
        public_ip_disabled = self.get_conf_value("public_ip_disabled", default=self._default_public_ip_disabled)
64
        self.public_ip_disabled = True if public_ip_disabled == ["True"] else False
65
66
        # For the Censys options (see issue #2105)
67
        self.public_info = ""
68
        self.censys_url = self.get_conf_value("censys_url", default=[None])[0]
69
        self.censys_username = self.get_conf_value("censys_username", default=[None])[0]
70
        self.censys_password = self.get_conf_value("censys_password", default=[None])[0]
71
        self.censys_fields = self.get_conf_value("censys_fields", default=[None])
72
        self.public_info_disabled = (
73
            self.censys_url is None
74
            or self.censys_username is None
75
            or self.censys_password is None
76
            or self.censys_fields is None
77
        )
78
79
    @GlancesPlugin._check_decorator
80
    @GlancesPlugin._log_result_decorator
81
    def update(self):
82
        """Update IP stats using the input method.
83
84
        :return: the stats dict
85
        """
86
        # Init new stats
87
        stats = self.get_init_value()
88
89
        if self.input_method == 'local' and not import_error_tag:
90
            # Update stats using the netifaces lib
91
            # Start with the default IP gateway
92
            try:
93
                default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
94
            except (KeyError, AttributeError) as e:
95
                logger.debug("Cannot grab default gateway IP address ({})".format(e))
96
                return {}
97
            else:
98
                stats['gateway'] = default_gw[0]
99
100
            # Then the private IP address
101
            # If multiple IP addresses are available, only the one with the default gateway is returned
102
            try:
103
                address = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['addr']
104
                mask = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['netmask']
105
            except (KeyError, AttributeError) as e:
106
                logger.debug("Cannot grab private IP address ({})".format(e))
107
                return {}
108
            else:
109
                stats['address'] = address
110
                stats['mask'] = mask
111
                stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])
112
113
            # Finally with the public IP address
114
            time_since_update = getTimeSinceLastUpdate('public-ip')
115
            try:
116
                if not self.public_ip_disabled and (
117
                    self.stats.get('address') != address or time_since_update > self.public_address_refresh_interval
118
                ):
119
                    self.public_address = PublicIpAddress().get()
120
                    if not self.public_info_disabled:
121
                        self.public_info = PublicIpInfo(
122
                            self.public_address, self.censys_url, self.censys_username, self.censys_password
123
                        ).get()
124
            except (KeyError, AttributeError) as e:
125
                logger.debug("Cannot grab public IP information ({})".format(e))
126
            else:
127
                stats['public_address'] = self.public_address
128
                # Too much information provided in the public_info
129
                # Limit it to public_info_for_human
130
                # stats['public_info'] = self.public_info
131
                stats['public_info_human'] = self.public_info_for_human(self.public_info)
132
133
        elif self.input_method == 'snmp':
134
            # Not implemented yet
135
            pass
136
137
        # Update the stats
138
        self.stats = stats
139
140
        return self.stats
141
142
    def msg_curse(self, args=None, max_width=None):
143
        """Return the dict to display in the curse interface."""
144
        # Init the return message
145
        ret = []
146
147
        # Only process if stats exist and display plugin enable...
148
        if not self.stats or self.is_disabled() or import_error_tag:
149
            return ret
150
151
        # Build the string message
152
        msg = ' - '
153
        ret.append(self.curse_add_line(msg, optional=True))
154
155
        # Start with the private IP information
156
        msg = 'IP '
157
        ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
158
        if 'address' in self.stats:
159
            msg = '{}'.format(self.stats['address'])
160
            ret.append(self.curse_add_line(msg, optional=True))
161
        if 'mask_cidr' in self.stats:
162
            # VPN with no internet access (issue #842)
163
            msg = '/{}'.format(self.stats['mask_cidr'])
164
            ret.append(self.curse_add_line(msg, optional=True))
165
166
        # Then with the public IP information
167
        try:
168
            msg_pub = '{}'.format(self.stats['public_address'])
169
        except (UnicodeEncodeError, KeyError):
170
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
171
            pass
172
        else:
173
            if self.stats['public_address']:
174
                msg = ' Pub '
175
                ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
176
                ret.append(self.curse_add_line(msg_pub, optional=True))
177
178
            if self.stats['public_info_human']:
179
                ret.append(self.curse_add_line(' {}'.format(self.stats['public_info_human']), optional=True))
180
181
        return ret
182
183
    def public_info_for_human(self, public_info):
184
        """Return the data to pack to the client."""
185
        if not public_info:
186
            return ''
187
188
        field_result = []
189
        for f in self.censys_fields:
190
            field = f.split(':')
191
            if len(field) == 1 and field[0] in public_info:
192
                field_result.append('{}'.format(public_info[field[0]]))
193
            elif len(field) == 2 and field[0] in public_info and field[1] in public_info[field[0]]:
194
                field_result.append('{}'.format(public_info[field[0]][field[1]]))
195
        return '/'.join(field_result)
196
197
    @staticmethod
198
    def ip_to_cidr(ip):
199
        """Convert IP address to CIDR.
200
201
        Example: '255.255.255.0' will return 24
202
        """
203
        # Thanks to @Atticfire
204
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
205
        if ip is None:
206
            # Correct issue #1528
207
            return 0
208
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
209
210
211
class PublicIpAddress(object):
212
    """Get public IP address from online services."""
213
214
    def __init__(self, timeout=2):
215
        """Init the class."""
216
        self.timeout = timeout
217
218
    def get(self):
219
        """Get the first public IP address returned by one of the online services."""
220
        q = queue.Queue()
221
222
        for u, j, k in urls:
223
            t = threading.Thread(target=self._get_ip_public, args=(q, u, j, k))
224
            t.daemon = True
225
            t.start()
226
227
        timer = Timer(self.timeout)
228
        ip = None
229
        while not timer.finished() and ip is None:
230
            if q.qsize() > 0:
231
                ip = q.get()
232
233
        if ip is None:
234
            return None
235
236
        return ', '.join(set([x.strip() for x in ip.split(',')]))
237
238
    def _get_ip_public(self, queue_target, url, json=False, key=None):
239
        """Request the url service and put the result in the queue_target."""
240
        try:
241
            response = urlopen(url, timeout=self.timeout).read().decode('utf-8')
242
        except Exception as e:
243
            logger.debug("IP plugin - Cannot open URL {} ({})".format(url, e))
244
            queue_target.put(None)
245
        else:
246
            # Request depend on service
247
            try:
248
                if not json:
249
                    queue_target.put(response)
250
                else:
251
                    queue_target.put(loads(response)[key])
252
            except ValueError:
253
                queue_target.put(None)
254
255
256
class PublicIpInfo(object):
257
    """Get public IP information from Censys online service."""
258
259
    def __init__(self, ip, url, username, password, timeout=2):
260
        """Init the class."""
261
        self.ip = ip
262
        self.url = url
263
        self.username = username
264
        self.password = password
265
        self.timeout = timeout
266
267
    def get(self):
268
        """Return the public IP information returned by one of the online service."""
269
        q = queue.Queue()
270
271
        t = threading.Thread(target=self._get_ip_public_info, args=(q, self.ip, self.url, self.username, self.password))
272
        t.daemon = True
273
        t.start()
274
275
        timer = Timer(self.timeout)
276
        info = None
277
        while not timer.finished() and info is None:
278
            if q.qsize() > 0:
279
                info = q.get()
280
281
        if info is None:
282
            return None
283
284
        return info
285
286
    def _get_ip_public_info(self, queue_target, ip, url, username, password):
287
        """Request the url service and put the result in the queue_target."""
288
        request_url = "{}/v2/hosts/{}".format(url, ip)
289
        try:
290
            response = urlopen_auth(request_url, username, password).read()
291
        except Exception as e:
292
            logger.debug("IP plugin - Cannot open URL {} ({})".format(request_url, e))
293
            queue_target.put(None)
294
        else:
295
            try:
296
                queue_target.put(loads(response)['result'])
297
            except (ValueError, KeyError) as e:
298
                logger.debug("IP plugin - Cannot get result field from {} ({})".format(request_url, e))
299
                queue_target.put(None)
300