GlancesPluginModel.update_views()   F
last analyzed

Complexity

Conditions 20

Size

Total Lines 64
Code Lines 38

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 20
eloc 38
nop 1
dl 0
loc 64
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like glances.plugins.plugin.model.GlancesPluginModel.update_views() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#
2
# This file is part of Glances.
3
#
4
# SPDX-FileCopyrightText: 2024 Nicolas Hennion <[email protected]>
5
#
6
# SPDX-License-Identifier: LGPL-3.0-only
7
#
8
9
"""
10
I am your father...
11
12
...of all Glances model plugins.
13
"""
14
15
import copy
16
import re
17
18
from glances.actions import GlancesActions
19
from glances.events_list import glances_events
20
from glances.globals import dictlist, dictlist_json_dumps, iterkeys, itervalues, json_dumps, listkeys, mean, nativestr
21
from glances.history import GlancesHistory
22
from glances.logger import logger
23
from glances.outputs.glances_unicode import unicode_message
24
from glances.thresholds import glances_thresholds
25
from glances.timer import Counter, Timer, getTimeSinceLastUpdate
26
27
fields_unit_short = {'percent': '%'}
28
29
fields_unit_type = {
30
    'percent': 'float',
31
    'percents': 'float',
32
    'number': 'int',
33
    'numbers': 'int',
34
    'int': 'int',
35
    'ints': 'int',
36
    'float': 'float',
37
    'floats': 'float',
38
    'second': 'int',
39
    'seconds': 'int',
40
    'byte': 'int',
41
    'bytes': 'int',
42
}
43
44
45
class GlancesPluginModel:
46
    """Main class for Glances plugin model."""
47
48
    def __init__(self, args=None, config=None, items_history_list=None, stats_init_value={}, fields_description=None):
49
        """Init the plugin of plugins model class.
50
51
        All Glances' plugins model should inherit from this class. Most of the
52
        methods are already implemented in the father classes.
53
54
        Your plugin should return a dict or a list of dicts (stored in the
55
        self.stats). As an example, you can have a look on the mem plugin
56
        (for dict) or network (for list of dicts).
57
58
        From version 4 of the API, the plugin should return a dict.
59
60
        A plugin should implement:
61
        - the reset method: to set your self.stats variable to {} or []
62
        - the update method: where your self.stats variable is set
63
        and optionally:
64
        - the get_key method: set the key of the dict (only for list of dict)
65
        - all others methods you want to overwrite
66
67
        :args: args parameters
68
        :config: configuration parameters
69
        :items_history_list: list of items to store in the history
70
        :stats_init_value: Default value for a stats item
71
        """
72
        # Build the plugin name
73
        # Internal or external module (former prefixed by 'glances.plugins')
74
        _mod = self.__class__.__module__.replace('glances.plugins.', '')
75
        self.plugin_name = _mod.split('.')[0]
76
77
        if self.plugin_name.startswith('glances_'):
78
            self.plugin_name = self.plugin_name.split('glances_')[1]
79
        logger.debug(f"Init {self.plugin_name} plugin")
80
81
        # Init the args
82
        self.args = args
83
84
        # Init the default alignment (for curses)
85
        self._align = 'left'
86
87
        # Init the input method
88
        self._input_method = 'local'
89
        self._short_system_name = None
90
91
        # Init the history list
92
        self.items_history_list = items_history_list
93
        self.stats_history = self.init_stats_history()
94
95
        # Init the limits (configuration keys) dictionary
96
        self._limits = {}
97
        if config is not None:
98
            logger.debug(f'Load section {self.plugin_name} in Glances configuration file')
99
            self.load_limits(config=config)
100
101
        # Init the alias (dictionary)
102
        self.alias = self.read_alias()
103
104
        # Init the actions
105
        self.actions = GlancesActions(args=args)
106
107
        # Init the views
108
        self.views = {}
109
110
        # Hide stats if all the hide_zero_fields has never been != 0
111
        # Default is False, always display stats
112
        self.hide_zero = False
113
        # The threshold needed to display a value if hide_zero is true.
114
        # Only hide a value if it is less than hide_threshold_bytes.
115
        self.hide_threshold_bytes = 0
116
        self.hide_zero_fields = []
117
118
        # Set the initial refresh time to display stats the first time
119
        self.refresh_timer = Timer(0)
120
121
        # Init stats description
122
        self.fields_description = fields_description
123
124
        # Init the stats
125
        self.stats_init_value = stats_init_value
126
        self.time_since_last_update = None
127
        self.stats = None
128
        self.stats_previous = None
129
        self.reset()
130
131
    def __repr__(self):
132
        """Return the raw stats."""
133
        return str(self.stats)
134
135
    def __str__(self):
136
        """Return the human-readable stats."""
137
        return str(self.stats)
138
139
    def get_init_value(self):
140
        """Return a copy of the init value."""
141
        return copy.copy(self.stats_init_value)
142
143
    def reset(self):
144
        """Reset the stats.
145
146
        This method should be overwritten by child classes.
147
        """
148
        self.stats = self.get_init_value()
149
150
    def exit(self):
151
        """Just log an event when Glances exit."""
152
        logger.debug(f"Stop the {self.plugin_name} plugin")
153
154
    def get_key(self):
155
        """Return the key of the list."""
156
        return
157
158
    def is_enabled(self, plugin_name=None):
159
        """Return true if plugin is enabled."""
160
        if not plugin_name:
161
            plugin_name = self.plugin_name
162
        try:
163
            d = getattr(self.args, 'disable_' + plugin_name)
164
        except AttributeError:
165
            d = getattr(self.args, 'enable_' + plugin_name, True)
166
        return d is False
167
168
    def is_disabled(self, plugin_name=None):
169
        """Return true if plugin is disabled."""
170
        return not self.is_enabled(plugin_name=plugin_name)
171
172
    def history_enable(self):
173
        return self.args is not None and not self.args.disable_history and self.get_items_history_list() is not None
174
175
    def init_stats_history(self):
176
        """Init the stats history (dict of GlancesAttribute)."""
177
        if self.history_enable():
178
            init_list = [a['name'] for a in self.get_items_history_list()]
179
            logger.debug(f"Stats history activated for plugin {self.plugin_name} (items: {init_list})")
180
        return GlancesHistory()
181
182
    def reset_stats_history(self):
183
        """Reset the stats history (dict of GlancesAttribute)."""
184
        if self.history_enable():
185
            reset_list = [a['name'] for a in self.get_items_history_list()]
186
            logger.debug(f"Reset history for plugin {self.plugin_name} (items: {reset_list})")
187
            self.stats_history.reset()
188
189
    def update_stats_history(self):
190
        """Update stats history."""
191
        # Build the history
192
        if not (self.get_export() and self.history_enable()):
193
            return
194
        # Itern through items history
195
        item_name = '' if self.get_key() is None else self.get_key()
196
        for i in self.get_items_history_list():
197
            if isinstance(self.get_export(), list):
198
                # Stats is a list of data
199
                # Iter through stats (for example, iter through network interface)
200
                for l_export in self.get_export():
201
                    if i['name'] in l_export:
202
                        self.stats_history.add(
203
                            nativestr(l_export[item_name]) + '_' + nativestr(i['name']),
204
                            l_export[i['name']],
205
                            description=i['description'],
206
                            history_max_size=self._limits['history_size'],
207
                        )
208
            else:
209
                # Stats is not a list
210
                # Add the item to the history directly
211
                self.stats_history.add(
212
                    nativestr(i['name']),
213
                    self.get_export()[i['name']],
214
                    description=i['description'],
215
                    history_max_size=self._limits['history_size'],
216
                )
217
218
    def get_items_history_list(self):
219
        """Return the items history list."""
220
        return self.items_history_list
221
222
    def get_raw_history(self, item=None, nb=0):
223
        """Return the history (RAW format).
224
225
        - the stats history (dict of list) if item is None
226
        - the stats history for the given item (list) instead
227
        - None if item did not exist in the history
228
        """
229
        s = self.stats_history.get(nb=nb)
230
        if item is None:
231
            return s
232
        if item in s:
233
            return s[item]
234
        return None
235
236
    def get_export_history(self, item=None):
237
        """Return the stats history object to export."""
238
        return self.get_raw_history(item=item)
239
240
    def get_stats_history(self, item=None, nb=0):
241
        """Return the stats history (JSON format)."""
242
        s = self.stats_history.get_json(nb=nb)
243
244
        if item is None:
245
            return json_dumps(s)
246
247
        return dictlist_json_dumps(s, item)
248
249
    def get_trend(self, item, nb=30):
250
        """Get the trend regarding to the last nb values.
251
252
        The trend is the diffirence between the mean of the last 0 to nb / 2
253
        and nb / 2 to nb values.
254
        """
255
        raw_history = self.get_raw_history(item=item, nb=nb)
256
        if raw_history is None or len(raw_history) < nb:
257
            return None
258
        last_nb = [v[1] for v in raw_history]
259
        return mean(last_nb[nb // 2 :]) - mean(last_nb[: nb // 2])
260
261
    @property
262
    def input_method(self):
263
        """Get the input method."""
264
        return self._input_method
265
266
    @input_method.setter
267
    def input_method(self, input_method):
268
        """Set the input method.
269
270
        * local: system local grab (psutil or direct access)
271
        * snmp: Client server mode via SNMP
272
        * glances: Client server mode via Glances API
273
        """
274
        self._input_method = input_method
275
276
    @property
277
    def short_system_name(self):
278
        """Get the short detected OS name (SNMP)."""
279
        return self._short_system_name
280
281
    def sorted_stats(self):
282
        """Get the stats sorted by an alias (if present) or key."""
283
        key = self.get_key()
284
        if key is None:
285
            return self.stats
286
        try:
287
            return sorted(
288
                self.stats,
289
                key=lambda stat: tuple(
290
                    int(part) if part.isdigit() else part.lower()
291
                    for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
292
                ),
293
            )
294
        except TypeError:
295
            # Correct "Starting an alias with a number causes a crash #1885"
296
            return sorted(
297
                self.stats,
298
                key=lambda stat: tuple(
299
                    part.lower() for part in re.split(r"(\d+|\D+)", self.has_alias(stat[key]) or stat[key])
300
                ),
301
            )
302
303
    @short_system_name.setter
304
    def short_system_name(self, short_name):
305
        """Set the short detected OS name (SNMP)."""
306
        self._short_system_name = short_name
307
308
    def set_stats(self, input_stats):
309
        """Set the stats to input_stats."""
310
        self.stats = input_stats
311
312
    def get_stats_snmp(self, bulk=False, snmp_oid=None):
313
        """Update stats using SNMP.
314
315
        If bulk=True, use a bulk request instead of a get request.
316
        """
317
        snmp_oid = snmp_oid or {}
318
319
        from glances.snmp import GlancesSNMPClient
320
321
        # Init the SNMP request
322
        snmp_client = GlancesSNMPClient(
323
            host=self.args.client,
324
            port=self.args.snmp_port,
325
            version=self.args.snmp_version,
326
            community=self.args.snmp_community,
327
        )
328
329
        # Process the SNMP request
330
        ret = {}
331
        if bulk:
332
            # Bulk request
333
            snmp_result = snmp_client.getbulk_by_oid(0, 10, *list(itervalues(snmp_oid)))
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable itervalues does not seem to be defined.
Loading history...
334
            logger.info(snmp_result)
335
            if len(snmp_oid) == 1:
336
                # Bulk command for only one OID
337
                # Note: key is the item indexed but the OID result
338
                for item in snmp_result:
339
                    if iterkeys(item)[0].startswith(itervalues(snmp_oid)[0]):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable iterkeys does not seem to be defined.
Loading history...
340
                        ret[iterkeys(snmp_oid)[0] + iterkeys(item)[0].split(itervalues(snmp_oid)[0])[1]] = itervalues(
341
                            item
342
                        )[0]
343
            else:
344
                # Build the internal dict with the SNMP result
345
                # Note: key is the first item in the snmp_oid
346
                index = 1
347
                for item in snmp_result:
348
                    item_stats = {}
349
                    item_key = None
350
                    for key in iterkeys(snmp_oid):
351
                        oid = snmp_oid[key] + '.' + str(index)
352
                        if oid in item:
353
                            if item_key is None:
354
                                item_key = item[oid]
355
                            else:
356
                                item_stats[key] = item[oid]
357
                    if item_stats:
358
                        ret[item_key] = item_stats
359
                    index += 1
360
        else:
361
            # Simple get request
362
            snmp_result = snmp_client.get_by_oid(*list(itervalues(snmp_oid)))
363
364
            # Build the internal dict with the SNMP result
365
            for key in iterkeys(snmp_oid):
366
                ret[key] = snmp_result[snmp_oid[key]]
367
368
        return ret
369
370
    def get_raw(self):
371
        """Return the stats object."""
372
        return self.stats
373
374
    def get_export(self):
375
        """Return the stats object to export.
376
        By default, return the raw stats.
377
        Note: this method could be overwritten by the plugin if a specific format is needed (ex: processlist)
378
        """
379
        return self.get_raw()
380
381
    def get_stats(self):
382
        """Return the stats object in JSON format."""
383
        return json_dumps(self.get_raw())
384
385
    def get_json(self):
386
        """Return the stats object in JSON format."""
387
        return self.get_stats()
388
389
    def get_raw_stats_item(self, item):
390
        """Return the stats object for a specific item in RAW format.
391
392
        Stats should be a list of dict (processlist, network...)
393
        """
394
        return dictlist(self.get_raw(), item)
395
396
    def get_raw_stats_key(self, item, key):
397
        """Return the stats object for a specific item in RAW format.
398
399
        Stats should be a list of dict (processlist, network...)
400
        """
401
        return {item: [i for i in self.get_raw() if 'key' in i and i[i['key']] == key][0].get(item)}
402
403
    def get_stats_item(self, item):
404
        """Return the stats object for a specific item in JSON format.
405
406
        Stats should be a list of dict (processlist, network...)
407
        """
408
        return dictlist_json_dumps(self.get_raw(), item)
409
410
    def get_raw_stats_value(self, item, value):
411
        """Return the stats object for a specific item=value.
412
413
        Return None if the item=value does not exist
414
        Return None if the item is not a list of dict
415
        """
416
        if not isinstance(self.get_raw(), list):
417
            return None
418
419
        if (not isinstance(value, int) and not isinstance(value, float)) and value.isdigit():
420
            value = int(value)
421
        try:
422
            return {value: [i for i in self.get_raw() if i[item] == value]}
423
        except (KeyError, ValueError) as e:
424
            logger.error(f"Cannot get item({item})=value({value}) ({e})")
425
            return None
426
427
    def get_stats_value(self, item, value):
428
        """Return the stats object for a specific item=value in JSON format.
429
430
        Stats should be a list of dict (processlist, network...)
431
        """
432
        rsv = self.get_raw_stats_value(item, value)
433
        if rsv is None:
434
            return None
435
        return json_dumps(rsv)
436
437
    def get_item_info(self, item, key, default=None):
438
        """Return the item info grabbed into self.fields_description."""
439
        if self.fields_description is None or item not in self.fields_description:
440
            return default
441
        return self.fields_description[item].get(key, default)
442
443
    def update_views(self):
444
        """Update the stats views.
445
446
        The V of MVC
447
        A dict of dict with the needed information to display the stats.
448
        Example for the stat xxx:
449
        'xxx': {'decoration': 'DEFAULT',  >>> The decoration of the stats
450
                'optional': False,        >>> Is the stat optional
451
                'additional': False,      >>> Is the stat provide additional information
452
                'splittable': False,      >>> Is the stat can be cut (like process lon name)
453
                'hidden': False}          >>> Is the stats should be hidden in the UI
454
        """
455
        ret = {}
456
457
        if isinstance(self.get_raw(), list) and self.get_raw() is not None and self.get_key() is not None:
458
            # Stats are stored in a list of dict (ex: DISKIO, NETWORK, FS...)
459
            for i in self.get_raw():
460
                key = i[self.get_key()]
461
                ret[key] = {}
462
                for field in listkeys(i):
463
                    value = {
464
                        'decoration': 'DEFAULT',
465
                        'optional': False,
466
                        'additional': False,
467
                        'splittable': False,
468
                    }
469
                    # Manage the hidden feature
470
                    # Allow to automatically hide fields when values is never different than 0
471
                    # Refactoring done for #2929
472
                    if not self.hide_zero:
473
                        value['hidden'] = False
474
                    elif key in self.views and field in self.views[key] and 'hidden' in self.views[key][field]:
475
                        value['hidden'] = self.views[key][field]['hidden']
476
                        if field in self.hide_zero_fields and i[field] > self.hide_threshold_bytes:
477
                            value['hidden'] = False
478
                    else:
479
                        value['hidden'] = field in self.hide_zero_fields
480
                    ret[key][field] = value
481
        elif isinstance(self.get_raw(), dict) and self.get_raw() is not None:
482
            # Stats are stored in a dict (ex: CPU, LOAD...)
483
            for field in listkeys(self.get_raw()):
484
                value = {
485
                    'decoration': 'DEFAULT',
486
                    'optional': False,
487
                    'additional': False,
488
                    'splittable': False,
489
                    'hidden': False,
490
                }
491
                # Manage the hidden feature
492
                # Allow to automatically hide fields when values is never different than 0
493
                # Refactoring done for #2929
494
                if not self.hide_zero:
495
                    value['hidden'] = False
496
                elif field in self.views and 'hidden' in self.views[field]:
497
                    value['hidden'] = self.views[field]['hidden']
498
                    if field in self.hide_zero_fields and self.get_raw()[field] >= self.hide_threshold_bytes:
499
                        value['hidden'] = False
500
                else:
501
                    value['hidden'] = field in self.hide_zero_fields
502
                ret[field] = value
503
504
        self.views = ret
505
506
        return self.views
507
508
    def set_views(self, input_views):
509
        """Set the views to input_views."""
510
        self.views = input_views
511
512
    def reset_views(self):
513
        """Reset the views to input_views."""
514
        self.views = {}
515
516
    def get_views(self, item=None, key=None, option=None):
517
        """Return the views object.
518
519
        If key is None, return all the view for the current plugin
520
        else if option is None return the view for the specific key (all option)
521
        else return the view of the specific key/option
522
523
        Specify item if the stats are stored in a dict of dict (ex: NETWORK, FS...)
524
        """
525
        if item is None:
526
            item_views = self.views
527
        else:
528
            item_views = self.views[item]
529
530
        if key is None or key not in item_views:
531
            return item_views
532
        if option is None:
533
            return item_views[key]
534
        if option in item_views[key]:
535
            return item_views[key][option]
536
        return 'DEFAULT'
537
538
    def get_json_views(self, item=None, key=None, option=None):
539
        """Return the views (in JSON)."""
540
        return json_dumps(self.get_views(item, key, option))
541
542
    def load_limits(self, config):
543
        """Load limits from the configuration file, if it exists."""
544
        # By default set the history length to 3 points per second during one day
545
        self._limits['history_size'] = 28800
546
547
        if not hasattr(config, 'has_section'):
548
            return False
549
550
        # Read the global section
551
        # TODO: not optimized because this section is loaded for each plugin...
552
        if config.has_section('global'):
553
            self._limits['history_size'] = config.get_float_value('global', 'history_size', default=28800)
554
            logger.debug("Load configuration key: {} = {}".format('history_size', self._limits['history_size']))
555
556
        # Read the plugin specific section
557
        if config.has_section(self.plugin_name):
558
            for level, _ in config.items(self.plugin_name):
559
                # Read limits
560
                limit = '_'.join([self.plugin_name, level])
561
                try:
562
                    self._limits[limit] = config.get_float_value(self.plugin_name, level)
563
                except ValueError:
564
                    self._limits[limit] = config.get_value(self.plugin_name, level).split(",")
565
                logger.debug(f"Load limit: {limit} = {self._limits[limit]}")
566
567
        return True
568
569
    @property
570
    def limits(self):
571
        """Return the limits object."""
572
        return self._limits
573
574
    @limits.setter
575
    def limits(self, input_limits):
576
        """Set the limits to input_limits."""
577
        self._limits = input_limits
578
579
    def set_refresh(self, value):
580
        """Set the plugin refresh rate"""
581
        self.set_limits('refresh', value)
582
583
    def get_refresh(self):
584
        """Return the plugin refresh time"""
585
        ret = self.get_limits(item='refresh')
586
        if ret is None:
587
            ret = self.args.time if hasattr(self.args, 'time') else 2
588
        return ret
589
590
    def get_refresh_time(self):
591
        """Return the plugin refresh time"""
592
        return self.get_refresh()
593
594
    def set_limits(self, item, value):
595
        """Set the limits object."""
596
        self._limits[f'{self.plugin_name}_{item}'] = value
597
598
    def get_limits(self, item=None):
599
        """Return the limits object."""
600
        if item is None:
601
            return self._limits
602
        return self._limits.get(f'{self.plugin_name}_{item}', None)
603
604
    def get_stats_action(self):
605
        """Return stats for the action.
606
607
        By default return all the stats.
608
        Can be overwrite by plugins implementation.
609
        For example, Docker will return self.stats['containers']
610
        """
611
        return self.stats
612
613
    def get_stat_name(self, header=""):
614
        """Return the stat name with an optional header"""
615
        ret = self.plugin_name
616
        if header != '':
617
            ret += '_' + header
618
        return ret
619
620
    def get_alert(
621
        self,
622
        current=0,
623
        minimum=0,
624
        maximum=100,
625
        highlight_zero=True,
626
        is_max=False,
627
        header="",
628
        action_key=None,
629
        log=False,
630
    ):
631
        """Return the alert status relative to a current value.
632
633
        Use this function for minor stats.
634
635
        If current < CAREFUL of max then alert = OK
636
        If current > CAREFUL of max then alert = CAREFUL
637
        If current > WARNING of max then alert = WARNING
638
        If current > CRITICAL of max then alert = CRITICAL
639
640
        If highlight=True than 0.0 is highlighted
641
642
        If defined 'header' is added between the plugin name and the status.
643
        Only useful for stats with several alert status.
644
645
        If defined, 'action_key' define the key for the actions.
646
        By default, the action_key is equal to the header.
647
648
        If log=True than add log if necessary
649
        elif log=False than do not log
650
        elif log=None than apply the config given in the conf file
651
        """
652
        # Manage 0 (0.0) value if highlight_zero is not True
653
        if not highlight_zero and current == 0:
654
            return 'DEFAULT'
655
656
        # Compute the %
657
        try:
658
            value = (current * 100) / maximum
659
        except ZeroDivisionError:
660
            return 'DEFAULT'
661
        except TypeError:
662
            return 'DEFAULT'
663
664
        # Build the stat_name
665
        stat_name = self.get_stat_name(header=header).lower()
666
667
        # Manage limits
668
        # If is_max is set then default style is set to MAX else default is set to OK
669
        ret = 'MAX' if is_max else 'OK'
670
671
        # Iter through limits
672
        critical = self.get_limit('critical', stat_name=stat_name)
673
        warning = self.get_limit('warning', stat_name=stat_name)
674
        careful = self.get_limit('careful', stat_name=stat_name)
675
        if critical and value >= critical:
676
            ret = 'CRITICAL'
677
        elif warning and value >= warning:
678
            ret = 'WARNING'
679
        elif careful and value >= careful:
680
            ret = 'CAREFUL'
681
        elif not careful and not warning and not critical:
682
            ret = 'DEFAULT'
683
        else:
684
            ret = 'OK'
685
686
        if current < minimum:
687
            ret = 'CAREFUL'
688
689
        # Manage log
690
        log_str = ""
691
        if self.get_limit_log(stat_name=stat_name, default_action=log):
692
            # Add _LOG to the return string
693
            # So stats will be highlighted with a specific color
694
            log_str = "_LOG"
695
            # Add the log to the events list
696
            glances_events.add(ret, stat_name.upper(), value)
697
698
        # Manage threshold
699
        self.manage_threshold(stat_name, ret)
700
701
        # Manage action
702
        self.manage_action(stat_name, ret.lower(), header, action_key)
703
704
        # Default is 'OK'
705
        return ret + log_str
706
707
    def filter_stats(self, stats):
708
        """Filter the stats to keep only the fields we want (the one defined in fields_description)."""
709
        if hasattr(stats, '_asdict'):
710
            return {k: v for k, v in stats._asdict().items() if k in self.fields_description}
711
        if isinstance(stats, dict):
712
            return {k: v for k, v in stats.items() if k in self.fields_description}
713
        if isinstance(stats, list):
714
            return [self.filter_stats(s) for s in stats]
715
        return stats
716
717
    def manage_threshold(self, stat_name, trigger):
718
        """Manage the threshold for the current stat."""
719
        glances_thresholds.add(stat_name, trigger)
720
721
    def manage_action(self, stat_name, trigger, header, action_key):
722
        """Manage the action for the current stat."""
723
        # Here is a command line for the current trigger ?
724
        command, repeat = self.get_limit_action(trigger, stat_name=stat_name)
725
        if not command and not repeat:
726
            # Reset the trigger
727
            self.actions.set(stat_name, trigger)
728
        else:
729
            # Define the action key for the stats dict
730
            # If not define, then it sets to header
731
            if action_key is None:
732
                action_key = header
733
734
            # A command line is available for the current alert
735
            # 1) Build the {{mustache}} dictionary
736
            if isinstance(self.get_stats_action(), list):
737
                # If the stats are stored in a list of dict (fs plugin for example)
738
                # Return the dict for the current header
739
                mustache_dict = {}
740
                for item in self.get_stats_action():
741
                    if item[self.get_key()] == action_key:
742
                        mustache_dict = item
743
                        break
744
            else:
745
                # Use the stats dict
746
                mustache_dict = self.get_stats_action()
747
            # 2) Run the action
748
            self.actions.run(stat_name, trigger, command, repeat, mustache_dict=mustache_dict)
749
750
    def get_alert_log(self, current=0, minimum=0, maximum=100, header="", action_key=None):
751
        """Get the alert log."""
752
        return self.get_alert(
753
            current=current, minimum=minimum, maximum=maximum, header=header, action_key=action_key, log=True
754
        )
755
756
    def is_limit(self, criticality, stat_name=""):
757
        """Return true if the criticality limit exist for the given stat_name"""
758
        return self.get_stat_name(stat_name).lower() + '_' + criticality in self._limits
759
760
    def get_limit(self, criticality=None, stat_name=""):
761
        """Return the limit value for the given criticality.
762
        If criticality is None, return the dict of all the limits."""
763
        if criticality is None:
764
            return self._limits
765
766
        # Get the limit for stat + header
767
        # Example: network_wlan0_rx_careful
768
        stat_name = stat_name.lower()
769
        if stat_name + '_' + criticality in self._limits:
770
            return self._limits[stat_name + '_' + criticality]
771
        if self.plugin_name + '_' + criticality in self._limits:
772
            return self._limits[self.plugin_name + '_' + criticality]
773
774
        return None
775
776
    def get_limit_action(self, criticality, stat_name=""):
777
        """Return the tuple (action, repeat) for the alert.
778
779
        - action is a command line
780
        - repeat is a bool
781
        """
782
        # Get the action for stat + header
783
        # Example: network_wlan0_rx_careful_action
784
        # Action key available ?
785
        ret = [
786
            (stat_name + '_' + criticality + '_action', False),
787
            (stat_name + '_' + criticality + '_action_repeat', True),
788
            (self.plugin_name + '_' + criticality + '_action', False),
789
            (self.plugin_name + '_' + criticality + '_action_repeat', True),
790
        ]
791
        for r in ret:
792
            if r[0] in self._limits:
793
                return self._limits[r[0]], r[1]
794
795
        # No key found, return None
796
        return None, None
797
798
    def get_limit_log(self, stat_name, default_action=False):
799
        """Return the log tag for the alert."""
800
        # Get the log tag for stat + header
801
        # Example: network_wlan0_rx_log
802
        if stat_name + '_log' in self._limits:
803
            return self._limits[stat_name + '_log'][0].lower() == 'true'
804
        if self.plugin_name + '_log' in self._limits:
805
            return self._limits[self.plugin_name + '_log'][0].lower() == 'true'
806
        return default_action
807
808
    def get_conf_value(self, value, header="", plugin_name=None, default=[]):
809
        """Return the configuration (header_) value for the current plugin.
810
811
        ...or the one given by the plugin_name var.
812
        """
813
        if plugin_name is None:
814
            # If not default use the current plugin name
815
            plugin_name = self.plugin_name
816
817
        if header != "":
818
            # Add the header
819
            plugin_name = plugin_name + '_' + header
820
821
        try:
822
            return self._limits[plugin_name + '_' + value]
823
        except KeyError:
824
            return default
825
826
    def is_show(self, value, header=""):
827
        """Return True if the value is in the show configuration list.
828
829
        If the show value is empty, return True (show by default)
830
831
        The show configuration list is defined in the glances.conf file.
832
        It is a comma-separated list of regexp.
833
        Example for diskio:
834
        show=sda.*
835
        """
836
        # TODO: possible optimisation: create a re.compile list
837
        return any(
838
            j for j in [re.fullmatch(i.lower(), value.lower()) for i in self.get_conf_value('show', header=header)]
839
        )
840
841
    def is_hide(self, value, header=""):
842
        """Return True if the value is in the hide configuration list.
843
844
        The hide configuration list is defined in the glances.conf file.
845
        It is a comma-separated list of regexp.
846
        Example for diskio:
847
        hide=sda2,sda5,loop.*
848
        """
849
        # TODO: possible optimisation: create a re.compile list
850
        return any(
851
            j for j in [re.fullmatch(i.lower(), value.lower()) for i in self.get_conf_value('hide', header=header)]
852
        )
853
854
    def is_display(self, value, header=""):
855
        """Return True if the value should be displayed in the UI"""
856
        if self.get_conf_value('show', header=header) != []:
857
            return self.is_show(value, header=header)
858
        return not self.is_hide(value, header=header)
859
860
    def read_alias(self):
861
        if self.plugin_name + '_' + 'alias' in self._limits:
862
            return {i.split(':')[0].lower(): i.split(':')[1] for i in self._limits[self.plugin_name + '_' + 'alias']}
863
        return {}
864
865
    def has_alias(self, header):
866
        """Return the alias name for the relative header it it exists otherwise None."""
867
        return self.alias.get(header, None)
868
869
    def msg_curse(self, args=None, max_width=None):
870
        """Return default string to display in the curse interface."""
871
        return [self.curse_add_line(str(self.stats))]
872
873
    def get_stats_display(self, args=None, max_width=None):
874
        """Return a dict with all the information needed to display the stat.
875
876
        key     | description
877
        ----------------------------
878
        display | Display the stat (True or False)
879
        msgdict | Message to display (list of dict [{ 'msg': msg, 'decoration': decoration } ... ])
880
        align   | Message position
881
        """
882
        display_curse = False
883
884
        if hasattr(self, 'display_curse'):
885
            display_curse = self.display_curse
886
        if hasattr(self, 'align'):
887
            align_curse = self._align
888
889
        if max_width is not None:
890
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args, max_width=max_width), 'align': align_curse}
0 ignored issues
show
introduced by
The variable align_curse does not seem to be defined in case hasattr(self, 'align') on line 886 is False. Are you sure this can never be the case?
Loading history...
891
        else:
892
            ret = {'display': display_curse, 'msgdict': self.msg_curse(args), 'align': align_curse}
893
894
        return ret
895
896
    def curse_add_line(self, msg, decoration="DEFAULT", optional=False, additional=False, splittable=False):
897
        """Return a dict with.
898
899
        Where:
900
            msg: string
901
            decoration:
902
                DEFAULT: no decoration
903
                UNDERLINE: underline
904
                BOLD: bold
905
                TITLE: for stat title
906
                PROCESS: for process name
907
                STATUS: for process status
908
                CPU_TIME: for process cpu time
909
                OK: Value is OK and non logged
910
                OK_LOG: Value is OK and logged
911
                CAREFUL: Value is CAREFUL and non logged
912
                CAREFUL_LOG: Value is CAREFUL and logged
913
                WARNING: Value is WARNING and non logged
914
                WARNING_LOG: Value is WARNING and logged
915
                CRITICAL: Value is CRITICAL and non logged
916
                CRITICAL_LOG: Value is CRITICAL and logged
917
            optional: True if the stat is optional (display only if space is available)
918
            additional: True if the stat is additional (display only if space is available after optional)
919
            spittable: Line can be split to fit on the screen (default is not)
920
        """
921
        return {
922
            'msg': msg,
923
            'decoration': decoration,
924
            'optional': optional,
925
            'additional': additional,
926
            'splittable': splittable,
927
        }
928
929
    def curse_new_line(self):
930
        """Go to a new line."""
931
        return self.curse_add_line('\n')
932
933
    def curse_add_stat(self, key, width=None, header='', display_key=True, separator='', trailer=''):
934
        """Return a list of dict messages with the 'key: value' result
935
936
          <=== width ===>
937
        __key     : 80.5%__
938
        | |       | |    |_ trailer
939
        | |       | |_ self.stats[key]
940
        | |       |_ separator
941
        | |_ 'short_name' description or key or nothing if display_key is True
942
        |_ header
943
944
        Instead of:
945
            msg = '  {:8}'.format('idle:')
946
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
947
            msg = '{:5.1f}%'.format(self.stats['idle'])
948
            ret.append(self.curse_add_line(msg, optional=self.get_views(key='idle', option='optional')))
949
950
        Use:
951
            ret.extend(self.curse_add_stat('idle', width=15, header='  '))
952
953
        """
954
        if key not in self.stats:
955
            return []
956
957
        # Check if a shortname is defined
958
        if not display_key:
959
            key_name = ''
960
        elif key in self.fields_description and 'short_name' in self.fields_description[key]:
961
            key_name = self.fields_description[key]['short_name']
962
        else:
963
            key_name = key
964
965
        # Check if unit is defined and get the short unit char in the unit_sort dict
966
        if (
967
            key in self.fields_description
968
            and 'unit' in self.fields_description[key]
969
            and self.fields_description[key]['unit'] in fields_unit_short
970
        ):
971
            # Get the shortname
972
            unit_short = fields_unit_short[self.fields_description[key]['unit']]
973
        else:
974
            unit_short = ''
975
976
        # Check if unit is defined and get the unit type unit_type dict
977
        if (
978
            key in self.fields_description
979
            and 'unit' in self.fields_description[key]
980
            and self.fields_description[key]['unit'] in fields_unit_type
981
        ):
982
            # Get the shortname
983
            unit_type = fields_unit_type[self.fields_description[key]['unit']]
984
        else:
985
            unit_type = 'float'
986
987
        # Is it a rate ? Yes, get the pre-computed rate value
988
        if (
989
            key in self.fields_description
990
            and 'rate' in self.fields_description[key]
991
            and self.fields_description[key]['rate'] is True
992
        ):
993
            value = self.stats.get(key + '_rate_per_sec', None)
994
        else:
995
            value = self.stats.get(key, None)
996
997
        if width is None:
998
            msg_item = header + f'{key_name}' + separator
999
            msg_template_float = '{:.1f}{}'
1000
            msg_template = '{}{}'
1001
        else:
1002
            # Define the size of the message
1003
            # item will be on the left
1004
            # value will be on the right
1005
            msg_item = header + '{:{width}}'.format(key_name, width=width - 7) + separator
1006
            msg_template_float = '{:5.1f}{}'
1007
            msg_template = '{:>5}{}'
1008
1009
        if value is None:
1010
            msg_value = msg_template.format('-', '')
1011
        elif unit_type == 'float':
1012
            msg_value = msg_template_float.format(value, unit_short)
1013
        elif 'min_symbol' in self.fields_description[key]:
1014
            msg_value = msg_template.format(
1015
                self.auto_unit(int(value), min_symbol=self.fields_description[key]['min_symbol']), unit_short
1016
            )
1017
        else:
1018
            msg_value = msg_template.format(int(value), unit_short)
1019
1020
        # Add the trailer
1021
        msg_value = msg_value + trailer
1022
1023
        decoration = self.get_views(key=key, option='decoration') if value is not None else 'DEFAULT'
1024
        optional = self.get_views(key=key, option='optional')
1025
1026
        return [
1027
            self.curse_add_line(msg_item, optional=optional),
1028
            self.curse_add_line(msg_value, decoration=decoration, optional=optional),
1029
        ]
1030
1031
    @property
1032
    def align(self):
1033
        """Get the curse align."""
1034
        return self._align
1035
1036
    @align.setter
1037
    def align(self, value):
1038
        """Set the curse align.
1039
1040
        value: left, right, bottom.
1041
        """
1042
        self._align = value
1043
1044
    def auto_unit(self, number, low_precision=False, min_symbol='K', none_symbol='-'):
1045
        """Make a nice human-readable string out of number.
1046
1047
        Number of decimal places increases as quantity approaches 1.
1048
        CASE: 613421788        RESULT:       585M low_precision:       585M
1049
        CASE: 5307033647       RESULT:      4.94G low_precision:       4.9G
1050
        CASE: 44968414685      RESULT:      41.9G low_precision:      41.9G
1051
        CASE: 838471403472     RESULT:       781G low_precision:       781G
1052
        CASE: 9683209690677    RESULT:      8.81T low_precision:       8.8T
1053
        CASE: 1073741824       RESULT:      1024M low_precision:      1024M
1054
        CASE: 1181116006       RESULT:      1.10G low_precision:       1.1G
1055
1056
        :low_precision: returns less decimal places potentially (default is False)
1057
                        sacrificing precision for more readability.
1058
        :min_symbol: Do not approach if number < min_symbol (default is K)
1059
        :decimal_count: if set, force the number of decimal number (default is None)
1060
        """
1061
        if number is None:
1062
            return none_symbol
1063
        symbols = ('K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
1064
        if min_symbol in symbols:
1065
            symbols = symbols[symbols.index(min_symbol) :]
1066
        prefix = {
1067
            'Y': 1208925819614629174706176,
1068
            'Z': 1180591620717411303424,
1069
            'E': 1152921504606846976,
1070
            'P': 1125899906842624,
1071
            'T': 1099511627776,
1072
            'G': 1073741824,
1073
            'M': 1048576,
1074
            'K': 1024,
1075
        }
1076
1077
        if number == 0:
1078
            # Avoid 0.0
1079
            return '0'
1080
1081
        for symbol in reversed(symbols):
1082
            value = float(number) / prefix[symbol]
1083
            if value > 1:
1084
                decimal_precision = 0
1085
                if value < 10:
1086
                    decimal_precision = 2
1087
                elif value < 100:
1088
                    decimal_precision = 1
1089
                if low_precision:
1090
                    if symbol in 'MK':
1091
                        decimal_precision = 0
1092
                    else:
1093
                        decimal_precision = min(1, decimal_precision)
1094
                elif symbol in 'K':
1095
                    decimal_precision = 0
1096
                return '{:.{decimal}f}{symbol}'.format(value, decimal=decimal_precision, symbol=symbol)
1097
        return f'{number!s}'
1098
1099
    def trend_msg(self, trend, significant=1):
1100
        """Return the trend message.
1101
1102
        Do not take into account if trend < significant
1103
        """
1104
        ret = '-'
1105
        if trend is None:
1106
            ret = ' '
1107
        elif trend > significant:
1108
            ret = unicode_message('ARROW_UP', self.args)
1109
        elif trend < -significant:
1110
            ret = unicode_message('ARROW_DOWN', self.args)
1111
        return ret
1112
1113
    def _check_decorator(fct):
1114
        """Check decorator for update method.
1115
1116
        It checks:
1117
        - if the plugin is enabled.
1118
        - if the refresh_timer is finished
1119
        """
1120
1121
        def wrapper(self, *args, **kw):
1122
            if self.is_enabled() and (self.refresh_timer.finished() or self.stats == self.get_init_value):
1123
                # Run the method
1124
                ret = fct(self, *args, **kw)
1125
                # Reset the timer
1126
                self.refresh_timer.set(self.get_refresh())
1127
                self.refresh_timer.reset()
1128
            else:
1129
                # No need to call the method
1130
                # Return the last result available
1131
                ret = self.stats
1132
            return ret
1133
1134
        return wrapper
1135
1136 View Code Duplication
    def _log_result_decorator(fct):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1137
        """Log (DEBUG) the result of the function fct."""
1138
1139
        def wrapper(*args, **kw):
1140
            counter = Counter()
1141
            ret = fct(*args, **kw)
1142
            duration = counter.get()
1143
            class_name = args[0].__class__.__name__
1144
            class_module = args[0].__class__.__module__
1145
            logger.debug(f"{class_name} {class_module} {fct.__name__} return {ret} in {duration} seconds")
1146
            return ret
1147
1148
        return wrapper
1149
1150
    def _manage_rate(fct):
1151
        """Manage rate decorator for update method."""
1152
1153
        def compute_rate(self, stat, stat_previous):
1154
            if stat_previous is None:
1155
                return stat
1156
1157
            # 1) set _gauge for all the rate fields
1158
            # 2) compute the _rate_per_sec
1159
            # 3) set the original field to the delta between the current and the previous value
1160
            for field in self.fields_description:
1161
                # For all the field with the rate=True flag
1162
                if 'rate' in self.fields_description[field] and self.fields_description[field]['rate'] is True:
1163
                    # Create a new metadata with the gauge
1164
                    stat['time_since_update'] = self.time_since_last_update
1165
                    stat[field + '_gauge'] = stat[field]
1166
                    if field + '_gauge' in stat_previous and stat[field] and stat_previous[field + '_gauge']:
1167
                        # The stat becomes the delta between the current and the previous value
1168
                        stat[field] = stat[field] - stat_previous[field + '_gauge']
1169
                        # Compute the rate
1170
                        if self.time_since_last_update > 0:
1171
                            stat[field + '_rate_per_sec'] = stat[field] // self.time_since_last_update
1172
                        else:
1173
                            stat[field] = 0
1174
                    else:
1175
                        # Avoid strange rate at the first run
1176
                        stat[field] = 0
1177
            return stat
1178
1179
        def compute_rate_on_list(self, stats, stats_previous):
1180
            if stats_previous is None:
1181
                return stats
1182
1183
            for stat in stats:
1184
                olds = [i for i in stats_previous if i[self.get_key()] == stat[self.get_key()]]
1185
                if len(olds) == 1:
1186
                    compute_rate(self, stat, olds[0])
1187
            return stats
1188
1189
        def wrapper(self, *args, **kw):
1190
            # Call the father method
1191
            stats = fct(self, *args, **kw)
1192
1193
            # Get the time since the last update
1194
            self.time_since_last_update = getTimeSinceLastUpdate(self.plugin_name)
1195
1196
            # Compute the rate
1197
            if isinstance(stats, dict):
1198
                # Stats is a dict
1199
                compute_rate(self, stats, self.stats_previous)
1200
            elif isinstance(stats, list):
1201
                # Stats is a list
1202
                compute_rate_on_list(self, stats, self.stats_previous)
1203
1204
            # Memorized the current stats for next run
1205
            self.stats_previous = copy.deepcopy(stats)
1206
1207
            return stats
1208
1209
        return wrapper
1210
1211
    # Mandatory to call the decorator in child classes
1212
    _check_decorator = staticmethod(_check_decorator)
1213
    _log_result_decorator = staticmethod(_log_result_decorator)
1214
    _manage_rate = staticmethod(_manage_rate)
1215