GlancesPluginModel.get_alert()   F
last analyzed

Complexity

Conditions 17

Size

Total Lines 86
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 17
eloc 41
nop 9
dl 0
loc 86
rs 1.8
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like glances.plugins.plugin.model.GlancesPluginModel.get_alert() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""
10
I am your father...
11
12
...of all Glances model plugins.
13
"""
14
15
import copy
16
import re
17
18
from glances.actions import GlancesActions
19
from glances.events_list import glances_events
20
from glances.globals import dictlist, dictlist_json_dumps, json_dumps, list_to_dict, listkeys, mean, nativestr
21
from glances.history import GlancesHistory
22
from glances.logger import logger
23
from glances.outputs.glances_unicode import unicode_message
24
from glances.thresholds import glances_thresholds
25
from glances.timer import Counter, Timer, getTimeSinceLastUpdate
26
27
fields_unit_short = {'percent': '%'}
28
29
fields_unit_type = {
30
    'percent': 'float',
31
    'percents': 'float',
32
    'number': 'int',
33
    'numbers': 'int',
34
    'int': 'int',
35
    'ints': 'int',
36
    'float': 'float',
37
    'floats': 'float',
38
    'second': 'int',
39
    'seconds': 'int',
40
    'byte': 'int',
41
    'bytes': 'int',
42
}
43
44
45
class GlancesPluginModel:
46
    """Main class for Glances plugin model."""
47
48
    def __init__(self, args=None, config=None, items_history_list=None, stats_init_value={}, fields_description=None):
49
        """Init the plugin of plugins model class.
50
51
        All Glances' plugins model should inherit from this class. Most of the
52
        methods are already implemented in the father classes.
53
54
        Your plugin should return a dict or a list of dicts (stored in the
55
        self.stats). As an example, you can have a look on the mem plugin
56
        (for dict) or network (for list of dicts).
57
58
        From version 4 of the API, the plugin should return a dict.
59
60
        A plugin should implement:
61
        - the reset method: to set your self.stats variable to {} or []
62
        - the update method: where your self.stats variable is set
63
        and optionally:
64
        - the get_key method: set the key of the dict (only for list of dict)
65
        - all others methods you want to overwrite
66
67
        :args: args parameters
68
        :config: configuration parameters
69
        :items_history_list: list of items to store in the history
70
        :stats_init_value: Default value for a stats item
71
        """
72
        # Build the plugin name
73
        # Internal or external module (former prefixed by 'glances.plugins')
74
        _mod = self.__class__.__module__.replace('glances.plugins.', '')
75
        self.plugin_name = _mod.split('.')[0]
76
77
        if self.plugin_name.startswith('glances_'):
78
            self.plugin_name = self.plugin_name.split('glances_')[1]
79
        logger.debug(f"Init {self.plugin_name} plugin")
80
81
        # Init the args
82
        self.args = args
83
84
        # Init the default alignment (for curses)
85
        self._align = 'left'
86
87
        # Init the input method
88
        self._input_method = 'local'
89
        self._short_system_name = None
90
91
        # Init the history list
92
        self.items_history_list = items_history_list
93
        self.stats_history = self.init_stats_history()
94
95
        # Init the limits (configuration keys) dictionary
96
        self._limits = {}
97
        if config is not None:
98
            logger.debug(f'Load section {self.plugin_name} in Glances configuration file')
99
            self.load_limits(config=config)
100
101
        # Init the alias (dictionary)
102
        self.alias = self.read_alias()
103
104
        # Init the actions
105
        self.actions = GlancesActions(args=args)
106
107
        # Init the views
108
        self.views = {}
109
110
        # Hide stats if all the hide_zero_fields has never been != 0
111
        # Default is False, always display stats
112
        self.hide_zero = False
113
        # The threshold needed to display a value if hide_zero is true.
114
        # Only hide a value if it is less than hide_threshold_bytes.
115
        self.hide_threshold_bytes = 0
116
        self.hide_zero_fields = []
117
118
        # Set the initial refresh time to display stats the first time
119
        self.refresh_timer = Timer(0)
120
121
        # Init stats description
122
        self.fields_description = fields_description
123
124
        # Init the stats
125
        self.stats_init_value = stats_init_value
126
        self.time_since_last_update = None
127
        self.stats = None
128
        self.stats_previous = None
129
        self.reset()
130
131
    def __str__(self):
132
        """Return the human-readable stats."""
133
        return str(self.stats)
134
135
    def __repr__(self):
136
        """Return the raw stats."""
137
        if isinstance(self.stats, list):
138
            return str(list_to_dict(self.stats))
139
        return str(self.stats)
140
141
    def __getitem__(self, item):
142
        """Return the stats item."""
143
        if isinstance(self.stats, dict) and item in self.stats:
144
            return self.stats[item]
145
146
        if isinstance(self.stats, list):
147
            ltd = list_to_dict(self.stats)
148
            if item in ltd:
149
                return ltd[item]
150
151
        raise KeyError(f"'{self.__class__.__name__}' object has no key '{item}'")
152
153
    def keys(self):
154
        """Return the keys of the stats."""
155
        if isinstance(self.stats, dict):
156
            return listkeys(self.stats)
157
        if isinstance(self.stats, list):
158
            return listkeys(list_to_dict(self.stats))
159
        return []
160
161
    def get(self, item, default=None):
162
        """Return the stats item or default if not found."""
163
        try:
164
            return self[item]
165
        except KeyError:
166
            return default
167
168
    def get_init_value(self):
169
        """Return a copy of the init value."""
170
        return copy.copy(self.stats_init_value)
171
172
    def reset(self):
173
        """Reset the stats.
174
175
        This method should be overwritten by child classes.
176
        """
177
        self.stats = self.get_init_value()
178
179
    def exit(self):
180
        """Just log an event when Glances exit."""
181
        logger.debug(f"Stop the {self.plugin_name} plugin")
182
183
    def get_key(self):
184
        """Return the key of the list."""
185
        return
186
187
    def is_enabled(self, plugin_name=None):
188
        """Return true if plugin is enabled."""
189
        if not plugin_name:
190
            plugin_name = self.plugin_name
191
        try:
192
            d = getattr(self.args, 'disable_' + plugin_name)
193
        except AttributeError:
194
            d = getattr(self.args, 'enable_' + plugin_name, True)
195
        return d is False
196
197
    def is_disabled(self, plugin_name=None):
198
        """Return true if plugin is disabled."""
199
        return not self.is_enabled(plugin_name=plugin_name)
200
201
    def history_enable(self):
202
        return self.args is not None and not self.args.disable_history and self.get_items_history_list() is not None
203
204
    def init_stats_history(self):
205
        """Init the stats history (dict of GlancesAttribute)."""
206
        if self.history_enable():
207
            init_list = [a['name'] for a in self.get_items_history_list()]
208
            logger.debug(f"Stats history activated for plugin {self.plugin_name} (items: {init_list})")
209
        return GlancesHistory()
210
211
    def reset_stats_history(self):
212
        """Reset the stats history (dict of GlancesAttribute)."""
213
        if self.history_enable():
214
            reset_list = [a['name'] for a in self.get_items_history_list()]
215
            logger.debug(f"Reset history for plugin {self.plugin_name} (items: {reset_list})")
216
            self.stats_history.reset()
217
218
    def update_stats_history(self):
219
        """Update stats history."""
220
        # Build the history
221
        _get_export = self.get_export()
222
        if not (_get_export and self.history_enable()):
223
            return
224
        # Itern through items history
225
        item_name = '' if self.get_key() is None else self.get_key()
226
        for i in self.get_items_history_list():
227
            if isinstance(_get_export, list):
228
                # Stats is a list of data
229
                # Iter through stats (for example, iter through network interface)
230
                for l_export in _get_export:
231
                    if i['name'] in l_export:
232
                        self.stats_history.add(
233
                            nativestr(l_export[item_name]) + '_' + nativestr(i['name']),
234
                            l_export[i['name']],
235
                            description=i['description'],
236
                            history_max_size=self._limits['history_size'],
237
                        )
238
            else:
239
                # Stats is not a list
240
                # Add the item to the history directly
241
                self.stats_history.add(
242
                    nativestr(i['name']),
243
                    _get_export[i['name']],
244
                    description=i['description'],
245
                    history_max_size=self._limits['history_size'],
246
                )
247
248
    def get_items_history_list(self):
249
        """Return the items history list."""
250
        return self.items_history_list
251
252
    def get_raw_history(self, item=None, nb=0):
253
        """Return the history (RAW format).
254
255
        - the stats history (dict of list) if item is None
256
        - the stats history for the given item (list) instead
257
        - None if item did not exist in the history
258
        """
259
        s = self.stats_history.get(nb=nb)
260
        if item is None:
261
            return s
262
        if item in s:
263
            return s[item]
264
        return None
265
266
    def get_export_history(self, item=None):
267
        """Return the stats history object to export."""
268
        return self.get_raw_history(item=item)
269
270
    def get_stats_history(self, item=None, nb=0):
271
        """Return the stats history (JSON format)."""
272
        s = self.stats_history.get_json(nb=nb)
273
274
        if item is None:
275
            return json_dumps(s)
276
277
        return dictlist_json_dumps(s, item)
278
279
    def get_trend(self, item, nb=30):
280
        """Get the trend regarding to the last nb values.
281
282
        The trend is the diffirence between the mean of the last 0 to nb / 2
283
        and nb / 2 to nb values.
284
        """
285
        raw_history = self.get_raw_history(item=item, nb=nb)
286
        if raw_history is None or len(raw_history) < nb:
287
            return None
288
        last_nb = [v[1] for v in raw_history]
289
        return mean(last_nb[nb // 2 :]) - mean(last_nb[: nb // 2])
290
291
    @property
292
    def input_method(self):
293
        """Get the input method."""
294
        return self._input_method
295
296
    @input_method.setter
297
    def input_method(self, input_method):
298
        """Set the input method.
299
300
        * local: system local grab (psutil or direct access)
301
        * snmp: Client server mode via SNMP
302
        * glances: Client server mode via Glances API
303
        """
304
        self._input_method = input_method
305
306
    @property
307
    def short_system_name(self):
308
        """Get the short detected OS name (SNMP)."""
309
        return self._short_system_name
310
311
    def sorted_stats(self):
312
        """Get the stats sorted by an alias (if present) or key."""
313
        key = self.get_key()
314
        if key is None:
315
            return self.stats
316
        try:
317
            return sorted(
318
                self.stats,
319
                key=lambda stat: tuple(
320
                    int(part) if part.isdigit() else part.lower()
321
                    for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
322
                ),
323
            )
324
        except TypeError:
325
            # Correct "Starting an alias with a number causes a crash #1885"
326
            return sorted(
327
                self.stats,
328
                key=lambda stat: tuple(
329
                    part.lower() for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
330
                ),
331
            )
332
333
    @short_system_name.setter
334
    def short_system_name(self, short_name):
335
        """Set the short detected OS name (SNMP)."""
336
        self._short_system_name = short_name
337
338
    def set_stats(self, input_stats):
339
        """Set the stats to input_stats."""
340
        self.stats = input_stats
341
342
    def get_stats_snmp(self, bulk=False, snmp_oid=None):
343
        """Update stats using SNMP.
344
345
        If bulk=True, use a bulk request instead of a get request.
346
        """
347
        snmp_oid = snmp_oid or {}
348
349
        from glances.snmp import GlancesSNMPClient
350
351
        # Init the SNMP request
352
        snmp_client = GlancesSNMPClient(
353
            host=self.args.client,
354
            port=self.args.snmp_port,
355
            version=self.args.snmp_version,
356
            community=self.args.snmp_community,
357
        )
358
359
        # Process the SNMP request
360
        ret = {}
361
        if bulk:
362
            # Bulk request
363
            snmp_result = snmp_client.getbulk_by_oid(0, 10, *list(snmp_oid.values()))
364
            logger.info(snmp_result)
365
            if len(snmp_oid) == 1:
366
                # Bulk command for only one OID
367
                # Note: key is the item indexed but the OID result
368
                for item in snmp_result:
369
                    if item.keys()[0].startswith(snmp_oid.values()[0]):
370
                        ret[snmp_oid.keys()[0] + item.keys()[0].split(snmp_oid.values()[0])[1]] = item.values()[0]
371
            else:
372
                # Build the internal dict with the SNMP result
373
                # Note: key is the first item in the snmp_oid
374
                index = 1
375
                for item in snmp_result:
376
                    item_stats = {}
377
                    item_key = None
378
                    for key in snmp_oid:
379
                        oid = snmp_oid[key] + '.' + str(index)
380
                        if oid in item:
381
                            if item_key is None:
382
                                item_key = item[oid]
383
                            else:
384
                                item_stats[key] = item[oid]
385
                    if item_stats:
386
                        ret[item_key] = item_stats
387
                    index += 1
388
        else:
389
            # Simple get request
390
            snmp_result = snmp_client.get_by_oid(*list(snmp_oid.values()))
391
392
            # Build the internal dict with the SNMP result
393
            for key in snmp_oid:
394
                ret[key] = snmp_result[snmp_oid[key]]
395
396
        return ret
397
398
    def get_raw(self):
399
        """Return the stats object."""
400
        return self.stats
401
402
    def get_export(self):
403
        """Return the stats object to export.
404
        By default, return the raw stats.
405
        Note: this method could be overwritten by the plugin if a specific format is needed (ex: processlist)
406
        """
407
        return self.get_raw()
408
409
    def get_stats(self):
410
        """Return the stats object in JSON format."""
411
        return json_dumps(self.get_raw())
412
413
    def get_json(self):
414
        """Return the stats object in JSON format."""
415
        return self.get_stats()
416
417
    def get_raw_stats_item(self, item):
418
        """Return the stats object for a specific item in RAW format.
419
420
        Stats should be a list of dict (processlist, network...)
421
        """
422
        return dictlist(self.get_raw(), item)
423
424
    def get_raw_stats_key(self, item, key):
425
        """Return the stats object for a specific item in RAW format.
426
427
        Stats should be a list of dict (processlist, network...)
428
        """
429
        return {item: [i for i in self.get_raw() if 'key' in i and i[i['key']] == key][0].get(item)}
430
431
    def get_stats_item(self, item):
432
        """Return the stats object for a specific item in JSON format.
433
434
        Stats should be a list of dict (processlist, network...)
435
        """
436
        return dictlist_json_dumps(self.get_raw(), item)
437
438
    def get_raw_stats_value(self, item, value):
439
        """Return the stats object for a specific item=value.
440
441
        Return None if the item=value does not exist
442
        Return None if the item is not a list of dict
443
        """
444
        if not isinstance(self.get_raw(), list):
445
            return None
446
447
        if (not isinstance(value, int) and not isinstance(value, float)) and value.isdigit():
448
            value = int(value)
449
        try:
450
            return {value: [i for i in self.get_raw() if i[item] == value]}
451
        except (KeyError, ValueError) as e:
452
            logger.error(f"Cannot get item({item})=value({value}) ({e})")
453
            return None
454
455
    def get_stats_value(self, item, value):
456
        """Return the stats object for a specific item=value in JSON format.
457
458
        Stats should be a list of dict (processlist, network...)
459
        """
460
        rsv = self.get_raw_stats_value(item, value)
461
        if rsv is None:
462
            return None
463
        return json_dumps(rsv)
464
465
    def get_item_info(self, item, key, default=None):
466
        """Return the item info grabbed into self.fields_description."""
467
        if self.fields_description is None or item not in self.fields_description:
468
            return default
469
        return self.fields_description[item].get(key, default)
470
471
    def _build_field_decoration(self, field):
472
        """Return the field decoration.
473
474
        The decoration is used to display the field in the UI.
475
        """
476
        # Manage the decoration
477
        if self.fields_description and field in self.fields_description:
478
            if (
479
                self.fields_description[field].get('rate') is True
480
                and isinstance(self.stats, dict)
481
                and self.stats.get('time_since_update', 0) == 0
482
            ):
483
                return 'DEFAULT'
484
            if self.fields_description[field].get('log') is True:
485
                return self.get_alert_log(self.stats[field], header=field)
486
            if self.fields_description[field].get('alert') is True:
487
                return self.get_alert(self.stats[field], header=field)
488
        return 'DEFAULT'
489
490
    def _build_field_optional(self, field):
491
        """Return true if the field is optional."""
492
        if self.fields_description and field in self.fields_description:
493
            return self.fields_description[field].get('optional', False)
494
        return False
495
496
    def _build_view_for_field(self, field):
497
        value = {
498
            'decoration': self._build_field_decoration(field),
499
            'optional': self._build_field_optional(field),
500
            'additional': False,
501
            'splittable': False,
502
            'hidden': False,
503
        }
504
505
        # Manage the hidden feature
506
        # Allow to automatically hide fields when values is never different than 0
507
        # Refactoring done for #2929
508
        if not self.hide_zero:
509
            value['hidden'] = False
510
        elif field in self.views and 'hidden' in self.views[field]:
511
            value['hidden'] = self.views[field]['hidden']
512
            if field in self.hide_zero_fields and self.get_raw()[field] >= self.hide_threshold_bytes:
513
                value['hidden'] = False
514
        else:
515
            value['hidden'] = field in self.hide_zero_fields
516
517
        return value
518
519
    def update_views(self):
520
        """Update the stats views.
521
522
        The V of MVC
523
        A dict of dict with the needed information to display the stats.
524
        Example for the stat xxx:
525
        'xxx': {'decoration': 'DEFAULT',  >>> The decoration of the stats
526
                'optional': False,        >>> Is the stat optional
527
                'additional': False,      >>> Is the stat provide additional information
528
                'splittable': False,      >>> Is the stat can be cut (like process lon name)
529
                'hidden': False}          >>> Is the stats should be hidden in the UI
530
        """
531
        ret = {}
532
533
        if self.get_raw() is not None and isinstance(self.get_raw(), list) and self.get_key() is not None:
534
            # Stats are stored in a list of dict (ex: DISKIO, NETWORK, FS...)
535
            for i in self.get_raw():
536
                key = i[self.get_key()]
537
                ret[key] = {}
538
                for field in listkeys(i):
539
                    ret[key][field] = self._build_view_for_field(field)
540
        elif isinstance(self.get_raw(), dict) and self.get_raw() is not None:
541
            # Stats are stored in a dict (ex: CPU, LOAD...)
542
            for field in listkeys(self.get_raw()):
543
                ret[field] = self._build_view_for_field(field)
544
545
        self.views = ret
546
547
        return self.views
548
549
    def set_views(self, input_views):
550
        """Set the views to input_views."""
551
        self.views = input_views
552
553
    def reset_views(self):
554
        """Reset the views to input_views."""
555
        self.views = {}
556
557
    def get_views(self, item=None, key=None, option=None):
558
        """Return the views object.
559
560
        If key is None, return all the view for the current plugin
561
        else if option is None return the view for the specific key (all option)
562
        else return the view of the specific key/option
563
564
        Specify item if the stats are stored in a dict of dict (ex: NETWORK, FS...)
565
        """
566
        if item is None:
567
            item_views = self.views
568
        else:
569
            item_views = self.views[item]
570
571
        if key is None:
572
            return item_views
573
        if key not in item_views:
574
            return 'DEFAULT'
575
        if option is None:
576
            return item_views[key]
577
        if option in item_views[key]:
578
            return item_views[key][option]
579
        return 'DEFAULT'
580
581
    def get_json_views(self, item=None, key=None, option=None):
582
        """Return the views (in JSON)."""
583
        return json_dumps(self.get_views(item, key, option))
584
585
    def load_limits(self, config):
586
        """Load limits from the configuration file, if it exists."""
587
        # By default set the history length to 3 points per second during one day
588
        self._limits['history_size'] = 28800
589
590
        if not hasattr(config, 'has_section'):
591
            return False
592
593
        # Read the global section
594
        # TODO: not optimized because this section is loaded for each plugin...
595
        if config.has_section('global'):
596
            self._limits['history_size'] = config.get_float_value('global', 'history_size', default=28800)
597
            logger.debug("Load configuration key: {} = {}".format('history_size', self._limits['history_size']))
598
599
        # Read the plugin specific section
600
        if config.has_section(self.plugin_name):
601
            for level, _ in config.items(self.plugin_name):
602
                # Read limits
603
                limit = '_'.join([self.plugin_name, level])
604
                try:
605
                    self._limits[limit] = config.get_float_value(self.plugin_name, level)
606
                except ValueError:
607
                    self._limits[limit] = config.get_value(self.plugin_name, level).split(",")
608
                logger.debug(f"Load limit: {limit} = {self._limits[limit]}")
609
610
        return True
611
612
    @property
613
    def limits(self):
614
        """Return the limits object."""
615
        return self._limits
616
617
    @limits.setter
618
    def limits(self, input_limits):
619
        """Set the limits to input_limits."""
620
        self._limits = input_limits
621
622
    def set_refresh(self, value):
623
        """Set the plugin refresh rate"""
624
        self.set_limits('refresh', value)
625
626
    def get_refresh(self):
627
        """Return the plugin refresh time"""
628
        ret = self.get_limits(item='refresh')
629
        if ret is None:
630
            ret = self.args.time if hasattr(self.args, 'time') else 2
631
        return ret
632
633
    def get_refresh_time(self):
634
        """Return the plugin refresh time"""
635
        return self.get_refresh()
636
637
    def set_limits(self, item, value):
638
        """Set the limits object."""
639
        self._limits[f'{self.plugin_name}_{item}'] = value
640
641
    def get_limits(self, item=None):
642
        """Return the limits object."""
643
        if item is None:
644
            return self._limits
645
        return self._limits.get(f'{self.plugin_name}_{item}', None)
646
647
    def get_stats_action(self):
648
        """Return stats for the action.
649
650
        By default return all the stats.
651
        Can be overwrite by plugins implementation.
652
        For example, Docker will return self.stats['containers']
653
        """
654
        return self.stats
655
656
    def get_stat_name(self, header=""):
657
        """Return the stat name with an optional header"""
658
        ret = self.plugin_name
659
        if header != '':
660
            ret += '_' + header
661
        return ret
662
663
    def get_alert(
664
        self,
665
        current=0,
666
        minimum=0,
667
        maximum=100,
668
        highlight_zero=True,
669
        is_max=False,
670
        header="",
671
        action_key=None,
672
        log=False,
673
    ):
674
        """Return the alert status relative to a current value.
675
676
        Use this function for minor stats.
677
678
        If current < CAREFUL of max then alert = OK
679
        If current > CAREFUL of max then alert = CAREFUL
680
        If current > WARNING of max then alert = WARNING
681
        If current > CRITICAL of max then alert = CRITICAL
682
683
        If highlight=True than 0.0 is highlighted
684
685
        If defined 'header' is added between the plugin name and the status.
686
        Only useful for stats with several alert status.
687
688
        If defined, 'action_key' define the key for the actions.
689
        By default, the action_key is equal to the header.
690
691
        If log=True than add log if necessary
692
        elif log=False than do not log
693
        elif log=None than apply the config given in the conf file
694
        """
695
        # Manage 0 (0.0) value if highlight_zero is not True
696
        if not highlight_zero and current == 0:
697
            return 'DEFAULT'
698
699
        # Compute the %
700
        try:
701
            value = (current * 100) / maximum
702
        except ZeroDivisionError:
703
            return 'DEFAULT'
704
        except TypeError:
705
            return 'DEFAULT'
706
707
        # Build the stat_name
708
        stat_name = self.get_stat_name(header=header).lower()
709
710
        # Manage limits
711
        # If is_max is set then default style is set to MAX else default is set to OK
712
        ret = 'MAX' if is_max else 'OK'
713
714
        # Iter through limits
715
        critical = self.get_limit('critical', stat_name=stat_name)
716
        warning = self.get_limit('warning', stat_name=stat_name)
717
        careful = self.get_limit('careful', stat_name=stat_name)
718
        if critical and value >= critical:
719
            ret = 'CRITICAL'
720
        elif warning and value >= warning:
721
            ret = 'WARNING'
722
        elif careful and value >= careful:
723
            ret = 'CAREFUL'
724
        elif not careful and not warning and not critical:
725
            ret = 'DEFAULT'
726
        else:
727
            ret = 'OK'
728
729
        if current < minimum:
730
            ret = 'CAREFUL'
731
732
        # Manage log
733
        log_str = ""
734
        if self.get_limit_log(stat_name=stat_name, default_action=log):
735
            # Add _LOG to the return string
736
            # So stats will be highlighted with a specific color
737
            log_str = "_LOG"
738
            # Add the log to the events list
739
            glances_events.add(ret, stat_name.upper(), value)
740
741
        # Manage threshold
742
        self.manage_threshold(stat_name, ret)
743
744
        # Manage action
745
        self.manage_action(stat_name, ret.lower(), header, action_key)
746
747
        # Default is 'OK'
748
        return ret + log_str
749
750
    def filter_stats(self, stats):
751
        """Filter the stats to keep only the fields we want (the one defined in fields_description)."""
752
        if hasattr(stats, '_asdict'):
753
            return {k: v for k, v in stats._asdict().items() if k in self.fields_description}
754
        if isinstance(stats, dict):
755
            return {k: v for k, v in stats.items() if k in self.fields_description}
756
        if isinstance(stats, list):
757
            return [self.filter_stats(s) for s in stats]
758
        return stats
759
760
    def manage_threshold(self, stat_name, trigger):
761
        """Manage the threshold for the current stat."""
762
        glances_thresholds.add(stat_name, trigger)
763
764
    def manage_action(self, stat_name, trigger, header, action_key):
765
        """Manage the action for the current stat."""
766
        # Here is a command line for the current trigger ?
767
        command, repeat = self.get_limit_action(trigger, stat_name=stat_name)
768
        if not command and not repeat:
769
            # Reset the trigger
770
            self.actions.set(stat_name, trigger)
771
        else:
772
            # Define the action key for the stats dict
773
            # If not define, then it sets to header
774
            if action_key is None:
775
                action_key = header
776
777
            # A command line is available for the current alert
778
            # 1) Build the {{mustache}} dictionary
779
            if isinstance(self.get_stats_action(), list):
780
                # If the stats are stored in a list of dict (fs plugin for example)
781
                # Return the dict for the current header
782
                mustache_dict = {}
783
                for item in self.get_stats_action():
784
                    if item[self.get_key()] == action_key:
785
                        mustache_dict = item
786
                        break
787
            else:
788
                # Use the stats dict
789
                mustache_dict = self.get_stats_action()
790
            # 2) Run the action
791
            self.actions.run(stat_name, trigger, command, repeat, mustache_dict=mustache_dict)
792
793
    def get_alert_log(self, current=0, minimum=0, maximum=100, header="", action_key=None):
794
        """Get the alert log."""
795
        return self.get_alert(
796
            current=current, minimum=minimum, maximum=maximum, header=header, action_key=action_key, log=True
797
        )
798
799
    def is_limit(self, criticality, stat_name=""):
800
        """Return true if the criticality limit exist for the given stat_name"""
801
        return self.get_stat_name(stat_name).lower() + '_' + criticality in self._limits
802
803
    def get_limit(self, criticality=None, stat_name=""):
804
        """Return the limit value for the given criticality.
805
        If criticality is None, return the dict of all the limits."""
806
        if criticality is None:
807
            return self._limits
808
809
        # Get the limit for stat + header
810
        # Example: network_wlan0_rx_careful
811
        stat_name = stat_name.lower()
812
        if stat_name + '_' + criticality in self._limits:
813
            return self._limits[stat_name + '_' + criticality]
814
        if self.plugin_name + '_' + criticality in self._limits:
815
            return self._limits[self.plugin_name + '_' + criticality]
816
817
        return None
818
819
    def get_limit_action(self, criticality, stat_name=""):
820
        """Return the tuple (action, repeat) for the alert.
821
822
        - action is a command line
823
        - repeat is a bool
824
        """
825
        # Get the action for stat + header
826
        # Example: network_wlan0_rx_careful_action
827
        # Action key available ?
828
        ret = [
829
            (stat_name + '_' + criticality + '_action', False),
830
            (stat_name + '_' + criticality + '_action_repeat', True),
831
            (self.plugin_name + '_' + criticality + '_action', False),
832
            (self.plugin_name + '_' + criticality + '_action_repeat', True),
833
        ]
834
        for r in ret:
835
            if r[0] in self._limits:
836
                return self._limits[r[0]], r[1]
837
838
        # No key found, return None
839
        return None, None
840
841
    def get_limit_log(self, stat_name, default_action=False):
842
        """Return the log tag for the alert."""
843
        # Get the log tag for stat + header
844
        # Example: network_wlan0_rx_log
845
        if stat_name + '_log' in self._limits:
846
            return self._limits[stat_name + '_log'][0].lower() == 'true'
847
        if self.plugin_name + '_log' in self._limits:
848
            return self._limits[self.plugin_name + '_log'][0].lower() == 'true'
849
        return default_action
850
851
    def get_conf_value(self, value, header="", plugin_name=None, default=[]):
852
        """Return the configuration (header_) value for the current plugin.
853
854
        ...or the one given by the plugin_name var.
855
        """
856
        if plugin_name is None:
857
            # If not default use the current plugin name
858
            plugin_name = self.plugin_name
859
860
        if header != "":
861
            # Add the header
862
            plugin_name = plugin_name + '_' + header
863
864
        try:
865
            return self._limits[plugin_name + '_' + value]
866
        except KeyError:
867
            return default
868
869
    def is_show(self, value, header=""):
870
        """Return True if the value is in the show configuration list.
871
872
        If the show value is empty, return True (show by default)
873
874
        The show configuration list is defined in the glances.conf file.
875
        It is a comma-separated list of regexp.
876
        Example for diskio:
877
        show=sda.*
878
        """
879
        # TODO: possible optimisation: create a re.compile list
880
        # nguuuquaaa: no need a compile list, the re module caches the compiling itself
881
        return any(re.fullmatch(i, value, re.I) for i in self.get_conf_value('show', header=header))
882
883
    def is_hide(self, value, header=""):
884
        """Return True if the value is in the hide configuration list.
885
886
        The hide configuration list is defined in the glances.conf file.
887
        It is a comma-separated list of regexp.
888
        Example for diskio:
889
        hide=sda2,sda5,loop.*
890
        """
891
        # TODO: possible optimisation: create a re.compile list
892
        # nguuuquaaa: no need a compile list, the re module caches the compiling itself
893
        return any(re.fullmatch(i, value, re.I) for i in self.get_conf_value('hide', header=header))
894
895
    def is_display(self, value, header=""):
896
        """Return True if the value should be displayed in the UI"""
897
        if self.get_conf_value('show', header=header) != []:
898
            return self.is_show(value, header=header)
899
        return not self.is_hide(value, header=header)
900
901
    def is_display_any(self, *values, header=""):
902
        """Return True if any of the values should be displayed in the UI"""
903
        if self.get_conf_value('show', header=header) != []:
904
            return any(self.is_show(value, header=header) for value in values)
905
        return not any(self.is_hide(value, header=header) for value in values)
906
907
    def read_alias(self):
908
        if self.plugin_name + '_' + 'alias' in self._limits:
909
            return {i.split(':')[0].lower(): i.split(':')[1] for i in self._limits[self.plugin_name + '_' + 'alias']}
910
        return {}
911
912
    def has_alias(self, header):
913
        """Return the alias name for the relative header it it exists otherwise None."""
914
        return self.alias.get(header, None)
915
916
    def msg_curse(self, args=None, max_width=None):
917
        """Return default string to display in the curse interface."""
918
        return [self.curse_add_line(str(self.stats))]
919
920
    def get_stats_display(self, args=None, max_width=None):
921
        """Return a dict with all the information needed to display the stat.
922
923
        key     | description
924
        ----------------------------
925
        display | Display the stat (True or False)
926
        msgdict | Message to display (list of dict [{ 'msg': msg, 'decoration': decoration } ... ])
927
        align   | Message position
928
        """
929
        display_curse = False
930
931
        if hasattr(self, 'display_curse'):
932
            display_curse = self.display_curse
933
        if hasattr(self, 'align'):
934
            align_curse = self._align
935
936
        if max_width is not None:
937
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args, max_width=max_width), 'align': align_curse}
0 ignored issues
show
introduced by
The variable align_curse does not seem to be defined in case hasattr(self, 'align') on line 933 is False. Are you sure this can never be the case?
Loading history...
938
        else:
939
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args), 'align': align_curse}
940
941
        return ret
942
943
    def curse_add_line(self, msg, decoration="DEFAULT", optional=False, additional=False, splittable=False):
944
        """Return a dict with.
945
946
        Where:
947
            msg: string
948
            decoration:
949
                DEFAULT: no decoration
950
                UNDERLINE: underline
951
                BOLD: bold
952
                TITLE: for stat title
953
                PROCESS: for process name
954
                STATUS: for process status
955
                CPU_TIME: for process cpu time
956
                OK: Value is OK and non logged
957
                OK_LOG: Value is OK and logged
958
                CAREFUL: Value is CAREFUL and non logged
959
                CAREFUL_LOG: Value is CAREFUL and logged
960
                WARNING: Value is WARNING and non logged
961
                WARNING_LOG: Value is WARNING and logged
962
                CRITICAL: Value is CRITICAL and non logged
963
                CRITICAL_LOG: Value is CRITICAL and logged
964
            optional: True if the stat is optional (display only if space is available)
965
            additional: True if the stat is additional (display only if space is available after optional)
966
            spittable: Line can be split to fit on the screen (default is not)
967
        """
968
        return {
969
            'msg': msg,
970
            'decoration': decoration,
971
            'optional': optional,
972
            'additional': additional,
973
            'splittable': splittable,
974
        }
975
976
    def curse_new_line(self):
977
        """Go to a new line."""
978
        return self.curse_add_line('\n')
979
980
    def curse_add_stat(self, key, width=None, header='', display_key=True, separator='', trailer=''):
981
        """Return a list of dict messages with the 'key: value' result
982
983
          <=== width ===>
984
        __key     : 80.5%__
985
        | |       | |    |_ trailer
986
        | |       | |_ self.stats[key]
987
        | |       |_ separator
988
        | |_ 'short_name' description or key or nothing if display_key is True
989
        |_ header
990
991
        Instead of:
992
            msg = '  {:8}'.format('idle:')
993
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
994
            msg = '{:5.1f}%'.format(self.stats['idle'])
995
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
996
997
        Use:
998
            ret.extend(self.curse_add_stat('idle', width=15, header='  '))
999
1000
        """
1001
        if key not in self.stats:
1002
            return []
1003
1004
        # Check if a shortname is defined
1005
        if not display_key:
1006
            key_name = ''
1007
        elif key in self.fields_description and 'short_name' in self.fields_description[key]:
1008
            key_name = self.fields_description[key]['short_name']
1009
        else:
1010
            key_name = key
1011
1012
        # Check if unit is defined and get the short unit char in the unit_sort dict
1013
        if (
1014
            key in self.fields_description
1015
            and 'unit' in self.fields_description[key]
1016
            and self.fields_description[key]['unit'] in fields_unit_short
1017
        ):
1018
            # Get the shortname
1019
            unit_short = fields_unit_short[self.fields_description[key]['unit']]
1020
        else:
1021
            unit_short = ''
1022
1023
        # Check if unit is defined and get the unit type unit_type dict
1024
        if (
1025
            key in self.fields_description
1026
            and 'unit' in self.fields_description[key]
1027
            and self.fields_description[key]['unit'] in fields_unit_type
1028
        ):
1029
            # Get the shortname
1030
            unit_type = fields_unit_type[self.fields_description[key]['unit']]
1031
        else:
1032
            unit_type = 'float'
1033
1034
        # Is it a rate ? Yes, get the pre-computed rate value
1035
        if key in self.fields_description and self.fields_description[key].get('rate', False) is True:
1036
            value = self.stats.get(key + '_rate_per_sec', None)
1037
        else:
1038
            value = self.stats.get(key, None)
1039
1040
        if width is None:
1041
            msg_item = header + f'{key_name}' + separator
1042
            msg_template_float = '{:.1f}{}'
1043
            msg_template = '{}{}'
1044
        else:
1045
            # Define the size of the message
1046
            # item will be on the left
1047
            # value will be on the right
1048
            msg_item = header + '{:{width}}'.format(key_name, width=width - 7) + separator
1049
            msg_template_float = '{:5.1f}{}'
1050
            msg_template = '{:>5}{}'
1051
1052
        if value is None:
1053
            msg_value = msg_template.format('-', '')
1054
        elif unit_type == 'float':
1055
            msg_value = msg_template_float.format(value, unit_short)
1056
        elif 'min_symbol' in self.fields_description[key]:
1057
            msg_value = msg_template.format(
1058
                self.auto_unit(int(value), min_symbol=self.fields_description[key]['min_symbol']), unit_short
1059
            )
1060
        else:
1061
            msg_value = msg_template.format(int(value), unit_short)
1062
1063
        # Add the trailer
1064
        msg_value = msg_value + trailer
1065
1066
        decoration = self.get_views(key=key, option='decoration') if value is not None else 'DEFAULT'
1067
        optional = self.get_views(key=key, option='optional')
1068
1069
        return [
1070
            self.curse_add_line(msg_item, optional=optional),
1071
            self.curse_add_line(msg_value, decoration=decoration, optional=optional),
1072
        ]
1073
1074
    @property
1075
    def align(self):
1076
        """Get the curse align."""
1077
        return self._align
1078
1079
    @align.setter
1080
    def align(self, value):
1081
        """Set the curse align.
1082
1083
        value: left, right, bottom.
1084
        """
1085
        self._align = value
1086
1087
    def auto_unit(self, number, low_precision=False, min_symbol='K', none_symbol='-'):
1088
        """Make a nice human-readable string out of number.
1089
1090
        Number of decimal places increases as quantity approaches 1.
1091
        CASE: 613421788        RESULT:       585M low_precision:       585M
1092
        CASE: 5307033647       RESULT:      4.94G low_precision:       4.9G
1093
        CASE: 44968414685      RESULT:      41.9G low_precision:      41.9G
1094
        CASE: 838471403472     RESULT:       781G low_precision:       781G
1095
        CASE: 9683209690677    RESULT:      8.81T low_precision:       8.8T
1096
        CASE: 1073741824       RESULT:      1024M low_precision:      1024M
1097
        CASE: 1181116006       RESULT:      1.10G low_precision:       1.1G
1098
1099
        :low_precision: returns less decimal places potentially (default is False)
1100
                        sacrificing precision for more readability.
1101
        :min_symbol: Do not approach if number < min_symbol (default is K)
1102
        :decimal_count: if set, force the number of decimal number (default is None)
1103
        """
1104
        if number is None:
1105
            return none_symbol
1106
        symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
1107
        if min_symbol in symbols:
1108
            symbols = symbols[symbols.index(min_symbol) :]
1109
        prefix = {
1110
            'Y': 1208925819614629174706176,
1111
            'Z': 1180591620717411303424,
1112
            'E': 1152921504606846976,
1113
            'P': 1125899906842624,
1114
            'T': 1099511627776,
1115
            'G': 1073741824,
1116
            'M': 1048576,
1117
            'K': 1024,
1118
        }
1119
1120
        if number == 0:
1121
            # Avoid 0.0
1122
            return '0'
1123
1124
        for symbol in reversed(symbols):
1125
            value = float(number) / prefix[symbol]
1126
            if value > 1:
1127
                decimal_precision = 0
1128
                if value < 10:
1129
                    decimal_precision = 2
1130
                elif value < 100:
1131
                    decimal_precision = 1
1132
                if low_precision:
1133
                    if symbol in 'MK':
1134
                        decimal_precision = 0
1135
                    else:
1136
                        decimal_precision = min(1, decimal_precision)
1137
                elif symbol in 'K':
1138
                    decimal_precision = 0
1139
                return '{:.{decimal}f}{symbol}'.format(value, decimal=decimal_precision, symbol=symbol)
1140
        return f'{number!s}'
1141
1142
    def trend_msg(self, trend, significant=1):
1143
        """Return the trend message.
1144
1145
        Do not take into account if trend < significant
1146
        """
1147
        ret = '-'
1148
        if trend is None:
1149
            ret = ' '
1150
        elif trend > significant:
1151
            ret = unicode_message('ARROW_UP', self.args)
1152
        elif trend < -significant:
1153
            ret = unicode_message('ARROW_DOWN', self.args)
1154
        return ret
1155
1156
    def _check_decorator(fct):
1157
        """Check decorator for update method.
1158
1159
        It checks:
1160
        - if the plugin is enabled.
1161
        - if the refresh_timer is finished
1162
        """
1163
1164
        def wrapper(self, *args, **kw):
1165
            if self.is_enabled() and (self.refresh_timer.finished() or self.stats == self.get_init_value):
1166
                # Run the method
1167
                ret = fct(self, *args, **kw)
1168
                # Reset the timer
1169
                self.refresh_timer.set(self.get_refresh())
1170
                self.refresh_timer.reset()
1171
            else:
1172
                # No need to call the method
1173
                # Return the last result available
1174
                ret = self.stats
1175
            return ret
1176
1177
        return wrapper
1178
1179 View Code Duplication
    def _log_result_decorator(fct):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1180
        """Log (DEBUG) the result of the function fct."""
1181
1182
        def wrapper(*args, **kw):
1183
            counter = Counter()
1184
            ret = fct(*args, **kw)
1185
            duration = counter.get()
1186
            class_name = args[0].__class__.__name__
1187
            class_module = args[0].__class__.__module__
1188
            logger.debug(f"{class_name} {class_module} {fct.__name__} return {ret} in {duration} seconds")
1189
            return ret
1190
1191
        return wrapper
1192
1193
    def _manage_rate(fct):
1194
        """Manage rate decorator for update method."""
1195
1196
        def compute_rate(self, stat, stat_previous):
1197
            if stat_previous is None:
1198
                return stat
1199
1200
            # 1) set _gauge for all the rate fields
1201
            # 2) compute the _rate_per_sec
1202
            # 3) set the original field to the delta between the current and the previous value
1203
            for field in self.fields_description:
1204
                # For all the field with the rate=True flag
1205
                # if 'rate' in self.fields_description[field] and self.fields_description[field]['rate'] is True:
1206
                if self.fields_description[field].get('rate', False):
1207
                    # Create a new metadata with the gauge
1208
                    stat['time_since_update'] = self.time_since_last_update
1209
                    stat[field + '_gauge'] = stat[field]
1210
                    if field + '_gauge' in stat_previous and stat[field] and stat_previous[field + '_gauge']:
1211
                        # The stat becomes the delta between the current and the previous value
1212
                        stat[field] = stat[field] - stat_previous[field + '_gauge']
1213
                        # Compute the rate
1214
                        if self.time_since_last_update > 0:
1215
                            stat[field + '_rate_per_sec'] = stat[field] // self.time_since_last_update
1216
                        else:
1217
                            stat[field] = 0
1218
                            stat[field + '_rate_per_sec'] = 0
1219
                    else:
1220
                        # Avoid strange rate at the first run
1221
                        stat[field] = 0
1222
                        stat[field + '_rate_per_sec'] = 0
1223
            return stat
1224
1225
        def compute_rate_on_list(self, stats, stats_previous):
1226
            if stats_previous is None:
1227
                return stats
1228
1229
            for stat in stats:
1230
                olds = [i for i in stats_previous if i[self.get_key()] == stat[self.get_key()]]
1231
                if len(olds) == 1:
1232
                    compute_rate(self, stat, olds[0])
1233
            return stats
1234
1235
        def wrapper(self, *args, **kw):
1236
            # Call the father method
1237
            stats = fct(self, *args, **kw)
1238
1239
            # Get the time since the last update
1240
            self.time_since_last_update = getTimeSinceLastUpdate(self.plugin_name)
1241
1242
            # Compute the rate
1243
            if isinstance(stats, dict):
1244
                # Stats is a dict
1245
                compute_rate(self, stats, self.stats_previous)
1246
            elif isinstance(stats, list):
1247
                # Stats is a list
1248
                compute_rate_on_list(self, stats, self.stats_previous)
1249
1250
            # Memorized the current stats for next run
1251
            self.stats_previous = copy.deepcopy(stats)
1252
1253
            return stats
1254
1255
        return wrapper
1256
1257
    # Mandatory to call the decorator in child classes
1258
    _check_decorator = staticmethod(_check_decorator)
1259
    _log_result_decorator = staticmethod(_log_result_decorator)
1260
    _manage_rate = staticmethod(_manage_rate)
1261