Completed
Push — master ( 3dcd25...20576f )
by Nicolas
01:26
created

PublicIpAddress._get_ip_public()   B

Complexity

Conditions 5

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 4
Bugs 0 Features 1
Metric Value
cc 5
c 4
b 0
f 1
dl 0
loc 16
rs 8.5454
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# Copyright (C) 2016 Nicolargo <[email protected]>
6
#
7
# Glances is free software; you can redistribute it and/or modify
8
# it under the terms of the GNU Lesser General Public License as published by
9
# the Free Software Foundation, either version 3 of the License, or
10
# (at your option) any later version.
11
#
12
# Glances is distributed in the hope that it will be useful,
13
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
# GNU Lesser General Public License for more details.
16
#
17
# You should have received a copy of the GNU Lesser General Public License
18
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20
"""IP plugin."""
21
22
import threading
23
from json import loads
24
25
from glances.compat import iterkeys, urlopen, URLError, queue
26
from glances.globals import BSD
27
from glances.logger import logger
28
from glances.timer import Timer
29
from glances.plugins.glances_plugin import GlancesPlugin
30
31
# XXX *BSDs: netifaces is not imported there (Segmentation fault, core dumped)
# -- https://bitbucket.org/al45tair/netifaces/issues/15
# Also used in the ports_list script
# netifaces_tag is True only when the netifaces lib has been imported
netifaces_tag = False
if not BSD:
    try:
        import netifaces
    except ImportError:
        pass
    else:
        netifaces_tag = True
42
43
# Online services used to retrieve the public IP address.
# Each entry is a tuple (url, json, key):
# - url: URL of the Web site
# - json: True if the service returns a JSON document, False for a raw string
# - key: key of the IP address in the JSON structure (None for a raw string)
urls = [
    ('http://ip.42.pl/raw', False, None),
    ('http://httpbin.org/ip', True, 'origin'),
    ('http://jsonip.com', True, 'ip'),
    ('https://api.ipify.org/?format=json', True, 'ip'),
]
52
53
54
class Plugin(GlancesPlugin):

    """Glances IP Plugin.

    stats is a dict. When the local IP information can be grabbed it
    contains the keys 'address', 'mask', 'mask_cidr', 'gateway' and
    'public_address'; it is empty (or partially filled) otherwise.
    """

    def __init__(self, args=None):
        """Init the plugin.

        args is forwarded unchanged to the GlancesPlugin constructor.
        """
        super(Plugin, self).__init__(args=args)

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Get the public IP address once: update() only copies this value
        self.public_address = PublicIpAddress().get()

        # Init the stats
        self.reset()

    def reset(self):
        """Reset/init the stats."""
        self.stats = {}

    @GlancesPlugin._log_result_decorator
    def update(self):
        """Update IP stats using the input method.

        Stats is dict. Only the 'local' input method (with the netifaces
        lib available) fills it; 'snmp' is not implemented yet.

        Return the stats dict.
        """
        # Reset stats
        self.reset()

        if self.input_method == 'local' and netifaces_tag:
            # Update stats using the netifaces lib
            try:
                default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
            except (KeyError, AttributeError) as e:
                logger.debug("Cannot grab the default gateway ({})".format(e))
            else:
                try:
                    # default_gw[0] is the gateway address and default_gw[1]
                    # the interface used to reach it: query each only once
                    iface = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]
                    # Keys are stored one by one on purpose: if the netmask
                    # is missing, 'address' stays available for the display
                    self.stats['address'] = iface['addr']
                    self.stats['mask'] = iface['netmask']
                    self.stats['mask_cidr'] = self.ip_to_cidr(self.stats['mask'])
                    self.stats['gateway'] = default_gw[0]
                    # The public address was resolved once in __init__
                    self.stats['public_address'] = self.public_address
                except (KeyError, AttributeError, IndexError) as e:
                    logger.debug("Cannot grab IP information: {}".format(e))
        elif self.input_method == 'snmp':
            # Not implemented yet
            pass

        # Update the view
        self.update_views()

        return self.stats

    def update_views(self):
        """Update stats views."""
        # Call the father's method
        super(Plugin, self).update_views()

        # Add specifics informations
        # Optional
        for key in iterkeys(self.stats):
            self.views[key]['optional'] = True

    def msg_curse(self, args=None):
        """Return the dict to display in the curse interface.

        The returned list is empty when there is no stat or when the
        plugin is disabled (args.disable_ip).
        """
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or args.disable_ip:
            return ret

        # Build the string message
        msg = ' - '
        ret.append(self.curse_add_line(msg))
        msg = 'IP '
        ret.append(self.curse_add_line(msg, 'TITLE'))
        msg = '{}'.format(self.stats['address'])
        ret.append(self.curse_add_line(msg))
        if 'mask_cidr' in self.stats:
            # VPN with no internet access (issue #842)
            msg = '/{}'.format(self.stats['mask_cidr'])
            ret.append(self.curse_add_line(msg))

        # 'public_address' is missing when update() stopped before storing
        # it (for example no netmask): use get() to avoid a KeyError
        public_address = self.stats.get('public_address')
        if public_address is not None:
            try:
                msg_pub = '{}'.format(public_address)
            except UnicodeEncodeError:
                # Address can not be displayed: skip the public part
                pass
            else:
                msg = ' Pub '
                ret.append(self.curse_add_line(msg, 'TITLE'))
                ret.append(self.curse_add_line(msg_pub))

        return ret

    @staticmethod
    def ip_to_cidr(ip):
        """Convert IP address to CIDR.

        ip is a dotted netmask string. The CIDR prefix length is the
        number of bits set in the mask, so masks which are not aligned
        on a byte boundary are also supported.

        Example: '255.255.255.0' will return 24
                 '255.255.255.128' will return 25
        """
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
161
162
163
class PublicIpAddress(object):

    """Get public IP address from online services."""

    def __init__(self, timeout=2):
        """Init the class.

        timeout: value given to the Timer bounding get() and passed as
        the timeout of each URL request.
        """
        self.timeout = timeout

    def get(self):
        """Get the first public IP address returned by one of the online services.

        One daemon thread is started per entry of the urls list. Return
        the first answer which is not None, or None if the timer is
        finished or if all the services answered without an address.
        """
        q = queue.Queue()

        for u, j, k in urls:
            t = threading.Thread(target=self._get_ip_public, args=(q, u, j, k))
            t.daemon = True
            t.start()

        timer = Timer(self.timeout)
        # Each thread puts exactly one item in the queue
        pending = len(urls)
        ip = None
        while ip is None and pending > 0 and not timer.finished():
            try:
                # Blocking get with a short timeout: no CPU consuming loop
                # and the timer is still checked on a regular basis
                ip = q.get(timeout=0.1)
            except queue.Empty:
                continue
            pending -= 1

        return ip

    def _get_ip_public(self, queue_target, url, json=False, key=None):
        """Request the url service and put the result in the queue_target.

        queue_target: queue where the result is put
        url: URL of the service
        json: True if the service returns a JSON document, False if the
              answer is used as it is
        key: key of the IP address in the JSON document

        None is put in the queue_target if the URL can not be opened or
        if the answer can not be parsed.
        """
        ip = None
        try:
            response = urlopen(url, timeout=self.timeout).read().decode('utf-8')
        except Exception as e:
            logger.debug("IP plugin - Can not open URL {0} ({1})".format(url, e))
        else:
            # Request depend on service
            if not json:
                ip = response
            else:
                try:
                    ip = loads(response)[key]
                except (ValueError, KeyError, TypeError) as e:
                    # Invalid JSON, missing key or JSON which is not a dict
                    logger.debug("IP plugin - Can not parse answer of URL {0} ({1})".format(url, e))

        # Always put a result: get() counts one item per thread
        queue_target.put(ip)
202