Test Failed
Push — develop ( 273a40...2379ec )
by Nicolas
02:51 queued 25s
created

glances.config.Config.as_dict()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 1
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""Manage the configuration file."""
10
11
import builtins
12
import multiprocessing
13
import os
14
import re
15
import sys
16
17
from glances.globals import BSD, LINUX, MACOS, SUNOS, WINDOWS, ConfigParser, NoOptionError, NoSectionError, system_exec
18
from glances.logger import logger
19
20
21
def user_config_dir():
22
    r"""Return a list of per-user config dir (full path).
23
24
    - Linux, *BSD, SunOS: ~/.config/glances
25
    - macOS: ~/Library/Application Support/glances
26
    - Windows: %APPDATA%\glances
27
    """
28
    paths = []
29
    if WINDOWS:
30
        paths.append(os.environ.get('APPDATA'))
31
    elif MACOS:
32
        paths.append(os.environ.get('XDG_CONFIG_HOME') or os.path.expanduser('~/.config'))
33
        paths.append(os.path.expanduser('~/Library/Application Support'))
34
    else:
35
        paths.append(os.environ.get('XDG_CONFIG_HOME') or os.path.expanduser('~/.config'))
36
37
    return [os.path.join(path, 'glances') if path is not None else '' for path in paths]
38
39
40
def user_cache_dir():
41
    r"""Return a list of per-user cache dir (full path).
42
43
    - Linux, *BSD, SunOS: ~/.cache/glances
44
    - macOS: ~/Library/Caches/glances
45
    - Windows: {%LOCALAPPDATA%,%APPDATA%}\glances\cache
46
    """
47
    if WINDOWS:
48
        path = os.path.join(os.environ.get('LOCALAPPDATA') or os.environ.get('APPDATA'), 'glances', 'cache')
49
    elif MACOS:
50
        path = os.path.expanduser('~/Library/Caches/glances')
51
    else:
52
        path = os.path.join(os.environ.get('XDG_CACHE_HOME') or os.path.expanduser('~/.cache'), 'glances')
53
54
    return [path]
55
56
57
def system_config_dir():
58
    r"""Return a list of system-wide config dir (full path).
59
60
    - Linux, SunOS: /etc/glances
61
    - *BSD, macOS: /usr/local/etc/glances
62
    - Windows: %APPDATA%\glances
63
    """
64
    if LINUX or SUNOS:
65
        path = '/etc'
66
    elif BSD or MACOS:
67
        path = '/usr/local/etc'
68
    else:
69
        path = os.environ.get('APPDATA')
70
    if path is None:
71
        path = ''
72
    else:
73
        path = os.path.join(path, 'glances')
74
75
    return [path]
76
77
78
def default_config_dir():
79
    r"""Return a list of system-wide config dir (full path).
80
81
    - Linux, SunOS, *BSD, macOS: /usr/share/doc (as defined in the setup.py files)
82
    - Windows: %APPDATA%\glances
83
    """
84
    path = []
85
    # Add venv path (solve issue #2803)
86
    if in_virtualenv():
87
        path.append(os.path.join(sys.prefix, 'share', 'doc', 'glances'))
88
89
    # Add others system path
90
    if LINUX or SUNOS or BSD or MACOS:
91
        path.append('/usr/share/doc')
92
    else:
93
        path.append(os.environ.get('APPDATA'))
94
95
    return path
96
97
98
def in_virtualenv():
99
    # Source: https://stackoverflow.com/questions/1871549/how-to-determine-if-python-is-running-inside-a-virtualenv/1883251#1883251
100
    return sys.prefix != get_base_prefix_compat()
101
102
103
def get_base_prefix_compat():
104
    """Get base/real prefix, or sys.prefix if there is none."""
105
    # Source: https://stackoverflow.com/questions/1871549/how-to-determine-if-python-is-running-inside-a-virtualenv/1883251#1883251
106
    return getattr(sys, "base_prefix", None) or getattr(sys, "real_prefix", None) or sys.prefix
107
108
109
class Config:
110
    """This class is used to access/read config file, if it exists.
111
112
    :param config_dir: the path to search for config file
113
    :type config_dir: str or None
114
    """
115
116
    def __init__(self, config_dir=None):
117
        self.config_dir = config_dir
118
        self.config_filename = 'glances.conf'
119
        self._loaded_config_file = None
120
121
        # Re pattern for optimize research of `foo`
122
        self.re_pattern = re.compile(r'(\`.+?\`)')
123
124
        try:
125
            self.parser = ConfigParser(interpolation=None)
126
        except TypeError:
127
            self.parser = ConfigParser()
128
129
        self.read()
130
131
    def config_file_paths(self):
132
        r"""Get a list of config file paths.
133
134
        The list is built taking into account of the OS, priority and location.
135
136
        * custom path: /path/to/glances
137
        * Linux, SunOS: ~/.config/glances, /etc/glances
138
        * *BSD: ~/.config/glances, /usr/local/etc/glances
139
        * macOS: ~/.config/glances, ~/Library/Application Support/glances, /usr/local/etc/glances
140
        * Windows: %APPDATA%\glances
141
142
        The config file will be searched in the following order of priority:
143
            * /path/to/file (via -C flag)
144
            * user's home directory (per-user settings)
145
            * system-wide directory (system-wide settings)
146
            * default pip directory (as defined in the setup.py file)
147
        """
148
        paths = []
149
150
        # self.config_dir is the path to the config file (via -C flag)
151
        if self.config_dir:
152
            paths.append(self.config_dir)
153
154
        # user_config_dir() returns a list of paths
155
        paths.extend([os.path.join(path, self.config_filename) for path in user_config_dir()])
156
157
        # system_config_dir() returns a list of paths
158
        paths.extend([os.path.join(path, self.config_filename) for path in system_config_dir()])
159
160
        # default_config_dir() returns a list of paths
161
        paths.extend([os.path.join(path, self.config_filename) for path in default_config_dir()])
162
163
        return paths
164
165
    def read(self):
166
        """Read the config file, if it exists. Using defaults otherwise."""
167
        for config_file in self.config_file_paths():
168
            logger.debug(f'Search glances.conf file in {config_file}')
169
            if os.path.exists(config_file):
170
                try:
171
                    with builtins.open(config_file, encoding='utf-8') as f:
172
                        self.parser.read_file(f)
173
                        self.parser.read(f)
174
                    logger.info(f"Read configuration file '{config_file}'")
175
                except UnicodeDecodeError as err:
176
                    logger.error(f"Can not read configuration file '{config_file}': {err}")
177
                    sys.exit(1)
178
                # Save the loaded configuration file path (issue #374)
179
                self._loaded_config_file = config_file
180
                break
181
182
        # Set the default values for section not configured
183
        self.sections_set_default()
184
185
    def sections_set_default(self):
186
        # Globals
187
        if not self.parser.has_section('global'):
188
            self.parser.add_section('global')
189
        self.set_default('global', 'strftime_format', '')
190
        self.set_default('global', 'check_update', 'true')
191
192
        # Quicklook
193
        if not self.parser.has_section('quicklook'):
194
            self.parser.add_section('quicklook')
195
        self.set_default_cwc('quicklook', 'cpu')
196
        self.set_default_cwc('quicklook', 'mem')
197
        self.set_default_cwc('quicklook', 'swap')
198
199
        # CPU
200
        if not self.parser.has_section('cpu'):
201
            self.parser.add_section('cpu')
202
        self.set_default_cwc('cpu', 'user')
203
        self.set_default_cwc('cpu', 'system')
204
        self.set_default_cwc('cpu', 'steal')
205
        # By default I/O wait should be lower than 1/number of CPU cores
206
        iowait_bottleneck = (1.0 / multiprocessing.cpu_count()) * 100.0
207
        self.set_default_cwc(
208
            'cpu',
209
            'iowait',
210
            [
211
                str(iowait_bottleneck - (iowait_bottleneck * 0.20)),
212
                str(iowait_bottleneck - (iowait_bottleneck * 0.10)),
213
                str(iowait_bottleneck),
214
            ],
215
        )
216
        # Context switches bottleneck identification #1212
217
        ctx_switches_bottleneck = (500000 * 0.10) * multiprocessing.cpu_count()
218
        self.set_default_cwc(
219
            'cpu',
220
            'ctx_switches',
221
            [
222
                str(ctx_switches_bottleneck - (ctx_switches_bottleneck * 0.20)),
223
                str(ctx_switches_bottleneck - (ctx_switches_bottleneck * 0.10)),
224
                str(ctx_switches_bottleneck),
225
            ],
226
        )
227
228
        # Per-CPU
229
        if not self.parser.has_section('percpu'):
230
            self.parser.add_section('percpu')
231
        self.set_default_cwc('percpu', 'user')
232
        self.set_default_cwc('percpu', 'system')
233
234
        # Load
235
        if not self.parser.has_section('load'):
236
            self.parser.add_section('load')
237
        self.set_default_cwc('load', cwc=['0.7', '1.0', '5.0'])
238
239
        # Mem
240
        if not self.parser.has_section('mem'):
241
            self.parser.add_section('mem')
242
        self.set_default_cwc('mem')
243
244
        # Swap
245
        if not self.parser.has_section('memswap'):
246
            self.parser.add_section('memswap')
247
        self.set_default_cwc('memswap')
248
249
        # NETWORK
250
        if not self.parser.has_section('network'):
251
            self.parser.add_section('network')
252
        self.set_default_cwc('network', 'rx')
253
        self.set_default_cwc('network', 'tx')
254
255
        # FS
256
        if not self.parser.has_section('fs'):
257
            self.parser.add_section('fs')
258
        self.set_default_cwc('fs')
259
260
        # Sensors
261
        if not self.parser.has_section('sensors'):
262
            self.parser.add_section('sensors')
263
        self.set_default_cwc('sensors', 'temperature_core', cwc=['60', '70', '80'])
264
        self.set_default_cwc('sensors', 'temperature_hdd', cwc=['45', '52', '60'])
265
        self.set_default_cwc('sensors', 'battery', cwc=['80', '90', '95'])
266
267
        # Process list
268
        if not self.parser.has_section('processlist'):
269
            self.parser.add_section('processlist')
270
        self.set_default_cwc('processlist', 'cpu')
271
        self.set_default_cwc('processlist', 'mem')
272
273
    @property
274
    def loaded_config_file(self):
275
        """Return the loaded configuration file."""
276
        return self._loaded_config_file
277
278
    def as_dict(self):
279
        """Return the configuration as a dict"""
280
        dictionary = {}
281
        for section in self.parser.sections():
282
            dictionary[section] = {}
283
            for option in self.parser.options(section):
284
                dictionary[section][option] = self.parser.get(section, option)
285
        return dictionary
286
287
    def sections(self):
288
        """Return a list of all sections."""
289
        return self.parser.sections()
290
291
    def items(self, section):
292
        """Return the items list of a section."""
293
        return self.parser.items(section)
294
295
    def has_section(self, section):
296
        """Return info about the existence of a section."""
297
        return self.parser.has_section(section)
298
299
    def set_default_cwc(self, section, option_header=None, cwc=['50', '70', '90']):
300
        """Set default values for careful, warning and critical."""
301
        if option_header is None:
302
            header = ''
303
        else:
304
            header = option_header + '_'
305
        self.set_default(section, header + 'careful', cwc[0])
306
        self.set_default(section, header + 'warning', cwc[1])
307
        self.set_default(section, header + 'critical', cwc[2])
308
309
    def set_default(self, section, option, default):
310
        """If the option did not exist, create a default value."""
311
        if not self.parser.has_option(section, option):
312
            self.parser.set(section, option, default)
313
314
    def get_value(self, section, option, default=None):
315
        """Get the value of an option, if it exists.
316
317
        If it did not exist, then return the default value.
318
319
        It allows user to define dynamic configuration key (see issue#1204)
320
        Dynamic value should starts and end with the ` char
321
        Example: prefix=`hostname`
322
        """
323
        ret = default
324
        try:
325
            ret = self.parser.get(section, option)
326
        except (NoOptionError, NoSectionError):
327
            pass
328
329
        # Search a substring `foo` and replace it by the result of its exec
330
        if ret is not None:
331
            try:
332
                match = self.re_pattern.findall(ret)
333
                for m in match:
334
                    ret = ret.replace(m, system_exec(m[1:-1]))
335
            except TypeError:
336
                pass
337
        return ret
338
339
    def get_list_value(self, section, option, default=None, separator=','):
340
        """Get the list value of an option, if it exists."""
341
        try:
342
            return self.parser.get(section, option).split(separator)
343
        except (NoOptionError, NoSectionError):
344
            return default
345
346
    def get_int_value(self, section, option, default=0):
347
        """Get the int value of an option, if it exists."""
348
        try:
349
            return self.parser.getint(section, option)
350
        except (NoOptionError, NoSectionError):
351
            return int(default)
352
353
    def get_float_value(self, section, option, default=0.0):
354
        """Get the float value of an option, if it exists."""
355
        try:
356
            return self.parser.getfloat(section, option)
357
        except (NoOptionError, NoSectionError):
358
            return float(default)
359
360
    def get_bool_value(self, section, option, default=True):
361
        """Get the bool value of an option, if it exists."""
362
        try:
363
            return self.parser.getboolean(section, option)
364
        except (NoOptionError, NoSectionError):
365
            return bool(default)
366