Test Failed
Push — develop ( f77e03...b747c8 )
by Nicolas
02:44 queued 14s
created

glances.plugins.plugin.model   F

Complexity

Total Complexity 239

Size/Duplication

Total Lines 1224
Duplicated Lines 1.06 %

Importance

Changes 0
Metric Value
eloc 623
dl 13
loc 1224
rs 1.977
c 0
b 0
f 0
wmc 239

74 Methods

Rating   Name   Duplication   Size   Complexity  
A GlancesPluginModel.set_views() 0 3 1
A GlancesPluginModel.get_alert_log() 0 4 1
A GlancesPluginModel.get_export() 0 6 1
A GlancesPluginModel.get_raw() 0 3 1
B GlancesPluginModel.update_stats_history() 0 27 8
A GlancesPluginModel.__str__() 0 3 1
A GlancesPluginModel.get_stats() 0 3 1
A GlancesPluginModel.history_enable() 0 2 1
A GlancesPluginModel.get_items_history_list() 0 3 1
A GlancesPluginModel.get_raw_stats_item() 0 6 1
A GlancesPluginModel.set_stats() 0 3 1
A GlancesPluginModel.reset_stats_history() 0 6 2
A GlancesPluginModel.get_key() 0 3 1
A GlancesPluginModel.__init__() 0 82 3
A GlancesPluginModel.is_enabled() 0 9 3
A GlancesPluginModel.input_method() 0 4 1
A GlancesPluginModel.reset() 0 6 1
A GlancesPluginModel.get_raw_history() 0 13 3
A GlancesPluginModel.get_stats_history() 0 8 2
B GlancesPluginModel.sorted_stats() 0 19 6
A GlancesPluginModel.exit() 0 3 1
A GlancesPluginModel.is_disabled() 0 3 1
A GlancesPluginModel.__repr__() 0 3 1
A GlancesPluginModel.get_json() 0 3 1
A GlancesPluginModel.init_stats_history() 0 6 2
A GlancesPluginModel.get_init_value() 0 3 1
A GlancesPluginModel.get_export_history() 0 3 1
A GlancesPluginModel.short_system_name() 0 4 1
C GlancesPluginModel.get_stats_snmp() 0 57 11
A GlancesPluginModel.get_trend() 0 11 3
A GlancesPluginModel.reset_views() 0 3 1
A GlancesPluginModel.get_stats_value() 0 9 2
B GlancesPluginModel.get_raw_stats_value() 0 16 6
A GlancesPluginModel.curse_new_line() 0 3 1
A GlancesPluginModel.manage_threshold() 0 3 1
A GlancesPluginModel.get_raw_stats_key() 0 6 1
A GlancesPluginModel.filter_stats() 0 9 4
A GlancesPluginModel.get_stats_item() 0 6 1
A GlancesPluginModel.get_item_info() 0 5 3
A GlancesPluginModel.is_display() 0 5 2
F GlancesPluginModel.update_views() 0 68 21
A GlancesPluginModel.is_limit() 0 3 1
A GlancesPluginModel.get_stats_action() 0 8 1
A GlancesPluginModel.limits() 0 4 1
A GlancesPluginModel.set_refresh() 0 3 1
F GlancesPluginModel.get_alert() 0 86 17
A GlancesPluginModel.get_conf_value() 0 17 4
B GlancesPluginModel.get_views() 0 23 6
A GlancesPluginModel.get_limits() 0 5 2
A GlancesPluginModel.get_stat_name() 0 6 2
A GlancesPluginModel.get_limit_action() 0 21 3
A GlancesPluginModel.get_refresh_time() 0 3 1
A GlancesPluginModel.set_limits() 0 3 1
B GlancesPluginModel.manage_action() 0 28 7
A GlancesPluginModel.get_limit_log() 0 9 3
A GlancesPluginModel.get_limit() 0 15 4
A GlancesPluginModel.get_refresh() 0 6 3
A GlancesPluginModel.get_json_views() 0 3 1
B GlancesPluginModel.load_limits() 0 26 6
A GlancesPluginModel.is_hide() 0 11 1
A GlancesPluginModel.is_show() 0 13 1
A GlancesPluginModel.curse_add_line() 0 31 1
A GlancesPluginModel.align() 0 4 1
A GlancesPluginModel.has_alias() 0 3 1
F GlancesPluginModel.curse_add_stat() 0 92 18
A GlancesPluginModel.trend_msg() 0 13 4
D GlancesPluginModel._manage_rate() 0 63 13
C GlancesPluginModel.auto_unit() 0 54 11
A GlancesPluginModel.read_alias() 0 4 2
A GlancesPluginModel._log_result_decorator() 13 13 1
A GlancesPluginModel.get_stats_display() 0 22 4
A GlancesPluginModel.is_display_any() 0 5 2
A GlancesPluginModel.msg_curse() 0 3 1
A GlancesPluginModel._check_decorator() 0 22 4

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like glances.plugins.plugin.model often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""
10
I am your father...
11
12
...of all Glances model plugins.
13
"""
14
15
import copy
16
import re
17
18
from glances.actions import GlancesActions
19
from glances.events_list import glances_events
20
from glances.globals import dictlist, dictlist_json_dumps, iterkeys, itervalues, json_dumps, listkeys, mean, nativestr
21
from glances.history import GlancesHistory
22
from glances.logger import logger
23
from glances.outputs.glances_unicode import unicode_message
24
from glances.thresholds import glances_thresholds
25
from glances.timer import Counter, Timer, getTimeSinceLastUpdate
26
27
fields_unit_short = {'percent': '%'}
28
29
fields_unit_type = {
30
    'percent': 'float',
31
    'percents': 'float',
32
    'number': 'int',
33
    'numbers': 'int',
34
    'int': 'int',
35
    'ints': 'int',
36
    'float': 'float',
37
    'floats': 'float',
38
    'second': 'int',
39
    'seconds': 'int',
40
    'byte': 'int',
41
    'bytes': 'int',
42
}
43
44
45
class GlancesPluginModel:
46
    """Main class for Glances plugin model."""
47
48
    def __init__(self, args=None, config=None, items_history_list=None, stats_init_value={}, fields_description=None):
49
        """Init the plugin of plugins model class.
50
51
        All Glances' plugins model should inherit from this class. Most of the
52
        methods are already implemented in the father classes.
53
54
        Your plugin should return a dict or a list of dicts (stored in the
55
        self.stats). As an example, you can have a look on the mem plugin
56
        (for dict) or network (for list of dicts).
57
58
        From version 4 of the API, the plugin should return a dict.
59
60
        A plugin should implement:
61
        - the reset method: to set your self.stats variable to {} or []
62
        - the update method: where your self.stats variable is set
63
        and optionally:
64
        - the get_key method: set the key of the dict (only for list of dict)
65
        - all others methods you want to overwrite
66
67
        :args: args parameters
68
        :config: configuration parameters
69
        :items_history_list: list of items to store in the history
70
        :stats_init_value: Default value for a stats item
71
        """
72
        # Build the plugin name
73
        # Internal or external module (former prefixed by 'glances.plugins')
74
        _mod = self.__class__.__module__.replace('glances.plugins.', '')
75
        self.plugin_name = _mod.split('.')[0]
76
77
        if self.plugin_name.startswith('glances_'):
78
            self.plugin_name = self.plugin_name.split('glances_')[1]
79
        logger.debug(f"Init {self.plugin_name} plugin")
80
81
        # Init the args
82
        self.args = args
83
84
        # Init the default alignment (for curses)
85
        self._align = 'left'
86
87
        # Init the input method
88
        self._input_method = 'local'
89
        self._short_system_name = None
90
91
        # Init the history list
92
        self.items_history_list = items_history_list
93
        self.stats_history = self.init_stats_history()
94
95
        # Init the limits (configuration keys) dictionary
96
        self._limits = {}
97
        if config is not None:
98
            logger.debug(f'Load section {self.plugin_name} in Glances configuration file')
99
            self.load_limits(config=config)
100
101
        # Init the alias (dictionary)
102
        self.alias = self.read_alias()
103
104
        # Init the actions
105
        self.actions = GlancesActions(args=args)
106
107
        # Init the views
108
        self.views = {}
109
110
        # Hide stats if all the hide_zero_fields has never been != 0
111
        # Default is False, always display stats
112
        self.hide_zero = False
113
        # The threshold needed to display a value if hide_zero is true.
114
        # Only hide a value if it is less than hide_threshold_bytes.
115
        self.hide_threshold_bytes = 0
116
        self.hide_zero_fields = []
117
118
        # Set the initial refresh time to display stats the first time
119
        self.refresh_timer = Timer(0)
120
121
        # Init stats description
122
        self.fields_description = fields_description
123
124
        # Init the stats
125
        self.stats_init_value = stats_init_value
126
        self.time_since_last_update = None
127
        self.stats = None
128
        self.stats_previous = None
129
        self.reset()
130
131
    def __repr__(self):
132
        """Return the raw stats."""
133
        return str(self.stats)
134
135
    def __str__(self):
136
        """Return the human-readable stats."""
137
        return str(self.stats)
138
139
    def get_init_value(self):
140
        """Return a copy of the init value."""
141
        return copy.copy(self.stats_init_value)
142
143
    def reset(self):
144
        """Reset the stats.
145
146
        This method should be overwritten by child classes.
147
        """
148
        self.stats = self.get_init_value()
149
150
    def exit(self):
151
        """Just log an event when Glances exit."""
152
        logger.debug(f"Stop the {self.plugin_name} plugin")
153
154
    def get_key(self):
155
        """Return the key of the list."""
156
        return
157
158
    def is_enabled(self, plugin_name=None):
159
        """Return true if plugin is enabled."""
160
        if not plugin_name:
161
            plugin_name = self.plugin_name
162
        try:
163
            d = getattr(self.args, 'disable_' + plugin_name)
164
        except AttributeError:
165
            d = getattr(self.args, 'enable_' + plugin_name, True)
166
        return d is False
167
168
    def is_disabled(self, plugin_name=None):
169
        """Return true if plugin is disabled."""
170
        return not self.is_enabled(plugin_name=plugin_name)
171
172
    def history_enable(self):
173
        return self.args is not None and not self.args.disable_history and self.get_items_history_list() is not None
174
175
    def init_stats_history(self):
176
        """Init the stats history (dict of GlancesAttribute)."""
177
        if self.history_enable():
178
            init_list = [a['name'] for a in self.get_items_history_list()]
179
            logger.debug(f"Stats history activated for plugin {self.plugin_name} (items: {init_list})")
180
        return GlancesHistory()
181
182
    def reset_stats_history(self):
183
        """Reset the stats history (dict of GlancesAttribute)."""
184
        if self.history_enable():
185
            reset_list = [a['name'] for a in self.get_items_history_list()]
186
            logger.debug(f"Reset history for plugin {self.plugin_name} (items: {reset_list})")
187
            self.stats_history.reset()
188
189
    def update_stats_history(self):
190
        """Update stats history."""
191
        # Build the history
192
        if not (self.get_export() and self.history_enable()):
193
            return
194
        # Itern through items history
195
        item_name = '' if self.get_key() is None else self.get_key()
196
        for i in self.get_items_history_list():
197
            if isinstance(self.get_export(), list):
198
                # Stats is a list of data
199
                # Iter through stats (for example, iter through network interface)
200
                for l_export in self.get_export():
201
                    if i['name'] in l_export:
202
                        self.stats_history.add(
203
                            nativestr(l_export[item_name]) + '_' + nativestr(i['name']),
204
                            l_export[i['name']],
205
                            description=i['description'],
206
                            history_max_size=self._limits['history_size'],
207
                        )
208
            else:
209
                # Stats is not a list
210
                # Add the item to the history directly
211
                self.stats_history.add(
212
                    nativestr(i['name']),
213
                    self.get_export()[i['name']],
214
                    description=i['description'],
215
                    history_max_size=self._limits['history_size'],
216
                )
217
218
    def get_items_history_list(self):
219
        """Return the items history list."""
220
        return self.items_history_list
221
222
    def get_raw_history(self, item=None, nb=0):
223
        """Return the history (RAW format).
224
225
        - the stats history (dict of list) if item is None
226
        - the stats history for the given item (list) instead
227
        - None if item did not exist in the history
228
        """
229
        s = self.stats_history.get(nb=nb)
230
        if item is None:
231
            return s
232
        if item in s:
233
            return s[item]
234
        return None
235
236
    def get_export_history(self, item=None):
237
        """Return the stats history object to export."""
238
        return self.get_raw_history(item=item)
239
240
    def get_stats_history(self, item=None, nb=0):
241
        """Return the stats history (JSON format)."""
242
        s = self.stats_history.get_json(nb=nb)
243
244
        if item is None:
245
            return json_dumps(s)
246
247
        return dictlist_json_dumps(s, item)
248
249
    def get_trend(self, item, nb=30):
250
        """Get the trend regarding to the last nb values.
251
252
        The trend is the diffirence between the mean of the last 0 to nb / 2
253
        and nb / 2 to nb values.
254
        """
255
        raw_history = self.get_raw_history(item=item, nb=nb)
256
        if raw_history is None or len(raw_history) < nb:
257
            return None
258
        last_nb = [v[1] for v in raw_history]
259
        return mean(last_nb[nb // 2 :]) - mean(last_nb[: nb // 2])
260
261
    @property
262
    def input_method(self):
263
        """Get the input method."""
264
        return self._input_method
265
266
    @input_method.setter
267
    def input_method(self, input_method):
268
        """Set the input method.
269
270
        * local: system local grab (psutil or direct access)
271
        * snmp: Client server mode via SNMP
272
        * glances: Client server mode via Glances API
273
        """
274
        self._input_method = input_method
275
276
    @property
277
    def short_system_name(self):
278
        """Get the short detected OS name (SNMP)."""
279
        return self._short_system_name
280
281
    def sorted_stats(self):
282
        """Get the stats sorted by an alias (if present) or key."""
283
        key = self.get_key()
284
        if key is None:
285
            return self.stats
286
        try:
287
            return sorted(
288
                self.stats,
289
                key=lambda stat: tuple(
290
                    int(part) if part.isdigit() else part.lower()
291
                    for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
292
                ),
293
            )
294
        except TypeError:
295
            # Correct "Starting an alias with a number causes a crash #1885"
296
            return sorted(
297
                self.stats,
298
                key=lambda stat: tuple(
299
                    part.lower() for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
300
                ),
301
            )
302
303
    @short_system_name.setter
304
    def short_system_name(self, short_name):
305
        """Set the short detected OS name (SNMP)."""
306
        self._short_system_name = short_name
307
308
    def set_stats(self, input_stats):
309
        """Set the stats to input_stats."""
310
        self.stats = input_stats
311
312
    def get_stats_snmp(self, bulk=False, snmp_oid=None):
313
        """Update stats using SNMP.
314
315
        If bulk=True, use a bulk request instead of a get request.
316
        """
317
        snmp_oid = snmp_oid or {}
318
319
        from glances.snmp import GlancesSNMPClient
320
321
        # Init the SNMP request
322
        snmp_client = GlancesSNMPClient(
323
            host=self.args.client,
324
            port=self.args.snmp_port,
325
            version=self.args.snmp_version,
326
            community=self.args.snmp_community,
327
        )
328
329
        # Process the SNMP request
330
        ret = {}
331
        if bulk:
332
            # Bulk request
333
            snmp_result = snmp_client.getbulk_by_oid(0, 10, *list(itervalues(snmp_oid)))
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable itervalues does not seem to be defined.
Loading history...
334
            logger.info(snmp_result)
335
            if len(snmp_oid) == 1:
336
                # Bulk command for only one OID
337
                # Note: key is the item indexed but the OID result
338
                for item in snmp_result:
339
                    if iterkeys(item)[0].startswith(itervalues(snmp_oid)[0]):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable iterkeys does not seem to be defined.
Loading history...
340
                        ret[iterkeys(snmp_oid)[0] + iterkeys(item)[0].split(itervalues(snmp_oid)[0])[1]] = itervalues(
341
                            item
342
                        )[0]
343
            else:
344
                # Build the internal dict with the SNMP result
345
                # Note: key is the first item in the snmp_oid
346
                index = 1
347
                for item in snmp_result:
348
                    item_stats = {}
349
                    item_key = None
350
                    for key in iterkeys(snmp_oid):
351
                        oid = snmp_oid[key] + '.' + str(index)
352
                        if oid in item:
353
                            if item_key is None:
354
                                item_key = item[oid]
355
                            else:
356
                                item_stats[key] = item[oid]
357
                    if item_stats:
358
                        ret[item_key] = item_stats
359
                    index += 1
360
        else:
361
            # Simple get request
362
            snmp_result = snmp_client.get_by_oid(*list(itervalues(snmp_oid)))
363
364
            # Build the internal dict with the SNMP result
365
            for key in iterkeys(snmp_oid):
366
                ret[key] = snmp_result[snmp_oid[key]]
367
368
        return ret
369
370
    def get_raw(self):
371
        """Return the stats object."""
372
        return self.stats
373
374
    def get_export(self):
375
        """Return the stats object to export.
376
        By default, return the raw stats.
377
        Note: this method could be overwritten by the plugin if a specific format is needed (ex: processlist)
378
        """
379
        return self.get_raw()
380
381
    def get_stats(self):
382
        """Return the stats object in JSON format."""
383
        return json_dumps(self.get_raw())
384
385
    def get_json(self):
386
        """Return the stats object in JSON format."""
387
        return self.get_stats()
388
389
    def get_raw_stats_item(self, item):
390
        """Return the stats object for a specific item in RAW format.
391
392
        Stats should be a list of dict (processlist, network...)
393
        """
394
        return dictlist(self.get_raw(), item)
395
396
    def get_raw_stats_key(self, item, key):
397
        """Return the stats object for a specific item in RAW format.
398
399
        Stats should be a list of dict (processlist, network...)
400
        """
401
        return {item: [i for i in self.get_raw() if 'key' in i and i[i['key']] == key][0].get(item)}
402
403
    def get_stats_item(self, item):
404
        """Return the stats object for a specific item in JSON format.
405
406
        Stats should be a list of dict (processlist, network...)
407
        """
408
        return dictlist_json_dumps(self.get_raw(), item)
409
410
    def get_raw_stats_value(self, item, value):
411
        """Return the stats object for a specific item=value.
412
413
        Return None if the item=value does not exist
414
        Return None if the item is not a list of dict
415
        """
416
        if not isinstance(self.get_raw(), list):
417
            return None
418
419
        if (not isinstance(value, int) and not isinstance(value, float)) and value.isdigit():
420
            value = int(value)
421
        try:
422
            return {value: [i for i in self.get_raw() if i[item] == value]}
423
        except (KeyError, ValueError) as e:
424
            logger.error(f"Cannot get item({item})=value({value}) ({e})")
425
            return None
426
427
    def get_stats_value(self, item, value):
428
        """Return the stats object for a specific item=value in JSON format.
429
430
        Stats should be a list of dict (processlist, network...)
431
        """
432
        rsv = self.get_raw_stats_value(item, value)
433
        if rsv is None:
434
            return None
435
        return json_dumps(rsv)
436
437
    def get_item_info(self, item, key, default=None):
438
        """Return the item info grabbed into self.fields_description."""
439
        if self.fields_description is None or item not in self.fields_description:
440
            return default
441
        return self.fields_description[item].get(key, default)
442
443
    def update_views(self):
444
        """Update the stats views.
445
446
        The V of MVC
447
        A dict of dict with the needed information to display the stats.
448
        Example for the stat xxx:
449
        'xxx': {'decoration': 'DEFAULT',  >>> The decoration of the stats
450
                'optional': False,        >>> Is the stat optional
451
                'additional': False,      >>> Is the stat provide additional information
452
                'splittable': False,      >>> Is the stat can be cut (like process lon name)
453
                'hidden': False}          >>> Is the stats should be hidden in the UI
454
        """
455
        ret = {}
456
457
        if isinstance(self.get_raw(), list) and self.get_raw() is not None and self.get_key() is not None:
458
            # Stats are stored in a list of dict (ex: DISKIO, NETWORK, FS...)
459
            for i in self.get_raw():
460
                key = i[self.get_key()]
461
                ret[key] = {}
462
                for field in listkeys(i):
463
                    value = {
464
                        'decoration': 'DEFAULT',
465
                        'optional': False,
466
                        'additional': False,
467
                        'splittable': False,
468
                    }
469
                    # Manage the hidden feature
470
                    # Allow to automatically hide fields when values is never different than 0
471
                    # Refactoring done for #2929
472
                    if not self.hide_zero:
473
                        value['hidden'] = False
474
                    elif key in self.views and field in self.views[key] and 'hidden' in self.views[key][field]:
475
                        value['hidden'] = self.views[key][field]['hidden']
476
                        if (
477
                            field in self.hide_zero_fields
478
                            and i[field] is not None
479
                            and i[field] > self.hide_threshold_bytes
480
                        ):
481
                            value['hidden'] = False
482
                    else:
483
                        value['hidden'] = field in self.hide_zero_fields
484
                    ret[key][field] = value
485
        elif isinstance(self.get_raw(), dict) and self.get_raw() is not None:
486
            # Stats are stored in a dict (ex: CPU, LOAD...)
487
            for field in listkeys(self.get_raw()):
488
                value = {
489
                    'decoration': 'DEFAULT',
490
                    'optional': False,
491
                    'additional': False,
492
                    'splittable': False,
493
                    'hidden': False,
494
                }
495
                # Manage the hidden feature
496
                # Allow to automatically hide fields when values is never different than 0
497
                # Refactoring done for #2929
498
                if not self.hide_zero:
499
                    value['hidden'] = False
500
                elif field in self.views and 'hidden' in self.views[field]:
501
                    value['hidden'] = self.views[field]['hidden']
502
                    if field in self.hide_zero_fields and self.get_raw()[field] >= self.hide_threshold_bytes:
503
                        value['hidden'] = False
504
                else:
505
                    value['hidden'] = field in self.hide_zero_fields
506
                ret[field] = value
507
508
        self.views = ret
509
510
        return self.views
511
512
    def set_views(self, input_views):
513
        """Set the views to input_views."""
514
        self.views = input_views
515
516
    def reset_views(self):
517
        """Reset the views to input_views."""
518
        self.views = {}
519
520
    def get_views(self, item=None, key=None, option=None):
521
        """Return the views object.
522
523
        If key is None, return all the view for the current plugin
524
        else if option is None return the view for the specific key (all option)
525
        else return the view of the specific key/option
526
527
        Specify item if the stats are stored in a dict of dict (ex: NETWORK, FS...)
528
        """
529
        if item is None:
530
            item_views = self.views
531
        else:
532
            item_views = self.views[item]
533
534
        if key is None:
535
            return item_views
536
        if key not in item_views:
537
            return 'DEFAULT'
538
        if option is None:
539
            return item_views[key]
540
        if option in item_views[key]:
541
            return item_views[key][option]
542
        return 'DEFAULT'
543
544
    def get_json_views(self, item=None, key=None, option=None):
545
        """Return the views (in JSON)."""
546
        return json_dumps(self.get_views(item, key, option))
547
548
    def load_limits(self, config):
549
        """Load limits from the configuration file, if it exists."""
550
        # By default set the history length to 3 points per second during one day
551
        self._limits['history_size'] = 28800
552
553
        if not hasattr(config, 'has_section'):
554
            return False
555
556
        # Read the global section
557
        # TODO: not optimized because this section is loaded for each plugin...
558
        if config.has_section('global'):
559
            self._limits['history_size'] = config.get_float_value('global', 'history_size', default=28800)
560
            logger.debug("Load configuration key: {} = {}".format('history_size', self._limits['history_size']))
561
562
        # Read the plugin specific section
563
        if config.has_section(self.plugin_name):
564
            for level, _ in config.items(self.plugin_name):
565
                # Read limits
566
                limit = '_'.join([self.plugin_name, level])
567
                try:
568
                    self._limits[limit] = config.get_float_value(self.plugin_name, level)
569
                except ValueError:
570
                    self._limits[limit] = config.get_value(self.plugin_name, level).split(",")
571
                logger.debug(f"Load limit: {limit} = {self._limits[limit]}")
572
573
        return True
574
575
    @property
576
    def limits(self):
577
        """Return the limits object."""
578
        return self._limits
579
580
    @limits.setter
581
    def limits(self, input_limits):
582
        """Set the limits to input_limits."""
583
        self._limits = input_limits
584
585
    def set_refresh(self, value):
586
        """Set the plugin refresh rate"""
587
        self.set_limits('refresh', value)
588
589
    def get_refresh(self):
590
        """Return the plugin refresh time"""
591
        ret = self.get_limits(item='refresh')
592
        if ret is None:
593
            ret = self.args.time if hasattr(self.args, 'time') else 2
594
        return ret
595
596
    def get_refresh_time(self):
597
        """Return the plugin refresh time"""
598
        return self.get_refresh()
599
600
    def set_limits(self, item, value):
601
        """Set the limits object."""
602
        self._limits[f'{self.plugin_name}_{item}'] = value
603
604
    def get_limits(self, item=None):
605
        """Return the limits object."""
606
        if item is None:
607
            return self._limits
608
        return self._limits.get(f'{self.plugin_name}_{item}', None)
609
610
    def get_stats_action(self):
611
        """Return stats for the action.
612
613
        By default return all the stats.
614
        Can be overwrite by plugins implementation.
615
        For example, Docker will return self.stats['containers']
616
        """
617
        return self.stats
618
619
    def get_stat_name(self, header=""):
620
        """Return the stat name with an optional header"""
621
        ret = self.plugin_name
622
        if header != '':
623
            ret += '_' + header
624
        return ret
625
626
    def get_alert(
627
        self,
628
        current=0,
629
        minimum=0,
630
        maximum=100,
631
        highlight_zero=True,
632
        is_max=False,
633
        header="",
634
        action_key=None,
635
        log=False,
636
    ):
637
        """Return the alert status relative to a current value.
638
639
        Use this function for minor stats.
640
641
        If current < CAREFUL of max then alert = OK
642
        If current > CAREFUL of max then alert = CAREFUL
643
        If current > WARNING of max then alert = WARNING
644
        If current > CRITICAL of max then alert = CRITICAL
645
646
        If highlight=True than 0.0 is highlighted
647
648
        If defined 'header' is added between the plugin name and the status.
649
        Only useful for stats with several alert status.
650
651
        If defined, 'action_key' define the key for the actions.
652
        By default, the action_key is equal to the header.
653
654
        If log=True than add log if necessary
655
        elif log=False than do not log
656
        elif log=None than apply the config given in the conf file
657
        """
658
        # Manage 0 (0.0) value if highlight_zero is not True
659
        if not highlight_zero and current == 0:
660
            return 'DEFAULT'
661
662
        # Compute the %
663
        try:
664
            value = (current * 100) / maximum
665
        except ZeroDivisionError:
666
            return 'DEFAULT'
667
        except TypeError:
668
            return 'DEFAULT'
669
670
        # Build the stat_name
671
        stat_name = self.get_stat_name(header=header).lower()
672
673
        # Manage limits
674
        # If is_max is set then default style is set to MAX else default is set to OK
675
        ret = 'MAX' if is_max else 'OK'
676
677
        # Iter through limits
678
        critical = self.get_limit('critical', stat_name=stat_name)
679
        warning = self.get_limit('warning', stat_name=stat_name)
680
        careful = self.get_limit('careful', stat_name=stat_name)
681
        if critical and value >= critical:
682
            ret = 'CRITICAL'
683
        elif warning and value >= warning:
684
            ret = 'WARNING'
685
        elif careful and value >= careful:
686
            ret = 'CAREFUL'
687
        elif not careful and not warning and not critical:
688
            ret = 'DEFAULT'
689
        else:
690
            ret = 'OK'
691
692
        if current < minimum:
693
            ret = 'CAREFUL'
694
695
        # Manage log
696
        log_str = ""
697
        if self.get_limit_log(stat_name=stat_name, default_action=log):
698
            # Add _LOG to the return string
699
            # So stats will be highlighted with a specific color
700
            log_str = "_LOG"
701
            # Add the log to the events list
702
            glances_events.add(ret, stat_name.upper(), value)
703
704
        # Manage threshold
705
        self.manage_threshold(stat_name, ret)
706
707
        # Manage action
708
        self.manage_action(stat_name, ret.lower(), header, action_key)
709
710
        # Default is 'OK'
711
        return ret + log_str
712
713
    def filter_stats(self, stats):
714
        """Filter the stats to keep only the fields we want (the one defined in fields_description)."""
715
        if hasattr(stats, '_asdict'):
716
            return {k: v for k, v in stats._asdict().items() if k in self.fields_description}
717
        if isinstance(stats, dict):
718
            return {k: v for k, v in stats.items() if k in self.fields_description}
719
        if isinstance(stats, list):
720
            return [self.filter_stats(s) for s in stats]
721
        return stats
722
723
    def manage_threshold(self, stat_name, trigger):
724
        """Manage the threshold for the current stat."""
725
        glances_thresholds.add(stat_name, trigger)
726
727
    def manage_action(self, stat_name, trigger, header, action_key):
728
        """Manage the action for the current stat."""
729
        # Here is a command line for the current trigger ?
730
        command, repeat = self.get_limit_action(trigger, stat_name=stat_name)
731
        if not command and not repeat:
732
            # Reset the trigger
733
            self.actions.set(stat_name, trigger)
734
        else:
735
            # Define the action key for the stats dict
736
            # If not define, then it sets to header
737
            if action_key is None:
738
                action_key = header
739
740
            # A command line is available for the current alert
741
            # 1) Build the {{mustache}} dictionary
742
            if isinstance(self.get_stats_action(), list):
743
                # If the stats are stored in a list of dict (fs plugin for example)
744
                # Return the dict for the current header
745
                mustache_dict = {}
746
                for item in self.get_stats_action():
747
                    if item[self.get_key()] == action_key:
748
                        mustache_dict = item
749
                        break
750
            else:
751
                # Use the stats dict
752
                mustache_dict = self.get_stats_action()
753
            # 2) Run the action
754
            self.actions.run(stat_name, trigger, command, repeat, mustache_dict=mustache_dict)
755
756
    def get_alert_log(self, current=0, minimum=0, maximum=100, header="", action_key=None):
757
        """Get the alert log."""
758
        return self.get_alert(
759
            current=current, minimum=minimum, maximum=maximum, header=header, action_key=action_key, log=True
760
        )
761
762
    def is_limit(self, criticality, stat_name=""):
763
        """Return true if the criticality limit exist for the given stat_name"""
764
        return self.get_stat_name(stat_name).lower() + '_' + criticality in self._limits
765
766
    def get_limit(self, criticality=None, stat_name=""):
767
        """Return the limit value for the given criticality.
768
        If criticality is None, return the dict of all the limits."""
769
        if criticality is None:
770
            return self._limits
771
772
        # Get the limit for stat + header
773
        # Example: network_wlan0_rx_careful
774
        stat_name = stat_name.lower()
775
        if stat_name + '_' + criticality in self._limits:
776
            return self._limits[stat_name + '_' + criticality]
777
        if self.plugin_name + '_' + criticality in self._limits:
778
            return self._limits[self.plugin_name + '_' + criticality]
779
780
        return None
781
782
    def get_limit_action(self, criticality, stat_name=""):
783
        """Return the tuple (action, repeat) for the alert.
784
785
        - action is a command line
786
        - repeat is a bool
787
        """
788
        # Get the action for stat + header
789
        # Example: network_wlan0_rx_careful_action
790
        # Action key available ?
791
        ret = [
792
            (stat_name + '_' + criticality + '_action', False),
793
            (stat_name + '_' + criticality + '_action_repeat', True),
794
            (self.plugin_name + '_' + criticality + '_action', False),
795
            (self.plugin_name + '_' + criticality + '_action_repeat', True),
796
        ]
797
        for r in ret:
798
            if r[0] in self._limits:
799
                return self._limits[r[0]], r[1]
800
801
        # No key found, return None
802
        return None, None
803
804
    def get_limit_log(self, stat_name, default_action=False):
805
        """Return the log tag for the alert."""
806
        # Get the log tag for stat + header
807
        # Example: network_wlan0_rx_log
808
        if stat_name + '_log' in self._limits:
809
            return self._limits[stat_name + '_log'][0].lower() == 'true'
810
        if self.plugin_name + '_log' in self._limits:
811
            return self._limits[self.plugin_name + '_log'][0].lower() == 'true'
812
        return default_action
813
814
    def get_conf_value(self, value, header="", plugin_name=None, default=[]):
815
        """Return the configuration (header_) value for the current plugin.
816
817
        ...or the one given by the plugin_name var.
818
        """
819
        if plugin_name is None:
820
            # If not default use the current plugin name
821
            plugin_name = self.plugin_name
822
823
        if header != "":
824
            # Add the header
825
            plugin_name = plugin_name + '_' + header
826
827
        try:
828
            return self._limits[plugin_name + '_' + value]
829
        except KeyError:
830
            return default
831
832
    def is_show(self, value, header=""):
833
        """Return True if the value is in the show configuration list.
834
835
        If the show value is empty, return True (show by default)
836
837
        The show configuration list is defined in the glances.conf file.
838
        It is a comma-separated list of regexp.
839
        Example for diskio:
840
        show=sda.*
841
        """
842
        # TODO: possible optimisation: create a re.compile list
843
        # nguuuquaaa: no need a compile list, the re module caches the compiling itself
844
        return any(re.fullmatch(i, value, re.I) for i in self.get_conf_value('show', header=header))
845
846
    def is_hide(self, value, header=""):
847
        """Return True if the value is in the hide configuration list.
848
849
        The hide configuration list is defined in the glances.conf file.
850
        It is a comma-separated list of regexp.
851
        Example for diskio:
852
        hide=sda2,sda5,loop.*
853
        """
854
        # TODO: possible optimisation: create a re.compile list
855
        # nguuuquaaa: no need a compile list, the re module caches the compiling itself
856
        return any(re.fullmatch(i, value, re.I) for i in self.get_conf_value('hide', header=header))
857
858
    def is_display(self, value, header=""):
859
        """Return True if the value should be displayed in the UI"""
860
        if self.get_conf_value('show', header=header) != []:
861
            return self.is_show(value, header=header)
862
        return not self.is_hide(value, header=header)
863
864
    def is_display_any(self, *values, header=""):
865
        """Return True if any of the values should be displayed in the UI"""
866
        if self.get_conf_value('show', header=header) != []:
867
            return any(self.is_show(value, header=header) for value in values)
868
        return not any(self.is_hide(value, header=header) for value in values)
869
870
    def read_alias(self):
871
        if self.plugin_name + '_' + 'alias' in self._limits:
872
            return {i.split(':')[0].lower(): i.split(':')[1] for i in self._limits[self.plugin_name + '_' + 'alias']}
873
        return {}
874
875
    def has_alias(self, header):
876
        """Return the alias name for the relative header it it exists otherwise None."""
877
        return self.alias.get(header, None)
878
879
    def msg_curse(self, args=None, max_width=None):
880
        """Return default string to display in the curse interface."""
881
        return [self.curse_add_line(str(self.stats))]
882
883
    def get_stats_display(self, args=None, max_width=None):
884
        """Return a dict with all the information needed to display the stat.
885
886
        key     | description
887
        ----------------------------
888
        display | Display the stat (True or False)
889
        msgdict | Message to display (list of dict [{ 'msg': msg, 'decoration': decoration } ... ])
890
        align   | Message position
891
        """
892
        display_curse = False
893
894
        if hasattr(self, 'display_curse'):
895
            display_curse = self.display_curse
896
        if hasattr(self, 'align'):
897
            align_curse = self._align
898
899
        if max_width is not None:
900
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args, max_width=max_width), 'align': align_curse}
0 ignored issues
show
introduced by
The variable align_curse does not seem to be defined in case hasattr(self, 'align') on line 896 is False. Are you sure this can never be the case?
Loading history...
901
        else:
902
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args), 'align': align_curse}
903
904
        return ret
905
906
    def curse_add_line(self, msg, decoration="DEFAULT", optional=False, additional=False, splittable=False):
907
        """Return a dict with.
908
909
        Where:
910
            msg: string
911
            decoration:
912
                DEFAULT: no decoration
913
                UNDERLINE: underline
914
                BOLD: bold
915
                TITLE: for stat title
916
                PROCESS: for process name
917
                STATUS: for process status
918
                CPU_TIME: for process cpu time
919
                OK: Value is OK and non logged
920
                OK_LOG: Value is OK and logged
921
                CAREFUL: Value is CAREFUL and non logged
922
                CAREFUL_LOG: Value is CAREFUL and logged
923
                WARNING: Value is WARNING and non logged
924
                WARNING_LOG: Value is WARNING and logged
925
                CRITICAL: Value is CRITICAL and non logged
926
                CRITICAL_LOG: Value is CRITICAL and logged
927
            optional: True if the stat is optional (display only if space is available)
928
            additional: True if the stat is additional (display only if space is available after optional)
929
            spittable: Line can be split to fit on the screen (default is not)
930
        """
931
        return {
932
            'msg': msg,
933
            'decoration': decoration,
934
            'optional': optional,
935
            'additional': additional,
936
            'splittable': splittable,
937
        }
938
939
    def curse_new_line(self):
940
        """Go to a new line."""
941
        return self.curse_add_line('\n')
942
943
    def curse_add_stat(self, key, width=None, header='', display_key=True, separator='', trailer=''):
944
        """Return a list of dict messages with the 'key: value' result
945
946
          <=== width ===>
947
        __key     : 80.5%__
948
        | |       | |    |_ trailer
949
        | |       | |_ self.stats[key]
950
        | |       |_ separator
951
        | |_ 'short_name' description or key or nothing if display_key is True
952
        |_ header
953
954
        Instead of:
955
            msg = '  {:8}'.format('idle:')
956
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
957
            msg = '{:5.1f}%'.format(self.stats['idle'])
958
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
959
960
        Use:
961
            ret.extend(self.curse_add_stat('idle', width=15, header='  '))
962
963
        """
964
        if key not in self.stats:
965
            return []
966
967
        # Check if a shortname is defined
968
        if not display_key:
969
            key_name = ''
970
        elif key in self.fields_description and 'short_name' in self.fields_description[key]:
971
            key_name = self.fields_description[key]['short_name']
972
        else:
973
            key_name = key
974
975
        # Check if unit is defined and get the short unit char in the unit_sort dict
976
        if (
977
            key in self.fields_description
978
            and 'unit' in self.fields_description[key]
979
            and self.fields_description[key]['unit'] in fields_unit_short
980
        ):
981
            # Get the shortname
982
            unit_short = fields_unit_short[self.fields_description[key]['unit']]
983
        else:
984
            unit_short = ''
985
986
        # Check if unit is defined and get the unit type unit_type dict
987
        if (
988
            key in self.fields_description
989
            and 'unit' in self.fields_description[key]
990
            and self.fields_description[key]['unit'] in fields_unit_type
991
        ):
992
            # Get the shortname
993
            unit_type = fields_unit_type[self.fields_description[key]['unit']]
994
        else:
995
            unit_type = 'float'
996
997
        # Is it a rate ? Yes, get the pre-computed rate value
998
        if key in self.fields_description and self.fields_description[key].get('rate', False) is True:
999
            value = self.stats.get(key + '_rate_per_sec', None)
1000
        else:
1001
            value = self.stats.get(key, None)
1002
1003
        if width is None:
1004
            msg_item = header + f'{key_name}' + separator
1005
            msg_template_float = '{:.1f}{}'
1006
            msg_template = '{}{}'
1007
        else:
1008
            # Define the size of the message
1009
            # item will be on the left
1010
            # value will be on the right
1011
            msg_item = header + '{:{width}}'.format(key_name, width=width - 7) + separator
1012
            msg_template_float = '{:5.1f}{}'
1013
            msg_template = '{:>5}{}'
1014
1015
        if value is None:
1016
            msg_value = msg_template.format('-', '')
1017
        elif unit_type == 'float':
1018
            msg_value = msg_template_float.format(value, unit_short)
1019
        elif 'min_symbol' in self.fields_description[key]:
1020
            msg_value = msg_template.format(
1021
                self.auto_unit(int(value), min_symbol=self.fields_description[key]['min_symbol']), unit_short
1022
            )
1023
        else:
1024
            msg_value = msg_template.format(int(value), unit_short)
1025
1026
        # Add the trailer
1027
        msg_value = msg_value + trailer
1028
1029
        decoration = self.get_views(key=key, option='decoration') if value is not None else 'DEFAULT'
1030
        optional = self.get_views(key=key, option='optional')
1031
1032
        return [
1033
            self.curse_add_line(msg_item, optional=optional),
1034
            self.curse_add_line(msg_value, decoration=decoration, optional=optional),
1035
        ]
1036
1037
    @property
1038
    def align(self):
1039
        """Get the curse align."""
1040
        return self._align
1041
1042
    @align.setter
1043
    def align(self, value):
1044
        """Set the curse align.
1045
1046
        value: left, right, bottom.
1047
        """
1048
        self._align = value
1049
1050
    def auto_unit(self, number, low_precision=False, min_symbol='K', none_symbol='-'):
1051
        """Make a nice human-readable string out of number.
1052
1053
        Number of decimal places increases as quantity approaches 1.
1054
        CASE: 613421788        RESULT:       585M low_precision:       585M
1055
        CASE: 5307033647       RESULT:      4.94G low_precision:       4.9G
1056
        CASE: 44968414685      RESULT:      41.9G low_precision:      41.9G
1057
        CASE: 838471403472     RESULT:       781G low_precision:       781G
1058
        CASE: 9683209690677    RESULT:      8.81T low_precision:       8.8T
1059
        CASE: 1073741824       RESULT:      1024M low_precision:      1024M
1060
        CASE: 1181116006       RESULT:      1.10G low_precision:       1.1G
1061
1062
        :low_precision: returns less decimal places potentially (default is False)
1063
                        sacrificing precision for more readability.
1064
        :min_symbol: Do not approach if number < min_symbol (default is K)
1065
        :decimal_count: if set, force the number of decimal number (default is None)
1066
        """
1067
        if number is None:
1068
            return none_symbol
1069
        symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
1070
        if min_symbol in symbols:
1071
            symbols = symbols[symbols.index(min_symbol) :]
1072
        prefix = {
1073
            'Y': 1208925819614629174706176,
1074
            'Z': 1180591620717411303424,
1075
            'E': 1152921504606846976,
1076
            'P': 1125899906842624,
1077
            'T': 1099511627776,
1078
            'G': 1073741824,
1079
            'M': 1048576,
1080
            'K': 1024,
1081
        }
1082
1083
        if number == 0:
1084
            # Avoid 0.0
1085
            return '0'
1086
1087
        for symbol in reversed(symbols):
1088
            value = float(number) / prefix[symbol]
1089
            if value > 1:
1090
                decimal_precision = 0
1091
                if value < 10:
1092
                    decimal_precision = 2
1093
                elif value < 100:
1094
                    decimal_precision = 1
1095
                if low_precision:
1096
                    if symbol in 'MK':
1097
                        decimal_precision = 0
1098
                    else:
1099
                        decimal_precision = min(1, decimal_precision)
1100
                elif symbol in 'K':
1101
                    decimal_precision = 0
1102
                return '{:.{decimal}f}{symbol}'.format(value, decimal=decimal_precision, symbol=symbol)
1103
        return f'{number!s}'
1104
1105
    def trend_msg(self, trend, significant=1):
1106
        """Return the trend message.
1107
1108
        Do not take into account if trend < significant
1109
        """
1110
        ret = '-'
1111
        if trend is None:
1112
            ret = ' '
1113
        elif trend > significant:
1114
            ret = unicode_message('ARROW_UP', self.args)
1115
        elif trend < -significant:
1116
            ret = unicode_message('ARROW_DOWN', self.args)
1117
        return ret
1118
1119
    def _check_decorator(fct):
1120
        """Check decorator for update method.
1121
1122
        It checks:
1123
        - if the plugin is enabled.
1124
        - if the refresh_timer is finished
1125
        """
1126
1127
        def wrapper(self, *args, **kw):
1128
            if self.is_enabled() and (self.refresh_timer.finished() or self.stats == self.get_init_value):
1129
                # Run the method
1130
                ret = fct(self, *args, **kw)
1131
                # Reset the timer
1132
                self.refresh_timer.set(self.get_refresh())
1133
                self.refresh_timer.reset()
1134
            else:
1135
                # No need to call the method
1136
                # Return the last result available
1137
                ret = self.stats
1138
            return ret
1139
1140
        return wrapper
1141
1142 View Code Duplication
    def _log_result_decorator(fct):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1143
        """Log (DEBUG) the result of the function fct."""
1144
1145
        def wrapper(*args, **kw):
1146
            counter = Counter()
1147
            ret = fct(*args, **kw)
1148
            duration = counter.get()
1149
            class_name = args[0].__class__.__name__
1150
            class_module = args[0].__class__.__module__
1151
            logger.debug(f"{class_name} {class_module} {fct.__name__} return {ret} in {duration} seconds")
1152
            return ret
1153
1154
        return wrapper
1155
1156
    def _manage_rate(fct):
1157
        """Manage rate decorator for update method."""
1158
1159
        def compute_rate(self, stat, stat_previous):
1160
            if stat_previous is None:
1161
                return stat
1162
1163
            # 1) set _gauge for all the rate fields
1164
            # 2) compute the _rate_per_sec
1165
            # 3) set the original field to the delta between the current and the previous value
1166
            for field in self.fields_description:
1167
                # For all the field with the rate=True flag
1168
                # if 'rate' in self.fields_description[field] and self.fields_description[field]['rate'] is True:
1169
                if self.fields_description[field].get('rate', False):
1170
                    # Create a new metadata with the gauge
1171
                    stat['time_since_update'] = self.time_since_last_update
1172
                    stat[field + '_gauge'] = stat[field]
1173
                    if field + '_gauge' in stat_previous and stat[field] and stat_previous[field + '_gauge']:
1174
                        # The stat becomes the delta between the current and the previous value
1175
                        stat[field] = stat[field] - stat_previous[field + '_gauge']
1176
                        # Compute the rate
1177
                        if self.time_since_last_update > 0:
1178
                            stat[field + '_rate_per_sec'] = stat[field] // self.time_since_last_update
1179
                        else:
1180
                            stat[field] = None
1181
                            stat[field + '_rate_per_sec'] = None
1182
                    else:
1183
                        # Avoid strange rate at the first run
1184
                        stat[field] = None
1185
                        stat[field + '_rate_per_sec'] = None
1186
            return stat
1187
1188
        def compute_rate_on_list(self, stats, stats_previous):
1189
            if stats_previous is None:
1190
                return stats
1191
1192
            for stat in stats:
1193
                olds = [i for i in stats_previous if i[self.get_key()] == stat[self.get_key()]]
1194
                if len(olds) == 1:
1195
                    compute_rate(self, stat, olds[0])
1196
            return stats
1197
1198
        def wrapper(self, *args, **kw):
1199
            # Call the father method
1200
            stats = fct(self, *args, **kw)
1201
1202
            # Get the time since the last update
1203
            self.time_since_last_update = getTimeSinceLastUpdate(self.plugin_name)
1204
1205
            # Compute the rate
1206
            if isinstance(stats, dict):
1207
                # Stats is a dict
1208
                compute_rate(self, stats, self.stats_previous)
1209
            elif isinstance(stats, list):
1210
                # Stats is a list
1211
                compute_rate_on_list(self, stats, self.stats_previous)
1212
1213
            # Memorized the current stats for next run
1214
            self.stats_previous = copy.deepcopy(stats)
1215
1216
            return stats
1217
1218
        return wrapper
1219
1220
    # Mandatory to call the decorator in child classes
1221
    _check_decorator = staticmethod(_check_decorator)
1222
    _log_result_decorator = staticmethod(_log_result_decorator)
1223
    _manage_rate = staticmethod(_manage_rate)
1224