Test Failed
Push — develop ( 091e49...9bf3cb )
by Nicolas
02:31 queued 13s
created

glances.stats   F

Complexity

Total Complexity 73

Size/Duplication

Total Lines 376
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 182
dl 0
loc 376
rs 2.56
c 0
b 0
f 0
wmc 73

23 Methods

Rating   Name   Duplication   Size   Complexity  
B GlancesStats._load_plugin() 0 25 6
A GlancesStats.__init__() 0 10 1
A GlancesStats.__getattr__() 0 33 5
A GlancesStats.load_modules() 0 22 1
A GlancesStats.load_plugins() 0 13 5
A GlancesStats.export() 0 18 3
B GlancesStats.load_exports() 0 35 7
A GlancesStats.getAllViews() 0 3 1
F GlancesStats.load_additional_plugins() 0 53 18
A GlancesStats.getAllViewsAsDict() 0 3 1
A GlancesStats.get_plugin() 0 6 2
A GlancesStats.end() 0 8 3
A GlancesStats.update() 0 15 3
A GlancesStats.getAllLimits() 0 10 2
A GlancesStats.getAllAsDict() 0 3 1
A GlancesStats.load_limits() 0 5 2
A GlancesStats.getAllExports() 0 10 2
A GlancesStats.getExportsList() 0 12 2
A GlancesStats.getAllLimitsAsDict() 0 10 2
A GlancesStats.get_plugin_list() 0 3 1
A GlancesStats.getPluginsList() 0 12 2
A GlancesStats.getAllExportsAsDict() 0 10 2
A GlancesStats.getAll() 0 3 1

How to fix   Complexity   

Complexity

Complex classes like glances.stats often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
"""The stats manager."""
11
12
import collections
13
import os
14
import sys
15
import threading
16
import traceback
17
from importlib import import_module
18
import pathlib
19
20
from glances.logger import logger
21
from glances.globals import exports_path, plugins_path, sys_path
22
from glances.timer import Counter
23
24
25
class GlancesStats(object):
26
27
    """This class stores, updates and gives stats."""
28
29
    # Script header constant
30
    header = "glances_"
31
32
    def __init__(self, config=None, args=None):
33
        # Set the config instance
34
        self.config = config
35
36
        # Set the argument instance
37
        self.args = args
38
39
        # Load plugins and exports modules
40
        self.first_export = True
41
        self.load_modules(self.args)
42
43
    def __getattr__(self, item):
44
        """Overwrite the getattr method in case of attribute is not found.
45
46
        The goal is to dynamically generate the following methods:
47
        - getPlugname(): return Plugname stat in JSON format
48
        - getViewsPlugname(): return views of the Plugname stat in JSON format
49
        """
50
        # Check if the attribute starts with 'get'
51
        if item.startswith('getViews'):
52
            # Get the plugin name
53
            plugname = item[len('getViews') :].lower()
54
            # Get the plugin instance
55
            plugin = self._plugins[plugname]
56
            if hasattr(plugin, 'get_json_views'):
57
                # The method get_views exist, return it
58
                return getattr(plugin, 'get_json_views')
59
            else:
60
                # The method get_views is not found for the plugin
61
                raise AttributeError(item)
62
        elif item.startswith('get'):
63
            # Get the plugin name
64
            plugname = item[len('get') :].lower()
65
            # Get the plugin instance
66
            plugin = self._plugins[plugname]
67
            if hasattr(plugin, 'get_stats'):
68
                # The method get_stats exist, return it
69
                return getattr(plugin, 'get_stats')
70
            else:
71
                # The method get_stats is not found for the plugin
72
                raise AttributeError(item)
73
        else:
74
            # Default behavior
75
            raise AttributeError(item)
76
77
    def load_modules(self, args):
78
        """Wrapper to load: plugins and export modules."""
79
80
        # Init the plugins dict
81
        # Active plugins dictionary
82
        self._plugins = collections.defaultdict(dict)
83
        # Load the plugins
84
        self.load_plugins(args=args)
85
86
        # Load addititional plugins
87
        self.load_additional_plugins(args=args, config=self.config)
88
89
        # Init the export modules dict
90
        # Active exporters dictionary
91
        self._exports = collections.defaultdict(dict)
92
        # All available exporters dictionary
93
        self._exports_all = collections.defaultdict(dict)
94
        # Load the export modules
95
        self.load_exports(args=args)
96
97
        # Restoring system path
98
        sys.path = sys_path
99
100
    def _load_plugin(self, plugin_path, args=None, config=None):
101
        """Load the plugin, init it and add to the _plugin dict."""
102
        # Load the plugin class
103
        try:
104
            # Import the plugin
105
            plugin = import_module('glances.plugins.' + plugin_path)
106
            # Init and add the plugin to the dictionary
107
            self._plugins[plugin_path] = plugin.PluginModel(args=args, config=config)
108
        except Exception as e:
109
            # If a plugin can not be loaded, display a critical message
110
            # on the console but do not crash
111
            logger.critical("Error while initializing the {} plugin ({})".format(plugin_path, e))
112
            logger.error(traceback.format_exc())
113
            # An error occurred, disable the plugin
114
            if args is not None:
115
                setattr(args, 'disable_' + plugin_path, False)
116
        else:
117
            # Manage the default status of the plugin (enable or disable)
118
            if args is not None:
119
                # If the all keys are set in the disable_plugin option then look in the enable_plugin option
120
                if getattr(args, 'disable_all', False):
121
                    logger.debug('%s => %s', plugin_path, getattr(args, 'enable_' + plugin_path, False))
122
                    setattr(args, 'disable_' + plugin_path, not getattr(args, 'enable_' + plugin_path, False))
123
                else:
124
                    setattr(args, 'disable_' + plugin_path, getattr(args, 'disable_' + plugin_path, False))
125
126
    def load_plugins(self, args=None):
127
        """Load all plugins in the 'plugins' folder."""
128
        start_duration = Counter()
129
130
        for item in os.listdir(plugins_path):
131
            if os.path.isdir(os.path.join(plugins_path, item)) and not item.startswith('__') and item != 'plugin':
132
                # Load the plugin
133
                start_duration.reset()
134
                self._load_plugin(os.path.basename(item), args=args, config=self.config)
135
                logger.debug("Plugin {} started in {} seconds".format(item, start_duration.get()))
136
137
        # Log plugins list
138
        logger.debug("Active plugins list: {}".format(self.getPluginsList()))
139
        
140
    def load_additional_plugins(self, args=None, config=None):
141
        """ Load additional plugins if defined """
142
        def get_addl_plugins(self, plugin_path):
143
            """ Get list of additonal plugins """
144
            _plugin_list = []
145
            for plugin in os.listdir(plugin_path):
146
                path = os.path.join(plugin_path, plugin)
147
                if os.path.isdir(path) and not path.startswith('__'):
148
                    # Poor man's walk_pkgs - can't use pkgutil as the module would be already imported here!
149
                    for fil in pathlib.Path(path).glob('*.py'):
150
                        if fil.is_file():
151
                            with open(fil) as fd:
152
                                if 'PluginModel' in fd.read():
153
                                    _plugin_list.append(plugin)
154
                                    break
155
156
            return _plugin_list
157
158
        path = None
159
        # Skip section check as implied by has_option
160
        if config and config.parser.has_option('global', 'plugin_dir'):
161
            path = config.parser['global']['plugin_dir']
162
163
        if args and 'plugin_dir' in args and args.plugin_dir:
164
            path = args.plugin_dir
165
            
166
        if path:
167
            # Get list before starting the counter
168
            _sys_path = sys.path
169
            start_duration = Counter()
170
            # Ensure that plugins can be found in plugin_dir
171
            sys.path.insert(0, path)
172
            for plugin in get_addl_plugins(self, path):
173
                if plugin in sys.modules:
174
                    logger.warn(f"Pugin {plugin} already in sys.modules, skipping (workaround: rename plugin)")
175
                else:
176
                    start_duration.reset()
177
                    try:
178
                        _mod_loaded = import_module(plugin+'.model')
179
                        self._plugins[plugin] = _mod_loaded.PluginModel(args=args, config=config)
180
                        logger.debug("Plugin {} started in {} seconds".format(plugin, start_duration.get()))
181
                    except Exception as e:
182
                        # If a plugin can not be loaded, display a critical message
183
                        # on the console but do not crash
184
                        logger.critical("Error while initializing the {} plugin ({})".format(plugin, e))
185
                        logger.error(traceback.format_exc())
186
                        # An error occurred, disable the plugin
187
                        if args:
188
                            setattr(args, 'disable_' + plugin, False)
189
190
            sys.path = _sys_path
191
            # Log plugins list
192
            logger.debug("Active additional plugins list: {}".format(self.getPluginsList()))
193
                    
194
    def load_exports(self, args=None):
195
        """Load all exporters in the 'exports' folder."""
196
        start_duration = Counter()
197
198
        if args is None:
199
            return False
200
201
        for item in os.listdir(exports_path):
202
            if os.path.isdir(os.path.join(exports_path, item)) and not item.startswith('__'):
203
                # Load the exporter
204
                start_duration.reset()
205
                if item.startswith('glances_'):
206
                    # Avoid circular loop when Glances exporter uses lib with same name
207
                    # Example: influxdb should be named to glances_influxdb
208
                    exporter_name = os.path.basename(item).split('glances_')[1]
209
                else:
210
                    exporter_name = os.path.basename(item)
211
                # Set the disable_<name> to False by default
212
                setattr(self.args, 'export_' + exporter_name, getattr(self.args, 'export_' + exporter_name, False))
213
                # We should import the module
214
                if getattr(self.args, 'export_' + exporter_name, False):
215
                    # Import the export module
216
                    export_module = import_module(item)
217
                    # Add the exporter instance to the active exporters dictionary
218
                    self._exports[exporter_name] = export_module.Export(args=args, config=self.config)
219
                    # Add the exporter instance to the available exporters dictionary
220
                    self._exports_all[exporter_name] = self._exports[exporter_name]
221
                else:
222
                    # Add the exporter name to the available exporters dictionary
223
                    self._exports_all[exporter_name] = exporter_name
224
                logger.debug("Exporter {} started in {} seconds".format(exporter_name, start_duration.get()))
225
226
        # Log plugins list
227
        logger.debug("Active exports modules list: {}".format(self.getExportsList()))
228
        return True
229
230
    def getPluginsList(self, enable=True):
231
        """Return the plugins list.
232
233
        if enable is True, only return the active plugins (default)
234
        if enable is False, return all the plugins
235
236
        Return: list of plugin name
237
        """
238
        if enable:
239
            return [p for p in self._plugins if self._plugins[p].is_enabled()]
240
        else:
241
            return [p for p in self._plugins]
242
243
    def getExportsList(self, enable=True):
244
        """Return the exports list.
245
246
        if enable is True, only return the active exporters (default)
247
        if enable is False, return all the exporters
248
249
        :return: list of export module names
250
        """
251
        if enable:
252
            return [e for e in self._exports]
253
        else:
254
            return [e for e in self._exports_all]
255
256
    def load_limits(self, config=None):
257
        """Load the stats limits (except the one in the exclude list)."""
258
        # For each plugins, call the load_limits method
259
        for p in self._plugins:
260
            self._plugins[p].load_limits(config)
261
262
    def update(self):
263
        """Wrapper method to update the stats."""
264
        # For standalone and server modes
265
        # For each plugins, call the update method
266
        for p in self._plugins:
267
            if self._plugins[p].is_disabled():
268
                # If current plugin is disable
269
                # then continue to next plugin
270
                continue
271
            # Update the stats...
272
            self._plugins[p].update()
273
            # ... the history
274
            self._plugins[p].update_stats_history()
275
            # ... and the views
276
            self._plugins[p].update_views()
277
278
    def export(self, input_stats=None):
279
        """Export all the stats.
280
281
        Each export module is ran in a dedicated thread.
282
        """
283
        if self.first_export:
284
            logger.debug("Do not export stats during the first iteration because some information are missing")
285
            self.first_export = False
286
            return False
287
288
        input_stats = input_stats or {}
289
290
        for e in self._exports:
291
            logger.debug("Export stats using the %s module" % e)
292
            thread = threading.Thread(target=self._exports[e].update, args=(input_stats,))
293
            thread.start()
294
295
        return True
296
297
    def getAll(self):
298
        """Return all the stats (list)."""
299
        return [self._plugins[p].get_raw() for p in self._plugins]
300
301
    def getAllAsDict(self):
302
        """Return all the stats (dict)."""
303
        return {p: self._plugins[p].get_raw() for p in self._plugins}
304
305
    def getAllExports(self, plugin_list=None):
306
        """Return all the stats to be exported (list).
307
308
        Default behavior is to export all the stat
309
        if plugin_list is provided, only export stats of given plugin (list)
310
        """
311
        if plugin_list is None:
312
            # All enabled plugins should be exported
313
            plugin_list = self.getPluginsList()
314
        return [self._plugins[p].get_export() for p in self._plugins]
315
316
    def getAllExportsAsDict(self, plugin_list=None):
317
        """Return all the stats to be exported (list).
318
319
        Default behavior is to export all the stat
320
        if plugin_list is provided, only export stats of given plugin (list)
321
        """
322
        if plugin_list is None:
323
            # All enabled plugins should be exported
324
            plugin_list = self.getPluginsList()
325
        return {p: self._plugins[p].get_export() for p in plugin_list}
326
327
    def getAllLimits(self, plugin_list=None):
328
        """Return the plugins limits list.
329
330
        Default behavior is to export all the limits
331
        if plugin_list is provided, only export limits of given plugin (list)
332
        """
333
        if plugin_list is None:
334
            # All enabled plugins should be exported
335
            plugin_list = self.getPluginsList()
336
        return [self._plugins[p].limits for p in plugin_list]
337
338
    def getAllLimitsAsDict(self, plugin_list=None):
339
        """Return all the stats limits (dict).
340
341
        Default behavior is to export all the limits
342
        if plugin_list is provided, only export limits of given plugin (list)
343
        """
344
        if plugin_list is None:
345
            # All enabled plugins should be exported
346
            plugin_list = self.getPluginsList()
347
        return {p: self._plugins[p].limits for p in plugin_list}
348
349
    def getAllViews(self):
350
        """Return the plugins views."""
351
        return [self._plugins[p].get_views() for p in self._plugins]
352
353
    def getAllViewsAsDict(self):
354
        """Return all the stats views (dict)."""
355
        return {p: self._plugins[p].get_views() for p in self._plugins}
356
357
    def get_plugin_list(self):
358
        """Return the plugin list."""
359
        return self._plugins
360
361
    def get_plugin(self, plugin_name):
362
        """Return the plugin name."""
363
        if plugin_name in self._plugins:
364
            return self._plugins[plugin_name]
365
        else:
366
            return None
367
368
    def end(self):
369
        """End of the Glances stats."""
370
        # Close export modules
371
        for e in self._exports:
372
            self._exports[e].exit()
373
        # Close plugins
374
        for p in self._plugins:
375
            self._plugins[p].exit()
376