Test Failed
Push — develop ( 30cc9f...48251c )
by Nicolas
02:03
created

glances.plugins.ip   D

Complexity

Total Complexity 58

Size/Duplication

Total Lines 301
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 179
dl 0
loc 301
rs 4.5599
c 0
b 0
f 0
wmc 58

11 Methods

Rating   Name   Duplication   Size   Complexity  
A PluginModel.ip_to_cidr() 0 12 2
A PluginModel.__init__() 0 27 2
A PublicIpAddress.__init__() 0 3 1
A PublicIpInfo.__init__() 0 7 1
B PluginModel.public_info_for_human() 0 13 8
A PublicIpInfo._get_ip_public_info() 0 14 4
A PublicIpAddress._get_ip_public() 0 16 5
F PluginModel.update() 0 62 14
B PublicIpAddress.get() 0 19 6
A PublicIpInfo.get() 0 18 5
C PluginModel.msg_curse() 0 40 10

How to fix   Complexity   

Complexity

Complex classes like glances.plugins.ip often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""IP plugin."""
11
12
import threading
13
from ujson import loads
14
15
from glances.globals import urlopen, queue, urlopen_auth
16
from glances.logger import logger
17
from glances.timer import Timer
18
from glances.timer import getTimeSinceLastUpdate
19
from glances.plugins.plugin.model import GlancesPluginModel
20
21
# Import plugin specific dependency
22
try:
23
    import netifaces
24
except ImportError as e:
25
    import_error_tag = True
26
    logger.warning("Missing Python Lib ({}), IP plugin is disabled".format(e))
27
else:
28
    import_error_tag = False
29
30
# List of online services to retrieve public IP address
31
# List of tuples (url, json, key)
32
# - url: URL of the Web site
33
# - json: whether the service returns JSON (True) or a plain string (False)
34
# - key: key of the IP address in the JSON structure
35
urls = [
36
    ('https://httpbin.org/ip', True, 'origin'),
37
    ('https://api.ipify.org/?format=json', True, 'ip'),
38
    ('https://ipv4.jsonip.com', True, 'ip'),
39
]
40
41
42
class PluginModel(GlancesPluginModel):
    """Glances IP Plugin.

    stats is a dict. When the local update succeeds it holds the keys
    'gateway', 'address', 'mask' and 'mask_cidr', plus 'public_address' and
    'public_info_human' when the public IP step did not raise.
    """

    # Fallback refresh interval for the public IP address. It is compared with
    # the value returned by getTimeSinceLastUpdate() (presumably seconds).
    _default_public_refresh_interval = 300
    _default_public_ip_disabled = ["False"]

    def __init__(self, args=None, config=None):
        """Init the plugin.

        :param args: forwarded unchanged to GlancesPluginModel.__init__
        :param config: forwarded unchanged to GlancesPluginModel.__init__
        """
        super(PluginModel, self).__init__(args=args, config=config)

        # We want to display the stat in the curse interface
        self.display_curse = True

        # For public IP address
        self.public_address = ""
        # The other options below are read as lists (see the [0] indexing and
        # the == ["True"] comparison), so unwrap a list here as well before the
        # value is compared with a number in update().
        self.public_address_refresh_interval = self._as_number(
            self.get_conf_value("public_refresh_interval", default=self._default_public_refresh_interval),
            self._default_public_refresh_interval,
        )

        public_ip_disabled = self.get_conf_value("public_ip_disabled", default=self._default_public_ip_disabled)
        self.public_ip_disabled = public_ip_disabled == ["True"]

        # For the Censys options (see issue #2105)
        self.public_info = ""
        self.censys_url = self.get_conf_value("censys_url", default=[None])[0]
        self.censys_username = self.get_conf_value("censys_username", default=[None])[0]
        self.censys_password = self.get_conf_value("censys_password", default=[None])[0]
        self.censys_fields = self.get_conf_value("censys_fields", default=[None])
        self.public_info_disabled = (
            self.censys_url is None
            or self.censys_username is None
            or self.censys_password is None
            # The fallback is [None], never None: look for at least one usable field
            or not any(self.censys_fields or [])
        )

    @staticmethod
    def _as_number(value, default):
        """Return value as a number, unwrapping a one-item list if needed.

        :param value: a number, a numeric string, or a list/tuple holding one
        :param default: returned when value cannot be converted
        """
        if isinstance(value, (list, tuple)):
            value = value[0] if value else default
        if isinstance(value, (int, float)):
            return value
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @GlancesPluginModel._check_decorator
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Update IP stats using the input method.

        With the 'local' input method the gateway, the private address and its
        netmask are read through netifaces, then the public address (and the
        Censys information when configured) is refreshed if needed.

        :return: the stats dict, or an empty dict (self.stats is then left
                 untouched) when the gateway or private address is unavailable
        """
        # Init new stats
        stats = self.get_init_value()

        if self.input_method == 'local' and not import_error_tag:
            # Update stats using the netifaces lib
            # Start with the default IP gateway
            try:
                default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
            except (KeyError, AttributeError) as e:
                logger.debug("Cannot grab default gateway IP address ({})".format(e))
                return {}
            else:
                stats['gateway'] = default_gw[0]

            # Then the private IP address
            # If multiple IP addresses are available, only the one with the default gateway is returned
            try:
                # Query the interface once and read both values from the same entry
                inet = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]
                address = inet['addr']
                mask = inet['netmask']
            except (KeyError, IndexError, AttributeError) as e:
                logger.debug("Cannot grab private IP address ({})".format(e))
                return {}
            else:
                stats['address'] = address
                stats['mask'] = mask
                stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])

            # Finally with the public IP address
            # NOTE(review): getTimeSinceLastUpdate() is called on every update;
            # confirm it measures the time since the last public IP refresh and
            # not merely the time since the previous call.
            time_since_update = getTimeSinceLastUpdate('public-ip')
            try:
                # Refresh when the private address changed or the interval elapsed
                if not self.public_ip_disabled and (
                    self.stats.get('address') != address or time_since_update > self.public_address_refresh_interval
                ):
                    self.public_address = PublicIpAddress().get()
                    if not self.public_info_disabled:
                        if self.public_address:
                            self.public_info = PublicIpInfo(
                                self.public_address, self.censys_url, self.censys_username, self.censys_password
                            ).get()
                        else:
                            # Nothing to look up: do not keep information
                            # describing a previous public address
                            self.public_info = ""
            except (KeyError, AttributeError) as e:
                logger.debug("Cannot grab public IP information ({})".format(e))
            else:
                stats['public_address'] = self.public_address
                # Too much information provided in the public_info
                # Limit it to public_info_for_human
                stats['public_info_human'] = self.public_info_for_human(self.public_info)

        elif self.input_method == 'snmp':
            # Not implemented yet
            pass

        # Update the stats
        self.stats = stats

        return self.stats

    def msg_curse(self, args=None, max_width=None):
        """Return the list of lines to display in the curse interface.

        :param args: not used by this method
        :param max_width: not used by this method
        :return: a list of curse_add_line() results, empty when there is
                 nothing to display
        """
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disabled() or import_error_tag:
            return ret

        # Build the string message
        ret.append(self.curse_add_line(' - ', optional=True))

        # Start with the private IP information
        ret.append(self.curse_add_line('IP ', 'TITLE', optional=True))
        if 'address' in self.stats:
            ret.append(self.curse_add_line('{}'.format(self.stats['address']), optional=True))
        if 'mask_cidr' in self.stats:
            # VPN with no internet access (issue #842)
            ret.append(self.curse_add_line('/{}'.format(self.stats['mask_cidr']), optional=True))

        # Then with the public IP information
        try:
            msg_pub = '{}'.format(self.stats['public_address'])
        except (UnicodeEncodeError, KeyError):
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
            pass
        else:
            if self.stats['public_address']:
                ret.append(self.curse_add_line(' Pub ', 'TITLE', optional=True))
                ret.append(self.curse_add_line(msg_pub, optional=True))

            # The key may be missing even when 'public_address' is present
            public_info_human = self.stats.get('public_info_human')
            if public_info_human:
                ret.append(self.curse_add_line(' {}'.format(public_info_human), optional=True))

        return ret

    def public_info_for_human(self, public_info):
        """Return a short, '/'-separated summary of public_info.

        Each entry of self.censys_fields is either 'key' or 'key:subkey'; the
        matching values found in public_info are formatted and joined.

        :param public_info: mapping returned by PublicIpInfo.get(), or a falsy value
        :return: the joined string, '' when there is nothing to show
        """
        if not public_info:
            return ''

        field_result = []
        for f in self.censys_fields or []:
            if not f:
                # Skip the None placeholder used when no field is configured
                continue
            field = f.split(':')
            if field[0] not in public_info:
                continue
            value = public_info[field[0]]
            if len(field) == 1:
                field_result.append('{}'.format(value))
            elif len(field) == 2 and isinstance(value, dict) and field[1] in value:
                # Only descend into mappings: indexing anything else with a
                # string key would raise
                field_result.append('{}'.format(value[field[1]]))
        return '/'.join(field_result)

    @staticmethod
    def ip_to_cidr(ip):
        """Convert a dotted netmask to its CIDR prefix length.

        Example: '255.255.255.0' will return 24

        :param ip: netmask in dotted decimal notation, or None
        :return: number of bits set in the mask, 0 when ip is None
        """
        # Thanks to @Atticfire
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
        if ip is None:
            # Correct issue #1528
            return 0
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
210
211
212
class PublicIpAddress(object):
    """Get public IP address from online services."""

    # Longest single blocking wait on the result queue, so that the timer is
    # re-checked regularly without spinning on the CPU.
    _poll_interval = 0.05

    def __init__(self, timeout=2):
        """Init the class.

        :param timeout: value given both to the Timer bounding get() and to
                        each urlopen() call
        """
        self.timeout = timeout

    def get(self):
        """Get the first public IP address returned by one of the online services.

        One daemon thread is started per entry of the module-level urls list;
        the first non-None answer wins.

        :return: the address(es) as a ', '-joined string without duplicates,
                 or None if no service answered before the timer finished
        """
        q = queue.Queue()

        for u, j, k in urls:
            t = threading.Thread(target=self._get_ip_public, args=(q, u, j, k))
            t.daemon = True
            t.start()

        timer = Timer(self.timeout)
        ip = None
        while not timer.finished() and ip is None:
            # Block briefly instead of polling qsize() in a tight loop.
            # A worker that failed puts None: keep waiting for the others.
            try:
                ip = q.get(timeout=self._poll_interval)
            except queue.Empty:
                continue

        if ip is None:
            return None

        return self._unique_addresses(ip)

    @staticmethod
    def _unique_addresses(raw):
        """Return the comma separated items of raw, stripped and de-duplicated.

        The first-seen order is kept so the result is stable between calls.
        """
        unique = []
        for item in raw.split(','):
            item = item.strip()
            if item not in unique:
                unique.append(item)
        return ', '.join(unique)

    def _get_ip_public(self, queue_target, url, json=False, key=None):
        """Request the url service and put the result in the queue_target.

        :param queue_target: queue receiving the address, or None on failure
        :param url: URL of the service
        :param json: True if the service answers with a JSON document
        :param key: key of the address in the JSON document (used if json is True)
        """
        try:
            handle = urlopen(url, timeout=self.timeout)
            try:
                response = handle.read().decode('utf-8')
            finally:
                # Release the connection whatever happens while reading
                handle.close()
        except Exception as e:
            logger.debug("IP plugin - Cannot open URL {} ({})".format(url, e))
            queue_target.put(None)
        else:
            # Request depend on service
            try:
                if not json:
                    queue_target.put(response)
                else:
                    queue_target.put(loads(response)[key])
            except (ValueError, KeyError, TypeError):
                # Invalid JSON, missing key or unexpected JSON type: report a
                # failure instead of letting the thread die silently
                queue_target.put(None)
255
256
257
class PublicIpInfo(object):
    """Get public IP information from Censys online service."""

    # Longest single blocking wait on the result queue, so that the timer is
    # re-checked regularly without spinning on the CPU.
    _poll_interval = 0.05

    def __init__(self, ip, url, username, password, timeout=2):
        """Init the class.

        :param ip: public IP address to look up
        :param url: base URL of the service ("/v2/hosts/<ip>" is appended)
        :param username: first credential given to urlopen_auth
        :param password: second credential given to urlopen_auth
        :param timeout: value given to the Timer bounding get()
        """
        self.ip = ip
        self.url = url
        self.username = username
        self.password = password
        self.timeout = timeout

    def get(self):
        """Return the public IP information returned by the online service.

        :return: the 'result' field of the service answer, or None when there
                 is no IP to look up, the request failed, or the timer finished
        """
        if not self.ip:
            # Without an address the request URL would be meaningless
            return None

        q = queue.Queue()

        t = threading.Thread(target=self._get_ip_public_info, args=(q, self.ip, self.url, self.username, self.password))
        t.daemon = True
        t.start()

        timer = Timer(self.timeout)
        info = None
        while not timer.finished() and info is None:
            # Block briefly instead of polling qsize() in a tight loop
            try:
                info = q.get(timeout=self._poll_interval)
            except queue.Empty:
                continue

        return info

    def _get_ip_public_info(self, queue_target, ip, url, username, password):
        """Request the url service and put the result in the queue_target.

        :param queue_target: queue receiving the 'result' field, or None on failure
        :param ip: IP address appended to the request URL
        :param url: base URL of the service
        :param username: first credential given to urlopen_auth
        :param password: second credential given to urlopen_auth
        """
        request_url = "{}/v2/hosts/{}".format(url, ip)
        try:
            # NOTE(review): self.timeout is not given to urlopen_auth; only
            # get() is bounded by it. Confirm whether urlopen_auth accepts one.
            response = urlopen_auth(request_url, username, password).read()
        except Exception as e:
            logger.debug("IP plugin - Cannot open URL {} ({})".format(request_url, e))
            queue_target.put(None)
        else:
            try:
                queue_target.put(loads(response)['result'])
            except (ValueError, KeyError, TypeError) as e:
                # TypeError: the decoded JSON cannot be indexed by 'result'
                logger.debug("IP plugin - Cannot get result field from {} ({})".format(request_url, e))
                queue_target.put(None)
301