Test Failed
Push — develop ( 1def83...bd96e3 )
by Nicolas
02:18 queued 12s
created

glances.plugins.wifi.model   A

Complexity

Total Complexity 42

Size/Duplication

Total Lines 273
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 130
dl 0
loc 273
rs 9.0399
c 0
b 0
f 0
wmc 42

12 Methods

Rating   Name   Duplication   Size   Complexity  
A ThreadHotspot.stats() 0 7 2
A ThreadHotspot.stop() 0 3 1
D PluginModel.update() 0 59 12
A PluginModel.get_alert() 0 20 5
A ThreadHotspot.stopped() 0 3 1
A PluginModel.get_key() 0 6 1
A PluginModel.exit() 0 6 2
A PluginModel.update_views() 0 9 2
A ThreadHotspot.__init__() 0 9 1
C PluginModel.msg_curse() 0 44 9
A PluginModel.__init__() 0 9 1
A ThreadHotspot.run() 0 21 5

How to fix   Complexity   

Complexity

Complex classes like glances.plugins.wifi.model often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2023 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Wifi plugin.
11
12
Stats are retrieved from the nmcli command line (Linux only):
13
14
# nmcli -t -f active,ssid,signal,security,chan dev wifi
15
16
# nmcli -t -f active,ssid,signal dev wifi
17
no:Livebox-C820:77
18
yes:Livebox-C820:72
19
20
or the /proc/net/wireless file (Linux only):
21
22
# cat /proc/net/wireless
23
Inter-| sta-|   Quality        |   Discarded packets               | Missed | WE
24
 face | tus | link level noise |  nwid  crypt   frag  retry   misc | beacon | 22
25
wlp2s0: 0000   51.  -59.  -256        0      0      0      0   5881        0
26
"""
27
28
import operator
29
from shutil import which
30
import threading
31
import time
32
33
from glances.globals import nativestr, file_exists
34
from glances.plugins.plugin.model import GlancesPluginModel
35
from glances.secure import secure_popen
36
from glances.logger import logger
37
38
# Look for the nmcli executable in the PATH: when it is found,
# it is used to get the list of the available hotspots
NMCLI_COMMAND = which('nmcli')
NMCLI_ARGS = '-t -f active,ssid,signal,security dev wifi'
nmcli_command_exists = NMCLI_COMMAND is not None

# The backup solution is to read the /proc/net/wireless file,
# but it only gives signal information about the current hotspot
WIRELESS_FILE = '/proc/net/wireless'
wireless_file_exists = file_exists(WIRELESS_FILE)

# Neither data source is available: log it once, at import time
if not (nmcli_command_exists or wireless_file_exists):
    logger.debug(
        "Wifi plugin is disabled (no %s command or %s file found)" % ('nmcli', WIRELESS_FILE)
    )
52
53
54
class PluginModel(GlancesPluginModel):
    """Glances Wifi plugin.

    Get stats of the current Wifi hotspots.

    Stats is a list of dict (one dict per hotspot) with the keys
    'key', 'ssid', 'signal' and 'security'.
    """

    def __init__(self, args=None, config=None):
        """Init the plugin."""
        super(PluginModel, self).__init__(args=args, config=config, stats_init_value=[])

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Global Thread running all the scans
        self._thread = None

    def exit(self):
        """Overwrite the exit method to close threads."""
        if self._thread is not None:
            self._thread.stop()
        # Call the father class
        super(PluginModel, self).exit()

    def get_key(self):
        """Return the key of the list.

        :returns: string -- SSID is the dict key
        """
        return 'ssid'

    def _read_wireless_file(self):
        """Parse the /proc/net/wireless file.

        The first two lines of the file are headers; each following line
        describes one wireless interface. Blank or malformed lines are
        skipped instead of raising.

        :returns: list -- one dict per wireless interface found in the file
        """
        links = []
        with open(WIRELESS_FILE, 'r') as f:
            # The first two lines are header
            f.readline()
            f.readline()
            # Others lines are Wifi stats
            for line in f:
                fields = line.split()
                try:
                    # First column is '<interface>:' (trailing colon removed),
                    # fourth column is the 'level' value of the Quality group
                    name = fields[0][:-1]
                    signal = float(fields[3])
                except (IndexError, ValueError):
                    # Blank or unexpected line: ignore it
                    continue
                links.append({
                    'key': self.get_key(),
                    'ssid': name,
                    'signal': signal,
                    'security': ''
                })
        return links

    @GlancesPluginModel._check_decorator
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Update Wifi stats using the input method.

        Stats is a list of dict (one dict per hotspot)

        With the 'local' input method, the nmcli command is used when it is
        available (through a background ThreadHotspot), else the
        /proc/net/wireless file is read. The 'snmp' input method is not
        implemented.

        :returns: list -- Stats is a list of dict (hotspot)
        """
        # Init new stats
        stats = self.get_init_value()

        # Exit if we can not grab the stats
        if not nmcli_command_exists and not wireless_file_exists:
            return stats

        if self.input_method == 'local' and nmcli_command_exists:
            # Only start a new scan if there is no other scanning thread alive
            if self._thread is None or not self._thread.is_alive():
                # Run hotspot scanner thanks to the nmcli command
                self._thread = ThreadHotspot(self.get_refresh_time())
                self._thread.start()
            # Get the result (or [] if the scan is ongoing)
            stats = self._thread.stats
        elif self.input_method == 'local' and wireless_file_exists:
            # As a backup solution, use the /proc/net/wireless file
            stats.extend(self._read_wireless_file())
        elif self.input_method == 'snmp':
            # Update stats using SNMP

            # Not implemented yet
            pass

        # Update the stats
        self.stats = stats

        return self.stats

    def get_alert(self, value):
        """Overwrite the default get_alert method.

        Alert is on signal quality where lower is worse: a value less than
        or equal to a limit triggers the corresponding alert, the most
        severe limit being tested first.

        :returns: string -- Signal alert ('OK', 'CAREFUL', 'WARNING',
                  'CRITICAL', or 'DEFAULT' if the limits can not be compared)
        """
        ret = 'OK'
        try:
            if value <= self.get_limit('critical', stat_name=self.plugin_name):
                ret = 'CRITICAL'
            elif value <= self.get_limit('warning', stat_name=self.plugin_name):
                ret = 'WARNING'
            elif value <= self.get_limit('careful', stat_name=self.plugin_name):
                ret = 'CAREFUL'
        except (TypeError, KeyError):
            # Catch TypeError for issue1373
            ret = 'DEFAULT'

        return ret

    def update_views(self):
        """Update stats views."""
        # Call the father's method
        super(PluginModel, self).update_views()

        # Add specifics information
        # Alert on signal thresholds
        for i in self.stats:
            self.views[i[self.get_key()]]['signal']['decoration'] = self.get_alert(i['signal'])

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface.

        :param args: not used by this method
        :param max_width: width available for the plugin; nothing is
                          displayed if it is not set
        :returns: list -- curse lines (empty if there is nothing to display)
        """
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disabled():
            return ret

        # ... and if at least one data source is available (same test as update)
        if not nmcli_command_exists and not wireless_file_exists:
            return ret

        # Without a width, the layout can not be computed
        if not max_width:
            logger.debug("No max_width defined for the %s plugin, it will not be displayed." % self.plugin_name)
            return ret

        # Max size for the interface name
        if_name_max_width = max_width - 5

        # Build the string message
        # Header
        msg = '{:{width}}'.format('WIFI', width=if_name_max_width)
        ret.append(self.curse_add_line(msg, "TITLE"))
        msg = '{:>7}'.format('dBm')
        ret.append(self.curse_add_line(msg))

        # Do not display hotspot with no name (/ssid)...
        # of ssid/signal None... See issue #1151 and #issue1973
        # The filter is applied before the sort because None and str
        # can not be compared
        hotspots = [i for i in self.stats if i['ssid'] not in ('', None) and i['signal'] is not None]

        # Hotspot list (sorted by name)
        for i in sorted(hotspots, key=operator.itemgetter(self.get_key())):
            ret.append(self.curse_new_line())
            # New hotspot
            hotspot_name = i['ssid']
            security = i['security'] or ''
            # Width left for the name: the security label and one space
            # separator share the same column (at least one char is kept)
            name_width = max(1, if_name_max_width - len(security) - 1)
            # Cut hotspot_name if it is too long: keep the end of the name,
            # prefixed by '_', so that it fits exactly in name_width
            if len(hotspot_name) > name_width:
                tail_len = name_width - 1
                hotspot_name = '_' + (hotspot_name[-tail_len:] if tail_len > 0 else '')
            # Add the new hotspot to the message
            msg = '{:{width}} {security}'.format(nativestr(hotspot_name),
                                                 width=name_width,
                                                 security=security)
            ret.append(self.curse_add_line(msg))
            msg = '{:>7}'.format(
                i['signal'],
            )
            ret.append(
                self.curse_add_line(msg, self.get_views(item=i[self.get_key()], key='signal', option='decoration'))
            )

        return ret
219
220
221
class ThreadHotspot(threading.Thread):
    """
    Specific thread for the Wifi hotspot scanner.

    The thread runs the nmcli command in a loop, every refresh_time
    seconds, until stop() is called. The last result is available through
    the stats property.
    """

    def __init__(self, refresh_time=2):
        """Init the class.

        :param refresh_time: delay (in seconds) between two nmcli scans
        """
        super(ThreadHotspot, self).__init__()
        # Refresh time
        self.refresh_time = refresh_time
        # Event needed to stop properly the thread
        self._stopper = threading.Event()
        # Is part of the Wifi plugin
        self.plugin_name = "wifi"
        # Result of the last scan ([] until the first scan is done)
        self.thread_stats = []

    @staticmethod
    def _split_terse(line):
        """Split a nmcli terse line on its ':' separators.

        In terse mode, nmcli escapes the ':' and '\\' characters found in
        the values with a '\\'. The escapes are removed from the result.

        :returns: list -- the unescaped fields of the line
        """
        fields = []
        current = []
        escaped = False
        for char in line:
            if escaped:
                # Char following a backslash is taken literally
                current.append(char)
                escaped = False
            elif char == '\\':
                escaped = True
            elif char == ':':
                fields.append(''.join(current))
                current = []
            else:
                current.append(char)
        if escaped:
            # Lone trailing backslash: keep it as it is
            current.append('\\')
        fields.append(''.join(current))
        return fields

    @staticmethod
    def _parse_nmcli_line(line):
        """Convert one nmcli output line to a hotspot dict.

        Expected line format: active:ssid:signal:security

        :returns: dict -- the hotspot stats, or None if the line is not the
                  active hotspot or can not be parsed
        """
        fields = ThreadHotspot._split_terse(line)
        if len(fields) != 4 or fields[0] != 'yes':
            # Do not process the line if it is not the active hotspot
            return None
        try:
            # The nmcli signal value is stored negated
            signal = -float(fields[2])
        except ValueError:
            # Signal is not a number: ignore the line instead of
            # crashing the thread
            return None
        return {
            'key': 'ssid',
            'ssid': fields[1],
            'signal': signal,
            'security': fields[3]
        }

    def run(self):
        """Get hotspots stats using the nmcli command line"""
        while not self.stopped():
            # Run the nmcli command
            nmcli_raw = secure_popen(NMCLI_COMMAND + ' ' + NMCLI_ARGS).split('\n')
            parsed = (self._parse_nmcli_line(line) for line in nmcli_raw)
            self.thread_stats = [hotspot for hotspot in parsed if hotspot is not None]
            # Wait refresh time until next scan
            # Note: nmcli cache the result for x seconds
            # Waiting on the stop event (instead of sleeping) makes the
            # thread exit as soon as stop() is called
            self._stopper.wait(self.refresh_time)

    @property
    def stats(self):
        """Stats getter.

        :returns: list -- result of the last scan ([] if no scan is done yet)
        """
        return self.thread_stats

    def stop(self, timeout=None):
        """Stop the thread.

        :param timeout: not used, the stop event is only set
        """
        self._stopper.set()

    def stopped(self):
        """Return True if the thread is stopped."""
        return self._stopper.is_set()
273