GlancesPluginModel.get_alert()   F
last analyzed

Complexity

Conditions 17

Size

Total Lines 86
Code Lines 41

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 17
eloc 41
nop 9
dl 0
loc 86
rs 1.8
c 0
b 0
f 0

How to fix   Long Method    Complexity    Many Parameters   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like glances.plugins.plugin.model.GlancesPluginModel.get_alert() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""
10
I am your father...
11
12
...of all Glances model plugins.
13
"""
14
15
import copy
16
import re
17
18
from glances.actions import GlancesActions
19
from glances.events_list import glances_events
20
from glances.globals import dictlist, dictlist_json_dumps, iterkeys, itervalues, json_dumps, listkeys, mean, nativestr
21
from glances.history import GlancesHistory
22
from glances.logger import logger
23
from glances.outputs.glances_unicode import unicode_message
24
from glances.thresholds import glances_thresholds
25
from glances.timer import Counter, Timer, getTimeSinceLastUpdate
26
27
fields_unit_short = {'percent': '%'}
28
29
fields_unit_type = {
30
    'percent': 'float',
31
    'percents': 'float',
32
    'number': 'int',
33
    'numbers': 'int',
34
    'int': 'int',
35
    'ints': 'int',
36
    'float': 'float',
37
    'floats': 'float',
38
    'second': 'int',
39
    'seconds': 'int',
40
    'byte': 'int',
41
    'bytes': 'int',
42
}
43
44
45
class GlancesPluginModel:
46
    """Main class for Glances plugin model."""
47
48
    def __init__(self, args=None, config=None, items_history_list=None, stats_init_value={}, fields_description=None):
49
        """Init the plugin of plugins model class.
50
51
        All Glances' plugins model should inherit from this class. Most of the
52
        methods are already implemented in the father classes.
53
54
        Your plugin should return a dict or a list of dicts (stored in the
55
        self.stats). As an example, you can have a look on the mem plugin
56
        (for dict) or network (for list of dicts).
57
58
        From version 4 of the API, the plugin should return a dict.
59
60
        A plugin should implement:
61
        - the reset method: to set your self.stats variable to {} or []
62
        - the update method: where your self.stats variable is set
63
        and optionally:
64
        - the get_key method: set the key of the dict (only for list of dict)
65
        - all others methods you want to overwrite
66
67
        :args: args parameters
68
        :config: configuration parameters
69
        :items_history_list: list of items to store in the history
70
        :stats_init_value: Default value for a stats item
71
        """
72
        # Build the plugin name
73
        # Internal or external module (former prefixed by 'glances.plugins')
74
        _mod = self.__class__.__module__.replace('glances.plugins.', '')
75
        self.plugin_name = _mod.split('.')[0]
76
77
        if self.plugin_name.startswith('glances_'):
78
            self.plugin_name = self.plugin_name.split('glances_')[1]
79
        logger.debug(f"Init {self.plugin_name} plugin")
80
81
        # Init the args
82
        self.args = args
83
84
        # Init the default alignment (for curses)
85
        self._align = 'left'
86
87
        # Init the input method
88
        self._input_method = 'local'
89
        self._short_system_name = None
90
91
        # Init the history list
92
        self.items_history_list = items_history_list
93
        self.stats_history = self.init_stats_history()
94
95
        # Init the limits (configuration keys) dictionary
96
        self._limits = {}
97
        if config is not None:
98
            logger.debug(f'Load section {self.plugin_name} in Glances configuration file')
99
            self.load_limits(config=config)
100
101
        # Init the alias (dictionary)
102
        self.alias = self.read_alias()
103
104
        # Init the actions
105
        self.actions = GlancesActions(args=args)
106
107
        # Init the views
108
        self.views = {}
109
110
        # Hide stats if all the hide_zero_fields has never been != 0
111
        # Default is False, always display stats
112
        self.hide_zero = False
113
        # The threshold needed to display a value if hide_zero is true.
114
        # Only hide a value if it is less than hide_threshold_bytes.
115
        self.hide_threshold_bytes = 0
116
        self.hide_zero_fields = []
117
118
        # Set the initial refresh time to display stats the first time
119
        self.refresh_timer = Timer(0)
120
121
        # Init stats description
122
        self.fields_description = fields_description
123
124
        # Init the stats
125
        self.stats_init_value = stats_init_value
126
        self.time_since_last_update = None
127
        self.stats = None
128
        self.stats_previous = None
129
        self.reset()
130
131
    def __repr__(self):
132
        """Return the raw stats."""
133
        return str(self.stats)
134
135
    def __str__(self):
136
        """Return the human-readable stats."""
137
        return str(self.stats)
138
139
    def get_init_value(self):
140
        """Return a copy of the init value."""
141
        return copy.copy(self.stats_init_value)
142
143
    def reset(self):
144
        """Reset the stats.
145
146
        This method should be overwritten by child classes.
147
        """
148
        self.stats = self.get_init_value()
149
150
    def exit(self):
151
        """Just log an event when Glances exit."""
152
        logger.debug(f"Stop the {self.plugin_name} plugin")
153
154
    def get_key(self):
155
        """Return the key of the list."""
156
        return
157
158
    def is_enabled(self, plugin_name=None):
159
        """Return true if plugin is enabled."""
160
        if not plugin_name:
161
            plugin_name = self.plugin_name
162
        try:
163
            d = getattr(self.args, 'disable_' + plugin_name)
164
        except AttributeError:
165
            d = getattr(self.args, 'enable_' + plugin_name, True)
166
        return d is False
167
168
    def is_disabled(self, plugin_name=None):
169
        """Return true if plugin is disabled."""
170
        return not self.is_enabled(plugin_name=plugin_name)
171
172
    def history_enable(self):
173
        return self.args is not None and not self.args.disable_history and self.get_items_history_list() is not None
174
175
    def init_stats_history(self):
176
        """Init the stats history (dict of GlancesAttribute)."""
177
        if self.history_enable():
178
            init_list = [a['name'] for a in self.get_items_history_list()]
179
            logger.debug(f"Stats history activated for plugin {self.plugin_name} (items: {init_list})")
180
        return GlancesHistory()
181
182
    def reset_stats_history(self):
183
        """Reset the stats history (dict of GlancesAttribute)."""
184
        if self.history_enable():
185
            reset_list = [a['name'] for a in self.get_items_history_list()]
186
            logger.debug(f"Reset history for plugin {self.plugin_name} (items: {reset_list})")
187
            self.stats_history.reset()
188
189
    def update_stats_history(self):
190
        """Update stats history."""
191
        # Build the history
192
        _get_export = self.get_export()
193
        if not (_get_export and self.history_enable()):
194
            return
195
        # Itern through items history
196
        item_name = '' if self.get_key() is None else self.get_key()
197
        for i in self.get_items_history_list():
198
            if isinstance(_get_export, list):
199
                # Stats is a list of data
200
                # Iter through stats (for example, iter through network interface)
201
                for l_export in _get_export:
202
                    if i['name'] in l_export:
203
                        self.stats_history.add(
204
                            nativestr(l_export[item_name]) + '_' + nativestr(i['name']),
205
                            l_export[i['name']],
206
                            description=i['description'],
207
                            history_max_size=self._limits['history_size'],
208
                        )
209
            else:
210
                # Stats is not a list
211
                # Add the item to the history directly
212
                self.stats_history.add(
213
                    nativestr(i['name']),
214
                    _get_export[i['name']],
215
                    description=i['description'],
216
                    history_max_size=self._limits['history_size'],
217
                )
218
219
    def get_items_history_list(self):
220
        """Return the items history list."""
221
        return self.items_history_list
222
223
    def get_raw_history(self, item=None, nb=0):
224
        """Return the history (RAW format).
225
226
        - the stats history (dict of list) if item is None
227
        - the stats history for the given item (list) instead
228
        - None if item did not exist in the history
229
        """
230
        s = self.stats_history.get(nb=nb)
231
        if item is None:
232
            return s
233
        if item in s:
234
            return s[item]
235
        return None
236
237
    def get_export_history(self, item=None):
238
        """Return the stats history object to export."""
239
        return self.get_raw_history(item=item)
240
241
    def get_stats_history(self, item=None, nb=0):
242
        """Return the stats history (JSON format)."""
243
        s = self.stats_history.get_json(nb=nb)
244
245
        if item is None:
246
            return json_dumps(s)
247
248
        return dictlist_json_dumps(s, item)
249
250
    def get_trend(self, item, nb=30):
251
        """Get the trend regarding to the last nb values.
252
253
        The trend is the diffirence between the mean of the last 0 to nb / 2
254
        and nb / 2 to nb values.
255
        """
256
        raw_history = self.get_raw_history(item=item, nb=nb)
257
        if raw_history is None or len(raw_history) < nb:
258
            return None
259
        last_nb = [v[1] for v in raw_history]
260
        return mean(last_nb[nb // 2 :]) - mean(last_nb[: nb // 2])
261
262
    @property
263
    def input_method(self):
264
        """Get the input method."""
265
        return self._input_method
266
267
    @input_method.setter
268
    def input_method(self, input_method):
269
        """Set the input method.
270
271
        * local: system local grab (psutil or direct access)
272
        * snmp: Client server mode via SNMP
273
        * glances: Client server mode via Glances API
274
        """
275
        self._input_method = input_method
276
277
    @property
278
    def short_system_name(self):
279
        """Get the short detected OS name (SNMP)."""
280
        return self._short_system_name
281
282
    def sorted_stats(self):
283
        """Get the stats sorted by an alias (if present) or key."""
284
        key = self.get_key()
285
        if key is None:
286
            return self.stats
287
        try:
288
            return sorted(
289
                self.stats,
290
                key=lambda stat: tuple(
291
                    int(part) if part.isdigit() else part.lower()
292
                    for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
293
                ),
294
            )
295
        except TypeError:
296
            # Correct "Starting an alias with a number causes a crash #1885"
297
            return sorted(
298
                self.stats,
299
                key=lambda stat: tuple(
300
                    part.lower() for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
301
                ),
302
            )
303
304
    @short_system_name.setter
305
    def short_system_name(self, short_name):
306
        """Set the short detected OS name (SNMP)."""
307
        self._short_system_name = short_name
308
309
    def set_stats(self, input_stats):
310
        """Set the stats to input_stats."""
311
        self.stats = input_stats
312
313
    def get_stats_snmp(self, bulk=False, snmp_oid=None):
314
        """Update stats using SNMP.
315
316
        If bulk=True, use a bulk request instead of a get request.
317
        """
318
        snmp_oid = snmp_oid or {}
319
320
        from glances.snmp import GlancesSNMPClient
321
322
        # Init the SNMP request
323
        snmp_client = GlancesSNMPClient(
324
            host=self.args.client,
325
            port=self.args.snmp_port,
326
            version=self.args.snmp_version,
327
            community=self.args.snmp_community,
328
        )
329
330
        # Process the SNMP request
331
        ret = {}
332
        if bulk:
333
            # Bulk request
334
            snmp_result = snmp_client.getbulk_by_oid(0, 10, *list(itervalues(snmp_oid)))
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable itervalues does not seem to be defined.
Loading history...
335
            logger.info(snmp_result)
336
            if len(snmp_oid) == 1:
337
                # Bulk command for only one OID
338
                # Note: key is the item indexed but the OID result
339
                for item in snmp_result:
340
                    if iterkeys(item)[0].startswith(itervalues(snmp_oid)[0]):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable iterkeys does not seem to be defined.
Loading history...
341
                        ret[iterkeys(snmp_oid)[0] + iterkeys(item)[0].split(itervalues(snmp_oid)[0])[1]] = itervalues(
342
                            item
343
                        )[0]
344
            else:
345
                # Build the internal dict with the SNMP result
346
                # Note: key is the first item in the snmp_oid
347
                index = 1
348
                for item in snmp_result:
349
                    item_stats = {}
350
                    item_key = None
351
                    for key in iterkeys(snmp_oid):
352
                        oid = snmp_oid[key] + '.' + str(index)
353
                        if oid in item:
354
                            if item_key is None:
355
                                item_key = item[oid]
356
                            else:
357
                                item_stats[key] = item[oid]
358
                    if item_stats:
359
                        ret[item_key] = item_stats
360
                    index += 1
361
        else:
362
            # Simple get request
363
            snmp_result = snmp_client.get_by_oid(*list(itervalues(snmp_oid)))
364
365
            # Build the internal dict with the SNMP result
366
            for key in iterkeys(snmp_oid):
367
                ret[key] = snmp_result[snmp_oid[key]]
368
369
        return ret
370
371
    def get_raw(self):
372
        """Return the stats object."""
373
        return self.stats
374
375
    def get_export(self):
376
        """Return the stats object to export.
377
        By default, return the raw stats.
378
        Note: this method could be overwritten by the plugin if a specific format is needed (ex: processlist)
379
        """
380
        return self.get_raw()
381
382
    def get_stats(self):
383
        """Return the stats object in JSON format."""
384
        return json_dumps(self.get_raw())
385
386
    def get_json(self):
387
        """Return the stats object in JSON format."""
388
        return self.get_stats()
389
390
    def get_raw_stats_item(self, item):
391
        """Return the stats object for a specific item in RAW format.
392
393
        Stats should be a list of dict (processlist, network...)
394
        """
395
        return dictlist(self.get_raw(), item)
396
397
    def get_raw_stats_key(self, item, key):
398
        """Return the stats object for a specific item in RAW format.
399
400
        Stats should be a list of dict (processlist, network...)
401
        """
402
        return {item: [i for i in self.get_raw() if 'key' in i and i[i['key']] == key][0].get(item)}
403
404
    def get_stats_item(self, item):
405
        """Return the stats object for a specific item in JSON format.
406
407
        Stats should be a list of dict (processlist, network...)
408
        """
409
        return dictlist_json_dumps(self.get_raw(), item)
410
411
    def get_raw_stats_value(self, item, value):
412
        """Return the stats object for a specific item=value.
413
414
        Return None if the item=value does not exist
415
        Return None if the item is not a list of dict
416
        """
417
        if not isinstance(self.get_raw(), list):
418
            return None
419
420
        if (not isinstance(value, int) and not isinstance(value, float)) and value.isdigit():
421
            value = int(value)
422
        try:
423
            return {value: [i for i in self.get_raw() if i[item] == value]}
424
        except (KeyError, ValueError) as e:
425
            logger.error(f"Cannot get item({item})=value({value}) ({e})")
426
            return None
427
428
    def get_stats_value(self, item, value):
429
        """Return the stats object for a specific item=value in JSON format.
430
431
        Stats should be a list of dict (processlist, network...)
432
        """
433
        rsv = self.get_raw_stats_value(item, value)
434
        if rsv is None:
435
            return None
436
        return json_dumps(rsv)
437
438
    def get_item_info(self, item, key, default=None):
439
        """Return the item info grabbed into self.fields_description."""
440
        if self.fields_description is None or item not in self.fields_description:
441
            return default
442
        return self.fields_description[item].get(key, default)
443
444
    def _build_field_decoration(self, field):
445
        """Return the field decoration.
446
447
        The decoration is used to display the field in the UI.
448
        """
449
        # Manage the decoration
450
        if self.fields_description and field in self.fields_description:
451
            if (
452
                self.fields_description[field].get('rate') is True
453
                and isinstance(self.stats, dict)
454
                and self.stats.get('time_since_update', 0) == 0
455
            ):
456
                return 'DEFAULT'
457
            if self.fields_description[field].get('log') is True:
458
                return self.get_alert_log(self.stats[field], header=field)
459
            if self.fields_description[field].get('alert') is True:
460
                return self.get_alert(self.stats[field], header=field)
461
        return 'DEFAULT'
462
463
    def _build_field_optional(self, field):
464
        """Return true if the field is optional."""
465
        if self.fields_description and field in self.fields_description:
466
            return self.fields_description[field].get('optional', False)
467
        return False
468
469
    def _build_view_for_field(self, field):
470
        value = {
471
            'decoration': self._build_field_decoration(field),
472
            'optional': self._build_field_optional(field),
473
            'additional': False,
474
            'splittable': False,
475
            'hidden': False,
476
        }
477
478
        # Manage the hidden feature
479
        # Allow to automatically hide fields when values is never different than 0
480
        # Refactoring done for #2929
481
        if not self.hide_zero:
482
            value['hidden'] = False
483
        elif field in self.views and 'hidden' in self.views[field]:
484
            value['hidden'] = self.views[field]['hidden']
485
            if field in self.hide_zero_fields and self.get_raw()[field] >= self.hide_threshold_bytes:
486
                value['hidden'] = False
487
        else:
488
            value['hidden'] = field in self.hide_zero_fields
489
490
        return value
491
492
    def update_views(self):
493
        """Update the stats views.
494
495
        The V of MVC
496
        A dict of dict with the needed information to display the stats.
497
        Example for the stat xxx:
498
        'xxx': {'decoration': 'DEFAULT',  >>> The decoration of the stats
499
                'optional': False,        >>> Is the stat optional
500
                'additional': False,      >>> Is the stat provide additional information
501
                'splittable': False,      >>> Is the stat can be cut (like process lon name)
502
                'hidden': False}          >>> Is the stats should be hidden in the UI
503
        """
504
        ret = {}
505
506
        if self.get_raw() is not None and isinstance(self.get_raw(), list) and self.get_key() is not None:
507
            # Stats are stored in a list of dict (ex: DISKIO, NETWORK, FS...)
508
            for i in self.get_raw():
509
                key = i[self.get_key()]
510
                ret[key] = {}
511
                for field in listkeys(i):
512
                    ret[key][field] = self._build_view_for_field(field)
513
        elif isinstance(self.get_raw(), dict) and self.get_raw() is not None:
514
            # Stats are stored in a dict (ex: CPU, LOAD...)
515
            for field in listkeys(self.get_raw()):
516
                ret[field] = self._build_view_for_field(field)
517
518
        self.views = ret
519
520
        return self.views
521
522
    def set_views(self, input_views):
523
        """Set the views to input_views."""
524
        self.views = input_views
525
526
    def reset_views(self):
527
        """Reset the views to input_views."""
528
        self.views = {}
529
530
    def get_views(self, item=None, key=None, option=None):
531
        """Return the views object.
532
533
        If key is None, return all the view for the current plugin
534
        else if option is None return the view for the specific key (all option)
535
        else return the view of the specific key/option
536
537
        Specify item if the stats are stored in a dict of dict (ex: NETWORK, FS...)
538
        """
539
        if item is None:
540
            item_views = self.views
541
        else:
542
            item_views = self.views[item]
543
544
        if key is None:
545
            return item_views
546
        if key not in item_views:
547
            return 'DEFAULT'
548
        if option is None:
549
            return item_views[key]
550
        if option in item_views[key]:
551
            return item_views[key][option]
552
        return 'DEFAULT'
553
554
    def get_json_views(self, item=None, key=None, option=None):
555
        """Return the views (in JSON)."""
556
        return json_dumps(self.get_views(item, key, option))
557
558
    def load_limits(self, config):
559
        """Load limits from the configuration file, if it exists."""
560
        # By default set the history length to 3 points per second during one day
561
        self._limits['history_size'] = 28800
562
563
        if not hasattr(config, 'has_section'):
564
            return False
565
566
        # Read the global section
567
        # TODO: not optimized because this section is loaded for each plugin...
568
        if config.has_section('global'):
569
            self._limits['history_size'] = config.get_float_value('global', 'history_size', default=28800)
570
            logger.debug("Load configuration key: {} = {}".format('history_size', self._limits['history_size']))
571
572
        # Read the plugin specific section
573
        if config.has_section(self.plugin_name):
574
            for level, _ in config.items(self.plugin_name):
575
                # Read limits
576
                limit = '_'.join([self.plugin_name, level])
577
                try:
578
                    self._limits[limit] = config.get_float_value(self.plugin_name, level)
579
                except ValueError:
580
                    self._limits[limit] = config.get_value(self.plugin_name, level).split(",")
581
                logger.debug(f"Load limit: {limit} = {self._limits[limit]}")
582
583
        return True
584
585
    @property
586
    def limits(self):
587
        """Return the limits object."""
588
        return self._limits
589
590
    @limits.setter
591
    def limits(self, input_limits):
592
        """Set the limits to input_limits."""
593
        self._limits = input_limits
594
595
    def set_refresh(self, value):
596
        """Set the plugin refresh rate"""
597
        self.set_limits('refresh', value)
598
599
    def get_refresh(self):
600
        """Return the plugin refresh time"""
601
        ret = self.get_limits(item='refresh')
602
        if ret is None:
603
            ret = self.args.time if hasattr(self.args, 'time') else 2
604
        return ret
605
606
    def get_refresh_time(self):
607
        """Return the plugin refresh time"""
608
        return self.get_refresh()
609
610
    def set_limits(self, item, value):
611
        """Set the limits object."""
612
        self._limits[f'{self.plugin_name}_{item}'] = value
613
614
    def get_limits(self, item=None):
615
        """Return the limits object."""
616
        if item is None:
617
            return self._limits
618
        return self._limits.get(f'{self.plugin_name}_{item}', None)
619
620
    def get_stats_action(self):
621
        """Return stats for the action.
622
623
        By default return all the stats.
624
        Can be overwrite by plugins implementation.
625
        For example, Docker will return self.stats['containers']
626
        """
627
        return self.stats
628
629
    def get_stat_name(self, header=""):
630
        """Return the stat name with an optional header"""
631
        ret = self.plugin_name
632
        if header != '':
633
            ret += '_' + header
634
        return ret
635
636
    def get_alert(
637
        self,
638
        current=0,
639
        minimum=0,
640
        maximum=100,
641
        highlight_zero=True,
642
        is_max=False,
643
        header="",
644
        action_key=None,
645
        log=False,
646
    ):
647
        """Return the alert status relative to a current value.
648
649
        Use this function for minor stats.
650
651
        If current < CAREFUL of max then alert = OK
652
        If current > CAREFUL of max then alert = CAREFUL
653
        If current > WARNING of max then alert = WARNING
654
        If current > CRITICAL of max then alert = CRITICAL
655
656
        If highlight=True than 0.0 is highlighted
657
658
        If defined 'header' is added between the plugin name and the status.
659
        Only useful for stats with several alert status.
660
661
        If defined, 'action_key' define the key for the actions.
662
        By default, the action_key is equal to the header.
663
664
        If log=True than add log if necessary
665
        elif log=False than do not log
666
        elif log=None than apply the config given in the conf file
667
        """
668
        # Manage 0 (0.0) value if highlight_zero is not True
669
        if not highlight_zero and current == 0:
670
            return 'DEFAULT'
671
672
        # Compute the %
673
        try:
674
            value = (current * 100) / maximum
675
        except ZeroDivisionError:
676
            return 'DEFAULT'
677
        except TypeError:
678
            return 'DEFAULT'
679
680
        # Build the stat_name
681
        stat_name = self.get_stat_name(header=header).lower()
682
683
        # Manage limits
684
        # If is_max is set then default style is set to MAX else default is set to OK
685
        ret = 'MAX' if is_max else 'OK'
686
687
        # Iter through limits
688
        critical = self.get_limit('critical', stat_name=stat_name)
689
        warning = self.get_limit('warning', stat_name=stat_name)
690
        careful = self.get_limit('careful', stat_name=stat_name)
691
        if critical and value >= critical:
692
            ret = 'CRITICAL'
693
        elif warning and value >= warning:
694
            ret = 'WARNING'
695
        elif careful and value >= careful:
696
            ret = 'CAREFUL'
697
        elif not careful and not warning and not critical:
698
            ret = 'DEFAULT'
699
        else:
700
            ret = 'OK'
701
702
        if current < minimum:
703
            ret = 'CAREFUL'
704
705
        # Manage log
706
        log_str = ""
707
        if self.get_limit_log(stat_name=stat_name, default_action=log):
708
            # Add _LOG to the return string
709
            # So stats will be highlighted with a specific color
710
            log_str = "_LOG"
711
            # Add the log to the events list
712
            glances_events.add(ret, stat_name.upper(), value)
713
714
        # Manage threshold
715
        self.manage_threshold(stat_name, ret)
716
717
        # Manage action
718
        self.manage_action(stat_name, ret.lower(), header, action_key)
719
720
        # Default is 'OK'
721
        return ret + log_str
722
723
    def filter_stats(self, stats):
724
        """Filter the stats to keep only the fields we want (the one defined in fields_description)."""
725
        if hasattr(stats, '_asdict'):
726
            return {k: v for k, v in stats._asdict().items() if k in self.fields_description}
727
        if isinstance(stats, dict):
728
            return {k: v for k, v in stats.items() if k in self.fields_description}
729
        if isinstance(stats, list):
730
            return [self.filter_stats(s) for s in stats]
731
        return stats
732
733
    def manage_threshold(self, stat_name, trigger):
734
        """Manage the threshold for the current stat."""
735
        glances_thresholds.add(stat_name, trigger)
736
737
    def manage_action(self, stat_name, trigger, header, action_key):
738
        """Manage the action for the current stat."""
739
        # Here is a command line for the current trigger ?
740
        command, repeat = self.get_limit_action(trigger, stat_name=stat_name)
741
        if not command and not repeat:
742
            # Reset the trigger
743
            self.actions.set(stat_name, trigger)
744
        else:
745
            # Define the action key for the stats dict
746
            # If not define, then it sets to header
747
            if action_key is None:
748
                action_key = header
749
750
            # A command line is available for the current alert
751
            # 1) Build the {{mustache}} dictionary
752
            if isinstance(self.get_stats_action(), list):
753
                # If the stats are stored in a list of dict (fs plugin for example)
754
                # Return the dict for the current header
755
                mustache_dict = {}
756
                for item in self.get_stats_action():
757
                    if item[self.get_key()] == action_key:
758
                        mustache_dict = item
759
                        break
760
            else:
761
                # Use the stats dict
762
                mustache_dict = self.get_stats_action()
763
            # 2) Run the action
764
            self.actions.run(stat_name, trigger, command, repeat, mustache_dict=mustache_dict)
765
766
    def get_alert_log(self, current=0, minimum=0, maximum=100, header="", action_key=None):
767
        """Get the alert log."""
768
        return self.get_alert(
769
            current=current, minimum=minimum, maximum=maximum, header=header, action_key=action_key, log=True
770
        )
771
772
    def is_limit(self, criticality, stat_name=""):
773
        """Return true if the criticality limit exist for the given stat_name"""
774
        return self.get_stat_name(stat_name).lower() + '_' + criticality in self._limits
775
776
    def get_limit(self, criticality=None, stat_name=""):
777
        """Return the limit value for the given criticality.
778
        If criticality is None, return the dict of all the limits."""
779
        if criticality is None:
780
            return self._limits
781
782
        # Get the limit for stat + header
783
        # Example: network_wlan0_rx_careful
784
        stat_name = stat_name.lower()
785
        if stat_name + '_' + criticality in self._limits:
786
            return self._limits[stat_name + '_' + criticality]
787
        if self.plugin_name + '_' + criticality in self._limits:
788
            return self._limits[self.plugin_name + '_' + criticality]
789
790
        return None
791
792
    def get_limit_action(self, criticality, stat_name=""):
793
        """Return the tuple (action, repeat) for the alert.
794
795
        - action is a command line
796
        - repeat is a bool
797
        """
798
        # Get the action for stat + header
799
        # Example: network_wlan0_rx_careful_action
800
        # Action key available ?
801
        ret = [
802
            (stat_name + '_' + criticality + '_action', False),
803
            (stat_name + '_' + criticality + '_action_repeat', True),
804
            (self.plugin_name + '_' + criticality + '_action', False),
805
            (self.plugin_name + '_' + criticality + '_action_repeat', True),
806
        ]
807
        for r in ret:
808
            if r[0] in self._limits:
809
                return self._limits[r[0]], r[1]
810
811
        # No key found, return None
812
        return None, None
813
814
    def get_limit_log(self, stat_name, default_action=False):
815
        """Return the log tag for the alert."""
816
        # Get the log tag for stat + header
817
        # Example: network_wlan0_rx_log
818
        if stat_name + '_log' in self._limits:
819
            return self._limits[stat_name + '_log'][0].lower() == 'true'
820
        if self.plugin_name + '_log' in self._limits:
821
            return self._limits[self.plugin_name + '_log'][0].lower() == 'true'
822
        return default_action
823
824
    def get_conf_value(self, value, header="", plugin_name=None, default=[]):
825
        """Return the configuration (header_) value for the current plugin.
826
827
        ...or the one given by the plugin_name var.
828
        """
829
        if plugin_name is None:
830
            # If not default use the current plugin name
831
            plugin_name = self.plugin_name
832
833
        if header != "":
834
            # Add the header
835
            plugin_name = plugin_name + '_' + header
836
837
        try:
838
            return self._limits[plugin_name + '_' + value]
839
        except KeyError:
840
            return default
841
842
    def is_show(self, value, header=""):
843
        """Return True if the value is in the show configuration list.
844
845
        If the show value is empty, return True (show by default)
846
847
        The show configuration list is defined in the glances.conf file.
848
        It is a comma-separated list of regexp.
849
        Example for diskio:
850
        show=sda.*
851
        """
852
        # TODO: possible optimisation: create a re.compile list
853
        # nguuuquaaa: no need a compile list, the re module caches the compiling itself
854
        return any(re.fullmatch(i, value, re.I) for i in self.get_conf_value('show', header=header))
855
856
    def is_hide(self, value, header=""):
857
        """Return True if the value is in the hide configuration list.
858
859
        The hide configuration list is defined in the glances.conf file.
860
        It is a comma-separated list of regexp.
861
        Example for diskio:
862
        hide=sda2,sda5,loop.*
863
        """
864
        # TODO: possible optimisation: create a re.compile list
865
        # nguuuquaaa: no need a compile list, the re module caches the compiling itself
866
        return any(re.fullmatch(i, value, re.I) for i in self.get_conf_value('hide', header=header))
867
868
    def is_display(self, value, header=""):
869
        """Return True if the value should be displayed in the UI"""
870
        if self.get_conf_value('show', header=header) != []:
871
            return self.is_show(value, header=header)
872
        return not self.is_hide(value, header=header)
873
874
    def is_display_any(self, *values, header=""):
875
        """Return True if any of the values should be displayed in the UI"""
876
        if self.get_conf_value('show', header=header) != []:
877
            return any(self.is_show(value, header=header) for value in values)
878
        return not any(self.is_hide(value, header=header) for value in values)
879
880
    def read_alias(self):
881
        if self.plugin_name + '_' + 'alias' in self._limits:
882
            return {i.split(':')[0].lower(): i.split(':')[1] for i in self._limits[self.plugin_name + '_' + 'alias']}
883
        return {}
884
885
    def has_alias(self, header):
886
        """Return the alias name for the relative header it it exists otherwise None."""
887
        return self.alias.get(header, None)
888
889
    def msg_curse(self, args=None, max_width=None):
890
        """Return default string to display in the curse interface."""
891
        return [self.curse_add_line(str(self.stats))]
892
893
    def get_stats_display(self, args=None, max_width=None):
894
        """Return a dict with all the information needed to display the stat.
895
896
        key     | description
897
        ----------------------------
898
        display | Display the stat (True or False)
899
        msgdict | Message to display (list of dict [{ 'msg': msg, 'decoration': decoration } ... ])
900
        align   | Message position
901
        """
902
        display_curse = False
903
904
        if hasattr(self, 'display_curse'):
905
            display_curse = self.display_curse
906
        if hasattr(self, 'align'):
907
            align_curse = self._align
908
909
        if max_width is not None:
910
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args, max_width=max_width), 'align': align_curse}
0 ignored issues
show
introduced by
The variable align_curse does not seem to be defined in case hasattr(self, 'align') on line 906 is False. Are you sure this can never be the case?
Loading history...
911
        else:
912
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args), 'align': align_curse}
913
914
        return ret
915
916
    def curse_add_line(self, msg, decoration="DEFAULT", optional=False, additional=False, splittable=False):
917
        """Return a dict with.
918
919
        Where:
920
            msg: string
921
            decoration:
922
                DEFAULT: no decoration
923
                UNDERLINE: underline
924
                BOLD: bold
925
                TITLE: for stat title
926
                PROCESS: for process name
927
                STATUS: for process status
928
                CPU_TIME: for process cpu time
929
                OK: Value is OK and non logged
930
                OK_LOG: Value is OK and logged
931
                CAREFUL: Value is CAREFUL and non logged
932
                CAREFUL_LOG: Value is CAREFUL and logged
933
                WARNING: Value is WARNING and non logged
934
                WARNING_LOG: Value is WARNING and logged
935
                CRITICAL: Value is CRITICAL and non logged
936
                CRITICAL_LOG: Value is CRITICAL and logged
937
            optional: True if the stat is optional (display only if space is available)
938
            additional: True if the stat is additional (display only if space is available after optional)
939
            spittable: Line can be split to fit on the screen (default is not)
940
        """
941
        return {
942
            'msg': msg,
943
            'decoration': decoration,
944
            'optional': optional,
945
            'additional': additional,
946
            'splittable': splittable,
947
        }
948
949
    def curse_new_line(self):
950
        """Go to a new line."""
951
        return self.curse_add_line('\n')
952
953
    def curse_add_stat(self, key, width=None, header='', display_key=True, separator='', trailer=''):
954
        """Return a list of dict messages with the 'key: value' result
955
956
          <=== width ===>
957
        __key     : 80.5%__
958
        | |       | |    |_ trailer
959
        | |       | |_ self.stats[key]
960
        | |       |_ separator
961
        | |_ 'short_name' description or key or nothing if display_key is True
962
        |_ header
963
964
        Instead of:
965
            msg = '  {:8}'.format('idle:')
966
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
967
            msg = '{:5.1f}%'.format(self.stats['idle'])
968
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
969
970
        Use:
971
            ret.extend(self.curse_add_stat('idle', width=15, header='  '))
972
973
        """
974
        if key not in self.stats:
975
            return []
976
977
        # Check if a shortname is defined
978
        if not display_key:
979
            key_name = ''
980
        elif key in self.fields_description and 'short_name' in self.fields_description[key]:
981
            key_name = self.fields_description[key]['short_name']
982
        else:
983
            key_name = key
984
985
        # Check if unit is defined and get the short unit char in the unit_sort dict
986
        if (
987
            key in self.fields_description
988
            and 'unit' in self.fields_description[key]
989
            and self.fields_description[key]['unit'] in fields_unit_short
990
        ):
991
            # Get the shortname
992
            unit_short = fields_unit_short[self.fields_description[key]['unit']]
993
        else:
994
            unit_short = ''
995
996
        # Check if unit is defined and get the unit type unit_type dict
997
        if (
998
            key in self.fields_description
999
            and 'unit' in self.fields_description[key]
1000
            and self.fields_description[key]['unit'] in fields_unit_type
1001
        ):
1002
            # Get the shortname
1003
            unit_type = fields_unit_type[self.fields_description[key]['unit']]
1004
        else:
1005
            unit_type = 'float'
1006
1007
        # Is it a rate ? Yes, get the pre-computed rate value
1008
        if key in self.fields_description and self.fields_description[key].get('rate', False) is True:
1009
            value = self.stats.get(key + '_rate_per_sec', None)
1010
        else:
1011
            value = self.stats.get(key, None)
1012
1013
        if width is None:
1014
            msg_item = header + f'{key_name}' + separator
1015
            msg_template_float = '{:.1f}{}'
1016
            msg_template = '{}{}'
1017
        else:
1018
            # Define the size of the message
1019
            # item will be on the left
1020
            # value will be on the right
1021
            msg_item = header + '{:{width}}'.format(key_name, width=width - 7) + separator
1022
            msg_template_float = '{:5.1f}{}'
1023
            msg_template = '{:>5}{}'
1024
1025
        if value is None:
1026
            msg_value = msg_template.format('-', '')
1027
        elif unit_type == 'float':
1028
            msg_value = msg_template_float.format(value, unit_short)
1029
        elif 'min_symbol' in self.fields_description[key]:
1030
            msg_value = msg_template.format(
1031
                self.auto_unit(int(value), min_symbol=self.fields_description[key]['min_symbol']), unit_short
1032
            )
1033
        else:
1034
            msg_value = msg_template.format(int(value), unit_short)
1035
1036
        # Add the trailer
1037
        msg_value = msg_value + trailer
1038
1039
        decoration = self.get_views(key=key, option='decoration') if value is not None else 'DEFAULT'
1040
        optional = self.get_views(key=key, option='optional')
1041
1042
        return [
1043
            self.curse_add_line(msg_item, optional=optional),
1044
            self.curse_add_line(msg_value, decoration=decoration, optional=optional),
1045
        ]
1046
1047
    @property
1048
    def align(self):
1049
        """Get the curse align."""
1050
        return self._align
1051
1052
    @align.setter
1053
    def align(self, value):
1054
        """Set the curse align.
1055
1056
        value: left, right, bottom.
1057
        """
1058
        self._align = value
1059
1060
    def auto_unit(self, number, low_precision=False, min_symbol='K', none_symbol='-'):
1061
        """Make a nice human-readable string out of number.
1062
1063
        Number of decimal places increases as quantity approaches 1.
1064
        CASE: 613421788        RESULT:       585M low_precision:       585M
1065
        CASE: 5307033647       RESULT:      4.94G low_precision:       4.9G
1066
        CASE: 44968414685      RESULT:      41.9G low_precision:      41.9G
1067
        CASE: 838471403472     RESULT:       781G low_precision:       781G
1068
        CASE: 9683209690677    RESULT:      8.81T low_precision:       8.8T
1069
        CASE: 1073741824       RESULT:      1024M low_precision:      1024M
1070
        CASE: 1181116006       RESULT:      1.10G low_precision:       1.1G
1071
1072
        :low_precision: returns less decimal places potentially (default is False)
1073
                        sacrificing precision for more readability.
1074
        :min_symbol: Do not approach if number < min_symbol (default is K)
1075
        :decimal_count: if set, force the number of decimal number (default is None)
1076
        """
1077
        if number is None:
1078
            return none_symbol
1079
        symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
1080
        if min_symbol in symbols:
1081
            symbols = symbols[symbols.index(min_symbol) :]
1082
        prefix = {
1083
            'Y': 1208925819614629174706176,
1084
            'Z': 1180591620717411303424,
1085
            'E': 1152921504606846976,
1086
            'P': 1125899906842624,
1087
            'T': 1099511627776,
1088
            'G': 1073741824,
1089
            'M': 1048576,
1090
            'K': 1024,
1091
        }
1092
1093
        if number == 0:
1094
            # Avoid 0.0
1095
            return '0'
1096
1097
        for symbol in reversed(symbols):
1098
            value = float(number) / prefix[symbol]
1099
            if value > 1:
1100
                decimal_precision = 0
1101
                if value < 10:
1102
                    decimal_precision = 2
1103
                elif value < 100:
1104
                    decimal_precision = 1
1105
                if low_precision:
1106
                    if symbol in 'MK':
1107
                        decimal_precision = 0
1108
                    else:
1109
                        decimal_precision = min(1, decimal_precision)
1110
                elif symbol in 'K':
1111
                    decimal_precision = 0
1112
                return '{:.{decimal}f}{symbol}'.format(value, decimal=decimal_precision, symbol=symbol)
1113
        return f'{number!s}'
1114
1115
    def trend_msg(self, trend, significant=1):
1116
        """Return the trend message.
1117
1118
        Do not take into account if trend < significant
1119
        """
1120
        ret = '-'
1121
        if trend is None:
1122
            ret = ' '
1123
        elif trend > significant:
1124
            ret = unicode_message('ARROW_UP', self.args)
1125
        elif trend < -significant:
1126
            ret = unicode_message('ARROW_DOWN', self.args)
1127
        return ret
1128
1129
    def _check_decorator(fct):
1130
        """Check decorator for update method.
1131
1132
        It checks:
1133
        - if the plugin is enabled.
1134
        - if the refresh_timer is finished
1135
        """
1136
1137
        def wrapper(self, *args, **kw):
1138
            if self.is_enabled() and (self.refresh_timer.finished() or self.stats == self.get_init_value):
1139
                # Run the method
1140
                ret = fct(self, *args, **kw)
1141
                # Reset the timer
1142
                self.refresh_timer.set(self.get_refresh())
1143
                self.refresh_timer.reset()
1144
            else:
1145
                # No need to call the method
1146
                # Return the last result available
1147
                ret = self.stats
1148
            return ret
1149
1150
        return wrapper
1151
1152 View Code Duplication
    def _log_result_decorator(fct):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1153
        """Log (DEBUG) the result of the function fct."""
1154
1155
        def wrapper(*args, **kw):
1156
            counter = Counter()
1157
            ret = fct(*args, **kw)
1158
            duration = counter.get()
1159
            class_name = args[0].__class__.__name__
1160
            class_module = args[0].__class__.__module__
1161
            logger.debug(f"{class_name} {class_module} {fct.__name__} return {ret} in {duration} seconds")
1162
            return ret
1163
1164
        return wrapper
1165
1166
    def _manage_rate(fct):
1167
        """Manage rate decorator for update method."""
1168
1169
        def compute_rate(self, stat, stat_previous):
1170
            if stat_previous is None:
1171
                return stat
1172
1173
            # 1) set _gauge for all the rate fields
1174
            # 2) compute the _rate_per_sec
1175
            # 3) set the original field to the delta between the current and the previous value
1176
            for field in self.fields_description:
1177
                # For all the field with the rate=True flag
1178
                # if 'rate' in self.fields_description[field] and self.fields_description[field]['rate'] is True:
1179
                if self.fields_description[field].get('rate', False):
1180
                    # Create a new metadata with the gauge
1181
                    stat['time_since_update'] = self.time_since_last_update
1182
                    stat[field + '_gauge'] = stat[field]
1183
                    if field + '_gauge' in stat_previous and stat[field] and stat_previous[field + '_gauge']:
1184
                        # The stat becomes the delta between the current and the previous value
1185
                        stat[field] = stat[field] - stat_previous[field + '_gauge']
1186
                        # Compute the rate
1187
                        if self.time_since_last_update > 0:
1188
                            stat[field + '_rate_per_sec'] = stat[field] // self.time_since_last_update
1189
                        else:
1190
                            stat[field] = 0
1191
                            stat[field + '_rate_per_sec'] = 0
1192
                    else:
1193
                        # Avoid strange rate at the first run
1194
                        stat[field] = 0
1195
                        stat[field + '_rate_per_sec'] = 0
1196
            return stat
1197
1198
        def compute_rate_on_list(self, stats, stats_previous):
1199
            if stats_previous is None:
1200
                return stats
1201
1202
            for stat in stats:
1203
                olds = [i for i in stats_previous if i[self.get_key()] == stat[self.get_key()]]
1204
                if len(olds) == 1:
1205
                    compute_rate(self, stat, olds[0])
1206
            return stats
1207
1208
        def wrapper(self, *args, **kw):
1209
            # Call the father method
1210
            stats = fct(self, *args, **kw)
1211
1212
            # Get the time since the last update
1213
            self.time_since_last_update = getTimeSinceLastUpdate(self.plugin_name)
1214
1215
            # Compute the rate
1216
            if isinstance(stats, dict):
1217
                # Stats is a dict
1218
                compute_rate(self, stats, self.stats_previous)
1219
            elif isinstance(stats, list):
1220
                # Stats is a list
1221
                compute_rate_on_list(self, stats, self.stats_previous)
1222
1223
            # Memorized the current stats for next run
1224
            self.stats_previous = copy.deepcopy(stats)
1225
1226
            return stats
1227
1228
        return wrapper
1229
1230
    # Mandatory to call the decorator in child classes
1231
    _check_decorator = staticmethod(_check_decorator)
1232
    _log_result_decorator = staticmethod(_log_result_decorator)
1233
    _manage_rate = staticmethod(_manage_rate)
1234