Test Failed
Push — master ( ee826a...d9056e )
by Nicolas
03:09
created

glances.stats   F

Complexity

Total Complexity 74

Size/Duplication

Total Lines 381
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 187
dl 0
loc 381
rs 2.48
c 0
b 0
f 0
wmc 74

24 Methods

Rating   Name   Duplication   Size   Complexity  
A GlancesStats.__init__() 0 10 1
A GlancesStats.__getattr__() 0 33 5
B GlancesStats.load_exports() 0 35 7
A GlancesStats.load_limits() 0 5 2
A GlancesStats.getExportsList() 0 12 2
A GlancesStats.getPluginsList() 0 12 2
A GlancesStats.export() 0 18 3
B GlancesStats._load_plugin() 0 25 6
A GlancesStats.getAllViews() 0 3 1
F GlancesStats.load_additional_plugins() 0 54 18
A GlancesStats.getAllViewsAsDict() 0 3 1
A GlancesStats.get_plugin() 0 6 2
A GlancesStats.end() 0 8 3
A GlancesStats.getAllLimits() 0 10 2
A GlancesStats.update() 0 14 3
A GlancesStats.getAllAsDict() 0 3 1
A GlancesStats.getAllExports() 0 10 2
A GlancesStats.load_modules() 0 22 1
A GlancesStats.__update_plugin() 0 5 1
A GlancesStats.load_plugins() 0 13 5
A GlancesStats.getAllLimitsAsDict() 0 10 2
A GlancesStats.get_plugin_list() 0 3 1
A GlancesStats.getAll() 0 3 1
A GlancesStats.getAllExportsAsDict() 0 10 2

How to fix   Complexity   

Complexity

Complex classes like glances.stats often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <nicolas@nicolargo.com>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""The stats manager."""
11
12
import collections
13
import os
14
import sys
15
import threading
16
import traceback
17
from importlib import import_module
18
import pathlib
19
20
from glances.logger import logger
21
from glances.globals import exports_path, plugins_path, sys_path
22
from glances.timer import Counter
23
24
25
class GlancesStats(object):
    """Store, update and give access to the stats of the loaded plugins.

    The instance keeps two kinds of objects:

    - plugins (``self._plugins``): loaded from the built-in plugins folder and,
      optionally, from an additional user-defined plugin directory;
    - exporters (``self._exports`` / ``self._exports_all``): loaded from the
      exports folder when enabled through the ``export_<name>`` argument.
    """

    # Script header constant (prefix of exporter folders that would otherwise
    # clash with a third-party library of the same name)
    header = "glances_"

    def __init__(self, config=None, args=None):
        """Keep the configuration/arguments and load plugins and exporters.

        :param config: configuration object forwarded to plugins and exporters
        :param args: command line arguments namespace (may be None)
        """
        # Set the config instance
        self.config = config

        # Set the argument instance
        self.args = args

        # Load plugins and exports modules
        self.first_export = True
        self.load_modules(self.args)

    def __getattr__(self, item):
        """Overwrite the getattr method in case of attribute is not found.

        The goal is to dynamically generate the following methods:
        - getPlugname(): return Plugname stat in JSON format
        - getViewsPlugname(): return views of the Plugname stat in JSON format

        :raises AttributeError: if the name matches no pattern, the plugin is
            unknown or the plugin does not provide the requested method
        """
        # 'getViews' must be tested first because it also starts with 'get'
        if item.startswith('getViews'):
            prefix, method_name = 'getViews', 'get_json_views'
        elif item.startswith('get'):
            prefix, method_name = 'get', 'get_stats'
        else:
            # Default behavior
            raise AttributeError(item)

        # Get the plugin name
        plugname = item[len(prefix):].lower()
        # Get the plugin instance. Use .get() and not [] because _plugins is a
        # defaultdict: indexing an unknown name would silently register an
        # empty dict as a plugin and break every later iteration on _plugins.
        plugin = self._plugins.get(plugname)
        if plugin is None or not hasattr(plugin, method_name):
            # Unknown plugin or method not provided by the plugin
            raise AttributeError(item)
        return getattr(plugin, method_name)

    def load_modules(self, args):
        """Wrapper to load: plugins and export modules."""

        # Init the plugins dict
        # Active plugins dictionary
        self._plugins = collections.defaultdict(dict)
        # Load the plugins
        self.load_plugins(args=args)

        # Load additional plugins
        self.load_additional_plugins(args=args, config=self.config)

        # Init the export modules dict
        # Active exporters dictionary
        self._exports = collections.defaultdict(dict)
        # All available exporters dictionary
        self._exports_all = collections.defaultdict(dict)
        # Load the export modules
        self.load_exports(args=args)

        # Restoring system path
        sys.path = sys_path

    def _load_plugin(self, plugin_path, args=None, config=None):
        """Load the plugin, init it and add to the _plugin dict.

        :param plugin_path: name of the plugin package under glances.plugins
        :param args: arguments namespace; its disable_<plugin> flag is set here
        :param config: configuration object given to the plugin constructor
        """
        # Load the plugin class
        try:
            # Import the plugin
            plugin = import_module('glances.plugins.' + plugin_path)
            # Init and add the plugin to the dictionary
            self._plugins[plugin_path] = plugin.PluginModel(args=args, config=config)
        except Exception as e:
            # If a plugin can not be loaded, display a critical message
            # on the console but do not crash
            logger.critical("Error while initializing the {} plugin ({})".format(plugin_path, e))
            logger.error(traceback.format_exc())
            # An error occurred: the plugin is not registered in _plugins.
            # NOTE(review): the flag is set to False although the original
            # comment said "disable the plugin" - confirm the intended value.
            if args is not None:
                setattr(args, 'disable_' + plugin_path, False)
        else:
            # Manage the default status of the plugin (enable or disable)
            if args is not None:
                # If the all keys are set in the disable_plugin option then look in the enable_plugin option
                if getattr(args, 'disable_all', False):
                    logger.debug('%s => %s', plugin_path, getattr(args, 'enable_' + plugin_path, False))
                    setattr(args, 'disable_' + plugin_path, not getattr(args, 'enable_' + plugin_path, False))
                else:
                    # Make sure the flag exists (default: not disabled)
                    setattr(args, 'disable_' + plugin_path, getattr(args, 'disable_' + plugin_path, False))

    def load_plugins(self, args=None):
        """Load all plugins in the 'plugins' folder."""
        start_duration = Counter()

        for item in os.listdir(plugins_path):
            # A plugin is a sub-folder; dunder folders and the 'plugin' folder are skipped
            if os.path.isdir(os.path.join(plugins_path, item)) and not item.startswith('__') and item != 'plugin':
                # Load the plugin
                start_duration.reset()
                self._load_plugin(os.path.basename(item), args=args, config=self.config)
                logger.debug("Plugin {} started in {} seconds".format(item, start_duration.get()))

        # Log plugins list
        logger.debug("Active plugins list: {}".format(self.getPluginsList()))

    @staticmethod
    def _get_addl_plugins(plugin_dir):
        """Return the names of the sub-folders of plugin_dir that look like plugins.

        A sub-folder is kept when at least one of its '*.py' files contains the
        text 'PluginModel'. Folders whose name starts with '__' are ignored.
        """
        found = []
        for name in os.listdir(plugin_dir):
            candidate = os.path.join(plugin_dir, name)
            # Test the folder name (not the joined path) against the dunder prefix
            if not os.path.isdir(candidate) or name.startswith('__'):
                continue
            # Poor man's walk_pkgs - can't use pkgutil as the module would be already imported here!
            for fil in pathlib.Path(candidate).glob('*.py'):
                # Undecodable bytes are replaced: only the marker text matters here
                if fil.is_file() and 'PluginModel' in fil.read_text(encoding='utf-8', errors='replace'):
                    found.append(name)
                    break
        return found

    def load_additional_plugins(self, args=None, config=None):
        """Load additional plugins if defined.

        The plugin directory is read from the 'plugin_dir' option of the
        'global' configuration section and can be overridden by args.plugin_dir.
        Nothing is done when no directory is defined.

        :param args: arguments namespace (may be None)
        :param config: configuration object (may be None)
        """
        path = None
        # Skip section check as implied by has_option
        if config and config.parser.has_option('global', 'plugin_dir'):
            path = config.parser['global']['plugin_dir']

        # The command line takes precedence over the configuration file
        if args and 'plugin_dir' in args and args.plugin_dir:
            path = args.plugin_dir

        if not path:
            return

        # Keep a *copy* of the search path: a plain reference would also see
        # the insertion below and the restoration would be a no-op
        saved_sys_path = sys.path[:]
        start_duration = Counter()
        # Ensure that plugins can be found in plugin_dir
        sys.path.insert(0, path)
        try:
            for plugin in self._get_addl_plugins(path):
                if plugin in sys.modules:
                    logger.warning(f"Pugin {plugin} already in sys.modules, skipping (workaround: rename plugin)")
                    continue
                start_duration.reset()
                try:
                    _mod_loaded = import_module(plugin + '.model')
                    self._plugins[plugin] = _mod_loaded.PluginModel(args=args, config=config)
                    logger.debug("Plugin {} started in {} seconds".format(plugin, start_duration.get()))
                except Exception as e:
                    # If a plugin can not be loaded, display a critical message
                    # on the console but do not crash
                    logger.critical("Error while initializing the {} plugin ({})".format(plugin, e))
                    logger.error(traceback.format_exc())
                    # NOTE(review): the flag is set to False although the original
                    # comment said "disable the plugin" - confirm the intended value.
                    if args:
                        setattr(args, 'disable_' + plugin, False)
        finally:
            # Restore the search path in place, even if the scan failed
            sys.path[:] = saved_sys_path

        # Log plugins list
        logger.debug("Active additional plugins list: {}".format(self.getPluginsList()))

    def load_exports(self, args=None):
        """Load all exporters in the 'exports' folder.

        :param args: arguments namespace holding the export_<name> flags
        :return: False if args is None (nothing loaded), True otherwise
        """
        if args is None:
            return False

        start_duration = Counter()

        for item in os.listdir(exports_path):
            if not os.path.isdir(os.path.join(exports_path, item)) or item.startswith('__'):
                continue
            # Load the exporter
            start_duration.reset()
            if item.startswith(self.header):
                # Avoid circular loop when Glances exporter uses lib with same name
                # Example: influxdb should be named to glances_influxdb
                exporter_name = item[len(self.header):]
            else:
                exporter_name = item
            flag = 'export_' + exporter_name
            # Set the export_<name> to False by default
            setattr(args, flag, getattr(args, flag, False))
            if getattr(args, flag, False):
                # Import the export module
                export_module = import_module(item)
                # Add the exporter instance to the active exporters dictionary
                self._exports[exporter_name] = export_module.Export(args=args, config=self.config)
                # Add the exporter instance to the available exporters dictionary
                self._exports_all[exporter_name] = self._exports[exporter_name]
            else:
                # Add the exporter name to the available exporters dictionary
                self._exports_all[exporter_name] = exporter_name
            logger.debug("Exporter {} started in {} seconds".format(exporter_name, start_duration.get()))

        # Log exports list
        logger.debug("Active exports modules list: {}".format(self.getExportsList()))
        return True

    def getPluginsList(self, enable=True):
        """Return the plugins list.

        if enable is True, only return the active plugins (default)
        if enable is False, return all the plugins

        :return: list of plugin names
        """
        if enable:
            return [p for p in self._plugins if self._plugins[p].is_enabled()]
        return list(self._plugins)

    def getExportsList(self, enable=True):
        """Return the exports list.

        if enable is True, only return the active exporters (default)
        if enable is False, return all the exporters

        :return: list of export module names
        """
        return list(self._exports if enable else self._exports_all)

    def load_limits(self, config=None):
        """Call the load_limits method of every plugin with the given config."""
        for p in self._plugins:
            self._plugins[p].load_limits(config)

    def __update_plugin(self, p):
        """Update stats, history and views for the given plugin name p"""
        self._plugins[p].update()
        self._plugins[p].update_stats_history()
        self._plugins[p].update_views()

    def update(self):
        """Wrapper method to update the stats.

        Only called by standalone and server modes.
        Each enabled plugin is updated in its own thread; the method returns
        once all the threads are finished.
        """
        threads = []
        # Start update of all enable plugins
        for p in self.getPluginsList(enable=True):
            thread = threading.Thread(target=self.__update_plugin, args=(p,))
            thread.start()
            threads.append(thread)
        # Wait the end of the update
        for t in threads:
            t.join()

    def export(self, input_stats=None):
        """Export all the stats.

        Each export module is ran in a dedicated thread (started but not
        joined here). The very first call exports nothing.

        :param input_stats: object given to the update method of each exporter
        :return: False for the first call (nothing exported), True otherwise
        """
        if self.first_export:
            logger.debug("Do not export stats during the first iteration because some information are missing")
            self.first_export = False
            return False

        input_stats = input_stats or {}

        for e in self.getExportsList(enable=True):
            logger.debug("Export stats using the %s module" % e)
            thread = threading.Thread(target=self._exports[e].update, args=(input_stats,))
            thread.start()

        return True

    def getAll(self):
        """Return all the stats (list)."""
        return [self._plugins[p].get_raw() for p in self._plugins]

    def getAllAsDict(self):
        """Return all the stats (dict)."""
        return {p: self._plugins[p].get_raw() for p in self._plugins}

    def getAllExports(self, plugin_list=None):
        """Return all the stats to be exported as a list.

        Default behavior is to export all the stat
        if plugin_list is provided (list), only export stats of given plugins
        """
        if plugin_list is None:
            # All enabled plugins should be exported
            plugin_list = self.getPluginsList()
        # Iterate on plugin_list (and not on every loaded plugin) so that the
        # result matches getAllExportsAsDict
        return [self._plugins[p].get_export() for p in plugin_list]

    def getAllExportsAsDict(self, plugin_list=None):
        """Return all the stats to be exported as a dict.

        Default behavior is to export all the stat
        if plugin_list is provided (list), only export stats of given plugins
        """
        if plugin_list is None:
            # All enabled plugins should be exported
            plugin_list = self.getPluginsList()
        return {p: self._plugins[p].get_export() for p in plugin_list}

    def getAllLimits(self, plugin_list=None):
        """Return the plugins limits list.

        Default behavior is to export all the limits
        if plugin_list is provided, only export limits of given plugin (list)
        """
        if plugin_list is None:
            # All enabled plugins should be exported
            plugin_list = self.getPluginsList()
        return [self._plugins[p].limits for p in plugin_list]

    def getAllLimitsAsDict(self, plugin_list=None):
        """Return all the stats limits (dict).

        Default behavior is to export all the limits
        if plugin_list is provided, only export limits of given plugin (list)
        """
        if plugin_list is None:
            # All enabled plugins should be exported
            plugin_list = self.getPluginsList()
        return {p: self._plugins[p].limits for p in plugin_list}

    def getAllViews(self):
        """Return the plugins views."""
        return [self._plugins[p].get_views() for p in self._plugins]

    def getAllViewsAsDict(self):
        """Return all the stats views (dict)."""
        return {p: self._plugins[p].get_views() for p in self._plugins}

    def get_plugin_list(self):
        """Return the plugins dictionary (plugin name -> plugin instance)."""
        return self._plugins

    def get_plugin(self, plugin_name):
        """Return the plugin instance named plugin_name, or None if unknown."""
        if plugin_name in self._plugins:
            return self._plugins[plugin_name]
        return None

    def end(self):
        """End of the Glances stats."""
        # Close export modules
        for e in self._exports:
            self._exports[e].exit()
        # Close plugins
        for p in self._plugins:
            self._plugins[p].exit()
381