|
1
|
|
|
#!/usr/bin/env python |
|
2
|
|
|
# -*- coding: utf-8 -*- |
|
3
|
|
|
# |
|
4
|
|
|
# Glances - An eye on your system |
|
5
|
|
|
# |
|
6
|
|
|
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]> |
|
7
|
|
|
# |
|
8
|
|
|
# SPDX-License-Identifier: LGPL-3.0-only |
|
9
|
|
|
# |
|
10
|
|
|
|
|
11
|
|
|
"""Glances unitary tests suite.""" |
|
12
|
|
|
|
|
13
|
|
|
import time |
|
14
|
|
|
import unittest |
|
15
|
|
|
import sys |
|
16
|
|
|
import json |
|
17
|
|
|
|
|
18
|
|
|
# Check Python version |
|
19
|
|
|
if sys.version_info < (3, 8): |
|
20
|
|
|
print('Glances requires at least Python 3.8 to run.') |
|
21
|
|
|
sys.exit(1) |
|
22
|
|
|
|
|
23
|
|
|
from glances.main import GlancesMain |
|
24
|
|
|
from glances.stats import GlancesStats |
|
25
|
|
|
from glances import __version__ |
|
26
|
|
|
from glances.globals import WINDOWS, LINUX, subsample, string_value_to_float |
|
27
|
|
|
from glances.outputs.glances_bars import Bar |
|
28
|
|
|
from glances.thresholds import GlancesThresholdOk |
|
29
|
|
|
from glances.thresholds import GlancesThresholdCareful |
|
30
|
|
|
from glances.thresholds import GlancesThresholdWarning |
|
31
|
|
|
from glances.thresholds import GlancesThresholdCritical |
|
32
|
|
|
from glances.thresholds import GlancesThresholds |
|
33
|
|
|
from glances.plugins.plugin.model import GlancesPluginModel |
|
34
|
|
|
from glances.programs import processes_to_programs |
|
35
|
|
|
from glances.secure import secure_popen |
|
36
|
|
|
from glances.events_list import GlancesEventsList |
|
37
|
|
|
from glances.filter import GlancesFilterList, GlancesFilter |
|
38
|
|
|
|
|
39
|
|
|
# Global variables |
|
40
|
|
|
# ================= |
|
41
|
|
|
|
|
42
|
|
|
# Init Glances core |
|
43
|
|
|
core = GlancesMain() |
|
44
|
|
|
test_config = core.get_config() |
|
45
|
|
|
test_args = core.get_args() |
|
46
|
|
|
|
|
47
|
|
|
# Init Glances stats |
|
48
|
|
|
stats = GlancesStats(config=test_config, |
|
49
|
|
|
args=test_args) |
|
50
|
|
|
|
|
51
|
|
|
# Unitest class |
|
52
|
|
|
# ============== |
|
53
|
|
|
print('Unitary tests for Glances %s' % __version__) |
|
54
|
|
|
|
|
55
|
|
|
|
|
56
|
|
|
class TestGlances(unittest.TestCase): |
|
57
|
|
|
"""Test Glances class.""" |
|
58
|
|
|
|
|
59
|
|
|
def setUp(self): |
|
60
|
|
|
"""The function is called *every time* before test_*.""" |
|
61
|
|
|
print('\n' + '=' * 78) |
|
62
|
|
|
|
|
63
|
|
|
def _common_plugin_tests(self, plugin): |
|
64
|
|
|
"""Common method to test a Glances plugin |
|
65
|
|
|
This method is called multiple time by test 100 to 1xx""" |
|
66
|
|
|
|
|
67
|
|
|
# Reset all the stats, history and views |
|
68
|
|
|
plugin_instance = stats.get_plugin(plugin) |
|
69
|
|
|
plugin_instance.reset() # reset stats |
|
70
|
|
|
plugin_instance.reset_views() # reset views |
|
71
|
|
|
plugin_instance.reset_stats_history() # reset history |
|
72
|
|
|
|
|
73
|
|
|
# Check before update |
|
74
|
|
|
self.assertEqual(plugin_instance.get_raw(), plugin_instance.stats_init_value) |
|
75
|
|
|
self.assertEqual(plugin_instance.is_enabled(), True) |
|
76
|
|
|
self.assertEqual(plugin_instance.is_disabled(), False) |
|
77
|
|
|
self.assertEqual(plugin_instance.get_views(), {}) |
|
78
|
|
|
self.assertIsInstance(plugin_instance.get_raw(), (dict, list)) |
|
79
|
|
|
if plugin_instance.history_enable() and isinstance(plugin_instance.get_raw(), dict): |
|
80
|
|
|
self.assertEqual(plugin_instance.get_key(), None) |
|
81
|
|
|
self.assertTrue(all([f in [h['name'] for h in plugin_instance.items_history_list] for f in plugin_instance.get_raw_history()])) |
|
82
|
|
|
elif plugin_instance.history_enable() and isinstance(plugin_instance.get_raw(), list): |
|
83
|
|
|
self.assertNotEqual(plugin_instance.get_key(), None) |
|
84
|
|
|
|
|
85
|
|
|
# Update stats (add first element) |
|
86
|
|
|
plugin_instance.update() |
|
87
|
|
|
plugin_instance.update_stats_history() |
|
88
|
|
|
plugin_instance.update_views() |
|
89
|
|
|
|
|
90
|
|
|
# Check stats |
|
91
|
|
|
self.assertIsInstance(plugin_instance.get_raw(), (dict, list)) |
|
92
|
|
|
if isinstance(plugin_instance.get_raw(), dict): |
|
93
|
|
|
res = False |
|
94
|
|
|
for f in plugin_instance.fields_description: |
|
95
|
|
|
if f not in plugin_instance.get_raw(): |
|
96
|
|
|
print(f"WARNING: {f} field not found in {plugin} plugin stats") |
|
97
|
|
|
else: |
|
98
|
|
|
res = True |
|
99
|
|
|
self.assertTrue(res) |
|
100
|
|
|
elif isinstance(plugin_instance.get_raw(), list): |
|
101
|
|
|
res = False |
|
102
|
|
|
for i in plugin_instance.get_raw(): |
|
103
|
|
|
for f in i: |
|
104
|
|
|
if f in plugin_instance.fields_description: |
|
105
|
|
|
res = True |
|
106
|
|
|
self.assertTrue(res) |
|
107
|
|
|
|
|
108
|
|
|
self.assertEqual(plugin_instance.get_raw(), plugin_instance.get_export()) |
|
109
|
|
|
self.assertEqual(plugin_instance.get_stats(), plugin_instance.get_json()) |
|
110
|
|
|
self.assertEqual(json.loads(plugin_instance.get_stats()), plugin_instance.get_raw()) |
|
111
|
|
|
if len(plugin_instance.fields_description) > 0: |
|
112
|
|
|
# Get first item of the fields_description |
|
113
|
|
|
first_field = list(plugin_instance.fields_description.keys())[0] |
|
114
|
|
|
self.assertIsInstance(plugin_instance.get_raw_stats_item(first_field), dict) |
|
115
|
|
|
self.assertIsInstance(json.loads(plugin_instance.get_stats_item(first_field)), dict) |
|
116
|
|
|
self.assertIsInstance(plugin_instance.get_item_info(first_field, 'description'), str) |
|
117
|
|
|
# Filter stats |
|
118
|
|
|
current_stats = plugin_instance.get_raw() |
|
119
|
|
|
if isinstance(plugin_instance.get_raw(), dict): |
|
120
|
|
|
current_stats['foo'] = 'bar' |
|
121
|
|
|
current_stats = plugin_instance.filter_stats(current_stats) |
|
122
|
|
|
self.assertTrue('foo' not in current_stats) |
|
123
|
|
|
elif isinstance(plugin_instance.get_raw(), list): |
|
124
|
|
|
current_stats[0]['foo'] = 'bar' |
|
125
|
|
|
current_stats = plugin_instance.filter_stats(current_stats) |
|
126
|
|
|
self.assertTrue('foo' not in current_stats[0]) |
|
127
|
|
|
|
|
128
|
|
|
# Update stats (add second element) |
|
129
|
|
|
plugin_instance.update() |
|
130
|
|
|
plugin_instance.update_stats_history() |
|
131
|
|
|
plugin_instance.update_views() |
|
132
|
|
|
|
|
133
|
|
|
# Check history |
|
134
|
|
|
if plugin_instance.history_enable(): |
|
135
|
|
|
if isinstance(plugin_instance.get_raw(), dict): |
|
136
|
|
|
first_history_field = plugin_instance.get_items_history_list()[0]['name'] |
|
137
|
|
|
elif isinstance(plugin_instance.get_raw(), list): |
|
138
|
|
|
first_history_field = '_'.join([plugin_instance.get_raw()[0][plugin_instance.get_key()], |
|
139
|
|
|
plugin_instance.get_items_history_list()[0]['name']]) |
|
140
|
|
|
self.assertEqual(len(plugin_instance.get_raw_history(first_history_field)), 2) |
|
|
|
|
|
|
141
|
|
|
self.assertGreater(plugin_instance.get_raw_history(first_history_field)[1][0], |
|
142
|
|
|
plugin_instance.get_raw_history(first_history_field)[0][0]) |
|
143
|
|
|
|
|
144
|
|
|
# Update stats (add third element) |
|
145
|
|
|
plugin_instance.update() |
|
146
|
|
|
plugin_instance.update_stats_history() |
|
147
|
|
|
plugin_instance.update_views() |
|
148
|
|
|
|
|
149
|
|
|
self.assertEqual(len(plugin_instance.get_raw_history(first_history_field)), 3) |
|
150
|
|
|
self.assertEqual(len(plugin_instance.get_raw_history(first_history_field, 2)), 2) |
|
151
|
|
|
self.assertIsInstance(json.loads(plugin_instance.get_stats_history()), dict) |
|
152
|
|
|
|
|
153
|
|
|
# Check views |
|
154
|
|
|
self.assertIsInstance(plugin_instance.get_views(), dict) |
|
155
|
|
|
if isinstance(plugin_instance.get_raw(), dict): |
|
156
|
|
|
self.assertIsInstance(plugin_instance.get_views(first_history_field), dict) |
|
157
|
|
|
self.assertTrue('decoration' in plugin_instance.get_views(first_history_field)) |
|
158
|
|
|
elif isinstance(plugin_instance.get_raw(), list): |
|
159
|
|
|
first_history_field = plugin_instance.get_items_history_list()[0]['name'] |
|
160
|
|
|
first_item = plugin_instance.get_raw()[0][plugin_instance.get_key()] |
|
161
|
|
|
self.assertIsInstance(plugin_instance.get_views(item=first_item, |
|
162
|
|
|
key=first_history_field), dict) |
|
163
|
|
|
self.assertTrue('decoration' in plugin_instance.get_views(item=first_item, |
|
164
|
|
|
key=first_history_field)) |
|
165
|
|
|
self.assertIsInstance(json.loads(plugin_instance.get_json_views()), dict) |
|
166
|
|
|
self.assertEqual(json.loads(plugin_instance.get_json_views()), plugin_instance.get_views()) |
|
167
|
|
|
|
|
168
|
|
|
def test_000_update(self): |
|
169
|
|
|
"""Update stats (mandatory step for all the stats). |
|
170
|
|
|
|
|
171
|
|
|
The update is made twice (for rate computation). |
|
172
|
|
|
""" |
|
173
|
|
|
print('INFO: [TEST_000] Test the stats update function') |
|
174
|
|
|
try: |
|
175
|
|
|
stats.update() |
|
176
|
|
|
except Exception as e: |
|
177
|
|
|
print('ERROR: Stats update failed: %s' % e) |
|
178
|
|
|
self.assertTrue(False) |
|
179
|
|
|
time.sleep(1) |
|
180
|
|
|
try: |
|
181
|
|
|
stats.update() |
|
182
|
|
|
except Exception as e: |
|
183
|
|
|
print('ERROR: Stats update failed: %s' % e) |
|
184
|
|
|
self.assertTrue(False) |
|
185
|
|
|
|
|
186
|
|
|
self.assertTrue(True) |
|
187
|
|
|
|
|
188
|
|
|
def test_001_plugins(self): |
|
189
|
|
|
"""Check mandatory plugins.""" |
|
190
|
|
|
plugins_to_check = ['system', 'cpu', 'load', 'mem', 'memswap', 'network', 'diskio', 'fs'] |
|
191
|
|
|
print('INFO: [TEST_001] Check the mandatory plugins list: %s' % ', '.join(plugins_to_check)) |
|
192
|
|
|
plugins_list = stats.getPluginsList() |
|
193
|
|
|
for plugin in plugins_to_check: |
|
194
|
|
|
self.assertTrue(plugin in plugins_list) |
|
195
|
|
|
|
|
196
|
|
|
def test_002_system(self): |
|
197
|
|
|
"""Check SYSTEM plugin.""" |
|
198
|
|
|
stats_to_check = ['hostname', 'os_name'] |
|
199
|
|
|
print('INFO: [TEST_002] Check SYSTEM stats: %s' % ', '.join(stats_to_check)) |
|
200
|
|
|
stats_grab = stats.get_plugin('system').get_raw() |
|
201
|
|
|
for stat in stats_to_check: |
|
202
|
|
|
# Check that the key exist |
|
203
|
|
|
self.assertTrue(stat in stats_grab, msg='Cannot find key: %s' % stat) |
|
204
|
|
|
print('INFO: SYSTEM stats: %s' % stats_grab) |
|
205
|
|
|
|
|
206
|
|
|
def test_003_cpu(self): |
|
207
|
|
|
"""Check CPU plugin.""" |
|
208
|
|
|
stats_to_check = ['system', 'user', 'idle'] |
|
209
|
|
|
print('INFO: [TEST_003] Check mandatory CPU stats: %s' % ', '.join(stats_to_check)) |
|
210
|
|
|
stats_grab = stats.get_plugin('cpu').get_raw() |
|
211
|
|
|
for stat in stats_to_check: |
|
212
|
|
|
# Check that the key exist |
|
213
|
|
|
self.assertTrue(stat in stats_grab, msg='Cannot find key: %s' % stat) |
|
214
|
|
|
# Check that % is > 0 and < 100 |
|
215
|
|
|
self.assertGreaterEqual(stats_grab[stat], 0) |
|
216
|
|
|
self.assertLessEqual(stats_grab[stat], 100) |
|
217
|
|
|
print('INFO: CPU stats: %s' % stats_grab) |
|
218
|
|
|
|
|
219
|
|
|
@unittest.skipIf(WINDOWS, "Load average not available on Windows") |
|
220
|
|
|
def test_004_load(self): |
|
221
|
|
|
"""Check LOAD plugin.""" |
|
222
|
|
|
stats_to_check = ['cpucore', 'min1', 'min5', 'min15'] |
|
223
|
|
|
print('INFO: [TEST_004] Check LOAD stats: %s' % ', '.join(stats_to_check)) |
|
224
|
|
|
stats_grab = stats.get_plugin('load').get_raw() |
|
225
|
|
|
for stat in stats_to_check: |
|
226
|
|
|
# Check that the key exist |
|
227
|
|
|
self.assertTrue(stat in stats_grab, msg='Cannot find key: %s' % stat) |
|
228
|
|
|
# Check that % is > 0 |
|
229
|
|
|
self.assertGreaterEqual(stats_grab[stat], 0) |
|
230
|
|
|
print('INFO: LOAD stats: %s' % stats_grab) |
|
231
|
|
|
|
|
232
|
|
|
def test_005_mem(self): |
|
233
|
|
|
"""Check MEM plugin.""" |
|
234
|
|
|
plugin_name = 'mem' |
|
235
|
|
|
stats_to_check = ['available', 'used', 'free', 'total'] |
|
236
|
|
|
print('INFO: [TEST_005] Check {} stats: {}'.format(plugin_name, ', '.join(stats_to_check))) |
|
237
|
|
|
stats_grab = stats.get_plugin('mem').get_raw() |
|
238
|
|
|
for stat in stats_to_check: |
|
239
|
|
|
# Check that the key exist |
|
240
|
|
|
self.assertTrue(stat in stats_grab, msg='Cannot find key: %s' % stat) |
|
241
|
|
|
# Check that % is > 0 |
|
242
|
|
|
self.assertGreaterEqual(stats_grab[stat], 0) |
|
243
|
|
|
print('INFO: MEM stats: %s' % stats_grab) |
|
244
|
|
|
|
|
245
|
|
|
def test_006_memswap(self): |
|
246
|
|
|
"""Check MEMSWAP plugin.""" |
|
247
|
|
|
stats_to_check = ['used', 'free', 'total'] |
|
248
|
|
|
print('INFO: [TEST_006] Check MEMSWAP stats: %s' % ', '.join(stats_to_check)) |
|
249
|
|
|
stats_grab = stats.get_plugin('memswap').get_raw() |
|
250
|
|
|
for stat in stats_to_check: |
|
251
|
|
|
# Check that the key exist |
|
252
|
|
|
self.assertTrue(stat in stats_grab, msg='Cannot find key: %s' % stat) |
|
253
|
|
|
# Check that % is > 0 |
|
254
|
|
|
self.assertGreaterEqual(stats_grab[stat], 0) |
|
255
|
|
|
print('INFO: MEMSWAP stats: %s' % stats_grab) |
|
256
|
|
|
|
|
257
|
|
|
def test_007_network(self): |
|
258
|
|
|
"""Check NETWORK plugin.""" |
|
259
|
|
|
print('INFO: [TEST_007] Check NETWORK stats') |
|
260
|
|
|
stats_grab = stats.get_plugin('network').get_raw() |
|
261
|
|
|
self.assertTrue(isinstance(stats_grab, list), msg='Network stats is not a list') |
|
262
|
|
|
print('INFO: NETWORK stats: %s' % stats_grab) |
|
263
|
|
|
|
|
264
|
|
|
def test_008_diskio(self): |
|
265
|
|
|
"""Check DISKIO plugin.""" |
|
266
|
|
|
print('INFO: [TEST_008] Check DISKIO stats') |
|
267
|
|
|
stats_grab = stats.get_plugin('diskio').get_raw() |
|
268
|
|
|
self.assertTrue(isinstance(stats_grab, list), msg='DiskIO stats is not a list') |
|
269
|
|
|
print('INFO: diskio stats: %s' % stats_grab) |
|
270
|
|
|
|
|
271
|
|
|
def test_009_fs(self): |
|
272
|
|
|
"""Check File System plugin.""" |
|
273
|
|
|
# stats_to_check = [ ] |
|
274
|
|
|
print('INFO: [TEST_009] Check FS stats') |
|
275
|
|
|
stats_grab = stats.get_plugin('fs').get_raw() |
|
276
|
|
|
self.assertTrue(isinstance(stats_grab, list), msg='FileSystem stats is not a list') |
|
277
|
|
|
print('INFO: FS stats: %s' % stats_grab) |
|
278
|
|
|
|
|
279
|
|
|
def test_010_processes(self): |
|
280
|
|
|
"""Check Process plugin.""" |
|
281
|
|
|
# stats_to_check = [ ] |
|
282
|
|
|
print('INFO: [TEST_010] Check PROCESS stats') |
|
283
|
|
|
stats_grab = stats.get_plugin('processcount').get_raw() |
|
284
|
|
|
# total = stats_grab['total'] |
|
285
|
|
|
self.assertTrue(isinstance(stats_grab, dict), msg='Process count stats is not a dict') |
|
286
|
|
|
print('INFO: PROCESS count stats: %s' % stats_grab) |
|
287
|
|
|
stats_grab = stats.get_plugin('processlist').get_raw() |
|
288
|
|
|
self.assertTrue(isinstance(stats_grab, list), msg='Process count stats is not a list') |
|
289
|
|
|
print('INFO: PROCESS list stats: %s items in the list' % len(stats_grab)) |
|
290
|
|
|
# Check if number of processes in the list equal counter |
|
291
|
|
|
# self.assertEqual(total, len(stats_grab)) |
|
292
|
|
|
|
|
293
|
|
|
def test_011_folders(self): |
|
294
|
|
|
"""Check File System plugin.""" |
|
295
|
|
|
# stats_to_check = [ ] |
|
296
|
|
|
print('INFO: [TEST_011] Check FOLDER stats') |
|
297
|
|
|
stats_grab = stats.get_plugin('folders').get_raw() |
|
298
|
|
|
self.assertTrue(isinstance(stats_grab, list), msg='Folders stats is not a list') |
|
299
|
|
|
print('INFO: Folders stats: %s' % stats_grab) |
|
300
|
|
|
|
|
301
|
|
|
def test_012_ip(self): |
|
302
|
|
|
"""Check IP plugin.""" |
|
303
|
|
|
print('INFO: [TEST_012] Check IP stats') |
|
304
|
|
|
stats_grab = stats.get_plugin('ip').get_raw() |
|
305
|
|
|
self.assertTrue(isinstance(stats_grab, dict), msg='IP stats is not a dict') |
|
306
|
|
|
print('INFO: IP stats: %s' % stats_grab) |
|
307
|
|
|
|
|
308
|
|
|
@unittest.skipIf(not LINUX, "IRQs available only on Linux") |
|
309
|
|
|
def test_013_irq(self): |
|
310
|
|
|
"""Check IRQ plugin.""" |
|
311
|
|
|
print('INFO: [TEST_013] Check IRQ stats') |
|
312
|
|
|
stats_grab = stats.get_plugin('irq').get_raw() |
|
313
|
|
|
self.assertTrue(isinstance(stats_grab, list), msg='IRQ stats is not a list') |
|
314
|
|
|
print('INFO: IRQ stats: %s' % stats_grab) |
|
315
|
|
|
|
|
316
|
|
|
@unittest.skipIf(not LINUX, "GPU available only on Linux") |
|
317
|
|
|
def test_014_gpu(self): |
|
318
|
|
|
"""Check GPU plugin.""" |
|
319
|
|
|
print('INFO: [TEST_014] Check GPU stats') |
|
320
|
|
|
stats_grab = stats.get_plugin('gpu').get_raw() |
|
321
|
|
|
self.assertTrue(isinstance(stats_grab, list), msg='GPU stats is not a list') |
|
322
|
|
|
print('INFO: GPU stats: %s' % stats_grab) |
|
323
|
|
|
|
|
324
|
|
|
def test_015_sorted_stats(self): |
|
325
|
|
|
"""Check sorted stats method.""" |
|
326
|
|
|
print('INFO: [TEST_015] Check sorted stats method') |
|
327
|
|
|
aliases = { |
|
328
|
|
|
"key2": "alias11", |
|
329
|
|
|
"key5": "alias2", |
|
330
|
|
|
} |
|
331
|
|
|
unsorted_stats = [ |
|
332
|
|
|
{"key": "key4"}, |
|
333
|
|
|
{"key": "key2"}, |
|
334
|
|
|
{"key": "key5"}, |
|
335
|
|
|
{"key": "key21"}, |
|
336
|
|
|
{"key": "key3"}, |
|
337
|
|
|
] |
|
338
|
|
|
|
|
339
|
|
|
gp = GlancesPluginModel() |
|
340
|
|
|
gp.get_key = lambda: "key" |
|
341
|
|
|
gp.has_alias = aliases.get |
|
342
|
|
|
gp.stats = unsorted_stats |
|
343
|
|
|
|
|
344
|
|
|
sorted_stats = gp.sorted_stats() |
|
345
|
|
|
self.assertEqual(len(sorted_stats), 5) |
|
346
|
|
|
self.assertEqual(sorted_stats[0]["key"], "key5") |
|
347
|
|
|
self.assertEqual(sorted_stats[1]["key"], "key2") |
|
348
|
|
|
self.assertEqual(sorted_stats[2]["key"], "key3") |
|
349
|
|
|
self.assertEqual(sorted_stats[3]["key"], "key4") |
|
350
|
|
|
self.assertEqual(sorted_stats[4]["key"], "key21") |
|
351
|
|
|
|
|
352
|
|
|
def test_016_subsample(self): |
|
353
|
|
|
"""Test subsampling function.""" |
|
354
|
|
|
print('INFO: [TEST_016] Subsampling') |
|
355
|
|
|
for l_test in [([1, 2, 3], 4), |
|
356
|
|
|
([1, 2, 3, 4], 4), |
|
357
|
|
|
([1, 2, 3, 4, 5, 6, 7], 4), |
|
358
|
|
|
([1, 2, 3, 4, 5, 6, 7, 8], 4), |
|
359
|
|
|
(list(range(1, 800)), 4), |
|
360
|
|
|
(list(range(1, 8000)), 800)]: |
|
361
|
|
|
l_subsample = subsample(l_test[0], l_test[1]) |
|
362
|
|
|
self.assertLessEqual(len(l_subsample), l_test[1]) |
|
363
|
|
|
|
|
364
|
|
|
def test_017_hddsmart(self): |
|
365
|
|
|
"""Check hard disk SMART data plugin.""" |
|
366
|
|
|
try: |
|
367
|
|
|
from glances.globals import is_admin |
|
368
|
|
|
except ImportError: |
|
369
|
|
|
print("INFO: [TEST_017] pySMART not found, not running SMART plugin test") |
|
370
|
|
|
return |
|
371
|
|
|
|
|
372
|
|
|
stat = 'DeviceName' |
|
373
|
|
|
print('INFO: [TEST_017] Check SMART stats: {}'.format(stat)) |
|
374
|
|
|
stats_grab = stats.get_plugin('smart').get_raw() |
|
375
|
|
|
if not is_admin(): |
|
376
|
|
|
print("INFO: Not admin, SMART list should be empty") |
|
377
|
|
|
assert len(stats_grab) == 0 |
|
378
|
|
|
elif stats_grab == {}: |
|
379
|
|
|
print("INFO: Admin but SMART list is empty") |
|
380
|
|
|
assert len(stats_grab) == 0 |
|
381
|
|
|
else: |
|
382
|
|
|
print(stats_grab) |
|
383
|
|
|
self.assertTrue(stat in stats_grab[0].keys(), msg='Cannot find key: %s' % stat) |
|
384
|
|
|
|
|
385
|
|
|
print('INFO: SMART stats: %s' % stats_grab) |
|
386
|
|
|
|
|
387
|
|
|
def test_017_programs(self): |
|
388
|
|
|
"""Check Programs function (it's not a plugin).""" |
|
389
|
|
|
# stats_to_check = [ ] |
|
390
|
|
|
print('INFO: [TEST_017] Check PROGRAM stats') |
|
391
|
|
|
stats_grab = processes_to_programs(stats.get_plugin('processlist').get_raw()) |
|
392
|
|
|
self.assertIsInstance(stats_grab, list, msg='Programs stats list is not a list') |
|
393
|
|
|
self.assertIsInstance(stats_grab[0], dict, msg='First item should be a dict') |
|
394
|
|
|
|
|
395
|
|
|
def test_018_string_value_to_float(self): |
|
396
|
|
|
"""Check string_value_to_float function""" |
|
397
|
|
|
print('INFO: [TEST_018] Check string_value_to_float function') |
|
398
|
|
|
self.assertEqual(string_value_to_float('32kB'), 32000.0) |
|
399
|
|
|
self.assertEqual(string_value_to_float('32 KB'), 32000.0) |
|
400
|
|
|
self.assertEqual(string_value_to_float('15.5MB'), 15500000.0) |
|
401
|
|
|
self.assertEqual(string_value_to_float('25.9'), 25.9) |
|
402
|
|
|
self.assertEqual(string_value_to_float('12'), 12) |
|
403
|
|
|
self.assertEqual(string_value_to_float('--'), None) |
|
404
|
|
|
|
|
405
|
|
|
def test_019_events(self): |
|
406
|
|
|
"""Test events class""" |
|
407
|
|
|
print('INFO: [TEST_019] Test events') |
|
408
|
|
|
# Init events |
|
409
|
|
|
events = GlancesEventsList(max_events=5, min_duration=1, min_interval=3) |
|
410
|
|
|
# Minimal event duration not reached |
|
411
|
|
|
events.add('WARNING', 'LOAD', 4) |
|
412
|
|
|
events.add('CRITICAL', 'LOAD', 5) |
|
413
|
|
|
events.add('OK', 'LOAD', 1) |
|
414
|
|
|
self.assertEqual(len(events.get()), 0) |
|
415
|
|
|
# Minimal event duration LOAD reached |
|
416
|
|
|
events.add('WARNING', 'LOAD', 4) |
|
417
|
|
|
time.sleep(1) |
|
418
|
|
|
events.add('CRITICAL', 'LOAD', 5) |
|
419
|
|
|
time.sleep(1) |
|
420
|
|
|
events.add('OK', 'LOAD', 1) |
|
421
|
|
|
self.assertEqual(len(events.get()), 1) |
|
422
|
|
|
self.assertEqual(events.get()[0]['type'], 'LOAD') |
|
423
|
|
|
self.assertEqual(events.get()[0]['state'], 'CRITICAL') |
|
424
|
|
|
self.assertEqual(events.get()[0]['max'], 5) |
|
425
|
|
|
# Minimal event duration CPU reached |
|
426
|
|
|
events.add('WARNING', 'CPU', 60) |
|
427
|
|
|
time.sleep(1) |
|
428
|
|
|
events.add('WARNING', 'CPU', 70) |
|
429
|
|
|
time.sleep(1) |
|
430
|
|
|
events.add('OK', 'CPU', 10) |
|
431
|
|
|
self.assertEqual(len(events.get()), 2) |
|
432
|
|
|
self.assertEqual(events.get()[0]['type'], 'CPU') |
|
433
|
|
|
self.assertEqual(events.get()[0]['state'], 'WARNING') |
|
434
|
|
|
self.assertEqual(events.get()[0]['min'], 60) |
|
435
|
|
|
self.assertEqual(events.get()[0]['max'], 70) |
|
436
|
|
|
self.assertEqual(events.get()[0]['count'], 2) |
|
437
|
|
|
# Minimal event duration CPU reached (again) |
|
438
|
|
|
# but time between two events (min_interval) is too short |
|
439
|
|
|
# a merge will be done |
|
440
|
|
|
time.sleep(0.5) |
|
441
|
|
|
events.add('WARNING', 'CPU', 60) |
|
442
|
|
|
time.sleep(1) |
|
443
|
|
|
events.add('WARNING', 'CPU', 80) |
|
444
|
|
|
time.sleep(1) |
|
445
|
|
|
events.add('OK', 'CPU', 10) |
|
446
|
|
|
self.assertEqual(len(events.get()), 2) |
|
447
|
|
|
self.assertEqual(events.get()[0]['type'], 'CPU') |
|
448
|
|
|
self.assertEqual(events.get()[0]['state'], 'WARNING') |
|
449
|
|
|
self.assertEqual(events.get()[0]['min'], 60) |
|
450
|
|
|
self.assertEqual(events.get()[0]['max'], 80) |
|
451
|
|
|
self.assertEqual(events.get()[0]['count'], 4) |
|
452
|
|
|
# Clean WARNING events |
|
453
|
|
|
events.clean() |
|
454
|
|
|
self.assertEqual(len(events.get()), 1) |
|
455
|
|
|
|
|
456
|
|
|
def test_020_filter(self): |
|
457
|
|
|
"""Test filter classes""" |
|
458
|
|
|
print('INFO: [TEST_020] Test filter') |
|
459
|
|
|
gf = GlancesFilter() |
|
460
|
|
|
gf.filter = '.*python.*' |
|
461
|
|
|
self.assertEqual(gf.filter, '.*python.*') |
|
462
|
|
|
self.assertEqual(gf.filter_key, None) |
|
463
|
|
|
self.assertTrue(gf.is_filtered({'name': 'python'})) |
|
464
|
|
|
self.assertTrue(gf.is_filtered({'name': '/usr/bin/python -m glances'})) |
|
465
|
|
|
self.assertFalse(gf.is_filtered({'noname': 'python'})) |
|
466
|
|
|
self.assertFalse(gf.is_filtered({'name': 'snake'})) |
|
467
|
|
|
gf.filter = 'username:nicolargo' |
|
468
|
|
|
self.assertEqual(gf.filter, 'nicolargo') |
|
469
|
|
|
self.assertEqual(gf.filter_key, 'username') |
|
470
|
|
|
self.assertTrue(gf.is_filtered({'username': 'nicolargo'})) |
|
471
|
|
|
self.assertFalse(gf.is_filtered({'username': 'notme'})) |
|
472
|
|
|
self.assertFalse(gf.is_filtered({'notuser': 'nicolargo'})) |
|
473
|
|
|
gfl = GlancesFilterList() |
|
474
|
|
|
gfl.filter = '.*python.*,username:nicolargo' |
|
475
|
|
|
self.assertTrue(gfl.is_filtered({'name': 'python is in the place'})) |
|
476
|
|
|
self.assertFalse(gfl.is_filtered({'name': 'snake is in the place'})) |
|
477
|
|
|
self.assertTrue(gfl.is_filtered({'name': 'snake is in the place', 'username': 'nicolargo'})) |
|
478
|
|
|
self.assertFalse(gfl.is_filtered({'name': 'snake is in the place', 'username': 'notme'})) |
|
479
|
|
|
|
|
480
|
|
|
def test_094_thresholds(self): |
|
481
|
|
|
"""Test thresholds classes""" |
|
482
|
|
|
print('INFO: [TEST_094] Thresholds') |
|
483
|
|
|
ok = GlancesThresholdOk() |
|
484
|
|
|
careful = GlancesThresholdCareful() |
|
485
|
|
|
warning = GlancesThresholdWarning() |
|
486
|
|
|
critical = GlancesThresholdCritical() |
|
487
|
|
|
self.assertTrue(ok < careful) |
|
488
|
|
|
self.assertTrue(careful < warning) |
|
489
|
|
|
self.assertTrue(warning < critical) |
|
490
|
|
|
self.assertFalse(ok > careful) |
|
491
|
|
|
self.assertEqual(ok, ok) |
|
492
|
|
|
self.assertEqual(str(ok), 'OK') |
|
493
|
|
|
thresholds = GlancesThresholds() |
|
494
|
|
|
thresholds.add('cpu_percent', 'OK') |
|
495
|
|
|
self.assertEqual(thresholds.get(stat_name='cpu_percent').description(), 'OK') |
|
496
|
|
|
|
|
497
|
|
|
def test_095_methods(self): |
|
498
|
|
|
"""Test mandatories methods""" |
|
499
|
|
|
print('INFO: [TEST_095] Mandatories methods') |
|
500
|
|
|
mandatories_methods = ['reset', 'update'] |
|
501
|
|
|
plugins_list = stats.getPluginsList() |
|
502
|
|
|
for plugin in plugins_list: |
|
503
|
|
|
for method in mandatories_methods: |
|
504
|
|
|
self.assertTrue(hasattr(stats.get_plugin(plugin), method), |
|
505
|
|
|
msg='{} has no method {}()'.format(plugin, method)) |
|
506
|
|
|
|
|
507
|
|
|
def test_096_views(self): |
|
508
|
|
|
"""Test get_views method""" |
|
509
|
|
|
print('INFO: [TEST_096] Test views') |
|
510
|
|
|
plugins_list = stats.getPluginsList() |
|
511
|
|
|
for plugin in plugins_list: |
|
512
|
|
|
stats.get_plugin(plugin).get_raw() |
|
513
|
|
|
views_grab = stats.get_plugin(plugin).get_views() |
|
514
|
|
|
self.assertTrue(isinstance(views_grab, dict), |
|
515
|
|
|
msg='{} view is not a dict'.format(plugin)) |
|
516
|
|
|
|
|
517
|
|
|
def test_097_attribute(self): |
|
518
|
|
|
"""Test GlancesAttribute classes""" |
|
519
|
|
|
print('INFO: [TEST_097] Test attribute') |
|
520
|
|
|
# GlancesAttribute |
|
521
|
|
|
from glances.attribute import GlancesAttribute |
|
522
|
|
|
a = GlancesAttribute('a', description='ad', history_max_size=3) |
|
523
|
|
|
self.assertEqual(a.name, 'a') |
|
524
|
|
|
self.assertEqual(a.description, 'ad') |
|
525
|
|
|
a.description = 'adn' |
|
526
|
|
|
self.assertEqual(a.description, 'adn') |
|
527
|
|
|
a.value = 1 |
|
528
|
|
|
a.value = 2 |
|
529
|
|
|
self.assertEqual(len(a.history), 2) |
|
530
|
|
|
a.value = 3 |
|
531
|
|
|
self.assertEqual(len(a.history), 3) |
|
532
|
|
|
a.value = 4 |
|
533
|
|
|
# Check if history_max_size=3 is OK |
|
534
|
|
|
self.assertEqual(len(a.history), 3) |
|
535
|
|
|
self.assertEqual(a.history_size(), 3) |
|
536
|
|
|
self.assertEqual(a.history_len(), 3) |
|
537
|
|
|
self.assertEqual(a.history_value()[1], 4) |
|
538
|
|
|
self.assertEqual(a.history_mean(nb=3), 4.5) |
|
539
|
|
|
|
|
540
|
|
|
def test_098_history(self): |
|
541
|
|
|
"""Test GlancesHistory classes""" |
|
542
|
|
|
print('INFO: [TEST_098] Test history') |
|
543
|
|
|
# GlancesHistory |
|
544
|
|
|
from glances.history import GlancesHistory |
|
545
|
|
|
h = GlancesHistory() |
|
546
|
|
|
h.add('a', 1, history_max_size=100) |
|
547
|
|
|
h.add('a', 2, history_max_size=100) |
|
548
|
|
|
h.add('a', 3, history_max_size=100) |
|
549
|
|
|
h.add('b', 10, history_max_size=100) |
|
550
|
|
|
h.add('b', 20, history_max_size=100) |
|
551
|
|
|
h.add('b', 30, history_max_size=100) |
|
552
|
|
|
self.assertEqual(len(h.get()), 2) |
|
553
|
|
|
self.assertEqual(len(h.get()['a']), 3) |
|
554
|
|
|
h.reset() |
|
555
|
|
|
self.assertEqual(len(h.get()), 2) |
|
556
|
|
|
self.assertEqual(len(h.get()['a']), 0) |
|
557
|
|
|
|
|
558
|
|
|
def test_099_output_bars(self): |
|
559
|
|
|
"""Test quick look plugin. |
|
560
|
|
|
|
|
561
|
|
|
> bar.min_value |
|
562
|
|
|
0 |
|
563
|
|
|
> bar.max_value |
|
564
|
|
|
100 |
|
565
|
|
|
> bar.percent = -1 |
|
566
|
|
|
> bar.percent |
|
567
|
|
|
0 |
|
568
|
|
|
""" |
|
569
|
|
|
print('INFO: [TEST_099] Test progress bar') |
|
570
|
|
|
|
|
571
|
|
|
bar = Bar(size=1) |
|
572
|
|
|
# Percent value can not be lower than min_value |
|
573
|
|
|
bar.percent = -1 |
|
574
|
|
|
self.assertLessEqual(bar.percent, bar.min_value) |
|
575
|
|
|
# but... percent value can be higher than max_value |
|
576
|
|
|
bar.percent = 101 |
|
577
|
|
|
self.assertLessEqual(bar.percent, 101) |
|
578
|
|
|
|
|
579
|
|
|
# Test display |
|
580
|
|
|
bar = Bar(size=50) |
|
581
|
|
|
bar.percent = 0 |
|
582
|
|
|
self.assertEqual(bar.get(), ' 0.0%') |
|
583
|
|
|
bar.percent = 70 |
|
584
|
|
|
self.assertEqual(bar.get(), '||||||||||||||||||||||||||||||| 70.0%') |
|
585
|
|
|
bar.percent = 100 |
|
586
|
|
|
self.assertEqual(bar.get(), '|||||||||||||||||||||||||||||||||||||||||||| 100%') |
|
587
|
|
|
bar.percent = 110 |
|
588
|
|
|
self.assertEqual(bar.get(), '|||||||||||||||||||||||||||||||||||||||||||| >100%') |
|
589
|
|
|
|
|
590
|
|
|
# def test_100_system_plugin_method(self): |
|
591
|
|
|
# """Test system plugin methods""" |
|
592
|
|
|
# print('INFO: [TEST_100] Test system plugin methods') |
|
593
|
|
|
# self._common_plugin_tests('system') |
|
594
|
|
|
|
|
595
|
|
|
def test_101_cpu_plugin_method(self): |
|
596
|
|
|
"""Test cpu plugin methods""" |
|
597
|
|
|
print('INFO: [TEST_100] Test cpu plugin methods') |
|
598
|
|
|
self._common_plugin_tests('cpu') |
|
599
|
|
|
|
|
600
|
|
|
@unittest.skipIf(WINDOWS, "Load average not available on Windows") |
|
601
|
|
|
def test_102_load_plugin_method(self): |
|
602
|
|
|
"""Test load plugin methods""" |
|
603
|
|
|
print('INFO: [TEST_102] Test load plugin methods') |
|
604
|
|
|
self._common_plugin_tests('load') |
|
605
|
|
|
|
|
606
|
|
|
def test_103_mem_plugin_method(self): |
|
607
|
|
|
"""Test mem plugin methods""" |
|
608
|
|
|
print('INFO: [TEST_103] Test mem plugin methods') |
|
609
|
|
|
self._common_plugin_tests('mem') |
|
610
|
|
|
|
|
611
|
|
|
def test_104_memswap_plugin_method(self): |
|
612
|
|
|
"""Test memswap plugin methods""" |
|
613
|
|
|
print('INFO: [TEST_104] Test memswap plugin methods') |
|
614
|
|
|
self._common_plugin_tests('memswap') |
|
615
|
|
|
|
|
616
|
|
|
def test_105_network_plugin_method(self): |
|
617
|
|
|
"""Test network plugin methods""" |
|
618
|
|
|
print('INFO: [TEST_105] Test network plugin methods') |
|
619
|
|
|
self._common_plugin_tests('network') |
|
620
|
|
|
|
|
621
|
|
|
# def test_106_diskio_plugin_method(self): |
|
622
|
|
|
# """Test diskio plugin methods""" |
|
623
|
|
|
# print('INFO: [TEST_106] Test diskio plugin methods') |
|
624
|
|
|
# self._common_plugin_tests('diskio') |
|
625
|
|
|
|
|
626
|
|
|
def test_107_fs_plugin_method(self): |
|
627
|
|
|
"""Test fs plugin methods""" |
|
628
|
|
|
print('INFO: [TEST_107] Test fs plugin methods') |
|
629
|
|
|
self._common_plugin_tests('fs') |
|
630
|
|
|
|
|
631
|
|
|
def test_700_secure(self): |
|
632
|
|
|
"""Test secure functions""" |
|
633
|
|
|
print('INFO: [TEST_700] Secure functions') |
|
634
|
|
|
|
|
635
|
|
|
if WINDOWS: |
|
636
|
|
|
self.assertIn(secure_popen('echo TEST'), ['TEST\n', |
|
637
|
|
|
'TEST\r\n']) |
|
638
|
|
|
self.assertIn(secure_popen('echo TEST1 && echo TEST2'), ['TEST1\nTEST2\n', |
|
639
|
|
|
'TEST1\r\nTEST2\r\n']) |
|
640
|
|
|
else: |
|
641
|
|
|
self.assertEqual(secure_popen('echo -n TEST'), 'TEST') |
|
642
|
|
|
self.assertEqual(secure_popen('echo -n TEST1 && echo -n TEST2'), 'TEST1TEST2') |
|
643
|
|
|
# Make the test failed on Github (AssertionError: '' != 'FOO\n') |
|
644
|
|
|
# but not on my localLinux computer... |
|
645
|
|
|
# self.assertEqual(secure_popen('echo FOO | grep FOO'), 'FOO\n') |
|
646
|
|
|
|
|
647
|
|
|
def test_800_memory_leak(self): |
|
648
|
|
|
"""Memory leak check""" |
|
649
|
|
|
import tracemalloc |
|
650
|
|
|
print('INFO: [TEST_800] Memory leak check') |
|
651
|
|
|
tracemalloc.start() |
|
652
|
|
|
# 3 iterations just to init the stats and fill the memory |
|
653
|
|
|
for _ in range(3): |
|
654
|
|
|
stats.update() |
|
655
|
|
|
|
|
656
|
|
|
# Start the memory leak check |
|
657
|
|
|
snapshot_begin = tracemalloc.take_snapshot() |
|
658
|
|
|
for _ in range(3): |
|
659
|
|
|
stats.update() |
|
660
|
|
|
snapshot_end = tracemalloc.take_snapshot() |
|
661
|
|
|
snapshot_diff = snapshot_end.compare_to(snapshot_begin, 'filename') |
|
662
|
|
|
memory_leak = sum([s.size_diff for s in snapshot_diff]) |
|
663
|
|
|
print('INFO: Memory leak: {} bytes'.format(memory_leak)) |
|
664
|
|
|
|
|
665
|
|
|
# snapshot_begin = tracemalloc.take_snapshot() |
|
666
|
|
|
for _ in range(30): |
|
667
|
|
|
stats.update() |
|
668
|
|
|
snapshot_end = tracemalloc.take_snapshot() |
|
669
|
|
|
snapshot_diff = snapshot_end.compare_to(snapshot_begin, 'filename') |
|
670
|
|
|
memory_leak = sum([s.size_diff for s in snapshot_diff]) |
|
671
|
|
|
print('INFO: Memory leak: {} bytes'.format(memory_leak)) |
|
672
|
|
|
|
|
673
|
|
|
# snapshot_begin = tracemalloc.take_snapshot() |
|
674
|
|
|
for _ in range(300): |
|
675
|
|
|
stats.update() |
|
676
|
|
|
snapshot_end = tracemalloc.take_snapshot() |
|
677
|
|
|
snapshot_diff = snapshot_end.compare_to(snapshot_begin, 'filename') |
|
678
|
|
|
memory_leak = sum([s.size_diff for s in snapshot_diff]) |
|
679
|
|
|
print('INFO: Memory leak: {} bytes'.format(memory_leak)) |
|
680
|
|
|
snapshot_top = snapshot_end.compare_to(snapshot_begin, 'traceback') |
|
681
|
|
|
print("Memory consumption (top 5):") |
|
682
|
|
|
for stat in snapshot_top[:5]: |
|
683
|
|
|
print(stat) |
|
684
|
|
|
for line in stat.traceback.format(): |
|
685
|
|
|
print(line) |
|
686
|
|
|
|
|
687
|
|
|
def test_999_the_end(self): |
|
688
|
|
|
"""Free all the stats""" |
|
689
|
|
|
print('INFO: [TEST_999] Free the stats') |
|
690
|
|
|
stats.end() |
|
691
|
|
|
self.assertTrue(True) |
|
692
|
|
|
|
|
693
|
|
|
|
|
694
|
|
|
if __name__ == '__main__': |
|
695
|
|
|
unittest.main() |
|
696
|
|
|
|