Test Failed
Push — master ( 128504...ccd56b )
by Nicolas
03:24
created

glances.plugins.glances_ip.Plugin.update()   C

Complexity

Conditions 10

Size

Total Lines 44
Code Lines 28

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 28
nop 1
dl 0
loc 44
rs 5.9999
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like glances.plugins.glances_ip.Plugin.update() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# Copyright (C) 2019 Nicolargo <[email protected]>
6
#
7
# Glances is free software; you can redistribute it and/or modify
8
# it under the terms of the GNU Lesser General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# Glances is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU Lesser General Public License for more details.
16
#
17
# You should have received a copy of the GNU Lesser General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
"""IP plugin."""
21
22
import threading
23
from json import loads
24
25
from glances.compat import iterkeys, urlopen, queue
26
from glances.logger import logger
27
from glances.timer import Timer, getTimeSinceLastUpdate
28
from glances.plugins.glances_plugin import GlancesPlugin
29
30
# Import plugin specific dependency
31
try:
32
    import netifaces
33
except ImportError as e:
34
    import_error_tag = True
35
    logger.warning("Missing Python Lib ({}), IP plugin is disabled".format(e))
36
else:
37
    import_error_tag = False
38
39
# List of online services to retrieve public IP address
40
# List of tuple (url, json, key)
41
# - url: URL of the Web site
42
# - json: service return a JSON (True) or string (False)
43
# - key: key of the IP address in the JSON structure
44
urls = [
45
    #  glances_ip.py plugin relies on low rating / malicious site domain #1975
46
    # ('https://ip.42.pl/raw', False, None),
47
    ('https://httpbin.org/ip', True, 'origin'),
48
    ('https://api.ipify.org/?format=json', True, 'ip'),
49
    ('https://ipv4.jsonip.com', True, 'ip'),
50
]
51
52
53
class Plugin(GlancesPlugin):
54
    """Glances IP Plugin.
55
56
    stats is a dict
57
    """
58
59
    _default_public_refresh_interval = 300
60
    _default_public_ip_disabled = ["False"]
61
62
    def __init__(self, args=None, config=None):
63
        """Init the plugin."""
64
        super(Plugin, self).__init__(args=args, config=config)
65
66
        # We want to display the stat in the curse interface
67
        self.display_curse = True
68
69
        # For public IP address
70
        self.public_address = ""
71
        self.public_address_refresh_interval = self.get_conf_value(
72
            "public_refresh_interval", default=self._default_public_refresh_interval
73
        )
74
75
        public_ip_disabled = self.get_conf_value("public_ip_disabled", default=self._default_public_ip_disabled)
76
        self.public_ip_disabled = True if public_ip_disabled == ["True"] else False
77
78
    @GlancesPlugin._check_decorator
79
    @GlancesPlugin._log_result_decorator
80
    def update(self):
81
        """Update IP stats using the input method.
82
83
        :return: the stats dict
84
        """
85
        # Init new stats
86
        stats = self.get_init_value()
87
88
        if self.input_method == 'local' and not import_error_tag:
89
            # Update stats using the netifaces lib
90
            try:
91
                default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
92
            except (KeyError, AttributeError) as e:
93
                logger.debug("Cannot grab the default gateway ({})".format(e))
94
                return {}
95
96
            try:
97
                address = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['addr']
98
                mask = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]['netmask']
99
100
                time_since_update = getTimeSinceLastUpdate('public-ip')
101
                if not self.public_ip_disabled and (
102
                    self.stats.get('address') != address or time_since_update > self.public_address_refresh_interval
103
                ):
104
                    self.public_address = PublicIpAddress().get()
105
            except (KeyError, AttributeError) as e:
106
                logger.debug("Cannot grab IP information: {}".format(e))
107
            else:
108
                stats['address'] = address
109
                stats['mask'] = mask
110
                stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])
111
                stats['gateway'] = default_gw[0]
112
                stats['public_address'] = self.public_address
113
114
        elif self.input_method == 'snmp':
115
            # Not implemented yet
116
            pass
117
118
        # Update the stats
119
        self.stats = stats
120
121
        return self.stats
122
123
    def update_views(self):
124
        """Update stats views."""
125
        # Call the father's method
126
        super(Plugin, self).update_views()
127
128
        # Add specifics information
129
        # Optional
130
        for key in iterkeys(self.stats):
131
            self.views[key]['optional'] = True
132
133
    def msg_curse(self, args=None, max_width=None):
134
        """Return the dict to display in the curse interface."""
135
        # Init the return message
136
        ret = []
137
138
        # Only process if stats exist and display plugin enable...
139
        if not self.stats or self.is_disabled() or import_error_tag:
140
            return ret
141
142
        # Build the string message
143
        msg = ' - '
144
        ret.append(self.curse_add_line(msg))
145
        msg = 'IP '
146
        ret.append(self.curse_add_line(msg, 'TITLE'))
147
        if 'address' in self.stats:
148
            msg = '{}'.format(self.stats['address'])
149
            ret.append(self.curse_add_line(msg))
150
        if 'mask_cidr' in self.stats:
151
            # VPN with no internet access (issue #842)
152
            msg = '/{}'.format(self.stats['mask_cidr'])
153
            ret.append(self.curse_add_line(msg))
154
        try:
155
            msg_pub = '{}'.format(self.stats['public_address'])
156
        except (UnicodeEncodeError, KeyError):
157
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
158
            pass
159
        else:
160
            if self.stats['public_address']:
161
                msg = ' Pub '
162
                ret.append(self.curse_add_line(msg, 'TITLE'))
163
                ret.append(self.curse_add_line(msg_pub))
164
165
        return ret
166
167
    @staticmethod
168
    def ip_to_cidr(ip):
169
        """Convert IP address to CIDR.
170
171
        Example: '255.255.255.0' will return 24
172
        """
173
        # Thanks to @Atticfire
174
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
175
        if ip is None:
176
            # Correct issue #1528
177
            return 0
178
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
179
180
181
class PublicIpAddress(object):
182
    """Get public IP address from online services."""
183
184
    def __init__(self, timeout=2):
185
        """Init the class."""
186
        self.timeout = timeout
187
188
    def get(self):
189
        """Get the first public IP address returned by one of the online services."""
190
        q = queue.Queue()
191
192
        for u, j, k in urls:
193
            t = threading.Thread(target=self._get_ip_public, args=(q, u, j, k))
194
            t.daemon = True
195
            t.start()
196
197
        timer = Timer(self.timeout)
198
        ip = None
199
        while not timer.finished() and ip is None:
200
            if q.qsize() > 0:
201
                ip = q.get()
202
203
        if ip is None:
204
            return None
205
206
        return ', '.join(set([x.strip() for x in ip.split(',')]))
207
208
    def _get_ip_public(self, queue_target, url, json=False, key=None):
209
        """Request the url service and put the result in the queue_target."""
210
        try:
211
            response = urlopen(url, timeout=self.timeout).read().decode('utf-8')
212
        except Exception as e:
213
            logger.debug("IP plugin - Cannot open URL {} ({})".format(url, e))
214
            queue_target.put(None)
215
        else:
216
            # Request depend on service
217
            try:
218
                if not json:
219
                    queue_target.put(response)
220
                else:
221
                    queue_target.put(loads(response)[key])
222
            except ValueError:
223
                queue_target.put(None)
224