1 | # -*- coding: utf-8 -*- |
||
2 | # |
||
3 | # This file is part of Glances. |
||
4 | # |
||
5 | # Copyright (C) 2019 Nicolargo <[email protected]> |
||
6 | # |
||
7 | # Glances is free software; you can redistribute it and/or modify |
||
8 | # it under the terms of the GNU Lesser General Public License as published by |
||
9 | # the Free Software Foundation, either version 3 of the License, or |
||
10 | # (at your option) any later version. |
||
11 | # |
||
12 | # Glances is distributed in the hope that it will be useful, |
||
13 | # but WITHOUT ANY WARRANTY; without even the implied warranty of |
||
14 | # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
||
15 | # GNU Lesser General Public License for more details. |
||
16 | # |
||
17 | # You should have received a copy of the GNU Lesser General Public License |
||
18 | # along with this program. If not, see <http://www.gnu.org/licenses/>. |
||
19 | |||
20 | """IP plugin.""" |
||
21 | |||
22 | import threading |
||
23 | from json import loads |
||
24 | |||
25 | from glances.compat import iterkeys, urlopen, queue |
||
26 | from glances.logger import logger |
||
27 | from glances.timer import Timer |
||
28 | from glances.plugins.glances_plugin import GlancesPlugin |
||
29 | |||
30 | # Import plugin specific dependency |
||
31 | try: |
||
32 | import netifaces |
||
33 | except ImportError as e: |
||
34 | import_error_tag = True |
||
35 | logger.warning("Missing Python Lib ({}), IP plugin is disabled".format(e)) |
||
36 | else: |
||
37 | import_error_tag = False |
||
38 | |||
# List of online services used to retrieve the public IP address.
# Each entry is a tuple (url, json, key):
# - url: URL of the Web service
# - json: True if the service returns a JSON document, False if it returns a plain string
# - key: key of the IP address in the JSON structure (None for plain string services)
urls = [('https://ip.42.pl/raw', False, None),
        ('https://httpbin.org/ip', True, 'origin'),
        ('https://jsonip.com', True, 'ip'),
        ('https://api.ipify.org/?format=json', True, 'ip')]
||
48 | |||
49 | |||
class Plugin(GlancesPlugin):
    """Glances IP Plugin.

    stats is a dict which, when the information can be grabbed, holds the
    keys 'address', 'mask', 'mask_cidr', 'gateway' and 'public_address'.
    """

    def __init__(self, args=None, config=None):
        """Init the plugin.

        The public IP address is requested only once, here, and then reused
        by every call to update().
        """
        super(Plugin, self).__init__(args=args, config=config)

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Always define the attribute: update() reads it unconditionally in
        # its 'local' branch, even if the lookup below is skipped.
        self.public_address = None

        # Get the public IP address once (not for each refresh)
        if not self.is_disable() and not import_error_tag:
            self.public_address = PublicIpAddress().get()

    @GlancesPlugin._check_decorator
    @GlancesPlugin._log_result_decorator
    def update(self):
        """Update IP stats using the input method.

        Stats is dict. Keys are filled in order (address, mask, mask_cidr,
        gateway, public_address); if one lookup fails, the keys already set
        are kept and the failure is logged at debug level.
        """
        # Init new stats
        stats = self.get_init_value()

        if self.input_method == 'local' and not import_error_tag:
            # Update stats using the netifaces lib
            try:
                # Indexed below as [0] (gateway address) and [1] (interface name)
                default_gw = netifaces.gateways()['default'][netifaces.AF_INET]
            except (KeyError, AttributeError) as e:
                logger.debug("Cannot grab the default gateway ({})".format(e))
            else:
                try:
                    # Query the interface once so that address and mask come
                    # from the same snapshot
                    inet = netifaces.ifaddresses(default_gw[1])[netifaces.AF_INET][0]
                    stats['address'] = inet['addr']
                    stats['mask'] = inet['netmask']
                    stats['mask_cidr'] = self.ip_to_cidr(stats['mask'])
                    # Reuse the gateway already grabbed instead of asking again
                    stats['gateway'] = default_gw[0]
                    stats['public_address'] = self.public_address
                except (KeyError, AttributeError, IndexError, ValueError) as e:
                    # IndexError: no IPv4 entry for the interface
                    # ValueError: unknown interface name or malformed netmask
                    logger.debug("Cannot grab IP information: {}".format(e))
        elif self.input_method == 'snmp':
            # Not implemented yet
            pass

        # Update the stats
        self.stats = stats

        return self.stats

    def update_views(self):
        """Update stats views."""
        # Call the father's method
        super(Plugin, self).update_views()

        # Add specifics informations
        # Every IP stat is flagged as optional
        for key in iterkeys(self.stats):
            self.views[key]['optional'] = True

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface."""
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disable() or import_error_tag:
            return ret

        # Build the string message
        msg = ' - '
        ret.append(self.curse_add_line(msg))
        msg = 'IP '
        ret.append(self.curse_add_line(msg, 'TITLE'))
        if 'address' in self.stats:
            msg = '{}'.format(self.stats['address'])
            ret.append(self.curse_add_line(msg))
        if 'mask_cidr' in self.stats:
            # VPN with no internet access (issue #842)
            msg = '/{}'.format(self.stats['mask_cidr'])
            ret.append(self.curse_add_line(msg))
        try:
            msg_pub = '{}'.format(self.stats['public_address'])
        except (UnicodeEncodeError, KeyError):
            # Add KeyError exception (see https://github.com/nicolargo/glances/issues/1469)
            pass
        else:
            # None means that no public address has been retrieved
            if self.stats['public_address'] is not None:
                msg = ' Pub '
                ret.append(self.curse_add_line(msg, 'TITLE'))
                ret.append(self.curse_add_line(msg_pub))

        return ret

    @staticmethod
    def ip_to_cidr(ip):
        """Convert IP address to CIDR.

        The result is the number of bits set to 1 in the dotted-decimal mask.

        Example: '255.255.255.0' will return 24
        """
        # Thanks to @Atticfire
        # See https://github.com/nicolargo/glances/issues/1417#issuecomment-469894399
        return sum(bin(int(x)).count('1') for x in ip.split('.'))
||
154 | |||
155 | |||
class PublicIpAddress(object):
    """Get public IP address from online services."""

    # Maximum time (in seconds) a single blocking wait on the result queue lasts
    # before the global timer is checked again.
    _POLL_INTERVAL = 0.1

    def __init__(self, timeout=2):
        """Init the class.

        timeout: maximum time (in seconds) given to the online services to
        answer. It is used both for each HTTP request and for the overall
        wait in get().
        """
        self.timeout = timeout

    def get(self):
        """Get the first public IP address returned by one of the online services.

        All the services listed in urls are requested in parallel, one
        daemon thread per service. The first non-None answer wins.

        Return the address as a string (several comma separated addresses
        are stripped, de-duplicated and joined with ', ' in their original
        order) or None if no service answered before the timeout.
        """
        q = queue.Queue()

        for u, j, k in urls:
            t = threading.Thread(target=self._get_ip_public, args=(q, u, j, k))
            t.daemon = True
            t.start()

        timer = Timer(self.timeout)
        # Each worker puts exactly one item (the address or None) in the
        # queue, so there is no need to wait once all of them have reported.
        pending = len(urls)
        ip = None
        while ip is None and pending > 0 and not timer.finished():
            try:
                # Blocking wait (instead of spinning on qsize()) to avoid
                # burning CPU while the services are answering
                ip = q.get(timeout=self._POLL_INTERVAL)
            except queue.Empty:
                continue
            pending -= 1

        if ip is None:
            # Timeout reached or every service failed: no public address
            return None

        # Strip and de-duplicate the addresses while keeping their order
        addresses = []
        for address in ip.split(','):
            address = address.strip()
            if address not in addresses:
                addresses.append(address)
        return ', '.join(addresses)

    def _get_ip_public(self, queue_target, url, json=False, key=None):
        """Request the url service and put the result in the queue_target.

        queue_target: queue where the result is put
        url: URL of the service
        json: True if the service returns a JSON document, False for a plain string
        key: key of the IP address in the JSON document (only used if json is True)

        Exactly one item is always put in queue_target: the address, or None
        if the service can not be reached or its answer can not be parsed.
        """
        # Local import: closing() releases the connection once the answer is read
        from contextlib import closing

        try:
            with closing(urlopen(url, timeout=self.timeout)) as connection:
                response = connection.read().decode('utf-8')
        except Exception as e:
            # Deliberately broad: this runs in a worker thread and whatever
            # the failure is (network, TLS, decoding...), the caller must
            # receive an answer for this service.
            logger.debug("IP plugin - Cannot open URL {} ({})".format(url, e))
            queue_target.put(None)
            return

        # Request depend on service
        if not json:
            queue_target.put(response)
            return

        try:
            ip = loads(response)[key]
        except (ValueError, KeyError, TypeError):
            # ValueError: invalid JSON
            # KeyError: the key is missing
            # TypeError: the JSON document is not a dict
            ip = None
        queue_target.put(ip)
||
196 |
Generally, you would want to handle very specific errors in the exception handler. This ensures that you do not hide other types of errors which should be fixed.
So, unless you specifically plan to handle any error, consider catching a more specific exception.