Test Failed
Pull Request — develop (#2998)
by
unknown
03:08
created

glances.plugins.ip.PluginModel.update()   B

Complexity

Conditions 6

Size

Total Lines 64
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 6
eloc 14
nop 1
dl 0
loc 64
rs 8.6666
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

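Commonly applied refactorings include Extract Method and, when a method uses many parameters or temporary variables, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.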

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""IP plugin."""
10
11
import threading
12
13
from glances.globals import json_loads, queue, urlopen_auth
14
from glances.logger import logger
15
from glances.plugins.plugin.model import GlancesPluginModel
16
from glances.timer import Timer, getTimeSinceLastUpdate
17
18
# Import plugin specific dependency.
# netifaces is optional: when it cannot be imported, import_error_tag is set
# to True and a warning is logged; the rest of this module checks the flag
# before touching netifaces.
try:
    import netifaces
except ImportError as e:
    import_error_tag = True
    logger.warning(f"Missing Python Lib ({e}), IP plugin is disabled")
else:
    import_error_tag = False
26
27
# Fields description
# description: human readable description
# short_name: short name to use in the UI
# unit: unit type
# rate: is it a rate? If yes, divide by time_since_update when displayed
# min_symbol: auto unit should be used if value > 1 'X' (K, M, G)...
fields_description = {
    'address': {
        'description': 'Private IP address',
    },
    'mask': {
        'description': 'Private IP mask',
    },
    'mask_cidr': {
        'description': 'Private IP mask in CIDR format',
        'unit': 'number',
    },
    'gateway': {
        'description': 'Private IP gateway',
    },
    'public_address': {
        'description': 'Public IP address',
    },
    'public_info_human': {
        'description': 'Public IP information',
    },
}
54
55
56
class PluginModel(GlancesPluginModel):
    """Glances IP Plugin.

    stats is a dict. The keys filled by this class are 'gateway', 'address',
    'mask', 'mask_cidr', 'public_address' and 'public_info_human'.
    """

    # Fallback (used as the default of the "public_refresh_interval" option)
    _default_public_refresh_interval = 300

    def __init__(self, args=None, config=None):
        """Init the plugin.

        :param args: forwarded to GlancesPluginModel
        :param config: forwarded to GlancesPluginModel
        """
        super().__init__(args=args, config=config, fields_description=fields_description)

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Public information (see issue #2732)
        # public_address / public_info cache the last successful lookup.
        self.public_address = ""
        self.public_info = ""
        self.public_api = self.get_conf_value("public_api", default=[None])[0]
        self.public_username = self.get_conf_value("public_username", default=[None])[0]
        self.public_password = self.get_conf_value("public_password", default=[None])[0]
        # NOTE(review): unlike its siblings this value is not indexed with [0],
        # so with the [None] default the "is None" test below looks like it can
        # never be true - confirm against get_conf_value.
        self.public_field = self.get_conf_value("public_field", default=[None])
        self.public_template = self.get_conf_value("public_template", default=[None])[0]
        # NOTE(review): if get_conf_value returns the 'False' default unchanged,
        # 'False'[0].lower() is 'f', which differs from 'false' and would mark
        # the public lookup as disabled - confirm the intended default.
        self.public_disabled = (
            self.get_conf_value('public_disabled', default='False')[0].lower() != 'false'
            or self.public_api is None
            or self.public_field is None
        )
        self.public_address_refresh_interval = self.get_conf_value(
            "public_refresh_interval", default=self._default_public_refresh_interval
        )

    def get_private_ipv4(self, stats, stop=False):
        """Store the default IPv4 gateway address in stats['gateway'].

        :param stats: the stats dict to complete
        :param stop: incoming stop flag, returned unchanged on success
        :return: a (stop, stats) tuple; stop is True when the default gateway
                 cannot be read from netifaces
        """
        try:
            default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
        except (KeyError, AttributeError) as e:
            logger.debug(f"Cannot grab default gateway IP address ({e})")
            stop = True
        else:
            # First item of the default gateway entry is the gateway address
            stats['gateway'] = default_gw[0]

        return (stop, stats)

    def get_first_ipv4(self, stats, stop=False):
        """Store the private IPv4 address and mask in stats.

        If multiple IP addresses are available, only the first one of the
        interface holding the default gateway is used.

        :param stats: the stats dict to complete ('address', 'mask', 'mask_cidr')
        :param stop: incoming stop flag, returned unchanged on success
        :return: a (stop, stats) tuple; stop is True when the address cannot
                 be read from netifaces
        """
        try:
            default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
            # Second item of the default gateway entry is the interface name.
            # Query the interface once and read both fields from the result.
            first_ipv4 = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]
            address = first_ipv4['addr']
            mask = first_ipv4['netmask']
        except (KeyError, AttributeError, IndexError) as e:
            # IndexError: the interface reports an empty IPv4 address list
            logger.debug(f"Cannot grab private IP address ({e})")
            stop = True
        else:
            stats['address'] = address
            stats['mask'] = mask
            stats['mask_cidr'] = self.ip_to_cidr(mask)

        return (stop, stats)

    def get_public_ipv4(self, stats, stop=True):
        """Store the public IP address and its human readable info in stats.

        The online service is only queried when the public lookup is enabled
        and either no address is cached yet or the time returned by
        getTimeSinceLastUpdate('public-ip') exceeds the refresh interval.

        :param stats: the stats dict to complete ('public_address',
                      'public_info_human')
        :param stop: returned as is (this method never changes it)
        :return: a (stop, stats) tuple
        """
        time_since_update = getTimeSinceLastUpdate('public-ip')
        try:
            if not self.public_disabled and (
                self.public_address == "" or time_since_update > self.public_address_refresh_interval
            ):
                public_info = PublicIpInfo(self.public_api, self.public_username, self.public_password).get()
                # Validate the answer before touching the cache: a failed
                # refresh (None or no 'ip' key) must not wipe the last good
                # public_info.
                public_address = public_info['ip']
                self.public_info = public_info
                self.public_address = public_address
        except (KeyError, AttributeError, TypeError) as e:
            logger.debug(f"Cannot grab public IP information ({e})")
        else:
            stats['public_address'] = (
                self.public_address if not self.args.hide_public_info else self.__hide_ip(self.public_address)
            )
            stats['public_info_human'] = self.public_info_for_human(self.public_info)

        return (stop, stats)

    @GlancesPluginModel._check_decorator
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Update IP stats using the input method.

        :return: the stats dict
        """
        # Init new stats
        stats = self.get_init_value()

        if self.input_method == 'local' and not import_error_tag:
            # Each step is skipped as soon as a previous one failed; stats
            # gathered so far are kept.
            # Default gateway
            stop, stats = self.get_private_ipv4(stats)

            # Private IP address
            if not stop:
                stop, stats = self.get_first_ipv4(stats)

            # Public IP address
            if not stop:
                _stop, stats = self.get_public_ipv4(stats)

        elif self.input_method == 'snmp':
            # Not implemented yet
            pass

        # Update the stats
        self.stats = stats

        return self.stats

    def __hide_ip(self, ip):
        """Hide the last two bytes of the given IP address.

        Example: '203.0.113.7' will return '203.0.*.*'

        An empty (unknown) address is returned unchanged, otherwise it would
        be turned into the non empty string '.*.*' and displayed as an address.
        """
        if not ip:
            return ip
        return '.'.join(ip.split('.')[0:2]) + '.*.*'

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface.

        :return: the list of lines built with curse_add_line (empty when there
                 is no stats, the plugin is disabled or netifaces is missing)
        """
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disabled() or import_error_tag:
            return ret

        # Build the string message
        ret.append(self.curse_add_line(' - ', optional=True))

        # Start with the private IP information
        ret.append(self.curse_add_line('IP ', 'TITLE', optional=True))
        if 'address' in self.stats:
            ret.append(self.curse_add_line(f"{self.stats['address']}", optional=True))
        if 'mask_cidr' in self.stats:
            # VPN with no internet access (issue #842)
            ret.append(self.curse_add_line(f"/{self.stats['mask_cidr']}", optional=True))

        # Then with the public IP information
        try:
            msg_pub = f"{self.stats['public_address']}"
        except (UnicodeEncodeError, KeyError):
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
            pass
        else:
            if self.stats['public_address']:
                ret.append(self.curse_add_line(' Pub ', 'TITLE', optional=True))
                ret.append(self.curse_add_line(msg_pub, optional=True))

            # .get: do not fail if only the address is present in the stats
            public_info_human = self.stats.get('public_info_human')
            if public_info_human:
                ret.append(self.curse_add_line(f' {public_info_human}', optional=True))

        return ret

    def public_info_for_human(self, public_info):
        """Return the public IP information formatted with public_template.

        :param public_info: mapping returned by the online service (its keys
                            are the fields available to the template)
        :return: the formatted string, or '' when there is no information, no
                 template, or when the template cannot be applied
        """
        if not public_info or not self.public_template:
            return ''

        try:
            return self.public_template.format(**public_info)
        except (KeyError, IndexError, ValueError, TypeError) as e:
            # Template and service answer do not match: not worth failing the
            # whole stats update for a display string.
            logger.debug(f"Cannot format public IP information ({e})")
            return ''

    @staticmethod
    def ip_to_cidr(ip):
        """Convert a dotted netmask to its CIDR prefix length.

        Example: '255.255.255.0' will return 24

        :param ip: the netmask as a dotted string, or None
        :return: the number of bits set in the mask (0 if ip is None)
        """
        # Thanks to @Atticfire
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
        if ip is None:
            # Correct issue #1528
            return 0
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
262
263
264
class PublicIpInfo:
    """Get public IP information from online service."""

    def __init__(self, url, username, password, timeout=2):
        """Init the class.

        :param url: URL of the online service
        :param username: user name passed to urlopen_auth
        :param password: password passed to urlopen_auth
        :param timeout: maximum time, in seconds, get() waits for the answer
        """
        self.url = url
        self.username = username
        self.password = password
        self.timeout = timeout

    def get(self):
        """Return the public IP information returned by the online service.

        The request runs in a daemon thread so that this call never waits
        longer than the configured timeout.

        :return: the decoded answer, or None if the request failed or did not
                 complete before the timeout
        """
        q = queue.Queue()

        t = threading.Thread(
            target=self._get_ip_public_info,
            args=(q, self.url, self.username, self.password),
            daemon=True,
        )
        t.start()

        # Block on the queue instead of polling qsize() in a tight loop: no CPU
        # is burnt while waiting, and a failure reported by the worker (None)
        # is returned at once instead of after the whole timeout.
        try:
            return q.get(timeout=self.timeout)
        except queue.Empty:
            return None

    def _get_ip_public_info(self, queue_target, url, username, password):
        """Request the url service and put the result in the queue_target.

        None is put in the queue when the request or the decoding fails.
        """
        try:
            response = urlopen_auth(url, username, password).read()
        except Exception as e:
            logger.debug(f"IP plugin - Cannot get public IP information from {url} ({e})")
            queue_target.put(None)
        else:
            try:
                queue_target.put(json_loads(response))
            except (ValueError, KeyError) as e:
                logger.debug(f"IP plugin - Cannot load public IP information from {url} ({e})")
                queue_target.put(None)
303