glances.plugins.connections.PluginModel.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 7
nop 3
dl 0
loc 12
rs 10
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""Connections plugin."""
10
11
import psutil
12
13
from glances.globals import nativestr
14
from glances.logger import logger
15
from glances.plugins.plugin.model import GlancesPluginModel
16
17
# Fields description
18
# description: human readable description
19
# short_name: shortname to use un UI
20
# unit: unit type
21
# rate: is it a rate ? If yes, // by time_since_update when displayed,
22
# min_symbol: Auto unit should be used if value > than 1 'X' (K, M, G)...
23
fields_description = {
24
    'LISTEN': {
25
        'description': 'Number of TCP connections in LISTEN state',
26
        'unit': 'number',
27
    },
28
    'ESTABLISHED': {
29
        'description': 'Number of TCP connections in ESTABLISHED state',
30
        'unit': 'number',
31
    },
32
    'SYN_SENT': {
33
        'description': 'Number of TCP connections in SYN_SENT state',
34
        'unit': 'number',
35
    },
36
    'SYN_RECV': {
37
        'description': 'Number of TCP connections in SYN_RECV state',
38
        'unit': 'number',
39
    },
40
    'initiated': {
41
        'description': 'Number of TCP connections initiated',
42
        'unit': 'number',
43
    },
44
    'terminated': {
45
        'description': 'Number of TCP connections terminated',
46
        'unit': 'number',
47
    },
48
    'nf_conntrack_count': {
49
        'description': 'Number of tracked connections',
50
        'unit': 'number',
51
    },
52
    'nf_conntrack_max': {
53
        'description': 'Maximum number of tracked connections',
54
        'unit': 'number',
55
    },
56
    'nf_conntrack_percent': {
57
        'description': 'Percentage of tracked connections',
58
        'unit': 'percent',
59
    },
60
}
61
62
# Define the history items list
63
# items_history_list = [{'name': 'rx',
64
#                        'description': 'Download rate per second',
65
#                        'y_unit': 'bit/s'},
66
#                       {'name': 'tx',
67
#                        'description': 'Upload rate per second',
68
#                        'y_unit': 'bit/s'}]
69
70
71
class PluginModel(GlancesPluginModel):
72
    """Glances connections plugin.
73
74
    stats is a dict
75
    """
76
77
    status_list = [psutil.CONN_LISTEN, psutil.CONN_ESTABLISHED]
78
    initiated_states = [psutil.CONN_SYN_SENT, psutil.CONN_SYN_RECV]
79
    terminated_states = [
80
        psutil.CONN_FIN_WAIT1,
81
        psutil.CONN_FIN_WAIT2,
82
        psutil.CONN_TIME_WAIT,
83
        psutil.CONN_CLOSE,
84
        psutil.CONN_CLOSE_WAIT,
85
        psutil.CONN_LAST_ACK,
86
    ]
87
    conntrack = {
88
        'nf_conntrack_count': '/proc/sys/net/netfilter/nf_conntrack_count',
89
        'nf_conntrack_max': '/proc/sys/net/netfilter/nf_conntrack_max',
90
    }
91
92
    def __init__(self, args=None, config=None):
93
        """Init the plugin."""
94
        super().__init__(
95
            args=args,
96
            config=config,
97
            # items_history_list=items_history_list,
98
            stats_init_value={'net_connections_enabled': True, 'nf_conntrack_enabled': True},
99
            fields_description=fields_description,
100
        )
101
102
        # We want to display the stat in the curse interface
103
        self.display_curse = True
104
105
    @GlancesPluginModel._check_decorator
106
    @GlancesPluginModel._log_result_decorator
107
    def update(self):
108
        """Update connections stats using the input method.
109
110
        Stats is a dict
111
        """
112
        # Init new stats
113
        stats = self.get_init_value()
114
115
        if self.input_method == 'local':
116
            # Update stats using the PSUtils lib
117
118
            # Grab network interface stat using the psutil net_connections method
119
            if stats['net_connections_enabled']:
120
                try:
121
                    net_connections = psutil.net_connections(kind="tcp")
122
                except Exception as e:
123
                    logger.warning(f'Can not get network connections stats ({e})')
124
                    logger.info('Disable connections stats')
125
                    stats['net_connections_enabled'] = False
126
                    self.stats = stats
127
                    return self.stats
128
129
                for s in self.status_list:
130
                    stats[s] = len([c for c in net_connections if c.status == s])
131
                initiated = 0
132
                for s in self.initiated_states:
133
                    stats[s] = len([c for c in net_connections if c.status == s])
134
                    initiated += stats[s]
135
                stats['initiated'] = initiated
136
                terminated = 0
137
                for s in self.initiated_states:
138
                    stats[s] = len([c for c in net_connections if c.status == s])
139
                    terminated += stats[s]
140
                stats['terminated'] = terminated
141
142
            if stats['nf_conntrack_enabled']:
143
                # Grab connections track directly from the /proc file
144
                for i in self.conntrack:
145
                    try:
146
                        with open(self.conntrack[i]) as f:
147
                            stats[i] = float(f.readline().rstrip("\n"))
148
                    except (OSError, FileNotFoundError) as e:
149
                        logger.warning(f'Can not get network connections track ({e})')
150
                        logger.info('Disable connections track')
151
                        stats['nf_conntrack_enabled'] = False
152
                        self.stats = stats
153
                        return self.stats
154
                if 'nf_conntrack_max' in stats and 'nf_conntrack_count' in stats:
155
                    stats['nf_conntrack_percent'] = stats['nf_conntrack_count'] * 100 / stats['nf_conntrack_max']
156
                else:
157
                    stats['nf_conntrack_enabled'] = False
158
                    self.stats = stats
159
                    return self.stats
160
161
        elif self.input_method == 'snmp':
162
            # Update stats using SNMP
163
            pass
164
165
        # Update the stats
166
        self.stats = stats
167
        return self.stats
168
169
    def update_views(self):
170
        """Update stats views."""
171
        # Call the father's method
172
        super().update_views()
173
174
        # Add specific information
175
        try:
176
            # Alert and log
177
            if self.stats['nf_conntrack_enabled']:
178
                self.views['nf_conntrack_percent']['decoration'] = self.get_alert(header='nf_conntrack_percent')
179
        except KeyError:
180
            # try/except mandatory for Windows compatibility (no conntrack stats)
181
            pass
182
183
    def msg_curse(self, args=None, max_width=None):
184
        """Return the dict to display in the curse interface."""
185
        # Init the return message
186
        ret = []
187
188
        # Only process if stats exist and display plugin enable...
189
        if not self.stats or self.is_disabled() or not max_width:
190
            return ret
191
192
        # Header
193
        if self.stats['net_connections_enabled'] or self.stats['nf_conntrack_enabled']:
194
            msg = '{}'.format('TCP CONNECTIONS')
195
            ret.append(self.curse_add_line(msg, "TITLE"))
196
        # Connections status
197
        if self.stats['net_connections_enabled']:
198
            for s in [psutil.CONN_LISTEN, 'initiated', psutil.CONN_ESTABLISHED, 'terminated']:
199
                ret.append(self.curse_new_line())
200
                msg = '{:{width}}'.format(nativestr(s).capitalize(), width=len(s))
201
                ret.append(self.curse_add_line(msg))
202
                msg = '{:>{width}}'.format(self.stats[s], width=max_width - len(s) + 2)
203
                ret.append(self.curse_add_line(msg))
204
        # Connections track
205
        if (
206
            self.stats['nf_conntrack_enabled']
207
            and 'nf_conntrack_count' in self.stats
208
            and 'nf_conntrack_max' in self.stats
209
        ):
210
            s = 'Tracked'
211
            ret.append(self.curse_new_line())
212
            msg = '{:{width}}'.format(nativestr(s).capitalize(), width=len(s))
213
            ret.append(self.curse_add_line(msg))
214
            msg = '{:>{width}}'.format(
215
                '{:0.0f}/{:0.0f}'.format(self.stats['nf_conntrack_count'], self.stats['nf_conntrack_max']),
216
                width=max_width - len(s) + 2,
217
            )
218
            ret.append(self.curse_add_line(msg, self.get_views(key='nf_conntrack_percent', option='decoration')))
219
220
        return ret
221