GlancesPluginModel.get_limit()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 15
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 9
nop 3
dl 0
loc 15
rs 9.95
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""
10
I am your father...
11
12
...of all Glances model plugins.
13
"""
14
15
import copy
16
import re
17
from datetime import datetime
18
19
from glances.actions import GlancesActions
20
from glances.events_list import glances_events
21
from glances.globals import (
22
    auto_unit,
23
    dictlist,
24
    dictlist_json_dumps,
25
    json_dumps,
26
    list_to_dict,
27
    listkeys,
28
    mean,
29
    nativestr,
30
    split_esc,
31
)
32
from glances.history import GlancesHistory
33
from glances.logger import logger
34
from glances.outputs.glances_unicode import unicode_message
35
from glances.thresholds import glances_thresholds
36
from glances.timer import Counter, Timer, getTimeSinceLastUpdate
37
38
fields_unit_short = {'percent': '%'}
39
40
fields_unit_type = {
41
    'percent': 'float',
42
    'percents': 'float',
43
    'number': 'int',
44
    'numbers': 'int',
45
    'int': 'int',
46
    'ints': 'int',
47
    'float': 'float',
48
    'floats': 'float',
49
    'second': 'int',
50
    'seconds': 'int',
51
    'byte': 'int',
52
    'bytes': 'int',
53
}
54
55
56
class GlancesPluginModel:
57
    """Main class for Glances plugin model."""
58
59
    def __init__(self, args=None, config=None, items_history_list=None, stats_init_value={}, fields_description=None):
60
        """Init the plugin of plugins model class.
61
62
        All Glances' plugins model should inherit from this class. Most of the
63
        methods are already implemented in the father classes.
64
65
        Your plugin should return a dict or a list of dicts (stored in the
66
        self.stats). As an example, you can have a look on the mem plugin
67
        (for dict) or network (for list of dicts).
68
69
        From version 4 of the API, the plugin should return a dict.
70
71
        A plugin should implement:
72
        - the reset method: to set your self.stats variable to {} or []
73
        - the update method: where your self.stats variable is set
74
        and optionally:
75
        - the get_key method: set the key of the dict (only for list of dict)
76
        - all others methods you want to overwrite
77
78
        :args: args parameters
79
        :config: configuration parameters
80
        :items_history_list: list of items to store in the history
81
        :stats_init_value: Default value for a stats item
82
        """
83
        # Build the plugin name
84
        # Internal or external module (former prefixed by 'glances.plugins')
85
        _mod = self.__class__.__module__.replace('glances.plugins.', '')
86
        self.plugin_name = _mod.split('.')[0]
87
88
        if self.plugin_name.startswith('glances_'):
89
            self.plugin_name = self.plugin_name.split('glances_')[1]
90
        logger.debug(f"Init {self.plugin_name} plugin")
91
92
        # Init the args
93
        self.args = args
94
95
        # Init the default alignment (for curses)
96
        self._align = 'left'
97
98
        # Init the input method
99
        self._input_method = 'local'
100
        self._short_system_name = None
101
102
        # Init the history list
103
        self.items_history_list = items_history_list
104
        self.stats_history = self.init_stats_history()
105
106
        # Init the limits (configuration keys) dictionary
107
        self._limits = {}
108
        if config is not None:
109
            logger.debug(f'Load section {self.plugin_name} in Glances configuration file')
110
            self.load_limits(config=config)
111
112
        # Init the alias (dictionary)
113
        self.alias = self.read_alias()
114
115
        # Init the actions
116
        self.actions = GlancesActions(args=args)
117
118
        # Init the views
119
        self.views = {}
120
121
        # Hide stats if all the hide_zero_fields has never been != 0
122
        # Default is False, always display stats
123
        self.hide_zero = False
124
        # The threshold needed to display a value if hide_zero is true.
125
        # Only hide a value if it is less than hide_threshold_bytes.
126
        self.hide_threshold_bytes = 0
127
        self.hide_zero_fields = []
128
129
        # Set the initial refresh time to display stats the first time
130
        self.refresh_timer = Timer(0)
131
132
        # Init stats description
133
        self.fields_description = fields_description
134
135
        # Init the stats
136
        self.stats_init_value = stats_init_value
137
        self.time_since_last_update = None
138
        self.stats = None
139
        self.stats_previous = None
140
        self.reset()
141
142
    def __str__(self):
143
        """Return the human-readable stats."""
144
        return str(self.stats)
145
146
    def __repr__(self):
147
        """Return the raw stats."""
148
        if isinstance(self.stats, list):
149
            return str(list_to_dict(self.stats))
150
        return str(self.stats)
151
152
    def __getitem__(self, item):
153
        """Return the stats item."""
154
        if isinstance(self.stats, dict) and item in self.stats:
155
            return self.stats[item]
156
157
        if isinstance(self.stats, list):
158
            ltd = list_to_dict(self.stats)
159
            if item in ltd:
160
                return ltd[item]
161
162
        raise KeyError(f"'{self.__class__.__name__}' object has no key '{item}'")
163
164
    def keys(self):
165
        """Return the keys of the stats."""
166
        if isinstance(self.stats, dict):
167
            return listkeys(self.stats)
168
        if isinstance(self.stats, list):
169
            return listkeys(list_to_dict(self.stats))
170
        return []
171
172
    def get(self, item, default=None):
173
        """Return the stats item or default if not found."""
174
        try:
175
            return self[item]
176
        except KeyError:
177
            return default
178
179
    def get_init_value(self):
180
        """Return a copy of the init value."""
181
        return copy.copy(self.stats_init_value)
182
183
    def reset(self):
184
        """Reset the stats.
185
186
        This method should be overwritten by child classes.
187
        """
188
        self.stats = self.get_init_value()
189
190
    def exit(self):
191
        """Just log an event when Glances exit."""
192
        logger.debug(f"Stop the {self.plugin_name} plugin")
193
194
    def get_key(self):
195
        """Return the key of the list."""
196
        return
197
198
    def is_enabled(self, plugin_name=None):
199
        """Return true if plugin is enabled."""
200
        if not plugin_name:
201
            plugin_name = self.plugin_name
202
        try:
203
            d = getattr(self.args, 'disable_' + plugin_name)
204
        except AttributeError:
205
            d = getattr(self.args, 'enable_' + plugin_name, True)
206
        return d is False
207
208
    def is_disabled(self, plugin_name=None):
209
        """Return true if plugin is disabled."""
210
        return not self.is_enabled(plugin_name=plugin_name)
211
212
    def history_enable(self):
213
        return self.args is not None and not self.args.disable_history and self.get_items_history_list() is not None
214
215
    def init_stats_history(self):
216
        """Init the stats history (dict of GlancesAttribute)."""
217
        if self.history_enable():
218
            init_list = [a['name'] for a in self.get_items_history_list()]
219
            logger.debug(f"Stats history activated for plugin {self.plugin_name} (items: {init_list})")
220
        return GlancesHistory()
221
222
    def reset_stats_history(self):
223
        """Reset the stats history (dict of GlancesAttribute)."""
224
        if self.history_enable():
225
            reset_list = [a['name'] for a in self.get_items_history_list()]
226
            logger.debug(f"Reset history for plugin {self.plugin_name} (items: {reset_list})")
227
            self.stats_history.reset()
228
229
    def update_stats_history(self):
230
        """Update stats history."""
231
        # Exit if no history
232
        if not self.history_enable():
233
            return
234
        # Build the history
235
        _get_export = self.get_export()
236
        if not (_get_export and self.history_enable()):
237
            return
238
        # Itern through items history
239
        item_name = '' if self.get_key() is None else self.get_key()
240
        for i in self.get_items_history_list():
241
            if isinstance(_get_export, list):
242
                # Stats is a list of data
243
                # Iter through stats (for example, iter through network interface)
244
                for l_export in _get_export:
245
                    if i['name'] in l_export:
246
                        self.stats_history.add(
247
                            nativestr(l_export[item_name]) + '_' + nativestr(i['name']),
248
                            l_export[i['name']],
249
                            description=i['description'],
250
                            history_max_size=self._limits['history_size'],
251
                        )
252
            else:
253
                # Stats is not a list
254
                # Add the item to the history directly
255
                self.stats_history.add(
256
                    nativestr(i['name']),
257
                    _get_export[i['name']],
258
                    description=i['description'],
259
                    history_max_size=self._limits['history_size'],
260
                )
261
262
    def get_items_history_list(self):
263
        """Return the items history list."""
264
        return self.items_history_list
265
266
    def get_raw_history(self, item=None, nb=0):
267
        """Return the history (RAW format).
268
269
        - the stats history (dict of list) if item is None
270
        - the stats history for the given item (list) instead
271
        - None if item did not exist in the history
272
        """
273
        s = self.stats_history.get(nb=nb)
274
        if item is None:
275
            return s
276
        if item in s:
277
            return s[item]
278
        return None
279
280
    def get_export_history(self, item=None):
281
        """Return the stats history object to export."""
282
        return self.get_raw_history(item=item)
283
284
    def get_stats_history(self, item=None, nb=0):
285
        """Return the stats history (JSON format)."""
286
        s = self.stats_history.get_json(nb=nb)
287
288
        if item is None:
289
            return json_dumps(s)
290
291
        return dictlist_json_dumps(s, item)
292
293
    def get_trend(self, item, nb=30):
294
        """Get the trend regarding to the last nb values.
295
296
        The trend is the diffirence between the mean of the last 0 to nb / 2
297
        and nb / 2 to nb values.
298
        """
299
        raw_history = self.get_raw_history(item=item, nb=nb)
300
        if raw_history is None or len(raw_history) < nb:
301
            return None
302
        last_nb = [v[1] for v in raw_history]
303
        return mean(last_nb[nb // 2 :]) - mean(last_nb[: nb // 2])
304
305
    @property
306
    def input_method(self):
307
        """Get the input method."""
308
        return self._input_method
309
310
    @input_method.setter
311
    def input_method(self, input_method):
312
        """Set the input method.
313
314
        * local: system local grab (psutil or direct access)
315
        * snmp: Client server mode via SNMP
316
        * glances: Client server mode via Glances API
317
        """
318
        self._input_method = input_method
319
320
    @property
321
    def short_system_name(self):
322
        """Get the short detected OS name (SNMP)."""
323
        return self._short_system_name
324
325
    def sorted_stats(self):
326
        """Get the stats sorted by an alias (if present) or key."""
327
        key = self.get_key()
328
        if key is None:
329
            return self.stats
330
        try:
331
            return sorted(
332
                self.stats,
333
                key=lambda stat: tuple(
334
                    int(part) if part.isdigit() else part.lower()
335
                    for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
336
                ),
337
            )
338
        except TypeError:
339
            # Correct "Starting an alias with a number causes a crash #1885"
340
            return sorted(
341
                self.stats,
342
                key=lambda stat: tuple(
343
                    part.lower() for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
344
                ),
345
            )
346
347
    @short_system_name.setter
348
    def short_system_name(self, short_name):
349
        """Set the short detected OS name (SNMP)."""
350
        self._short_system_name = short_name
351
352
    def set_stats(self, input_stats):
353
        """Set the stats to input_stats."""
354
        self.stats = input_stats
355
356
    def get_stats_snmp(self, bulk=False, snmp_oid=None):
357
        """Update stats using SNMP.
358
359
        If bulk=True, use a bulk request instead of a get request.
360
        """
361
        snmp_oid = snmp_oid or {}
362
363
        from glances.snmp import GlancesSNMPClient
364
365
        # Init the SNMP request
366
        snmp_client = GlancesSNMPClient(
367
            host=self.args.client,
368
            port=self.args.snmp_port,
369
            version=self.args.snmp_version,
370
            community=self.args.snmp_community,
371
        )
372
373
        # Process the SNMP request
374
        ret = {}
375
        if bulk:
376
            # Bulk request
377
            snmp_result = snmp_client.getbulk_by_oid(0, 10, *list(snmp_oid.values()))
378
            logger.info(snmp_result)
379
            if len(snmp_oid) == 1:
380
                # Bulk command for only one OID
381
                # Note: key is the item indexed but the OID result
382
                for item in snmp_result:
383
                    if item.keys()[0].startswith(snmp_oid.values()[0]):
384
                        ret[snmp_oid.keys()[0] + item.keys()[0].split(snmp_oid.values()[0])[1]] = item.values()[0]
385
            else:
386
                # Build the internal dict with the SNMP result
387
                # Note: key is the first item in the snmp_oid
388
                index = 1
389
                for item in snmp_result:
390
                    item_stats = {}
391
                    item_key = None
392
                    for key in snmp_oid:
393
                        oid = snmp_oid[key] + '.' + str(index)
394
                        if oid in item:
395
                            if item_key is None:
396
                                item_key = item[oid]
397
                            else:
398
                                item_stats[key] = item[oid]
399
                    if item_stats:
400
                        ret[item_key] = item_stats
401
                    index += 1
402
        else:
403
            # Simple get request
404
            snmp_result = snmp_client.get_by_oid(*list(snmp_oid.values()))
405
406
            # Build the internal dict with the SNMP result
407
            for key in snmp_oid:
408
                ret[key] = snmp_result[snmp_oid[key]]
409
410
        return ret
411
412
    def get_raw(self):
413
        """Return the stats object."""
414
        return self.stats
415
416
    def get_export(self):
417
        """Return the stats object to export.
418
        By default, return the raw stats.
419
        Note: this method could be overwritten by the plugin if a specific format is needed (ex: processlist)
420
        """
421
        return self.get_raw()
422
423
    def get_stats(self):
424
        """Return the stats object in JSON format."""
425
        return json_dumps(self.get_raw())
426
427
    def get_json(self):
428
        """Return the stats object in JSON format."""
429
        return self.get_stats()
430
431
    def get_raw_stats_item(self, item):
432
        """Return the stats object for a specific item in RAW format.
433
434
        Stats should be a list of dict (processlist, network...)
435
        """
436
        return dictlist(self.get_raw(), item)
437
438
    def get_raw_stats_key(self, item, key):
439
        """Return the stats object for a specific item in RAW format.
440
441
        Stats should be a list of dict (processlist, network...)
442
        """
443
        return {item: [i for i in self.get_raw() if 'key' in i and i[i['key']] == key][0].get(item)}
444
445
    def get_stats_item(self, item):
446
        """Return the stats object for a specific item in JSON format.
447
448
        Stats should be a list of dict (processlist, network...)
449
        """
450
        return dictlist_json_dumps(self.get_raw(), item)
451
452
    def get_raw_stats_value(self, item, value):
453
        """Return the stats object for a specific item=value.
454
455
        Return None if the item=value does not exist
456
        Return None if the item is not a list of dict
457
        """
458
        if not isinstance(self.get_raw(), list):
459
            return None
460
461
        if (not isinstance(value, int) and not isinstance(value, float)) and value.isdigit():
462
            value = int(value)
463
        try:
464
            return {value: [i for i in self.get_raw() if i[item] == value]}
465
        except (KeyError, ValueError) as e:
466
            logger.error(f"Cannot get item({item})=value({value}) ({e})")
467
            return None
468
469
    def get_stats_value(self, item, value):
470
        """Return the stats object for a specific item=value in JSON format.
471
472
        Stats should be a list of dict (processlist, network...)
473
        """
474
        rsv = self.get_raw_stats_value(item, value)
475
        if rsv is None:
476
            return None
477
        return json_dumps(rsv)
478
479
    def get_item_info(self, item, key, default=None):
480
        """Return the item info grabbed into self.fields_description."""
481
        if self.fields_description is None or item not in self.fields_description:
482
            return default
483
        return self.fields_description[item].get(key, default)
484
485
    def _build_field_decoration(self, field):
486
        """Return the field decoration.
487
488
        The decoration is used to display the field in the UI.
489
        """
490
        # Manage the decoration
491
        if self.fields_description and field in self.fields_description:
492
            if (
493
                self.fields_description[field].get('rate') is True
494
                and isinstance(self.stats, dict)
495
                and self.stats.get('time_since_update', 0) == 0
496
            ):
497
                return 'DEFAULT'
498
            if self.fields_description[field].get('log') is True:
499
                return self.get_alert_log(self.stats[field], header=field)
500
            if self.fields_description[field].get('alert') is True:
501
                return self.get_alert(self.stats[field], header=field)
502
        return 'DEFAULT'
503
504
    def _build_field_optional(self, field):
505
        """Return true if the field is optional."""
506
        if self.fields_description and field in self.fields_description:
507
            return self.fields_description[field].get('optional', False)
508
        return False
509
510
    def _build_view_for_field(self, key=None, field=None):
511
        view = {
512
            'decoration': self._build_field_decoration(field),
513
            'optional': self._build_field_optional(field),
514
            'additional': False,
515
            'splittable': False,
516
            'hidden': False,
517
        }
518
519
        # Manage the hidden feature
520
        # Allow to automatically hide fields when values is never different than 0
521
        # Refactoring done for #2929
522
        if not self.hide_zero:
523
            view['hidden'] = False
524
        elif key and key in self.views and field in self.views[key] and 'hidden' in self.views[key][field]:
525
            view['hidden'] = self.views[key][field]['hidden']
526
            if (
527
                field in self.hide_zero_fields
528
                and self.get_raw_stats_key(item=field, key=key).get(field) >= self.hide_threshold_bytes
529
            ):
530
                view['hidden'] = False
531
            # logger.info(f'{key=} {field=} {view["hidden"]=}')
532
        else:
533
            view['hidden'] = field in self.hide_zero_fields
534
535
        return view
536
537
    def update_views(self):
538
        """Update the stats views.
539
540
        The V of MVC
541
        A dict of dict with the needed information to display the stats.
542
        Example for the stat xxx:
543
        'xxx': {'decoration': 'DEFAULT',  >>> The decoration of the stats
544
                'optional': False,        >>> Is the stat optional
545
                'additional': False,      >>> Is the stat provide additional information
546
                'splittable': False,      >>> Is the stat can be cut (like process lon name)
547
                'hidden': False}          >>> Is the stats should be hidden in the UI
548
        """
549
        ret = {}
550
551
        if self.get_raw() is not None and isinstance(self.get_raw(), list) and self.get_key() is not None:
552
            # Stats are stored in a list of dict (ex: DISKIO, NETWORK, FS...)
553
            for i in self.get_raw():
554
                key = i[self.get_key()]
555
                ret[key] = {}
556
                for field in listkeys(i):
557
                    ret[key][field] = self._build_view_for_field(key=key, field=field)
558
        elif isinstance(self.get_raw(), dict) and self.get_raw() is not None:
559
            # Stats are stored in a dict (ex: CPU, LOAD...)
560
            for field in listkeys(self.get_raw()):
561
                ret[field] = self._build_view_for_field(key=None, field=field)
562
563
        self.views = ret
564
565
        return self.views
566
567
    def set_views(self, input_views):
568
        """Set the views to input_views."""
569
        self.views = input_views
570
571
    def reset_views(self):
572
        """Reset the views to input_views."""
573
        self.views = {}
574
575
    def get_views(self, item=None, key=None, option=None):
576
        """Return the views object.
577
578
        If key is None, return all the view for the current plugin
579
        else if option is None return the view for the specific key (all option)
580
        else return the view of the specific key/option
581
582
        Specify item if the stats are stored in a dict of dict (ex: NETWORK, FS...)
583
        """
584
        if item is None:
585
            item_views = self.views
586
        else:
587
            item_views = self.views[item]
588
        if key is None:
589
            return item_views
590
        if key not in item_views:
591
            return 'DEFAULT'
592
        if option is None:
593
            return item_views[key]
594
        if option in item_views[key]:
595
            return item_views[key][option]
596
        return 'DEFAULT'
597
598
    def get_json_views(self, item=None, key=None, option=None):
599
        """Return the views (in JSON)."""
600
        return json_dumps(self.get_views(item, key, option))
601
602
    def load_limits(self, config):
603
        """Load limits from the configuration file, if it exists."""
604
        # By default set the history length to 3 points per second during one day
605
        self._limits['history_size'] = 28800
606
607
        if not hasattr(config, 'has_section'):
608
            return False
609
610
        # Read the global section
611
        # TODO: not optimized because this section is loaded for each plugin...
612
        if config.has_section('global'):
613
            self._limits['history_size'] = config.get_float_value('global', 'history_size', default=28800)
614
            logger.debug("Load configuration key: {} = {}".format('history_size', self._limits['history_size']))
615
616
        # Read the plugin specific section
617
        if config.has_section(self.plugin_name):
618
            for level, _ in config.items(self.plugin_name):
619
                # Read limits
620
                limit = '_'.join([self.plugin_name, level])
621
                try:
622
                    self._limits[limit] = config.get_float_value(self.plugin_name, level)
623
                except ValueError:
624
                    self._limits[limit] = config.get_value(self.plugin_name, level).split(",")
625
                logger.debug(f"Load limit: {limit} = {self._limits[limit]}")
626
627
        return True
628
629
    @property
630
    def limits(self):
631
        """Return the limits object."""
632
        return self._limits
633
634
    @limits.setter
635
    def limits(self, input_limits):
636
        """Set the limits to input_limits."""
637
        self._limits = input_limits
638
639
    def set_refresh(self, value):
640
        """Set the plugin refresh rate"""
641
        self.set_limits('refresh', value)
642
643
    def get_refresh(self):
644
        """Return the plugin refresh time"""
645
        ret = self.get_limits(item='refresh')
646
        if ret is None:
647
            ret = self.args.time if hasattr(self.args, 'time') else 2
648
        return ret
649
650
    def get_refresh_time(self):
651
        """Return the plugin refresh time"""
652
        return self.get_refresh()
653
654
    def set_limits(self, item, value):
655
        """Set the limits object."""
656
        self._limits[f'{self.plugin_name}_{item}'] = value
657
658
    def get_limits(self, item=None):
659
        """Return the limits object."""
660
        if item is None:
661
            return self._limits
662
        return self._limits.get(f'{self.plugin_name}_{item}', None)
663
664
    def get_stats_action(self):
665
        """Return stats for the action.
666
667
        By default return all the stats.
668
        Can be overwrite by plugins implementation.
669
        For example, Docker will return self.stats['containers']
670
        """
671
        return self.stats
672
673
    def get_stat_name(self, header=None, action_key=None):
674
        """Return the stat name with an optional action_key and header"""
675
        ret = self.plugin_name
676
        if action_key is not None and action_key != '':
677
            ret += '_' + action_key
678
        if header is not None and header != '':
679
            ret += '_' + header
680
        return ret
681
682
    def get_alert(
683
        self,
684
        current=0,
685
        minimum=0,
686
        maximum=100,
687
        highlight_zero=True,
688
        is_max=False,
689
        header=None,
690
        action_key=None,
691
        log=False,
692
    ):
693
        """Return the alert status relative to a current value.
694
695
        Use this function for minor stats.
696
697
        If current < CAREFUL of max then alert = OK
698
        If current > CAREFUL of max then alert = CAREFUL
699
        If current > WARNING of max then alert = WARNING
700
        If current > CRITICAL of max then alert = CRITICAL
701
702
        If highlight=True than 0.0 is highlighted
703
704
        If defined 'header' is added between the plugin name and the status.
705
        Only useful for stats with several alert status.
706
707
        If defined, 'action_key' define the key for the actions.
708
        By default, the action_key is equal to the header.
709
710
        If log=True than add log if necessary
711
        elif log=False than do not log
712
        elif log=None than apply the config given in the conf file
713
        """
714
        # Manage 0 (0.0) value if highlight_zero is not True
715
        if not highlight_zero and current == 0:
716
            return 'DEFAULT'
717
718
        # Compute the %
719
        try:
720
            value = (current * 100) / maximum
721
        except ZeroDivisionError:
722
            return 'DEFAULT'
723
        except TypeError:
724
            return 'DEFAULT'
725
726
        # Build the stat_name
727
        stat_name = self.get_stat_name(header=header, action_key=action_key).lower()
728
729
        # Manage limits
730
        # If is_max is set then default style is set to MAX else default is set to OK
731
        ret = 'MAX' if is_max else 'OK'
732
733
        # Iter through limits
734
        critical = self.get_limit('critical', stat_name=stat_name)
735
        warning = self.get_limit('warning', stat_name=stat_name)
736
        careful = self.get_limit('careful', stat_name=stat_name)
737
        if critical and value >= critical:
738
            ret = 'CRITICAL'
739
        elif warning and value >= warning:
740
            ret = 'WARNING'
741
        elif careful and value >= careful:
742
            ret = 'CAREFUL'
743
        elif not careful and not warning and not critical:
744
            ret = 'DEFAULT'
745
        else:
746
            ret = 'OK'
747
748
        if current < minimum:
749
            ret = 'CAREFUL'
750
751
        # Manage log
752
        log_str = ""
753
        if self.get_limit_log(stat_name=stat_name, default_action=log) and ret != 'DEFAULT':
754
            # Add _LOG to the return string
755
            # So stats will be highlighted with a specific color
756
            log_str = "_LOG"
757
            # Add the log to the events list
758
            glances_events.add(ret, stat_name.upper(), value)
759
760
        # Manage threshold
761
        self.manage_threshold(stat_name, ret)
762
763
        # Manage action
764
        self.manage_action(stat_name, ret.lower(), header, action_key)
765
766
        # Default is 'OK'
767
        return ret + log_str
768
769
    def filter_stats(self, stats):
770
        """Filter the stats to keep only the fields we want (the one defined in fields_description)."""
771
        if hasattr(stats, '_asdict'):
772
            return {k: v for k, v in stats._asdict().items() if k in self.fields_description}
773
        if isinstance(stats, dict):
774
            return {k: v for k, v in stats.items() if k in self.fields_description}
775
        if isinstance(stats, list):
776
            return [self.filter_stats(s) for s in stats]
777
        return stats
778
779
    def manage_threshold(self, stat_name, trigger):
780
        """Manage the threshold for the current stat."""
781
        glances_thresholds.add(stat_name, trigger)
782
783
    def manage_action(self, stat_name, trigger, header, action_key):
784
        """Manage the action for the current stat."""
785
        # Here is a command line for the current trigger ?
786
        command, repeat = self.get_limit_action(trigger, stat_name=stat_name)
787
        if not command and not repeat:
788
            # Reset the trigger
789
            self.actions.set(stat_name, trigger)
790
        else:
791
            # Define the action key for the stats dict
792
            # If not define, then it sets to header
793
            if action_key is None:
794
                action_key = header
795
796
            # A command line is available for the current alert
797
            # 1) Build the {{mustache}} dictionary
798
            stats_action = copy.deepcopy(self.get_stats_action())
799
            if isinstance(stats_action, list):
800
                # If the stats are stored in a list of dict (fs plugin for example)
801
                mustache_dict = {}
802
                for item in stats_action:
803
                    # Add the limit to the mustache dict
804
                    item['critical'] = self.get_limit('critical', stat_name=stat_name)
805
                    item['warning'] = self.get_limit('warning', stat_name=stat_name)
806
                    item['careful'] = self.get_limit('careful', stat_name=stat_name)
807
                    # Add the current time (now)
808
                    item['time'] = datetime.now().isoformat()
809
                    if item[self.get_key()] == action_key:
810
                        mustache_dict = item
811
                        break
812
            else:
813
                # Use the stats dict
814
                # Add the limit to the mustache dict
815
                stats_action['critical'] = self.get_limit('critical', stat_name=stat_name)
816
                stats_action['warning'] = self.get_limit('warning', stat_name=stat_name)
817
                stats_action['careful'] = self.get_limit('careful', stat_name=stat_name)
818
                # Add the current time (now)
819
                stats_action['time'] = datetime.now().isoformat()
820
                mustache_dict = stats_action
821
            # 2) Run the action
822
            self.actions.run(stat_name, trigger, command, repeat, mustache_dict=mustache_dict)
823
824
    def get_alert_log(self, current=0, minimum=0, maximum=100, header="", action_key=None):
825
        """Get the alert log."""
826
        return self.get_alert(
827
            current=current, minimum=minimum, maximum=maximum, header=header, action_key=action_key, log=True
828
        )
829
830
    def is_limit(self, criticality, stat_name=""):
831
        """Return true if the criticality limit exist for the given stat_name"""
832
        return self.get_stat_name(stat_name).lower() + '_' + criticality in self._limits
833
834
    def get_limit(self, criticality=None, stat_name=""):
835
        """Return the limit value for the given criticality.
836
        If criticality is None, return the dict of all the limits."""
837
        if criticality is None:
838
            return self._limits
839
840
        # Get the limit for stat + header
841
        # Example: network_wlan0_rx_careful
842
        stat_name = stat_name.lower()
843
        if stat_name + '_' + criticality in self._limits:
844
            return self._limits[stat_name + '_' + criticality]
845
        if self.plugin_name + '_' + criticality in self._limits:
846
            return self._limits[self.plugin_name + '_' + criticality]
847
848
        return None
849
850
    def get_limit_action(self, criticality, stat_name=""):
851
        """Return the tuple (action, repeat) for the alert.
852
853
        - action is a command line
854
        - repeat is a bool
855
        """
856
        # Get the action for stat + header
857
        # Example: network_wlan0_rx_careful_action
858
        # Action key available ?
859
        ret = [
860
            (stat_name + '_' + criticality + '_action', False),
861
            (stat_name + '_' + criticality + '_action_repeat', True),
862
            (self.plugin_name + '_' + criticality + '_action', False),
863
            (self.plugin_name + '_' + criticality + '_action_repeat', True),
864
        ]
865
        for r in ret:
866
            if r[0] in self._limits:
867
                return self._limits[r[0]], r[1]
868
869
        # No key found, return None
870
        return None, None
871
872
    def get_limit_log(self, stat_name, default_action=False):
873
        """Return the log tag for the alert."""
874
        # Get the log tag for stat + header
875
        # Example: network_wlan0_rx_log
876
        if stat_name + '_log' in self._limits:
877
            return self._limits[stat_name + '_log'][0].lower() == 'true'
878
        if self.plugin_name + '_log' in self._limits:
879
            return self._limits[self.plugin_name + '_log'][0].lower() == 'true'
880
        return default_action
881
882
    def get_conf_value(self, value, header="", plugin_name=None, convert_bool=False, default=[]):
883
        """Return the configuration (header_) value for the current plugin.
884
885
        ...or the one given by the plugin_name var.
886
        """
887
        if plugin_name is None:
888
            # If not default use the current plugin name
889
            plugin_name = self.plugin_name
890
891
        if header != "":
892
            # Add the header
893
            plugin_name = plugin_name + '_' + header
894
895
        try:
896
            ret = self._limits[plugin_name + '_' + value]
897
            return bool(ret[0]) if convert_bool else ret
898
        except KeyError:
899
            return default
900
901
    def is_show(self, value, header=""):
902
        """Return True if the value is in the show configuration list.
903
904
        If the show value is empty, return True (show by default)
905
906
        The show configuration list is defined in the glances.conf file.
907
        It is a comma-separated list of regexp.
908
        Example for diskio:
909
        show=sda.*
910
        """
911
        # TODO: possible optimisation: create a re.compile list
912
        # nguuuquaaa: no need a compile list, the re module caches the compiling itself
913
        return any(re.fullmatch(i, value, re.I) for i in self.get_conf_value('show', header=header))
914
915
    def is_hide(self, value, header=""):
916
        """Return True if the value is in the hide configuration list.
917
918
        The hide configuration list is defined in the glances.conf file.
919
        It is a comma-separated list of regexp.
920
        Example for diskio:
921
        hide=sda2,sda5,loop.*
922
        """
923
        # TODO: possible optimisation: create a re.compile list
924
        # nguuuquaaa: no need a compile list, the re module caches the compiling itself
925
        return any(re.fullmatch(i, value, re.I) for i in self.get_conf_value('hide', header=header))
926
927
    def is_display(self, value, header=""):
928
        """Return True if the value should be displayed in the UI"""
929
        if self.get_conf_value('show', header=header) != []:
930
            return self.is_show(value, header=header)
931
        return not self.is_hide(value, header=header)
932
933
    def is_display_any(self, *values, header=""):
934
        """Return True if any of the values should be displayed in the UI"""
935
        if self.get_conf_value('show', header=header) != []:
936
            return any(self.is_show(value, header=header) for value in values)
937
        return not any(self.is_hide(value, header=header) for value in values)
938
939
    def read_alias(self):
940
        if self.plugin_name + '_' + 'alias' in self._limits:
941
            return {
942
                split_esc(i, ':')[0].lower(): split_esc(i, ':')[1]
943
                for i in self._limits[self.plugin_name + '_' + 'alias']
944
            }
945
        return {}
946
947
    def has_alias(self, header):
948
        """Return the alias name for the relative header it it exists otherwise None."""
949
        return self.alias.get(header, None)
950
951
    def msg_curse(self, args=None, max_width=None):
952
        """Return default string to display in the curse interface."""
953
        return [self.curse_add_line(str(self.stats))]
954
955
    def get_stats_display(self, args=None, max_width=None):
956
        """Return a dict with all the information needed to display the stat.
957
958
        key     | description
959
        ----------------------------
960
        display | Display the stat (True or False)
961
        msgdict | Message to display (list of dict [{ 'msg': msg, 'decoration': decoration } ... ])
962
        align   | Message position
963
        """
964
        display_curse = False
965
966
        if hasattr(self, 'display_curse'):
967
            display_curse = self.display_curse
968
        if hasattr(self, 'align'):
969
            align_curse = self._align
970
971
        if max_width is not None:
972
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args, max_width=max_width), 'align': align_curse}
0 ignored issues
show
introduced by
The variable align_curse does not seem to be defined in case hasattr(self, 'align') on line 968 is False. Are you sure this can never be the case?
Loading history...
973
        else:
974
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args), 'align': align_curse}
975
976
        return ret
977
978
    def curse_add_line(self, msg, decoration="DEFAULT", optional=False, additional=False, splittable=False):
979
        """Return a dict with.
980
981
        Where:
982
            msg: string
983
            decoration:
984
                DEFAULT: no decoration
985
                UNDERLINE: underline
986
                BOLD: bold
987
                TITLE: for stat title
988
                PROCESS: for process name
989
                STATUS: for process status
990
                CPU_TIME: for process cpu time
991
                OK: Value is OK and non logged
992
                OK_LOG: Value is OK and logged
993
                CAREFUL: Value is CAREFUL and non logged
994
                CAREFUL_LOG: Value is CAREFUL and logged
995
                WARNING: Value is WARNING and non logged
996
                WARNING_LOG: Value is WARNING and logged
997
                CRITICAL: Value is CRITICAL and non logged
998
                CRITICAL_LOG: Value is CRITICAL and logged
999
            optional: True if the stat is optional (display only if space is available)
1000
            additional: True if the stat is additional (display only if space is available after optional)
1001
            spittable: Line can be split to fit on the screen (default is not)
1002
        """
1003
        return {
1004
            'msg': msg,
1005
            'decoration': decoration,
1006
            'optional': optional,
1007
            'additional': additional,
1008
            'splittable': splittable,
1009
        }
1010
1011
    def curse_new_line(self):
1012
        """Go to a new line."""
1013
        return self.curse_add_line('\n')
1014
1015
    def curse_add_stat(self, key, width=None, header='', display_key=True, separator='', trailer=''):
1016
        """Return a list of dict messages with the 'key: value' result
1017
1018
          <=== width ===>
1019
        __key     : 80.5%__
1020
        | |       | |    |_ trailer
1021
        | |       | |_ self.stats[key]
1022
        | |       |_ separator
1023
        | |_ 'short_name' description or key or nothing if display_key is True
1024
        |_ header
1025
1026
        Instead of:
1027
            msg = '  {:8}'.format('idle:')
1028
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
1029
            msg = '{:5.1f}%'.format(self.stats['idle'])
1030
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
1031
1032
        Use:
1033
            ret.extend(self.curse_add_stat('idle', width=15, header='  '))
1034
1035
        """
1036
        if key not in self.stats:
1037
            return []
1038
1039
        # Check if a shortname is defined
1040
        if not display_key:
1041
            key_name = ''
1042
        elif key in self.fields_description and 'short_name' in self.fields_description[key]:
1043
            key_name = self.fields_description[key]['short_name']
1044
        else:
1045
            key_name = key
1046
1047
        # Check if unit is defined and get the short unit char in the unit_sort dict
1048
        if (
1049
            key in self.fields_description
1050
            and 'unit' in self.fields_description[key]
1051
            and self.fields_description[key]['unit'] in fields_unit_short
1052
        ):
1053
            # Get the shortname
1054
            unit_short = fields_unit_short[self.fields_description[key]['unit']]
1055
        else:
1056
            unit_short = ''
1057
1058
        # Check if unit is defined and get the unit type unit_type dict
1059
        if (
1060
            key in self.fields_description
1061
            and 'unit' in self.fields_description[key]
1062
            and self.fields_description[key]['unit'] in fields_unit_type
1063
        ):
1064
            # Get the shortname
1065
            unit_type = fields_unit_type[self.fields_description[key]['unit']]
1066
        else:
1067
            unit_type = 'float'
1068
1069
        # Is it a rate ? Yes, get the pre-computed rate value
1070
        if key in self.fields_description and self.fields_description[key].get('rate', False) is True:
1071
            value = self.stats.get(key + '_rate_per_sec', None)
1072
        else:
1073
            value = self.stats.get(key, None)
1074
1075
        if width is None:
1076
            msg_item = header + f'{key_name}' + separator
1077
            msg_template_float = '{:.1f}{}'
1078
            msg_template = '{}{}'
1079
        else:
1080
            # Define the size of the message
1081
            # item will be on the left
1082
            # value will be on the right
1083
            msg_item = header + '{:{width}}'.format(key_name, width=width - 7) + separator
1084
            msg_template_float = '{:5.1f}{}'
1085
            msg_template = '{:>5}{}'
1086
1087
        if value is None:
1088
            msg_value = msg_template.format('-', '')
1089
        elif unit_type == 'float':
1090
            msg_value = msg_template_float.format(value, unit_short)
1091
        elif 'min_symbol' in self.fields_description[key]:
1092
            msg_value = msg_template.format(
1093
                self.auto_unit(int(value), min_symbol=self.fields_description[key]['min_symbol']), unit_short
1094
            )
1095
        else:
1096
            msg_value = msg_template.format(int(value), unit_short)
1097
1098
        # Add the trailer
1099
        msg_value = msg_value + trailer
1100
1101
        decoration = self.get_views(key=key, option='decoration') if value is not None else 'DEFAULT'
1102
        optional = self.get_views(key=key, option='optional')
1103
1104
        return [
1105
            self.curse_add_line(msg_item, optional=optional),
1106
            self.curse_add_line(msg_value, decoration=decoration, optional=optional),
1107
        ]
1108
1109
    @property
1110
    def align(self):
1111
        """Get the curse align."""
1112
        return self._align
1113
1114
    @align.setter
1115
    def align(self, value):
1116
        """Set the curse align.
1117
1118
        value: left, right, bottom.
1119
        """
1120
        self._align = value
1121
1122
    def auto_unit(self, number, low_precision=False, min_symbol='K', none_symbol='-'):
1123
        """Return a nice human-readable string out of number."""
1124
        return auto_unit(number, low_precision=low_precision, min_symbol=min_symbol, none_symbol=none_symbol)
1125
1126
    def trend_msg(self, trend, significant=1):
1127
        """Return the trend message.
1128
1129
        Do not take into account if trend < significant
1130
        """
1131
        ret = '-'
1132
        if trend is None:
1133
            ret = ' '
1134
        elif trend > significant:
1135
            ret = unicode_message('ARROW_UP', self.args)
1136
        elif trend < -significant:
1137
            ret = unicode_message('ARROW_DOWN', self.args)
1138
        return ret
1139
1140
    def _check_decorator(fct):
1141
        """Check decorator for update method.
1142
1143
        It checks:
1144
        - if the plugin is enabled.
1145
        - if the refresh_timer is finished
1146
        """
1147
1148
        def wrapper(self, *args, **kw):
1149
            if self.is_enabled() and (self.refresh_timer.finished() or self.stats == self.get_init_value):
1150
                # Run the method
1151
                ret = fct(self, *args, **kw)
1152
                # Reset the timer
1153
                self.refresh_timer.set(self.get_refresh())
1154
                self.refresh_timer.reset()
1155
            else:
1156
                # No need to call the method
1157
                # Return the last result available
1158
                ret = self.stats
1159
            return ret
1160
1161
        return wrapper
1162
1163 View Code Duplication
    def _log_result_decorator(fct):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1164
        """Log (DEBUG) the result of the function fct."""
1165
1166
        def wrapper(*args, **kw):
1167
            counter = Counter()
1168
            ret = fct(*args, **kw)
1169
            duration = counter.get()
1170
            class_name = args[0].__class__.__name__
1171
            class_module = args[0].__class__.__module__
1172
            logger.debug(f"{class_name} {class_module} {fct.__name__} return {ret} in {duration} seconds")
1173
            return ret
1174
1175
        return wrapper
1176
1177
    def _manage_rate(fct):
1178
        """Manage rate decorator for update method."""
1179
1180
        def compute_rate(self, stat, stat_previous):
1181
            if stat_previous is None:
1182
                return stat
1183
1184
            # 1) set _gauge for all the rate fields
1185
            # 2) compute the _rate_per_sec
1186
            # 3) set the original field to the delta between the current and the previous value
1187
            for field in self.fields_description:
1188
                # For all the field with the rate=True flag
1189
                # if 'rate' in self.fields_description[field] and self.fields_description[field]['rate'] is True:
1190
                if self.fields_description[field].get('rate', False):
1191
                    # Create a new metadata with the gauge
1192
                    stat['time_since_update'] = self.time_since_last_update
1193
                    stat[field + '_gauge'] = stat[field]
1194
                    if field + '_gauge' in stat_previous and stat[field] and stat_previous[field + '_gauge']:
1195
                        # The stat becomes the delta between the current and the previous value
1196
                        stat[field] = stat[field] - stat_previous[field + '_gauge']
1197
                        # Compute the rate
1198
                        if self.time_since_last_update > 0:
1199
                            stat[field + '_rate_per_sec'] = stat[field] // self.time_since_last_update
1200
                        else:
1201
                            stat[field] = 0
1202
                            stat[field + '_rate_per_sec'] = 0
1203
                    else:
1204
                        # Avoid strange rate at the first run
1205
                        stat[field] = 0
1206
                        stat[field + '_rate_per_sec'] = 0
1207
            return stat
1208
1209
        def compute_rate_on_list(self, stats, stats_previous):
1210
            if stats_previous is None:
1211
                return stats
1212
1213
            for stat in stats:
1214
                olds = [i for i in stats_previous if i[self.get_key()] == stat[self.get_key()]]
1215
                if len(olds) == 1:
1216
                    compute_rate(self, stat, olds[0])
1217
            return stats
1218
1219
        def wrapper(self, *args, **kw):
1220
            # Call the father method
1221
            stats = fct(self, *args, **kw)
1222
1223
            # Get the time since the last update
1224
            self.time_since_last_update = getTimeSinceLastUpdate(self.plugin_name)
1225
1226
            # Compute the rate
1227
            if isinstance(stats, dict):
1228
                # Stats is a dict
1229
                compute_rate(self, stats, self.stats_previous)
1230
            elif isinstance(stats, list):
1231
                # Stats is a list
1232
                compute_rate_on_list(self, stats, self.stats_previous)
1233
1234
            # Memorized the current stats for next run
1235
            self.stats_previous = copy.deepcopy(stats)
1236
1237
            return stats
1238
1239
        return wrapper
1240
1241
    # Mandatory to call the decorator in child classes
1242
    _check_decorator = staticmethod(_check_decorator)
1243
    _log_result_decorator = staticmethod(_log_result_decorator)
1244
    _manage_rate = staticmethod(_manage_rate)
1245