Test Failed
Push — develop ( 420cf2...653997 )
by Nicolas
01:06 queued 15s
created

GlancesPluginModel.get_stats_snmp()   C

Complexity

Conditions 11

Size

Total Lines 55
Code Lines 33

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 11
eloc 33
nop 3
dl 0
loc 55
rs 5.4
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex classes like glances.plugins.plugin.model.GlancesPluginModel.get_stats_snmp() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""
10
I am your father...
11
12
...of all Glances model plugins.
13
"""
14
15
import copy
16
import re
17
import threading
18
19
try:
20
    import thread
21
except ImportError:
22
    import _thread as thread
23
24
from glances.actions import GlancesActions
25
from glances.events_list import glances_events
26
from glances.globals import (
27
    auto_unit,
28
    dictlist,
29
    dictlist_json_dumps,
30
    json_dumps,
31
    list_to_dict,
32
    listkeys,
33
    mean,
34
    nativestr,
35
)
36
from glances.history import GlancesHistory
37
from glances.logger import logger
38
from glances.outputs.glances_unicode import unicode_message
39
from glances.thresholds import glances_thresholds
40
from glances.timer import Counter, Timer, getTimeSinceLastUpdate
41
42
# Short display symbol for a unit name.
fields_unit_short = {'percent': '%'}

# Type name ('int' or 'float') associated with each unit name.
# Every unit is registered in both its singular and plural ('s' suffix) form.
fields_unit_type = {
    unit + plural: type_name
    for unit, type_name in (
        ('percent', 'float'),
        ('number', 'int'),
        ('int', 'int'),
        ('float', 'float'),
        ('second', 'int'),
        ('byte', 'int'),
    )
    for plural in ('', 's')
}
58
59
60
class GlancesPluginModel:
61
    """Main class for Glances plugin model."""
62
63
    def __init__(self, args=None, config=None, items_history_list=None, stats_init_value=None, fields_description=None):
        """Init the plugin of plugins model class.

        All Glances' plugins model should inherit from this class. Most of the
        methods are already implemented in the father classes.

        Your plugin should return a dict or a list of dicts (stored in the
        self.stats). As an example, you can have a look on the mem plugin
        (for dict) or network (for list of dicts).

        From version 4 of the API, the plugin should return a dict.

        A plugin should implement:
        - the reset method: to set your self.stats variable to {} or []
        - the update method: where your self.stats variable is set
        and optionally:
        - the get_key method: set the key of the dict (only for list of dict)
        - all others methods you want to overwrite

        :args: args parameters
        :config: configuration parameters
        :items_history_list: list of items to store in the history
        :stats_init_value: Default value for a stats item (a new empty dict when omitted)
        :fields_description: description of the stats fields (stored as-is)
        """
        # Build a fresh dict per instance instead of using a mutable default
        # argument, which would be one object shared by every plugin created
        # without an explicit stats_init_value.
        if stats_init_value is None:
            stats_init_value = {}

        # Build the plugin name
        # Internal or external module (former prefixed by 'glances.plugins')
        _mod = self.__class__.__module__.replace('glances.plugins.', '')
        self.plugin_name = _mod.split('.')[0]

        if self.plugin_name.startswith('glances_'):
            self.plugin_name = self.plugin_name.split('glances_')[1]
        logger.debug(f"Init {self.plugin_name} plugin")

        # Init the args
        self.args = args

        # Init the default alignment (for curses)
        self._align = 'left'

        # Init the input method
        self._input_method = 'local'
        self._short_system_name = None

        # Init the history list
        self.items_history_list = items_history_list
        self.stats_history = self.init_stats_history()

        # Init the limits (configuration keys) dictionary
        self._limits = {}
        if config is not None:
            logger.debug(f'Load section {self.plugin_name} in Glances configuration file')
            self.load_limits(config=config)

        # Init the alias (dictionary)
        self.alias = self.read_alias()

        # Init the actions
        self.actions = GlancesActions(args=args)

        # Init the views
        self.views = {}

        # Hide stats if all the hide_zero_fields has never been != 0
        # Default is False, always display stats
        self.hide_zero = False
        # The threshold needed to display a value if hide_zero is true.
        # Only hide a value if it is less than hide_threshold_bytes.
        self.hide_threshold_bytes = 0
        self.hide_zero_fields = []

        # Set the initial refresh time to display stats the first time
        self.refresh_timer = Timer(0)

        # Init stats description
        self.fields_description = fields_description

        # Init the stats
        self.stats_init_value = stats_init_value
        self.time_since_last_update = None
        self.stats = None
        self.stats_previous = None
        self.reset()
145
146
    def __str__(self):
147
        """Return the human-readable stats."""
148
        return str(self.stats)
149
150
    def __repr__(self):
151
        """Return the raw stats."""
152
        if isinstance(self.stats, list):
153
            return str(list_to_dict(self.stats))
154
        return str(self.stats)
155
156
    def __getitem__(self, item):
157
        """Return the stats item."""
158
        if isinstance(self.stats, dict) and item in self.stats:
159
            return self.stats[item]
160
161
        if isinstance(self.stats, list):
162
            ltd = list_to_dict(self.stats)
163
            if item in ltd:
164
                return ltd[item]
165
166
        raise KeyError(f"'{self.__class__.__name__}' object has no key '{item}'")
167
168
    def keys(self):
169
        """Return the keys of the stats."""
170
        if isinstance(self.stats, dict):
171
            return listkeys(self.stats)
172
        if isinstance(self.stats, list):
173
            return listkeys(list_to_dict(self.stats))
174
        return []
175
176
    def get(self, item, default=None):
177
        """Return the stats item or default if not found."""
178
        try:
179
            return self[item]
180
        except KeyError:
181
            return default
182
183
    def get_init_value(self):
184
        """Return a copy of the init value."""
185
        return copy.copy(self.stats_init_value)
186
187
    def reset(self):
188
        """Reset the stats.
189
190
        This method should be overwritten by child classes.
191
        """
192
        self.stats = self.get_init_value()
193
194
    def exit(self):
        """Log (debug level) that the plugin is stopping; nothing else is done."""
        message = f"Stop the {self.plugin_name} plugin"
        logger.debug(message)
197
198
    def get_key(self):
        """Return the key of the list.

        The base implementation returns None (no key).
        """
        return None
201
202
    def is_enabled(self, plugin_name=None):
203
        """Return true if plugin is enabled."""
204
        if not plugin_name:
205
            plugin_name = self.plugin_name
206
        try:
207
            d = getattr(self.args, 'disable_' + plugin_name)
208
        except AttributeError:
209
            d = getattr(self.args, 'enable_' + plugin_name, True)
210
        return d is False
211
212
    def is_disabled(self, plugin_name=None):
213
        """Return true if plugin is disabled."""
214
        return not self.is_enabled(plugin_name=plugin_name)
215
216
    def history_enable(self):
217
        return self.args is not None and not self.args.disable_history and self.get_items_history_list() is not None
218
219
    def init_stats_history(self):
        """Create and return a new GlancesHistory object.

        When the history is enabled, the names of the tracked items are logged
        (debug level) first.
        """
        if self.history_enable():
            item_names = [entry['name'] for entry in self.get_items_history_list()]
            logger.debug(f"Stats history activated for plugin {self.plugin_name} (items: {item_names})")
        return GlancesHistory()
225
226
    def reset_stats_history(self):
227
        """Reset the stats history (dict of GlancesAttribute)."""
228
        if self.history_enable():
229
            reset_list = [a['name'] for a in self.get_items_history_list()]
230
            logger.debug(f"Reset history for plugin {self.plugin_name} (items: {reset_list})")
231
            self.stats_history.reset()
232
233
    def update_stats_history(self):
234
        """Update stats history."""
235
        # Build the history
236
        _get_export = self.get_export()
237
        if not (_get_export and self.history_enable()):
238
            return
239
        # Itern through items history
240
        item_name = '' if self.get_key() is None else self.get_key()
241
        for i in self.get_items_history_list():
242
            if isinstance(_get_export, list):
243
                # Stats is a list of data
244
                # Iter through stats (for example, iter through network interface)
245
                for l_export in _get_export:
246
                    if i['name'] in l_export:
247
                        self.stats_history.add(
248
                            nativestr(l_export[item_name]) + '_' + nativestr(i['name']),
249
                            l_export[i['name']],
250
                            description=i['description'],
251
                            history_max_size=self._limits['history_size'],
252
                        )
253
            else:
254
                # Stats is not a list
255
                # Add the item to the history directly
256
                self.stats_history.add(
257
                    nativestr(i['name']),
258
                    _get_export[i['name']],
259
                    description=i['description'],
260
                    history_max_size=self._limits['history_size'],
261
                )
262
263
    def get_items_history_list(self):
264
        """Return the items history list."""
265
        return self.items_history_list
266
267
    def get_raw_history(self, item=None, nb=0):
268
        """Return the history (RAW format).
269
270
        - the stats history (dict of list) if item is None
271
        - the stats history for the given item (list) instead
272
        - None if item did not exist in the history
273
        """
274
        s = self.stats_history.get(nb=nb)
275
        if item is None:
276
            return s
277
        if item in s:
278
            return s[item]
279
        return None
280
281
    def get_export_history(self, item=None):
282
        """Return the stats history object to export."""
283
        return self.get_raw_history(item=item)
284
285
    def get_stats_history(self, item=None, nb=0):
        """Return the stats history (JSON format).

        The whole history is dumped when item is None; otherwise the dump is
        delegated to dictlist_json_dumps for the given item.
        """
        history = self.stats_history.get_json(nb=nb)
        return json_dumps(history) if item is None else dictlist_json_dumps(history, item)
293
294
    def get_trend(self, item, nb=30):
295
        """Get the trend regarding to the last nb values.
296
297
        The trend is the diffirence between the mean of the last 0 to nb / 2
298
        and nb / 2 to nb values.
299
        """
300
        raw_history = self.get_raw_history(item=item, nb=nb)
301
        if raw_history is None or len(raw_history) < nb:
302
            return None
303
        last_nb = [v[1] for v in raw_history]
304
        return mean(last_nb[nb // 2 :]) - mean(last_nb[: nb // 2])
305
306
    @property
307
    def input_method(self):
308
        """Get the input method."""
309
        return self._input_method
310
311
    @input_method.setter
312
    def input_method(self, input_method):
313
        """Set the input method.
314
315
        * local: system local grab (psutil or direct access)
316
        * snmp: Client server mode via SNMP
317
        * glances: Client server mode via Glances API
318
        """
319
        self._input_method = input_method
320
321
    @property
322
    def short_system_name(self):
323
        """Get the short detected OS name (SNMP)."""
324
        return self._short_system_name
325
326
    def sorted_stats(self):
327
        """Get the stats sorted by an alias (if present) or key."""
328
        key = self.get_key()
329
        if key is None:
330
            return self.stats
331
        try:
332
            return sorted(
333
                self.stats,
334
                key=lambda stat: tuple(
335
                    int(part) if part.isdigit() else part.lower()
336
                    for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
337
                ),
338
            )
339
        except TypeError:
340
            # Correct "Starting an alias with a number causes a crash #1885"
341
            return sorted(
342
                self.stats,
343
                key=lambda stat: tuple(
344
                    part.lower() for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
345
                ),
346
            )
347
348
    @short_system_name.setter
349
    def short_system_name(self, short_name):
350
        """Set the short detected OS name (SNMP)."""
351
        self._short_system_name = short_name
352
353
    def set_stats(self, input_stats):
354
        """Set the stats to input_stats."""
355
        self.stats = input_stats
356
357
    def get_stats_snmp(self, bulk=False, snmp_oid=None):
        """Update stats using SNMP.

        If bulk=True, use a bulk request instead of a get request.

        :bulk: True to send a bulk request, False to send a simple get request
        :snmp_oid: dict mapping a stat name to its OID (empty dict when omitted)
        :return: dict of stats built from the SNMP answer:
            - get request: {stat name: value}
            - bulk request with one OID: {stat name + OID suffix: value}
            - bulk request with several OIDs: {item key: {stat name: value}}
        """
        snmp_oid = snmp_oid or {}

        from glances.snmp import GlancesSNMPClient

        # Init the SNMP request
        snmp_client = GlancesSNMPClient(
            host=self.args.client,
            port=self.args.snmp_port,
            version=self.args.snmp_version,
            community=self.args.snmp_community,
        )

        if not bulk:
            # Simple get request: the result is indexed by OID
            snmp_result = snmp_client.get_by_oid(*list(snmp_oid.values()))
            return {key: snmp_result[oid] for key, oid in snmp_oid.items()}

        # Bulk request
        # NOTE(review): 0 and 10 are presumably the non-repeaters / max-repetitions
        # parameters of the SNMP bulk request - confirm against GlancesSNMPClient.
        snmp_result = snmp_client.getbulk_by_oid(0, 10, *list(snmp_oid.values()))
        logger.info(snmp_result)
        if len(snmp_oid) == 1:
            return self._snmp_bulk_single_oid(snmp_oid, snmp_result)
        return self._snmp_bulk_multi_oid(snmp_oid, snmp_result)

    @staticmethod
    def _snmp_bulk_single_oid(snmp_oid, snmp_result):
        """Build the stats dict from a bulk answer when only one OID was requested.

        The key of each stat is the stat name followed by the part of the
        answered OID located after the requested OID.
        """
        # Dict views are not subscriptable in Python 3: take the first (and
        # only) entry through an iterator instead of keys()[0] / values()[0].
        name, base_oid = next(iter(snmp_oid.items()))
        ret = {}
        for item in snmp_result:
            if not item:
                # Nothing to read from an empty answer
                continue
            item_oid, item_value = next(iter(item.items()))
            if item_oid.startswith(base_oid):
                ret[name + item_oid.split(base_oid)[1]] = item_value
        return ret

    @staticmethod
    def _snmp_bulk_multi_oid(snmp_oid, snmp_result):
        """Build the stats dict from a bulk answer when several OIDs were requested.

        For the answer at (1-based) position index, the values are read under
        '<OID>.<index>'. The first value found is used as the key of the item
        and the following ones are stored in the item stats.
        """
        ret = {}
        for index, item in enumerate(snmp_result, start=1):
            item_stats = {}
            item_key = None
            for key, base_oid in snmp_oid.items():
                oid = base_oid + '.' + str(index)
                if oid not in item:
                    continue
                if item_key is None:
                    item_key = item[oid]
                else:
                    item_stats[key] = item[oid]
            if item_stats:
                ret[item_key] = item_stats
        return ret
412
413
    def get_raw(self):
414
        """Return the stats object."""
415
        return self.stats
416
417
    def get_export(self):
418
        """Return the stats object to export.
419
        By default, return the raw stats.
420
        Note: this method could be overwritten by the plugin if a specific format is needed (ex: processlist)
421
        """
422
        return self.get_raw()
423
424
    def get_stats(self):
        """Return the raw stats serialized with json_dumps."""
        raw_stats = self.get_raw()
        return json_dumps(raw_stats)
427
428
    def get_json(self):
        """Return the stats object in JSON format (same result as get_stats)."""
        serialized = self.get_stats()
        return serialized
431
432
    def get_raw_stats_item(self, item):
        """Return the stats object for a specific item in RAW format.

        Stats should be a list of dict (processlist, network...).
        The extraction is delegated to dictlist.
        """
        raw_stats = self.get_raw()
        return dictlist(raw_stats, item)
438
439
    def get_raw_stats_key(self, item, key):
440
        """Return the stats object for a specific item in RAW format.
441
442
        Stats should be a list of dict (processlist, network...)
443
        """
444
        return {item: [i for i in self.get_raw() if 'key' in i and i[i['key']] == key][0].get(item)}
445
446
    def get_stats_item(self, item):
        """Return the stats object for a specific item in JSON format.

        Stats should be a list of dict (processlist, network...).
        The extraction and the dump are delegated to dictlist_json_dumps.
        """
        raw_stats = self.get_raw()
        return dictlist_json_dumps(raw_stats, item)
452
453
    def get_raw_stats_value(self, item, value):
454
        """Return the stats object for a specific item=value.
455
456
        Return None if the item=value does not exist
457
        Return None if the item is not a list of dict
458
        """
459
        if not isinstance(self.get_raw(), list):
460
            return None
461
462
        if (not isinstance(value, int) and not isinstance(value, float)) and value.isdigit():
463
            value = int(value)
464
        try:
465
            return {value: [i for i in self.get_raw() if i[item] == value]}
466
        except (KeyError, ValueError) as e:
467
            logger.error(f"Cannot get item({item})=value({value}) ({e})")
468
            return None
469
470
    def get_stats_value(self, item, value):
471
        """Return the stats object for a specific item=value in JSON format.
472
473
        Stats should be a list of dict (processlist, network...)
474
        """
475
        rsv = self.get_raw_stats_value(item, value)
476
        if rsv is None:
477
            return None
478
        return json_dumps(rsv)
479
480
    def get_item_info(self, item, key, default=None):
481
        """Return the item info grabbed into self.fields_description."""
482
        if self.fields_description is None or item not in self.fields_description:
483
            return default
484
        return self.fields_description[item].get(key, default)
485
486
    def _build_field_decoration(self, field):
487
        """Return the field decoration.
488
489
        The decoration is used to display the field in the UI.
490
        """
491
        # Manage the decoration
492
        if self.fields_description and field in self.fields_description:
493
            if (
494
                self.fields_description[field].get('rate') is True
495
                and isinstance(self.stats, dict)
496
                and self.stats.get('time_since_update', 0) == 0
497
            ):
498
                return 'DEFAULT'
499
            if self.fields_description[field].get('log') is True:
500
                return self.get_alert_log(self.stats[field], header=field)
501
            if self.fields_description[field].get('alert') is True:
502
                return self.get_alert(self.stats[field], header=field)
503
        return 'DEFAULT'
504
505
    def _build_field_optional(self, field):
506
        """Return true if the field is optional."""
507
        if self.fields_description and field in self.fields_description:
508
            return self.fields_description[field].get('optional', False)
509
        return False
510
511
    def _build_view_for_field(self, field):
512
        value = {
513
            'decoration': self._build_field_decoration(field),
514
            'optional': self._build_field_optional(field),
515
            'additional': False,
516
            'splittable': False,
517
            'hidden': False,
518
        }
519
520
        # Manage the hidden feature
521
        # Allow to automatically hide fields when values is never different than 0
522
        # Refactoring done for #2929
523
        if not self.hide_zero:
524
            value['hidden'] = False
525
        elif field in self.views and 'hidden' in self.views[field]:
526
            value['hidden'] = self.views[field]['hidden']
527
            if field in self.hide_zero_fields and self.get_raw()[field] >= self.hide_threshold_bytes:
528
                value['hidden'] = False
529
        else:
530
            value['hidden'] = field in self.hide_zero_fields
531
532
        return value
533
534
    def update_views(self):
535
        """Update the stats views.
536
537
        The V of MVC
538
        A dict of dict with the needed information to display the stats.
539
        Example for the stat xxx:
540
        'xxx': {'decoration': 'DEFAULT',  >>> The decoration of the stats
541
                'optional': False,        >>> Is the stat optional
542
                'additional': False,      >>> Is the stat provide additional information
543
                'splittable': False,      >>> Is the stat can be cut (like process lon name)
544
                'hidden': False}          >>> Is the stats should be hidden in the UI
545
        """
546
        ret = {}
547
548
        if self.get_raw() is not None and isinstance(self.get_raw(), list) and self.get_key() is not None:
549
            # Stats are stored in a list of dict (ex: DISKIO, NETWORK, FS...)
550
            for i in self.get_raw():
551
                key = i[self.get_key()]
552
                ret[key] = {}
553
                for field in listkeys(i):
554
                    ret[key][field] = self._build_view_for_field(field)
555
        elif isinstance(self.get_raw(), dict) and self.get_raw() is not None:
556
            # Stats are stored in a dict (ex: CPU, LOAD...)
557
            for field in listkeys(self.get_raw()):
558
                ret[field] = self._build_view_for_field(field)
559
560
        self.views = ret
561
562
        return self.views
563
564
    def set_views(self, input_views):
565
        """Set the views to input_views."""
566
        self.views = input_views
567
568
    def reset_views(self):
569
        """Reset the views to input_views."""
570
        self.views = {}
571
572
    def get_views(self, item=None, key=None, option=None):
573
        """Return the views object.
574
575
        If key is None, return all the view for the current plugin
576
        else if option is None return the view for the specific key (all option)
577
        else return the view of the specific key/option
578
579
        Specify item if the stats are stored in a dict of dict (ex: NETWORK, FS...)
580
        """
581
        if item is None:
582
            item_views = self.views
583
        else:
584
            item_views = self.views[item]
585
586
        if key is None:
587
            return item_views
588
        if key not in item_views:
589
            return 'DEFAULT'
590
        if option is None:
591
            return item_views[key]
592
        if option in item_views[key]:
593
            return item_views[key][option]
594
        return 'DEFAULT'
595
596
    def get_json_views(self, item=None, key=None, option=None):
        """Return the views selected by get_views, serialized with json_dumps."""
        selected_views = self.get_views(item, key, option)
        return json_dumps(selected_views)
599
600
    def load_limits(self, config):
601
        """Load limits from the configuration file, if it exists."""
602
        # By default set the history length to 3 points per second during one day
603
        self._limits['history_size'] = 28800
604
605
        if not hasattr(config, 'has_section'):
606
            return False
607
608
        # Read the global section
609
        # TODO: not optimized because this section is loaded for each plugin...
610
        if config.has_section('global'):
611
            self._limits['history_size'] = config.get_float_value('global', 'history_size', default=28800)
612
            logger.debug("Load configuration key: {} = {}".format('history_size', self._limits['history_size']))
613
614
        # Read the plugin specific section
615
        if config.has_section(self.plugin_name):
616
            for level, _ in config.items(self.plugin_name):
617
                # Read limits
618
                limit = '_'.join([self.plugin_name, level])
619
                try:
620
                    self._limits[limit] = config.get_float_value(self.plugin_name, level)
621
                except ValueError:
622
                    self._limits[limit] = config.get_value(self.plugin_name, level).split(",")
623
                logger.debug(f"Load limit: {limit} = {self._limits[limit]}")
624
625
        return True
626
627
    @property
628
    def limits(self):
629
        """Return the limits object."""
630
        return self._limits
631
632
    @limits.setter
633
    def limits(self, input_limits):
634
        """Set the limits to input_limits."""
635
        self._limits = input_limits
636
637
    def set_refresh(self, value):
638
        """Set the plugin refresh rate"""
639
        self.set_limits('refresh', value)
640
641
    def get_refresh(self):
642
        """Return the plugin refresh time"""
643
        ret = self.get_limits(item='refresh')
644
        if ret is None:
645
            ret = self.args.time if hasattr(self.args, 'time') else 2
646
        return ret
647
648
    def get_refresh_time(self):
649
        """Return the plugin refresh time"""
650
        return self.get_refresh()
651
652
    def set_limits(self, item, value):
653
        """Set the limits object."""
654
        self._limits[f'{self.plugin_name}_{item}'] = value
655
656
    def get_limits(self, item=None):
657
        """Return the limits object."""
658
        if item is None:
659
            return self._limits
660
        return self._limits.get(f'{self.plugin_name}_{item}', None)
661
662
    def get_stats_action(self):
663
        """Return stats for the action.
664
665
        By default return all the stats.
666
        Can be overwrite by plugins implementation.
667
        For example, Docker will return self.stats['containers']
668
        """
669
        return self.stats
670
671
    def get_stat_name(self, header=""):
672
        """Return the stat name with an optional header"""
673
        ret = self.plugin_name
674
        if header != '':
675
            ret += '_' + header
676
        return ret
677
678
    def get_alert(
679
        self,
680
        current=0,
681
        minimum=0,
682
        maximum=100,
683
        highlight_zero=True,
684
        is_max=False,
685
        header="",
686
        action_key=None,
687
        log=False,
688
    ):
689
        """Return the alert status relative to a current value.
690
691
        Use this function for minor stats.
692
693
        If current < CAREFUL of max then alert = OK
694
        If current > CAREFUL of max then alert = CAREFUL
695
        If current > WARNING of max then alert = WARNING
696
        If current > CRITICAL of max then alert = CRITICAL
697
698
        If highlight=True than 0.0 is highlighted
699
700
        If defined 'header' is added between the plugin name and the status.
701
        Only useful for stats with several alert status.
702
703
        If defined, 'action_key' define the key for the actions.
704
        By default, the action_key is equal to the header.
705
706
        If log=True than add log if necessary
707
        elif log=False than do not log
708
        elif log=None than apply the config given in the conf file
709
        """
710
        # Manage 0 (0.0) value if highlight_zero is not True
711
        if not highlight_zero and current == 0:
712
            return 'DEFAULT'
713
714
        # Compute the %
715
        try:
716
            value = (current * 100) / maximum
717
        except ZeroDivisionError:
718
            return 'DEFAULT'
719
        except TypeError:
720
            return 'DEFAULT'
721
722
        # Build the stat_name
723
        stat_name = self.get_stat_name(header=header).lower()
724
725
        # Manage limits
726
        # If is_max is set then default style is set to MAX else default is set to OK
727
        ret = 'MAX' if is_max else 'OK'
728
729
        # Iter through limits
730
        critical = self.get_limit('critical', stat_name=stat_name)
731
        warning = self.get_limit('warning', stat_name=stat_name)
732
        careful = self.get_limit('careful', stat_name=stat_name)
733
        if critical and value >= critical:
734
            ret = 'CRITICAL'
735
        elif warning and value >= warning:
736
            ret = 'WARNING'
737
        elif careful and value >= careful:
738
            ret = 'CAREFUL'
739
        elif not careful and not warning and not critical:
740
            ret = 'DEFAULT'
741
        else:
742
            ret = 'OK'
743
744
        if current < minimum:
745
            ret = 'CAREFUL'
746
747
        # Manage log
748
        log_str = ""
749
        if self.get_limit_log(stat_name=stat_name, default_action=log):
750
            # Add _LOG to the return string
751
            # So stats will be highlighted with a specific color
752
            log_str = "_LOG"
753
            # Add the log to the events list
754
            glances_events.add(ret, stat_name.upper(), value)
755
756
        # Manage threshold
757
        self.manage_threshold(stat_name, ret)
758
759
        # Manage action
760
        self.manage_action(stat_name, ret.lower(), header, action_key)
761
762
        # Default is 'OK'
763
        return ret + log_str
764
765
    def filter_stats(self, stats):
766
        """Filter the stats to keep only the fields we want (the one defined in fields_description)."""
767
        if hasattr(stats, '_asdict'):
768
            return {k: v for k, v in stats._asdict().items() if k in self.fields_description}
769
        if isinstance(stats, dict):
770
            return {k: v for k, v in stats.items() if k in self.fields_description}
771
        if isinstance(stats, list):
772
            return [self.filter_stats(s) for s in stats]
773
        return stats
774
775
    def manage_threshold(self, stat_name, trigger):
        """Register the trigger of the given stat in the Glances thresholds."""
        glances_thresholds.add(stat_name, trigger)
778
779
    def manage_action(self, stat_name, trigger, header, action_key):
        """Manage the action for the current stat.

        When no command (and no repeat command) is defined for the trigger,
        the trigger is only recorded in the actions. Otherwise the command is
        run with a {{mustache}} dictionary built from the stats: the stats
        themselves, or for a list of stats, the first element whose key value
        is equal to action_key (header when action_key is None), an empty dict
        if there is none.
        """
        # Is there a command line for the current trigger ?
        command, repeat = self.get_limit_action(trigger, stat_name=stat_name)
        if not (command or repeat):
            # Reset the trigger
            self.actions.set(stat_name, trigger)
            return

        # Key used to select the stats; it defaults to the header
        lookup_key = header if action_key is None else action_key

        # 1) Build the {{mustache}} dictionary
        action_stats = self.get_stats_action()
        if isinstance(action_stats, list):
            # The stats are stored in a list of dict (fs plugin for example):
            # use the dict of the current header
            mustache_dict = next(
                (entry for entry in action_stats if entry[self.get_key()] == lookup_key),
                {},
            )
        else:
            # Use the stats dict
            mustache_dict = action_stats

        # 2) Run the action
        self.actions.run(stat_name, trigger, command, repeat, mustache_dict=mustache_dict)
807
808
    def get_alert_log(self, current=0, minimum=0, maximum=100, header="", action_key=None):
        """Call get_alert with the same arguments and log=True."""
        alert_args = {
            'current': current,
            'minimum': minimum,
            'maximum': maximum,
            'header': header,
            'action_key': action_key,
            'log': True,
        }
        return self.get_alert(**alert_args)
813
814
    def is_limit(self, criticality, stat_name=""):
        """Return True if a limit exists for the given stat_name and criticality.

        The key looked up in the limits is '<stat name, lower case>_<criticality>'.
        """
        limit_key = self.get_stat_name(stat_name).lower() + '_' + criticality
        return limit_key in self._limits
817
818
    def get_limit(self, criticality=None, stat_name=""):
819
        """Return the limit value for the given criticality.
820
        If criticality is None, return the dict of all the limits."""
821
        if criticality is None:
822
            return self._limits
823
824
        # Get the limit for stat + header
825
        # Example: network_wlan0_rx_careful
826
        stat_name = stat_name.lower()
827
        if stat_name + '_' + criticality in self._limits:
828
            return self._limits[stat_name + '_' + criticality]
829
        if self.plugin_name + '_' + criticality in self._limits:
830
            return self._limits[self.plugin_name + '_' + criticality]
831
832
        return None
833
834
    def get_limit_action(self, criticality, stat_name=""):
835
        """Return the tuple (action, repeat) for the alert.
836
837
        - action is a command line
838
        - repeat is a bool
839
        """
840
        # Get the action for stat + header
841
        # Example: network_wlan0_rx_careful_action
842
        # Action key available ?
843
        ret = [
844
            (stat_name + '_' + criticality + '_action', False),
845
            (stat_name + '_' + criticality + '_action_repeat', True),
846
            (self.plugin_name + '_' + criticality + '_action', False),
847
            (self.plugin_name + '_' + criticality + '_action_repeat', True),
848
        ]
849
        for r in ret:
850
            if r[0] in self._limits:
851
                return self._limits[r[0]], r[1]
852
853
        # No key found, return None
854
        return None, None
855
856
    def get_limit_log(self, stat_name, default_action=False):
857
        """Return the log tag for the alert."""
858
        # Get the log tag for stat + header
859
        # Example: network_wlan0_rx_log
860
        if stat_name + '_log' in self._limits:
861
            return self._limits[stat_name + '_log'][0].lower() == 'true'
862
        if self.plugin_name + '_log' in self._limits:
863
            return self._limits[self.plugin_name + '_log'][0].lower() == 'true'
864
        return default_action
865
866
    def get_conf_value(self, value, header="", plugin_name=None, default=[]):
867
        """Return the configuration (header_) value for the current plugin.
868
869
        ...or the one given by the plugin_name var.
870
        """
871
        if plugin_name is None:
872
            # If not default use the current plugin name
873
            plugin_name = self.plugin_name
874
875
        if header != "":
876
            # Add the header
877
            plugin_name = plugin_name + '_' + header
878
879
        try:
880
            return self._limits[plugin_name + '_' + value]
881
        except KeyError:
882
            return default
883
884
    def is_show(self, value, header=""):
885
        """Return True if the value is in the show configuration list.
886
887
        If the show value is empty, return True (show by default)
888
889
        The show configuration list is defined in the glances.conf file.
890
        It is a comma-separated list of regexp.
891
        Example for diskio:
892
        show=sda.*
893
        """
894
        # TODO: possible optimisation: create a re.compile list
895
        # nguuuquaaa: no need a compile list, the re module caches the compiling itself
896
        return any(re.fullmatch(i, value, re.I) for i in self.get_conf_value('show', header=header))
897
898
    def is_hide(self, value, header=""):
899
        """Return True if the value is in the hide configuration list.
900
901
        The hide configuration list is defined in the glances.conf file.
902
        It is a comma-separated list of regexp.
903
        Example for diskio:
904
        hide=sda2,sda5,loop.*
905
        """
906
        # TODO: possible optimisation: create a re.compile list
907
        # nguuuquaaa: no need a compile list, the re module caches the compiling itself
908
        return any(re.fullmatch(i, value, re.I) for i in self.get_conf_value('hide', header=header))
909
910
    def is_display(self, value, header=""):
911
        """Return True if the value should be displayed in the UI"""
912
        if self.get_conf_value('show', header=header) != []:
913
            return self.is_show(value, header=header)
914
        return not self.is_hide(value, header=header)
915
916
    def is_display_any(self, *values, header=""):
917
        """Return True if any of the values should be displayed in the UI"""
918
        if self.get_conf_value('show', header=header) != []:
919
            return any(self.is_show(value, header=header) for value in values)
920
        return not any(self.is_hide(value, header=header) for value in values)
921
922
    def read_alias(self):
923
        if self.plugin_name + '_' + 'alias' in self._limits:
924
            return {i.split(':')[0].lower(): i.split(':')[1] for i in self._limits[self.plugin_name + '_' + 'alias']}
925
        return {}
926
927
    def has_alias(self, header):
928
        """Return the alias name for the relative header it it exists otherwise None."""
929
        return self.alias.get(header, None)
930
931
    def msg_curse(self, args=None, max_width=None):
932
        """Return default string to display in the curse interface."""
933
        return [self.curse_add_line(str(self.stats))]
934
935
    def get_stats_display(self, args=None, max_width=None):
936
        """Return a dict with all the information needed to display the stat.
937
938
        key     | description
939
        ----------------------------
940
        display | Display the stat (True or False)
941
        msgdict | Message to display (list of dict [{ 'msg': msg, 'decoration': decoration } ... ])
942
        align   | Message position
943
        """
944
        display_curse = False
945
946
        if hasattr(self, 'display_curse'):
947
            display_curse = self.display_curse
948
        if hasattr(self, 'align'):
949
            align_curse = self._align
950
951
        if max_width is not None:
952
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args, max_width=max_width), 'align': align_curse}
0 ignored issues
show
introduced by
The variable align_curse does not seem to be defined in case hasattr(self, 'align') on line 948 is False. Are you sure this can never be the case?
Loading history...
953
        else:
954
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args), 'align': align_curse}
955
956
        return ret
957
958
    def curse_add_line(self, msg, decoration="DEFAULT", optional=False, additional=False, splittable=False):
        """Return a dict describing one message of the curse interface.

        The keys of the dict are the names of the parameters:
            msg: string
            decoration:
                DEFAULT: no decoration
                UNDERLINE: underline
                BOLD: bold
                TITLE: for stat title
                PROCESS: for process name
                STATUS: for process status
                CPU_TIME: for process cpu time
                OK: Value is OK and non logged
                OK_LOG: Value is OK and logged
                CAREFUL: Value is CAREFUL and non logged
                CAREFUL_LOG: Value is CAREFUL and logged
                WARNING: Value is WARNING and non logged
                WARNING_LOG: Value is WARNING and logged
                CRITICAL: Value is CRITICAL and non logged
                CRITICAL_LOG: Value is CRITICAL and logged
            optional: True if the stat is optional (display only if space is available)
            additional: True if the stat is additional (display only if space is available after optional)
            splittable: Line can be split to fit on the screen (default is not)
        """
        return dict(
            msg=msg,
            decoration=decoration,
            optional=optional,
            additional=additional,
            splittable=splittable,
        )
990
991
    def curse_new_line(self):
992
        """Go to a new line."""
993
        return self.curse_add_line('\n')
994
995
    def curse_add_stat(self, key, width=None, header='', display_key=True, separator='', trailer=''):
996
        """Return a list of dict messages with the 'key: value' result
997
998
          <=== width ===>
999
        __key     : 80.5%__
1000
        | |       | |    |_ trailer
1001
        | |       | |_ self.stats[key]
1002
        | |       |_ separator
1003
        | |_ 'short_name' description or key or nothing if display_key is True
1004
        |_ header
1005
1006
        Instead of:
1007
            msg = '  {:8}'.format('idle:')
1008
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
1009
            msg = '{:5.1f}%'.format(self.stats['idle'])
1010
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
1011
1012
        Use:
1013
            ret.extend(self.curse_add_stat('idle', width=15, header='  '))
1014
1015
        """
1016
        if key not in self.stats:
1017
            return []
1018
1019
        # Check if a shortname is defined
1020
        if not display_key:
1021
            key_name = ''
1022
        elif key in self.fields_description and 'short_name' in self.fields_description[key]:
1023
            key_name = self.fields_description[key]['short_name']
1024
        else:
1025
            key_name = key
1026
1027
        # Check if unit is defined and get the short unit char in the unit_sort dict
1028
        if (
1029
            key in self.fields_description
1030
            and 'unit' in self.fields_description[key]
1031
            and self.fields_description[key]['unit'] in fields_unit_short
1032
        ):
1033
            # Get the shortname
1034
            unit_short = fields_unit_short[self.fields_description[key]['unit']]
1035
        else:
1036
            unit_short = ''
1037
1038
        # Check if unit is defined and get the unit type unit_type dict
1039
        if (
1040
            key in self.fields_description
1041
            and 'unit' in self.fields_description[key]
1042
            and self.fields_description[key]['unit'] in fields_unit_type
1043
        ):
1044
            # Get the shortname
1045
            unit_type = fields_unit_type[self.fields_description[key]['unit']]
1046
        else:
1047
            unit_type = 'float'
1048
1049
        # Is it a rate ? Yes, get the pre-computed rate value
1050
        if key in self.fields_description and self.fields_description[key].get('rate', False) is True:
1051
            value = self.stats.get(key + '_rate_per_sec', None)
1052
        else:
1053
            value = self.stats.get(key, None)
1054
1055
        if width is None:
1056
            msg_item = header + f'{key_name}' + separator
1057
            msg_template_float = '{:.1f}{}'
1058
            msg_template = '{}{}'
1059
        else:
1060
            # Define the size of the message
1061
            # item will be on the left
1062
            # value will be on the right
1063
            msg_item = header + '{:{width}}'.format(key_name, width=width - 7) + separator
1064
            msg_template_float = '{:5.1f}{}'
1065
            msg_template = '{:>5}{}'
1066
1067
        if value is None:
1068
            msg_value = msg_template.format('-', '')
1069
        elif unit_type == 'float':
1070
            msg_value = msg_template_float.format(value, unit_short)
1071
        elif 'min_symbol' in self.fields_description[key]:
1072
            msg_value = msg_template.format(
1073
                self.auto_unit(int(value), min_symbol=self.fields_description[key]['min_symbol']), unit_short
1074
            )
1075
        else:
1076
            msg_value = msg_template.format(int(value), unit_short)
1077
1078
        # Add the trailer
1079
        msg_value = msg_value + trailer
1080
1081
        decoration = self.get_views(key=key, option='decoration') if value is not None else 'DEFAULT'
1082
        optional = self.get_views(key=key, option='optional')
1083
1084
        return [
1085
            self.curse_add_line(msg_item, optional=optional),
1086
            self.curse_add_line(msg_value, decoration=decoration, optional=optional),
1087
        ]
1088
1089
    @property
1090
    def align(self):
1091
        """Get the curse align."""
1092
        return self._align
1093
1094
    @align.setter
1095
    def align(self, value):
1096
        """Set the curse align.
1097
1098
        value: left, right, bottom.
1099
        """
1100
        self._align = value
1101
1102
    def auto_unit(self, number, low_precision=False, min_symbol='K', none_symbol='-'):
        """Return a nice human-readable string out of number.

        Thin wrapper: every argument is forwarded to the module level auto_unit function.
        """
        options = {
            'low_precision': low_precision,
            'min_symbol': min_symbol,
            'none_symbol': none_symbol,
        }
        return auto_unit(number, **options)
1105
1106
    def trend_msg(self, trend, significant=1):
        """Return the trend message.

        - ' ' when the trend is None
        - the ARROW_UP message when trend > significant
        - the ARROW_DOWN message when trend < -significant
        - '-' otherwise (trend not significant)
        """
        if trend is None:
            return ' '
        if trend > significant:
            return unicode_message('ARROW_UP', self.args)
        if trend < -significant:
            return unicode_message('ARROW_DOWN', self.args)
        return '-'
1119
1120
    def _check_decorator(fct):
1121
        """Check decorator for update method.
1122
1123
        It checks:
1124
        - if the plugin is enabled.
1125
        - if the refresh_timer is finished
1126
        """
1127
1128
        def wrapper(self, *args, **kw):
1129
            if self.is_enabled() and (self.refresh_timer.finished() or self.stats == self.get_init_value):
1130
                # Run the method
1131
                ret = fct(self, *args, **kw)
1132
                # Reset the timer
1133
                self.refresh_timer.set(self.get_refresh())
1134
                self.refresh_timer.reset()
1135
            else:
1136
                # No need to call the method
1137
                # Return the last result available
1138
                ret = self.stats
1139
            return ret
1140
1141
        return wrapper
1142
1143 View Code Duplication
    def _log_result_decorator(fct):
        """Log (DEBUG) the result of the function fct.

        The message holds the class name and module of the first positional
        argument, the name of fct, the returned value and the duration given
        by a Counter started just before the call.
        """

        def wrapper(*args, **kw):
            stopwatch = Counter()
            result = fct(*args, **kw)
            elapsed = stopwatch.get()
            owner = args[0].__class__
            logger.debug(f"{owner.__name__} {owner.__module__} {fct.__name__} return {result} in {elapsed} seconds")
            return result

        return wrapper
1156
1157
    def _manage_rate(fct):
        """Manage rate decorator for update method.

        The decorated method is called first. Then, for every field flagged
        with 'rate' in fields_description and when previous stats exist:
        1) <field>_gauge is set to the value returned by the method
        2) <field> becomes the delta between the current and the previous gauge
        3) <field>_rate_per_sec is the delta divided (floor division) by
           time_since_last_update
        The stats are finally memorized in stats_previous for the next run.
        """

        def compute_rate(self, stat, stat_previous):
            # Nothing to compare with: the stat is returned untouched
            if stat_previous is None:
                return stat

            for field, description in self.fields_description.items():
                # Only the fields with the rate flag are processed
                if not description.get('rate', False):
                    continue

                gauge_key = field + '_gauge'
                rate_key = field + '_rate_per_sec'

                # Create a new metadata with the gauge
                stat['time_since_update'] = self.time_since_last_update
                stat[gauge_key] = stat[field]

                if not (gauge_key in stat_previous and stat[field] and stat_previous[gauge_key]):
                    # Avoid strange rate at the first run
                    stat[field] = 0
                    stat[rate_key] = 0
                    continue

                # The stat becomes the delta between the current and the previous value
                stat[field] = stat[field] - stat_previous[gauge_key]
                # Compute the rate
                if self.time_since_last_update > 0:
                    stat[rate_key] = stat[field] // self.time_since_last_update
                else:
                    stat[field] = 0
                    stat[rate_key] = 0
            return stat

        def compute_rate_on_list(self, stats, stats_previous):
            if stats_previous is None:
                return stats

            for current in stats:
                # The rate is only computed when exactly one previous item has the same key
                matches = [old for old in stats_previous if old[self.get_key()] == current[self.get_key()]]
                if len(matches) == 1:
                    compute_rate(self, current, matches[0])
            return stats

        def wrapper(self, *args, **kw):
            # Call the father method
            new_stats = fct(self, *args, **kw)

            # Get the time since the last update
            self.time_since_last_update = getTimeSinceLastUpdate(self.plugin_name)

            # Compute the rate (in place) according to the type of the stats
            if isinstance(new_stats, dict):
                compute_rate(self, new_stats, self.stats_previous)
            elif isinstance(new_stats, list):
                compute_rate_on_list(self, new_stats, self.stats_previous)

            # Memorize the current stats for the next run
            self.stats_previous = copy.deepcopy(new_stats)

            return new_stats

        return wrapper
1220
1221
    def _exit_after(second):
1222
        """Exit the function if it takes more than 'second' seconds to complete."""
1223
1224
        def outer(fn):
1225
            def inner(*args, **kwargs):
1226
                timer = threading.Timer(second, thread.interrupt_main, args=[fn.__name__])
1227
                timer.start()
1228
                try:
1229
                    result = fn(*args, **kwargs)
1230
                finally:
1231
                    timer.cancel()
1232
                return result
1233
1234
            return inner
1235
1236
        return outer
1237
1238
    # The decorators are re-bound as static methods: mandatory to be able to
    # call them from the child classes
    _exit_after = staticmethod(_exit_after)
    _manage_rate = staticmethod(_manage_rate)
    _log_result_decorator = staticmethod(_log_result_decorator)
    _check_decorator = staticmethod(_check_decorator)
1243