Test Failed
Push — master ( cc9054...a8608f )
by Nicolas
03:40
created

unittest-core.TestGlances._common_plugin_tests()   A

Complexity

Conditions 1

Size

Total Lines 27
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 9
nop 2
dl 0
loc 27
rs 9.95
c 0
b 0
f 0
1
#!/usr/bin/env python
2
#
3
# Glances - An eye on your system
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
# Refactor by @ariel-anieli in 2024
10
11
"""Glances unitary tests suite."""
12
13
import functools
14
import json
15
import time
16
import unittest
17
from datetime import datetime
18
19
from glances import __version__
20
from glances.events_list import GlancesEventsList
21
from glances.filter import GlancesFilter, GlancesFilterList
22
from glances.globals import LINUX, WINDOWS, pretty_date, string_value_to_float, subsample
23
from glances.main import GlancesMain
24
from glances.outputs.glances_bars import Bar
25
from glances.plugins.plugin.model import GlancesPluginModel
26
from glances.programs import processes_to_programs
27
from glances.stats import GlancesStats
28
from glances.thresholds import (
29
    GlancesThresholdCareful,
30
    GlancesThresholdCritical,
31
    GlancesThresholdOk,
32
    GlancesThresholds,
33
    GlancesThresholdWarning,
34
)
35
36
# Global variables
37
# =================
38
39
# Init Glances core
40
core = GlancesMain()
test_config, test_args = core.get_config(), core.get_args()

# Shared stats object used by every test below
stats = GlancesStats(config=test_config, args=test_args)

# Unittest class
# ==============
print('Unitary tests for Glances {}'.format(__version__))
50
51
52
class TestGlances(unittest.TestCase):
53
    """Test Glances class."""
54
55
    def setUp(self):
        """Print a blank line and a 78-character separator before each test_*."""
        separator = '=' * 78
        print('\n' + separator)
58
59
    def reset_stats_history_and_views(self, plugin):
        """Fetch the named plugin from ``stats`` and clear its state.

        :param plugin: plugin name handed to ``stats.get_plugin``
        :return: the plugin instance, after its stats, views and history
            have been reset (in that order)
        """
        instance = stats.get_plugin(plugin)
        for clear in (instance.reset, instance.reset_views, instance.reset_stats_history):
            clear()
        return instance
66
67
    def zip_with(self, method, elems, values):
        """Call ``method(elem, value)`` for each pair from ``elems`` and ``values``.

        Pairs are built with ``zip``, so iteration stops at the shorter input.
        The method is used for its side effects only (typically a
        ``self.assert*`` call) and returns nothing.
        """
        # A plain loop: no throwaway list is built just to trigger the calls.
        for elem, value in zip(elems, values):
            method(elem, value)
69
70
    def do_checks_before_update(self, plugin_instance):
        """Assert the state of a plugin before any update has been made.

        The raw stats must equal ``stats_init_value``, the plugin must report
        itself enabled (and not disabled) and its views must be empty.
        When history is enabled:

        * dict stats: ``get_key()`` is ``None`` and every name returned by
          ``get_raw_history()`` is declared in ``items_history_list``;
        * list stats: ``get_key()`` is not ``None``.
        """
        observed = [
            plugin_instance.get_raw(),
            plugin_instance.is_enabled(),
            plugin_instance.is_disabled(),
            plugin_instance.get_views(),
        ]
        expected = [plugin_instance.stats_init_value, True, False, {}]
        self.zip_with(self.assertEqual, observed, expected)

        self.assertIsInstance(plugin_instance.get_raw(), (dict, list))

        if plugin_instance.history_enable() and isinstance(plugin_instance.get_raw(), dict):
            self.assertEqual(plugin_instance.get_key(), None)
            every_name_is_declared = all(
                name in [entry['name'] for entry in plugin_instance.items_history_list]
                for name in plugin_instance.get_raw_history()
            )
            self.assertTrue(every_name_is_declared)
        elif plugin_instance.history_enable() and isinstance(plugin_instance.get_raw(), list):
            self.assertNotEqual(plugin_instance.get_key(), None)
94
95
    def update_stats(self, plugin_instance):
        """Refresh a plugin: stats first, then stats history, then views.

        :return: the same ``plugin_instance`` that was passed in
        """
        for step in ('update', 'update_stats_history', 'update_views'):
            getattr(plugin_instance, step)()
        return plugin_instance
101
102
    def is_field_in_plugin(self, plugin_instance):
        """Build a ``functools.reduce`` reducer for list-shaped plugin stats.

        The returned callable takes ``(result, item)`` where ``item`` is an
        iterable of field names (iterating a stats dict yields its keys).
        It returns ``True`` as soon as any field of any item seen so far is
        present in ``plugin_instance.fields_description``.

        :param plugin_instance: object exposing ``fields_description``
        :return: the reducer function
        """

        def is_field_in_this_description(result, i):
            # Carry the accumulator forward: previously each item restarted
            # from False, so only the last item of the list was checked.
            return result or any(f in plugin_instance.fields_description for f in i)

        return is_field_in_this_description
110
111
    def is_field_in_stats(self, plugin_instance, plugin):
        """Build a ``functools.reduce`` reducer for dict-shaped plugin stats.

        The returned callable takes ``(result, field)``: it yields ``True``
        when ``field`` is found in ``plugin_instance.get_raw()``; otherwise it
        prints a warning naming ``plugin`` and hands back ``result`` untouched.
        """

        def is_field_in_this_plugin(result, f):
            if f in plugin_instance.get_raw():
                return True
            print(f"WARNING: {f} field not found in {plugin} plugin stats")
            return result

        return is_field_in_this_plugin
121
122
    def check_stats(self, plugin_instance, plugin):
        """Assert that a plugin's stats and their exported forms agree.

        * Raw stats are a dict or a list; when not empty, the matching field
          reducer over them must end up ``True``.
        * ``get_raw()`` equals ``get_export()``, ``get_stats()`` equals
          ``get_json()`` and the decoded ``get_stats()`` equals ``get_raw()``.
        * When ``fields_description`` is not empty, the first described field
          must give dicts from ``get_raw_stats_item`` / ``get_stats_item`` and
          a ``str`` description.

        :param plugin: plugin name, only used in warning messages
        """
        self.assertIsInstance(plugin_instance.get_raw(), (dict, list))

        if isinstance(plugin_instance.get_raw(), dict) and plugin_instance.get_raw() != {}:
            found = functools.reduce(
                self.is_field_in_stats(plugin_instance, plugin),
                plugin_instance.fields_description,
                False,
            )
            self.assertTrue(found)
        elif isinstance(plugin_instance.get_raw(), list) and len(plugin_instance.get_raw()) > 0:
            found = functools.reduce(
                self.is_field_in_plugin(plugin_instance),
                plugin_instance.get_raw(),
                False,
            )
            self.assertTrue(found)

        actual = [
            plugin_instance.get_raw(),
            plugin_instance.get_stats(),
            json.loads(plugin_instance.get_stats()),
        ]
        wanted = [
            plugin_instance.get_export(),
            plugin_instance.get_json(),
            plugin_instance.get_raw(),
        ]
        self.zip_with(self.assertEqual, actual, wanted)

        if len(plugin_instance.fields_description) == 0:
            return

        # Only the first described field is probed
        first_field = next(iter(plugin_instance.fields_description.keys()))
        probes = [
            plugin_instance.get_raw_stats_item(first_field),
            json.loads(plugin_instance.get_stats_item(first_field)),
            plugin_instance.get_item_info(first_field, 'description'),
        ]
        self.zip_with(self.assertIsInstance, probes, [dict, dict, str])
159
160
    def filter_stats(self, plugin_instance):
        """Assert that ``filter_stats`` drops an injected ``'foo'`` entry.

        A ``'foo': 'bar'`` pair is written into the object returned by
        ``get_raw()`` (the dict itself, or the first item of a non-empty
        list) and must be absent from the filtered result.
        """
        raw = plugin_instance.get_raw()
        if isinstance(plugin_instance.get_raw(), dict):
            raw['foo'] = 'bar'
            filtered = plugin_instance.filter_stats(raw)
            self.assertTrue('foo' not in filtered)
            return
        if isinstance(plugin_instance.get_raw(), list) and len(plugin_instance.get_raw()) > 0:
            raw[0]['foo'] = 'bar'
            filtered = plugin_instance.filter_stats(raw)
            self.assertTrue('foo' not in filtered[0])
170
171
    def get_first_history_field(self, plugin_instance):
        """Return the name of the first history field of a plugin.

        * dict stats: the ``'name'`` of the first entry of
          ``get_items_history_list()``;
        * non-empty list stats: ``'<key value of the first item>_<name>'``;
        * anything else (e.g. an empty list): ``None``.

        :return: the field name, or ``None`` when it cannot be derived
        """
        raw = plugin_instance.get_raw()
        if isinstance(raw, dict):
            return plugin_instance.get_items_history_list()[0]['name']
        if isinstance(raw, list) and len(raw) > 0:
            return '_'.join(
                [
                    raw[0][plugin_instance.get_key()],
                    plugin_instance.get_items_history_list()[0]['name'],
                ]
            )
        # Previously this path left the variable unbound (UnboundLocalError)
        return None
0 ignored issues
show
introduced by
The variable first_history_field does not seem to be defined for all execution paths.
Loading history...
183
184
    def maybe_assert_first_for_raw(self, plugin_instance, first_history_field):
        """Check the raw history of one field after two updates.

        Skipped when the raw stats are empty. Otherwise the history of
        ``first_history_field`` must hold exactly two entries, and the first
        element of the second entry must be greater than that of the first
        (presumably a timestamp -- confirm against the history format).
        """
        if len(plugin_instance.get_raw()) == 0:
            return
        self.assertEqual(len(plugin_instance.get_raw_history(first_history_field)), 2)
        newer = plugin_instance.get_raw_history(first_history_field)[1][0]
        older = plugin_instance.get_raw_history(first_history_field)[0][0]
        self.assertGreater(newer, older)
191
192
    def maybe_assert_first_for_stats(self, plugin_instance, first_history_field):
        """Check the history of one field after a third update.

        Skipped when the raw stats are empty. Otherwise the full history must
        hold three entries, asking for two must return two, and
        ``get_stats_history()`` must decode from JSON to a dict.
        """
        if len(plugin_instance.get_raw()) == 0:
            return
        full_history = plugin_instance.get_raw_history(first_history_field)
        self.assertEqual(len(full_history), 3)
        last_two = plugin_instance.get_raw_history(first_history_field, 2)
        self.assertEqual(len(last_two), 2)
        decoded = json.loads(plugin_instance.get_stats_history())
        self.assertIsInstance(decoded, dict)
197
198
    def chk_hist_maybe_add_third_elem(self, plugin_instance):
        """Check the plugin history, adding a third sample on the way.

        When history is enabled: the first history field is looked up, its
        two-sample history is checked, the plugin is updated once more and
        the three-sample history is checked.

        :return: the first history field, or ``None`` when history is
            disabled for this plugin (this used to raise UnboundLocalError)
        """
        if not plugin_instance.history_enable():
            return None

        first_history_field = self.get_first_history_field(plugin_instance)
        self.maybe_assert_first_for_raw(plugin_instance, first_history_field)

        # Update stats (add third element)
        plugin_instance = self.update_stats(plugin_instance)

        self.maybe_assert_first_for_stats(plugin_instance, first_history_field)

        return first_history_field
0 ignored issues
show
introduced by
The variable first_history_field does not seem to be defined in case plugin_instance.history_enable() on line 199 is False. Are you sure this can never be the case?
Loading history...
209
210
    def check_views(self, plugin_instance, first_history_field):
        """Assert that the plugin views are dicts carrying a ``'decoration'``.

        * dict stats: the view of ``first_history_field`` is inspected;
        * non-empty list stats: the view of the first item is inspected,
          keyed by the first name of ``get_items_history_list()`` (the
          ``first_history_field`` argument is not used on this path).

        Finally ``get_json_views()`` must decode to a dict equal to
        ``get_views()``.
        """
        self.assertIsInstance(plugin_instance.get_views(), dict)

        if isinstance(plugin_instance.get_raw(), dict):
            self.assertIsInstance(plugin_instance.get_views(first_history_field), dict)
            self.assertTrue('decoration' in plugin_instance.get_views(first_history_field))
        elif isinstance(plugin_instance.get_raw(), list) and len(plugin_instance.get_raw()) > 0:
            history_key = plugin_instance.get_items_history_list()[0]['name']
            first_item = plugin_instance.get_raw()[0][plugin_instance.get_key()]
            self.assertIsInstance(plugin_instance.get_views(item=first_item, key=history_key), dict)
            self.assertTrue('decoration' in plugin_instance.get_views(item=first_item, key=history_key))

        self.assertIsInstance(json.loads(plugin_instance.get_json_views()), dict)
        self.assertEqual(json.loads(plugin_instance.get_json_views()), plugin_instance.get_views())
222
223
    def _common_plugin_tests(self, plugin):
        """Run the shared scenario against one Glances plugin.

        Called by the test_1xx methods with the plugin name. The steps are:
        reset, pre-update checks, first update, stats checks, filter check,
        second update, history checks (which may add a third update) and
        finally views checks.
        """
        # Start from a clean plugin (stats, views and history reset)
        plugin_instance = self.reset_stats_history_and_views(plugin)
        self.do_checks_before_update(plugin_instance)

        # First sample
        plugin_instance = self.update_stats(plugin_instance)
        self.check_stats(plugin_instance, plugin)
        self.filter_stats(plugin_instance)

        # Second sample
        plugin_instance = self.update_stats(plugin_instance)

        # History, then views
        first_history_field = self.chk_hist_maybe_add_third_elem(plugin_instance)
        self.check_views(plugin_instance, first_history_field)
250
251
    def test_000_update(self):
        """Update stats (mandatory step for all the stats).

        The update is made twice, one second apart (for rate computation).
        Any exception raised by ``stats.update()`` fails the test with the
        exception text as the failure message.
        """
        print('INFO: [TEST_000] Test the stats update function')
        for attempt in range(2):
            if attempt:
                # Pause between the two samples
                time.sleep(1)
            try:
                stats.update()
            except Exception as e:
                print(f'ERROR: Stats update failed: {e}')
                self.fail(f'Stats update failed: {e}')
270
271
    def test_001_plugins(self):
        """Check that every mandatory plugin is in the plugins list."""
        plugins_to_check = ['system', 'cpu', 'load', 'mem', 'memswap', 'network', 'diskio', 'fs']
        print('INFO: [TEST_001] Check the mandatory plugins list: {}'.format(', '.join(plugins_to_check)))
        plugins_list = stats.getPluginsList()
        for plugin in plugins_to_check:
            # assertIn reports both the missing name and the list on failure
            self.assertIn(plugin, plugins_list, msg=f'Cannot find plugin: {plugin}')
278
279
    def test_002_system(self):
        """Check that the SYSTEM plugin raw stats hold the expected keys."""
        stats_to_check = ['hostname', 'os_name']
        print('INFO: [TEST_002] Check SYSTEM stats: {}'.format(', '.join(stats_to_check)))
        stats_grab = stats.get_plugin('system').get_raw()
        for stat in stats_to_check:
            # Check that the key exists
            self.assertIn(stat, stats_grab, msg=f'Cannot find key: {stat}')
        print(f'INFO: SYSTEM stats: {stats_grab}')
288
289
    def test_003_cpu(self):
        """Check that the mandatory CPU stats exist and lie within [0, 100]."""
        stats_to_check = ['system', 'user', 'idle']
        print('INFO: [TEST_003] Check mandatory CPU stats: {}'.format(', '.join(stats_to_check)))
        stats_grab = stats.get_plugin('cpu').get_raw()
        for stat in stats_to_check:
            # Check that the key exists
            self.assertIn(stat, stats_grab, msg=f'Cannot find key: {stat}')
            # Check that the percentage is >= 0 and <= 100
            self.assertGreaterEqual(stats_grab[stat], 0)
            self.assertLessEqual(stats_grab[stat], 100)
        print(f'INFO: CPU stats: {stats_grab}')
301
302
    @unittest.skipIf(WINDOWS, "Load average not available on Windows")
    def test_004_load(self):
        """Check that the LOAD stats exist and are not negative."""
        stats_to_check = ['cpucore', 'min1', 'min5', 'min15']
        print('INFO: [TEST_004] Check LOAD stats: {}'.format(', '.join(stats_to_check)))
        stats_grab = stats.get_plugin('load').get_raw()
        for stat in stats_to_check:
            # Check that the key exists
            self.assertIn(stat, stats_grab, msg=f'Cannot find key: {stat}')
            # Check that the value is >= 0
            self.assertGreaterEqual(stats_grab[stat], 0)
        print(f'INFO: LOAD stats: {stats_grab}')
314
315
    def test_005_mem(self):
        """Check that the MEM stats exist and are not negative."""
        plugin_name = 'mem'
        stats_to_check = ['available', 'used', 'free', 'total']
        print('INFO: [TEST_005] Check {} stats: {}'.format(plugin_name, ', '.join(stats_to_check)))
        # Use the declared plugin name instead of repeating the literal
        stats_grab = stats.get_plugin(plugin_name).get_raw()
        for stat in stats_to_check:
            # Check that the key exists
            self.assertIn(stat, stats_grab, msg=f'Cannot find key: {stat}')
            # Check that the value is >= 0
            self.assertGreaterEqual(stats_grab[stat], 0)
        print(f'INFO: MEM stats: {stats_grab}')
327
328
    def test_006_memswap(self):
        """Check that the MEMSWAP stats exist and are not negative."""
        stats_to_check = ['used', 'free', 'total']
        print('INFO: [TEST_006] Check MEMSWAP stats: {}'.format(', '.join(stats_to_check)))
        stats_grab = stats.get_plugin('memswap').get_raw()
        for stat in stats_to_check:
            # Check that the key exists
            self.assertIn(stat, stats_grab, msg=f'Cannot find key: {stat}')
            # Check that the value is >= 0
            self.assertGreaterEqual(stats_grab[stat], 0)
        print(f'INFO: MEMSWAP stats: {stats_grab}')
339
340
    def test_007_network(self):
        """Check that the NETWORK plugin raw stats are a list."""
        print('INFO: [TEST_007] Check NETWORK stats')
        stats_grab = stats.get_plugin('network').get_raw()
        self.assertIsInstance(stats_grab, list, msg='Network stats is not a list')
        print(f'INFO: NETWORK stats: {stats_grab}')
346
347
    def test_008_diskio(self):
        """Check that the DISKIO plugin raw stats are a list."""
        print('INFO: [TEST_008] Check DISKIO stats')
        stats_grab = stats.get_plugin('diskio').get_raw()
        self.assertIsInstance(stats_grab, list, msg='DiskIO stats is not a list')
        print(f'INFO: diskio stats: {stats_grab}')
353
354
    def test_009_fs(self):
        """Check that the File System plugin raw stats are a list."""
        print('INFO: [TEST_009] Check FS stats')
        stats_grab = stats.get_plugin('fs').get_raw()
        self.assertIsInstance(stats_grab, list, msg='FileSystem stats is not a list')
        print(f'INFO: FS stats: {stats_grab}')
361
362
    def test_010_processes(self):
        """Check the processcount (dict) and processlist (list) plugins."""
        print('INFO: [TEST_010] Check PROCESS stats')
        stats_grab = stats.get_plugin('processcount').get_raw()
        # total = stats_grab['total']
        self.assertIsInstance(stats_grab, dict, msg='Process count stats is not a dict')
        print(f'INFO: PROCESS count stats: {stats_grab}')
        stats_grab = stats.get_plugin('processlist').get_raw()
        # The message now names the plugin actually being checked
        self.assertIsInstance(stats_grab, list, msg='Process list stats is not a list')
        print(f'INFO: PROCESS list stats: {len(stats_grab)} items in the list')
        # Disabled check: number of processes in the list equals the counter
        # self.assertEqual(total, len(stats_grab))
375
376
    def test_011_folders(self):
        """Check that the Folders plugin raw stats are a list."""
        print('INFO: [TEST_011] Check FOLDER stats')
        stats_grab = stats.get_plugin('folders').get_raw()
        self.assertIsInstance(stats_grab, list, msg='Folders stats is not a list')
        print(f'INFO: Folders stats: {stats_grab}')
383
384
    def test_012_ip(self):
        """Check that the IP plugin raw stats are a dict."""
        print('INFO: [TEST_012] Check IP stats')
        stats_grab = stats.get_plugin('ip').get_raw()
        self.assertIsInstance(stats_grab, dict, msg='IP stats is not a dict')
        print(f'INFO: IP stats: {stats_grab}')
390
391
    @unittest.skipIf(not LINUX, "IRQs available only on Linux")
    def test_013_irq(self):
        """Check that the IRQ plugin raw stats are a list."""
        print('INFO: [TEST_013] Check IRQ stats')
        stats_grab = stats.get_plugin('irq').get_raw()
        self.assertIsInstance(stats_grab, list, msg='IRQ stats is not a list')
        print(f'INFO: IRQ stats: {stats_grab}')
398
399
    @unittest.skipIf(not LINUX, "GPU available only on Linux")
    def test_014_gpu(self):
        """Check that the GPU plugin raw stats are a list."""
        print('INFO: [TEST_014] Check GPU stats')
        stats_grab = stats.get_plugin('gpu').get_raw()
        self.assertIsInstance(stats_grab, list, msg='GPU stats is not a list')
        print(f'INFO: GPU stats: {stats_grab}')
406
407
    def test_015_sorted_stats(self):
        """Check the order returned by ``GlancesPluginModel.sorted_stats``.

        Two of the five keys have an alias (looked up through ``has_alias``);
        the expected order is key5, key2, key3, key4, key21.
        """
        print('INFO: [TEST_015] Check sorted stats method')
        aliases = {"key2": "alias11", "key5": "alias2"}
        unsorted_stats = [{"key": name} for name in ("key4", "key2", "key5", "key21", "key3")]

        gp = GlancesPluginModel()
        gp.get_key = lambda: "key"
        gp.has_alias = aliases.get
        gp.stats = unsorted_stats

        sorted_stats = gp.sorted_stats()
        self.assertEqual(len(sorted_stats), 5)
        for position, expected_key in enumerate(["key5", "key2", "key3", "key4", "key21"]):
            self.assertEqual(sorted_stats[position]["key"], expected_key)
434
435
    def test_016_subsample(self):
        """Check that ``subsample`` never returns more items than requested."""
        print('INFO: [TEST_016] Subsampling')
        cases = [
            ([1, 2, 3], 4),
            ([1, 2, 3, 4], 4),
            ([1, 2, 3, 4, 5, 6, 7], 4),
            ([1, 2, 3, 4, 5, 6, 7, 8], 4),
            (list(range(1, 800)), 4),
            (list(range(1, 8000)), 800),
        ]
        for values, max_items in cases:
            reduced = subsample(values, max_items)
            self.assertLessEqual(len(reduced), max_items)
448
449
    def test_017_hddsmart(self):
        """Check hard disk SMART data plugin.

        Without admin rights, or when the plugin reports ``{}``, the stats
        must be empty; otherwise the first entry must hold ``DeviceName``.
        The test returns early when ``is_admin`` cannot be imported.
        """
        try:
            from glances.globals import is_admin
        except ImportError:
            print("INFO: [TEST_017] pySMART not found, not running SMART plugin test")
            return

        stat = 'DeviceName'
        print(f'INFO: [TEST_017] Check SMART stats: {stat}')
        stats_grab = stats.get_plugin('smart').get_raw()
        # unittest assertions are used instead of bare ``assert`` statements,
        # which are removed when Python runs with -O
        if not is_admin():
            print("INFO: Not admin, SMART list should be empty")
            self.assertEqual(len(stats_grab), 0)
        elif stats_grab == {}:
            print("INFO: Admin but SMART list is empty")
            self.assertEqual(len(stats_grab), 0)
        else:
            print(stats_grab)
            self.assertIn(stat, stats_grab[0], msg=f'Cannot find key: {stat}')

        print(f'INFO: SMART stats: {stats_grab}')
471
472
    def test_017_programs(self):
        """Check ``processes_to_programs`` (a function, not a plugin).

        Fed with the processlist raw stats, it must return a list whose
        first item is a dict.
        """
        print('INFO: [TEST_017] Check PROGRAM stats')
        process_list = stats.get_plugin('processlist').get_raw()
        programs = processes_to_programs(process_list)
        self.assertIsInstance(programs, list, msg='Programs stats list is not a list')
        self.assertIsInstance(programs[0], dict, msg='First item should be a dict')
479
480
    def test_018_string_value_to_float(self):
        """Check ``string_value_to_float`` on sizes, plain numbers and junk."""
        print('INFO: [TEST_018] Check string_value_to_float function')
        conversions = [
            ('32kB', 32000.0),
            ('32 KB', 32000.0),
            ('15.5MB', 15500000.0),
            ('25.9', 25.9),
            ('12', 12),
            ('--', None),
        ]
        for text, expected in conversions:
            self.assertEqual(string_value_to_float(text), expected)
489
490
    def test_019_events(self):
        """Exercise ``GlancesEventsList``: duration, merging and cleaning.

        The list is created with ``max_events=5``, ``min_duration=1`` and
        ``min_interval=3``; the sleeps below are part of the scenario.
        """
        print('INFO: [TEST_019] Test events')
        events = GlancesEventsList(max_events=5, min_duration=1, min_interval=3)

        def feed(samples, pause):
            """Add the samples in order, sleeping ``pause`` seconds between two of them."""
            for index, (state, stat, value) in enumerate(samples):
                if index and pause:
                    time.sleep(pause)
                events.add(state, stat, value)

        def expect_latest(fields):
            """Assert, field by field, the content of the first event of the list."""
            for name, value in fields.items():
                self.assertEqual(events.get()[0][name], value)

        load_samples = [('WARNING', 'LOAD', 4), ('CRITICAL', 'LOAD', 5), ('OK', 'LOAD', 1)]

        # Minimal event duration not reached
        feed(load_samples, pause=0)
        self.assertEqual(len(events.get()), 0)

        # Minimal event duration LOAD reached
        feed(load_samples, pause=1)
        self.assertEqual(len(events.get()), 1)
        expect_latest({'type': 'LOAD', 'state': 'CRITICAL', 'max': 5})

        # Minimal event duration CPU reached
        feed([('WARNING', 'CPU', 60), ('WARNING', 'CPU', 70), ('OK', 'CPU', 10)], pause=1)
        self.assertEqual(len(events.get()), 2)
        expect_latest({'type': 'CPU', 'state': 'WARNING', 'min': 60, 'max': 70, 'count': 2})

        # Minimal event duration CPU reached (again)
        # but time between two events (min_interval) is too short
        # a merge will be done
        time.sleep(0.5)
        feed([('WARNING', 'CPU', 60), ('WARNING', 'CPU', 80), ('OK', 'CPU', 10)], pause=1)
        self.assertEqual(len(events.get()), 2)
        expect_latest({'type': 'CPU', 'state': 'WARNING', 'min': 60, 'max': 80, 'count': 4})

        # Clean WARNING events
        events.clean()
        self.assertEqual(len(events.get()), 1)
540
541
    def test_020_filter(self):
        """Exercise ``GlancesFilter`` and ``GlancesFilterList``.

        Covers a plain regex filter, a ``key:value`` filter and a
        comma-separated list combining both.
        """
        print('INFO: [TEST_020] Test filter')

        # Plain regular expression
        gf = GlancesFilter()
        gf.filter = '.*python.*'
        self.assertEqual(gf.filter, '.*python.*')
        self.assertEqual(gf.filter_key, None)
        for process in ({'name': 'python'}, {'name': '/usr/bin/python -m glances'}):
            self.assertTrue(gf.is_filtered(process))
        for process in ({'noname': 'python'}, {'name': 'snake'}):
            self.assertFalse(gf.is_filtered(process))

        # key:value form
        gf.filter = 'username:nicolargo'
        self.assertEqual(gf.filter, 'nicolargo')
        self.assertEqual(gf.filter_key, 'username')
        self.assertTrue(gf.is_filtered({'username': 'nicolargo'}))
        for process in ({'username': 'notme'}, {'notuser': 'nicolargo'}):
            self.assertFalse(gf.is_filtered(process))

        # List of filters
        gfl = GlancesFilterList()
        gfl.filter = '.*python.*,username:nicolargo'
        list_cases = (
            (self.assertTrue, {'name': 'python is in the place'}),
            (self.assertFalse, {'name': 'snake is in the place'}),
            (self.assertTrue, {'name': 'snake is in the place', 'username': 'nicolargo'}),
            (self.assertFalse, {'name': 'snake is in the place', 'username': 'notme'}),
        )
        for check, process in list_cases:
            check(gfl.is_filtered(process))
564
565
    def test_021_pretty_date(self):
        """Check ``pretty_date`` against a fixed reference date.

        Every case is compared with 2024-01-01 12:00.
        """
        print('INFO: [TEST_021] pretty_date')
        cases = [
            ((2024, 1, 1, 12, 0), 'just now'),
            ((2024, 1, 1, 11, 59), 'a min'),
            ((2024, 1, 1, 11, 55), '5 mins'),
            ((2024, 1, 1, 11, 0), 'an hour'),
            ((2024, 1, 1, 0, 0), '12 hours'),
            ((2023, 12, 20, 0, 0), '1 week'),
            ((2023, 12, 5, 0, 0), '3 weeks'),
            ((2023, 12, 1, 0, 0), '1 month'),
            ((2023, 6, 1, 0, 0), '7 months'),
            ((2023, 1, 1, 0, 0), '1 year'),
            ((2020, 1, 1, 0, 0), '4 years'),
        ]
        for then, expected in cases:
            self.assertEqual(pretty_date(datetime(*then), datetime(2024, 1, 1, 12, 0)), expected)
579
580
    def test_094_thresholds(self):
        """Check threshold ordering, equality, ``str()`` and the registry."""
        print('INFO: [TEST_094] Thresholds')
        ok = GlancesThresholdOk()
        careful = GlancesThresholdCareful()
        warning = GlancesThresholdWarning()
        critical = GlancesThresholdCritical()

        # Strict ordering: ok < careful < warning < critical
        for lower, higher in ((ok, careful), (careful, warning), (warning, critical)):
            self.assertTrue(lower < higher)
        self.assertFalse(ok > careful)
        self.assertEqual(ok, ok)
        self.assertEqual(str(ok), 'OK')

        # A threshold added by name can be read back
        thresholds = GlancesThresholds()
        thresholds.add('cpu_percent', 'OK')
        registered = thresholds.get(stat_name='cpu_percent')
        self.assertEqual(registered.description(), 'OK')
596
597
    def test_095_methods(self):
        """Check that every plugin has the mandatory methods."""
        print('INFO: [TEST_095] Mandatories methods')
        required = ('reset', 'update')
        for plugin in stats.getPluginsList():
            for method in required:
                has_method = hasattr(stats.get_plugin(plugin), method)
                self.assertTrue(has_method, msg=f'{plugin} has no method {method}()')
605
606
    def test_096_views(self):
        """Check that ``get_views`` returns a dict for every plugin."""
        print('INFO: [TEST_096] Test views')
        plugins_list = stats.getPluginsList()
        for plugin in plugins_list:
            # get_raw() is called before get_views(); its result is not used
            stats.get_plugin(plugin).get_raw()
            views_grab = stats.get_plugin(plugin).get_views()
            self.assertIsInstance(views_grab, dict, msg=f'{plugin} view is not a dict')
614
615
    def test_097_attribute(self):
        """Check ``GlancesAttribute``: name, description and bounded history."""
        print('INFO: [TEST_097] Test attribute')
        from glances.attribute import GlancesAttribute

        attr = GlancesAttribute('a', description='ad', history_max_size=3)
        self.assertEqual(attr.name, 'a')
        self.assertEqual(attr.description, 'ad')
        attr.description = 'adn'
        self.assertEqual(attr.description, 'adn')

        # Each assignment to .value adds one history entry
        attr.value = 1
        attr.value = 2
        self.assertEqual(len(attr.history), 2)
        attr.value = 3
        self.assertEqual(len(attr.history), 3)

        # A fourth value must not grow the history past history_max_size=3
        attr.value = 4
        self.assertEqual(len(attr.history), 3)
        self.assertEqual(attr.history_size(), 3)
        self.assertEqual(attr.history_len(), 3)
        self.assertEqual(attr.history_value()[1], 4)
        self.assertEqual(attr.history_mean(nb=3), 4.5)
638
639
    def test_098_history(self):
        """Check ``GlancesHistory``: adding samples and resetting.

        After ``reset()`` the two keys must still exist, with empty histories.
        """
        print('INFO: [TEST_098] Test history')
        from glances.history import GlancesHistory

        h = GlancesHistory()
        for name, samples in (('a', (1, 2, 3)), ('b', (10, 20, 30))):
            for sample in samples:
                h.add(name, sample, history_max_size=100)
        self.assertEqual(len(h.get()), 2)
        self.assertEqual(len(h.get()['a']), 3)

        h.reset()
        self.assertEqual(len(h.get()), 2)
        self.assertEqual(len(h.get()['a']), 0)
657
658
    def test_099_output_bars(self):
        """Check the progress bar (``Bar``) bounds and rendering.

        A percent of -1 must not end up above ``bar.min_value``, a percent of
        101 is accepted as is, and a 50-character bar is rendered for
        0, 70, 100 and 110 percent.
        """
        print('INFO: [TEST_099] Test progress bar')

        small_bar = Bar(size=1)
        # Percent value can not be lower than min_value
        small_bar.percent = -1
        self.assertLessEqual(small_bar.percent, small_bar.min_value)
        # but... percent value can be higher than max_value
        small_bar.percent = 101
        self.assertLessEqual(small_bar.percent, 101)

        # Test display
        renderings = (
            (0, '                                              0.0%'),
            (70, '|||||||||||||||||||||||||||||||              70.0%'),
            (100, '||||||||||||||||||||||||||||||||||||||||||||  100%'),
            (110, '|||||||||||||||||||||||||||||||||||||||||||| >100%'),
        )
        bar = Bar(size=50)
        for percent, expected in renderings:
            bar.percent = percent
            self.assertEqual(bar.get(), expected)
689
690
    # Error in GitHub Actions. Do not remove the commented-out test below.
691
    # def test_100_system_plugin_method(self):
692
    #     """Test system plugin methods"""
693
    #     print('INFO: [TEST_100] Test system plugin methods')
694
    #     self._common_plugin_tests('system')
695
696
    def test_101_cpu_plugin_method(self):
        """Run the shared plugin checks (_common_plugin_tests) against the cpu plugin."""
        # The banner id must match the method number (101) so the test output
        # can be mapped back to the right test.
        print('INFO: [TEST_101] Test cpu plugin methods')
        self._common_plugin_tests('cpu')
700
701
    @unittest.skipIf(WINDOWS, "Load average not available on Windows")
    def test_102_load_plugin_method(self):
        """Run the shared plugin checks against the load plugin (skipped when WINDOWS is set)."""
        plugin = 'load'
        print(f'INFO: [TEST_102] Test {plugin} plugin methods')
        self._common_plugin_tests(plugin)
706
707
    def test_103_mem_plugin_method(self):
        """Run the shared plugin checks against the mem plugin."""
        plugin = 'mem'
        print(f'INFO: [TEST_103] Test {plugin} plugin methods')
        self._common_plugin_tests(plugin)
711
712
    def test_104_memswap_plugin_method(self):
        """Run the shared plugin checks against the memswap plugin."""
        plugin = 'memswap'
        print(f'INFO: [TEST_104] Test {plugin} plugin methods')
        self._common_plugin_tests(plugin)
716
717
    def test_105_network_plugin_method(self):
        """Run the shared plugin checks against the network plugin."""
        plugin = 'network'
        print(f'INFO: [TEST_105] Test {plugin} plugin methods')
        self._common_plugin_tests(plugin)
721
722
    # Error in GitHub Actions. Do not remove the commented-out test below.
723
    # def test_106_diskio_plugin_method(self):
724
    #     """Test diskio plugin methods"""
725
    #     print('INFO: [TEST_106] Test diskio plugin methods')
726
    #     self._common_plugin_tests('diskio')
727
728
    def test_107_fs_plugin_method(self):
        """Run the shared plugin checks against the fs plugin."""
        plugin = 'fs'
        print(f'INFO: [TEST_107] Test {plugin} plugin methods')
        self._common_plugin_tests(plugin)
732
733
    def test_200_views_hidden(self):
        """Test the hide feature on the diskio plugin views.

        Checked sequence for one field of the first disk interface: the field
        is hidden while its value is 0, is shown once the value becomes
        non-zero, and stays shown when the value goes back to 0.

        The test is reported as skipped (rather than silently passing) when
        the diskio plugin has no view, or when 'hide_zero' is not enabled for
        that plugin in the test configuration.
        """
        print('INFO: [TEST_200] Test views hidden feature')
        # Test will be done with the diskio plugin, first available interface (read_bytes fields)
        plugin = 'diskio'
        field = 'read_bytes_rate_per_sec'
        plugin_instance = stats.get_plugin(plugin)
        if not plugin_instance.get_views():
            self.skipTest('No diskIO interface, test can not be done')
        if not test_config.get_bool_value(plugin, 'hide_zero', False):
            self.skipTest('hide_zero is not enabled for the diskio plugin, test can not be done')
        # Get first disk interface
        key = next(iter(plugin_instance.get_views().keys()))

        # Init the stats
        plugin_instance.update()
        raw_stats = plugin_instance.get_raw()
        # Reset the views
        plugin_instance.set_views({})

        # Each step: (value written to the field, whether the view must be hidden afterwards).
        # The order matters: every step builds on the state left by the previous one.
        scenario = (
            (0, True),  # zero from the start: hidden
            (0, True),  # still zero: stays hidden
            (1, False),  # non-zero: not hidden
            (0, False),  # back to zero after a non-zero value: still not hidden
        )
        for step, (value, expected_hidden) in enumerate(scenario, start=1):
            raw_stats[0][field] = value
            plugin_instance.set_stats(raw_stats)
            self.assertEqual(plugin_instance.get_raw()[0][field], value)
            plugin_instance.update_views()
            hidden = plugin_instance.get_views()[key][field]['hidden']
            self.assertEqual(bool(hidden), expected_hidden, f'step {step}: {field}={value}')
776
777
    # def test_700_secure(self):
778
    #     """Test secure functions"""
779
    #     print('INFO: [TEST_700] Secure functions')
780
781
    #     if WINDOWS:
782
    #         self.assertIn(secure_popen('echo TEST'), ['TEST\n', 'TEST\r\n'])
783
    #         self.assertIn(secure_popen('echo TEST1 && echo TEST2'), ['TEST1\nTEST2\n', 'TEST1\r\nTEST2\r\n'])
784
    #     else:
785
    #         self.assertEqual(secure_popen('echo -n TEST'), 'TEST')
786
    #         self.assertEqual(secure_popen('echo -n TEST1 && echo -n TEST2'), 'TEST1TEST2')
787
    #         # Makes the test fail on GitHub (AssertionError: '' != 'FOO\n')
788
    #         # but not on my local Linux computer...
789
    #         # self.assertEqual(secure_popen('echo FOO | grep FOO'), 'FOO\n')
790
791
    # def test_800_memory_leak(self):
792
    #     """Memory leak check"""
793
    #     import tracemalloc
794
795
    #     print('INFO: [TEST_800] Memory leak check')
796
    #     tracemalloc.start()
797
    #     # 3 iterations just to init the stats and fill the memory
798
    #     for _ in range(3):
799
    #         stats.update()
800
801
    #     # Start the memory leak check
802
    #     snapshot_begin = tracemalloc.take_snapshot()
803
    #     for _ in range(3):
804
    #         stats.update()
805
    #     snapshot_end = tracemalloc.take_snapshot()
806
    #     snapshot_diff = snapshot_end.compare_to(snapshot_begin, 'filename')
807
    #     memory_leak = sum([s.size_diff for s in snapshot_diff])
808
    #     print(f'INFO: Memory leak: {memory_leak} bytes')
809
810
    #     # snapshot_begin = tracemalloc.take_snapshot()
811
    #     for _ in range(30):
812
    #         stats.update()
813
    #     snapshot_end = tracemalloc.take_snapshot()
814
    #     snapshot_diff = snapshot_end.compare_to(snapshot_begin, 'filename')
815
    #     memory_leak = sum([s.size_diff for s in snapshot_diff])
816
    #     print(f'INFO: Memory leak: {memory_leak} bytes')
817
818
    #     # snapshot_begin = tracemalloc.take_snapshot()
819
    #     for _ in range(300):
820
    #         stats.update()
821
    #     snapshot_end = tracemalloc.take_snapshot()
822
    #     snapshot_diff = snapshot_end.compare_to(snapshot_begin, 'filename')
823
    #     memory_leak = sum([s.size_diff for s in snapshot_diff])
824
    #     print(f'INFO: Memory leak: {memory_leak} bytes')
825
    #     snapshot_top = snapshot_end.compare_to(snapshot_begin, 'traceback')
826
    #     print("Memory consumption (top 5):")
827
    #     for stat in snapshot_top[:5]:
828
    #         print(stat)
829
    #         for line in stat.traceback.format():
830
    #             print(line)
831
832
    def test_999_the_end(self):
        """Last test of the suite: call end() on the module-level stats object."""
        banner = 'INFO: [TEST_999] Free the stats'
        print(banner)
        stats.end()
        # Nothing else to verify: reaching this line means stats.end() did not raise
        self.assertTrue(True)
837
838
839
if __name__ == '__main__':
    # Run the whole suite when this file is executed directly as a script
    unittest.main()
841