Test Failed
Push — master ( cc9054...a8608f )
by Nicolas
03:40
created

PluginModel.msg_curse()   D

Complexity

Conditions 12

Size

Total Lines 41
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 12
eloc 30
nop 3
dl 0
loc 41
rs 4.8
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like glances.plugins.connections.PluginModel.msg_curse() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""Connections plugin."""
10
11
import psutil
12
13
from glances.globals import nativestr
14
from glances.logger import logger
15
from glances.plugins.plugin.model import GlancesPluginModel
16
17
# Fields description
18
# description: human readable description
19
# short_name: short name to use in the UI
20
# unit: unit type
21
# rate: is it a rate? If yes, divide by time_since_update when displayed,
22
# min_symbol: Auto unit should be used if value is greater than 1 'X' (K, M, G)...
23
# Metadata for every stat key published by the plugin. Each entry maps the
# stat name to its human readable description and its unit type. The table
# below is expanded into the nested dict form, keeping the row order.
fields_description = {
    name: {'description': description, 'unit': unit}
    for name, description, unit in (
        ('LISTEN', 'Number of TCP connections in LISTEN state', 'number'),
        ('ESTABLISHED', 'Number of TCP connections in ESTABLISHED state', 'number'),
        ('SYN_SENT', 'Number of TCP connections in SYN_SENT state', 'number'),
        ('SYN_RECV', 'Number of TCP connections in SYN_RECV state', 'number'),
        ('initiated', 'Number of TCP connections initiated', 'number'),
        ('terminated', 'Number of TCP connections terminated', 'number'),
        ('nf_conntrack_count', 'Number of tracked connections', 'number'),
        ('nf_conntrack_max', 'Maximum number of tracked connections', 'number'),
        ('nf_conntrack_percent', 'Percentage of tracked connections', 'percent'),
    )
}
61
62
# Define the history items list
63
# items_history_list = [{'name': 'rx',
64
#                        'description': 'Download rate per second',
65
#                        'y_unit': 'bit/s'},
66
#                       {'name': 'tx',
67
#                        'description': 'Upload rate per second',
68
#                        'y_unit': 'bit/s'}]
69
70
71
class PluginModel(GlancesPluginModel):
    """Glances connections plugin.

    stats is a dict. When the matching feature is enabled it holds:

    - one counter per TCP state listed in ``status_list`` and
      ``initiated_states``, plus the ``initiated`` and ``terminated`` totals;
    - ``nf_conntrack_count``, ``nf_conntrack_max`` and
      ``nf_conntrack_percent`` read from the files listed in ``conntrack``.

    The ``net_connections_enabled`` and ``nf_conntrack_enabled`` flags tell
    which of the two groups is available.
    """

    # TCP states reported individually
    status_list = [psutil.CONN_LISTEN, psutil.CONN_ESTABLISHED]
    # TCP states reported individually and summed into 'initiated'
    initiated_states = [psutil.CONN_SYN_SENT, psutil.CONN_SYN_RECV]
    # TCP states summed into 'terminated'
    terminated_states = [
        psutil.CONN_FIN_WAIT1,
        psutil.CONN_FIN_WAIT2,
        psutil.CONN_TIME_WAIT,
        psutil.CONN_CLOSE,
        psutil.CONN_CLOSE_WAIT,
        psutil.CONN_LAST_ACK,
    ]
    # Stat key -> /proc file holding its value
    conntrack = {
        'nf_conntrack_count': '/proc/sys/net/netfilter/nf_conntrack_count',
        'nf_conntrack_max': '/proc/sys/net/netfilter/nf_conntrack_max',
    }

    def __init__(self, args=None, config=None):
        """Init the plugin."""
        super().__init__(
            args=args,
            config=config,
            # items_history_list=items_history_list,
            stats_init_value={'net_connections_enabled': True, 'nf_conntrack_enabled': True},
            fields_description=fields_description,
        )

        # We want to display the stat in the curse interface
        self.display_curse = True

    @GlancesPluginModel._check_decorator
    @GlancesPluginModel._log_result_decorator
    def update(self):
        """Update connections stats using the input method.

        Only the 'local' input method collects data; nothing is gathered
        for 'snmp'. If the TCP connections cannot be read, the connection
        tracking values are not collected during this call.

        Returns:
            The refreshed stats dict (also stored in ``self.stats``).
        """
        # Init new stats
        stats = self.get_init_value()

        if self.input_method == 'local':
            connections_ok = True
            if stats['net_connections_enabled']:
                connections_ok = self._update_net_connections(stats)
            # A failure while reading the connections ends the update early
            if connections_ok and stats['nf_conntrack_enabled']:
                self._update_conntrack(stats)
        elif self.input_method == 'snmp':
            # Update stats using SNMP: not implemented
            pass

        # Update the stats
        self.stats = stats
        return self.stats

    def _update_net_connections(self, stats):
        """Add the TCP connection counters to stats.

        Returns:
            True on success. False if psutil could not list the connections,
            in which case 'net_connections_enabled' is set to False in stats.
        """
        try:
            net_connections = psutil.net_connections(kind="tcp")
        except Exception as e:
            logger.warning(f'Can not get network connections stats ({e})')
            logger.info('Disable connections stats')
            stats['net_connections_enabled'] = False
            return False

        def count(state):
            return sum(1 for c in net_connections if c.status == state)

        for state in self.status_list:
            stats[state] = count(state)
        for state in self.initiated_states:
            stats[state] = count(state)
        stats['initiated'] = sum(stats[state] for state in self.initiated_states)
        # The terminated total must be built from terminated_states (it was
        # previously computed from initiated_states, duplicating 'initiated').
        # Per-state values are not stored: only the total is a declared field.
        stats['terminated'] = sum(count(state) for state in self.terminated_states)
        return True

    def _update_conntrack(self, stats):
        """Add the connection tracking values to stats.

        Each file listed in ``conntrack`` is read as a float. If a file
        cannot be read or parsed, or if the percentage cannot be computed,
        'nf_conntrack_enabled' is set to False in stats.
        """
        for key, path in self.conntrack.items():
            try:
                with open(path) as f:
                    stats[key] = float(f.readline().rstrip("\n"))
            except (OSError, ValueError) as e:
                # OSError: file missing or unreadable; ValueError: content is not a number
                logger.warning(f'Can not get network connections track ({e})')
                logger.info('Disable connections track')
                stats['nf_conntrack_enabled'] = False
                return

        tracked = stats.get('nf_conntrack_count')
        tracked_max = stats.get('nf_conntrack_max')
        # A missing value or a zero maximum makes the percentage meaningless
        # (and would raise ZeroDivisionError)
        if tracked is None or not tracked_max:
            stats['nf_conntrack_enabled'] = False
            return
        stats['nf_conntrack_percent'] = tracked * 100 / tracked_max

    def update_views(self):
        """Update stats views."""
        # Call the father's method
        super().update_views()

        # Add specific information
        try:
            # Alert and log
            if self.stats['nf_conntrack_enabled']:
                self.views['nf_conntrack_percent']['decoration'] = self.get_alert(header='nf_conntrack_percent')
        except KeyError:
            # try/except mandatory for Windows compatibility (no conntrack stats)
            pass

    def msg_curse(self, args=None, max_width=None):
        """Return the dict to display in the curse interface.

        Returns an empty list when there are no stats, when the plugin is
        disabled or when max_width is not set.
        """
        # Init the return message
        ret = []

        # Only process if stats exist and display plugin enable...
        if not self.stats or self.is_disabled() or not max_width:
            return ret

        # This method runs at every screen refresh: keep the dump at debug level
        logger.debug(self.stats)

        # Header
        if self.stats['net_connections_enabled'] or self.stats['nf_conntrack_enabled']:
            ret.append(self.curse_add_line('TCP CONNECTIONS', "TITLE"))

        # Connections status
        if self.stats['net_connections_enabled']:
            ret.extend(self._curse_connections(max_width))

        # Connections track
        if (
            self.stats['nf_conntrack_enabled']
            and 'nf_conntrack_count' in self.stats
            and 'nf_conntrack_max' in self.stats
        ):
            ret.extend(self._curse_conntrack(max_width))

        return ret

    def _curse_label(self, label):
        """Return a new line followed by the capitalized label."""
        msg = '{:{width}}'.format(nativestr(label).capitalize(), width=len(label))
        return [self.curse_new_line(), self.curse_add_line(msg)]

    def _curse_connections(self, max_width):
        """Return one 'label value' row per connection counter found in stats."""
        rows = []
        for key in [psutil.CONN_LISTEN, 'initiated', psutil.CONN_ESTABLISHED, 'terminated']:
            if key not in self.stats:
                continue
            rows.extend(self._curse_label(key))
            # Value is right aligned in the space left by the label
            msg = '{:>{width}}'.format(self.stats[key], width=max_width - len(key) + 2)
            rows.append(self.curse_add_line(msg))
        return rows

    def _curse_conntrack(self, max_width):
        """Return the 'Tracked count/max' row, decorated by the percent view."""
        label = 'Tracked'
        rows = self._curse_label(label)
        ratio = '{:0.0f}/{:0.0f}'.format(self.stats['nf_conntrack_count'], self.stats['nf_conntrack_max'])
        msg = '{:>{width}}'.format(ratio, width=max_width - len(label) + 2)
        rows.append(self.curse_add_line(msg, self.get_views(key='nf_conntrack_percent', option='decoration')))
        return rows
224