Test Failed
Push — develop ( 7f34ec...1cd17d )
by Nicolas
02:15
created

glances.plugins.ip.PluginModel.update()   F

Complexity

Conditions 14

Size

Total Lines 60
Code Lines 38

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 14
eloc 38
nop 1
dl 0
loc 60
rs 3.6
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex classes like glances.plugins.ip.PluginModel.update() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""IP plugin."""
11
12
import threading
13
from ujson import loads
14
15
from glances.globals import queue, urlopen_auth
16
from glances.logger import logger
17
from glances.timer import Timer
18
from glances.timer import getTimeSinceLastUpdate
19
from glances.plugins.plugin.model import GlancesPluginModel
20
21
# Import plugin specific dependency
22
try:
23
    import netifaces
24
except ImportError as e:
25
    import_error_tag = True
26
    logger.warning("Missing Python Lib ({}), IP plugin is disabled".format(e))
27
else:
28
    import_error_tag = False
29
30
# Fields description
31
# description: human readable description
32
# short_name: short name to use in the UI
33
# unit: unit type
34
# rate: is it a rate ? If yes, // by time_since_update when displayed,
35
# min_symbol: Auto unit should be used if value > than 1 'X' (K, M, G)...
36
fields_description = {
    # Private (LAN side) information of the interface holding the default route
    'address': {'description': 'Private IP address'},
    'mask': {'description': 'Private IP mask'},
    'mask_cidr': {'description': 'Private IP mask in CIDR format', 'unit': 'number'},
    'gateway': {'description': 'Private IP gateway'},
    # Public (WAN side) information retrieved from the configured online service
    'public_address': {'description': 'Public IP address'},
    'public_info_human': {'description': 'Public IP information'},
}
57
58
59
class PluginModel(GlancesPluginModel):
    """Glances IP Plugin.

    stats is a dict which may hold the keys 'gateway', 'address', 'mask',
    'mask_cidr', 'public_address' and 'public_info_human'.
    """

    # Default value used for the "public_refresh_interval" configuration key
    _default_public_refresh_interval = 300

    def __init__(self, args=None, config=None):
        """Init the plugin.

        :param args: command line arguments object (forwarded to the parent class)
        :param config: configuration object (forwarded to the parent class)
        """
        super(PluginModel, self).__init__(
            args=args, config=config,
            fields_description=fields_description
        )

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Public information (see issue #2732)
        # Last public address / information successfully retrieved (cache)
        self.public_address = ""
        self.public_info = ""
        self.public_api = self.get_conf_value("public_api", default=[None])[0]
        self.public_username = self.get_conf_value("public_username", default=[None])[0]
        self.public_password = self.get_conf_value("public_password", default=[None])[0]
        # NOTE(review): unlike its siblings this value is not indexed with [0];
        # presumably it is the list [None] when the key is not configured, in
        # which case the "is None" test below can never be true - confirm intent.
        self.public_field = self.get_conf_value("public_field", default=[None])
        self.public_template = self.get_conf_value("public_template", default=[None])[0]
        # The public lookup is disabled either explicitly or when the API is not configured
        self.public_disabled = (
            self.get_conf_value('public_disabled', default='False')[0].lower() != 'false' or
            self.public_api is None or self.public_field is None
        )
        self.public_address_refresh_interval = self.get_conf_value(
            "public_refresh_interval", default=self._default_public_refresh_interval
        )

    @GlancesPluginModel._check_decorator
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Update IP stats using the input method.

        The private part (gateway, address, mask) is mandatory: if it cannot be
        read, an initial (empty) value is returned and self.stats is left
        untouched. The public part is optional and best effort.

        :return: the stats dict
        """
        # Init new stats
        stats = self.get_init_value()

        if self.input_method == 'local' and not import_error_tag:
            if not self._update_private_ip(stats):
                return self.get_init_value()
            self._update_public_ip(stats)
        elif self.input_method == 'snmp':
            # Not implemented yet
            pass

        # Update the stats
        self.stats = stats

        return self.stats

    def _update_private_ip(self, stats):
        """Fill stats with the gateway, address and mask of the default route.

        :param stats: the stats dict to update in place
        :return: True if all the private information was read, False otherwise
        """
        # Get the default gateway thanks to the netifaces lib
        try:
            default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
        except (KeyError, AttributeError) as e:
            logger.debug("Cannot grab default gateway IP address ({})".format(e))
            return False
        stats['gateway'] = default_gw[0]

        # If multiple IP addresses are available, only the one with the default gateway is returned
        try:
            # Query the interface once and read both fields from the same entry
            inet = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]
            address = inet['addr']
            mask = inet['netmask']
        except (KeyError, AttributeError) as e:
            logger.debug("Cannot grab private IP address ({})".format(e))
            return False

        stats['address'] = address
        stats['mask'] = mask
        stats['mask_cidr'] = self.ip_to_cidr(mask)
        return True

    def _update_public_ip(self, stats):
        """Refresh (if needed) the public IP information and add it to stats.

        The online service is only queried when the public lookup is enabled and
        either no address is cached yet or the refresh interval is exceeded.

        :param stats: the stats dict to update in place
        """
        time_since_update = getTimeSinceLastUpdate('public-ip')
        try:
            if not self.public_disabled and (
                self.public_address == "" or time_since_update > self.public_address_refresh_interval
            ):
                public_info = PublicIpInfo(
                    self.public_api, self.public_username, self.public_password
                ).get()
                # PublicIpInfo.get() returns None on failure/timeout: the lookup
                # below then raises TypeError, which is handled like any other
                # retrieval error instead of crashing the update.
                public_address = public_info['ip']
                # Only overwrite the cached values once the answer is known to be usable
                self.public_info = public_info
                self.public_address = public_address
        except (KeyError, AttributeError, TypeError) as e:
            logger.debug("Cannot grab public IP information ({})".format(e))
        else:
            # self.args may be None (constructor default): do not hide in this case
            hide = getattr(self.args, 'hide_public_info', False)
            stats['public_address'] = (
                self.__hide_ip(self.public_address) if hide else self.public_address
            )
            stats['public_info_human'] = self.public_info_for_human(self.public_info)

    def __hide_ip(self, ip):
        """Hide the last two numbers of the given (dotted) IP address.

        Example: '1.2.3.4' will return '1.2.*.*'
        """
        return '.'.join(ip.split('.')[0:2]) + '.*.*'

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface."""
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disabled() or import_error_tag:
            return ret

        # Build the string message
        msg = ' - '
        ret.append(self.curse_add_line(msg, optional=True))

        # Start with the private IP information
        msg = 'IP '
        ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
        if 'address' in self.stats:
            msg = '{}'.format(self.stats['address'])
            ret.append(self.curse_add_line(msg, optional=True))
        if 'mask_cidr' in self.stats:
            # VPN with no internet access (issue #842)
            msg = '/{}'.format(self.stats['mask_cidr'])
            ret.append(self.curse_add_line(msg, optional=True))

        # Then with the public IP information
        try:
            msg_pub = '{}'.format(self.stats['public_address'])
        except (UnicodeEncodeError, KeyError):
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
            pass
        else:
            if self.stats['public_address']:
                msg = ' Pub '
                ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
                ret.append(self.curse_add_line(msg_pub, optional=True))

            # The human readable information may be missing even if the address is set
            public_info_human = self.stats.get('public_info_human')
            if public_info_human:
                ret.append(self.curse_add_line(' {}'.format(public_info_human), optional=True))

        return ret

    def public_info_for_human(self, public_info):
        """Return the public information formatted with the configured template.

        :param public_info: mapping returned by the online service (or an empty value)
        :return: the formatted string, or '' if there is nothing to format, no
                 template is configured or the template does not match the data
        """
        if not public_info or not self.public_template:
            return ''

        try:
            return self.public_template.format(**public_info)
        except (KeyError, IndexError) as e:
            # The template references a field which is not in the service answer
            logger.debug("Cannot format public IP information ({})".format(e))
            return ''

    @staticmethod
    def ip_to_cidr(ip):
        """Convert IP address to CIDR.

        Example: '255.255.255.0' will return 24

        :param ip: dotted netmask string, or None
        :return: number of bits set in the mask (0 if ip is None)
        """
        # Thanks to @Atticfire
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
        if ip is None:
            # Correct issue #1528
            return 0
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
218
219
220
class PublicIpInfo(object):
    """Get public IP information from online service."""

    def __init__(self, url, username, password, timeout=2):
        """Init the class.

        :param url: URL of the online service to request
        :param username: user name forwarded to urlopen_auth
        :param password: password forwarded to urlopen_auth
        :param timeout: maximum time (in seconds) get() waits for the answer
        """
        self.url = url
        self.username = username
        self.password = password
        self.timeout = timeout

    def get(self):
        """Return the public IP information returned by one of the online service.

        The request is done in a daemon thread and the caller waits at most
        self.timeout seconds for its result.

        :return: the decoded answer of the service, or None if the request
                 failed or did not complete before the timeout
        """
        q = queue.Queue()

        t = threading.Thread(target=self._get_ip_public_info, args=(q, self.url, self.username, self.password))
        t.daemon = True
        t.start()

        # Block on the queue instead of polling qsize() in a tight loop: no CPU
        # is burned while waiting and a failure reported by the worker (None)
        # is returned immediately rather than after the full timeout.
        # Assumes glances.globals.queue is the standard library queue module.
        try:
            return q.get(timeout=self.timeout)
        except queue.Empty:
            return None

    def _get_ip_public_info(self, queue_target, url, username, password):
        """Request the url service and put the result in the queue_target.

        None is put in the queue if the request or the decoding fails.
        """
        try:
            response = urlopen_auth(url, username, password).read()
        except Exception as e:
            # Best effort: any failure of the request is logged and reported as None
            logger.debug("IP plugin - Cannot get public IP information from {} ({})".format(url, e))
            queue_target.put(None)
            return

        try:
            queue_target.put(loads(response))
        except (ValueError, KeyError) as e:
            logger.debug("IP plugin - Cannot load public IP information from {} ({})".format(url, e))
            queue_target.put(None)
259