Test Failed
Push — develop ( b68812...9ac1ad )
by Nicolas
02:51
created

glances.plugins.ip   A

Complexity

Total Complexity 37

Size/Duplication

Total Lines 251
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 145
dl 0
loc 251
rs 9.44
c 0
b 0
f 0
wmc 37

8 Methods

Rating   Name   Duplication   Size   Complexity  
A PluginModel.ip_to_cidr() 0 12 2
A PluginModel.__init__() 0 24 1
A PublicIpInfo.__init__() 0 6 1
A PluginModel.public_info_for_human() 0 6 2
A PublicIpInfo._get_ip_public_info() 0 13 4
D PluginModel.update() 0 56 13
A PublicIpInfo.get() 0 15 4
C PluginModel.msg_curse() 0 40 10
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""IP plugin."""
11
12
import threading
13
from ujson import loads
14
15
from glances.globals import urlopen, queue, urlopen_auth
16
from glances.logger import logger
17
from glances.timer import Timer
18
from glances.timer import getTimeSinceLastUpdate
19
from glances.plugins.plugin.model import GlancesPluginModel
20
21
# Import plugin specific dependency
22
try:
23
    import netifaces
24
except ImportError as e:
25
    import_error_tag = True
26
    logger.warning("Missing Python Lib ({}), IP plugin is disabled".format(e))
27
else:
28
    import_error_tag = False
29
30
# Fields description
31
# description: human readable description
32
# short_name: shortname to use un UI
33
# unit: unit type
34
# rate: is it a rate ? If yes, // by time_since_update when displayed,
35
# min_symbol: Auto unit should be used if value > than 1 'X' (K, M, G)...
36
fields_description = {
37
    'address': {
38
        'description': 'Private IP address',
39
    },
40
    'mask': {
41
        'description': 'Private IP mask',
42
    },
43
    'mask_cidr': {
44
        'description': 'Private IP mask in CIDR format',
45
        'unit': 'number',
46
    },
47
    'gateway': {
48
        'description': 'Private IP gateway',
49
    },
50
    'public_address': {
51
        'description': 'Public IP address',
52
    },
53
    'public_info_human': {
54
        'description': 'Public IP information',
55
    },
56
}
57
58
59
class PluginModel(GlancesPluginModel):
60
    """Glances IP Plugin.
61
62
    stats is a dict
63
    """
64
65
    _default_public_refresh_interval = 300
66
67
    def __init__(self, args=None, config=None):
68
        """Init the plugin."""
69
        super(PluginModel, self).__init__(
70
            args=args, config=config,
71
            fields_description=fields_description
72
        )
73
74
        # We want to display the stat in the curse interface
75
        self.display_curse = True
76
77
        # Public information (see issue #2732)
78
        self.public_address = ""
79
        self.public_info = ""
80
        self.public_api = self.get_conf_value("public_api", default=[None])[0]
81
        self.public_username = self.get_conf_value("public_username", default=[None])[0]
82
        self.public_password = self.get_conf_value("public_password", default=[None])[0]
83
        self.public_field = self.get_conf_value("public_field", default=[None])
84
        self.public_template = self.get_conf_value("public_template", default=[None])[0]
85
        self.public_disabled = (
86
            self.get_conf_value('public_disabled', default='False')[0].lower() != 'false' or
87
            self.public_api is None or self.public_field is None
88
        )
89
        self.public_address_refresh_interval = self.get_conf_value(
90
            "public_refresh_interval", default=self._default_public_refresh_interval
91
        )
92
93
    @GlancesPluginModel._check_decorator
94
    @GlancesPluginModel._log_result_decorator
95
    def update(self):
96
        """Update IP stats using the input method.
97
98
        :return: the stats dict
99
        """
100
        # Init new stats
101
        stats = self.get_init_value()
102
103
        if self.input_method == 'local' and not import_error_tag:
104
            # Private IP address
105
            # Get the default gateway thanks to the netifaces lib
106
            try:
107
                default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
108
            except (KeyError, AttributeError) as e:
109
                logger.debug("Cannot grab default gateway IP address ({})".format(e))
110
                return self.get_init_value()
111
            else:
112
                stats['gateway'] = default_gw[0]
113
            # If multiple IP addresses are available, only the one with the default gateway is returned
114
            try:
115
                address = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['addr']
116
                mask = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['netmask']
117
            except (KeyError, AttributeError) as e:
118
                logger.debug("Cannot grab private IP address ({})".format(e))
119
                return self.get_init_value()
120
            else:
121
                stats['address'] = address
122
                stats['mask'] = mask
123
                stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])
124
125
            # Public IP address
126
            time_since_update = getTimeSinceLastUpdate('public-ip')
127
            try:
128
                if not self.public_disabled and (
129
                    self.public_address == "" or time_since_update > self.public_address_refresh_interval
130
                ):
131
                    self.public_info = PublicIpInfo(
132
                        self.public_api, self.public_username, self.public_password
133
                    ).get()
134
                    self.public_address = self.public_info['ip']
135
            except (KeyError, AttributeError) as e:
136
                logger.debug("Cannot grab public IP information ({})".format(e))
137
            else:
138
                stats['public_address'] = self.public_address
139
                stats['public_info_human'] = self.public_info_for_human(self.public_info)
140
141
        elif self.input_method == 'snmp':
142
            # Not implemented yet
143
            pass
144
145
        # Update the stats
146
        self.stats = stats
147
148
        return self.stats
149
150
    def msg_curse(self, args=None, max_width=None):
151
        """Return the dict to display in the curse interface."""
152
        # Init the return message
153
        ret = []
154
155
        # Only process if stats exist and display plugin enable...
156
        if not self.stats or self.is_disabled() or import_error_tag:
157
            return ret
158
159
        # Build the string message
160
        msg = ' - '
161
        ret.append(self.curse_add_line(msg, optional=True))
162
163
        # Start with the private IP information
164
        msg = 'IP '
165
        ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
166
        if 'address' in self.stats:
167
            msg = '{}'.format(self.stats['address'])
168
            ret.append(self.curse_add_line(msg, optional=True))
169
        if 'mask_cidr' in self.stats:
170
            # VPN with no internet access (issue #842)
171
            msg = '/{}'.format(self.stats['mask_cidr'])
172
            ret.append(self.curse_add_line(msg, optional=True))
173
174
        # Then with the public IP information
175
        try:
176
            msg_pub = '{}'.format(self.stats['public_address'])
177
        except (UnicodeEncodeError, KeyError):
178
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
179
            pass
180
        else:
181
            if self.stats['public_address']:
182
                msg = ' Pub '
183
                ret.append(self.curse_add_line(msg, 'TITLE', optional=True))
184
                ret.append(self.curse_add_line(msg_pub, optional=True))
185
186
            if self.stats['public_info_human']:
187
                ret.append(self.curse_add_line(' {}'.format(self.stats['public_info_human']), optional=True))
188
189
        return ret
190
191
    def public_info_for_human(self, public_info):
192
        """Return the data to pack to the client."""
193
        if not public_info:
194
            return ''
195
196
        return self.public_template.format(**public_info)
197
198
    @staticmethod
199
    def ip_to_cidr(ip):
200
        """Convert IP address to CIDR.
201
202
        Example: '255.255.255.0' will return 24
203
        """
204
        # Thanks to @Atticfire
205
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
206
        if ip is None:
207
            # Correct issue #1528
208
            return 0
209
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
210
211
212
class PublicIpInfo(object):
213
    """Get public IP information from online service."""
214
215
    def __init__(self, url, username, password, timeout=2):
216
        """Init the class."""
217
        self.url = url
218
        self.username = username
219
        self.password = password
220
        self.timeout = timeout
221
222
    def get(self):
223
        """Return the public IP information returned by one of the online service."""
224
        q = queue.Queue()
225
226
        t = threading.Thread(target=self._get_ip_public_info, args=(q, self.url, self.username, self.password))
227
        t.daemon = True
228
        t.start()
229
230
        timer = Timer(self.timeout)
231
        info = None
232
        while not timer.finished() and info is None:
233
            if q.qsize() > 0:
234
                info = q.get()
235
236
        return info
237
238
    def _get_ip_public_info(self, queue_target, url, username, password):
239
        """Request the url service and put the result in the queue_target."""
240
        try:
241
            response = urlopen_auth(url, username, password).read()
242
        except Exception as e:
243
            logger.debug("IP plugin - Cannot get public IP information from {} ({})".format(url, e))
244
            queue_target.put(None)
245
        else:
246
            try:
247
                queue_target.put(loads(response))
248
            except (ValueError, KeyError) as e:
249
                logger.debug("IP plugin - Cannot load public IP information from {} ({})".format(url, e))
250
                queue_target.put(None)
251