Test Failed
Push — master ( cc9054...a8608f )
by Nicolas
03:40
created

GlancesPluginModel.update_views()   F

Complexity

Conditions 20

Size

Total Lines 64
Code Lines 38

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 20
eloc 38
nop 1
dl 0
loc 64
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like glances.plugins.plugin.model.GlancesPluginModel.update_views() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""
10
I am your father...
11
12
...of all Glances model plugins.
13
"""
14
15
import copy
16
import re
17
18
from glances.actions import GlancesActions
19
from glances.events_list import glances_events
20
from glances.globals import dictlist, iterkeys, itervalues, json_dumps, json_dumps_dictlist, listkeys, mean, nativestr
21
from glances.history import GlancesHistory
22
from glances.logger import logger
23
from glances.outputs.glances_unicode import unicode_message
24
from glances.thresholds import glances_thresholds
25
from glances.timer import Counter, Timer, getTimeSinceLastUpdate
26
27
fields_unit_short = {'percent': '%'}
28
29
fields_unit_type = {
30
    'percent': 'float',
31
    'percents': 'float',
32
    'number': 'int',
33
    'numbers': 'int',
34
    'int': 'int',
35
    'ints': 'int',
36
    'float': 'float',
37
    'floats': 'float',
38
    'second': 'int',
39
    'seconds': 'int',
40
    'byte': 'int',
41
    'bytes': 'int',
42
}
43
44
45
class GlancesPluginModel:
46
    """Main class for Glances plugin model."""
47
48
    def __init__(self, args=None, config=None, items_history_list=None, stats_init_value={}, fields_description=None):
49
        """Init the plugin of plugins model class.
50
51
        All Glances' plugins model should inherit from this class. Most of the
52
        methods are already implemented in the father classes.
53
54
        Your plugin should return a dict or a list of dicts (stored in the
55
        self.stats). As an example, you can have a look on the mem plugin
56
        (for dict) or network (for list of dicts).
57
58
        From version 4 of the API, the plugin should return a dict.
59
60
        A plugin should implement:
61
        - the reset method: to set your self.stats variable to {} or []
62
        - the update method: where your self.stats variable is set
63
        and optionally:
64
        - the get_key method: set the key of the dict (only for list of dict)
65
        - all others methods you want to overwrite
66
67
        :args: args parameters
68
        :config: configuration parameters
69
        :items_history_list: list of items to store in the history
70
        :stats_init_value: Default value for a stats item
71
        """
72
        # Build the plugin name
73
        # Internal or external module (former prefixed by 'glances.plugins')
74
        _mod = self.__class__.__module__.replace('glances.plugins.', '')
75
        self.plugin_name = _mod.split('.')[0]
76
77
        if self.plugin_name.startswith('glances_'):
78
            self.plugin_name = self.plugin_name.split('glances_')[1]
79
        logger.debug(f"Init {self.plugin_name} plugin")
80
81
        # Init the args
82
        self.args = args
83
84
        # Init the default alignment (for curses)
85
        self._align = 'left'
86
87
        # Init the input method
88
        self._input_method = 'local'
89
        self._short_system_name = None
90
91
        # Init the history list
92
        self.items_history_list = items_history_list
93
        self.stats_history = self.init_stats_history()
94
95
        # Init the limits (configuration keys) dictionary
96
        self._limits = {}
97
        if config is not None:
98
            logger.debug(f'Load section {self.plugin_name} in Glances configuration file')
99
            self.load_limits(config=config)
100
101
        # Init the alias (dictionary)
102
        self.alias = self.read_alias()
103
104
        # Init the actions
105
        self.actions = GlancesActions(args=args)
106
107
        # Init the views
108
        self.views = {}
109
110
        # Hide stats if all the hide_zero_fields has never been != 0
111
        # Default is False, always display stats
112
        self.hide_zero = False
113
        self.hide_zero_fields = []
114
115
        # Set the initial refresh time to display stats the first time
116
        self.refresh_timer = Timer(0)
117
118
        # Init stats description
119
        self.fields_description = fields_description
120
121
        # Init the stats
122
        self.stats_init_value = stats_init_value
123
        self.time_since_last_update = None
124
        self.stats = None
125
        self.stats_previous = None
126
        self.reset()
127
128
    def __repr__(self):
129
        """Return the raw stats."""
130
        return str(self.stats)
131
132
    def __str__(self):
133
        """Return the human-readable stats."""
134
        return str(self.stats)
135
136
    def get_init_value(self):
137
        """Return a copy of the init value."""
138
        return copy.copy(self.stats_init_value)
139
140
    def reset(self):
141
        """Reset the stats.
142
143
        This method should be overwritten by child classes.
144
        """
145
        self.stats = self.get_init_value()
146
147
    def exit(self):
148
        """Just log an event when Glances exit."""
149
        logger.debug(f"Stop the {self.plugin_name} plugin")
150
151
    def get_key(self):
152
        """Return the key of the list."""
153
        return
154
155
    def is_enabled(self, plugin_name=None):
156
        """Return true if plugin is enabled."""
157
        if not plugin_name:
158
            plugin_name = self.plugin_name
159
        try:
160
            d = getattr(self.args, 'disable_' + plugin_name)
161
        except AttributeError:
162
            d = getattr(self.args, 'enable_' + plugin_name, True)
163
        return d is False
164
165
    def is_disabled(self, plugin_name=None):
166
        """Return true if plugin is disabled."""
167
        return not self.is_enabled(plugin_name=plugin_name)
168
169
    def history_enable(self):
170
        return self.args is not None and not self.args.disable_history and self.get_items_history_list() is not None
171
172
    def init_stats_history(self):
173
        """Init the stats history (dict of GlancesAttribute)."""
174
        if self.history_enable():
175
            init_list = [a['name'] for a in self.get_items_history_list()]
176
            logger.debug(f"Stats history activated for plugin {self.plugin_name} (items: {init_list})")
177
        return GlancesHistory()
178
179
    def reset_stats_history(self):
180
        """Reset the stats history (dict of GlancesAttribute)."""
181
        if self.history_enable():
182
            reset_list = [a['name'] for a in self.get_items_history_list()]
183
            logger.debug(f"Reset history for plugin {self.plugin_name} (items: {reset_list})")
184
            self.stats_history.reset()
185
186
    def update_stats_history(self):
187
        """Update stats history."""
188
        # Build the history
189
        if not (self.get_export() and self.history_enable()):
190
            return
191
        # Itern through items history
192
        item_name = '' if self.get_key() is None else self.get_key()
193
        for i in self.get_items_history_list():
194
            if isinstance(self.get_export(), list):
195
                # Stats is a list of data
196
                # Iter through stats (for example, iter through network interface)
197
                for l_export in self.get_export():
198
                    if i['name'] in l_export:
199
                        self.stats_history.add(
200
                            nativestr(l_export[item_name]) + '_' + nativestr(i['name']),
201
                            l_export[i['name']],
202
                            description=i['description'],
203
                            history_max_size=self._limits['history_size'],
204
                        )
205
            else:
206
                # Stats is not a list
207
                # Add the item to the history directly
208
                self.stats_history.add(
209
                    nativestr(i['name']),
210
                    self.get_export()[i['name']],
211
                    description=i['description'],
212
                    history_max_size=self._limits['history_size'],
213
                )
214
215
    def get_items_history_list(self):
216
        """Return the items history list."""
217
        return self.items_history_list
218
219
    def get_raw_history(self, item=None, nb=0):
220
        """Return the history (RAW format).
221
222
        - the stats history (dict of list) if item is None
223
        - the stats history for the given item (list) instead
224
        - None if item did not exist in the history
225
        """
226
        s = self.stats_history.get(nb=nb)
227
        if item is None:
228
            return s
229
        if item in s:
230
            return s[item]
231
        return None
232
233
    def get_export_history(self, item=None):
234
        """Return the stats history object to export."""
235
        return self.get_raw_history(item=item)
236
237
    def get_stats_history(self, item=None, nb=0):
238
        """Return the stats history (JSON format)."""
239
        s = self.stats_history.get_json(nb=nb)
240
241
        if item is None:
242
            return json_dumps(s)
243
244
        return json_dumps_dictlist(s, item)
245
246
    def get_trend(self, item, nb=30):
247
        """Get the trend regarding to the last nb values.
248
249
        The trend is the diffirence between the mean of the last 0 to nb / 2
250
        and nb / 2 to nb values.
251
        """
252
        raw_history = self.get_raw_history(item=item, nb=nb)
253
        if raw_history is None or len(raw_history) < nb:
254
            return None
255
        last_nb = [v[1] for v in raw_history]
256
        return mean(last_nb[nb // 2 :]) - mean(last_nb[: nb // 2])
257
258
    @property
259
    def input_method(self):
260
        """Get the input method."""
261
        return self._input_method
262
263
    @input_method.setter
264
    def input_method(self, input_method):
265
        """Set the input method.
266
267
        * local: system local grab (psutil or direct access)
268
        * snmp: Client server mode via SNMP
269
        * glances: Client server mode via Glances API
270
        """
271
        self._input_method = input_method
272
273
    @property
274
    def short_system_name(self):
275
        """Get the short detected OS name (SNMP)."""
276
        return self._short_system_name
277
278
    def sorted_stats(self):
279
        """Get the stats sorted by an alias (if present) or key."""
280
        key = self.get_key()
281
        if key is None:
282
            return self.stats
283
        try:
284
            return sorted(
285
                self.stats,
286
                key=lambda stat: tuple(
287
                    int(part) if part.isdigit() else part.lower()
288
                    for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
289
                ),
290
            )
291
        except TypeError:
292
            # Correct "Starting an alias with a number causes a crash #1885"
293
            return sorted(
294
                self.stats,
295
                key=lambda stat: tuple(
296
                    part.lower() for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
297
                ),
298
            )
299
300
    @short_system_name.setter
301
    def short_system_name(self, short_name):
302
        """Set the short detected OS name (SNMP)."""
303
        self._short_system_name = short_name
304
305
    def set_stats(self, input_stats):
306
        """Set the stats to input_stats."""
307
        self.stats = input_stats
308
309
    def get_stats_snmp(self, bulk=False, snmp_oid=None):
310
        """Update stats using SNMP.
311
312
        If bulk=True, use a bulk request instead of a get request.
313
        """
314
        snmp_oid = snmp_oid or {}
315
316
        from glances.snmp import GlancesSNMPClient
317
318
        # Init the SNMP request
319
        snmp_client = GlancesSNMPClient(
320
            host=self.args.client,
321
            port=self.args.snmp_port,
322
            version=self.args.snmp_version,
323
            community=self.args.snmp_community,
324
        )
325
326
        # Process the SNMP request
327
        ret = {}
328
        if bulk:
329
            # Bulk request
330
            snmp_result = snmp_client.getbulk_by_oid(0, 10, *list(itervalues(snmp_oid)))
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable itervalues does not seem to be defined.
Loading history...
331
            logger.info(snmp_result)
332
            if len(snmp_oid) == 1:
333
                # Bulk command for only one OID
334
                # Note: key is the item indexed but the OID result
335
                for item in snmp_result:
336
                    if iterkeys(item)[0].startswith(itervalues(snmp_oid)[0]):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable iterkeys does not seem to be defined.
Loading history...
337
                        ret[iterkeys(snmp_oid)[0] + iterkeys(item)[0].split(itervalues(snmp_oid)[0])[1]] = itervalues(
338
                            item
339
                        )[0]
340
            else:
341
                # Build the internal dict with the SNMP result
342
                # Note: key is the first item in the snmp_oid
343
                index = 1
344
                for item in snmp_result:
345
                    item_stats = {}
346
                    item_key = None
347
                    for key in iterkeys(snmp_oid):
348
                        oid = snmp_oid[key] + '.' + str(index)
349
                        if oid in item:
350
                            if item_key is None:
351
                                item_key = item[oid]
352
                            else:
353
                                item_stats[key] = item[oid]
354
                    if item_stats:
355
                        ret[item_key] = item_stats
356
                    index += 1
357
        else:
358
            # Simple get request
359
            snmp_result = snmp_client.get_by_oid(*list(itervalues(snmp_oid)))
360
361
            # Build the internal dict with the SNMP result
362
            for key in iterkeys(snmp_oid):
363
                ret[key] = snmp_result[snmp_oid[key]]
364
365
        return ret
366
367
    def get_raw(self):
368
        """Return the stats object."""
369
        return self.stats
370
371
    def get_export(self):
372
        """Return the stats object to export.
373
        By default, return the raw stats.
374
        Note: this method could be overwritten by the plugin if a specific format is needed (ex: processlist)
375
        """
376
        return self.get_raw()
377
378
    def get_stats(self):
379
        """Return the stats object in JSON format."""
380
        return json_dumps(self.get_raw())
381
382
    def get_json(self):
383
        """Return the stats object in JSON format."""
384
        return self.get_stats()
385
386
    def get_raw_stats_item(self, item):
387
        """Return the stats object for a specific item in RAW format.
388
389
        Stats should be a list of dict (processlist, network...)
390
        """
391
        return dictlist(self.get_raw(), item)
392
393
    def get_stats_item(self, item):
394
        """Return the stats object for a specific item in JSON format.
395
396
        Stats should be a list of dict (processlist, network...)
397
        """
398
        return json_dumps_dictlist(self.get_raw(), item)
399
400
    def get_raw_stats_value(self, item, value):
401
        """Return the stats object for a specific item=value.
402
403
        Return None if the item=value does not exist
404
        Return None if the item is not a list of dict
405
        """
406
        if not isinstance(self.get_raw(), list):
407
            return None
408
409
        if (not isinstance(value, int) and not isinstance(value, float)) and value.isdigit():
410
            value = int(value)
411
        try:
412
            return {value: [i for i in self.get_raw() if i[item] == value]}
413
        except (KeyError, ValueError) as e:
414
            logger.error(f"Cannot get item({item})=value({value}) ({e})")
415
            return None
416
417
    def get_stats_value(self, item, value):
418
        """Return the stats object for a specific item=value in JSON format.
419
420
        Stats should be a list of dict (processlist, network...)
421
        """
422
        rsv = self.get_raw_stats_value(item, value)
423
        if rsv is None:
424
            return None
425
        return json_dumps(rsv)
426
427
    def get_item_info(self, item, key, default=None):
428
        """Return the item info grabbed into self.fields_description."""
429
        if self.fields_description is None or item not in self.fields_description:
430
            return default
431
        return self.fields_description[item].get(key, default)
432
433
    def update_views(self):
434
        """Update the stats views.
435
436
        The V of MVC
437
        A dict of dict with the needed information to display the stats.
438
        Example for the stat xxx:
439
        'xxx': {'decoration': 'DEFAULT',  >>> The decoration of the stats
440
                'optional': False,        >>> Is the stat optional
441
                'additional': False,      >>> Is the stat provide additional information
442
                'splittable': False,      >>> Is the stat can be cut (like process lon name)
443
                'hidden': False}          >>> Is the stats should be hidden in the UI
444
        """
445
        ret = {}
446
447
        if isinstance(self.get_raw(), list) and self.get_raw() is not None and self.get_key() is not None:
448
            # Stats are stored in a list of dict (ex: DISKIO, NETWORK, FS...)
449
            for i in self.get_raw():
450
                key = i[self.get_key()]
451
                ret[key] = {}
452
                for field in listkeys(i):
453
                    value = {
454
                        'decoration': 'DEFAULT',
455
                        'optional': False,
456
                        'additional': False,
457
                        'splittable': False,
458
                    }
459
                    # Manage the hidden feature
460
                    # Allow to automatically hide fields when values is never different than 0
461
                    # Refactoring done for #2929
462
                    if not self.hide_zero:
463
                        value['hidden'] = False
464
                    elif key in self.views and field in self.views[key] and 'hidden' in self.views[key][field]:
465
                        value['hidden'] = self.views[key][field]['hidden']
466
                        if field in self.hide_zero_fields and i[field] != 0:
467
                            value['hidden'] = False
468
                    else:
469
                        value['hidden'] = field in self.hide_zero_fields
470
                    ret[key][field] = value
471
        elif isinstance(self.get_raw(), dict) and self.get_raw() is not None:
472
            # Stats are stored in a dict (ex: CPU, LOAD...)
473
            for field in listkeys(self.get_raw()):
474
                value = {
475
                    'decoration': 'DEFAULT',
476
                    'optional': False,
477
                    'additional': False,
478
                    'splittable': False,
479
                    'hidden': False,
480
                }
481
                # Manage the hidden feature
482
                # Allow to automatically hide fields when values is never different than 0
483
                # Refactoring done for #2929
484
                if not self.hide_zero:
485
                    value['hidden'] = False
486
                elif field in self.views and 'hidden' in self.views[field]:
487
                    value['hidden'] = self.views[field]['hidden']
488
                    if field in self.hide_zero_fields and self.get_raw()[field] != 0:
489
                        value['hidden'] = False
490
                else:
491
                    value['hidden'] = field in self.hide_zero_fields
492
                ret[field] = value
493
494
        self.views = ret
495
496
        return self.views
497
498
    def set_views(self, input_views):
499
        """Set the views to input_views."""
500
        self.views = input_views
501
502
    def reset_views(self):
503
        """Reset the views to input_views."""
504
        self.views = {}
505
506
    def get_views(self, item=None, key=None, option=None):
507
        """Return the views object.
508
509
        If key is None, return all the view for the current plugin
510
        else if option is None return the view for the specific key (all option)
511
        else return the view of the specific key/option
512
513
        Specify item if the stats are stored in a dict of dict (ex: NETWORK, FS...)
514
        """
515
        if item is None:
516
            item_views = self.views
517
        else:
518
            item_views = self.views[item]
519
520
        if key is None or key not in item_views:
521
            return item_views
522
        if option is None:
523
            return item_views[key]
524
        if option in item_views[key]:
525
            return item_views[key][option]
526
        return 'DEFAULT'
527
528
    def get_json_views(self, item=None, key=None, option=None):
529
        """Return the views (in JSON)."""
530
        return json_dumps(self.get_views(item, key, option))
531
532
    def load_limits(self, config):
533
        """Load limits from the configuration file, if it exists."""
534
        # By default set the history length to 3 points per second during one day
535
        self._limits['history_size'] = 28800
536
537
        if not hasattr(config, 'has_section'):
538
            return False
539
540
        # Read the global section
541
        # TODO: not optimized because this section is loaded for each plugin...
542
        if config.has_section('global'):
543
            self._limits['history_size'] = config.get_float_value('global', 'history_size', default=28800)
544
            logger.debug("Load configuration key: {} = {}".format('history_size', self._limits['history_size']))
545
546
        # Read the plugin specific section
547
        if config.has_section(self.plugin_name):
548
            for level, _ in config.items(self.plugin_name):
549
                # Read limits
550
                limit = '_'.join([self.plugin_name, level])
551
                try:
552
                    self._limits[limit] = config.get_float_value(self.plugin_name, level)
553
                except ValueError:
554
                    self._limits[limit] = config.get_value(self.plugin_name, level).split(",")
555
                logger.debug(f"Load limit: {limit} = {self._limits[limit]}")
556
557
        return True
558
559
    @property
560
    def limits(self):
561
        """Return the limits object."""
562
        return self._limits
563
564
    @limits.setter
565
    def limits(self, input_limits):
566
        """Set the limits to input_limits."""
567
        self._limits = input_limits
568
569
    def set_refresh(self, value):
570
        """Set the plugin refresh rate"""
571
        self.set_limits('refresh', value)
572
573
    def get_refresh(self):
574
        """Return the plugin refresh time"""
575
        ret = self.get_limits(item='refresh')
576
        if ret is None:
577
            ret = self.args.time if hasattr(self.args, 'time') else 2
578
        return ret
579
580
    def get_refresh_time(self):
581
        """Return the plugin refresh time"""
582
        return self.get_refresh()
583
584
    def set_limits(self, item, value):
585
        """Set the limits object."""
586
        self._limits[f'{self.plugin_name}_{item}'] = value
587
588
    def get_limits(self, item=None):
589
        """Return the limits object."""
590
        if item is None:
591
            return self._limits
592
        return self._limits.get(f'{self.plugin_name}_{item}', None)
593
594
    def get_stats_action(self):
595
        """Return stats for the action.
596
597
        By default return all the stats.
598
        Can be overwrite by plugins implementation.
599
        For example, Docker will return self.stats['containers']
600
        """
601
        return self.stats
602
603
    def get_stat_name(self, header=""):
604
        """Return the stat name with an optional header"""
605
        ret = self.plugin_name
606
        if header != "":
607
            ret += '_' + header
608
        return ret
609
610
    def get_alert(
611
        self,
612
        current=0,
613
        minimum=0,
614
        maximum=100,
615
        highlight_zero=True,
616
        is_max=False,
617
        header="",
618
        action_key=None,
619
        log=False,
620
    ):
621
        """Return the alert status relative to a current value.
622
623
        Use this function for minor stats.
624
625
        If current < CAREFUL of max then alert = OK
626
        If current > CAREFUL of max then alert = CAREFUL
627
        If current > WARNING of max then alert = WARNING
628
        If current > CRITICAL of max then alert = CRITICAL
629
630
        If highlight=True than 0.0 is highlighted
631
632
        If defined 'header' is added between the plugin name and the status.
633
        Only useful for stats with several alert status.
634
635
        If defined, 'action_key' define the key for the actions.
636
        By default, the action_key is equal to the header.
637
638
        If log=True than add log if necessary
639
        elif log=False than do not log
640
        elif log=None than apply the config given in the conf file
641
        """
642
        # Manage 0 (0.0) value if highlight_zero is not True
643
        if not highlight_zero and current == 0:
644
            return 'DEFAULT'
645
646
        # Compute the %
647
        try:
648
            value = (current * 100) / maximum
649
        except ZeroDivisionError:
650
            return 'DEFAULT'
651
        except TypeError:
652
            return 'DEFAULT'
653
654
        # Build the stat_name
655
        stat_name = self.get_stat_name(header=header)
656
657
        # Manage limits
658
        # If is_max is set then default style is set to MAX else default is set to OK
659
        ret = 'MAX' if is_max else 'OK'
660
661
        # Iter through limits
662
        try:
663
            limit = self.get_limit('critical', stat_name=stat_name)
664
        except KeyError:
665
            try:
666
                limit = self.get_limit('warning', stat_name=stat_name)
667
            except KeyError:
668
                try:
669
                    limit = self.get_limit('careful', stat_name=stat_name)
670
                except KeyError:
671
                    return 'DEFAULT'
672
                else:
673
                    if value >= limit:
674
                        ret = 'CAREFUL'
675
            else:
676
                if value >= limit:
677
                    ret = 'WARNING'
678
        else:
679
            if value >= limit:
680
                ret = 'CRITICAL'
681
682
        if current < minimum:
683
            ret = 'CAREFUL'
684
685
        # Manage log
686
        log_str = ""
687
        if self.get_limit_log(stat_name=stat_name, default_action=log):
688
            # Add _LOG to the return string
689
            # So stats will be highlighted with a specific color
690
            log_str = "_LOG"
691
            # Add the log to the events list
692
            glances_events.add(ret, stat_name.upper(), value)
693
694
        # Manage threshold
695
        self.manage_threshold(stat_name, ret)
696
697
        # Manage action
698
        self.manage_action(stat_name, ret.lower(), header, action_key)
699
700
        # Default is 'OK'
701
        return ret + log_str
702
703
    def filter_stats(self, stats):
704
        """Filter the stats to keep only the fields we want (the one defined in fields_description)."""
705
        if hasattr(stats, '_asdict'):
706
            return {k: v for k, v in stats._asdict().items() if k in self.fields_description}
707
        if isinstance(stats, dict):
708
            return {k: v for k, v in stats.items() if k in self.fields_description}
709
        if isinstance(stats, list):
710
            return [self.filter_stats(s) for s in stats]
711
        return stats
712
713
    def manage_threshold(self, stat_name, trigger):
714
        """Manage the threshold for the current stat."""
715
        glances_thresholds.add(stat_name, trigger)
716
717
    def manage_action(self, stat_name, trigger, header, action_key):
718
        """Manage the action for the current stat."""
719
        # Here is a command line for the current trigger ?
720
        try:
721
            command, repeat = self.get_limit_action(trigger, stat_name=stat_name)
722
        except KeyError:
723
            # Reset the trigger
724
            self.actions.set(stat_name, trigger)
725
        else:
726
            # Define the action key for the stats dict
727
            # If not define, then it sets to header
728
            if action_key is None:
729
                action_key = header
730
731
            # A command line is available for the current alert
732
            # 1) Build the {{mustache}} dictionary
733
            if isinstance(self.get_stats_action(), list):
734
                # If the stats are stored in a list of dict (fs plugin for example)
735
                # Return the dict for the current header
736
                mustache_dict = {}
737
                for item in self.get_stats_action():
738
                    if item[self.get_key()] == action_key:
739
                        mustache_dict = item
740
                        break
741
            else:
742
                # Use the stats dict
743
                mustache_dict = self.get_stats_action()
744
            # 2) Run the action
745
            self.actions.run(stat_name, trigger, command, repeat, mustache_dict=mustache_dict)
746
747
    def get_alert_log(self, current=0, minimum=0, maximum=100, header="", action_key=None):
748
        """Get the alert log."""
749
        return self.get_alert(
750
            current=current, minimum=minimum, maximum=maximum, header=header, action_key=action_key, log=True
751
        )
752
753
    def is_limit(self, criticality, stat_name=""):
754
        """Return true if the criticality limit exist for the given stat_name"""
755
        if stat_name == "":
756
            return self.plugin_name + '_' + criticality in self._limits
757
        return stat_name + '_' + criticality in self._limits
758
759
    def get_limit(self, criticality=None, stat_name=""):
760
        """Return the limit value for the given criticality.
761
        If criticality is None, return the dict of all the limits."""
762
        if criticality is None:
763
            return self._limits
764
765
        # Get the limit for stat + header
766
        # Example: network_wlan0_rx_careful
767
        if stat_name + '_' + criticality in self._limits:
768
            return self._limits[stat_name + '_' + criticality]
769
        if self.plugin_name + '_' + criticality in self._limits:
770
            return self._limits[self.plugin_name + '_' + criticality]
771
772
        # No key found, the raise an error
773
        raise KeyError
774
775
    def get_limit_action(self, criticality, stat_name=""):
776
        """Return the tuple (action, repeat) for the alert.
777
778
        - action is a command line
779
        - repeat is a bool
780
        """
781
        # Get the action for stat + header
782
        # Example: network_wlan0_rx_careful_action
783
        # Action key available ?
784
        ret = [
785
            (stat_name + '_' + criticality + '_action', False),
786
            (stat_name + '_' + criticality + '_action_repeat', True),
787
            (self.plugin_name + '_' + criticality + '_action', False),
788
            (self.plugin_name + '_' + criticality + '_action_repeat', True),
789
        ]
790
        for r in ret:
791
            if r[0] in self._limits:
792
                return self._limits[r[0]], r[1]
793
794
        # No key found, the raise an error
795
        raise KeyError
796
797
    def get_limit_log(self, stat_name, default_action=False):
798
        """Return the log tag for the alert."""
799
        # Get the log tag for stat + header
800
        # Example: network_wlan0_rx_log
801
        if stat_name + '_log' in self._limits:
802
            return self._limits[stat_name + '_log'][0].lower() == 'true'
803
        if self.plugin_name + '_log' in self._limits:
804
            return self._limits[self.plugin_name + '_log'][0].lower() == 'true'
805
        return default_action
806
807
    def get_conf_value(self, value, header="", plugin_name=None, default=[]):
808
        """Return the configuration (header_) value for the current plugin.
809
810
        ...or the one given by the plugin_name var.
811
        """
812
        if plugin_name is None:
813
            # If not default use the current plugin name
814
            plugin_name = self.plugin_name
815
816
        if header != "":
817
            # Add the header
818
            plugin_name = plugin_name + '_' + header
819
820
        try:
821
            return self._limits[plugin_name + '_' + value]
822
        except KeyError:
823
            return default
824
825
    def is_show(self, value, header=""):
826
        """Return True if the value is in the show configuration list.
827
828
        If the show value is empty, return True (show by default)
829
830
        The show configuration list is defined in the glances.conf file.
831
        It is a comma-separated list of regexp.
832
        Example for diskio:
833
        show=sda.*
834
        """
835
        # TODO: possible optimisation: create a re.compile list
836
        return any(
837
            j for j in [re.fullmatch(i.lower(), value.lower()) for i in self.get_conf_value('show', header=header)]
838
        )
839
840
    def is_hide(self, value, header=""):
841
        """Return True if the value is in the hide configuration list.
842
843
        The hide configuration list is defined in the glances.conf file.
844
        It is a comma-separated list of regexp.
845
        Example for diskio:
846
        hide=sda2,sda5,loop.*
847
        """
848
        # TODO: possible optimisation: create a re.compile list
849
        return any(
850
            j for j in [re.fullmatch(i.lower(), value.lower()) for i in self.get_conf_value('hide', header=header)]
851
        )
852
853
    def is_display(self, value, header=""):
854
        """Return True if the value should be displayed in the UI"""
855
        if self.get_conf_value('show', header=header) != []:
856
            return self.is_show(value, header=header)
857
        return not self.is_hide(value, header=header)
858
859
    def read_alias(self):
860
        if self.plugin_name + '_' + 'alias' in self._limits:
861
            return {i.split(':')[0].lower(): i.split(':')[1] for i in self._limits[self.plugin_name + '_' + 'alias']}
862
        return {}
863
864
    def has_alias(self, header):
865
        """Return the alias name for the relative header it it exists otherwise None."""
866
        return self.alias.get(header, None)
867
868
    def msg_curse(self, args=None, max_width=None):
869
        """Return default string to display in the curse interface."""
870
        return [self.curse_add_line(str(self.stats))]
871
872
    def get_stats_display(self, args=None, max_width=None):
873
        """Return a dict with all the information needed to display the stat.
874
875
        key     | description
876
        ----------------------------
877
        display | Display the stat (True or False)
878
        msgdict | Message to display (list of dict [{ 'msg': msg, 'decoration': decoration } ... ])
879
        align   | Message position
880
        """
881
        display_curse = False
882
883
        if hasattr(self, 'display_curse'):
884
            display_curse = self.display_curse
885
        if hasattr(self, 'align'):
886
            align_curse = self._align
887
888
        if max_width is not None:
889
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args, max_width=max_width), 'align': align_curse}
0 ignored issues
show
introduced by
The variable align_curse does not seem to be defined in case hasattr(self, 'align') on line 885 is False. Are you sure this can never be the case?
Loading history...
890
        else:
891
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args), 'align': align_curse}
892
893
        return ret
894
895
    def curse_add_line(self, msg, decoration="DEFAULT", optional=False, additional=False, splittable=False):
896
        """Return a dict with.
897
898
        Where:
899
            msg: string
900
            decoration:
901
                DEFAULT: no decoration
902
                UNDERLINE: underline
903
                BOLD: bold
904
                TITLE: for stat title
905
                PROCESS: for process name
906
                STATUS: for process status
907
                NICE: for process niceness
908
                CPU_TIME: for process cpu time
909
                OK: Value is OK and non logged
910
                OK_LOG: Value is OK and logged
911
                CAREFUL: Value is CAREFUL and non logged
912
                CAREFUL_LOG: Value is CAREFUL and logged
913
                WARNING: Value is WARNING and non logged
914
                WARNING_LOG: Value is WARNING and logged
915
                CRITICAL: Value is CRITICAL and non logged
916
                CRITICAL_LOG: Value is CRITICAL and logged
917
            optional: True if the stat is optional (display only if space is available)
918
            additional: True if the stat is additional (display only if space is available after optional)
919
            spittable: Line can be split to fit on the screen (default is not)
920
        """
921
        return {
922
            'msg': msg,
923
            'decoration': decoration,
924
            'optional': optional,
925
            'additional': additional,
926
            'splittable': splittable,
927
        }
928
929
    def curse_new_line(self):
930
        """Go to a new line."""
931
        return self.curse_add_line('\n')
932
933
    def curse_add_stat(self, key, width=None, header='', display_key=True, separator='', trailer=''):
934
        """Return a list of dict messages with the 'key: value' result
935
936
          <=== width ===>
937
        __key     : 80.5%__
938
        | |       | |    |_ trailer
939
        | |       | |_ self.stats[key]
940
        | |       |_ separator
941
        | |_ 'short_name' description or key or nothing if display_key is True
942
        |_ header
943
944
        Instead of:
945
            msg = '  {:8}'.format('idle:')
946
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
947
            msg = '{:5.1f}%'.format(self.stats['idle'])
948
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
949
950
        Use:
951
            ret.extend(self.curse_add_stat('idle', width=15, header='  '))
952
953
        """
954
        if key not in self.stats:
955
            return []
956
957
        # Check if a shortname is defined
958
        if not display_key:
959
            key_name = ''
960
        elif key in self.fields_description and 'short_name' in self.fields_description[key]:
961
            key_name = self.fields_description[key]['short_name']
962
        else:
963
            key_name = key
964
965
        # Check if unit is defined and get the short unit char in the unit_sort dict
966
        if (
967
            key in self.fields_description
968
            and 'unit' in self.fields_description[key]
969
            and self.fields_description[key]['unit'] in fields_unit_short
970
        ):
971
            # Get the shortname
972
            unit_short = fields_unit_short[self.fields_description[key]['unit']]
973
        else:
974
            unit_short = ''
975
976
        # Check if unit is defined and get the unit type unit_type dict
977
        if (
978
            key in self.fields_description
979
            and 'unit' in self.fields_description[key]
980
            and self.fields_description[key]['unit'] in fields_unit_type
981
        ):
982
            # Get the shortname
983
            unit_type = fields_unit_type[self.fields_description[key]['unit']]
984
        else:
985
            unit_type = 'float'
986
987
        # Is it a rate ? Yes, get the pre-computed rate value
988
        if (
989
            key in self.fields_description
990
            and 'rate' in self.fields_description[key]
991
            and self.fields_description[key]['rate'] is True
992
        ):
993
            value = self.stats.get(key + '_rate_per_sec', None)
994
        else:
995
            value = self.stats.get(key, None)
996
997
        if width is None:
998
            msg_item = header + f'{key_name}' + separator
999
            msg_template_float = '{:.1f}{}'
1000
            msg_template = '{}{}'
1001
        else:
1002
            # Define the size of the message
1003
            # item will be on the left
1004
            # value will be on the right
1005
            msg_item = header + '{:{width}}'.format(key_name, width=width - 7) + separator
1006
            msg_template_float = '{:5.1f}{}'
1007
            msg_template = '{:>5}{}'
1008
1009
        if value is None:
1010
            msg_value = msg_template.format('-', '')
1011
        elif unit_type == 'float':
1012
            msg_value = msg_template_float.format(value, unit_short)
1013
        elif 'min_symbol' in self.fields_description[key]:
1014
            msg_value = msg_template.format(
1015
                self.auto_unit(int(value), min_symbol=self.fields_description[key]['min_symbol']), unit_short
1016
            )
1017
        else:
1018
            msg_value = msg_template.format(int(value), unit_short)
1019
1020
        # Add the trailer
1021
        msg_value = msg_value + trailer
1022
1023
        decoration = self.get_views(key=key, option='decoration') if value is not None else 'DEFAULT'
1024
        optional = self.get_views(key=key, option='optional')
1025
1026
        return [
1027
            self.curse_add_line(msg_item, optional=optional),
1028
            self.curse_add_line(msg_value, decoration=decoration, optional=optional),
1029
        ]
1030
1031
    @property
1032
    def align(self):
1033
        """Get the curse align."""
1034
        return self._align
1035
1036
    @align.setter
1037
    def align(self, value):
1038
        """Set the curse align.
1039
1040
        value: left, right, bottom.
1041
        """
1042
        self._align = value
1043
1044
    def auto_unit(self, number, low_precision=False, min_symbol='K', none_symbol='-'):
1045
        """Make a nice human-readable string out of number.
1046
1047
        Number of decimal places increases as quantity approaches 1.
1048
        CASE: 613421788        RESULT:       585M low_precision:       585M
1049
        CASE: 5307033647       RESULT:      4.94G low_precision:       4.9G
1050
        CASE: 44968414685      RESULT:      41.9G low_precision:      41.9G
1051
        CASE: 838471403472     RESULT:       781G low_precision:       781G
1052
        CASE: 9683209690677    RESULT:      8.81T low_precision:       8.8T
1053
        CASE: 1073741824       RESULT:      1024M low_precision:      1024M
1054
        CASE: 1181116006       RESULT:      1.10G low_precision:       1.1G
1055
1056
        :low_precision: returns less decimal places potentially (default is False)
1057
                        sacrificing precision for more readability.
1058
        :min_symbol: Do not approach if number < min_symbol (default is K)
1059
        :decimal_count: if set, force the number of decimal number (default is None)
1060
        """
1061
        if number is None:
1062
            return none_symbol
1063
        symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
1064
        if min_symbol in symbols:
1065
            symbols = symbols[symbols.index(min_symbol) :]
1066
        prefix = {
1067
            'Y': 1208925819614629174706176,
1068
            'Z': 1180591620717411303424,
1069
            'E': 1152921504606846976,
1070
            'P': 1125899906842624,
1071
            'T': 1099511627776,
1072
            'G': 1073741824,
1073
            'M': 1048576,
1074
            'K': 1024,
1075
        }
1076
1077
        if number == 0:
1078
            # Avoid 0.0
1079
            return '0'
1080
1081
        for symbol in reversed(symbols):
1082
            value = float(number) / prefix[symbol]
1083
            if value > 1:
1084
                decimal_precision = 0
1085
                if value < 10:
1086
                    decimal_precision = 2
1087
                elif value < 100:
1088
                    decimal_precision = 1
1089
                if low_precision:
1090
                    if symbol in 'MK':
1091
                        decimal_precision = 0
1092
                    else:
1093
                        decimal_precision = min(1, decimal_precision)
1094
                elif symbol in 'K':
1095
                    decimal_precision = 0
1096
                return '{:.{decimal}f}{symbol}'.format(value, decimal=decimal_precision, symbol=symbol)
1097
        return f'{number!s}'
1098
1099
    def trend_msg(self, trend, significant=1):
1100
        """Return the trend message.
1101
1102
        Do not take into account if trend < significant
1103
        """
1104
        ret = '-'
1105
        if trend is None:
1106
            ret = ' '
1107
        elif trend > significant:
1108
            ret = unicode_message('ARROW_UP', self.args)
1109
        elif trend < -significant:
1110
            ret = unicode_message('ARROW_DOWN', self.args)
1111
        return ret
1112
1113
    def _check_decorator(fct):
1114
        """Check decorator for update method.
1115
1116
        It checks:
1117
        - if the plugin is enabled.
1118
        - if the refresh_timer is finished
1119
        """
1120
1121
        def wrapper(self, *args, **kw):
1122
            if self.is_enabled() and (self.refresh_timer.finished() or self.stats == self.get_init_value):
1123
                # Run the method
1124
                ret = fct(self, *args, **kw)
1125
                # Reset the timer
1126
                self.refresh_timer.set(self.get_refresh())
1127
                self.refresh_timer.reset()
1128
            else:
1129
                # No need to call the method
1130
                # Return the last result available
1131
                ret = self.stats
1132
            return ret
1133
1134
        return wrapper
1135
1136 View Code Duplication
    def _log_result_decorator(fct):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1137
        """Log (DEBUG) the result of the function fct."""
1138
1139
        def wrapper(*args, **kw):
1140
            counter = Counter()
1141
            ret = fct(*args, **kw)
1142
            duration = counter.get()
1143
            class_name = args[0].__class__.__name__
1144
            class_module = args[0].__class__.__module__
1145
            logger.debug(f"{class_name} {class_module} {fct.__name__} return {ret} in {duration} seconds")
1146
            return ret
1147
1148
        return wrapper
1149
1150
    def _manage_rate(fct):
1151
        """Manage rate decorator for update method."""
1152
1153
        def compute_rate(self, stat, stat_previous):
1154
            if stat_previous is None:
1155
                return stat
1156
1157
            # 1) set _gauge for all the rate fields
1158
            # 2) compute the _rate_per_sec
1159
            # 3) set the original field to the delta between the current and the previous value
1160
            for field in self.fields_description:
1161
                # For all the field with the rate=True flag
1162
                if 'rate' in self.fields_description[field] and self.fields_description[field]['rate'] is True:
1163
                    # Create a new metadata with the gauge
1164
                    stat['time_since_update'] = self.time_since_last_update
1165
                    stat[field + '_gauge'] = stat[field]
1166
                    if field + '_gauge' in stat_previous and stat[field] and stat_previous[field + '_gauge']:
1167
                        # The stat becomes the delta between the current and the previous value
1168
                        stat[field] = stat[field] - stat_previous[field + '_gauge']
1169
                        # Compute the rate
1170
                        if self.time_since_last_update > 0:
1171
                            stat[field + '_rate_per_sec'] = stat[field] // self.time_since_last_update
1172
                        else:
1173
                            stat[field] = 0
1174
                    else:
1175
                        # Avoid strange rate at the first run
1176
                        stat[field] = 0
1177
            return stat
1178
1179
        def compute_rate_on_list(self, stats, stats_previous):
1180
            if stats_previous is None:
1181
                return stats
1182
1183
            for stat in stats:
1184
                olds = [i for i in stats_previous if i[self.get_key()] == stat[self.get_key()]]
1185
                if len(olds) == 1:
1186
                    compute_rate(self, stat, olds[0])
1187
            return stats
1188
1189
        def wrapper(self, *args, **kw):
1190
            # Call the father method
1191
            stats = fct(self, *args, **kw)
1192
1193
            # Get the time since the last update
1194
            self.time_since_last_update = getTimeSinceLastUpdate(self.plugin_name)
1195
1196
            # Compute the rate
1197
            if isinstance(stats, dict):
1198
                # Stats is a dict
1199
                compute_rate(self, stats, self.stats_previous)
1200
            elif isinstance(stats, list):
1201
                # Stats is a list
1202
                compute_rate_on_list(self, stats, self.stats_previous)
1203
1204
            # Memorized the current stats for next run
1205
            self.stats_previous = copy.deepcopy(stats)
1206
1207
            return stats
1208
1209
        return wrapper
1210
1211
    # Mandatory to call the decorator in child classes
1212
    _check_decorator = staticmethod(_check_decorator)
1213
    _log_result_decorator = staticmethod(_log_result_decorator)
1214
    _manage_rate = staticmethod(_manage_rate)
1215