Test Failed
Push — master ( 7cfc0c...4c3161 )
by Nicolas
04:02
created

glances.plugins.connections.PluginModel.update()   F

Complexity

Conditions 14

Size

Total Lines 63
Code Lines 46

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 14
eloc 46
nop 1
dl 0
loc 63
rs 3.6
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like glances.plugins.connections.PluginModel.update() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""Connections plugin."""
10
11
import psutil
12
13
from glances.globals import nativestr
14
from glances.logger import logger
15
from glances.plugins.plugin.model import GlancesPluginModel
16
17
# Fields description
18
# description: human readable description
19
# short_name: shortname to use un UI
20
# unit: unit type
21
# rate: is it a rate ? If yes, // by time_since_update when displayed,
22
# min_symbol: Auto unit should be used if value > than 1 'X' (K, M, G)...
23
fields_description = {
24
    'LISTEN': {
25
        'description': 'Number of TCP connections in LISTEN state',
26
        'unit': 'number',
27
    },
28
    'ESTABLISHED': {
29
        'description': 'Number of TCP connections in ESTABLISHED state',
30
        'unit': 'number',
31
    },
32
    'SYN_SENT': {
33
        'description': 'Number of TCP connections in SYN_SENT state',
34
        'unit': 'number',
35
    },
36
    'SYN_RECV': {
37
        'description': 'Number of TCP connections in SYN_RECV state',
38
        'unit': 'number',
39
    },
40
    'initiated': {
41
        'description': 'Number of TCP connections initiated',
42
        'unit': 'number',
43
    },
44
    'terminated': {
45
        'description': 'Number of TCP connections terminated',
46
        'unit': 'number',
47
    },
48
    'nf_conntrack_count': {
49
        'description': 'Number of tracked connections',
50
        'unit': 'number',
51
    },
52
    'nf_conntrack_max': {
53
        'description': 'Maximum number of tracked connections',
54
        'unit': 'number',
55
    },
56
    'nf_conntrack_percent': {
57
        'description': 'Percentage of tracked connections',
58
        'unit': 'percent',
59
    },
60
}
61
62
# Define the history items list
63
# items_history_list = [{'name': 'rx',
64
#                        'description': 'Download rate per second',
65
#                        'y_unit': 'bit/s'},
66
#                       {'name': 'tx',
67
#                        'description': 'Upload rate per second',
68
#                        'y_unit': 'bit/s'}]
69
70
71
class ConnectionsPlugin(GlancesPluginModel):
72
    """Glances connections plugin.
73
74
    stats is a dict
75
    """
76
77
    status_list = [psutil.CONN_LISTEN, psutil.CONN_ESTABLISHED]
78
    initiated_states = [psutil.CONN_SYN_SENT, psutil.CONN_SYN_RECV]
79
    terminated_states = [
80
        psutil.CONN_FIN_WAIT1,
81
        psutil.CONN_FIN_WAIT2,
82
        psutil.CONN_TIME_WAIT,
83
        psutil.CONN_CLOSE,
84
        psutil.CONN_CLOSE_WAIT,
85
        psutil.CONN_LAST_ACK,
86
    ]
87
    conntrack = {
88
        'nf_conntrack_count': '/proc/sys/net/netfilter/nf_conntrack_count',
89
        'nf_conntrack_max': '/proc/sys/net/netfilter/nf_conntrack_max',
90
    }
91
92
    def __init__(self, args=None, config=None):
93
        """Init the plugin."""
94
        super().__init__(
95
            args=args,
96
            config=config,
97
            # items_history_list=items_history_list,
98
            stats_init_value={'net_connections_enabled': True, 'nf_conntrack_enabled': True},
99
            fields_description=fields_description,
100
        )
101
102
        # We want to display the stat in the curse interface
103
        self.display_curse = True
104
105
    def update_for_net_connections_method(self, stats):
106
        try:
107
            net_connections = psutil.net_connections(kind="tcp")
108
        except Exception as e:
109
            logger.warning(f'Can not get network connections stats ({e})')
110
            logger.info('Disable connections stats')
111
            stats['net_connections_enabled'] = False
112
113
            return stats
114
115
        for s in self.status_list:
116
            stats[s] = len([c for c in net_connections if c.status == s])
117
        initiated = 0
118
        for s in self.initiated_states:
119
            stats[s] = len([c for c in net_connections if c.status == s])
120
            initiated += stats[s]
121
        stats['initiated'] = initiated
122
        terminated = 0
123
        for s in self.initiated_states:
124
            stats[s] = len([c for c in net_connections if c.status == s])
125
            terminated += stats[s]
126
        stats['terminated'] = terminated
127
128
        return stats
129
130
    def update_for_nf_conntrack_method(self, stats):
131
        # Grab connections track directly from the /proc file
132
        for i in self.conntrack:
133
            try:
134
                with open(self.conntrack[i]) as f:
135
                    stats[i] = float(f.readline().rstrip("\n"))
136
            except (OSError, FileNotFoundError) as e:
137
                logger.warning(f'Can not get network connections track ({e})')
138
                logger.info('Disable connections track')
139
                stats['nf_conntrack_enabled'] = False
140
141
                return stats
142
        if 'nf_conntrack_max' in stats and 'nf_conntrack_count' in stats:
143
            stats['nf_conntrack_percent'] = stats['nf_conntrack_count'] * 100 / stats['nf_conntrack_max']
144
        else:
145
            stats['nf_conntrack_enabled'] = False
146
147
        return stats
148
149
    @GlancesPluginModel._check_decorator
150
    @GlancesPluginModel._log_result_decorator
151
    def update(self):
152
        """Update connections stats using the input method.
153
154
        Stats is a dict
155
        """
156
        # Init new stats
157
        stats = self.get_init_value()
158
159
        if self.input_method == 'local':
160
            # Update stats using the PSUtils lib
161
162
            # Grab network interface stat using the psutil net_connections method
163
            if stats['net_connections_enabled']:
164
                stats = self.update_for_net_connections_method(stats)
165
166
            if stats['nf_conntrack_enabled']:
167
                stats = self.update_for_nf_conntrack_method(stats)
168
        elif self.input_method == 'snmp':
169
            # Update stats using SNMP
170
            pass
171
172
        # Update the stats
173
        self.stats = stats
174
        return self.stats
175
176
    def update_views(self):
177
        """Update stats views."""
178
        # Call the father's method
179
        super().update_views()
180
181
        # Add specific information
182
        try:
183
            # Alert and log
184
            if self.stats['nf_conntrack_enabled']:
185
                self.views['nf_conntrack_percent']['decoration'] = self.get_alert(header='nf_conntrack_percent')
186
        except KeyError:
187
            # try/except mandatory for Windows compatibility (no conntrack stats)
188
            pass
189
190
    def msg_curse(self, args=None, max_width=None):
191
        """Return the dict to display in the curse interface."""
192
        # Init the return message
193
        ret = []
194
195
        # Only process if stats exist and display plugin enable...
196
        if not self.stats or self.is_disabled() or not max_width:
197
            return ret
198
199
        # Header
200
        if self.stats['net_connections_enabled'] or self.stats['nf_conntrack_enabled']:
201
            msg = '{}'.format('TCP CONNECTIONS')
202
            ret.append(self.curse_add_line(msg, "TITLE"))
203
        # Connections status
204
        if self.stats['net_connections_enabled']:
205
            for s in [psutil.CONN_LISTEN, 'initiated', psutil.CONN_ESTABLISHED, 'terminated']:
206
                if s not in self.stats:
207
                    continue
208
                ret.append(self.curse_new_line())
209
                msg = '{:{width}}'.format(nativestr(s).capitalize(), width=len(s))
210
                ret.append(self.curse_add_line(msg))
211
                msg = '{:>{width}}'.format(self.stats[s], width=max_width - len(s) + 2)
212
                ret.append(self.curse_add_line(msg))
213
        # Connections track
214
        if (
215
            self.stats['nf_conntrack_enabled']
216
            and 'nf_conntrack_count' in self.stats
217
            and 'nf_conntrack_max' in self.stats
218
        ):
219
            s = 'Tracked'
220
            ret.append(self.curse_new_line())
221
            msg = '{:{width}}'.format(nativestr(s).capitalize(), width=len(s))
222
            ret.append(self.curse_add_line(msg))
223
            msg = '{:>{width}}'.format(
224
                '{:0.0f}/{:0.0f}'.format(self.stats['nf_conntrack_count'], self.stats['nf_conntrack_max']),
225
                width=max_width - len(s) + 2,
226
            )
227
            ret.append(self.curse_add_line(msg, self.get_views(key='nf_conntrack_percent', option='decoration')))
228
229
        return ret
230