Test Failed
Push — develop ( 0abc39...ef45c6 )
by Nicolas
02:52
created

GlancesPluginModel.get_raw_stats_item()   A

Complexity

Conditions 1

Size

Total Lines 6
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 6
rs 10
c 0
b 0
f 0
1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""
10
I am your father...
11
12
...of all Glances model plugins.
13
"""
14
15
import copy
16
import re
17
18
from glances.actions import GlancesActions
19
from glances.events_list import glances_events
20
from glances.globals import (
21
    auto_unit,
22
    dictlist,
23
    dictlist_json_dumps,
24
    json_dumps,
25
    list_to_dict,
26
    listkeys,
27
    mean,
28
    nativestr,
29
)
30
from glances.history import GlancesHistory
31
from glances.logger import logger
32
from glances.outputs.glances_unicode import unicode_message
33
from glances.thresholds import glances_thresholds
34
from glances.timer import Counter, Timer, getTimeSinceLastUpdate
35
36
# Unit name -> short symbol.
fields_unit_short = dict(percent='%')

# Unit name (singular and plural spellings) -> name of the Python type.
fields_unit_type = dict(
    percent='float',
    percents='float',
    number='int',
    numbers='int',
    int='int',
    ints='int',
    float='float',
    floats='float',
    second='int',
    seconds='int',
    byte='int',
    bytes='int',
)
52
53
54
class GlancesPluginModel:
55
    """Main class for Glances plugin model."""
56
57
    def __init__(self, args=None, config=None, items_history_list=None, stats_init_value=None, fields_description=None):
        """Init the plugin of plugins model class.

        All Glances' plugins model should inherit from this class. Most of the
        methods are already implemented in the father classes.

        Your plugin should return a dict or a list of dicts (stored in the
        self.stats). As an example, you can have a look on the mem plugin
        (for dict) or network (for list of dicts).

        From version 4 of the API, the plugin should return a dict.

        A plugin should implement:
        - the reset method: to set your self.stats variable to {} or []
        - the update method: where your self.stats variable is set
        and optionally:
        - the get_key method: set the key of the dict (only for list of dict)
        - all others methods you want to overwrite

        :args: args parameters
        :config: configuration parameters
        :items_history_list: list of items to store in the history
        :stats_init_value: Default value for a stats item (None means a new empty dict)
        :fields_description: description of the stats fields, keyed by field name
        """
        # Build the plugin name
        # Internal or external module (former prefixed by 'glances.plugins')
        _mod = self.__class__.__module__.replace('glances.plugins.', '')
        self.plugin_name = _mod.split('.')[0]

        if self.plugin_name.startswith('glances_'):
            self.plugin_name = self.plugin_name.split('glances_')[1]
        logger.debug(f"Init {self.plugin_name} plugin")

        # Init the args
        self.args = args

        # Init the default alignment (for curses)
        self._align = 'left'

        # Init the input method
        self._input_method = 'local'
        self._short_system_name = None

        # Init the history list
        self.items_history_list = items_history_list
        self.stats_history = self.init_stats_history()

        # Init the limits (configuration keys) dictionary
        self._limits = {}
        if config is not None:
            logger.debug(f'Load section {self.plugin_name} in Glances configuration file')
            self.load_limits(config=config)

        # Init the alias (dictionary)
        self.alias = self.read_alias()

        # Init the actions
        self.actions = GlancesActions(args=args)

        # Init the views
        self.views = {}

        # Hide stats if all the hide_zero_fields has never been != 0
        # Default is False, always display stats
        self.hide_zero = False
        # The threshold needed to display a value if hide_zero is true.
        # Only hide a value if it is less than hide_threshold_bytes.
        self.hide_threshold_bytes = 0
        self.hide_zero_fields = []

        # Set the initial refresh time to display stats the first time
        self.refresh_timer = Timer(0)

        # Init stats description
        self.fields_description = fields_description

        # Init the stats
        # A fresh dict is created per instance so that no mutable default
        # object is shared between all the plugins.
        self.stats_init_value = {} if stats_init_value is None else stats_init_value
        self.time_since_last_update = None
        self.stats = None
        self.stats_previous = None
        self.reset()
139
140
    def __str__(self):
141
        """Return the human-readable stats."""
142
        return str(self.stats)
143
144
    def __repr__(self):
145
        """Return the raw stats."""
146
        if isinstance(self.stats, list):
147
            return str(list_to_dict(self.stats))
148
        return str(self.stats)
149
150
    def __getitem__(self, item):
151
        """Return the stats item."""
152
        if isinstance(self.stats, dict) and item in self.stats:
153
            return self.stats[item]
154
155
        if isinstance(self.stats, list):
156
            ltd = list_to_dict(self.stats)
157
            if item in ltd:
158
                return ltd[item]
159
160
        raise KeyError(f"'{self.__class__.__name__}' object has no key '{item}'")
161
162
    def keys(self):
163
        """Return the keys of the stats."""
164
        if isinstance(self.stats, dict):
165
            return listkeys(self.stats)
166
        if isinstance(self.stats, list):
167
            return listkeys(list_to_dict(self.stats))
168
        return []
169
170
    def get(self, item, default=None):
171
        """Return the stats item or default if not found."""
172
        try:
173
            return self[item]
174
        except KeyError:
175
            return default
176
177
    def get_init_value(self):
178
        """Return a copy of the init value."""
179
        return copy.copy(self.stats_init_value)
180
181
    def reset(self):
182
        """Reset the stats.
183
184
        This method should be overwritten by child classes.
185
        """
186
        self.stats = self.get_init_value()
187
188
    def exit(self):
        """Log (debug level) that the plugin is stopping; nothing else is done here."""
        message = f"Stop the {self.plugin_name} plugin"
        logger.debug(message)
191
192
    def get_key(self):
193
        """Return the key of the list."""
194
        return
195
196
    def is_enabled(self, plugin_name=None):
197
        """Return true if plugin is enabled."""
198
        if not plugin_name:
199
            plugin_name = self.plugin_name
200
        try:
201
            d = getattr(self.args, 'disable_' + plugin_name)
202
        except AttributeError:
203
            d = getattr(self.args, 'enable_' + plugin_name, True)
204
        return d is False
205
206
    def is_disabled(self, plugin_name=None):
207
        """Return true if plugin is disabled."""
208
        return not self.is_enabled(plugin_name=plugin_name)
209
210
    def history_enable(self):
211
        return self.args is not None and not self.args.disable_history and self.get_items_history_list() is not None
212
213
    def init_stats_history(self):
        """Return a new GlancesHistory object.

        When the history is enabled, the names of the tracked items are
        logged at debug level first.
        """
        if self.history_enable():
            names = [entry['name'] for entry in self.get_items_history_list()]
            logger.debug(f"Stats history activated for plugin {self.plugin_name} (items: {names})")
        return GlancesHistory()
219
220
    def reset_stats_history(self):
221
        """Reset the stats history (dict of GlancesAttribute)."""
222
        if self.history_enable():
223
            reset_list = [a['name'] for a in self.get_items_history_list()]
224
            logger.debug(f"Reset history for plugin {self.plugin_name} (items: {reset_list})")
225
            self.stats_history.reset()
226
227
    def update_stats_history(self):
        """Add the current exported stats to the stats history.

        Nothing is done when there is nothing to export or when the history
        is not enabled. Only the items of the items history list are stored;
        an item missing from the stats is skipped, for list stats and for
        dict stats alike.
        """
        # Build the history
        exported = self.get_export()
        if not (exported and self.history_enable()):
            return

        # Field used to prefix the history name of each element of list stats
        key = self.get_key()
        item_name = '' if key is None else key

        # Iter through items history
        for history_item in self.get_items_history_list():
            name = history_item['name']
            if isinstance(exported, list):
                # Stats is a list of data
                # Iter through stats (for example, iter through network interface)
                for entry in exported:
                    if name in entry:
                        self.stats_history.add(
                            nativestr(entry[item_name]) + '_' + nativestr(name),
                            entry[name],
                            description=history_item['description'],
                            history_max_size=self._limits['history_size'],
                        )
            elif name in exported:
                # Stats is not a list
                # Add the item to the history directly (the membership test
                # mirrors the list branch and avoids a KeyError when the
                # field is absent from the stats)
                self.stats_history.add(
                    nativestr(name),
                    exported[name],
                    description=history_item['description'],
                    history_max_size=self._limits['history_size'],
                )
256
257
    def get_items_history_list(self):
258
        """Return the items history list."""
259
        return self.items_history_list
260
261
    def get_raw_history(self, item=None, nb=0):
262
        """Return the history (RAW format).
263
264
        - the stats history (dict of list) if item is None
265
        - the stats history for the given item (list) instead
266
        - None if item did not exist in the history
267
        """
268
        s = self.stats_history.get(nb=nb)
269
        if item is None:
270
            return s
271
        if item in s:
272
            return s[item]
273
        return None
274
275
    def get_export_history(self, item=None):
276
        """Return the stats history object to export."""
277
        return self.get_raw_history(item=item)
278
279
    def get_stats_history(self, item=None, nb=0):
        """Return the stats history (JSON format).

        :param item: when given, the result is built with dictlist_json_dumps for that item
        :param nb: forwarded to stats_history.get_json(nb=...)
        """
        history = self.stats_history.get_json(nb=nb)
        return json_dumps(history) if item is None else dictlist_json_dumps(history, item)
287
288
    def get_trend(self, item, nb=30):
289
        """Get the trend regarding to the last nb values.
290
291
        The trend is the diffirence between the mean of the last 0 to nb / 2
292
        and nb / 2 to nb values.
293
        """
294
        raw_history = self.get_raw_history(item=item, nb=nb)
295
        if raw_history is None or len(raw_history) < nb:
296
            return None
297
        last_nb = [v[1] for v in raw_history]
298
        return mean(last_nb[nb // 2 :]) - mean(last_nb[: nb // 2])
299
300
    @property
301
    def input_method(self):
302
        """Get the input method."""
303
        return self._input_method
304
305
    @input_method.setter
306
    def input_method(self, input_method):
307
        """Set the input method.
308
309
        * local: system local grab (psutil or direct access)
310
        * snmp: Client server mode via SNMP
311
        * glances: Client server mode via Glances API
312
        """
313
        self._input_method = input_method
314
315
    @property
316
    def short_system_name(self):
317
        """Get the short detected OS name (SNMP)."""
318
        return self._short_system_name
319
320
    def sorted_stats(self):
321
        """Get the stats sorted by an alias (if present) or key."""
322
        key = self.get_key()
323
        if key is None:
324
            return self.stats
325
        try:
326
            return sorted(
327
                self.stats,
328
                key=lambda stat: tuple(
329
                    int(part) if part.isdigit() else part.lower()
330
                    for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
331
                ),
332
            )
333
        except TypeError:
334
            # Correct "Starting an alias with a number causes a crash #1885"
335
            return sorted(
336
                self.stats,
337
                key=lambda stat: tuple(
338
                    part.lower() for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
339
                ),
340
            )
341
342
    @short_system_name.setter
343
    def short_system_name(self, short_name):
344
        """Set the short detected OS name (SNMP)."""
345
        self._short_system_name = short_name
346
347
    def set_stats(self, input_stats):
348
        """Set the stats to input_stats."""
349
        self.stats = input_stats
350
351
    def get_stats_snmp(self, bulk=False, snmp_oid=None):
        """Update stats using SNMP.

        If bulk=True, use a bulk request instead of a get request.

        :param bulk: select the bulk request (True) or the simple get request (False)
        :param snmp_oid: dict of stat name -> OID (None is handled as an empty dict)
        :return: dict built from the SNMP answer
        """
        snmp_oid = snmp_oid or {}

        # Function-scope import, as in the original code
        from glances.snmp import GlancesSNMPClient

        # Init the SNMP request
        snmp_client = GlancesSNMPClient(
            host=self.args.client,
            port=self.args.snmp_port,
            version=self.args.snmp_version,
            community=self.args.snmp_community,
        )

        # Process the SNMP request
        ret = {}
        if bulk:
            # Bulk request
            snmp_result = snmp_client.getbulk_by_oid(0, 10, *list(snmp_oid.values()))
            logger.info(snmp_result)
            if len(snmp_oid) == 1:
                # Bulk command for only one OID
                # Note: key is the item indexed but the OID result
                # Dict views cannot be subscripted in Python 3, so the single
                # (name, OID) pair and the first pair of each item are read
                # through an iterator.
                oid_name, oid_prefix = next(iter(snmp_oid.items()))
                for item in snmp_result:
                    if not item:
                        # Nothing to read in an empty item
                        continue
                    item_oid, item_value = next(iter(item.items()))
                    if item_oid.startswith(oid_prefix):
                        ret[oid_name + item_oid.split(oid_prefix)[1]] = item_value
            else:
                # Build the internal dict with the SNMP result
                # Note: key is the first item in the snmp_oid
                index = 1
                for item in snmp_result:
                    item_stats = {}
                    item_key = None
                    for key in snmp_oid:
                        oid = snmp_oid[key] + '.' + str(index)
                        if oid in item:
                            if item_key is None:
                                item_key = item[oid]
                            else:
                                item_stats[key] = item[oid]
                    if item_stats:
                        ret[item_key] = item_stats
                    index += 1
        else:
            # Simple get request
            snmp_result = snmp_client.get_by_oid(*list(snmp_oid.values()))

            # Build the internal dict with the SNMP result
            for key in snmp_oid:
                ret[key] = snmp_result[snmp_oid[key]]

        return ret
406
407
    def get_raw(self):
408
        """Return the stats object."""
409
        return self.stats
410
411
    def get_export(self):
412
        """Return the stats object to export.
413
        By default, return the raw stats.
414
        Note: this method could be overwritten by the plugin if a specific format is needed (ex: processlist)
415
        """
416
        return self.get_raw()
417
418
    def get_stats(self):
        """Return the raw stats serialized with json_dumps."""
        raw = self.get_raw()
        return json_dumps(raw)
421
422
    def get_json(self):
        """Return the stats object in JSON format (same result as get_stats)."""
        serialized = self.get_stats()
        return serialized
425
426
    def get_raw_stats_item(self, item):
        """Return the stats object for a specific item in RAW format.

        Stats should be a list of dict (processlist, network...)

        :param item: forwarded to dictlist together with the raw stats
        """
        raw = self.get_raw()
        return dictlist(raw, item)
432
433
    def get_raw_stats_key(self, item, key):
434
        """Return the stats object for a specific item in RAW format.
435
436
        Stats should be a list of dict (processlist, network...)
437
        """
438
        return {item: [i for i in self.get_raw() if 'key' in i and i[i['key']] == key][0].get(item)}
439
440
    def get_stats_item(self, item):
        """Return the stats object for a specific item in JSON format.

        Stats should be a list of dict (processlist, network...)

        :param item: forwarded to dictlist_json_dumps together with the raw stats
        """
        raw = self.get_raw()
        return dictlist_json_dumps(raw, item)
446
447
    def get_raw_stats_value(self, item, value):
448
        """Return the stats object for a specific item=value.
449
450
        Return None if the item=value does not exist
451
        Return None if the item is not a list of dict
452
        """
453
        if not isinstance(self.get_raw(), list):
454
            return None
455
456
        if (not isinstance(value, int) and not isinstance(value, float)) and value.isdigit():
457
            value = int(value)
458
        try:
459
            return {value: [i for i in self.get_raw() if i[item] == value]}
460
        except (KeyError, ValueError) as e:
461
            logger.error(f"Cannot get item({item})=value({value}) ({e})")
462
            return None
463
464
    def get_stats_value(self, item, value):
465
        """Return the stats object for a specific item=value in JSON format.
466
467
        Stats should be a list of dict (processlist, network...)
468
        """
469
        rsv = self.get_raw_stats_value(item, value)
470
        if rsv is None:
471
            return None
472
        return json_dumps(rsv)
473
474
    def get_item_info(self, item, key, default=None):
475
        """Return the item info grabbed into self.fields_description."""
476
        if self.fields_description is None or item not in self.fields_description:
477
            return default
478
        return self.fields_description[item].get(key, default)
479
480
    def _build_field_decoration(self, field):
481
        """Return the field decoration.
482
483
        The decoration is used to display the field in the UI.
484
        """
485
        # Manage the decoration
486
        if self.fields_description and field in self.fields_description:
487
            if (
488
                self.fields_description[field].get('rate') is True
489
                and isinstance(self.stats, dict)
490
                and self.stats.get('time_since_update', 0) == 0
491
            ):
492
                return 'DEFAULT'
493
            if self.fields_description[field].get('log') is True:
494
                return self.get_alert_log(self.stats[field], header=field)
495
            if self.fields_description[field].get('alert') is True:
496
                return self.get_alert(self.stats[field], header=field)
497
        return 'DEFAULT'
498
499
    def _build_field_optional(self, field):
500
        """Return true if the field is optional."""
501
        if self.fields_description and field in self.fields_description:
502
            return self.fields_description[field].get('optional', False)
503
        return False
504
505
    def _build_view_for_field(self, field):
506
        value = {
507
            'decoration': self._build_field_decoration(field),
508
            'optional': self._build_field_optional(field),
509
            'additional': False,
510
            'splittable': False,
511
            'hidden': False,
512
        }
513
514
        # Manage the hidden feature
515
        # Allow to automatically hide fields when values is never different than 0
516
        # Refactoring done for #2929
517
        if not self.hide_zero:
518
            value['hidden'] = False
519
        elif field in self.views and 'hidden' in self.views[field]:
520
            value['hidden'] = self.views[field]['hidden']
521
            if field in self.hide_zero_fields and self.get_raw()[field] >= self.hide_threshold_bytes:
522
                value['hidden'] = False
523
        else:
524
            value['hidden'] = field in self.hide_zero_fields
525
526
        return value
527
528
    def update_views(self):
529
        """Update the stats views.
530
531
        The V of MVC
532
        A dict of dict with the needed information to display the stats.
533
        Example for the stat xxx:
534
        'xxx': {'decoration': 'DEFAULT',  >>> The decoration of the stats
535
                'optional': False,        >>> Is the stat optional
536
                'additional': False,      >>> Is the stat provide additional information
537
                'splittable': False,      >>> Is the stat can be cut (like process lon name)
538
                'hidden': False}          >>> Is the stats should be hidden in the UI
539
        """
540
        ret = {}
541
542
        if self.get_raw() is not None and isinstance(self.get_raw(), list) and self.get_key() is not None:
543
            # Stats are stored in a list of dict (ex: DISKIO, NETWORK, FS...)
544
            for i in self.get_raw():
545
                key = i[self.get_key()]
546
                ret[key] = {}
547
                for field in listkeys(i):
548
                    ret[key][field] = self._build_view_for_field(field)
549
        elif isinstance(self.get_raw(), dict) and self.get_raw() is not None:
550
            # Stats are stored in a dict (ex: CPU, LOAD...)
551
            for field in listkeys(self.get_raw()):
552
                ret[field] = self._build_view_for_field(field)
553
554
        self.views = ret
555
556
        return self.views
557
558
    def set_views(self, input_views):
559
        """Set the views to input_views."""
560
        self.views = input_views
561
562
    def reset_views(self):
563
        """Reset the views to input_views."""
564
        self.views = {}
565
566
    def get_views(self, item=None, key=None, option=None):
567
        """Return the views object.
568
569
        If key is None, return all the view for the current plugin
570
        else if option is None return the view for the specific key (all option)
571
        else return the view of the specific key/option
572
573
        Specify item if the stats are stored in a dict of dict (ex: NETWORK, FS...)
574
        """
575
        if item is None:
576
            item_views = self.views
577
        else:
578
            item_views = self.views[item]
579
580
        if key is None:
581
            return item_views
582
        if key not in item_views:
583
            return 'DEFAULT'
584
        if option is None:
585
            return item_views[key]
586
        if option in item_views[key]:
587
            return item_views[key][option]
588
        return 'DEFAULT'
589
590
    def get_json_views(self, item=None, key=None, option=None):
        """Return the views (in JSON); the arguments are the ones of get_views."""
        views = self.get_views(item, key, option)
        return json_dumps(views)
593
594
    def load_limits(self, config):
595
        """Load limits from the configuration file, if it exists."""
596
        # By default set the history length to 3 points per second during one day
597
        self._limits['history_size'] = 28800
598
599
        if not hasattr(config, 'has_section'):
600
            return False
601
602
        # Read the global section
603
        # TODO: not optimized because this section is loaded for each plugin...
604
        if config.has_section('global'):
605
            self._limits['history_size'] = config.get_float_value('global', 'history_size', default=28800)
606
            logger.debug("Load configuration key: {} = {}".format('history_size', self._limits['history_size']))
607
608
        # Read the plugin specific section
609
        if config.has_section(self.plugin_name):
610
            for level, _ in config.items(self.plugin_name):
611
                # Read limits
612
                limit = '_'.join([self.plugin_name, level])
613
                try:
614
                    self._limits[limit] = config.get_float_value(self.plugin_name, level)
615
                except ValueError:
616
                    self._limits[limit] = config.get_value(self.plugin_name, level).split(",")
617
                logger.debug(f"Load limit: {limit} = {self._limits[limit]}")
618
619
        return True
620
621
    @property
622
    def limits(self):
623
        """Return the limits object."""
624
        return self._limits
625
626
    @limits.setter
627
    def limits(self, input_limits):
628
        """Set the limits to input_limits."""
629
        self._limits = input_limits
630
631
    def set_refresh(self, value):
632
        """Set the plugin refresh rate"""
633
        self.set_limits('refresh', value)
634
635
    def get_refresh(self):
636
        """Return the plugin refresh time"""
637
        ret = self.get_limits(item='refresh')
638
        if ret is None:
639
            ret = self.args.time if hasattr(self.args, 'time') else 2
640
        return ret
641
642
    def get_refresh_time(self):
643
        """Return the plugin refresh time"""
644
        return self.get_refresh()
645
646
    def set_limits(self, item, value):
647
        """Set the limits object."""
648
        self._limits[f'{self.plugin_name}_{item}'] = value
649
650
    def get_limits(self, item=None):
651
        """Return the limits object."""
652
        if item is None:
653
            return self._limits
654
        return self._limits.get(f'{self.plugin_name}_{item}', None)
655
656
    def get_stats_action(self):
657
        """Return stats for the action.
658
659
        By default return all the stats.
660
        Can be overwrite by plugins implementation.
661
        For example, Docker will return self.stats['containers']
662
        """
663
        return self.stats
664
665
    def get_stat_name(self, header=""):
666
        """Return the stat name with an optional header"""
667
        ret = self.plugin_name
668
        if header != '':
669
            ret += '_' + header
670
        return ret
671
672
    def get_alert(
673
        self,
674
        current=0,
675
        minimum=0,
676
        maximum=100,
677
        highlight_zero=True,
678
        is_max=False,
679
        header="",
680
        action_key=None,
681
        log=False,
682
    ):
683
        """Return the alert status relative to a current value.
684
685
        Use this function for minor stats.
686
687
        If current < CAREFUL of max then alert = OK
688
        If current > CAREFUL of max then alert = CAREFUL
689
        If current > WARNING of max then alert = WARNING
690
        If current > CRITICAL of max then alert = CRITICAL
691
692
        If highlight=True than 0.0 is highlighted
693
694
        If defined 'header' is added between the plugin name and the status.
695
        Only useful for stats with several alert status.
696
697
        If defined, 'action_key' define the key for the actions.
698
        By default, the action_key is equal to the header.
699
700
        If log=True than add log if necessary
701
        elif log=False than do not log
702
        elif log=None than apply the config given in the conf file
703
        """
704
        # Manage 0 (0.0) value if highlight_zero is not True
705
        if not highlight_zero and current == 0:
706
            return 'DEFAULT'
707
708
        # Compute the %
709
        try:
710
            value = (current * 100) / maximum
711
        except ZeroDivisionError:
712
            return 'DEFAULT'
713
        except TypeError:
714
            return 'DEFAULT'
715
716
        # Build the stat_name
717
        stat_name = self.get_stat_name(header=header).lower()
718
719
        # Manage limits
720
        # If is_max is set then default style is set to MAX else default is set to OK
721
        ret = 'MAX' if is_max else 'OK'
722
723
        # Iter through limits
724
        critical = self.get_limit('critical', stat_name=stat_name)
725
        warning = self.get_limit('warning', stat_name=stat_name)
726
        careful = self.get_limit('careful', stat_name=stat_name)
727
        if critical and value >= critical:
728
            ret = 'CRITICAL'
729
        elif warning and value >= warning:
730
            ret = 'WARNING'
731
        elif careful and value >= careful:
732
            ret = 'CAREFUL'
733
        elif not careful and not warning and not critical:
734
            ret = 'DEFAULT'
735
        else:
736
            ret = 'OK'
737
738
        if current < minimum:
739
            ret = 'CAREFUL'
740
741
        # Manage log
742
        log_str = ""
743
        if self.get_limit_log(stat_name=stat_name, default_action=log):
744
            # Add _LOG to the return string
745
            # So stats will be highlighted with a specific color
746
            log_str = "_LOG"
747
            # Add the log to the events list
748
            glances_events.add(ret, stat_name.upper(), value)
749
750
        # Manage threshold
751
        self.manage_threshold(stat_name, ret)
752
753
        # Manage action
754
        self.manage_action(stat_name, ret.lower(), header, action_key)
755
756
        # Default is 'OK'
757
        return ret + log_str
758
759
    def filter_stats(self, stats):
760
        """Filter the stats to keep only the fields we want (the one defined in fields_description)."""
761
        if hasattr(stats, '_asdict'):
762
            return {k: v for k, v in stats._asdict().items() if k in self.fields_description}
763
        if isinstance(stats, dict):
764
            return {k: v for k, v in stats.items() if k in self.fields_description}
765
        if isinstance(stats, list):
766
            return [self.filter_stats(s) for s in stats]
767
        return stats
768
769
    def manage_threshold(self, stat_name, trigger):
        """Manage the threshold for the current stat.

        The (stat_name, trigger) pair is added to glances_thresholds.
        """
        glances_thresholds.add(stat_name, trigger)
772
773
    def manage_action(self, stat_name, trigger, header, action_key):
774
        """Manage the action for the current stat."""
775
        # Here is a command line for the current trigger ?
776
        command, repeat = self.get_limit_action(trigger, stat_name=stat_name)
777
        if not command and not repeat:
778
            # Reset the trigger
779
            self.actions.set(stat_name, trigger)
780
        else:
781
            # Define the action key for the stats dict
782
            # If not define, then it sets to header
783
            if action_key is None:
784
                action_key = header
785
786
            # A command line is available for the current alert
787
            # 1) Build the {{mustache}} dictionary
788
            if isinstance(self.get_stats_action(), list):
789
                # If the stats are stored in a list of dict (fs plugin for example)
790
                # Return the dict for the current header
791
                mustache_dict = {}
792
                for item in self.get_stats_action():
793
                    if item[self.get_key()] == action_key:
794
                        mustache_dict = item
795
                        break
796
            else:
797
                # Use the stats dict
798
                mustache_dict = self.get_stats_action()
799
            # 2) Run the action
800
            self.actions.run(stat_name, trigger, command, repeat, mustache_dict=mustache_dict)
801
802
    def get_alert_log(self, current=0, minimum=0, maximum=100, header="", action_key=None):
        """Call get_alert() with the given arguments and log forced to True."""
        alert_args = {
            'current': current,
            'minimum': minimum,
            'maximum': maximum,
            'header': header,
            'action_key': action_key,
        }
        return self.get_alert(log=True, **alert_args)
807
808
    def is_limit(self, criticality, stat_name=""):
        """Return True if a limit exists for the given criticality and stat_name.

        The looked-up key is the lower-cased result of get_stat_name(stat_name)
        followed by '_' and the criticality.
        """
        limit_key = '_'.join([self.get_stat_name(stat_name).lower(), criticality])
        return limit_key in self._limits
811
812
    def get_limit(self, criticality=None, stat_name=""):
        """Return the limit value for the given criticality.

        If criticality is None, return the dict of all the limits.
        The stat specific limit (lower-cased stat_name) is searched first,
        then the plugin wide one. Return None if no limit is found.
        """
        if criticality is None:
            return self._limits

        # Example of stat specific key: network_wlan0_rx_careful
        for prefix in (stat_name.lower(), self.plugin_name):
            limit_key = prefix + '_' + criticality
            if limit_key in self._limits:
                return self._limits[limit_key]

        return None
827
828
    def get_limit_action(self, criticality, stat_name=""):
        """Return the tuple (action, repeat) for the alert.

        - action is the value stored in the limits for the matching key
          (a command line)
        - repeat is a bool: True when the matching key ends with
          '_action_repeat', False when it ends with '_action'

        Keys prefixed by stat_name are searched before the ones prefixed by
        the plugin name. (None, None) is returned when no key matches.
        """
        # Example of stat specific key: network_wlan0_rx_careful_action
        for prefix in (stat_name, self.plugin_name):
            for suffix, repeat in (('_action', False), ('_action_repeat', True)):
                action_key = prefix + '_' + criticality + suffix
                if action_key in self._limits:
                    return self._limits[action_key], repeat

        # No key found
        return None, None
849
850
    def get_limit_log(self, stat_name, default_action=False):
        """Return the log tag (bool) for the alert.

        The '<stat_name>_log' key is searched first, then '<plugin_name>_log'.
        The tag is True when the first element of the stored value is the
        string 'true' (case insensitive). default_action is returned when
        neither key exists.
        """
        # Example of stat specific key: network_wlan0_rx_log
        for prefix in (stat_name, self.plugin_name):
            log_key = prefix + '_log'
            if log_key in self._limits:
                return self._limits[log_key][0].lower() == 'true'
        return default_action
859
860
    def get_conf_value(self, value, header="", plugin_name=None, default=[]):
        """Return the configuration (header_) value for the current plugin.

        ...or for the plugin given by the plugin_name var.

        The looked-up key is '<plugin_name>[_<header>]_<value>'. When the key
        is not in the limits, default is returned.

        NOTE: the default list is a single object shared between calls; the
        returned default should not be mutated by the caller.
        """
        # Without an explicit plugin name, use the one of the current plugin
        prefix = self.plugin_name if plugin_name is None else plugin_name

        # Add the header, if any
        if header != "":
            prefix = prefix + '_' + header

        try:
            return self._limits[prefix + '_' + value]
        except KeyError:
            return default
877
878
    def is_show(self, value, header=""):
        """Return True if the value fully matches one regexp of the show list.

        If the show list is empty, no regexp can match and False is returned.

        The show configuration list is defined in the glances.conf file.
        It is a comma-separated list of regexp (matched case insensitively).
        Example for diskio:
        show=sda.*
        """
        # No need for a list of compiled regexp: the re module caches them
        for pattern in self.get_conf_value('show', header=header):
            if re.fullmatch(pattern, value, re.I):
                return True
        return False
891
892
    def is_hide(self, value, header=""):
        """Return True if the value fully matches one regexp of the hide list.

        The hide configuration list is defined in the glances.conf file.
        It is a comma-separated list of regexp (matched case insensitively).
        Example for diskio:
        hide=sda2,sda5,loop.*
        """
        # No need for a list of compiled regexp: the re module caches them
        for pattern in self.get_conf_value('hide', header=header):
            if re.fullmatch(pattern, value, re.I):
                return True
        return False
903
904
    def is_display(self, value, header=""):
        """Return True if the value should be displayed in the UI.

        When the show list is not empty, only it is taken into account.
        Otherwise the value is displayed unless it is in the hide list.
        """
        if self.get_conf_value('show', header=header) == []:
            return not self.is_hide(value, header=header)
        return self.is_show(value, header=header)
909
910
    def is_display_any(self, *values, header=""):
        """Return True if any of the values should be displayed in the UI.

        When the show list is not empty: True if at least one value is shown.
        Otherwise: True if none of the values is hidden.
        """
        if self.get_conf_value('show', header=header) == []:
            return all(not self.is_hide(candidate, header=header) for candidate in values)
        return any(self.is_show(candidate, header=header) for candidate in values)
915
916
    def read_alias(self):
        """Return the alias dict read from the '<plugin_name>_alias' limit.

        Each entry of the list is expected to look like 'name:alias'. The
        returned dict maps the lower-cased name to the alias (the second
        ':'-separated field). An empty dict is returned when the key does
        not exist.

        Entries without a ':' separator are ignored instead of raising an
        IndexError.
        """
        alias_key = self.plugin_name + '_' + 'alias'
        if alias_key not in self._limits:
            return {}

        aliases = {}
        for entry in self._limits[alias_key]:
            fields = entry.split(':')
            if len(fields) < 2:
                # Malformed entry (no separator): skip it
                continue
            aliases[fields[0].lower()] = fields[1]
        return aliases
920
921
    def has_alias(self, header):
        """Return the alias name for the given header if it exists, otherwise None."""
        return self.alias.get(header)
924
925
    def msg_curse(self, args=None, max_width=None):
        """Return the default message to display in the curse interface.

        It is a list with a single line: the string representation of the
        stats. args and max_width are not used by this default implementation.
        """
        default_line = self.curse_add_line(str(self.stats))
        return [default_line]
928
929
    def get_stats_display(self, args=None, max_width=None):
        """Return a dict with all the information needed to display the stat.

        key     | description
        ----------------------------
        display | Display the stat (True or False)
        msgdict | Message to display (list of dict [{ 'msg': msg, 'decoration': decoration } ... ])
        align   | Message position

        args and max_width are forwarded to msg_curse() (max_width only when
        it is not None).
        """
        # Display is disabled unless the plugin defines display_curse
        display_curse = getattr(self, 'display_curse', False)

        # The alignment may not be set yet: always bind a value so the
        # result can be built without raising an UnboundLocalError.
        # NOTE(review): 'left' is the first value documented by the align
        # setter; confirm it matches the default set by the constructor.
        align_curse = self._align if hasattr(self, 'align') else 'left'

        if max_width is not None:
            msgdict = self.msg_curse(args, max_width=max_width)
        else:
            msgdict = self.msg_curse(args)

        return {'display': display_curse, 'msgdict': msgdict, 'align': align_curse}
951
952
    def curse_add_line(self, msg, decoration="DEFAULT", optional=False, additional=False, splittable=False):
        """Return a dict describing one message to display.

        The dict has one key per parameter:
            msg: string
            decoration:
                DEFAULT: no decoration
                UNDERLINE: underline
                BOLD: bold
                TITLE: for stat title
                PROCESS: for process name
                STATUS: for process status
                CPU_TIME: for process cpu time
                OK: Value is OK and non logged
                OK_LOG: Value is OK and logged
                CAREFUL: Value is CAREFUL and non logged
                CAREFUL_LOG: Value is CAREFUL and logged
                WARNING: Value is WARNING and non logged
                WARNING_LOG: Value is WARNING and logged
                CRITICAL: Value is CRITICAL and non logged
                CRITICAL_LOG: Value is CRITICAL and logged
            optional: True if the stat is optional (display only if space is available)
            additional: True if the stat is additional (display only if space is available after optional)
            splittable: Line can be split to fit on the screen (default is not)
        """
        return dict(
            msg=msg,
            decoration=decoration,
            optional=optional,
            additional=additional,
            splittable=splittable,
        )
984
985
    def curse_new_line(self):
        """Return the message dict of a line break (go to a new line)."""
        line_break = '\n'
        return self.curse_add_line(line_break)
988
989
    def curse_add_stat(self, key, width=None, header='', display_key=True, separator='', trailer=''):
        """Return a list of dict messages with the 'key: value' result

          <=== width ===>
        __key     : 80.5%__
        | |       | |    |_ trailer
        | |       | |_ self.stats[key]
        | |       |_ separator
        | |_ 'short_name' description or key or nothing if display_key is False
        |_ header

        Instead of:
            msg = '  {:8}'.format('idle:')
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
            msg = '{:5.1f}%'.format(self.stats['idle'])
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))

        Use:
            ret.extend(self.curse_add_stat('idle', width=15, header='  '))

        An empty list is returned when key is not in self.stats.
        A plugin without fields description (fields_description is None) is
        handled like a plugin which does not describe the key.
        """
        if key not in self.stats:
            return []

        # Description of the field: empty when the plugin has no fields
        # description at all or when the key is not described
        description = (self.fields_description or {}).get(key, {})

        # Name to display: nothing, the short name if defined, or the key
        if not display_key:
            key_name = ''
        else:
            key_name = description.get('short_name', key)

        # Short unit char and unit type; the module level tables are only
        # consulted when the field declares a unit
        unit = description.get('unit')
        unit_short = fields_unit_short[unit] if 'unit' in description and unit in fields_unit_short else ''
        unit_type = fields_unit_type[unit] if 'unit' in description and unit in fields_unit_type else 'float'

        # Is it a rate ? Yes, get the pre-computed rate value
        if description.get('rate', False) is True:
            value = self.stats.get(key + '_rate_per_sec', None)
        else:
            value = self.stats.get(key, None)

        if width is None:
            msg_item = header + f'{key_name}' + separator
            msg_template_float = '{:.1f}{}'
            msg_template = '{}{}'
        else:
            # Define the size of the message
            # item will be on the left
            # value will be on the right
            msg_item = header + '{:{width}}'.format(key_name, width=width - 7) + separator
            msg_template_float = '{:5.1f}{}'
            msg_template = '{:>5}{}'

        if value is None:
            msg_value = msg_template.format('-', '')
        elif unit_type == 'float':
            msg_value = msg_template_float.format(value, unit_short)
        elif 'min_symbol' in description:
            msg_value = msg_template.format(
                self.auto_unit(int(value), min_symbol=description['min_symbol']), unit_short
            )
        else:
            msg_value = msg_template.format(int(value), unit_short)

        # Add the trailer
        msg_value = msg_value + trailer

        # No decoration for a value which is not available
        decoration = self.get_views(key=key, option='decoration') if value is not None else 'DEFAULT'
        optional = self.get_views(key=key, option='optional')

        return [
            self.curse_add_line(msg_item, optional=optional),
            self.curse_add_line(msg_value, decoration=decoration, optional=optional),
        ]
1082
1083
    @property
    def align(self):
        """Curse alignment of the plugin (value stored in self._align)."""
        return self._align

    @align.setter
    def align(self, value):
        """Store the curse alignment in self._align.

        value: left, right, bottom.
        """
        self._align = value
1095
1096
    def auto_unit(self, number, low_precision=False, min_symbol='K', none_symbol='-'):
        """Return a nice human-readable string out of number.

        All the arguments are forwarded unchanged to the auto_unit function
        imported from glances.globals.
        """
        options = {
            'low_precision': low_precision,
            'min_symbol': min_symbol,
            'none_symbol': none_symbol,
        }
        return auto_unit(number, **options)
1099
1100
    def trend_msg(self, trend, significant=1):
        """Return the trend message.

        - ' ' if the trend is None
        - the ARROW_UP unicode message if trend > significant
        - the ARROW_DOWN unicode message if trend < -significant
        - '-' otherwise (the trend is not significant)
        """
        if trend is None:
            return ' '
        if trend > significant:
            return unicode_message('ARROW_UP', self.args)
        if trend < -significant:
            return unicode_message('ARROW_DOWN', self.args)
        return '-'
1113
1114
    def _check_decorator(fct):
        """Check decorator for update method.

        The decorated method is only run when:
        - the plugin is enabled
        - and the refresh_timer is finished, or the stats are still equal to
          the plugin init value (nothing has been grabbed yet)

        After a run, the refresh timer is set with get_refresh() and reset.
        Otherwise the last available result (self.stats) is returned.
        """
        from functools import wraps

        def stats_are_init_value(plugin):
            # get_init_value may be a method: compare the stats with the value
            # it returns, not with the method object itself (that comparison
            # can never be true)
            init_value = plugin.get_init_value
            if callable(init_value):
                init_value = init_value()
            return plugin.stats == init_value

        @wraps(fct)
        def wrapper(self, *args, **kw):
            if self.is_enabled() and (self.refresh_timer.finished() or stats_are_init_value(self)):
                # Run the method
                ret = fct(self, *args, **kw)
                # Reset the timer
                self.refresh_timer.set(self.get_refresh())
                self.refresh_timer.reset()
            else:
                # No need to call the method
                # Return the last result available
                ret = self.stats
            return ret

        return wrapper
1136
1137 View Code Duplication
    def _log_result_decorator(fct):
        """Log (DEBUG) the result of the function fct.

        The message contains the class name and module of the first
        positional argument, the name of fct, its result and the duration of
        the call.
        """

        def wrapper(*args, **kw):
            # Time the call
            stopwatch = Counter()
            ret = fct(*args, **kw)
            duration = stopwatch.get()
            # The first positional argument is the object the method is called on
            owner = args[0].__class__
            class_name = owner.__name__
            class_module = owner.__module__
            logger.debug(f"{class_name} {class_module} {fct.__name__} return {ret} in {duration} seconds")
            return ret

        return wrapper
1150
1151
    def _manage_rate(fct):
        """Manage rate decorator for update method.

        After the decorated method has returned the stats, every field flagged
        with rate=True in fields_description is completed with:
        - <field>_gauge: the raw value returned by the method
        - <field>: the delta with the previous gauge
        - <field>_rate_per_sec: the delta divided (floor division) by the time
          since the last update
        A deep copy of the stats is kept in stats_previous for the next run.
        """

        def compute_rate(self, stat, stat_previous):
            # Nothing to compare with: return the stat untouched
            if stat_previous is None:
                return stat

            for field in self.fields_description:
                # Only the fields with the rate flag are concerned
                if not self.fields_description[field].get('rate', False):
                    continue

                gauge_key = field + '_gauge'
                rate_key = field + '_rate_per_sec'

                # Keep the raw value in a new gauge metadata
                stat['time_since_update'] = self.time_since_last_update
                stat[gauge_key] = stat[field]

                if not (gauge_key in stat_previous and stat[field] and stat_previous[gauge_key]):
                    # Avoid strange rate at the first run
                    stat[field] = 0
                    stat[rate_key] = 0
                    continue

                # The stat becomes the delta between the current and the previous value
                stat[field] = stat[field] - stat_previous[gauge_key]
                # Compute the rate
                if self.time_since_last_update > 0:
                    stat[rate_key] = stat[field] // self.time_since_last_update
                else:
                    stat[field] = 0
                    stat[rate_key] = 0
            return stat

        def compute_rate_on_list(self, stats, stats_previous):
            if stats_previous is None:
                return stats

            for stat in stats:
                # The rate is only computed when exactly one previous item
                # has the same key value
                matches = [old for old in stats_previous if old[self.get_key()] == stat[self.get_key()]]
                if len(matches) == 1:
                    compute_rate(self, stat, matches[0])
            return stats

        def wrapper(self, *args, **kw):
            # Call the father method
            stats = fct(self, *args, **kw)

            # Get the time since the last update
            self.time_since_last_update = getTimeSinceLastUpdate(self.plugin_name)

            # Compute the rate (stats of any other type are left untouched)
            if isinstance(stats, dict):
                compute_rate(self, stats, self.stats_previous)
            elif isinstance(stats, list):
                compute_rate_on_list(self, stats, self.stats_previous)

            # Memorize the current stats for the next run
            self.stats_previous = copy.deepcopy(stats)

            return stats

        return wrapper
1214
1215
    # Mandatory to call the decorator in child classes
    # Each decorator is wrapped exactly once: wrapping an already wrapped
    # staticmethod again nests the descriptors, and a staticmethod object is
    # only callable from Python 3.10 on.
    _check_decorator = staticmethod(_check_decorator)
    _log_result_decorator = staticmethod(_log_result_decorator)
    _manage_rate = staticmethod(_manage_rate)
1222