Test Failed
Push — master ( ee826a...d9056e )
by Nicolas
03:09
created

glances.plugins.sensors   F

Complexity

Total Complexity 81

Size/Duplication

Total Lines 436
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 252
dl 0
loc 436
rs 2
c 0
b 0
f 0
wmc 81

18 Methods

Rating   Name   Duplication   Size   Complexity  
B PluginModel.update() 0 34 6
A PluginModel.__get_temperature() 0 7 3
A PluginModel.__set_type() 0 16 2
A PluginModel.__transform_sensors() 0 16 4
A PluginModel.get_key() 0 3 1
A PluginModel.__get_fan_speed() 0 7 3
A PluginModel.__get_bat_percent() 0 7 3
A PluginModel.__init__() 0 31 3
A PluginModel.__get_hddtemp() 0 7 3
A PluginModel.__get_alias() 0 9 3
A GlancesGrabSensors.reset() 0 3 1
A GlancesGrabSensors.__update__() 0 15 2
D PluginModel.msg_curse() 0 56 12
D GlancesGrabSensors.build_sensors_list() 0 38 12
A PluginModel.battery_trend() 0 11 5
A GlancesGrabSensors.__init__() 0 29 5
C PluginModel.update_views() 0 34 10
A GlancesGrabSensors.get() 0 12 3

How to fix   Complexity   

Complexity

Complex classes like glances.plugins.sensors often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Sensors plugin."""
11
12
import psutil
13
import warnings
14
import threading
15
16
from glances.logger import logger
17
from glances.globals import iteritems, to_fahrenheit
18
from glances.timer import Counter
19
from glances.plugins.sensors.sensor.glances_batpercent import PluginModel as BatPercentPluginModel
20
from glances.plugins.sensors.sensor.glances_hddtemp import PluginModel as HddTempPluginModel
21
from glances.outputs.glances_unicode import unicode_message
22
from glances.plugins.plugin.model import GlancesPluginModel
23
24
# Each sensor family is described by a (type identifier, display unit) pair.
SENSOR_TEMP_TYPE, SENSOR_TEMP_UNIT = 'temperature_core', 'C'
SENSOR_FAN_TYPE, SENSOR_FAN_UNIT = 'fan_speed', 'R'
SENSOR_HDDTEMP_TYPE, SENSOR_HDDTEMP_UNIT = 'temperature_hdd', 'C'
SENSORS_BATTERY_TYPE, SENSORS_BATTERY_UNIT = 'battery', '%'

# Default refresh multiplier: the plugin is refreshed every
# DEFAULT_REFRESH * (Glances refresh time).
# Can be overwritten by the refresh option in the sensors section of the
# glances.conf file.
DEFAULT_REFRESH = 3
40
41
# Fields description
# description: human readable description
# short_name: short name to use in the UI
# unit: unit type
# rate: is it a rate? If yes, divide by time_since_update when displayed
# min_symbol: auto unit should be used if value is greater than 1 'X' (K, M, G)...
fields_description = {
    'label': {
        'description': 'Sensor label',
    },
    'unit': {
        'description': 'Sensor unit',
    },
    'value': {
        'description': 'Sensor value',
        'unit': 'number',
    },
    'warning': {
        'description': 'Warning threshold',
        'unit': 'number',
    },
    'critical': {
        'description': 'Critical threshold',
        'unit': 'number',
    },
    'type': {
        # Keep this list in sync with the four sensor types the plugin emits.
        'description': 'Sensor type (one of battery, temperature_core, temperature_hdd, fan_speed)',
    },
}
70
71
72
class PluginModel(GlancesPluginModel):
    """Glances sensors plugin.

    The stats list merges four sensor families, in this order: core
    temperatures, fan speeds, hard disk temperatures and battery capacities.
    Within each family the entries are filtered (hide/show configuration),
    aliased, de-duplicated and sorted by label.
    """

    def __init__(self, args=None, config=None):
        """Init the plugin and the three underlying sensor grabbers."""
        super(PluginModel, self).__init__(
            args=args, config=config, stats_init_value=[], fields_description=fields_description
        )

        start_duration = Counter()

        # Init the sensor class
        start_duration.reset()
        self.glances_grab_sensors = GlancesGrabSensors()
        logger.debug("Generic sensor plugin init duration: {} seconds".format(start_duration.get()))

        # Instance for the HDDTemp Plugin in order to display the hard disks
        # temperatures
        start_duration.reset()
        self.hddtemp_plugin = HddTempPluginModel(args=args, config=config)
        logger.debug("HDDTemp sensor plugin init duration: {} seconds".format(start_duration.get()))

        # Instance for the BatPercent in order to display the batteries
        # capacities
        start_duration.reset()
        self.batpercent_plugin = BatPercentPluginModel(args=args, config=config)
        logger.debug("Battery sensor plugin init duration: {} seconds".format(start_duration.get()))

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Not necessary to refresh every refresh time: when the plugin refresh
        # equals the global one, slow it down by DEFAULT_REFRESH.
        if args and self.get_refresh() == args.time:
            self.set_refresh(self.get_refresh() * DEFAULT_REFRESH)

    def get_key(self):
        """Return the key of the list."""
        return 'label'

    def __grab(self, stats, index, grabber, sensor_type, error_message):
        """Run one grabber and store its transformed result in stats[index].

        grabber: callable without argument returning a list of stat dicts
        sensor_type: type identifier stamped on every stat
        error_message: '%'-style template (one placeholder) logged on failure

        This method is used as a thread target: any exception raised while
        grabbing, typing or transforming the stats is logged here and leaves
        stats[index] untouched instead of escaping from the thread.
        """
        try:
            typed_stats = self.__set_type(grabber(), sensor_type)
            stats[index] = self.__transform_sensors(typed_stats)
        except Exception as e:
            logger.error(error_message % e)

    def __get_temperature(self, stats, index):
        """Store the core temperature stats in stats[index]."""
        self.__grab(
            stats,
            index,
            lambda: self.glances_grab_sensors.get(SENSOR_TEMP_TYPE),
            SENSOR_TEMP_TYPE,
            "Cannot grab sensors temperatures (%s)",
        )

    def __get_fan_speed(self, stats, index):
        """Store the fan speed stats in stats[index]."""
        self.__grab(
            stats,
            index,
            lambda: self.glances_grab_sensors.get(SENSOR_FAN_TYPE),
            SENSOR_FAN_TYPE,
            "Cannot grab FAN speed (%s)",
        )

    def __get_hddtemp(self, stats, index):
        """Store the hard disk temperature stats in stats[index]."""
        self.__grab(
            stats,
            index,
            self.hddtemp_plugin.update,
            SENSOR_HDDTEMP_TYPE,
            "Cannot grab HDD temperature (%s)",
        )

    def __get_bat_percent(self, stats, index):
        """Store the battery capacity stats in stats[index]."""
        self.__grab(
            stats,
            index,
            self.batpercent_plugin.update,
            SENSORS_BATTERY_TYPE,
            "Cannot grab battery percent (%s)",
        )

    def __transform_sensors(self, threads_stats):
        """Hide, alias, de-duplicate and sort the result.

        threads_stats: list of stat dicts (each one with a 'label' key)

        output: a new list, sorted by label
        """
        stats_transformed = []
        for stat in threads_stats:
            # Hide sensors configured in the hide or show configuration key
            if not self.is_display(stat["label"].lower()):
                continue
            # Set alias for sensors
            stat["label"] = self.__get_alias(stat)
            # Drop duplicates by equality rather than by hashing the items:
            # a stat value may be unhashable (msg_curse handles value == []).
            if stat not in stats_transformed:
                stats_transformed.append(stat)
        # Sort by label
        return sorted(stats_transformed, key=lambda d: d['label'])

    @GlancesPluginModel._check_decorator
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Update sensors stats using the input method."""
        # Init new stats
        stats = self.get_init_value()

        if self.input_method == 'local':
            # One result slot per sensor family, filled by its own thread
            threads_stats = [None] * 4
            threads = [
                threading.Thread(name=SENSOR_TEMP_TYPE, target=self.__get_temperature, args=(threads_stats, 0)),
                threading.Thread(name=SENSOR_FAN_TYPE, target=self.__get_fan_speed, args=(threads_stats, 1)),
                threading.Thread(name=SENSOR_HDDTEMP_TYPE, target=self.__get_hddtemp, args=(threads_stats, 2)),
                threading.Thread(name=SENSORS_BATTERY_TYPE, target=self.__get_bat_percent, args=(threads_stats, 3)),
            ]
            # Start threads in parallel
            for t in threads:
                t.start()
            # Wait until all threads are finished
            for t in threads:
                t.join()
            # Merge the results. A slot is still None when its grabber
            # failed: skip it instead of crashing on extend(None).
            for s in threads_stats:
                if s:
                    stats.extend(s)
        elif self.input_method == 'snmp':
            # Update stats using SNMP
            # No standard:
            # http://www.net-snmp.org/wiki/index.php/Net-SNMP_and_lm-sensors_on_Ubuntu_10.04
            pass

        # Update the stats
        self.stats = stats

        return self.stats

    def __get_alias(self, stats):
        """Return the alias of the sensor (or its label if no alias is set).

        The alias is searched first with the lowercased label, then with the
        lowercased '<label>_<type>' key.
        """
        label = stats["label"]
        alias = self.has_alias(label.lower())
        if not alias:
            alias = self.has_alias("{}_{}".format(label, stats["type"]).lower())
        return alias if alias else label

    def __set_type(self, stats, sensor_type):
        """Set the plugin type (and the key name) on every stat, in place.

        4 types of stats are possible in the sensors plugin:
        - Core temperature: SENSOR_TEMP_TYPE
        - Fan speed: SENSOR_FAN_TYPE
        - HDD temperature: SENSOR_HDDTEMP_TYPE
        - Battery capacity: SENSORS_BATTERY_TYPE

        output: the same stats object
        """
        for i in stats:
            i.update({'type': sensor_type, 'key': self.get_key()})

        return stats

    @staticmethod
    def __system_alert(stat):
        """Return the alert level computed from the thresholds stored in the stat.

        A missing (None) threshold gives 'DEFAULT'; the critical threshold is
        evaluated before the warning one.
        """
        if stat['critical'] is None:
            return 'DEFAULT'
        if stat['value'] >= stat['critical']:
            return 'CRITICAL'
        if stat['warning'] is None:
            return 'DEFAULT'
        if stat['value'] >= stat['warning']:
            return 'WARNING'
        return 'OK'

    def update_views(self):
        """Update stats views."""
        # Call the father's method
        super(PluginModel, self).update_views()

        # Add specifics information
        # Alert
        for i in self.stats:
            # Stats with an empty/zero value get no decoration
            if not i['value']:
                continue
            # Alert processing
            if i['type'] == SENSOR_TEMP_TYPE:
                header = SENSOR_TEMP_TYPE + '_' + i['label']
                if self.is_limit('critical', stat_name=header):
                    # By default use the thresholds configured in the glances.conf file (see #2058)
                    alert = self.get_alert(current=i['value'], header=header)
                else:
                    # Else use the system thresholds
                    alert = self.__system_alert(i)
            elif i['type'] == SENSORS_BATTERY_TYPE:
                # Battery is in %: the alert is computed on the consumed part
                alert = self.get_alert(current=100 - i['value'], header=i['type'])
            else:
                alert = self.get_alert(current=i['value'], header=i['type'])
            # Set the alert in the view
            self.views[i[self.get_key()]]['value']['decoration'] = alert

    def battery_trend(self, stats):
        """Return the trend character for the battery.

        An empty string is returned when the stat has no 'status' key or
        when the status is not a charging/discharging/full one.
        """
        if 'status' not in stats:
            return ''
        for prefix, symbol in (('Charg', 'ARROW_UP'), ('Discharg', 'ARROW_DOWN'), ('Full', 'CHECK')):
            if stats['status'].startswith(prefix):
                return unicode_message(symbol)
        return ''

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface.

        args: command line arguments (optional); only its 'fahrenheit'
              attribute is read here
        max_width: width available for the plugin; nothing is displayed
                   when it is not defined
        """
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disabled():
            return ret

        # Max size for the interface name
        if not max_width:
            # No max_width defined, return an empty curse message
            logger.debug("No max_width defined for the {} plugin, it will not be displayed.".format(self.plugin_name))
            return ret
        name_max_width = max_width - 12

        # args defaults to None: in that case keep the native units
        use_fahrenheit = bool(args is not None and getattr(args, 'fahrenheit', False))

        # Header
        msg = '{:{width}}'.format('SENSORS', width=name_max_width)
        ret.append(self.curse_add_line(msg, "TITLE"))

        # Stats
        for i in self.stats:
            # Do not display anything if no battery are detected
            if i['type'] == SENSORS_BATTERY_TYPE and i['value'] == []:
                continue
            # New line
            ret.append(self.curse_new_line())
            msg = '{:{width}}'.format(i["label"][:name_max_width], width=name_max_width)
            ret.append(self.curse_add_line(msg))
            if i['value'] in (b'ERR', b'SLP', b'UNK', b'NOS'):
                # Status codes are displayed as is
                msg = '{:>14}'.format(i['value'])
                ret.append(
                    self.curse_add_line(msg, self.get_views(item=i[self.get_key()], key='value', option='decoration'))
                )
            else:
                try:
                    if use_fahrenheit and i['type'] not in (SENSORS_BATTERY_TYPE, SENSOR_FAN_TYPE):
                        trend = ''
                        value = to_fahrenheit(i['value'])
                        unit = 'F'
                    else:
                        trend = self.battery_trend(i)
                        value = i['value']
                        unit = i['unit']
                    msg = '{:.0f}{}{}'.format(value, unit, trend)
                    msg = '{:>14}'.format(msg)
                    ret.append(
                        self.curse_add_line(
                            msg, self.get_views(item=i[self.get_key()], key='value', option='decoration')
                        )
                    )
                except (TypeError, ValueError):
                    # Best effort: a value which can not be converted or
                    # formatted as a number is displayed with its label only
                    pass

        return ret
330
331
332
class GlancesGrabSensors(object):
    """Get sensors stats (temperatures and fan speeds) through psutil."""

    def __init__(self):
        """Init sensors stats."""
        # Temperatures
        self.init_temp = False
        self.sensor_temps = {}
        try:
            # psutil>=5.1.0, Linux-only
            self.sensor_temps = psutil.sensors_temperatures()
        except AttributeError:
            logger.debug("Cannot grab temperatures. Platform not supported.")
        else:
            self.init_temp = True
            # Solve an issue #1203 concerning a RunTimeError warning message displayed
            # in the curses interface.
            # NOTE: called without category, this filter applies to every warning.
            warnings.filterwarnings("ignore")

        # Fans
        self.init_fan = False
        self.sensor_fans = {}
        try:
            # psutil>=5.2.0, Linux-only
            self.sensor_fans = psutil.sensors_fans()
        except AttributeError:
            logger.debug("Cannot grab fans speed. Platform not supported.")
        else:
            self.init_fan = True

        # Init the stats
        self.reset()

    def reset(self):
        """Reset/init the stats."""
        self.sensors_list = []

    def __update__(self):
        """Update the stats and return the new sensors list.

        The list is built locally and published in one assignment, so a
        concurrent call never sees (or extends) a partially filled list.
        Temperatures and fans are grabbed independently: build_sensors_list
        returns an empty list for a family which is not supported.
        """
        sensors = []

        # Temperatures sensors
        sensors.extend(self.build_sensors_list(SENSOR_TEMP_UNIT))

        # Fans sensors
        sensors.extend(self.build_sensors_list(SENSOR_FAN_UNIT))

        self.sensors_list = sensors

        return sensors

    def build_sensors_list(self, type):
        """Build the sensors list depending of the type.

        type: SENSOR_TEMP_UNIT or SENSOR_FAN_UNIT

        output: a list of dicts with the label, unit, value, warning and
                critical keys (empty if the type is unknown or not supported)
        """
        ret = []
        # NOTE(review): the list is built from the reading stored by the
        # previous call (or by __init__) and a new reading is stored for the
        # next one, so values lag by one refresh - confirm this is intended.
        if type == SENSOR_TEMP_UNIT and self.init_temp:
            input_list = self.sensor_temps
            self.sensor_temps = psutil.sensors_temperatures()
        elif type == SENSOR_FAN_UNIT and self.init_fan:
            input_list = self.sensor_fans
            self.sensor_fans = psutil.sensors_fans()
        else:
            return ret

        # Labels already present in ret (used to detect duplicates)
        seen_labels = set()
        for chip_name, chip in iteritems(input_list):
            label_index = 1
            for chip_name_index, feature in enumerate(chip):
                # Sensor name
                if feature.label == '':
                    # No label: use the chip name and the feature position
                    label = chip_name + ' ' + str(chip_name_index)
                elif feature.label in seen_labels:
                    # Label already used: add a per chip counter
                    label = feature.label + ' ' + str(label_index)
                    label_index += 1
                else:
                    label = feature.label
                seen_labels.add(label)
                # Sensors value, limit and unit
                current = getattr(feature, 'current', 0)
                system_warning = getattr(feature, 'high', None)
                system_critical = getattr(feature, 'critical', None)
                # Add sensor to the list
                ret.append(
                    {
                        'label': label,
                        'unit': type,
                        'value': int(current) if current else 0,
                        'warning': int(system_warning) if system_warning is not None else None,
                        'critical': int(system_critical) if system_critical is not None else None,
                    }
                )
        return ret

    def get(self, sensor_type=SENSOR_TEMP_TYPE):
        """Get sensors list.

        sensor_type: SENSOR_TEMP_TYPE or SENSOR_FAN_TYPE

        output: the refreshed sensors of the requested type (an empty list
                for an unknown type)
        """
        # Filter the list returned by this very update (and not the shared
        # self.sensors_list): get() is called from several threads.
        sensors = self.__update__()
        if sensor_type == SENSOR_TEMP_TYPE:
            unit = SENSOR_TEMP_UNIT
        elif sensor_type == SENSOR_FAN_TYPE:
            unit = SENSOR_FAN_UNIT
        else:
            # Unknown type
            logger.debug("Unknown sensor type %s" % sensor_type)
            return []
        return [s for s in sensors if s['unit'] == unit]
436