# -*- coding: utf-8 -*-
#
# This file is part of Glances.
#
# Copyright (C) 2017 Nicolargo <[email protected]>
#
# Glances is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Glances is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Wifi plugin."""

import operator

from glances.logger import logger
from glances.plugins.glances_plugin import GlancesPlugin

import psutil

# Hotspot scanning relies on the optional "wifi" Python package
# (https://pypi.python.org/pypi/wifi), which is Linux-only.
# wifi_tag records whether that package could be imported; the plugin
# checks it before trying to scan.
wifi_tag = False
try:
    from wifi.scan import Cell
    from wifi.exceptions import InterfaceError
except ImportError:
    logger.debug("Wifi library not found. Glances cannot grab Wifi info.")
else:
    wifi_tag = True


class Plugin(GlancesPlugin):

    """Glances Wifi plugin.

    Get stats of the current Wifi hotspots.
    """

    def __init__(self, args=None):
        """Init the plugin."""
        super(Plugin, self).__init__(args=args)

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Init the stats
        self.reset()

    def get_key(self):
        """Return the key of the list.

        :returns: string -- SSID is the dict key
        """
        return 'ssid'

    def reset(self):
        """Reset/init the stats to an empty list.

        :returns: None
        """
        self.stats = []

    @GlancesPlugin._check_decorator
    @GlancesPlugin._log_result_decorator
    def update(self):
        """Update Wifi stats using the input method.

        Stats is a list of dict (one dict per hotspot). Each dict holds the
        'key', 'ssid', 'signal', 'quality', 'encrypted' and
        'encryption_type' entries ('encryption_type' is None for a hotspot
        that is not encrypted).

        :returns: list -- Stats is a list of dict (hotspot)
        """
        # Reset stats
        self.reset()

        # Exit if we can not grab the stats
        if not wifi_tag:
            return self.stats

        if self.input_method == 'local':
            # Update stats using the standard system lib

            # Grab network interface stat using the PsUtil net_io_counter
            # method: its keys are the interface names to scan
            try:
                netiocounters = psutil.net_io_counters(pernic=True)
            except UnicodeDecodeError:
                return self.stats

            for net in netiocounters:
                # Do not take hidden interface into account
                if self.is_hide(net):
                    continue

                # Grab the stats using the Wifi Python lib
                try:
                    wifi_cells = Cell.all(net)
                except InterfaceError:
                    # Not a Wifi interface
                    continue

                for wifi_cell in wifi_cells:
                    encrypted = wifi_cell.encrypted
                    # Add the hotspot to the list
                    self.stats.append({
                        'key': self.get_key(),
                        'ssid': wifi_cell.ssid,
                        'signal': wifi_cell.signal,
                        'quality': wifi_cell.quality,
                        'encrypted': encrypted,
                        'encryption_type': wifi_cell.encryption_type if encrypted else None
                    })

        elif self.input_method == 'snmp':
            # Update stats using SNMP

            # Not implemented yet
            pass

        return self.stats

    def get_alert(self, value):
        """Overwrite the default get_alert method.

        Alert is on the signal value (displayed in dBm): the value is
        compared with the 'critical', 'warning' and 'careful' limits of the
        plugin, and the lower the value, the more severe the alert.

        :param value: signal value of the hotspot
        :returns: string -- Signal alert ('OK', 'CAREFUL', 'WARNING',
                  'CRITICAL', or 'DEFAULT' if the value can not be compared
                  with the limits)
        """
        ret = 'OK'
        try:
            if value <= self.get_limit('critical', stat_name=self.plugin_name):
                ret = 'CRITICAL'
            elif value <= self.get_limit('warning', stat_name=self.plugin_name):
                ret = 'WARNING'
            elif value <= self.get_limit('careful', stat_name=self.plugin_name):
                ret = 'CAREFUL'
        except (TypeError, KeyError):
            # KeyError: the limit is not defined.
            # TypeError: the value (or the limit) is not comparable, for
            # example a None signal under Python 3.
            ret = 'DEFAULT'

        return ret

    def update_views(self):
        """Update stats views."""
        # Call the father's method
        super(Plugin, self).update_views()

        # Add specifics informations
        # Alert on signal thresholds (quality shares the signal decoration)
        key = self.get_key()
        for i in self.stats:
            decoration = self.get_alert(i['signal'])
            self.views[i[key]]['signal']['decoration'] = decoration
            self.views[i[key]]['quality']['decoration'] = decoration

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface.

        :param args: command line arguments; the plugin is not displayed if
                     its disable_wifi attribute is true (args may be None)
        :param max_width: width available for the plugin (None for default)
        :returns: list -- lines built with curse_add_line/curse_new_line
        """
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        # getattr() keeps the documented default (args=None) usable.
        if not self.stats or not wifi_tag or getattr(args, 'disable_wifi', False):
            return ret

        # Max size for the interface name
        if max_width is not None and max_width >= 23:
            # Interface size name = max_width - space for encryption + quality
            ifname_max_width = max_width - 5
        else:
            ifname_max_width = 16

        # Build the string message
        # Header
        msg = '{:{width}}'.format('WIFI', width=ifname_max_width)
        ret.append(self.curse_add_line(msg, "TITLE"))
        msg = '{:>7}'.format('dBm')
        ret.append(self.curse_add_line(msg))

        # Do not display hotspot with no name (empty or missing ssid).
        # Filtering before the sort also avoids comparing None with str.
        key = self.get_key()
        named_hotspots = [h for h in self.stats if h['ssid']]

        # Hotspot list (sorted by name)
        for i in sorted(named_hotspots, key=operator.itemgetter(key)):
            ret.append(self.curse_new_line())
            # New hotspot
            hotspotname = i['ssid']
            # Add the encryption type (if it is available)
            if i['encrypted']:
                hotspotname += ' {}'.format(i['encryption_type'])
            # Cut hotspotname if it is too long (keep the end of the name)
            if len(hotspotname) > ifname_max_width:
                hotspotname = '_' + hotspotname[-ifname_max_width + 1:]
            # Add the new hotspot to the message
            msg = '{:{width}}'.format(hotspotname, width=ifname_max_width)
            ret.append(self.curse_add_line(msg))
            # !s: format through str() so a None signal does not raise
            msg = '{!s:>7}'.format(i['signal'])
            ret.append(self.curse_add_line(msg,
                                           self.get_views(item=i[key],
                                                          key='signal',
                                                          option='decoration')))

        return ret