glances.config.Config.read()   A
last analyzed

Complexity

Conditions 5

Size

Total Lines 19
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 15
nop 1
dl 0
loc 19
rs 9.1832
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""Manage the configuration file."""
10
11
import builtins
12
import multiprocessing
13
import os
14
import re
15
import sys
16
17
from glances.globals import BSD, LINUX, MACOS, SUNOS, WINDOWS, ConfigParser, NoOptionError, NoSectionError, system_exec
18
from glances.logger import logger
19
20
21
def user_config_dir():
22
    r"""Return a list of per-user config dir (full path).
23
24
    - Linux, *BSD, SunOS: ~/.config/glances
25
    - macOS: ~/Library/Application Support/glances
26
    - Windows: %APPDATA%\glances
27
    """
28
    paths = []
29
    if WINDOWS:
30
        paths.append(os.environ.get('APPDATA'))
31
    elif MACOS:
32
        paths.append(os.environ.get('XDG_CONFIG_HOME') or os.path.expanduser('~/.config'))
33
        paths.append(os.path.expanduser('~/Library/Application Support'))
34
    else:
35
        paths.append(os.environ.get('XDG_CONFIG_HOME') or os.path.expanduser('~/.config'))
36
37
    return [os.path.join(path, 'glances') if path is not None else '' for path in paths]
38
39
40
def user_cache_dir():
41
    r"""Return a list of per-user cache dir (full path).
42
43
    - Linux, *BSD, SunOS: ~/.cache/glances
44
    - macOS: ~/Library/Caches/glances
45
    - Windows: {%LOCALAPPDATA%,%APPDATA%}\glances\cache
46
    """
47
    if WINDOWS:
48
        path = os.path.join(os.environ.get('LOCALAPPDATA') or os.environ.get('APPDATA'), 'glances', 'cache')
49
    elif MACOS:
50
        path = os.path.expanduser('~/Library/Caches/glances')
51
    else:
52
        path = os.path.join(os.environ.get('XDG_CACHE_HOME') or os.path.expanduser('~/.cache'), 'glances')
53
54
    return [path]
55
56
57
def system_config_dir():
58
    r"""Return a list of system-wide config dir (full path).
59
60
    - Linux, SunOS: /etc/glances
61
    - *BSD, macOS: /usr/local/etc/glances
62
    - Windows: %APPDATA%\glances
63
    """
64
    if LINUX or SUNOS:
65
        path = '/etc'
66
    elif BSD or MACOS:
67
        path = '/usr/local/etc'
68
    else:
69
        path = os.environ.get('APPDATA')
70
    if path is None:
71
        path = ''
72
    else:
73
        path = os.path.join(path, 'glances')
74
75
    return [path]
76
77
78
def default_config_dir():
79
    r"""Return a list of system-wide config dir (full path).
80
81
    - Linux, SunOS, *BSD, macOS: /usr/share/doc (as defined in the setup.py files)
82
    - Windows: %APPDATA%\glances
83
    """
84
    paths = []
85
86
    # Add system path
87
    if LINUX or SUNOS or BSD or MACOS:
88
        paths.append(os.path.join(sys.prefix, 'share', 'doc'))
89
    else:
90
        paths.append(os.environ.get('APPDATA'))
91
92
    # If we are in venv (issue #2803), sys.prefix != sys.base_prefix and we
93
    # already added venv path with sys.prefix. Add base_prefix path too
94
    if in_virtualenv():
95
        paths.append(os.path.join(sys.base_prefix, 'share', 'doc'))
96
97
    return [os.path.join(path, 'glances') if path is not None else '' for path in paths]
98
99
100
def in_virtualenv():
101
    # Source: https://stackoverflow.com/questions/1871549/how-to-determine-if-python-is-running-inside-a-virtualenv/1883251#1883251
102
    return sys.prefix != get_base_prefix_compat()
103
104
105
def get_base_prefix_compat():
106
    """Get base/real prefix, or sys.prefix if there is none."""
107
    # Source: https://stackoverflow.com/questions/1871549/how-to-determine-if-python-is-running-inside-a-virtualenv/1883251#1883251
108
    return getattr(sys, "base_prefix", None) or getattr(sys, "real_prefix", None) or sys.prefix
109
110
111
class Config:
112
    """This class is used to access/read config file, if it exists.
113
114
    :param config_dir: the path to search for config file
115
    :type config_dir: str or None
116
    """
117
118
    def __init__(self, config_dir=None):
119
        self.config_dir = config_dir
120
        self.config_filename = 'glances.conf'
121
        self._loaded_config_file = None
122
        self._config_file_paths = self.config_file_paths()
123
124
        # Re pattern for optimize research of `foo`
125
        self.re_pattern = re.compile(r'(\`.+?\`)')
126
127
        try:
128
            self.parser = ConfigParser(interpolation=None)
129
        except TypeError:
130
            self.parser = ConfigParser()
131
132
        self.read()
133
134
    def config_file_paths(self):
135
        r"""Get a list of config file paths.
136
137
        The list is built taking into account of the OS, priority and location.
138
139
        * custom path: /path/to/glances
140
        * Linux, SunOS: ~/.config/glances, /etc/glances
141
        * *BSD: ~/.config/glances, /usr/local/etc/glances
142
        * macOS: ~/.config/glances, ~/Library/Application Support/glances, /usr/local/etc/glances
143
        * Windows: %APPDATA%\glances
144
145
        The config file will be searched in the following order of priority:
146
            * /path/to/file (via -C flag)
147
            * user's home directory (per-user settings)
148
            * system-wide directory (system-wide settings)
149
            * default pip directory (as defined in the setup.py file)
150
        """
151
        paths = []
152
153
        # self.config_dir is the path to the config file (via -C flag)
154
        if self.config_dir:
155
            paths.append(self.config_dir)
156
157
        # user_config_dir() returns a list of paths
158
        paths.extend([os.path.join(path, self.config_filename) for path in user_config_dir()])
159
160
        # system_config_dir() returns a list of paths
161
        paths.extend([os.path.join(path, self.config_filename) for path in system_config_dir()])
162
163
        # default_config_dir() returns a list of paths
164
        paths.extend([os.path.join(path, self.config_filename) for path in default_config_dir()])
165
166
        return paths
167
168
    def read(self):
169
        """Read the config file, if it exists. Using defaults otherwise."""
170
        for config_file in self._config_file_paths:
171
            logger.debug(f'Search glances.conf file in {config_file}')
172
            if os.path.exists(config_file):
173
                try:
174
                    with builtins.open(config_file, encoding='utf-8') as f:
175
                        self.parser.read_file(f)
176
                        self.parser.read(f)
177
                    logger.info(f"Read configuration file '{config_file}'")
178
                except UnicodeDecodeError as err:
179
                    logger.error(f"Can not read configuration file '{config_file}': {err}")
180
                    sys.exit(1)
181
                # Save the loaded configuration file path (issue #374)
182
                self._loaded_config_file = config_file
183
                break
184
185
        # Set the default values for section not configured
186
        self.sections_set_default()
187
188
    def sections_set_default(self):
189
        # Globals
190
        if not self.parser.has_section('global'):
191
            self.parser.add_section('global')
192
        self.set_default('global', 'strftime_format', '')
193
        self.set_default('global', 'check_update', 'true')
194
195
        # Quicklook
196
        if not self.parser.has_section('quicklook'):
197
            self.parser.add_section('quicklook')
198
        self.set_default_cwc('quicklook', 'cpu')
199
        self.set_default_cwc('quicklook', 'mem')
200
        self.set_default_cwc('quicklook', 'swap')
201
202
        # CPU
203
        if not self.parser.has_section('cpu'):
204
            self.parser.add_section('cpu')
205
        self.set_default_cwc('cpu', 'user')
206
        self.set_default_cwc('cpu', 'system')
207
        self.set_default_cwc('cpu', 'steal')
208
        # By default I/O wait should be lower than 1/number of CPU cores
209
        iowait_bottleneck = (1.0 / multiprocessing.cpu_count()) * 100.0
210
        self.set_default_cwc(
211
            'cpu',
212
            'iowait',
213
            [
214
                str(iowait_bottleneck - (iowait_bottleneck * 0.20)),
215
                str(iowait_bottleneck - (iowait_bottleneck * 0.10)),
216
                str(iowait_bottleneck),
217
            ],
218
        )
219
        # Context switches bottleneck identification #1212
220
        ctx_switches_bottleneck = (500000 * 0.10) * multiprocessing.cpu_count()
221
        self.set_default_cwc(
222
            'cpu',
223
            'ctx_switches',
224
            [
225
                str(ctx_switches_bottleneck - (ctx_switches_bottleneck * 0.20)),
226
                str(ctx_switches_bottleneck - (ctx_switches_bottleneck * 0.10)),
227
                str(ctx_switches_bottleneck),
228
            ],
229
        )
230
231
        # Per-CPU
232
        if not self.parser.has_section('percpu'):
233
            self.parser.add_section('percpu')
234
        self.set_default_cwc('percpu', 'user')
235
        self.set_default_cwc('percpu', 'system')
236
237
        # Load
238
        if not self.parser.has_section('load'):
239
            self.parser.add_section('load')
240
        self.set_default_cwc('load', cwc=['0.7', '1.0', '5.0'])
241
242
        # Mem
243
        if not self.parser.has_section('mem'):
244
            self.parser.add_section('mem')
245
        self.set_default_cwc('mem')
246
247
        # Swap
248
        if not self.parser.has_section('memswap'):
249
            self.parser.add_section('memswap')
250
        self.set_default_cwc('memswap')
251
252
        # NETWORK
253
        if not self.parser.has_section('network'):
254
            self.parser.add_section('network')
255
        self.set_default_cwc('network', 'rx')
256
        self.set_default_cwc('network', 'tx')
257
258
        # FS
259
        if not self.parser.has_section('fs'):
260
            self.parser.add_section('fs')
261
        self.set_default_cwc('fs')
262
263
        # Sensors
264
        if not self.parser.has_section('sensors'):
265
            self.parser.add_section('sensors')
266
        self.set_default_cwc('sensors', 'temperature_hdd', cwc=['45', '52', '60'])
267
        self.set_default_cwc('sensors', 'battery', cwc=['70', '80', '90'])
268
269
        # Process list
270
        if not self.parser.has_section('processlist'):
271
            self.parser.add_section('processlist')
272
        self.set_default_cwc('processlist', 'cpu')
273
        self.set_default_cwc('processlist', 'mem')
274
275
    @property
276
    def loaded_config_file(self):
277
        """Return the loaded configuration file."""
278
        return self._loaded_config_file
279
280
    def as_dict(self):
281
        """Return the configuration as a dict"""
282
        dictionary = {}
283
        for section in self.parser.sections():
284
            dictionary[section] = {}
285
            for option in self.parser.options(section):
286
                dictionary[section][option] = self.parser.get(section, option)
287
        return dictionary
288
289
    def sections(self):
290
        """Return a list of all sections."""
291
        return self.parser.sections()
292
293
    def items(self, section):
294
        """Return the items list of a section."""
295
        return self.parser.items(section)
296
297
    def has_section(self, section):
298
        """Return info about the existence of a section."""
299
        return self.parser.has_section(section)
300
301
    def set_default_cwc(self, section, option_header=None, cwc=['50', '70', '90']):
302
        """Set default values for careful, warning and critical."""
303
        if option_header is None:
304
            header = ''
305
        else:
306
            header = option_header + '_'
307
        self.set_default(section, header + 'careful', cwc[0])
308
        self.set_default(section, header + 'warning', cwc[1])
309
        self.set_default(section, header + 'critical', cwc[2])
310
311
    def set_default(self, section, option, default):
312
        """If the option did not exist, create a default value."""
313
        if not self.parser.has_option(section, option):
314
            self.parser.set(section, option, default)
315
316
    def get_value(self, section, option, default=None):
317
        """Get the value of an option, if it exists.
318
319
        If it did not exist, then return the default value.
320
321
        It allows user to define dynamic configuration key (see issue#1204)
322
        Dynamic value should starts and end with the ` char
323
        Example: prefix=`hostname`
324
        """
325
        ret = default
326
        try:
327
            ret = self.parser.get(section, option)
328
        except (NoOptionError, NoSectionError):
329
            pass
330
331
        # Search a substring `foo` and replace it by the result of its exec
332
        if ret is not None:
333
            try:
334
                match = self.re_pattern.findall(ret)
335
                for m in match:
336
                    ret = ret.replace(m, system_exec(m[1:-1]))
337
            except TypeError:
338
                pass
339
        return ret
340
341
    def get_list_value(self, section, option, default=None, separator=','):
342
        """Get the list value of an option, if it exists."""
343
        try:
344
            return self.parser.get(section, option).split(separator)
345
        except (NoOptionError, NoSectionError):
346
            return default
347
348
    def get_int_value(self, section, option, default=0):
349
        """Get the int value of an option, if it exists."""
350
        try:
351
            return self.parser.getint(section, option)
352
        except (NoOptionError, NoSectionError):
353
            return int(default)
354
355
    def get_float_value(self, section, option, default=0.0):
356
        """Get the float value of an option, if it exists."""
357
        try:
358
            return self.parser.getfloat(section, option)
359
        except (NoOptionError, NoSectionError):
360
            return float(default)
361
362
    def get_bool_value(self, section, option, default=True):
363
        """Get the bool value of an option, if it exists."""
364
        try:
365
            return self.parser.getboolean(section, option)
366
        except (NoOptionError, NoSectionError):
367
            return bool(default)
368