Test Failed
Push — develop ( 843246...ad9d52 )
by Nicolas
08:45
created

glances.ports_list   A

Complexity

Total Complexity 10

Size/Duplication

Total Lines 110
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 55
dl 0
loc 110
rs 10
c 0
b 0
f 0
wmc 10

4 Methods

Rating   Name   Duplication   Size   Complexity  
A GlancesPortsList.set_server() 0 3 1
B GlancesPortsList.load() 0 74 7
A GlancesPortsList.get_ports_list() 0 3 1
A GlancesPortsList.__init__() 0 5 1
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""Manage the Glances ports list (Ports plugin)."""
10
11
from glances.globals import get_default_gateway
12
from glances.logger import logger
13
14
15
class GlancesPortsList:
16
    """Manage the ports list for the ports plugin."""
17
18
    _section = "ports"
19
    _default_refresh = 60
20
    _default_timeout = 3
21
22
    def __init__(self, config=None, args=None):
23
        # ports_list is a list of dict (JSON compliant)
24
        # [ {'host': 'www.google.fr', 'port': 443, 'refresh': 30, 'description': Internet, 'status': True} ... ]
25
        # Load the configuration file
26
        self._ports_list = self.load(config)
27
28
    def load(self, config):
29
        """Load the ports list from the configuration file."""
30
        ports_list = []
31
32
        if config is None:
33
            logger.debug("No configuration file available. Cannot load ports list.")
34
        elif not config.has_section(self._section):
35
            logger.debug(f"No [{self._section}] section in the configuration file. Cannot load ports list.")
36
        else:
37
            logger.debug(f"Start reading the [{self._section}] section in the configuration file")
38
39
            refresh = int(config.get_value(self._section, 'refresh', default=self._default_refresh))
40
            timeout = int(config.get_value(self._section, 'timeout', default=self._default_timeout))
41
42
            # Add default gateway on top of the ports_list lists
43
            default_gateway = config.get_value(self._section, 'port_default_gateway', default='False')
44
            if default_gateway.lower().startswith('true'):
45
                new_port = {}
46
                # ICMP
47
                new_port['host'] = get_default_gateway()
48
                new_port['port'] = 0
49
                new_port['description'] = 'DefaultGateway'
50
                new_port['refresh'] = refresh
51
                new_port['timeout'] = timeout
52
                new_port['status'] = None
53
                new_port['rtt_warning'] = None
54
                new_port['indice'] = 'port_0'
55
                logger.debug("Add default gateway {} to the static list".format(new_port['host']))
56
                ports_list.append(new_port)
57
58
            # Read the scan list
59
            for i in range(1, 256):
60
                new_port = {}
61
                postfix = f'port_{str(i)}_'
62
63
                # Read mandatory configuration key: host
64
                new_port['host'] = config.get_value(self._section, '{}{}'.format(postfix, 'host'))
65
66
                if new_port['host'] is None:
67
                    continue
68
69
                # Read optionals configuration keys
70
                # Port is set to 0 by default. 0 mean ICMP check instead of TCP check
71
                new_port['port'] = config.get_value(self._section, '{}{}'.format(postfix, 'port'), 0)
72
                new_port['description'] = config.get_value(
73
                    self._section, f'{postfix}description', default="{}:{}".format(new_port['host'], new_port['port'])
74
                )
75
76
                # Default status
77
                new_port['status'] = None
78
79
                # Refresh rate in second
80
                new_port['refresh'] = refresh
81
82
                # Timeout in second
83
                new_port['timeout'] = int(config.get_value(self._section, f'{postfix}timeout', default=timeout))
84
85
                # RTT warning
86
                new_port['rtt_warning'] = config.get_value(self._section, f'{postfix}rtt_warning', default=None)
87
                if new_port['rtt_warning'] is not None:
88
                    # Convert to second
89
                    new_port['rtt_warning'] = int(new_port['rtt_warning']) / 1000.0
90
91
                # Indice
92
                new_port['indice'] = 'port_' + str(i)
93
94
                # Add the server to the list
95
                logger.debug("Add port {}:{} to the static list".format(new_port['host'], new_port['port']))
96
                ports_list.append(new_port)
97
98
            # Ports list loaded
99
            logger.debug(f"Ports list loaded: {ports_list}")
100
101
        return ports_list
102
103
    def get_ports_list(self):
104
        """Return the current server list (dict of dict)."""
105
        return self._ports_list
106
107
    def set_server(self, pos, key, value):
108
        """Set the key to the value for the pos (position in the list)."""
109
        self._ports_list[pos][key] = value
110