Test Failed
Push — master ( b1b8ea...183265 )
by Nicolas
02:55
created

glances.stats.GlancesStats.get_plugin_view()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""The stats manager."""
10
11
import collections
12
import os
13
import pathlib
14
import sys
15
import threading
16
import traceback
17
from importlib import import_module
18
19
from glances.globals import exports_path, plugins_path, sys_path
20
from glances.logger import logger
21
from glances.timer import Counter
22
23
24
class GlancesStats:
25
    """This class stores, updates and gives stats."""
26
27
    # Script header constant
28
    header = "glances_"
29
30
    def __init__(self, config=None, args=None):
31
        # Set the config instance
32
        self.config = config
33
34
        # Set the argument instance
35
        self.args = args
36
37
        # Load plugins and exports modules
38
        self.first_export = True
39
        self.load_modules(self.args)
40
41
    def __getattr__(self, item):
42
        """Overwrite the getattr method in case of attribute is not found.
43
44
        The goal is to dynamically generate the following methods:
45
        - getPlugname(): return Plugname stat in JSON format
46
        - getViewsPlugname(): return views of the Plugname stat in JSON format
47
        """
48
        # Check if the attribute starts with 'get'
49 View Code Duplication
        if item.startswith('getViews'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
50
            # Get the plugin name
51
            plugname = item[len('getViews') :].lower()
52
            # Get the plugin instance
53
            plugin = self._plugins[plugname]
54
            if hasattr(plugin, 'get_json_views'):
55
                # The method get_json_views exist, return it
56
                return getattr(plugin, 'get_json_views')
57
            # The method get_views is not found for the plugin
58
            raise AttributeError(item)
59 View Code Duplication
        if item.startswith('get'):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
60
            # Get the plugin name
61
            plugname = item[len('get') :].lower()
62
            # Get the plugin instance
63
            plugin = self._plugins[plugname]
64
            if hasattr(plugin, 'get_json'):
65
                # The method get_json exist, return it
66
                return getattr(plugin, 'get_json')
67
            # The method get_stats is not found for the plugin
68
            raise AttributeError(item)
69
        # Default behavior
70
        raise AttributeError(item)
71
72
    def load_modules(self, args):
        """Load the plugins, the additional plugins and the export modules.

        :param args: arguments instance forwarded to each loader
        """
        # Active plugins dictionary, filled by the two plugin loaders
        self._plugins = collections.defaultdict(dict)
        self.load_plugins(args=args)
        # Load additional plugins
        self.load_additional_plugins(args=args, config=self.config)

        # Active exporters dictionary
        self._exports = collections.defaultdict(dict)
        # All available exporters dictionary
        self._exports_all = collections.defaultdict(dict)
        self.load_exports(args=args)

        # Restoring system path
        sys.path = sys_path
94
95
    def _load_plugin(self, plugin_path, args=None, config=None):
        """Load the plugin, init it and add to the _plugin dict.

        :param plugin_path: name of the plugin module under ``glances.plugins``
        :param args: arguments instance; its ``disable_<plugin>`` attribute is
            (re)written by this method when args is not None
        :param config: configuration instance given to the plugin constructor
        """
        disable_attr = 'disable_' + plugin_path
        try:
            # Import the plugin
            plugin = import_module('glances.plugins.' + plugin_path)
            # Init and add the plugin to the dictionary
            self._plugins[plugin_path] = plugin.PluginModel(args=args, config=config)
        except Exception as e:
            # If a plugin can not be loaded, display a critical message
            # on the console but do not crash
            logger.critical(f"Error while initializing the {plugin_path} plugin ({e})")
            logger.error(traceback.format_exc())
            # An error occurred, disable the plugin: the flag has to be True,
            # the plugin is not registered in _plugins
            if args is not None:
                setattr(args, disable_attr, True)
            return

        # Manage the default status of the plugin (enable or disable)
        if args is None:
            return
        if getattr(args, 'disable_all', False):
            # All plugins are disabled: only the enable_<plugin> option can
            # switch this one on
            enabled = getattr(args, 'enable_' + plugin_path, False)
            logger.debug('%s => %s', plugin_path, enabled)
            setattr(args, disable_attr, not enabled)
        else:
            # Make sure the attribute exists (not disabled by default)
            setattr(args, disable_attr, getattr(args, disable_attr, False))
120
121
    def load_plugins(self, args=None):
        """Load every plugin folder found in the 'plugins' folder.

        :param args: arguments instance forwarded to each plugin
        """
        timer = Counter()

        for entry in os.listdir(plugins_path):
            # Skip the dunder entries and the 'plugin' folder
            if entry.startswith('__') or entry == 'plugin':
                continue
            # Only folders are plugins
            if not os.path.isdir(os.path.join(plugins_path, entry)):
                continue
            # Load the plugin
            timer.reset()
            self._load_plugin(os.path.basename(entry), args=args, config=self.config)
            logger.debug(f"Plugin {entry} started in {timer.get()} seconds")

        # Log plugins list
        logger.debug(f"Active plugins list: {self.getPluginsList()}")
134
135
    def load_additional_plugins(self, args=None, config=None):
136
        """Load additional plugins if defined"""
137
138
        def get_addl_plugins(self, plugin_path):
139
            """Get list of additonal plugins"""
140
            _plugin_list = []
141
            for plugin in os.listdir(plugin_path):
142
                path = os.path.join(plugin_path, plugin)
143
                if os.path.isdir(path) and not path.startswith('__'):
144
                    # Poor man's walk_pkgs - can't use pkgutil as the module would be already imported here!
145
                    for fil in pathlib.Path(path).glob('*.py'):
146
                        if fil.is_file():
147
                            with open(fil) as fd:
148
                                if 'PluginModel' in fd.read():
149
                                    _plugin_list.append(plugin)
150
                                    break
151
152
            return _plugin_list
153
154
        path = None
155
        # Skip section check as implied by has_option
156
        if config and config.parser.has_option('global', 'plugin_dir'):
157
            path = config.parser['global']['plugin_dir']
158
159
        if args and 'plugin_dir' in args and args.plugin_dir:
160
            path = args.plugin_dir
161
162
        if path:
163
            # Get list before starting the counter
164
            _sys_path = sys.path
165
            start_duration = Counter()
166
            # Ensure that plugins can be found in plugin_dir
167
            sys.path.insert(0, path)
168
            for plugin in get_addl_plugins(self, path):
169
                if plugin in sys.modules:
170
                    logger.warn(f"Pugin {plugin} already in sys.modules, skipping (workaround: rename plugin)")
171
                else:
172
                    start_duration.reset()
173
                    try:
174
                        _mod_loaded = import_module(plugin + '.model')
175
                        self._plugins[plugin] = _mod_loaded.PluginModel(args=args, config=config)
176
                        logger.debug(f"Plugin {plugin} started in {start_duration.get()} seconds")
177
                    except Exception as e:
178
                        # If a plugin can not be loaded, display a critical message
179
                        # on the console but do not crash
180
                        logger.critical(f"Error while initializing the {plugin} plugin ({e})")
181
                        logger.error(traceback.format_exc())
182
                        # An error occurred, disable the plugin
183
                        if args:
184
                            setattr(args, 'disable_' + plugin, False)
185
186
            sys.path = _sys_path
187
            # Log plugins list
188
            logger.debug(f"Active additional plugins list: {self.getPluginsList()}")
189
190
    def load_exports(self, args=None):
191
        """Load all exporters in the 'exports' folder."""
192
        start_duration = Counter()
193
194
        if args is None:
195
            return False
196
197
        for item in os.listdir(exports_path):
198
            if os.path.isdir(os.path.join(exports_path, item)) and not item.startswith('__'):
199
                # Load the exporter
200
                start_duration.reset()
201
                if item.startswith('glances_'):
202
                    # Avoid circular loop when Glances exporter uses lib with same name
203
                    # Example: influxdb should be named to glances_influxdb
204
                    exporter_name = os.path.basename(item).split('glances_')[1]
205
                else:
206
                    exporter_name = os.path.basename(item)
207
                # Set the disable_<name> to False by default
208
                setattr(self.args, 'export_' + exporter_name, getattr(self.args, 'export_' + exporter_name, False))
209
                # We should import the module
210
                if getattr(self.args, 'export_' + exporter_name, False):
211
                    # Import the export module
212
                    export_module = import_module(item)
213
                    # Add the exporter instance to the active exporters dictionary
214
                    self._exports[exporter_name] = export_module.Export(args=args, config=self.config)
215
                    # Add the exporter instance to the available exporters dictionary
216
                    self._exports_all[exporter_name] = self._exports[exporter_name]
217
                else:
218
                    # Add the exporter name to the available exporters dictionary
219
                    self._exports_all[exporter_name] = exporter_name
220
                logger.debug(f"Exporter {exporter_name} started in {start_duration.get()} seconds")
221
222
        # Log plugins list
223
        logger.debug(f"Active exports modules list: {self.getExportsList()}")
224
        return True
225
226
    def getPluginsList(self, enable=True):
227
        """Return the plugins list.
228
229
        if enable is True, only return the active plugins (default)
230
        if enable is False, return all the plugins
231
232
        Return: list of plugin name
233
        """
234
        if enable:
235
            return [p for p in self._plugins if self._plugins[p].is_enabled()]
236
        return list(self._plugins)
237
238
    def getExportsList(self, enable=True):
239
        """Return the exports list.
240
241
        if enable is True, only return the active exporters (default)
242
        if enable is False, return all the exporters
243
244
        :return: list of export module names
245
        """
246
        if enable:
247
            return list(self._exports)
248
        return list(self._exports_all)
249
250
    def load_limits(self, config=None):
251
        """Load the stats limits (except the one in the exclude list)."""
252
        # For each plugins, call the load_limits method
253
        for p in self._plugins:
254
            self._plugins[p].load_limits(config)
255
256
    def __update_plugin(self, p):
257
        """Update stats, history and views for the given plugin name p"""
258
        self._plugins[p].update()
259
        self._plugins[p].update_stats_history()
260
        self._plugins[p].update_views()
261
262
    def update(self):
263
        """Wrapper method to update the stats.
264
265
        Only called by standalone and server modes
266
        """
267
        threads = []
268
        # Start update of all enable plugins
269
        for p in self.getPluginsList(enable=True):
270
            thread = threading.Thread(target=self.__update_plugin, args=(p,))
271
            thread.start()
272
            threads.append(thread)
273
        # Wait the end of the update
274
        for t in threads:
275
            t.join()
276
277
    def export(self, input_stats=None):
278
        """Export all the stats.
279
280
        Each export module is ran in a dedicated thread.
281
        """
282
        if self.first_export:
283
            logger.debug("Do not export stats during the first iteration because some information are missing")
284
            self.first_export = False
285
            return False
286
287
        input_stats = input_stats or {}
288
289
        for e in self.getExportsList(enable=True):
290
            logger.debug(f"Export stats using the {e} module")
291
            thread = threading.Thread(target=self._exports[e].update, args=(input_stats,))
292
            thread.start()
293
294
        return True
295
296
    def getAll(self):
297
        """Return all the stats (list)."""
298
        return [self._plugins[p].get_raw() for p in self._plugins]
299
300
    def getAllAsDict(self):
301
        """Return all the stats (dict)."""
302
        return {p: self._plugins[p].get_raw() for p in self._plugins}
303
304
    def getAllExports(self, plugin_list=None):
305
        """Return all the stats to be exported as a list.
306
307
        Default behavior is to export all the stat
308
        if plugin_list is provided (list), only export stats of given plugins
309
        """
310
        if plugin_list is None:
311
            # All enabled plugins should be exported
312
            plugin_list = self.getPluginsList()
313
        return [self._plugins[p].get_export() for p in self._plugins]
314
315
    def getAllExportsAsDict(self, plugin_list=None):
316
        """Return all the stats to be exported as a dict.
317
318
        Default behavior is to export all the stat
319
        if plugin_list is provided (list), only export stats of given plugins
320
        """
321
        if plugin_list is None:
322
            # All enabled plugins should be exported
323
            plugin_list = self.getPluginsList()
324
        return {p: self._plugins[p].get_export() for p in plugin_list}
325
326
    def getAllLimits(self, plugin_list=None):
327
        """Return the plugins limits list.
328
329
        Default behavior is to export all the limits
330
        if plugin_list is provided, only export limits of given plugin (list)
331
        """
332
        if plugin_list is None:
333
            # All enabled plugins should be exported
334
            plugin_list = self.getPluginsList()
335
        return [self._plugins[p].limits for p in plugin_list]
336
337
    def getAllLimitsAsDict(self, plugin_list=None):
338
        """Return all the stats limits (dict).
339
340
        Default behavior is to export all the limits
341
        if plugin_list is provided, only export limits of given plugin (list)
342
        """
343
        if plugin_list is None:
344
            # All enabled plugins should be exported
345
            plugin_list = self.getPluginsList()
346
        return {p: self._plugins[p].limits for p in plugin_list}
347
348
    def getAllViews(self):
349
        """Return the plugins views."""
350
        return [self._plugins[p].get_views() for p in self._plugins]
351
352
    def getAllViewsAsDict(self):
353
        """Return all the stats views (dict)."""
354
        return {p: self._plugins[p].get_views() for p in self._plugins}
355
356
    def get_plugin_list(self):
357
        """Return the plugin list."""
358
        return self._plugins
359
360
    def get_plugin(self, plugin_name):
361
        """Return the plugin stats."""
362
        if plugin_name in self._plugins:
363
            return self._plugins[plugin_name]
364
        return None
365
366
    def get_plugin_view(self, plugin_name):
367
        """Return the plugin views."""
368
        if plugin_name in self._plugins:
369
            return self._plugins[plugin_name].get_views()
370
        return None
371
372
    def end(self):
373
        """End of the Glances stats."""
374
        # Close export modules
375
        for e in self._exports:
376
            self._exports[e].exit()
377
        # Close plugins
378
        for p in self._plugins:
379
            self._plugins[p].exit()
380