Test Failed
Push — master ( ce0fc3...e09530 )
by Nicolas
03:36
created

glances.plugins.network   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 337
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 176
dl 0
loc 337
rs 8.8798
c 0
b 0
f 0
wmc 44

6 Methods

Rating   Name   Duplication   Size   Complexity  
A PluginModel.__init__() 0 32 3
B PluginModel.update_local() 0 72 8
A PluginModel.get_key() 0 3 1
A PluginModel.update() 0 15 2
C PluginModel.update_views() 0 33 10
F PluginModel.msg_curse() 0 105 20

How to fix   Complexity   

Complexity

Complex classes like glances.plugins.network often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""Network plugin."""
10
11
import psutil
12
13
from glances.logger import logger
14
from glances.plugins.plugin.model import GlancesPluginModel
15
16
# Fields description
# Each entry maps a stat field name to its metadata:
# description: human readable description
# short_name: short name to use in the UI
# unit: unit type
# rate: if True then compute and add *_gauge and *_rate_per_sec fields
# min_symbol: Auto unit should be used if value > than 1 'X' (K, M, G)...
fields_description = {
    'interface_name': {
        'description': 'Interface name.',
    },
    'alias': {
        'description': 'Interface alias name (optional).',
    },
    # Byte counters: all three are rate-enabled
    'bytes_recv': {'description': 'Number of bytes received.', 'rate': True, 'unit': 'byte'},
    'bytes_sent': {'description': 'Number of bytes sent.', 'rate': True, 'unit': 'byte'},
    'bytes_all': {'description': 'Number of bytes received and sent.', 'rate': True, 'unit': 'byte'},
    'speed': {
        'description': 'Maximum interface speed (in bit per second). Can return 0 on some operating-system.',
        'unit': 'bitpersecond',
    },
    'is_up': {
        'description': 'Is the interface up ?',
        'unit': 'bool',
    },
}
46
47
# SNMP OID
# http://www.net-snmp.org/docs/mibs/interfaces.html
# Maps each stat field to the OID of the matching interfaces-table column.
# Only the 'default' OID set is defined here.
snmp_oid = {
    'default': {
        'interface_name': '1.3.6.1.2.1.2.2.1.2',  # ifDescr
        'bytes_recv': '1.3.6.1.2.1.2.2.1.10',  # ifInOctets
        'bytes_sent': '1.3.6.1.2.1.2.2.1.16',  # ifOutOctets
    },
}
57
58
# Define the history items list
# (one entry per rate field: download first, then upload)
items_history_list = [
    {
        'name': 'bytes_recv_rate_per_sec',
        'description': 'Download rate per second',
        'y_unit': 'B/s',
    },
    {
        'name': 'bytes_sent_rate_per_sec',
        'description': 'Upload rate per second',
        'y_unit': 'B/s',
    },
]
63
64
65
class PluginModel(GlancesPluginModel):
66
    """Glances network plugin.
67
68
    stats is a list
69
    """
70
71
    def __init__(self, args=None, config=None):
        """Init the plugin.

        :param args: command line arguments, forwarded to the parent class
        :param config: configuration object (optional); when missing, every
            boolean option read below falls back to False
        """
        super().__init__(
            args=args,
            config=config,
            items_history_list=items_history_list,
            fields_description=fields_description,
            stats_init_value=[],
        )

        def read_flag(option):
            # Boolean option of this plugin's section, False without a configuration
            if config is None:
                return False
            return config.get_bool_value(self.plugin_name, option, default=False)

        # We want to display the stat in the curse interface
        self.display_curse = True

        # Hide stats if it has never been != 0
        self.hide_zero = read_flag('hide_zero')
        self.hide_zero_fields = ['bytes_recv_rate_per_sec', 'bytes_sent_rate_per_sec']

        # Automatically hide network interfaces that are down
        # or that don't have any IP addresses (#2799)
        self.hide_no_up = read_flag('hide_no_up')
        self.hide_no_ip = read_flag('hide_no_ip')

        # Two updates are needed to get the first stat: force the first one
        # now and reset the refresh timer
        self.update()
        self.refresh_timer.set(0)
103
104
    def get_key(self):
        """Return the name of the stat field used as key of the list."""
        key_field = 'interface_name'
        return key_field
107
108
    # @GlancesPluginModel._check_decorator
109
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Refresh the network stats according to the input method.

        Only the 'local' input method grabs data; for any other input
        method the stats are reset to the plugin init value.

        :return: list of stats dict (one dict per interface)
        """
        use_local = self.input_method == 'local'

        # Store the fresh stats on the plugin and hand them back to the caller
        self.stats = self.update_local() if use_local else self.get_init_value()
        return self.stats
124
125
    @GlancesPluginModel._manage_rate
    def update_local(self):
        """Update stats using the standard system lib (psutil).

        :return: list of stats dict (one dict per displayed interface). If the
            I/O counters can not be read, the current stats are returned
            unchanged.
        """
        stats = self.get_init_value()

        # Grab network interface stat using the psutil net_io_counter method
        # Example:
        # { 'veth4cbf8f0a': snetio(
        #   bytes_sent=102038421, bytes_recv=1263258,
        #   packets_sent=25046, packets_recv=14114,
        #   errin=0, errout=0, dropin=0, dropout=0), ... }
        try:
            net_io_counters = psutil.net_io_counters(pernic=True)
        except UnicodeDecodeError as e:
            logger.debug(f'Can not get network interface counters ({e})')
            return self.stats

        # Grab interface's status (issue #765)
        # Grab interface's speed (issue #718)
        # Example:
        # { 'veth4cbf8f0a': snicstats(
        #   isup=True, duplex=<NicDuplex.NIC_DUPLEX_FULL: 2>,
        #   speed=10000, mtu=1500, flags='up,broadcast,running,multicast'), ... }
        #
        # Both mappings default to empty dicts so the filters below never hit
        # an unbound name when one of the psutil calls raises OSError.
        # With no address information, hide_no_ip hides every interface.
        net_status = {}
        net_addrs = {}
        try:
            net_status = psutil.net_if_stats()
            net_addrs = psutil.net_if_addrs()
        except OSError as e:
            # see psutil #797/glances #1106
            logger.debug(f'Can not get network interface status ({e})')

        # Filter interfaces (related to #2799)
        if self.hide_no_up:
            net_status = {k: v for k, v in net_status.items() if v.isup}
        if self.hide_no_ip:
            # Keep only interfaces with at least one non link-layer address
            net_status = {
                k: v
                for k, v in net_status.items()
                if k in net_addrs and any(a.family != psutil.AF_LINK for a in net_addrs[k])
            }

        for interface_name, interface_stat in net_io_counters.items():
            # Do not take hidden interface into account
            # or KeyError: 'eth0' when interface is not connected #1348
            if not self.is_display(interface_name) or interface_name not in net_status:
                continue

            # Filter stats to keep only the fields we want (define in fields_description)
            # It will also convert psutil objects to a standard Python dict
            stat = self.filter_stats(interface_stat)
            stat.update(self.filter_stats(net_status[interface_name]))

            # Add the key
            stat['key'] = self.get_key()

            # Add interface name
            stat['interface_name'] = interface_name

            # Add alias define in the configuration file
            stat['alias'] = self.has_alias(interface_name)

            # Add sent + recv stats
            stat['bytes_all'] = stat['bytes_sent'] + stat['bytes_recv']

            # Interface speed in Mbps, convert it to bps
            # Can be always 0 on some OSes
            # NOTE(review): the factor is 2**20 rather than 10**6 - confirm it is intended
            stat['speed'] = stat['speed'] * 1048576

            stats.append(stat)

        return stats
197
198
    def update_views(self):
        """Update stats views.

        On top of the parent class views, set the 'decoration' of the
        received/sent fields (counters and rates) from the alert level of
        the current bitrate of each interface.
        """
        # Call the father's method
        super().update_views()

        # Add specifics information
        # Alert
        for i in self.get_raw():
            # Skip alert if no timespan to measure
            if 'bytes_recv_rate_per_sec' not in i or 'bytes_sent_rate_per_sec' not in i:
                continue

            # Convert rate to bps (to be able to compare to interface speed)
            bps_rx = int(i['bytes_recv_rate_per_sec'] * 8)
            bps_tx = int(i['bytes_sent_rate_per_sec'] * 8)

            # Decorate the bitrate with the configuration file thresholds
            # (the part after ':' in the interface name is ignored)
            if_real_name = i['interface_name'].split(':')[0]
            alert_rx = self.get_alert(bps_rx, header=if_real_name + '_rx')
            alert_tx = self.get_alert(bps_tx, header=if_real_name + '_tx')

            # If nothing is define in the configuration file...
            # ... then use the interface speed (not available on all systems)
            if alert_rx == 'DEFAULT' and 'speed' in i and i['speed'] != 0:
                alert_rx = self.get_alert(current=bps_rx, maximum=i['speed'], header='rx')
            if alert_tx == 'DEFAULT' and 'speed' in i and i['speed'] != 0:
                alert_tx = self.get_alert(current=bps_tx, maximum=i['speed'], header='tx')

            # then decorates: received fields follow the rx alert,
            # sent fields follow the tx alert
            view = self.views[i[self.get_key()]]
            view['bytes_recv']['decoration'] = alert_rx
            view['bytes_recv_rate_per_sec']['decoration'] = alert_rx
            view['bytes_sent']['decoration'] = alert_tx
            view['bytes_sent_rate_per_sec']['decoration'] = alert_tx
231
232
    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface.

        :param args: command line arguments; the network_cumul, network_sum
            and byte options are read
        :param max_width: width available for the plugin; nothing is
            displayed when it is not defined
        :return: list of curse lines (empty when there is nothing to display)
        """
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disabled():
            return ret

        if not max_width:
            # No max_width defined, return an empty curse message
            logger.debug(f"No max_width defined for the {self.plugin_name} plugin, it will not be displayed.")
            return ret

        # Max size for the interface name
        name_max_width = max_width - 12

        # Header: title, then the column names
        ret.append(self.curse_add_line(f'{"NETWORK":{name_max_width}}', "TITLE"))
        # Cumulative stats have plain column names, bitrate stats get a '/s' suffix
        per_sec = '' if args.network_cumul else '/s'
        if args.network_sum:
            # Sum stats: a single column
            column = 'Rx+Tx' + per_sec
            ret.append(self.curse_add_line(f'{column:>14}'))
        else:
            # Rx/Tx stats: one column per direction
            for direction in ('Rx', 'Tx'):
                column = direction + per_sec
                ret.append(self.curse_add_line(f'{column:>7}'))

        # Interface list (sorted by name)
        for nic in self.sorted_stats():
            # Do not display interface in down state (issue #765)
            if nic.get('is_up') is False:
                continue
            # Hide stats if never be different from 0 (issue #1787)
            if all(
                self.get_views(item=nic[self.get_key()], key=field, option='hidden')
                for field in self.hide_zero_fields
            ):
                continue

            # Format stats
            # The alias, when there is one, replaces the interface name
            if_name = nic['interface_name'].split(':')[0] if nic['alias'] is None else nic['alias']
            if len(if_name) > name_max_width:
                # Cut interface name if it is too long
                if_name = '_' + if_name[-name_max_width + 1 :]

            # Bytes per second on request, bits per second by default
            to_bit, unit = (1, '') if args.byte else (8, 'b')

            if args.network_cumul and 'bytes_recv' in nic and 'bytes_sent' in nic:
                fields = ('bytes_recv', 'bytes_sent', 'bytes_all')
            elif 'bytes_recv_rate_per_sec' in nic and 'bytes_sent_rate_per_sec' in nic:
                fields = ('bytes_recv_rate_per_sec', 'bytes_sent_rate_per_sec', 'bytes_all_rate_per_sec')
            else:
                # Avoid issue when a new interface is created on the fly
                # Example: start Glances, then start a new container
                continue
            rx, tx, ax = (self.auto_unit(int(nic[field] * to_bit)) + unit for field in fields)

            # New line
            ret.append(self.curse_new_line())
            ret.append(self.curse_add_line(f'{if_name:{name_max_width}}'))
            if args.network_sum:
                ret.append(self.curse_add_line(f'{ax:>14}'))
            else:
                # Each direction is decorated with the view of its counter field
                for value, view_key in ((rx, 'bytes_recv'), (tx, 'bytes_sent')):
                    ret.append(
                        self.curse_add_line(
                            f'{value:>7}',
                            self.get_views(item=nic[self.get_key()], key=view_key, option='decoration'),
                        )
                    )

        return ret
337