glances.plugins.glances_diskio   A
last analyzed

Complexity

Total Complexity 26

Size/Duplication

Total Lines 231
Duplicated Lines 9.09 %

Importance

Changes 0
Metric Value
eloc 122
dl 21
loc 231
rs 10
c 0
b 0
f 0
wmc 26

5 Methods

Rating   Name   Duplication   Size   Complexity  
D Plugin.update() 0 81 12
C Plugin.msg_curse() 0 73 9
A Plugin.get_key() 1 3 1
A Plugin.update_views() 0 17 2
A Plugin.__init__() 19 19 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""Disk I/O plugin."""
11
from __future__ import unicode_literals
12
13
from glances.compat import nativestr, n
14
from glances.timer import getTimeSinceLastUpdate
15
from glances.plugins.glances_plugin import GlancesPlugin
16
17
import psutil
18
19
20
# Define the history items list
21
items_history_list = [
22
    {'name': 'read_bytes', 'description': 'Bytes read per second', 'y_unit': 'B/s'},
23
    {'name': 'write_bytes', 'description': 'Bytes write per second', 'y_unit': 'B/s'},
24
]
25
26
27
class Plugin(GlancesPlugin):
28
    """Glances disks I/O plugin.
29
30
    stats is a list
31
    """
32
33 View Code Duplication
    def __init__(self, args=None, config=None):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
34
        """Init the plugin."""
35
        super(Plugin, self).__init__(
36
            args=args, config=config, items_history_list=items_history_list, stats_init_value=[]
37
        )
38
39
        # We want to display the stat in the curse interface
40
        self.display_curse = True
41
42
        # Hide stats if it has never been != 0
43
        if config is not None:
44
            self.hide_zero = config.get_bool_value(self.plugin_name, 'hide_zero', default=False)
45
        else:
46
            self.hide_zero = False
47
        self.hide_zero_fields = ['read_bytes', 'write_bytes']
48
49
        # Force a first update because we need two update to have the first stat
50
        self.update()
51
        self.refresh_timer.set(0)
52
53
    def get_key(self):
54
        """Return the key of the list."""
55
        return 'disk_name'
56
57
    @GlancesPlugin._check_decorator
58
    @GlancesPlugin._log_result_decorator
59
    def update(self):
60
        """Update disk I/O stats using the input method."""
61
        # Init new stats
62
        stats = self.get_init_value()
63
64
        if self.input_method == 'local':
65
            # Update stats using the standard system lib
66
            # Grab the stat using the psutil disk_io_counters method
67
            # read_count: number of reads
68
            # write_count: number of writes
69
            # read_bytes: number of bytes read
70
            # write_bytes: number of bytes written
71
            # read_time: time spent reading from disk (in milliseconds)
72
            # write_time: time spent writing to disk (in milliseconds)
73
            try:
74
                diskio = psutil.disk_io_counters(perdisk=True)
75
            except Exception:
76
                return stats
77
78
            # Previous disk IO stats are stored in the diskio_old variable
79
            # By storing time data we enable Rx/s and Tx/s calculations in the
80
            # XML/RPC API, which would otherwise be overly difficult work
81
            # for users of the API
82
            time_since_update = getTimeSinceLastUpdate('disk')
83
84
            diskio = diskio
85
            for disk in diskio:
86
                # By default, RamFS is not displayed (issue #714)
87
                if self.args is not None and not self.args.diskio_show_ramfs and disk.startswith('ram'):
88
                    continue
89
90
                # Shall we display the stats ?
91
                if not self.is_display(disk):
92
                    continue
93
94
                # Compute count and bit rate
95
                try:
96
                    diskstat = {
97
                        'time_since_update': time_since_update,
98
                        'disk_name': n(disk),
99
                        'read_count': diskio[disk].read_count - self.diskio_old[disk].read_count,
100
                        'write_count': diskio[disk].write_count - self.diskio_old[disk].write_count,
101
                        'read_bytes': diskio[disk].read_bytes - self.diskio_old[disk].read_bytes,
102
                        'write_bytes': diskio[disk].write_bytes - self.diskio_old[disk].write_bytes,
103
                    }
104
                except (KeyError, AttributeError):
105
                    diskstat = {
106
                        'time_since_update': time_since_update,
107
                        'disk_name': n(disk),
108
                        'read_count': 0,
109
                        'write_count': 0,
110
                        'read_bytes': 0,
111
                        'write_bytes': 0,
112
                    }
113
114
                # Add alias if exist (define in the configuration file)
115
                if self.has_alias(disk) is not None:
116
                    diskstat['alias'] = self.has_alias(disk)
117
118
                # Add the dict key
119
                diskstat['key'] = self.get_key()
120
121
                # Add the current disk stat to the list
122
                stats.append(diskstat)
123
124
            # Save stats to compute next bitrate
125
            try:
126
                self.diskio_old = diskio
127
            except (IOError, UnboundLocalError):
128
                pass
129
        elif self.input_method == 'snmp':
130
            # Update stats using SNMP
131
            # No standard way for the moment...
132
            pass
133
134
        # Update the stats
135
        self.stats = stats
136
137
        return self.stats
138
139
    def update_views(self):
140
        """Update stats views."""
141
        # Call the father's method
142
        super(Plugin, self).update_views()
143
144
        # Check if the stats should be hidden
145
        self.update_views_hidden()
146
147
        # Add specifics information
148
        # Alert
149
        for i in self.get_raw():
150
            disk_real_name = i['disk_name']
151
            self.views[i[self.get_key()]]['read_bytes']['decoration'] = self.get_alert(
152
                int(i['read_bytes'] // i['time_since_update']), header=disk_real_name + '_rx'
153
            )
154
            self.views[i[self.get_key()]]['write_bytes']['decoration'] = self.get_alert(
155
                int(i['write_bytes'] // i['time_since_update']), header=disk_real_name + '_tx'
156
            )
157
158
    def msg_curse(self, args=None, max_width=None):
159
        """Return the dict to display in the curse interface."""
160
        # Init the return message
161
        ret = []
162
163
        # Only process if stats exist and display plugin enable...
164
        if not self.stats or self.is_disabled():
165
            return ret
166
167
        # Max size for the interface name
168
        name_max_width = max_width - 13
169
170
        # Header
171
        msg = '{:{width}}'.format('DISK I/O', width=name_max_width)
172
        ret.append(self.curse_add_line(msg, "TITLE"))
173
        if args.diskio_iops:
174
            msg = '{:>8}'.format('IOR/s')
175
            ret.append(self.curse_add_line(msg))
176
            msg = '{:>7}'.format('IOW/s')
177
            ret.append(self.curse_add_line(msg))
178
        else:
179
            msg = '{:>8}'.format('R/s')
180
            ret.append(self.curse_add_line(msg))
181
            msg = '{:>7}'.format('W/s')
182
            ret.append(self.curse_add_line(msg))
183
        # Disk list (sorted by name)
184
        for i in self.sorted_stats():
185
            # Hide stats if never be different from 0 (issue #1787)
186
            if all([self.get_views(item=i[self.get_key()], key=f, option='hidden') for f in self.hide_zero_fields]):
187
                continue
188
            # Is there an alias for the disk name ?
189
            disk_name = self.has_alias(i['disk_name']) if self.has_alias(i['disk_name']) else i['disk_name']
190
            # New line
191
            ret.append(self.curse_new_line())
192
            if len(disk_name) > name_max_width:
193
                # Cut disk name if it is too long
194
                disk_name = '_' + disk_name[-name_max_width + 1 :]
195
            msg = '{:{width}}'.format(nativestr(disk_name), width=name_max_width + 1)
196
            ret.append(self.curse_add_line(msg))
197
            if args.diskio_iops:
198
                # count
199
                txps = self.auto_unit(int(i['read_count'] // i['time_since_update']))
200
                rxps = self.auto_unit(int(i['write_count'] // i['time_since_update']))
201
                msg = '{:>7}'.format(txps)
202
                ret.append(
203
                    self.curse_add_line(
204
                        msg, self.get_views(item=i[self.get_key()], key='read_count', option='decoration')
205
                    )
206
                )
207
                msg = '{:>7}'.format(rxps)
208
                ret.append(
209
                    self.curse_add_line(
210
                        msg, self.get_views(item=i[self.get_key()], key='write_count', option='decoration')
211
                    )
212
                )
213
            else:
214
                # Bitrate
215
                txps = self.auto_unit(int(i['read_bytes'] // i['time_since_update']))
216
                rxps = self.auto_unit(int(i['write_bytes'] // i['time_since_update']))
217
                msg = '{:>7}'.format(txps)
218
                ret.append(
219
                    self.curse_add_line(
220
                        msg, self.get_views(item=i[self.get_key()], key='read_bytes', option='decoration')
221
                    )
222
                )
223
                msg = '{:>7}'.format(rxps)
224
                ret.append(
225
                    self.curse_add_line(
226
                        msg, self.get_views(item=i[self.get_key()], key='write_bytes', option='decoration')
227
                    )
228
                )
229
230
        return ret
231