GlancesPluginModel.curse_add_stat()   F
last analyzed

Complexity

Conditions 18

Size

Total Lines 92
Code Lines 44

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 18
eloc 44
nop 7
dl 0
loc 92
rs 1.2
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like glances.plugins.plugin.model.GlancesPluginModel.curse_add_stat() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""
10
I am your father...
11
12
...of all Glances model plugins.
13
"""
14
15
import copy
16
import re
17
18
from glances.actions import GlancesActions
19
from glances.events_list import glances_events
20
from glances.globals import (
21
    auto_unit,
22
    dictlist,
23
    dictlist_json_dumps,
24
    json_dumps,
25
    list_to_dict,
26
    listkeys,
27
    mean,
28
    nativestr,
29
)
30
from glances.history import GlancesHistory
31
from glances.logger import logger
32
from glances.outputs.glances_unicode import unicode_message
33
from glances.thresholds import glances_thresholds
34
from glances.timer import Counter, Timer, getTimeSinceLastUpdate
35
36
fields_unit_short = {'percent': '%'}
37
38
fields_unit_type = {
39
    'percent': 'float',
40
    'percents': 'float',
41
    'number': 'int',
42
    'numbers': 'int',
43
    'int': 'int',
44
    'ints': 'int',
45
    'float': 'float',
46
    'floats': 'float',
47
    'second': 'int',
48
    'seconds': 'int',
49
    'byte': 'int',
50
    'bytes': 'int',
51
}
52
53
54
class GlancesPluginModel:
55
    """Main class for Glances plugin model."""
56
57
    def __init__(self, args=None, config=None, items_history_list=None, stats_init_value={}, fields_description=None):
58
        """Init the plugin of plugins model class.
59
60
        All Glances' plugins model should inherit from this class. Most of the
61
        methods are already implemented in the father classes.
62
63
        Your plugin should return a dict or a list of dicts (stored in the
64
        self.stats). As an example, you can have a look on the mem plugin
65
        (for dict) or network (for list of dicts).
66
67
        From version 4 of the API, the plugin should return a dict.
68
69
        A plugin should implement:
70
        - the reset method: to set your self.stats variable to {} or []
71
        - the update method: where your self.stats variable is set
72
        and optionally:
73
        - the get_key method: set the key of the dict (only for list of dict)
74
        - all others methods you want to overwrite
75
76
        :args: args parameters
77
        :config: configuration parameters
78
        :items_history_list: list of items to store in the history
79
        :stats_init_value: Default value for a stats item
80
        """
81
        # Build the plugin name
82
        # Internal or external module (former prefixed by 'glances.plugins')
83
        _mod = self.__class__.__module__.replace('glances.plugins.', '')
84
        self.plugin_name = _mod.split('.')[0]
85
86
        if self.plugin_name.startswith('glances_'):
87
            self.plugin_name = self.plugin_name.split('glances_')[1]
88
        logger.debug(f"Init {self.plugin_name} plugin")
89
90
        # Init the args
91
        self.args = args
92
93
        # Init the default alignment (for curses)
94
        self._align = 'left'
95
96
        # Init the input method
97
        self._input_method = 'local'
98
        self._short_system_name = None
99
100
        # Init the history list
101
        self.items_history_list = items_history_list
102
        self.stats_history = self.init_stats_history()
103
104
        # Init the limits (configuration keys) dictionary
105
        self._limits = {}
106
        if config is not None:
107
            logger.debug(f'Load section {self.plugin_name} in Glances configuration file')
108
            self.load_limits(config=config)
109
110
        # Init the alias (dictionary)
111
        self.alias = self.read_alias()
112
113
        # Init the actions
114
        self.actions = GlancesActions(args=args)
115
116
        # Init the views
117
        self.views = {}
118
119
        # Hide stats if all the hide_zero_fields has never been != 0
120
        # Default is False, always display stats
121
        self.hide_zero = False
122
        # The threshold needed to display a value if hide_zero is true.
123
        # Only hide a value if it is less than hide_threshold_bytes.
124
        self.hide_threshold_bytes = 0
125
        self.hide_zero_fields = []
126
127
        # Set the initial refresh time to display stats the first time
128
        self.refresh_timer = Timer(0)
129
130
        # Init stats description
131
        self.fields_description = fields_description
132
133
        # Init the stats
134
        self.stats_init_value = stats_init_value
135
        self.time_since_last_update = None
136
        self.stats = None
137
        self.stats_previous = None
138
        self.reset()
139
140
    def __str__(self):
141
        """Return the human-readable stats."""
142
        return str(self.stats)
143
144
    def __repr__(self):
145
        """Return the raw stats."""
146
        if isinstance(self.stats, list):
147
            return str(list_to_dict(self.stats))
148
        return str(self.stats)
149
150
    def __getitem__(self, item):
151
        """Return the stats item."""
152
        if isinstance(self.stats, dict) and item in self.stats:
153
            return self.stats[item]
154
155
        if isinstance(self.stats, list):
156
            ltd = list_to_dict(self.stats)
157
            if item in ltd:
158
                return ltd[item]
159
160
        raise KeyError(f"'{self.__class__.__name__}' object has no key '{item}'")
161
162
    def keys(self):
163
        """Return the keys of the stats."""
164
        if isinstance(self.stats, dict):
165
            return listkeys(self.stats)
166
        if isinstance(self.stats, list):
167
            return listkeys(list_to_dict(self.stats))
168
        return []
169
170
    def get(self, item, default=None):
171
        """Return the stats item or default if not found."""
172
        try:
173
            return self[item]
174
        except KeyError:
175
            return default
176
177
    def get_init_value(self):
178
        """Return a copy of the init value."""
179
        return copy.copy(self.stats_init_value)
180
181
    def reset(self):
182
        """Reset the stats.
183
184
        This method should be overwritten by child classes.
185
        """
186
        self.stats = self.get_init_value()
187
188
    def exit(self):
189
        """Just log an event when Glances exit."""
190
        logger.debug(f"Stop the {self.plugin_name} plugin")
191
192
    def get_key(self):
193
        """Return the key of the list."""
194
        return
195
196
    def is_enabled(self, plugin_name=None):
197
        """Return true if plugin is enabled."""
198
        if not plugin_name:
199
            plugin_name = self.plugin_name
200
        try:
201
            d = getattr(self.args, 'disable_' + plugin_name)
202
        except AttributeError:
203
            d = getattr(self.args, 'enable_' + plugin_name, True)
204
        return d is False
205
206
    def is_disabled(self, plugin_name=None):
207
        """Return true if plugin is disabled."""
208
        return not self.is_enabled(plugin_name=plugin_name)
209
210
    def history_enable(self):
211
        return self.args is not None and not self.args.disable_history and self.get_items_history_list() is not None
212
213
    def init_stats_history(self):
214
        """Init the stats history (dict of GlancesAttribute)."""
215
        if self.history_enable():
216
            init_list = [a['name'] for a in self.get_items_history_list()]
217
            logger.debug(f"Stats history activated for plugin {self.plugin_name} (items: {init_list})")
218
        return GlancesHistory()
219
220
    def reset_stats_history(self):
221
        """Reset the stats history (dict of GlancesAttribute)."""
222
        if self.history_enable():
223
            reset_list = [a['name'] for a in self.get_items_history_list()]
224
            logger.debug(f"Reset history for plugin {self.plugin_name} (items: {reset_list})")
225
            self.stats_history.reset()
226
227
    def update_stats_history(self):
228
        """Update stats history."""
229
        # Exit if no history
230
        if not self.history_enable():
231
            return
232
        # Build the history
233
        _get_export = self.get_export()
234
        if not (_get_export and self.history_enable()):
235
            return
236
        # Itern through items history
237
        item_name = '' if self.get_key() is None else self.get_key()
238
        for i in self.get_items_history_list():
239
            if isinstance(_get_export, list):
240
                # Stats is a list of data
241
                # Iter through stats (for example, iter through network interface)
242
                for l_export in _get_export:
243
                    if i['name'] in l_export:
244
                        self.stats_history.add(
245
                            nativestr(l_export[item_name]) + '_' + nativestr(i['name']),
246
                            l_export[i['name']],
247
                            description=i['description'],
248
                            history_max_size=self._limits['history_size'],
249
                        )
250
            else:
251
                # Stats is not a list
252
                # Add the item to the history directly
253
                self.stats_history.add(
254
                    nativestr(i['name']),
255
                    _get_export[i['name']],
256
                    description=i['description'],
257
                    history_max_size=self._limits['history_size'],
258
                )
259
260
    def get_items_history_list(self):
261
        """Return the items history list."""
262
        return self.items_history_list
263
264
    def get_raw_history(self, item=None, nb=0):
265
        """Return the history (RAW format).
266
267
        - the stats history (dict of list) if item is None
268
        - the stats history for the given item (list) instead
269
        - None if item did not exist in the history
270
        """
271
        s = self.stats_history.get(nb=nb)
272
        if item is None:
273
            return s
274
        if item in s:
275
            return s[item]
276
        return None
277
278
    def get_export_history(self, item=None):
279
        """Return the stats history object to export."""
280
        return self.get_raw_history(item=item)
281
282
    def get_stats_history(self, item=None, nb=0):
283
        """Return the stats history (JSON format)."""
284
        s = self.stats_history.get_json(nb=nb)
285
286
        if item is None:
287
            return json_dumps(s)
288
289
        return dictlist_json_dumps(s, item)
290
291
    def get_trend(self, item, nb=30):
292
        """Get the trend regarding to the last nb values.
293
294
        The trend is the diffirence between the mean of the last 0 to nb / 2
295
        and nb / 2 to nb values.
296
        """
297
        raw_history = self.get_raw_history(item=item, nb=nb)
298
        if raw_history is None or len(raw_history) < nb:
299
            return None
300
        last_nb = [v[1] for v in raw_history]
301
        return mean(last_nb[nb // 2 :]) - mean(last_nb[: nb // 2])
302
303
    @property
304
    def input_method(self):
305
        """Get the input method."""
306
        return self._input_method
307
308
    @input_method.setter
309
    def input_method(self, input_method):
310
        """Set the input method.
311
312
        * local: system local grab (psutil or direct access)
313
        * snmp: Client server mode via SNMP
314
        * glances: Client server mode via Glances API
315
        """
316
        self._input_method = input_method
317
318
    @property
319
    def short_system_name(self):
320
        """Get the short detected OS name (SNMP)."""
321
        return self._short_system_name
322
323
    def sorted_stats(self):
324
        """Get the stats sorted by an alias (if present) or key."""
325
        key = self.get_key()
326
        if key is None:
327
            return self.stats
328
        try:
329
            return sorted(
330
                self.stats,
331
                key=lambda stat: tuple(
332
                    int(part) if part.isdigit() else part.lower()
333
                    for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
334
                ),
335
            )
336
        except TypeError:
337
            # Correct "Starting an alias with a number causes a crash #1885"
338
            return sorted(
339
                self.stats,
340
                key=lambda stat: tuple(
341
                    part.lower() for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
342
                ),
343
            )
344
345
    @short_system_name.setter
346
    def short_system_name(self, short_name):
347
        """Set the short detected OS name (SNMP)."""
348
        self._short_system_name = short_name
349
350
    def set_stats(self, input_stats):
351
        """Set the stats to input_stats."""
352
        self.stats = input_stats
353
354
    def get_stats_snmp(self, bulk=False, snmp_oid=None):
355
        """Update stats using SNMP.
356
357
        If bulk=True, use a bulk request instead of a get request.
358
        """
359
        snmp_oid = snmp_oid or {}
360
361
        from glances.snmp import GlancesSNMPClient
362
363
        # Init the SNMP request
364
        snmp_client = GlancesSNMPClient(
365
            host=self.args.client,
366
            port=self.args.snmp_port,
367
            version=self.args.snmp_version,
368
            community=self.args.snmp_community,
369
        )
370
371
        # Process the SNMP request
372
        ret = {}
373
        if bulk:
374
            # Bulk request
375
            snmp_result = snmp_client.getbulk_by_oid(0, 10, *list(snmp_oid.values()))
376
            logger.info(snmp_result)
377
            if len(snmp_oid) == 1:
378
                # Bulk command for only one OID
379
                # Note: key is the item indexed but the OID result
380
                for item in snmp_result:
381
                    if item.keys()[0].startswith(snmp_oid.values()[0]):
382
                        ret[snmp_oid.keys()[0] + item.keys()[0].split(snmp_oid.values()[0])[1]] = item.values()[0]
383
            else:
384
                # Build the internal dict with the SNMP result
385
                # Note: key is the first item in the snmp_oid
386
                index = 1
387
                for item in snmp_result:
388
                    item_stats = {}
389
                    item_key = None
390
                    for key in snmp_oid:
391
                        oid = snmp_oid[key] + '.' + str(index)
392
                        if oid in item:
393
                            if item_key is None:
394
                                item_key = item[oid]
395
                            else:
396
                                item_stats[key] = item[oid]
397
                    if item_stats:
398
                        ret[item_key] = item_stats
399
                    index += 1
400
        else:
401
            # Simple get request
402
            snmp_result = snmp_client.get_by_oid(*list(snmp_oid.values()))
403
404
            # Build the internal dict with the SNMP result
405
            for key in snmp_oid:
406
                ret[key] = snmp_result[snmp_oid[key]]
407
408
        return ret
409
410
    def get_raw(self):
411
        """Return the stats object."""
412
        return self.stats
413
414
    def get_export(self):
415
        """Return the stats object to export.
416
        By default, return the raw stats.
417
        Note: this method could be overwritten by the plugin if a specific format is needed (ex: processlist)
418
        """
419
        return self.get_raw()
420
421
    def get_stats(self):
422
        """Return the stats object in JSON format."""
423
        return json_dumps(self.get_raw())
424
425
    def get_json(self):
426
        """Return the stats object in JSON format."""
427
        return self.get_stats()
428
429
    def get_raw_stats_item(self, item):
430
        """Return the stats object for a specific item in RAW format.
431
432
        Stats should be a list of dict (processlist, network...)
433
        """
434
        return dictlist(self.get_raw(), item)
435
436
    def get_raw_stats_key(self, item, key):
437
        """Return the stats object for a specific item in RAW format.
438
439
        Stats should be a list of dict (processlist, network...)
440
        """
441
        return {item: [i for i in self.get_raw() if 'key' in i and i[i['key']] == key][0].get(item)}
442
443
    def get_stats_item(self, item):
444
        """Return the stats object for a specific item in JSON format.
445
446
        Stats should be a list of dict (processlist, network...)
447
        """
448
        return dictlist_json_dumps(self.get_raw(), item)
449
450
    def get_raw_stats_value(self, item, value):
451
        """Return the stats object for a specific item=value.
452
453
        Return None if the item=value does not exist
454
        Return None if the item is not a list of dict
455
        """
456
        if not isinstance(self.get_raw(), list):
457
            return None
458
459
        if (not isinstance(value, int) and not isinstance(value, float)) and value.isdigit():
460
            value = int(value)
461
        try:
462
            return {value: [i for i in self.get_raw() if i[item] == value]}
463
        except (KeyError, ValueError) as e:
464
            logger.error(f"Cannot get item({item})=value({value}) ({e})")
465
            return None
466
467
    def get_stats_value(self, item, value):
468
        """Return the stats object for a specific item=value in JSON format.
469
470
        Stats should be a list of dict (processlist, network...)
471
        """
472
        rsv = self.get_raw_stats_value(item, value)
473
        if rsv is None:
474
            return None
475
        return json_dumps(rsv)
476
477
    def get_item_info(self, item, key, default=None):
478
        """Return the item info grabbed into self.fields_description."""
479
        if self.fields_description is None or item not in self.fields_description:
480
            return default
481
        return self.fields_description[item].get(key, default)
482
483
    def _build_field_decoration(self, field):
484
        """Return the field decoration.
485
486
        The decoration is used to display the field in the UI.
487
        """
488
        # Manage the decoration
489
        if self.fields_description and field in self.fields_description:
490
            if (
491
                self.fields_description[field].get('rate') is True
492
                and isinstance(self.stats, dict)
493
                and self.stats.get('time_since_update', 0) == 0
494
            ):
495
                return 'DEFAULT'
496
            if self.fields_description[field].get('log') is True:
497
                return self.get_alert_log(self.stats[field], header=field)
498
            if self.fields_description[field].get('alert') is True:
499
                return self.get_alert(self.stats[field], header=field)
500
        return 'DEFAULT'
501
502
    def _build_field_optional(self, field):
503
        """Return true if the field is optional."""
504
        if self.fields_description and field in self.fields_description:
505
            return self.fields_description[field].get('optional', False)
506
        return False
507
508
    def _build_view_for_field(self, field):
509
        value = {
510
            'decoration': self._build_field_decoration(field),
511
            'optional': self._build_field_optional(field),
512
            'additional': False,
513
            'splittable': False,
514
            'hidden': False,
515
        }
516
517
        # Manage the hidden feature
518
        # Allow to automatically hide fields when values is never different than 0
519
        # Refactoring done for #2929
520
        if not self.hide_zero:
521
            value['hidden'] = False
522
        elif field in self.views and 'hidden' in self.views[field]:
523
            value['hidden'] = self.views[field]['hidden']
524
            if field in self.hide_zero_fields and self.get_raw()[field] >= self.hide_threshold_bytes:
525
                value['hidden'] = False
526
        else:
527
            value['hidden'] = field in self.hide_zero_fields
528
529
        return value
530
531
    def update_views(self):
532
        """Update the stats views.
533
534
        The V of MVC
535
        A dict of dict with the needed information to display the stats.
536
        Example for the stat xxx:
537
        'xxx': {'decoration': 'DEFAULT',  >>> The decoration of the stats
538
                'optional': False,        >>> Is the stat optional
539
                'additional': False,      >>> Is the stat provide additional information
540
                'splittable': False,      >>> Is the stat can be cut (like process lon name)
541
                'hidden': False}          >>> Is the stats should be hidden in the UI
542
        """
543
        ret = {}
544
545
        if self.get_raw() is not None and isinstance(self.get_raw(), list) and self.get_key() is not None:
546
            # Stats are stored in a list of dict (ex: DISKIO, NETWORK, FS...)
547
            for i in self.get_raw():
548
                key = i[self.get_key()]
549
                ret[key] = {}
550
                for field in listkeys(i):
551
                    ret[key][field] = self._build_view_for_field(field)
552
        elif isinstance(self.get_raw(), dict) and self.get_raw() is not None:
553
            # Stats are stored in a dict (ex: CPU, LOAD...)
554
            for field in listkeys(self.get_raw()):
555
                ret[field] = self._build_view_for_field(field)
556
557
        self.views = ret
558
559
        return self.views
560
561
    def set_views(self, input_views):
562
        """Set the views to input_views."""
563
        self.views = input_views
564
565
    def reset_views(self):
566
        """Reset the views to input_views."""
567
        self.views = {}
568
569
    def get_views(self, item=None, key=None, option=None):
570
        """Return the views object.
571
572
        If key is None, return all the view for the current plugin
573
        else if option is None return the view for the specific key (all option)
574
        else return the view of the specific key/option
575
576
        Specify item if the stats are stored in a dict of dict (ex: NETWORK, FS...)
577
        """
578
        if item is None:
579
            item_views = self.views
580
        else:
581
            item_views = self.views[item]
582
583
        if key is None:
584
            return item_views
585
        if key not in item_views:
586
            return 'DEFAULT'
587
        if option is None:
588
            return item_views[key]
589
        if option in item_views[key]:
590
            return item_views[key][option]
591
        return 'DEFAULT'
592
593
    def get_json_views(self, item=None, key=None, option=None):
594
        """Return the views (in JSON)."""
595
        return json_dumps(self.get_views(item, key, option))
596
597
    def load_limits(self, config):
598
        """Load limits from the configuration file, if it exists."""
599
        # By default set the history length to 3 points per second during one day
600
        self._limits['history_size'] = 28800
601
602
        if not hasattr(config, 'has_section'):
603
            return False
604
605
        # Read the global section
606
        # TODO: not optimized because this section is loaded for each plugin...
607
        if config.has_section('global'):
608
            self._limits['history_size'] = config.get_float_value('global', 'history_size', default=28800)
609
            logger.debug("Load configuration key: {} = {}".format('history_size', self._limits['history_size']))
610
611
        # Read the plugin specific section
612
        if config.has_section(self.plugin_name):
613
            for level, _ in config.items(self.plugin_name):
614
                # Read limits
615
                limit = '_'.join([self.plugin_name, level])
616
                try:
617
                    self._limits[limit] = config.get_float_value(self.plugin_name, level)
618
                except ValueError:
619
                    self._limits[limit] = config.get_value(self.plugin_name, level).split(",")
620
                logger.debug(f"Load limit: {limit} = {self._limits[limit]}")
621
622
        return True
623
624
    @property
625
    def limits(self):
626
        """Return the limits object."""
627
        return self._limits
628
629
    @limits.setter
630
    def limits(self, input_limits):
631
        """Set the limits to input_limits."""
632
        self._limits = input_limits
633
634
    def set_refresh(self, value):
635
        """Set the plugin refresh rate"""
636
        self.set_limits('refresh', value)
637
638
    def get_refresh(self):
639
        """Return the plugin refresh time"""
640
        ret = self.get_limits(item='refresh')
641
        if ret is None:
642
            ret = self.args.time if hasattr(self.args, 'time') else 2
643
        return ret
644
645
    def get_refresh_time(self):
646
        """Return the plugin refresh time"""
647
        return self.get_refresh()
648
649
    def set_limits(self, item, value):
650
        """Set the limits object."""
651
        self._limits[f'{self.plugin_name}_{item}'] = value
652
653
    def get_limits(self, item=None):
654
        """Return the limits object."""
655
        if item is None:
656
            return self._limits
657
        return self._limits.get(f'{self.plugin_name}_{item}', None)
658
659
    def get_stats_action(self):
660
        """Return stats for the action.
661
662
        By default return all the stats.
663
        Can be overwrite by plugins implementation.
664
        For example, Docker will return self.stats['containers']
665
        """
666
        return self.stats
667
668
    def get_stat_name(self, header=""):
669
        """Return the stat name with an optional header"""
670
        ret = self.plugin_name
671
        if header != '':
672
            ret += '_' + header
673
        return ret
674
675
    def get_alert(
676
        self,
677
        current=0,
678
        minimum=0,
679
        maximum=100,
680
        highlight_zero=True,
681
        is_max=False,
682
        header="",
683
        action_key=None,
684
        log=False,
685
    ):
686
        """Return the alert status relative to a current value.
687
688
        Use this function for minor stats.
689
690
        If current < CAREFUL of max then alert = OK
691
        If current > CAREFUL of max then alert = CAREFUL
692
        If current > WARNING of max then alert = WARNING
693
        If current > CRITICAL of max then alert = CRITICAL
694
695
        If highlight=True than 0.0 is highlighted
696
697
        If defined 'header' is added between the plugin name and the status.
698
        Only useful for stats with several alert status.
699
700
        If defined, 'action_key' define the key for the actions.
701
        By default, the action_key is equal to the header.
702
703
        If log=True than add log if necessary
704
        elif log=False than do not log
705
        elif log=None than apply the config given in the conf file
706
        """
707
        # Manage 0 (0.0) value if highlight_zero is not True
708
        if not highlight_zero and current == 0:
709
            return 'DEFAULT'
710
711
        # Compute the %
712
        try:
713
            value = (current * 100) / maximum
714
        except ZeroDivisionError:
715
            return 'DEFAULT'
716
        except TypeError:
717
            return 'DEFAULT'
718
719
        # Build the stat_name
720
        stat_name = self.get_stat_name(header=header).lower()
721
722
        # Manage limits
723
        # If is_max is set then default style is set to MAX else default is set to OK
724
        ret = 'MAX' if is_max else 'OK'
725
726
        # Iter through limits
727
        critical = self.get_limit('critical', stat_name=stat_name)
728
        warning = self.get_limit('warning', stat_name=stat_name)
729
        careful = self.get_limit('careful', stat_name=stat_name)
730
        if critical and value >= critical:
731
            ret = 'CRITICAL'
732
        elif warning and value >= warning:
733
            ret = 'WARNING'
734
        elif careful and value >= careful:
735
            ret = 'CAREFUL'
736
        elif not careful and not warning and not critical:
737
            ret = 'DEFAULT'
738
        else:
739
            ret = 'OK'
740
741
        if current < minimum:
742
            ret = 'CAREFUL'
743
744
        # Manage log
745
        log_str = ""
746
        if self.get_limit_log(stat_name=stat_name, default_action=log) and ret != 'DEFAULT':
747
            # Add _LOG to the return string
748
            # So stats will be highlighted with a specific color
749
            log_str = "_LOG"
750
            # Add the log to the events list
751
            glances_events.add(ret, stat_name.upper(), value)
752
753
        # Manage threshold
754
        self.manage_threshold(stat_name, ret)
755
756
        # Manage action
757
        self.manage_action(stat_name, ret.lower(), header, action_key)
758
759
        # Default is 'OK'
760
        return ret + log_str
761
762
    def filter_stats(self, stats):
763
        """Filter the stats to keep only the fields we want (the one defined in fields_description)."""
764
        if hasattr(stats, '_asdict'):
765
            return {k: v for k, v in stats._asdict().items() if k in self.fields_description}
766
        if isinstance(stats, dict):
767
            return {k: v for k, v in stats.items() if k in self.fields_description}
768
        if isinstance(stats, list):
769
            return [self.filter_stats(s) for s in stats]
770
        return stats
771
772
    def manage_threshold(self, stat_name, trigger):
773
        """Manage the threshold for the current stat."""
774
        glances_thresholds.add(stat_name, trigger)
775
776
    def manage_action(self, stat_name, trigger, header, action_key):
777
        """Manage the action for the current stat."""
778
        # Here is a command line for the current trigger ?
779
        command, repeat = self.get_limit_action(trigger, stat_name=stat_name)
780
        if not command and not repeat:
781
            # Reset the trigger
782
            self.actions.set(stat_name, trigger)
783
        else:
784
            # Define the action key for the stats dict
785
            # If not define, then it sets to header
786
            if action_key is None:
787
                action_key = header
788
789
            # A command line is available for the current alert
790
            # 1) Build the {{mustache}} dictionary
791
            if isinstance(self.get_stats_action(), list):
792
                # If the stats are stored in a list of dict (fs plugin for example)
793
                # Return the dict for the current header
794
                mustache_dict = {}
795
                for item in self.get_stats_action():
796
                    if item[self.get_key()] == action_key:
797
                        mustache_dict = item
798
                        break
799
            else:
800
                # Use the stats dict
801
                mustache_dict = self.get_stats_action()
802
            # 2) Run the action
803
            self.actions.run(stat_name, trigger, command, repeat, mustache_dict=mustache_dict)
804
805
    def get_alert_log(self, current=0, minimum=0, maximum=100, header="", action_key=None):
806
        """Get the alert log."""
807
        return self.get_alert(
808
            current=current, minimum=minimum, maximum=maximum, header=header, action_key=action_key, log=True
809
        )
810
811
    def is_limit(self, criticality, stat_name=""):
812
        """Return true if the criticality limit exist for the given stat_name"""
813
        return self.get_stat_name(stat_name).lower() + '_' + criticality in self._limits
814
815
    def get_limit(self, criticality=None, stat_name=""):
816
        """Return the limit value for the given criticality.
817
        If criticality is None, return the dict of all the limits."""
818
        if criticality is None:
819
            return self._limits
820
821
        # Get the limit for stat + header
822
        # Example: network_wlan0_rx_careful
823
        stat_name = stat_name.lower()
824
        if stat_name + '_' + criticality in self._limits:
825
            return self._limits[stat_name + '_' + criticality]
826
        if self.plugin_name + '_' + criticality in self._limits:
827
            return self._limits[self.plugin_name + '_' + criticality]
828
829
        return None
830
831
    def get_limit_action(self, criticality, stat_name=""):
832
        """Return the tuple (action, repeat) for the alert.
833
834
        - action is a command line
835
        - repeat is a bool
836
        """
837
        # Get the action for stat + header
838
        # Example: network_wlan0_rx_careful_action
839
        # Action key available ?
840
        ret = [
841
            (stat_name + '_' + criticality + '_action', False),
842
            (stat_name + '_' + criticality + '_action_repeat', True),
843
            (self.plugin_name + '_' + criticality + '_action', False),
844
            (self.plugin_name + '_' + criticality + '_action_repeat', True),
845
        ]
846
        for r in ret:
847
            if r[0] in self._limits:
848
                return self._limits[r[0]], r[1]
849
850
        # No key found, return None
851
        return None, None
852
853
    def get_limit_log(self, stat_name, default_action=False):
854
        """Return the log tag for the alert."""
855
        # Get the log tag for stat + header
856
        # Example: network_wlan0_rx_log
857
        if stat_name + '_log' in self._limits:
858
            return self._limits[stat_name + '_log'][0].lower() == 'true'
859
        if self.plugin_name + '_log' in self._limits:
860
            return self._limits[self.plugin_name + '_log'][0].lower() == 'true'
861
        return default_action
862
863
    def get_conf_value(self, value, header="", plugin_name=None, convert_bool=False, default=[]):
864
        """Return the configuration (header_) value for the current plugin.
865
866
        ...or the one given by the plugin_name var.
867
        """
868
        if plugin_name is None:
869
            # If not default use the current plugin name
870
            plugin_name = self.plugin_name
871
872
        if header != "":
873
            # Add the header
874
            plugin_name = plugin_name + '_' + header
875
876
        try:
877
            ret = self._limits[plugin_name + '_' + value]
878
            return bool(ret[0]) if convert_bool else ret
879
        except KeyError:
880
            return default
881
882
    def is_show(self, value, header=""):
883
        """Return True if the value is in the show configuration list.
884
885
        If the show value is empty, return True (show by default)
886
887
        The show configuration list is defined in the glances.conf file.
888
        It is a comma-separated list of regexp.
889
        Example for diskio:
890
        show=sda.*
891
        """
892
        # TODO: possible optimisation: create a re.compile list
893
        # nguuuquaaa: no need a compile list, the re module caches the compiling itself
894
        return any(re.fullmatch(i, value, re.I) for i in self.get_conf_value('show', header=header))
895
896
    def is_hide(self, value, header=""):
897
        """Return True if the value is in the hide configuration list.
898
899
        The hide configuration list is defined in the glances.conf file.
900
        It is a comma-separated list of regexp.
901
        Example for diskio:
902
        hide=sda2,sda5,loop.*
903
        """
904
        # TODO: possible optimisation: create a re.compile list
905
        # nguuuquaaa: no need a compile list, the re module caches the compiling itself
906
        return any(re.fullmatch(i, value, re.I) for i in self.get_conf_value('hide', header=header))
907
908
    def is_display(self, value, header=""):
909
        """Return True if the value should be displayed in the UI"""
910
        if self.get_conf_value('show', header=header) != []:
911
            return self.is_show(value, header=header)
912
        return not self.is_hide(value, header=header)
913
914
    def is_display_any(self, *values, header=""):
915
        """Return True if any of the values should be displayed in the UI"""
916
        if self.get_conf_value('show', header=header) != []:
917
            return any(self.is_show(value, header=header) for value in values)
918
        return not any(self.is_hide(value, header=header) for value in values)
919
920
    def read_alias(self):
921
        if self.plugin_name + '_' + 'alias' in self._limits:
922
            return {i.split(':')[0].lower(): i.split(':')[1] for i in self._limits[self.plugin_name + '_' + 'alias']}
923
        return {}
924
925
    def has_alias(self, header):
926
        """Return the alias name for the relative header it it exists otherwise None."""
927
        return self.alias.get(header, None)
928
929
    def msg_curse(self, args=None, max_width=None):
930
        """Return default string to display in the curse interface."""
931
        return [self.curse_add_line(str(self.stats))]
932
933
    def get_stats_display(self, args=None, max_width=None):
934
        """Return a dict with all the information needed to display the stat.
935
936
        key     | description
937
        ----------------------------
938
        display | Display the stat (True or False)
939
        msgdict | Message to display (list of dict [{ 'msg': msg, 'decoration': decoration } ... ])
940
        align   | Message position
941
        """
942
        display_curse = False
943
944
        if hasattr(self, 'display_curse'):
945
            display_curse = self.display_curse
946
        if hasattr(self, 'align'):
947
            align_curse = self._align
948
949
        if max_width is not None:
950
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args, max_width=max_width), 'align': align_curse}
0 ignored issues
show
introduced by
The variable align_curse does not seem to be defined in case hasattr(self, 'align') on line 946 is False. Are you sure this can never be the case?
Loading history...
951
        else:
952
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args), 'align': align_curse}
953
954
        return ret
955
956
    def curse_add_line(self, msg, decoration="DEFAULT", optional=False, additional=False, splittable=False):
957
        """Return a dict with.
958
959
        Where:
960
            msg: string
961
            decoration:
962
                DEFAULT: no decoration
963
                UNDERLINE: underline
964
                BOLD: bold
965
                TITLE: for stat title
966
                PROCESS: for process name
967
                STATUS: for process status
968
                CPU_TIME: for process cpu time
969
                OK: Value is OK and non logged
970
                OK_LOG: Value is OK and logged
971
                CAREFUL: Value is CAREFUL and non logged
972
                CAREFUL_LOG: Value is CAREFUL and logged
973
                WARNING: Value is WARNING and non logged
974
                WARNING_LOG: Value is WARNING and logged
975
                CRITICAL: Value is CRITICAL and non logged
976
                CRITICAL_LOG: Value is CRITICAL and logged
977
            optional: True if the stat is optional (display only if space is available)
978
            additional: True if the stat is additional (display only if space is available after optional)
979
            spittable: Line can be split to fit on the screen (default is not)
980
        """
981
        return {
982
            'msg': msg,
983
            'decoration': decoration,
984
            'optional': optional,
985
            'additional': additional,
986
            'splittable': splittable,
987
        }
988
989
    def curse_new_line(self):
990
        """Go to a new line."""
991
        return self.curse_add_line('\n')
992
993
    def curse_add_stat(self, key, width=None, header='', display_key=True, separator='', trailer=''):
994
        """Return a list of dict messages with the 'key: value' result
995
996
          <=== width ===>
997
        __key     : 80.5%__
998
        | |       | |    |_ trailer
999
        | |       | |_ self.stats[key]
1000
        | |       |_ separator
1001
        | |_ 'short_name' description or key or nothing if display_key is True
1002
        |_ header
1003
1004
        Instead of:
1005
            msg = '  {:8}'.format('idle:')
1006
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
1007
            msg = '{:5.1f}%'.format(self.stats['idle'])
1008
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
1009
1010
        Use:
1011
            ret.extend(self.curse_add_stat('idle', width=15, header='  '))
1012
1013
        """
1014
        if key not in self.stats:
1015
            return []
1016
1017
        # Check if a shortname is defined
1018
        if not display_key:
1019
            key_name = ''
1020
        elif key in self.fields_description and 'short_name' in self.fields_description[key]:
1021
            key_name = self.fields_description[key]['short_name']
1022
        else:
1023
            key_name = key
1024
1025
        # Check if unit is defined and get the short unit char in the unit_sort dict
1026
        if (
1027
            key in self.fields_description
1028
            and 'unit' in self.fields_description[key]
1029
            and self.fields_description[key]['unit'] in fields_unit_short
1030
        ):
1031
            # Get the shortname
1032
            unit_short = fields_unit_short[self.fields_description[key]['unit']]
1033
        else:
1034
            unit_short = ''
1035
1036
        # Check if unit is defined and get the unit type unit_type dict
1037
        if (
1038
            key in self.fields_description
1039
            and 'unit' in self.fields_description[key]
1040
            and self.fields_description[key]['unit'] in fields_unit_type
1041
        ):
1042
            # Get the shortname
1043
            unit_type = fields_unit_type[self.fields_description[key]['unit']]
1044
        else:
1045
            unit_type = 'float'
1046
1047
        # Is it a rate ? Yes, get the pre-computed rate value
1048
        if key in self.fields_description and self.fields_description[key].get('rate', False) is True:
1049
            value = self.stats.get(key + '_rate_per_sec', None)
1050
        else:
1051
            value = self.stats.get(key, None)
1052
1053
        if width is None:
1054
            msg_item = header + f'{key_name}' + separator
1055
            msg_template_float = '{:.1f}{}'
1056
            msg_template = '{}{}'
1057
        else:
1058
            # Define the size of the message
1059
            # item will be on the left
1060
            # value will be on the right
1061
            msg_item = header + '{:{width}}'.format(key_name, width=width - 7) + separator
1062
            msg_template_float = '{:5.1f}{}'
1063
            msg_template = '{:>5}{}'
1064
1065
        if value is None:
1066
            msg_value = msg_template.format('-', '')
1067
        elif unit_type == 'float':
1068
            msg_value = msg_template_float.format(value, unit_short)
1069
        elif 'min_symbol' in self.fields_description[key]:
1070
            msg_value = msg_template.format(
1071
                self.auto_unit(int(value), min_symbol=self.fields_description[key]['min_symbol']), unit_short
1072
            )
1073
        else:
1074
            msg_value = msg_template.format(int(value), unit_short)
1075
1076
        # Add the trailer
1077
        msg_value = msg_value + trailer
1078
1079
        decoration = self.get_views(key=key, option='decoration') if value is not None else 'DEFAULT'
1080
        optional = self.get_views(key=key, option='optional')
1081
1082
        return [
1083
            self.curse_add_line(msg_item, optional=optional),
1084
            self.curse_add_line(msg_value, decoration=decoration, optional=optional),
1085
        ]
1086
1087
    @property
1088
    def align(self):
1089
        """Get the curse align."""
1090
        return self._align
1091
1092
    @align.setter
1093
    def align(self, value):
1094
        """Set the curse align.
1095
1096
        value: left, right, bottom.
1097
        """
1098
        self._align = value
1099
1100
    def auto_unit(self, number, low_precision=False, min_symbol='K', none_symbol='-'):
1101
        """Return a nice human-readable string out of number."""
1102
        return auto_unit(number, low_precision=low_precision, min_symbol=min_symbol, none_symbol=none_symbol)
1103
1104
    def trend_msg(self, trend, significant=1):
1105
        """Return the trend message.
1106
1107
        Do not take into account if trend < significant
1108
        """
1109
        ret = '-'
1110
        if trend is None:
1111
            ret = ' '
1112
        elif trend > significant:
1113
            ret = unicode_message('ARROW_UP', self.args)
1114
        elif trend < -significant:
1115
            ret = unicode_message('ARROW_DOWN', self.args)
1116
        return ret
1117
1118
    def _check_decorator(fct):
1119
        """Check decorator for update method.
1120
1121
        It checks:
1122
        - if the plugin is enabled.
1123
        - if the refresh_timer is finished
1124
        """
1125
1126
        def wrapper(self, *args, **kw):
1127
            if self.is_enabled() and (self.refresh_timer.finished() or self.stats == self.get_init_value):
1128
                # Run the method
1129
                ret = fct(self, *args, **kw)
1130
                # Reset the timer
1131
                self.refresh_timer.set(self.get_refresh())
1132
                self.refresh_timer.reset()
1133
            else:
1134
                # No need to call the method
1135
                # Return the last result available
1136
                ret = self.stats
1137
            return ret
1138
1139
        return wrapper
1140
1141 View Code Duplication
    def _log_result_decorator(fct):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1142
        """Log (DEBUG) the result of the function fct."""
1143
1144
        def wrapper(*args, **kw):
1145
            counter = Counter()
1146
            ret = fct(*args, **kw)
1147
            duration = counter.get()
1148
            class_name = args[0].__class__.__name__
1149
            class_module = args[0].__class__.__module__
1150
            logger.debug(f"{class_name} {class_module} {fct.__name__} return {ret} in {duration} seconds")
1151
            return ret
1152
1153
        return wrapper
1154
1155
    def _manage_rate(fct):
1156
        """Manage rate decorator for update method."""
1157
1158
        def compute_rate(self, stat, stat_previous):
1159
            if stat_previous is None:
1160
                return stat
1161
1162
            # 1) set _gauge for all the rate fields
1163
            # 2) compute the _rate_per_sec
1164
            # 3) set the original field to the delta between the current and the previous value
1165
            for field in self.fields_description:
1166
                # For all the field with the rate=True flag
1167
                # if 'rate' in self.fields_description[field] and self.fields_description[field]['rate'] is True:
1168
                if self.fields_description[field].get('rate', False):
1169
                    # Create a new metadata with the gauge
1170
                    stat['time_since_update'] = self.time_since_last_update
1171
                    stat[field + '_gauge'] = stat[field]
1172
                    if field + '_gauge' in stat_previous and stat[field] and stat_previous[field + '_gauge']:
1173
                        # The stat becomes the delta between the current and the previous value
1174
                        stat[field] = stat[field] - stat_previous[field + '_gauge']
1175
                        # Compute the rate
1176
                        if self.time_since_last_update > 0:
1177
                            stat[field + '_rate_per_sec'] = stat[field] // self.time_since_last_update
1178
                        else:
1179
                            stat[field] = 0
1180
                            stat[field + '_rate_per_sec'] = 0
1181
                    else:
1182
                        # Avoid strange rate at the first run
1183
                        stat[field] = 0
1184
                        stat[field + '_rate_per_sec'] = 0
1185
            return stat
1186
1187
        def compute_rate_on_list(self, stats, stats_previous):
1188
            if stats_previous is None:
1189
                return stats
1190
1191
            for stat in stats:
1192
                olds = [i for i in stats_previous if i[self.get_key()] == stat[self.get_key()]]
1193
                if len(olds) == 1:
1194
                    compute_rate(self, stat, olds[0])
1195
            return stats
1196
1197
        def wrapper(self, *args, **kw):
1198
            # Call the father method
1199
            stats = fct(self, *args, **kw)
1200
1201
            # Get the time since the last update
1202
            self.time_since_last_update = getTimeSinceLastUpdate(self.plugin_name)
1203
1204
            # Compute the rate
1205
            if isinstance(stats, dict):
1206
                # Stats is a dict
1207
                compute_rate(self, stats, self.stats_previous)
1208
            elif isinstance(stats, list):
1209
                # Stats is a list
1210
                compute_rate_on_list(self, stats, self.stats_previous)
1211
1212
            # Memorized the current stats for next run
1213
            self.stats_previous = copy.deepcopy(stats)
1214
1215
            return stats
1216
1217
        return wrapper
1218
1219
    # Mandatory to call the decorator in child classes
1220
    _check_decorator = staticmethod(_check_decorator)
1221
    _log_result_decorator = staticmethod(_log_result_decorator)
1222
    _manage_rate = staticmethod(_manage_rate)
1223